[RxJava] 리액티브 연산자의 활용 -1

12 minute read

이번 챕터에서는 다양한 리액티브 연산자들을 카테고리별로 알아본다 :)

🎯  생성 연산자

: 생성 연산자의 역할은 데이터 흐름을 만드는 것이다. ex) Observable(Observable, Single, Maybe)

interval()

: 일정 시간 간격으로 데이터 흐름을 생성, 주어진 시간 간격으로 0부터 1씩 증가하는 Long객체를 발행

  • interval()함수의 원형

1)

@SchedulerSupport(SchedulerSupport.COMPUTATION) //계산 스케줄러에서 실행
public static Observable<Long> interval(long period, TimeUnit unit)

일정 시간(period)을 쉬었다가 데이터를 발행

public class FirstExample {
    public static void main(String args[]) {
        CommonUtils.exampleStart();
        Observable<Long> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(data -> (data + 1) * 100)
                .take(5);
				source.subscribe(Log::it);
        CommonUtils.sleep(1000); //주석 처리되면 main스레드에서 기다려주지않기때문에 실행 종료 
    }
}
//계산 스케줄러에서 실행할 때 사용하는 스레드 이름 |실행 시작 시간부터 지금까지의 실행 시간(ms)|interval()함수 값
RxComputationThreadPool-1|241|value = 100  
RxComputationThreadPool-1|338|value = 200
RxComputationThreadPool-1|439|value = 300
RxComputationThreadPool-1|536|value = 400
RxComputationThreadPool-1|638|value = 500

source는 100ms 간격으로 0부터 데이터를 발행한 후 map()를 통해 1을 더하고 100을 곱한다. CommonUtils.sleep(1000)을 통해 100~1000이 출력되지만 take(5)를 통해서 100 ~500 까지만 출력한다.

  • CommonUtils

      class CommonUtils{
        
          public static long startTime;
        
          public static void exampleStart(){ //시작 시각을 표시 
              startTime = System.currentTimeMillis();
          }
        
          public static void exampleComplete(){
              startTime = System.currentTimeMillis();
          }
        
          public static void sleep(int mills){ 
              try{
                  Thread.sleep(mills);
              }catch (InterruptedException e){
                  e.printStackTrace();
              }
          }
        
          public static String getThreadName() {
              String threadName = Thread.currentThread().getName();
              if (threadName.length() > 30) {
                  threadName = threadName.substring(0, 30) + "...";
              }
              return threadName;
          }
        
          public static void doSomething(){
              try{
                  Thread.sleep(new Random().nextInt(100));
              }catch(InterruptedException e){
                  e.printStackTrace();
              }
          }
      }
    

→ interval()함수는 COMPUTATION스케줄러에서 실행하기 때문에 (메인과 다른 스레드) CommonUtils의 Thread.sleep 을 사용해서 mainThread가 바로 종료되지 않도록 기다려줘야한다.

  • Log

      class Log { //로그를 출력할 때 실행되는 스레드 이름과 실행 시간을 표시 
          public static void it(Object obj){
              long time = System.currentTimeMillis() - CommonUtils.startTime;
              System.out.println(CommonUtils.getThreadName() + "|" + time + "|" + "value = "+ obj);
          }
      }
    

2)

@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)

첫번째 원형과 동작은 같지만, 최초 지연 시간을 조절할 수 있다는 점이 다르다! 보통 초기 지연 시간 없이 바로 데이터를 발행하기 위해서 사용된다.(initialDelay = 0)

public class FirstExample {
    public static void main(String args[]) {
        CommonUtils.exampleStart();
        Observable<Long> source = Observable.interval(0L,100L, TimeUnit.MILLISECONDS)
                .map(data -> (data + 1) * 100)
                .take(5);
        source.subscribe(Log::it);
        CommonUtils.sleep(1000);
    }
}
//계산 스케줄러에서 실행할 때 사용하는 스레드 이름 |실행 시작 시간부터 지금까지의 실행 시간(ms)|interval()함수 값
RxComputationThreadPool-1|147|value = 100
RxComputationThreadPool-1|250|value = 200
RxComputationThreadPool-1|350|value = 300
RxComputationThreadPool-1|451|value = 400
RxComputationThreadPool-1|551|value = 500

