This is the 10th article in the RxJava Advent Calendar 2016.
I signed up for the RxJava Advent Calendar on impulse and then wondered what to write. I realized my understanding of the operators was a little shaky, so I decided to study them again.
There are a lot of them, so I plan to split this into five parts in total.
Now, let's take a look at two of the categories described in the ReactiveX Introduction: Creating Observables and Transforming Observables.
Creating
Operators that mainly create new `Observable`s.
Create
An operator that builds an `Observable` from scratch. On Android, you often see it used for asynchronous communication.
The usage is as follows.
import io.reactivex.Observable;

public class Create {
    public static void main(String[] args) {
        Observable.create(e -> {
            Person person = new Person();
            person.age = 100;
            person.name = "nshiba";
            e.onNext(person);   // pass the value downstream
            e.onComplete();     // signal normal completion
        }).subscribe(System.out::println);
    }

    private static class Person {
        int age;
        String name;

        @Override
        public String toString() {
            return name + ":" + age;
        }
    }
}
output
nshiba:100
You pass values with `onNext` and call `onComplete` at the end.
If an error occurs, call `onError` instead.
Defer
`defer` is an operator that creates the `Observable` only when it is subscribed to.
A plain `create` builds the `Observable` on the spot, whereas `defer` delays the creation of the `Observable` itself until subscription.
Observable observable = Observable.defer(() -> observer -> {
    observer.onNext("test");
    observer.onComplete();
});
// The Observable inside defer is only created here, at subscribe time
observable.subscribe(System.out::println);
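As a rough plain-Java analogy (not RxJava itself), the difference between building something on the spot and deferring it can be sketched with `java.util.function.Supplier`. The names `DeferSketch`, `eager`, and `deferred` are made up for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class DeferSketch {
    // Counter standing in for some state that changes over time.
    static final AtomicInteger counter = new AtomicInteger();

    // Eager: the value is captured when this method is called.
    static Supplier<Integer> eager() {
        int snapshot = counter.get();
        return () -> snapshot;
    }

    // Deferred: the value is read only when get() is called,
    // much like defer builds its Observable only on subscribe.
    static Supplier<Integer> deferred() {
        return () -> counter.get();
    }

    public static void main(String[] args) {
        Supplier<Integer> e = eager();
        Supplier<Integer> d = deferred();
        counter.set(42);
        System.out.println(e.get()); // value captured before the change
        System.out.println(d.get()); // value read after the change
    }
}
```

The eager supplier keeps reporting the old state, while the deferred one sees whatever the state is at the moment it is asked.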
Empty/Never/Throw
I think these operators have limited uses, mainly in testing.
Empty
Creates an `Observable` that emits no values but completes normally. That is, only `onComplete` is called.
Never
Creates an `Observable` that emits no values and never terminates.
Throw
Creates an `Observable` that emits no values and terminates with the specified error (`Observable.error` in RxJava).
From
Converts various kinds of objects to an `Observable`.
Arrays and lists are probably what I convert most often, so I made a sample with `fromArray`.
int[] nums = new int[] {1, 2, 3, 4, 5};
Observable
        .fromArray(nums) // nums is a primitive int[], so it is emitted as one item
        .subscribe(ints -> {
                    System.out.println("onNext");
                    System.out.println(Arrays.toString(ints));
                },
                throwable -> {
                    System.out.println("onError");
                },
                () -> {
                    System.out.println("onComplete");
                });
output
onNext
[1, 2, 3, 4, 5]
onComplete
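The sample prints the whole array in a single `onNext` because `fromArray` takes generic varargs, and a primitive `int[]` is treated as one item rather than five. The same language rule can be seen without RxJava in `Arrays.asList`. The sketch below uses only the standard library, and the class name `VarargsSketch` is made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

class VarargsSketch {
    // A primitive array cannot bind to T..., so the whole int[] becomes one element.
    static int countPrimitive(int[] nums) {
        List<int[]> wrapped = Arrays.asList(nums);
        return wrapped.size();
    }

    // A boxed array binds to T... directly, so each element is kept separately.
    static int countBoxed(Integer[] nums) {
        List<Integer> items = Arrays.asList(nums);
        return items.size();
    }

    public static void main(String[] args) {
        System.out.println(countPrimitive(new int[] {1, 2, 3, 4, 5})); // prints 1
        System.out.println(countBoxed(new Integer[] {1, 2, 3, 4, 5})); // prints 5
    }
}
```

By the same rule, passing an `Integer[]` to `fromArray` gives one `onNext` per element.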
Interval
Generates an `Observable` that emits an incrementing integer at the specified regular interval. You can also specify an initial delay.
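// interval emits on a background scheduler, so the main thread has to stay alive to see the output
Observable
        .interval(1, TimeUnit.SECONDS)
        .subscribe(System.out::print);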
output
0123456789...
Just
Generates an `Observable` from the objects passed directly as arguments. If you pass more than one, `onNext` is called once for each, and no error occurs even if the types differ.
Observable.just(3, 1, 5, 4, "test")
        .subscribe(num -> {
            System.out.println("onNext: " + num);
        }, throwable -> {
            System.out.println("onError");
        }, () -> {
            System.out.println("onComplete");
        });
output
onNext: 3
onNext: 1
onNext: 5
onNext: 4
onNext: test
onComplete
Range
Generates an `Observable` that emits integers in the specified range. The arguments are the start value and the number of items.
Observable.range(0, 10)
        .subscribe(i -> {
            System.out.println("onNext: " + i);
        }, throwable -> {
            System.out.println("onError");
        }, () -> {
            System.out.println("onComplete");
        });
output
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onComplete
Repeat
Generates an `Observable` that repeats the source sequence the specified number of times.
Observable.just(1, 2, 3, 4, 5)
        .repeat(3)
        .subscribe(i -> {
            System.out.println("onNext: " + i);
        }, throwable -> {
            System.out.println("onError");
        }, () -> {
            System.out.println("onComplete");
        });
output
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onComplete
Start
Creates an `Observable` that emits the return value of a function that computes a value. It is similar to Create, but here you just return an arbitrary value and do not call `onNext` or `onComplete` yourself. The sample uses `fromCallable`.
Observable.fromCallable(() -> {
    String str = "java";
    str += ":" + "RxJava";
    return str;
}).subscribe(System.out::println);
output
java:RxJava
Timer
Creates an `Observable` that emits a single value after the specified delay.
System.out.println(System.currentTimeMillis());
Observable.timer(3, TimeUnit.SECONDS)
        .subscribe(aLong -> {
            System.out.println(System.currentTimeMillis());
        });
output
1480975677330
1480975680651
Transforming
Buffer
An operator that splits a stream at specified intervals and emits each piece as a list.
Observable.range(1, 5)
        .buffer(3)
        .subscribe(System.out::println);
output
[1, 2, 3]
[4, 5]
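What `buffer(3)` does to the values can be sketched in plain Java without RxJava. This is only an illustration of the chunking, assuming a positive chunk size. The names `BufferSketch` and `buffer` are made up here and are not part of any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BufferSketch {
    // Splits items into consecutive lists of at most `count` elements.
    // Assumes count > 0; the last list may be shorter than the others.
    static List<List<Integer>> buffer(List<Integer> items, int count) {
        List<List<Integer>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += count) {
            int end = Math.min(start + count, items.size());
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        for (List<Integer> chunk : buffer(Arrays.asList(1, 2, 3, 4, 5), 3)) {
            System.out.println(chunk); // prints [1, 2, 3] and then [4, 5]
        }
    }
}
```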
FlatMap
An operator that maps each item in the stream to an `Observable` and merges the results into a single new `Observable`.
Observable.just(1, 2, 3)
        .flatMap(i -> Observable.range(i, i * 2))
        .subscribe(System.out::print);
output
122345345678
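To see where `122345345678` comes from: `range(i, i * 2)` takes a start value and a count, so the three inner sequences are 1 to 2, 2 to 5, and 3 to 8. A plain-Java analogy using `java.util.stream` (not RxJava; `FlatMapSketch` and `expand` are made-up names) flattens the same sequences. Note that `IntStream.range` takes an exclusive end rather than a count.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FlatMapSketch {
    // Maps each i to the sequence i, i+1, ... (i * 2 items) and flattens the results.
    static List<Integer> expand(List<Integer> source) {
        return source.stream()
                .flatMap(i -> IntStream.range(i, i + i * 2).boxed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        expand(Arrays.asList(1, 2, 3)).forEach(System.out::print); // prints 122345345678
        System.out.println();
    }
}
```

A stream's `flatMap` always keeps the source order, whereas an Rx `flatMap` merges the inner `Observable`s and may interleave their items when they emit asynchronously.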
GroupBy
An operator that divides a stream into groups according to a condition. Items for which the function returns the same key end up in the same group.
Observable.range(1, 10)
        .groupBy(integer -> integer % 3)
        .subscribe(integerIntegerGroupedObservable -> {
            integerIntegerGroupedObservable.toList().subscribe(System.out::println);
        });
output
[3, 6, 9]
[1, 4, 7, 10]
[2, 5, 8]
Map
An operator that transforms each value flowing through the stream.
The difference from FlatMap above is that the function passed to FlatMap returns an `Observable`, while the function passed to Map returns the value itself.
Observable.just(1, 2, 3)
        .map(i -> i * 10)
        .subscribe(System.out::println);
output
10
20
30
Scan
An operator that walks through the items in order, two values at a time.
On the first call, the first and second items are passed as the arguments. After that, the value returned by the previous call is passed as the first argument and the next item as the second.
The values delivered to the subscriber are the first item, followed by each return value.
Observable.range(1, 5)
        .scan((sum, item) -> sum + item)
        .subscribe(System.out::println);
output
1
3
6
10
15
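The accumulation described above can be written out in plain Java to make the order of calls explicit. This is only a sketch of the behaviour, not how RxJava implements it, and the names `ScanSketch` and `scan` are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class ScanSketch {
    // Emits the first item as-is, then each result of combining
    // the previous result with the next item.
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : items) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [1, 3, 6, 10, 15]
        System.out.println(scan(Arrays.asList(1, 2, 3, 4, 5), (sum, item) -> sum + item));
    }
}
```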
Window
An operator that splits a stream at specified intervals and emits each piece as a new `Observable`.
It is similar to Buffer above, but Buffer emits `List<Integer>`, while Window emits `Observable<Integer>`.
Observable.range(1, 5)
        .window(3)
        .subscribe(integerObservable -> {
            integerObservable.toList().subscribe(System.out::println);
        });
output
[1, 2, 3]
[4, 5]
I hope to cover the remaining operators in the same way. The source code is public: nshiba / rx-samples
Also, if you find any mistakes, I would be grateful if you could point them out in the comments or in an issue on GitHub.