[RxJava] 리액티브 연산자의 활용 -2

7 minute read

🎯 결합 연산자

생성 연산자와 변환 연산자는 1개의 데이터 흐름을 다뤘지만, 결합 연산자는 여러개의 Observable을 조합하여 활용하는 연산자다.

zip

: 각각의 Observable을 모두 활용해 2개 혹은 그 이상의 Observable을 결합하는데 사용하는 연산자, 예를 들어 A B 두 개의 Observable을 결합한다면 2개의 Observable에서 모두 데이터를 발행해야 결합할 수 있다.

  • zip 함수의 원형
@SchedulerSupport(SchedulerSupport.NONE)
public static <T1, T2, R> Observable<R> zip ( ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? extends R> zipper)

source1에 첫 번째 Observable 을 넣고, source2에 두 번째 Observable을 넣은 후 그것을 결합해줄 zipper 변수에 원하는 함수를 넣으면 된다.

public class JavaExample {
    public static void main(String[] args) {
        String[] shapes = {"BALL", "PENTAGON", "STAR"};
        String[] coloredTriangles = {"2-T", "6-T", "4-T"};

        Observable<String> source = Observable.zip(
                Observable.fromArray(shapes).map(Shape::getSuffix),
                Observable.fromArray(coloredTriangles).map(Shape::getColor),
                (suffix , color) -> color + suffix);

        source.subscribe(Log::it);
    }
}
main|1638373433219|value = 2
main|1638373433221|value = 6-P
main|1638373433221|value = 4-S

첫번째 인자로 모양 아이템의 접두사를 가져오고, 두번째 인자로 색깔 아이템에서 숫자를 가져와서 두개를 더해주는 함수를 적용한 결과를 볼 수 있다.

public class JavaExample {
    public static void main(String[] args) {
        Observable<String> source = Observable.zip(
                Observable.just("RED", "GREEN", "BLUE"),
                Observable.interval(200L, TimeUnit.MILLISECONDS),
                (value, time) -> value
        );
        CommonUtils.exampleStart();
        source.subscribe(Log::it);
        CommonUtils.sleep(1000);
    }
}
RxComputationThreadPool-1|214|value = RED
RxComputationThreadPool-1|412|value = GREEN
RxComputationThreadPool-1|613|value = BLUE

이렇게 시간과 문자열처럼 타입이 다른 두가지도 결합할 수 있다. 시간과 결합하면 데이터를 발행하는 시간을 조절할 수 있다.

zipWith()

: zip()과 동일하지만 Observable을 다양한 함수와 조합하면서 틈틈이 호출할 수 있다.

combineLatest()

: 2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 연산자다.

첫 번째 Observable에서만 데이터를 발행하거나 두 번째 Observable의 데이터 흐름만 있으면 구독자에게 어떤 데이터도 발행하지 않는다. 하지만 두 Observable 모두 값을 발행하면 그때는 결과값이 나오고, 그 다음부터는 둘 중에 어떤 것이 갱신되던지 최신 결과값을 보여준다.

  • combineLatest() 원형
@SchedulerSupport(SchedulerSupport.NONE)
public static <T1, T2, R> Observable<R> combineLatest ( ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)

zip()처럼 결합하고자 하는 첫 번째와 두 번째 Observable을 넣고 마지막으로 그걸 결합하는 combiner()함수를 넣으면 된다. 입력할 수 있는 Observable 인자의 개수는 최대 9개다.

public class JavaExample {
    public static void main(String[] args) {
        String[] data1 = {"6", "7", "4", "2"};
        String[] data2 = {"DIAMOND", "STAR", "PENTAGON"};

        Observable<String> source = Observable.combineLatest(
                Observable.fromArray(data1)
                        .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
                                (shape, notUsed) -> Shape.getColor(shape)),
                Observable.fromArray(data2)
                        .zipWith(Observable.interval(150L, 200L, TimeUnit.MILLISECONDS),
                                (shape, notUsed) -> Shape.getSuffix(shape)), (v1, v2) -> v1 + v2);
        source.subscribe(Log::it);
        CommonUtils.sleep(1000);
    }
}
RxComputationThreadPool-2|1638374850570|value = 6<>
RxComputationThreadPool-1|1638374850620|value = 7<>
RxComputationThreadPool-1|1638374850720|value = 4<>
RxComputationThreadPool-2|1638374850771|value = 4-S
RxComputationThreadPool-1|1638374850823|value = 2-S
RxComputationThreadPool-2|1638374850968|value = 2-P