앞의 예제와 다르게 interval 의 initialDelay에 0을 넣어줬다. 큰 차이는 없지만, 초기 지연값이 100ms 에서 0ms로 줄었기 때문에 맨 처음 실행 시작 시간부터 지금까지의 실행시간이 241 → 147 ms로 거의 100ms 가 줄었다.

timer()

: interval()함수와 유사하지만 한 번만 실행하는 함수이다.

일정 시간이 지난 후에 한 개의 데이터를 발행하고 onComplete() 이벤트가 발생한다.

  • timer()함수의 원형
@SchedulerSupport(SchedulerSupport.COMPUTATION) 
public static Observable<Long> timer(long delay, TimeUnit unit)

계산 스케쥴러에서 실행되는 부분과 발행되는 데이터도 0L인걸 보면 interval()함수와 매우 비슷하다.

보통 일정 시간이 지난 후 어떤 동작을 실행할 때 활용한다.

public class FirstExample {
    public static void main(String args[]) {
        CommonUtils.exampleStart();
        Observable<String> source = Observable.timer(500L, TimeUnit.MILLISECONDS)
                .map(notUsed -> {
                    return new SimpleDateFormat("yyyy//MM//dd HH:mm:ss")
                            .format(new Date());
                });
        source.subscribe(Log::it);
        CommonUtils.sleep(1000);
    }
}
RxComputationThreadPool-1|758|value = 2021//12//01 17:07:30

500ms 후 함수가 호출된 날짜와 시간을 출력한다. (interval()이랑 다르게 여러번 호출하지 않고 일정 시간이 지난 후 딱 한번 호출하고 끝난다.)

range()

: 주어진 값(n)부터 m개의 Integer 객체를 발행한다. 앞서 interval()와 timer()함수는 Long 객체를 발행했지만 ragne()함수는 Integer객체를 발행하는 것이 다르다.

  • range()함수의 원형
@SchedulerSupport(SchedulerSupport.NONE) 
public static Observable<Integer> ragne(final int start, final int count)

계산 스케줄러에서 사용되는 interval(), timer()와는 다르게 스케줄러에서 실행되지 않고 현재 스레드에서 실행된다. 그리고 range()함수는 for, while등 반복문을 대체할 수 있다.

public class FirstExample {
    public static void main(String args[]) {
        Observable<Integer> source = Observable.range(1, 10)
                .filter(number -> number %2 ==0);
        source.subscribe(System.out::println);
    }
}
2
4
6
8
10

source에서는 1부터 10까지를 발행하고, filter 에서 짝수만 거르기 때문에 2 4 6 8 10이라는 결과가 나왔다.

intervalRange()

: interval() + range()로 interval()함수 처럼 일정한 시간 간격으로 값을 출력하지만 range() 함수처럼 시작 숫자(n)로 부터 m개만큼의 값만 생성하고 onComplete()이벤트가 발생한다. (interval() 함수처럼 무한히 데이터 흐름을 발행하지 않음) return 타입은 Long !

  • intervalRange()함수의 원형
@SchedulerSupport(SchedulerSupport.COMPUTATION) 
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) { }
public class FirstExample {
    public static void main(String args[]) {
        CommonUtils.exampleStart();
        Observable<Long> source = Observable.intervalRange(1,
                5,
                100L,
                100L,
                TimeUnit.MILLISECONDS);
        source.subscribe(Log::it) ;
        CommonUtils.sleep(1000);
    }
}
RxComputationThreadPool-1|297|value = 1
RxComputationThreadPool-1|400|value = 2
RxComputationThreadPool-1|499|value = 3
RxComputationThreadPool-1|598|value = 4
RxComputationThreadPool-1|699|value = 5

intervalRange()는 계산 스케줄러에서 실행되기 때문에 Thread.sleep(1000)을 걸어주고, 초기 지연시간 100ms 후에 100ms의 간격으로 1부터 5까지 출력한다.

interval() 함수로 intervalRange()만들기

