[RxJava] 리액티브 연산자의 활용 -2
🎯 결합 연산자
생성 연산자와 변환 연산자는 1개의 데이터 흐름을 다뤘지만, 결합 연산자는 여러개의 Observable을 조합하여 활용하는 연산자다.
zip
: 각각의 Observable을 모두 활용해 2개 혹은 그 이상의 Observable을 결합하는데 사용하는 연산자, 예를 들어 A B 두 개의 Observable을 결합한다면 2개의 Observable에서 모두 데이터를 발행해야 결합할 수 있다.
- zip 함수의 원형
@SchedulerSupport(SchedulerSupport.NONE)
public static <T1, T2, R> Observable<R> zip ( ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? extends R> zipper)
source1에 첫 번째 Observable 을 넣고, source2에 두 번째 Observable을 넣은 후 그것을 결합해줄 zipper 변수에 원하는 함수를 넣으면 된다.
public class JavaExample {
public static void main(String[] args) {
String[] shapes = {"BALL", "PENTAGON", "STAR"};
String[] coloredTriangles = {"2-T", "6-T", "4-T"};
Observable<String> source = Observable.zip(
Observable.fromArray(shapes).map(Shape::getSuffix),
Observable.fromArray(coloredTriangles).map(Shape::getColor),
(suffix , color) -> color + suffix);
source.subscribe(Log::it);
}
}
main|1638373433219|value = 2
main|1638373433221|value = 6-P
main|1638373433221|value = 4-S
첫번째 인자로 모양 아이템의 접두사를 가져오고, 두번째 인자로 색깔 아이템에서 숫자를 가져와서 두개를 더해주는 함수를 적용한 결과를 볼 수 있다.
public class JavaExample {
public static void main(String[] args) {
Observable<String> source = Observable.zip(
Observable.just("RED", "GREEN", "BLUE"),
Observable.interval(200L, TimeUnit.MILLISECONDS),
(value, time) -> value
);
CommonUtils.exampleStart();
source.subscribe(Log::it);
CommonUtils.sleep(1000);
}
}
RxComputationThreadPool-1|214|value = RED
RxComputationThreadPool-1|412|value = GREEN
RxComputationThreadPool-1|613|value = BLUE
이렇게 시간과 문자열처럼 타입이 다른 두가지도 결합할 수 있다. 시간과 결합하면 데이터를 발행하는 시간을 조절할 수 있다.
zipWith()
: zip()과 동일하지만 Observable을 다양한 함수와 조합하면서 틈틈이 호출할 수 있다.
combineLatest()
: 2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 연산자다.
첫 번째 Observable에서만 데이터를 발행하거나 두 번째 Observable의 데이터 흐름만 있으면 구독자에게 어떤 데이터도 발행하지 않는다. 하지만 두 Observable 모두 값을 발행하면 그때는 결과값이 나오고, 그 다음부터는 둘 중에 어떤 것이 갱신되던지 최신 결과값을 보여준다.
- combineLatest() 원형
@SchedulerSupport(SchedulerSupport.NONE)
public static <T1, T2, R> Observable<R> combineLatest ( ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)
zip()처럼 결합하고자 하는 첫 번째와 두 번째 Observable을 넣고 마지막으로 그걸 결합하는 combiner()함수를 넣으면 된다. 입력할 수 있는 Observable 인자의 개수는 최대 9개다.
public class JavaExample {
public static void main(String[] args) {
String[] data1 = {"6", "7", "4", "2"};
String[] data2 = {"DIAMOND", "STAR", "PENTAGON"};
Observable<String> source = Observable.combineLatest(
Observable.fromArray(data1)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(shape, notUsed) -> Shape.getColor(shape)),
Observable.fromArray(data2)
.zipWith(Observable.interval(150L, 200L, TimeUnit.MILLISECONDS),
(shape, notUsed) -> Shape.getSuffix(shape)), (v1, v2) -> v1 + v2);
source.subscribe(Log::it);
CommonUtils.sleep(1000);
}
}
RxComputationThreadPool-2|1638374850570|value = 6<>
RxComputationThreadPool-1|1638374850620|value = 7<>
RxComputationThreadPool-1|1638374850720|value = 4<>
RxComputationThreadPool-2|1638374850771|value = 4-S
RxComputationThreadPool-1|1638374850823|value = 2-S
RxComputationThreadPool-2|1638374850968|value = 2-P
첫 번째 데이터와 두 번째 데이터의 발행 시간이 달라 개수(첫번째는 100ms마다, 두번째는 150ms 쉬고 200ms 마다 발행)가 일치하지 않아도 zip()함수와는 다르게 어느 1개의 값만 변경되어도 결과가 발행된다.
merge()
: 입력 Observable의 순서와 모든 Observable이 데이터를 발행하는지 등에 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행
public class JavaExample {
public static void main(String[] args) {
String[] data1 = {"1", "3"};
String[] data2 = {"2", "4", "6"};
Observable<String> source1 = Observable.interval(0L, 100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx -> data1[idx])
.take(data1.length);
Observable<String> source2 = Observable.interval(50L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx -> data2[idx])
.take(data2.length);
Observable<String> source = Observable.merge(source1, source2);
source.subscribe(Log::it);
CommonUtils.sleep(1000);
}
}
RxComputationThreadPool-1|1638375525596|value = 1
RxComputationThreadPool-2|1638375525647|value = 2
RxComputationThreadPool-1|1638375525699|value = 3
RxComputationThreadPool-2|1638375525708|value = 4
RxComputationThreadPool-2|1638375525750|value = 6
각각 개별 스레드에서 이뤄지지만, 발행하는대로 결과를 출력한다.
concat()
: 2개 이상의 Observable을 이어붙여주는 연산자
첫 번째 Observable에 onComplete 이벤트가 발생해야 두 번째 Observable을 구독한다. (첫 번째 Observable에 onComplete 이벤트가 발생하지 않게 하면 두 번째 Observable은 영원히 대기한다. 이는 메모리 누수의 위험을 가지고 있기 때문에 입력 Observable이 반드시 완료 될 수 있게 해야한다!!)
- concat() 원형
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concat( ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
public class JavaExample {
public static void main(String[] args) {
Action onCompleteAction = () -> Log.d("onComplete()");
String[] data1 = {"1", "3"};
String[] data2 = {"2", "4", "6"};
Observable<String> source1 = Observable.fromArray(data1)
.doOnComplete(onCompleteAction);
Observable<String> source2 = Observable.interval(50L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx -> data2[idx])
.take(data2.length)
.doOnComplete(onCompleteAction);
Observable<String> source = Observable.merge(source1, source2)
.doOnComplete(onCompleteAction);
source.subscribe(Log::it);
CommonUtils.sleep(1000);
}
}
main | 1638376018265 | value = 1
main | 1638376018266 | value = 3
main | debug = onComplete()
RxComputationThreadPool-1 | 1638376018322 | value = 2
RxComputationThreadPool-1 | 1638376018372 | value = 4
RxComputationThreadPool-1 | 1638376018420 | value = 6
RxComputationThreadPool-1 | debug = onComplete()
RxComputationThreadPool-1 | debug = onComplete()
첫 번째 Observable에 onComplete 이벤트가 발생하면 두 번째 Observable이 바로 시작되고, 두 번째 Observable을 완료하면 결과 Observable또한 완료된다.
🎯 조건 연산자
: Observable의 흐름을 제어하는 역할, 지금까지의 흐름을 어떻게 제어할 것인지에 초점을 맞추는 연산자
amb()
: ambiguous의 줄임말로 여러 개의 Observable 중에서 1개의 Observable을 선택하는데, 선택 기준은 가장 먼저 데이터를 발행하는 Observable이고 이후에 나머지 Observable에서 발행하는 데이터는 모두 무시한다.
- amb()의 원형
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
public class JavaExample {
public static void main(String[] args) {
String[] data1 = {"1", "3", "5"};
String[] data2 = {"2-R", "4-R"};
List<Observable<String>> sources = Arrays.asList(
Observable.fromArray(data1)
.doOnComplete(() -> Log.d("Observable #1 : onComplete()")),
Observable.fromArray(data2)
.delay(100L, TimeUnit.MILLISECONDS)
.doOnComplete(() -> Log.d("Observable #2 : onComplete()"))
);
Observable.amb(sources)
.doOnComplete(() -> Log.d("Result : onComplete()"))
.subscribe(Log::i);
CommonUtils.sleep(1000);
}
}
main | value = 1
main | value = 3
main | value = 5
main | debug = Observable #1 : onComplete()
main | debug = Result : onComplete()
1,3,5를 발행하는 Observable과 100ms 간격으로 2-R, 4-R을 발행하는 Observable이 있지만 값을 발행하는 첫 번째 Observable들만 출력한다. 첫 번째 Observable에서 onComplete이벤트가 발생하면 Observable도 최종완료된다.
takeUntil()
: take() 함수에 조건을 설정할 수 있다. 인자로 받은 Observable에서 어떤 값을 발행하면 현재 Observable의 데이터 발행을 중단하고 onComplete이벤트를 발생한다. (즉, take() 처럼 일정 개수만 값을 발행하되 완료 기준을 다른 Observable에서 값을 발행하는지로 판단)
- takeUntil()의 원형
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<T> takeUntil(ObservableSource<U> other)
takeUntil의 인자로는 값을 발행할 수 있는 other Observable이 필요함! other Observable에서 값이 발행되면 기존 Observable에서 나오는 값을 더이상 발행하지 않고 완료(onComplete())한다.
public class JavaExample {
public static void main(String[] args) {
String[] data = {"1", "2", "3", "4", "5", "6"};
Observable<String> source = Observable.fromArray(data)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val, notUsed) -> val)
.takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));
source.subscribe(Log::i);
CommonUtils.sleep(1000);
}
}
RxComputationThreadPool-2 | value = 1
RxComputationThreadPool-2 | value = 2
RxComputationThreadPool-2 | value = 3
RxComputationThreadPool-2 | value = 4
100ms 간격으로 1 2 3 4 5 가 발행되고, timer 를 사용해서 500ms 후에 값을 발행하도록 했다.
다른 Observable에서 값이 발행되면 기존 Observable은 완료하기때문에 1234만 출력됐다.
skipUntil()
: takeUntil()(발행하다가 발행하지 않음!)과 정반대의 연산자, 다른 Observable을 인자로 받는다는 점은 같지만 Observable에서 데이터를 발행할 때까지 값을 건너뛴다(발행하지 않다가 발행!) .
public class JavaExample {
public static void main(String[] args) {
String[] data = {"1", "2", "3", "4", "5", "6"};
Observable<String> source = Observable.fromArray(data)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val, notUsed) -> val)
.skipUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));
source.subscribe(Log::i);
CommonUtils.sleep(1000);
}
}
RxComputationThreadPool-2 | value = 5
RxComputationThreadPool-2 | value = 6
all()
: 주어진 조건에 100% 맞을 때만 true를 발행하고 조건에 맞지 않는 데이터가 발행되면 바로 false를 발행한다.
- all()의 원형
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<Boolean> all(Predicate<? super T> predicate)
public class JavaExample {
public static void main(String[] args) {
String[] data = {"1", "2", "3", "4"};
Single<Boolean> source =Observable.fromArray(data)
.map(Shape::getShape)
.all(Shape.BALL::equals);
//.all(val -> Shape.BALL.equals(Shape.getShape(val));
source.subscribe(Log::it);
}
}
main | 1638461303263 | value = true
1 2 3 4 는 모두 getShape라는 함수를 거치면 BALL이라는 값을 얻게되는데, 모두 Shape.BALL 과 같기때문에 true 가 나온다.
🎯 수학 및 기타 연산자
수학 함수
: RxJava 1.x에는 수학 함수들을 모은 RxJavaMath가 있다. 하지만 RxJavaMath는 RxJava 2.x 를 지원하지 않으므로, 다른 라이브러리인 RxJava2Extensions라이브러리를 사용해야한다.
- count() : Observable에서 발행한 데이터의 개수를 발행
- max()
- min()
- sum()
- average
public class JavaExample {
public static void main(String[] args) {
Integer[] data = {1, 2, 3, 4};
//1. count
Single<Long> source = Observable.fromArray(data)
.count();
source.subscribe(count -> Log.i("count is " + count));
//2. max() & min()
Flowable.fromArray(data)
.to(MathFlowable::max)
.subscribe(max -> Log.i("max is " + max));
Flowable.fromArray(data)
.to(MathFlowable::min)
.subscribe(min -> Log.i("min is " + min));
//3. sum() & average
Flowable<Integer> flowable = Flowable.fromArray(data)
.to(MathFlowable::sumInt);
flowable.subscribe(sum -> Log.i("sum is " + sum));
Flowable<Double> flowable2 = Observable.fromArray(data)
.toFlowable(BackpressureStrategy.BUFFER) //Flowable을 생성할 때 활용하는 배압 전략 기본값
.to(MathFlowable::averageDouble);
flowable2.subscribe(avg -> Log.i("average is " + avg));
}
}
main | value = count is 4
main | value = max is 4
main | value = min is 1
main | value = sum is 10
main | value = average is 2.5
delay()
: 인자로 전달받은 time과 시간 단위만큼 입력받은 Observable의 데이터 발행을 지연시켜주는 역할을 하는 연산자(interval(), timer(), defer() 등 앞에서 배운 시간 관련 메소드들은 Observable을 생성하는 역할이였지만 얘는 유틸리티 연산자로서 보조 역할을 한다.)
- delay()의 원형
@SchedulerSupport(SchedulerSupport.COMPUTATION) //별도의 스레드에서 실행
public final Observable<T> delay(long delay, TimeUnit unit)
public class JavaExample {
public static void main(String[] args) {
CommonUtils.exampleStart();
String[] data = {"1", "7", "2", "3", "4"};
Observable<String> source = Observable.fromArray(data)
.delay(100L, TimeUnit.MILLISECONDS);
source.subscribe(Log::it);
CommonUtils.sleep(1000);
}
}
RxComputationThreadPool-1 | 289 | value = 1
RxComputationThreadPool-1 | 290 | value = 7
RxComputationThreadPool-1 | 290 | value = 2
RxComputationThreadPool-1 | 290 | value = 3
RxComputationThreadPool-1 | 290 | value = 4
timeInterval()
: 어떤 값을 발행했을 때 이전 값을 발행한 이후 얼마나 시간이 흘렀는지 알려주는 연산자
- timeInterval()의 원형
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Timed<T>> timeInterval()
Timed
public class JavaExample {
public static void main(String[] args) {
String[] data = {"1", "3", "7"};
CommonUtils.exampleStart();
Observable<Timed<String>> source = Observable.fromArray(data)
.delay(item -> {
CommonUtils.doSomething();
return Observable.just(item);
})
.timeInterval();
source.subscribe(Log::it);
CommonUtils.sleep(1000);
}
}
main | 311 | value = Timed[time=10, unit=MILLISECONDS, value=1]
main | 339 | value = Timed[time=28, unit=MILLISECONDS, value=3]
main | 371 | value = Timed[time=32, unit=MILLISECONDS, value=7]
발행되는 값 사이의 시간 간격을 알기 위해 무작위로 스레드에 sleep()메서드 실행, 차이 계산해서 출력
Comments