첫 번째 데이터와 두 번째 데이터의 발행 시간이 달라 개수(첫번째는 100ms마다, 두번째는 150ms 쉬고 200ms 마다 발행)가 일치하지 않아도 zip()함수와는 다르게 어느 1개의 값만 변경되어도 결과가 발행된다.

merge()

: 입력 Observable의 순서와 모든 Observable이 데이터를 발행하는지 등에 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행

public class JavaExample {
    public static void main(String[] args) {
        String[] data1 = {"1", "3"};
        String[] data2 = {"2", "4", "6"};

        Observable<String> source1 = Observable.interval(0L, 100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx -> data1[idx])
                .take(data1.length);
        Observable<String> source2 = Observable.interval(50L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx -> data2[idx])
                .take(data2.length);

        Observable<String> source = Observable.merge(source1, source2);
        source.subscribe(Log::it);
        CommonUtils.sleep(1000);
    }
}
RxComputationThreadPool-1|1638375525596|value = 1
RxComputationThreadPool-2|1638375525647|value = 2
RxComputationThreadPool-1|1638375525699|value = 3
RxComputationThreadPool-2|1638375525708|value = 4
RxComputationThreadPool-2|1638375525750|value = 6

각각 개별 스레드에서 이뤄지지만, 발행하는대로 결과를 출력한다.

concat()

: 2개 이상의 Observable을 이어붙여주는 연산자