public class FirstExample {
    public static void main(String args[]) {
        CommonUtils.exampleStart();
        Observable<Long> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(val -> val + 1)
                .take(5);
        source.subscribe(Log::it);
        CommonUtils.sleep(1000);
    }
}
RxComputationThreadPool-1|298|value = 1
RxComputationThreadPool-1|399|value = 2
RxComputationThreadPool-1|494|value = 3
RxComputationThreadPool-1|598|value = 4
RxComputationThreadPool-1|694|value = 5

interval()함수를 통해 100ms 에 한번씩 0부터 값을 발행한다. 발행된 값은 map 을 통해 1을 증가시키고 take(5)를 통해 5개의 값을 출력한다.

interval(), map(), take()를 사용해서 앞의 예제에서 intervalRange()를 사용한것과 결과가 똑같은 결과를 만들었다. intervalRange의 인자는 5개나 되기때문에 이렇게 여러 리액티브 연산자들을 조합해서 원하는 기능을 만들어내는게 중요하다고 한다!!

defer()

: timer() 함수와 비슷하지만, 데이터 흐름 생성을 구독자가 subscribe()함수를 호출할 때까지 미룰 수 있다.

( = 최신 데이터를 얻을 수 있다)

  • defer()함수의 원형
@SchedulerSupport(SchedulerSupport.NONE) 
public static <T> Observable<T> defer( Callable<? extends ObservableSource<? extends T>> supplier) { }

스케줄러가 NONE 이기 때문에 현재 스레드에서 실행되고 인자로는 Callable<Observable>를 받기때문에 구독자가 subscribe()를 호출할 때까지 call()메서드의 호출을 미룰 수 있다.

public class FirstExample{
    Iterator<String> colors = Arrays.asList(Shape.RED, Shape.GREEN, Shape.BLUE, Shape.PUPPLE).iterator();
    
    public void marbleDiagram() {
//        Callable<Observable<String>> supplier = () -> getObservable();
//        Observable<String> source = Observable.defer(supplier); 에러남

        Observable<String> source = Observable.defer(() -> getObservable());
        source.subscribe(val -> System.out.println("Subscriber #1:" + val));
        source.subscribe(val -> System.out.println("Subscriber #2:" + val));
        source.subscribe(val -> System.out.println("Subscriber #3:" + val));
        source.subscribe(val -> System.out.println("Subscriber #4:" + val));

    }

    private Observable<String> getObservable() {
        if (colors.hasNext()) {
            System.out.println("아이템 발행");
            String color = colors.next();
            return Observable.just(
                    Shape.getString(color, Shape.BALL),
                    Shape.getString(color, Shape.RECTANGLE),
                    Shape.getString(color, Shape.PENTAGON));
        }
        return Observable.empty();
    }

    public static void main(String[] args) {
        FirstExample demo = new FirstExample();
        demo.marbleDiagram();
    }
}
아이템 발행
Subscriber #1:1
Subscriber #1:1-R
Subscriber #1:1-P
아이템 발행
Subscriber #2:3
Subscriber #2:3-R
Subscriber #2:3-P
아이템 발행
Subscriber #3:5
Subscriber #3:5-R
Subscriber #3:5-P
아이템 발행
Subscriber #4:6
Subscriber #4:6-R
Subscriber #4:6-P

FirstExample()객체를 만들어 marbleDiagram을 호출하면 getObservable()이 호출되고, colors의 아이템 하나씩 거쳐 Shape.getString 으로 3가지가 발행된다. (getString 은 Shape에 존재하는 도형일때 제일 앞글자를 떼온다. color는 숫자로 값이 정해져있기때문에 위의 값이 나온다. ex) RED = “1” 이고, BALL은 존재하지 않는 도형이라 1 , RECTANGLE은 존재하는 도형이기때문에 1-R, PENTAGON도 존재하는 도형이라 1-P 가 리턴된다.)

defer함수는 subscribe()함수를 호출 해야 데이터 흐름을 발행하기 때문에 첫번째 구독자가 구독했을때 1 1-R 1-P가 발행되고, 두 번째 구독자가 구독했을때 3 3-R 3-P라는 값이 발행된다.

