Scheduler for RxAndroid and RxSwing

Using RxAndroid as a reference, I wrote a Scheduler for Swing's UI thread that is compatible with RxJava 2.x. (That is Swing, not Swift.) Thanks for reading along.

doOnSubscribe

The aim of this article

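The goal of this article is to use RxAndroid as a reference and build a Scheduler for Swing's UI thread that works with RxJava 2.x. Hardly anyone seems to be using RxSwing. Also, no knowledge of Swing is required.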

RxSwing exists, but

RxSwing exists as one of the ReactiveX projects. https://github.com/ReactiveX/RxSwing

But unlike RxAndroid and RxCocoa, it is not listed under "ReactiveX for platforms and frameworks". http://reactivex.io/languages.html

Also, it does not support RxJava 2.x yet.

There is a Pull Request that adds RxJava 2.x support, but it appears to have been left unattended. https://github.com/ReactiveX/RxSwing/pull/63

The official RxSwing2 project also exists as an empty frame, and its Pull Request has likewise been left unattended. https://github.com/ReactiveX/RxSwing2/pull/1

There is also the question of how long Swing will be around; RxJava 3 may well come out before Swing is gone. (I'm just guessing here.) With release cycles this short, shipping and maintaining an RxSwing2 for RxJava 2 looks hard, so perhaps that is why the release, and the reviews leading up to it, are being held back.

In the meantime, if minimal support is all we want, it comes down to thread control for working with the UI. *1

UI thread

Like Android with its UI thread, Swing has the Event Dispatch Thread (EDT) for working with the UI. (The events here are UI events.) However, while Android's UI thread is the main thread *2, Swing's EDT is a separate thread from Java's main thread.
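As a quick check of the point about the EDT being a separate thread, here is a minimal sketch. The class name EdtCheck and its helper method are made up for illustration, and it assumes a JRE that ships the Swing classes. It asks Swing whether the calling thread is the EDT, then asks the EDT for its own thread name.

```java
import javax.swing.SwingUtilities;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of RxSwing or RxAndroid.
class EdtCheck {

    /** Runs a tiny task on the EDT and returns the name of the thread it ran on. */
    static String edtThreadName() {
        AtomicReference<String> name = new AtomicReference<>();
        try {
            // invokeAndWait blocks the caller until the task has run on the EDT.
            SwingUtilities.invokeAndWait(() -> name.set(Thread.currentThread().getName()));
        } catch (Exception e) {
            throw new IllegalStateException("could not run on the EDT", e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        // The thread that runs main() is not the EDT.
        System.out.println("main is EDT: " + SwingUtilities.isEventDispatchThread());
        System.out.println("main thread: " + Thread.currentThread().getName());
        System.out.println("EDT thread:  " + edtThreadName());
    }
}
```

The two thread names printed should differ, which is the whole reason a dedicated Scheduler is needed to hop onto the EDT.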

In the Rx family, thread control is delegated to the Scheduler. So that code can be written with almost the same API across the Rx family, the platform- and language-specific handling of threads is confined inside the Scheduler. Android apparently uses Looper and Handler, and iOS uses GCD. https://github.com/ReactiveX/RxAndroid/blob/2.x/rxandroid/src/main/java/io/reactivex/android/schedulers/AndroidSchedulers.java https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Schedulers/MainScheduler.swift

Fortunately, in the case of Swing, you don't have to deal directly with Java's Thread or Executor. The Timer and SwingUtilities classes are used instead. https://github.com/ReactiveX/RxSwing/blob/0.x/src/main/java/rx/schedulers/SwingScheduler.java
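To illustrate how these two classes cover both the "run now" and the "run later" case, here is a rough sketch. The class EdtRunSketch and its methods are hypothetical helpers, not code from RxSwing; it only assumes the standard javax.swing.Timer and SwingUtilities APIs.

```java
import javax.swing.SwingUtilities;
import javax.swing.Timer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper class for illustration only.
class EdtRunSketch {

    /** Runs the task on the EDT, right away or after delayMillis. */
    static void runOnEdt(Runnable task, int delayMillis) {
        if (delayMillis <= 0) {
            // Queues the task; the EDT picks it up as soon as it can.
            SwingUtilities.invokeLater(task);
            return;
        }
        // javax.swing.Timer fires its action listeners on the EDT.
        Timer timer = new Timer(delayMillis, event -> task.run());
        timer.setRepeats(false); // one shot
        timer.start();
    }

    /** Schedules a probe task and reports whether it ran on the EDT within five seconds. */
    static boolean ranOnEdt(int delayMillis) {
        AtomicBoolean onEdt = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);
        runOnEdt(() -> {
            onEdt.set(SwingUtilities.isEventDispatchThread());
            done.countDown();
        }, delayMillis);
        try {
            return done.await(5, TimeUnit.SECONDS) && onEdt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("immediate task on EDT: " + ranOnEdt(0));
        System.out.println("delayed task on EDT:   " + ranOnEdt(100));
    }
}
```

Note that the Swing Timer takes its delay as an int in milliseconds, so a Scheduler built on it has to validate or clamp longer delays.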

Scheduler in RxJava

Creating a Scheduler in RxJava means, first and foremost, implementing the following method.

public abstract Worker createWorker();

At a minimum, this Worker must implement the following method, as well as Disposable.

public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
public interface Disposable {
    void dispose();
    boolean isDisposed();
}

Responsibilities of Worker

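For the Runnable passed to the schedule() method, it is the Worker's responsibility to guarantee how it is handled: it runs after the specified delay, and it can be cancelled through the returned Disposable.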

RxAndroid Scheduler

To work with the UI from RxJava on Android, RxAndroid provides AndroidSchedulers.mainThread(). This is an instance of HandlerScheduler, created by passing a Handler for the main thread to the constructor, and that single instance is reused.

new HandlerScheduler(new Handler(Looper.getMainLooper()));

See AndroidSchedulers.java and HandlerScheduler.java.

It uses Handler's features to invoke a callback at the specified time and to remove a callback. I'll excerpt it below, but if you can't wait, read the code and you'll see how it works.

RxSwing2 Worker

As mentioned above, there are two PRs against the official repositories that add RxJava 2.x support. They are basically the same, so I'll only look at the Scheduler and Worker in the PR to RxSwing2.

It differs little from the RxJava 1.x version. However, one thing bothers me.

Below is an excerpt from the RxSwing2 Worker class. https://github.com/UeliKurmann/RxSwing2/blob/8012ae881aa58cbb829433554489f9b83e6411ea/src/main/java/rx/schedulers/SwingScheduler.java

		// Worker of RxSwing2 for RxJava2.x
		
		private final CompositeDisposable innerSubscription = new CompositeDisposable();

		@Override
		public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
			// (omitted)

			// Returns innerSubscription as-is
			return innerSubscription;
		}

		@Override
		public void dispose() {
			innerSubscription.dispose();
		}

		@Override
		public boolean isDisposed() {
			return innerSubscription.isDisposed();
		}

In other words, calling dispose() on the Worker instance and calling dispose() on the Disposable returned by that Worker's schedule() are equivalent.

For this usage a CompositeDisposable is not needed; I think Disposables.empty() would do. The use of CompositeDisposable may simply be a leftover from the old RxSwing.

RxSwing1 Worker

For those who have forgotten about RxJava 1.x, here are the corresponding classes.

Version    Class         Method
RxJava1.x  Subscription  unsubscribe()
RxJava2.x  Disposable    dispose()

Below is an excerpt from the Worker class of RxSwing1. https://github.com/ReactiveX/RxSwing/blob/281ddb9500bea4ce8b44fccde907963712647ab4/src/main/java/rx/schedulers/SwingScheduler.java

        // Worker of RxSwing for RxJava1.x

        private final CompositeSubscription innerSubscription = new CompositeSubscription();

        @Override
        public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
            // (omitted)

            final BooleanSubscription s = BooleanSubscription.create();

            // (omitted)

            innerSubscription.add(s);

            // wrap for returning so it also removes it from the 'innerSubscription'
            return Subscriptions.create(new Action0() {

                @Override
                public void call() {
                    timer.stop();
                    s.unsubscribe();
                    innerSubscription.remove(s);
                }
            });
        }

        @Override
        public void unsubscribe() {
            innerSubscription.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return innerSubscription.isUnsubscribed();
        }

Let's call the RxSwing for RxJava 1.x "RxSwing1". In RxSwing1, as shown above, the two unsubscribe() calls behave differently.

RxAndroid Worker

Of course, it is possible that RxJava 2.x changed what is required of a Worker. Let's look at RxAndroid, by our god Jake, to see what is right.

        // Worker of RxAndroid for RxJava2.x

        private volatile boolean disposed;

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

            // (omitted)

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
            
            // (omitted)

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

In RxAndroid, which can be trusted to support RxJava 2.x properly, the following two operations are, as shown, not equivalent.

  • dispose() on the Worker instance
  • dispose() on the Disposable instance returned by the Worker instance's schedule()

So making the two dispose() calls equivalent is likely to be a problem. However, I have not found a usage pattern where it makes a difference. Ideally I would demonstrate which is right on its merits, rather than lean on RxAndroid's user count or faith in the god Jake, but I ran out of energy and interest around here.
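The difference between the two dispose() calls can be sketched without RxJava at all. The model below is purely hypothetical: MiniDisposable and MiniWorker are stand-ins invented for this illustration, and a single-threaded ScheduledExecutorService plays the role of the UI thread. It shows what task-level disposal buys you: cancelling one task leaves the worker's other tasks alone.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model; none of these types come from RxJava.
class DisposeScopeSketch {

    /** Stand-in with the same two methods as RxJava's Disposable. */
    interface MiniDisposable {
        void dispose();
        boolean isDisposed();
    }

    /** Worker that hands out one handle per task and can also cancel everything. */
    static final class MiniWorker implements MiniDisposable {
        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();
        private final Set<ScheduledFuture<?>> pending = ConcurrentHashMap.newKeySet();
        private volatile boolean disposed;

        MiniDisposable schedule(Runnable run, long delay, TimeUnit unit) {
            pending.removeIf(ScheduledFuture::isDone); // forget tasks that already finished
            ScheduledFuture<?> future = executor.schedule(run, delay, unit);
            pending.add(future);
            return new MiniDisposable() {
                @Override
                public void dispose() {
                    // Task-level dispose: cancels this task only.
                    future.cancel(false);
                    pending.remove(future);
                }

                @Override
                public boolean isDisposed() {
                    return future.isCancelled();
                }
            };
        }

        @Override
        public void dispose() {
            // Worker-level dispose: drops every pending task.
            disposed = true;
            executor.shutdownNow();
            pending.clear();
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    /** Schedules two tasks, disposes either the worker or only the first task, counts runs. */
    static int countRuns(boolean disposeWholeWorker) {
        MiniWorker worker = new MiniWorker();
        AtomicInteger runs = new AtomicInteger();
        MiniDisposable first = worker.schedule(runs::incrementAndGet, 200, TimeUnit.MILLISECONDS);
        worker.schedule(runs::incrementAndGet, 200, TimeUnit.MILLISECONDS);
        if (disposeWholeWorker) {
            worker.dispose();
        } else {
            first.dispose();
        }
        try {
            Thread.sleep(600); // give the surviving task time to run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        worker.dispose(); // release the executor thread
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs after task-level dispose:   " + countRuns(false));
        System.out.println("runs after worker-level dispose: " + countRuns(true));
    }
}
```

If schedule() instead returned the worker-wide disposable, as the RxSwing2 PR does with innerSubscription, disposing the first task's handle would presumably take the second task down with it.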

Implementation example of RxSwing Scheduler corresponding to RxJava 2.x

So: "On a single instance, you can call schedule() for each task and dispose() each one individually." I think it is the Scheduler's responsibility to return a Worker like that.

Based on the above, here is an example implementation of an RxSwing Scheduler for RxJava 2.x. https://github.com/guignol/SwingBinding/blob/05913b51d9aa6daf8cb0aa3113c699f23879c77e/src/main/java/com/github/guignol/swing/rx/SwingScheduler.java

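import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;

import javax.swing.SwingUtilities;
import javax.swing.Timer;
import java.util.concurrent.TimeUnit;

public class SwingScheduler extends Scheduler {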

    private static final SwingScheduler INSTANCE = new SwingScheduler();

    public static SwingScheduler getInstance() {
        return INSTANCE;
    }

    private SwingScheduler() {
    }

    @Override
    public Disposable scheduleDirect(Runnable run) {
        // TODO: why does RxAndroid override this?
        // TODO: if left as it is, is there a chance the task gets disposed via the wrapping Disposable task?
        return super.scheduleDirect(run);
    }

    @Override
    public Worker createWorker() {
        return new InnerSwingScheduler();
    }

    private static class InnerSwingScheduler extends Worker {

        private final CompositeDisposable composite = new CompositeDisposable();

        @Override
        public Disposable schedule(Runnable original, long delayTime, TimeUnit unit) {
            if (original == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            final long delay = Math.max(0, unit.toMillis(delayTime));
            assertThatTheDelayIsValidForTheSwingTimer(delay);

            final Disposable local;
            if (delay == 0) {
                local = Disposables.empty();
                if (SwingUtilities.isEventDispatchThread()) {
                    //Immediate execution
                    original.run();
                } else {
                    SwingUtilities.invokeLater(() -> {
                        if (composite.isDisposed() || local.isDisposed()) {
                            return;
                        }
                        original.run();
                        composite.remove(local);
                    });
                }
            } else {
                final Timer timer = new Timer((int) delay, null);
                local = Disposables.fromRunnable(timer::stop);
                timer.setRepeats(false);
                timer.addActionListener(e -> {
                    if (composite.isDisposed() || local.isDisposed()) {
                        return;
                    }
                    original.run();
                    composite.remove(local);
                });
                timer.start();
            }
            composite.add(local);

            // Ueli Kurmann's SwingScheduler returns the Worker's CompositeDisposable, so it cannot dispose task by task.
            // RxAndroid's Scheduler also offers per-task dispose, so I think it is needed, but I have not found a usage pattern where it makes a difference.
            return local;
        }

        @Override
        public void dispose() {
            composite.dispose();
        }

        @Override
        public boolean isDisposed() {
            return composite.isDisposed();
        }

        private static void assertThatTheDelayIsValidForTheSwingTimer(long delay) {
            if (delay < 0 || delay > Integer.MAX_VALUE) {
                throw new IllegalArgumentException(String.format("The swing timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE));
            }
        }
    }
}

It's a small piece of code, but with RxAndroid as a model and RxSwing as an existing implementation to draw on, it was a fun task.

doOnDispose

There are other mysteries about Scheduler that I'm curious about, but that's it for this article. Thanks for reading.

Note

*1

In fact, RxAndroid is a library that just provides a Scheduler. If RxSwing2 adopted the same policy, it seems it could be released.

*2

Android does not run on the JVM, so the comparison only goes so far, but the main() method of the ActivityThread class is the first thing executed in the app's process after Zygote forks it. The event loop that becomes the UI thread runs there. So in substance it is the same as Java's main thread. For more on this area, read "Technologies that support Android". It is excellent. https://www.amazon.co.jp/dp/4774187593