첫 번째 Observable에 onComplete 이벤트가 발생해야 두 번째 Observable을 구독한다. (첫 번째 Observable에 onComplete 이벤트가 발생하지 않게 하면 두 번째 Observable은 영원히 대기한다. 이는 메모리 누수의 위험을 가지고 있기 때문에 입력 Observable이 반드시 완료 될 수 있게 해야한다!!)

  • concat() 원형
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concat( ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
public class JavaExample {
    public static void main(String[] args) {
        Action onCompleteAction = () -> Log.d("onComplete()");

        String[] data1 = {"1", "3"};
        String[] data2 = {"2", "4", "6"};

        Observable<String> source1 = Observable.fromArray(data1)
                .doOnComplete(onCompleteAction);
        Observable<String> source2 = Observable.interval(50L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx -> data2[idx])
                .take(data2.length)
                .doOnComplete(onCompleteAction);

        Observable<String> source = Observable.merge(source1, source2)
                .doOnComplete(onCompleteAction);

        source.subscribe(Log::it);
        CommonUtils.sleep(1000);
    }
}
main | 1638376018265 | value = 1
main | 1638376018266 | value = 3
main | debug = onComplete()
RxComputationThreadPool-1 | 1638376018322 | value = 2
RxComputationThreadPool-1 | 1638376018372 | value = 4
RxComputationThreadPool-1 | 1638376018420 | value = 6
RxComputationThreadPool-1 | debug = onComplete()
RxComputationThreadPool-1 | debug = onComplete()

첫 번째 Observable에 onComplete 이벤트가 발생하면 두 번째 Observable이 바로 시작되고, 두 번째 Observable을 완료하면 결과 Observable또한 완료된다.

🎯 조건 연산자

: Observable의 흐름을 제어하는 역할, 지금까지의 흐름을 어떻게 제어할 것인지에 초점을 맞추는 연산자

amb()

: ambiguous의 줄임말로 여러 개의 Observable 중에서 1개의 Observable을 선택하는데, 선택 기준은 가장 먼저 데이터를 발행하는 Observable이고 이후에 나머지 Observable에서 발행하는 데이터는 모두 무시한다.

  • amb()의 원형
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
public class JavaExample {
    public static void main(String[] args) {
        String[] data1 = {"1", "3", "5"};
        String[] data2 = {"2-R", "4-R"};

        List<Observable<String>> sources = Arrays.asList(
                Observable.fromArray(data1)
                .doOnComplete(() -> Log.d("Observable #1 : onComplete()")),
                Observable.fromArray(data2)
												.delay(100L, TimeUnit.MILLISECONDS)
                        .doOnComplete(() -> Log.d("Observable #2 : onComplete()"))
        );
        Observable.amb(sources)
                .doOnComplete(() -> Log.d("Result : onComplete()"))
                .subscribe(Log::i);
        CommonUtils.sleep(1000);
    }
}
main | value = 1
main | value = 3
main | value = 5
main | debug = Observable #1 : onComplete()
main | debug = Result : onComplete()

1,3,5를 발행하는 Observable과 100ms 간격으로 2-R, 4-R을 발행하는 Observable이 있지만 값을 발행하는 첫 번째 Observable들만 출력한다. 첫 번째 Observable에서 onComplete이벤트가 발생하면 Observable도 최종완료된다.

takeUntil()

: take() 함수에 조건을 설정할 수 있다. 인자로 받은 Observable에서 어떤 값을 발행하면 현재 Observable의 데이터 발행을 중단하고 onComplete이벤트를 발생한다. (즉, take() 처럼 일정 개수만 값을 발행하되 완료 기준을 다른 Observable에서 값을 발행하는지로 판단)

  • takeUntil()의 원형
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<T> takeUntil(ObservableSource<U> other)

takeUntil의 인자로는 값을 발행할 수 있는 other Observable이 필요함! other Observable에서 값이 발행되면 기존 Observable에서 나오는 값을 더이상 발행하지 않고 완료(onComplete())한다.

public class JavaExample {
    public static void main(String[] args) {
        String[] data = {"1", "2", "3", "4", "5", "6"};

        Observable<String> source = Observable.fromArray(data)
                .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val, notUsed) -> val)
                .takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));

        source.subscribe(Log::i);
        CommonUtils.sleep(1000);
    }
}
RxComputationThreadPool-2 | value = 1
RxComputationThreadPool-2 | value = 2
RxComputationThreadPool-2 | value = 3
RxComputationThreadPool-2 | value = 4

100ms 간격으로 1 2 3 4 5 가 발행되고, timer 를 사용해서 500ms 후에 값을 발행하도록 했다.

다른 Observable에서 값이 발행되면 기존 Observable은 완료하기때문에 1234만 출력됐다.

skipUntil()

: takeUntil()(발행하다가 발행하지 않음!)과 정반대의 연산자, 다른 Observable을 인자로 받는다는 점은 같지만 Observable에서 데이터를 발행할 때까지 값을 건너뛴다(발행하지 않다가 발행!) .

public class JavaExample {
    public static void main(String[] args) {
        String[] data = {"1", "2", "3", "4", "5", "6"};

        Observable<String> source = Observable.fromArray(data)
                .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val, notUsed) -> val)
                .skipUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));

        source.subscribe(Log::i);
        CommonUtils.sleep(1000);
    }
}
RxComputationThreadPool-2 | value = 5
RxComputationThreadPool-2 | value = 6

all()

: 주어진 조건에 100% 맞을 때만 true를 발행하고 조건에 맞지 않는 데이터가 발행되면 바로 false를 발행한다.

  • all()의 원형
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<Boolean> all(Predicate<? super T> predicate)
public class JavaExample {
    public static void main(String[] args) {
        String[] data = {"1", "2", "3", "4"};

        Single<Boolean> source =Observable.fromArray(data)
                .map(Shape::getShape)
                .all(Shape.BALL::equals);
                //.all(val -> Shape.BALL.equals(Shape.getShape(val));
        source.subscribe(Log::it);
    }
}
main | 1638461303263 | value = true

1 2 3 4 는 모두 getShape라는 함수를 거치면 BALL이라는 값을 얻게되는데, 모두 Shape.BALL 과 같기때문에 true 가 나온다.