( 옵저버가 subscribe() 함수를 호출할 때까지 stream 생성을 미루고, 호출될 때마다 새로운 Observable을 생성하여 데이터를 보낸다. → stream 생성을 지연하는 효과를 얻을 수 있음)

여기서 defer가 아닌 그냥 just로 호출했을때는 어떤 결과가 나올까??

public class FirstExample{
    Iterator<String> colors = Arrays.asList(Shape.RED, Shape.GREEN, Shape.BLUE, Shape.PUPPLE).iterator();

    public void marbleDiagram() {
//        Callable<Observable<String>> supplier = () -> getObservable();
//        Observable<String> source = Observable.defer(supplier); 에러남

        Observable<String> source = getObservable();
        source.subscribe(val -> System.out.println("Subscriber #1:" + val));
        source.subscribe(val -> System.out.println("Subscriber #2:" + val));
        source.subscribe(val -> System.out.println("Subscriber #3:" + val));
        source.subscribe(val -> System.out.println("Subscriber #4:" + val));

    }

    private Observable<String> getObservable() {
        if (colors.hasNext()) {
            System.out.println("아이템 발행");
            String color = colors.next();
            return Observable.just(
                    Shape.getString(color, Shape.BALL),
                    Shape.getString(color, Shape.RECTANGLE),
                    Shape.getString(color, Shape.PENTAGON));
        }
        return Observable.empty();
    }

    public static void main(String[] args) {
        FirstExample demo = new FirstExample();
        demo.marbleDiagram();
    }
}
아이템 발행
Subscriber #1:1
Subscriber #1:1-R
Subscriber #1:1-P
Subscriber #2:1
Subscriber #2:1-R
Subscriber #2:1-P
Subscriber #3:1
Subscriber #3:1-R
Subscriber #3:1-P
Subscriber #4:1
Subscriber #4:1-R
Subscriber #4:1-P

맨 처음 getObservable()로 데이터가 발행된게 딱 하나이기 때문에 구독자 1234 모두 1 1-R 1-P를 출력하는걸 볼 수 있다.

public void marbleDiagram() {
        Observable<String> source = getObservable();
        source.subscribe(val -> System.out.println("Subscriber #1:" + val));
        Observable<String> source2 = getObservable();
        source2.subscribe(val -> System.out.println("Subscriber #2:" + val));
        Observable<String> source3 = getObservable();
        source3.subscribe(val -> System.out.println("Subscriber #3:" + val));
        Observable<String> source4 = getObservable();
        source4.subscribe(val -> System.out.println("Subscriber #4:" + val));
    }
아이템 발행
Subscriber #1:1
Subscriber #1:1-R
Subscriber #1:1-P
아이템 발행
Subscriber #2:3
Subscriber #2:3-R
Subscriber #2:3-P
아이템 발행
Subscriber #3:5
Subscriber #3:5-R
Subscriber #3:5-P
아이템 발행
Subscriber #4:6
Subscriber #4:6-R
Subscriber #4:6-P

이렇게 구독 전 하나씩 발행하면 defer 와 같은 값을 얻을 수 있다. (이럴바엔 defer 쓰는게 나을듯!)

repeat()

: 단순히 반복 실행을 하는 연산자

public class FirstExample{
    public static void main(String[] args) {
       String[] balls = {"1", "3", "5"};
       Observable<String> source = Observable.fromArray(balls)
               .repeat(3);
       source.doOnComplete(() -> System.out.println("반복 끝~")).subscribe(System.out::println);
    }
}
1
3
5
1
3
5
1
3
5
반복 끝~

1,3,5를 가지고 있는 balls 배열을 3번 반복해서 발행하고 있다.

❗️ repeat()에 인자를 입력하지 않으면 무한반복되니 주의

서버 연동 실습