🎯 수학 및 기타 연산자

수학 함수

: RxJava 1.x에는 수학 함수들을 모은 RxJavaMath가 있다. 하지만 RxJavaMath는 RxJava 2.x 를 지원하지 않으므로, 다른 라이브러리인 RxJava2Extensions라이브러리를 사용해야한다.

  • count() : Observable에서 발행한 데이터의 개수를 발행
  • max()
  • min()
  • sum()
  • average
public class JavaExample {
    public static void main(String[] args) {
        Integer[] data = {1, 2, 3, 4};

        //1. count
        Single<Long> source = Observable.fromArray(data)
                .count();
        source.subscribe(count -> Log.i("count is " + count));

        //2. max() & min()
        Flowable.fromArray(data)
                .to(MathFlowable::max)
                .subscribe(max -> Log.i("max is " + max));

        Flowable.fromArray(data)
                .to(MathFlowable::min)
                .subscribe(min -> Log.i("min is " + min));

        //3. sum() & average
        Flowable<Integer> flowable = Flowable.fromArray(data)
                .to(MathFlowable::sumInt);
        flowable.subscribe(sum -> Log.i("sum is " + sum));

        Flowable<Double> flowable2 = Observable.fromArray(data)
                .toFlowable(BackpressureStrategy.BUFFER) //Flowable을 생성할 때 활용하는 배압 전략 기본값 
                .to(MathFlowable::averageDouble);
        flowable2.subscribe(avg -> Log.i("average is " + avg));

    }
}
main | value = count is 4
main | value = max is 4
main | value = min is 1
main | value = sum is 10
main | value = average is 2.5

delay()

: 인자로 전달받은 time과 시간 단위만큼 입력받은 Observable의 데이터 발행을 지연시켜주는 역할을 하는 연산자(interval(), timer(), defer() 등 앞에서 배운 시간 관련 메소드들은 Observable을 생성하는 역할이였지만 얘는 유틸리티 연산자로서 보조 역할을 한다.)

  • delay()의 원형
@SchedulerSupport(SchedulerSupport.COMPUTATION) //별도의 스레드에서 실행 
public final Observable<T> delay(long delay, TimeUnit unit)
public class JavaExample {
    public static void main(String[] args) {
				CommonUtils.exampleStart();
        String[] data = {"1", "7", "2", "3", "4"};
        Observable<String> source = Observable.fromArray(data)
                .delay(100L, TimeUnit.MILLISECONDS);
        source.subscribe(Log::it);
        CommonUtils.sleep(1000);
    }
}
RxComputationThreadPool-1 | 289 | value = 1
RxComputationThreadPool-1 | 290 | value = 7
RxComputationThreadPool-1 | 290 | value = 2
RxComputationThreadPool-1 | 290 | value = 3
RxComputationThreadPool-1 | 290 | value = 4

timeInterval()

: 어떤 값을 발행했을 때 이전 값을 발행한 이후 얼마나 시간이 흘렀는지 알려주는 연산자

  • timeInterval()의 원형
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Timed<T>> timeInterval()

Timed 객체에서는 시간을 얻어오거나 Observable의 데이터를 얻을 수 있는 메서드를 제공할 수 있다.

public class JavaExample {
    public static void main(String[] args) {
        String[] data = {"1", "3", "7"};

        CommonUtils.exampleStart();
        Observable<Timed<String>> source = Observable.fromArray(data)
                .delay(item -> {
                    CommonUtils.doSomething();
                    return Observable.just(item);
                })
                .timeInterval();

        source.subscribe(Log::it);
        CommonUtils.sleep(1000);
    }
}
main | 311 | value = Timed[time=10, unit=MILLISECONDS, value=1]
main | 339 | value = Timed[time=28, unit=MILLISECONDS, value=3]
main | 371 | value = Timed[time=32, unit=MILLISECONDS, value=7]

발행되는 값 사이의 시간 간격을 알기 위해 무작위로 스레드에 sleep()메서드 실행, 차이 계산해서 출력

Comments