public class JavaExample {
    public static void main(String[] args) {
       CommonUtils.exampleStart();
       String serverUrl = "https://api.github.com/zen";

       Observable.timer(2, TimeUnit.MILLISECONDS)
               .map(val -> serverUrl)
               .map(OkHttpHelper::get)
               .repeat()
               .subscribe(res -> Log.it("Ping Result: " + res));
       CommonUtils.sleep(10000);
    }
}
RxComputationThreadPool-4|719|value = Ping Result: {"message":"API rate limit exceeded for 1.227.114.223. (But here's the good news: Authenticated requests get a higher rate limit. Check out the documentation for more details.)","documentation_url":"https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}

RxComputationThreadPool-5|734|value = Ping Result: {"message":"API rate limit exceeded for 1.227.114.223. (But here's the good news: Authenticated requests get a higher rate limit. Check out the documentation for more details.)","documentation_url":"https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}

RxComputationThreadPool-6|746|value = Ping Result: {"message":"API rate limit exceeded for 1.227.114.223. (But here's the good news: Authenticated requests get a higher rate limit. Check out the documentation for more details.)","documentation_url":"https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}

RxComputationThreadPool-7|760|value = Ping Result: {"message":"API rate limit exceeded for 1.227.114.223. (But here's the good news: Authenticated requests get a higher rate limit. Check out the documentation for more details.)","documentation_url":"https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}
  • OkHttpHelper

      public class OkHttpHelper {
          private static OkHttpClient client = new OkHttpClient();
          public static String ERROR = "ERROR";
        
          public static String get(String url) throws IOException {
              Request request = new Request.Builder()
                      .url(url)
                      .build();
              try {
                   Response res = client.newCall(request).execute();
                  return res.body().string();
              } catch (IOException e) {
                  Log.it(e.getMessage());
                  throw e;
              }
          }
      }
    

repeat을 사용해서 2초마다 OkHttpHelper get 메서드를 실행해서 serverUrl에 저장된 URL정보를 얻어오는 코드이다. 해당 코드는 repeat() 인자에 아무것도 안넣어놔서 직접 중단해야 멈추지만, 서버와 지속적으로 통신하기 위해 repet을 사용하면 좋을거같다.

repeat()은 동작이 한 번 끝난 다음에 다시 구독하는 방식으로 동작한다. 하지만 다시 구독할 때마다 동작하는 스레드의 번호가 달라진다. 동작하는 스레드를 동일하게 맞추려면 timer()와 repeat()함수를 빼고 interval()함수를 대신 넣어 호출하면 된다.

public class JavaExample {
    public static void main(String[] args) {
       CommonUtils.exampleStart();
       String serverUrl = "https://api.github.com/zen";

       Observable.interval(2, TimeUnit.MILLISECONDS)
               .map(val -> serverUrl)
               .map(OkHttpHelper::get)
               .subscribe(res -> Log.it("Ping Result: " + res));
       CommonUtils.sleep(10000);
    }
}
RxComputationThreadPool-1|3847|value = Ping Result: {"message":"API rate limit exceeded for 1.227.114.223. (But here's the good news: Authenticated requests get a higher rate limit. Check out the documentation for more details.)","documentation_url":"https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}

RxComputationThreadPool-1|3856|value = Ping Result: {"message":"API rate limit exceeded for 1.227.114.223. (But here's the good news: Authenticated requests get a higher rate limit. Check out the documentation for more details.)","documentation_url":"https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}

RxComputationThreadPool-1|3867|value = Ping Result: {"message":"API rate limit exceeded for 1.227.114.223. (But here's the good news: Authenticated requests get a higher rate limit. Check out the documentation for more details.)","documentation_url":"https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}

RxComputationThreadPool-1|3880|value = Ping Result: {"message":"API rate limit exceeded for 1.227.114.223. (But here's the good news: Authenticated requests get a higher rate limit. Check out the documentation for more details.)","documentation_url":"https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}

timer()와 repeat()대신 interval로 바꿔주니 4 5 6 7 이렇게 계속 바뀌던 스레드가 , 다시 호출해도 바뀌지 않고 1로 고정되어 있다.

🎯  변환 연산자

: 데이터 흐름을 원하는 대로 변형할 수 있는 연산자

concatMap()

: flatMap()과 비슷하지만, 인터리빙 없이 먼저 들어온 데이터 순서대로 처리해서 결과를 낼 수 있도록 보장해줌

인터리빙(끼어들기)

먼저 들어온 데이터를 처리하는 도중에 새로운 데이터가 들어오면 나중에 들어온 데이터의 처리 결과가 먼저 출력될 수도 있음

public class JavaExample {
    public static void main(String[] args) {
        CommonUtils.exampleStart();

        String[] balls = {"1", "3", "5"}; 
        Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)//0 1 2...-> int 로 변경 
                .map(idx -> balls[idx]) //1 3 5 쭉쭉
                .take(balls.length)// 1 3 5 까지만 딱 짜르기
                .concatMap(ball -> Observable.interval(200L, TimeUnit.MILLISECONDS)
                        .map(notUsed -> ball + "A")
                        .take(2)); //A붙은거 2번출력 
        source.subscribe(Log::it);
        CommonUtils.sleep(2000);
    }
}
RxComputationThreadPool-2|506|value = 1A
RxComputationThreadPool-2|705|value = 1A
RxComputationThreadPool-3|909|value = 3A
RxComputationThreadPool-3|1109|value = 3A
RxComputationThreadPool-4|1314|value = 5A
RxComputationThreadPool-4|1515|value = 5A

1 3 5 의 발생이 100ms 간격이지만 A를 붙이는게 200ms 간격이라 입력과 출력의 순서가 역전될 수 있는데, 이걸 concatMap으로 잡아 들어온 순서대로 출력될 수 있도록 해줬다. 하지만 flatMap을 쓰면 ?

public class JavaExample {
    public static void main(String[] args) {
        CommonUtils.exampleStart();

        String[] balls = {"1", "3", "5"};
        Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx -> balls[idx]) 
                .take(balls.length)
                .flatMap(ball -> Observable.interval(200L, TimeUnit.MILLISECONDS)
                        .map(notUsed -> ball + "A")
                        .take(2));
        source.subscribe(Log::it);
        CommonUtils.sleep(2000);
    }
}
RxComputationThreadPool-2|468|value = 1A
RxComputationThreadPool-3|568|value = 3A
RxComputationThreadPool-4|666|value = 5A
RxComputationThreadPool-2|671|value = 1A
RxComputationThreadPool-3|765|value = 3A
RxComputationThreadPool-4|868|value = 5A

인터리빙을 허용하기 때문에 실행되는 시간은 훨신 빨랐지만, 이렇게 입력된 순서가 보장되지 않을 수 있다는걸 알 수 있다.

(100ms를 딱딱 지켜서 순서가 항상 보장될줄 알았는데, Observable에 의하여 방출된 아이템들은 아이템별로 새로운 Observable을 생성하고 각각의 stream을 가지기 때문에 별도의 lifecycle를 가지게 되고, 그래서 랜덤으로 생성된 second delay로 인하여 어떤 아이템은 빨리 방출되고 어떤 아이템은 늦게 방출된다고 한다..ㅎㅎ)

switchMap()

: concatMap()함수가 인터리빙이 발생할 수 있는 상황에서 동작의 순서를 보장해준다면, switchMap()함수는 순서를 보장하기 위해 기존에 진행중이던 작업을 바로 중단한다. 중간에 끊기더라도 마지막 데이터의 처리는 보장하기 때문에, 주로 여러 개의 값이 발행되었을 때 마지막에 들어온 값만 처리하고 싶을 때 사용한다.

public class JavaExample {
    public static void main(String[] args) {
        CommonUtils.exampleStart();

        String[] balls = {"1", "3", "5"};
        Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx -> balls[idx])
                .take(balls.length)
                .doOnNext(Log::it)
                .switchMap(ball -> Observable.interval(200L, TimeUnit.MILLISECONDS)
                        .map(notUsed -> ball + "A")
                        .take(2));
        source.subscribe(Log::it);
        CommonUtils.sleep(2000);
    }
}
RxComputationThreadPool-1|344|value = 1
RxComputationThreadPool-1|444|value = 3
RxComputationThreadPool-1|548|value = 5
RxComputationThreadPool-4|750|value = 5A
RxComputationThreadPool-4|951|value = 5A

중간 결과를 살펴보기 위해서 doOnNext를 사용했다. 5에만 A가 붙은채로 출력되었는데, 1 3 5는 100 ms 간격으로 발행하고 A는 200ms 간격으로 발행하기 때문에 1A가 발행되기 전에 5가 발행된다. 그러므로 중간에 있는 3A의 발행이 취소되고 마지막 5를 이용한 처리 결과인 5A만 두 번 출력한다.

그리고 데이터를 발행하는 스레드와 그 값을 전달하는 스레드가 다르다는 점도 알 수 있다. (1번 스레드에서는 값을 발행하고, 2 3 4 번 스레드를 통해 구독자에게 값을 전달한다고 한다. 자세한건 스케줄러때 참고! )

groupBy

: 어떤 기준으로 단일 Observable을 여러 개로 이루어진 Observable 그룹으로 만드는 연산자다.

public class JavaExample {
    public static void main(String[] args) {
        String[] objs = {"6", "4", "2-T", "2", "6-T", "4-T"};
        Observable<GroupedObservable<String, String>> source =
                Observable.fromArray(objs).groupBy(Shape::getShape);

        source.subscribe(obj -> {
            obj.subscribe(
                    val -> System.out.println("GROUP: "+ obj.getKey() + "\t Value:" + val)
            );
        });
    }
}
GROUP: BALL	 Value:6
GROUP: BALL	 Value:4
GROUP: TRIANGLE	 Value:2-T
GROUP: BALL	 Value:2
GROUP: TRIANGLE	 Value:6-T
GROUP: TRIANGLE	 Value:4-T
  • getShape

      public static String getShape(String obj) {
              if (obj == null || obj.equals("")) return NO_SHAPE;
              if (obj.endsWith("-H")) return HEXAGON;
              if (obj.endsWith("-O")) return OCTAGON;
              if (obj.endsWith("-R")) return RECTANGLE;
              if (obj.endsWith("-T")) return TRIANGLE;
              if (obj.endsWith("<>")) return DIAMOND;
              if (obj.endsWith("-P")) return PENTAGON;
              if (obj.endsWith("-S")) return STAR;
              return "BALL";
          }
    

GroupedObservable 객체는 그룹별로 1개씩 생성되므로 생성된 obj 별로 subscribe()함수를 한 번 더 호출해야 한다.

여기서 특정 그룹만 처리하고 싶으면 filter함수를 사용하면 된다.

public class JavaExample {
    public static void main(String[] args) {
        String[] objs = {"6", "4", "2-T", "2", "6-T", "4-T"};
        Observable<GroupedObservable<String, String>> source =
                Observable.fromArray(objs).groupBy(Shape::getShape);

        source.subscribe(obj -> {
            obj.filter(val -> obj.getKey().equals(Shape.BALL))
                    .subscribe(
                            val -> System.out.println("GROUP: " + obj.getKey() + "\t Value:" + val)
                    );
        });
    }
}
GROUP: BALL	 Value:6
GROUP: BALL	 Value:4
GROUP: BALL	 Value:2

BALL 그룹으로만 필터링 되었다.

scan

: reduce()함수는 Observable에서 모든 데이터가 입력된 후 그것을 종합하여 마지막 1개의 데이터만을 구독자에게 발행했지만, scan함수는 실행할 때마다 입력값에 맞는 중간 결과 및 최종 결과를 구독자에게 발행하는 연산자다.

public class JavaExample {
    public static void main(String[] args) {
        String[] balls = {"1", "3", "5"};
        Observable<String> source = Observable.fromArray(balls)
                .scan((ball1, ball2) -> ball2 + "(" + ball1 + ")");
        source.subscribe(Log::it);
    }
}
main|1638372278188|value = 1
main|1638372278189|value = 3(1)
main|1638372278189|value = 5(3(1))
public class JavaExample {
    public static void main(String[] args) {
        String[] balls = {"1", "3", "5"};
        Maybe<String> source = Observable.fromArray(balls)
                .reduce((ball1, ball2) -> ball2 + "(" + ball1 + ")");
        source.subscribe(Log::it);
    }
}
main|1638372307317|value = 5(3(1))

맨 마지막 결과 1개만 나오는 reduce와는 다르게 scan연산자는 값이 입력될 때마다 구독자에게 값을 발행하는걸 볼 수 있다.

Comments