[RxJava] 리액티브 연산자 입문

7 minute read

리액티브 연산자란?

: 부수 효과가 없는 순수 함수 다.

리액티브 연산자의 종류

연산자 설명
생성(Creating)연산자 데이터의 흐름을 만들어내는 함수 ex) Observable, Single 클래스, create(), just(), fromArray()
변환(Transforming)연산자 어떤 입력을 받아서 원하는 출력 결과를 내는 함수 ex) map(), flatMap()
필터(Filter)연산자 입력 데이터 중에 원하는 데이터만 걸러내는 함수 ex) filter(), first(), take()
합성(Combining)연산자 여러 Observable을 조합하는 함수
오류처리(Error Handling)연산자 ex) onErrorReturn(), onErrorResumeNext(), retry()
유틸리티(Utility)연산자 비동기 프로그래밍을 지원하는 함수 ex) subscribeOn(), observeOn()
조건(Conditional)연산자 Observable의 흐름을 제어하는 함수
수학과 집합형(Mathematical and Aggregate) 연산자 수학 함수와 연관 있는 연산자
배압(Back pressure)연산자 배압 이슈에 대응하는 연산자

이렇게 다양한 연산자가 400개 정도 된다고 한다. 모두 기억하기는 어렵지만 함수 대부분 이름에 내용을 내포하고, 마블다이어그램을 통해 어떤 동작을 하는지 알 수 있다.

map()

: 입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 함수

주로 Observable에서 받은 데이터를 Observer로 가기전에 변형해줄때(중간연산) 쓴다고 한다.

String[] balls = {"1", "2", "3", "4"};
Observable<String> source = Observable.fromArray(balls)
     .map (ball -> ball+"A");
source.subscribe(System.out::println);
1A
2A
3A
4A

여기서 ball은 람다 표현식의 인자고, ball+”A”은 ball의 반환값이다.

  • map()함수의 원형
@CheckReturnValue //반환값을 확인한다. 
@SchedulerSupport(value="none) //스케줄러를 지원하지 않는다. -> 현재 스레드에서 실행한다.
public final <R> Observable<R> map(Funciton<? super T,? extends R> mapper)

여기서 Funciton은 RxJava의 제네릭 함수형 인터페이스 중 하나로, RxJava의 제네릭 함수형 인터페이스는 다음과 같이 3가지가 있다.

인터페이스 이름 포함 메서드 설명
Predicate boolean test(T t) t 값을 받아서 참이나 거짓을 반환한다.
Consumer void accept(T t) t 값을 받아서처리, 반환값은 없다.
Function<T, R> R apply(T t) t 값을 받아서 결과를 반환한다.
Function<String, String> getA = ball -> ball+"A";
String[] balls = {"1", "2", "3", "4"};
Observable<String> source = Observable.fromArray(balls)
     .map (getA);
source.subscribe(System.out::println);
1A
2A
3A
4A

Function 인터페이스를 이용하여 분리한 위의 예제이다.

아까의 ()안의 내용을 밖으로 분리했더니 똑같은 결과가 나오는 것을 알 수 있다.

public static void main(String args[]) {
        Function<String, Integer> ballToIndex = ball -> {
            switch (ball){
                case "RED": return 1;
                case "YELLOW": return 2;
                case "GREEN": return 3;
                case "BLUE": return 4;
                default: return -1;
            }
        };

        String[] balls = {"RED", "YELLOW", "GREEN", "BLUE"};
        Observable<Integer> source = Observable.fromArray(balls)
                .map (ballToIndex);
        source.subscribe(System.out::println);
    }
1
2
3
4

map()함수는 반드시 입력 타입과 반환 타입이 같을 필요는 없다 ex) String 입력 → Integer 반환

그래서 map()함수는 원하는 값을 ‘어떤 함수’(Function 인터페이스 객체나 람다 표현식)에 넣는 것이며 원하는 함수를 정의할 수 있느냐가 관건이라고 한다!

flatMap()

: map()함수는 원하는 입력값을 어떤 함수에 넣어서 변환할 수 있는 일대일 함수지만, flatMap()은 결과가 Observable로 나온다. 그리고 map()함수가 일대일 함수라면, flatMap()은 일대다 혹은 일대일 Observable 함수다. (= 결과 타입이 Observable이기 때문에 입력 데이터의 속성에 따라서 1개를 발행할 수도 있고 여러 개를 발행해도 무방함)

Function<String, Observable<String>> getDoubleA =
                ball  -> Observable.just(ball + "A", ball + "A");

        String[] balls = {"1","3","5"};
        Observable<String> source = Observable.fromArray(balls)
                .flatMap (getDoubleA);
        source.subscribe(System.out::println);

//        String[] balls = {"1","3","5"};
//        Observable<String> source = Observable.fromArray(balls)
//                .flatMap (ball -> Observable.just(ball + "A", ball+ "A"));
//        source.subscribe(System.out::println);
1A
1A
3A
3A
5A
5A

getDoubleAgkatnsms ball을 받아서 ball+”A”를 두 번 발행하는데, 다른 Observable처럼 두 번 발행한 후 onComplete 알림을 전달한다.

source는 balls 배열 값을 가져온 후 getDoubleA()함수를 통해서 flatMap()함수를 호출함, 함수형 프로그래밍에서는 함수가 일급 객체이기 때문에 함수는 입력값도 될 수 있고, 출력값도 될 수 있고, 함수가 함수를 반활할 수도 있다.

(하나의 값에 대해 여러개의 변형된 값을 발행할 수 있기 때문에, 여러값을 발행하기 위해서는 flatMap을 사용하는듯)

구구단 만들기

제일 기본

Scanner in = new Scanner(System.in);
System.out.println("구구단 Input: ");
int dan = Integer.parseInt(in.nextLine());

for (int row = 1; row <= 9; row++){
System.out.println(dan + "*"+ row + "=" + dan * row);

Step1. for문을 Observable로 변환하기

Scanner in = new Scanner(System.in);
System.out.println("구구단 Input: ");
int dan = Integer.parseInt(in.nextLine());

Observable<Integer> source = Observable.range(1,9);
source.subscribe(row -> System.out.println(dan + "*"+ row + "=" + dan * row));
  • Observable.range(start, count)

    : start 부터 count 개수만큼 숫자 값을 발행함 , 반환값은 Observable<Integer<

사용자가 입력한 dan에 문제가 없는지도 Observable에서 처리해야하는데 dan 변수에 대한 예외 처리를 Observable 에서는 할 수 없다는 문제가 있음 (ex) dan에 음수 → 음수 출력) (예외처리는 나중에 한다고한다~)

Step2. 사용자 함수 정의하기

Scanner in = new Scanner(System.in);
System.out.println("구구단 Input: ");
int dan = Integer.parseInt(in.nextLine());

Function<Integer, Observable<String>> gugudan =
        num -> Observable.range(1,9)
        .map(row -> num + "*"+ row + "=" + num * row);
Observable<String> source = Observable.just(dan).flatMap(gugudan);
source.subscribe(System.out::println);

just에서 발행하고 있는 입력받은 숫자(dan)를 num 으로 받고, 1~9까지 발행되는 숫자는 row로 받아서 매핑해주는 gugudan함수를 만들었다.

이렇게 사용자 함수로 만들어두면 다른 곳에서도 재활용할 수 있고, subscribe()를 통해 메서드를 호출해서 출력하기만 하면 된다!!

Step3. flatMap()함수를 좀 더 활용하기

Scanner in = new Scanner(System.in);
System.out.println("구구단 Input: ");
int dan = Integer.parseInt(in.nextLine());

//Function<Integer, Observable<String>> gugudan =
//        num -> Observable.range(1,9)
//        .map(row -> num + "*"+ row + "=" + num * row);
//Observable<String> source = Observable.just(dan).flatMap(gugudan);
//source.subscribe(System.out::println);

        Observable<String> source = Observable.just(dan)
                .flatMap(num -> Observable.range(1,9)
                        .map(row -> num + "*"+ row + "=" + num * row));
        source.subscribe(System.out::println);

아까 만들었던 gugudan 함수를 flatMap을 사용해서 인라인으로 넣어줬더니 훨씬 깔끔해졌다. (1개의 입력값을 받아 9개의 결과를 만들어야 하지만 map()은 1개의 값을 받아 1개의 결과만 만들어내기때문에 사용할 수 없음 → flatMap()사용)

❗️flatMap함수에서 num을 dan으로 바꾸면 안됨 → 컴파일러가 사용자 입력으로 받은 변수인 dan과 혼동한다!

  • flatMap()함수의 원형
@SchedulerSupport(SchedulerSupport.NONE) //스케줄러를 지원하지 않는다. -> 현재 스레드에서 실행한다.
public final <R> Observable<R> flatMap(Funciton<? super T,? extends ObservableSource<? extends R>> mapper)

: T를 넣으면 여러 개의 R이 나오도록 매핑한다 ~

  • ObservableSource

    : Observable, AsyncSubject, BehaviorSubject, ConnectableObservable 등이 공통으로 구현한 인터페이스, Observable 처럼 데이터를 발행할 수 있는 객체를 포괄해서 지칭함

    ex) Single 클래스 - SingleSource

  • flatMap()함수의 원형 2번째~

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE) //스케줄러를 지원하지 않는다. -> 현재 스레드에서 실행한다.
public final <U, R> Observable<R> flatMap(Funciton<? super T,? extends ObservableSource<? extends U>> mapper,
BiFunction<? super T, ? super U, ? extends R> resultSelector)

: 두 번째 인자로 BiFunction<T,U,R> 형태의 resultSelector 가 추가되었다.

첫 번째 mapper의 인자로 받은 T와 그것의 결과로 나오는 U를 기반으로 새로운 Observable 을 만들 수 있음!

Scanner in = new Scanner(System.in);
        System.out.println("구구단 Input: ");
        int dan = Integer.parseInt(in.nextLine());

        Observable<String> source = Observable.just(dan)
                .flatMap(gugu -> Observable.range(1,9),
                        (gugu, i) -> gugu + "*"+ i + "=" + gugu * i);
        source.subscribe(System.out::println);

여기서는 입력받은 숫자 dan을 gugu 로 넘겨받고, 1~9까지 발행된 숫자를 i로 넘겨받았다. (flatMap하나만 사용했더니 제일 깔끔하다.. 여러번 써보면서 감을 익혀야할거같다 )

filter()

: Observable에서 원하는 데이터만 걸러내는 역할

String[] objs = {"1 CIRCLE", "2 DIAMOND", "3 TRIANGLE", "4 DIAMOND", "5 CIRCLE", "6 HEXAGON"};
Observable<String> source = Observable.fromArray(objs)
        .filter(obj -> obj.endsWith("CIRCLE"));
source.subscribe(System.out::println);
1 CIRCLE
5 CIRCLE

filter()함수는 boolean 값을 리턴하는 함수형 인터페이스인 Predicate를 인자로 넣는다. Predicate는 진위 판별이라는 뜻이 있으며 boolean값을 리턴하는 특수한 함수형 인터페이스로 람다 표현식을 인자로 넣으면 Function인지 Predicate인지 신경 쓰지 않고 동일하게 코딩할 수 있다는 장점이 있다고한다!(구분은 컴파일러가 해준다고함)

  • filterCircle 의 원형
Predicate<String> filterCircle = obj -> obj.endsWith("CIRCLE");
Integer[] data = {100, 34, 27, 99, 50};
        Observable<Integer> source = Observable.fromArray(data)
                .filter(number -> number % 2 == 0);
        source.subscribe(System.out::println);
100
34
50

filter()와 비슷한 함수들

  • first(default): Observable의 첫 번째 값을 필터함. 만약 값없이 완료되면 대신 기본값을 리턴함
  • last(default): Observable의 마지막 값을 필터함. 만약 값없이 완료되면 대신 기본값을 리턴함
  • take(N): 최초 N개만 가져옴
  • takeLast(N): 마지막 N개만 가져옴
  • skip(N) : 최초 N개 값을 건너뜀
  • skipLast(N) : 마지막 N개 값을 건너 뜀

앞의 함수들을 사용한 예제

				Integer[] numbers = {100, 200, 300, 400, 500};
        Single<Integer> single;
        Observable<Integer> source;

        single = Observable.fromArray(numbers).first(-1);
        single.subscribe(data -> System.out.println("first() value = "+data));

        single = Observable.fromArray(numbers).last(999);
        single.subscribe(data -> System.out.println("last() value = "+data));

        source = Observable.fromArray(numbers).take(3);
        source.subscribe(data -> System.out.println("take(3) value = "+data));

        source = Observable.fromArray(numbers).takeLast(3);
        source.subscribe(data -> System.out.println("takeLast(3) value = "+data));

        source = Observable.fromArray(numbers).skip(2);
        source.subscribe(data -> System.out.println("skip(2) value = "+data));

        source = Observable.fromArray(numbers).skipLast(2);
        source.subscribe(data -> System.out.println("skipLast(2) value = "+data));
first() value = 100
last() value = 500
take(3) value = 100
take(3) value = 200
take(3) value = 300
takeLast(3) value = 300
takeLast(3) value = 400
takeLast(3) value = 500
skip(2) value = 300
skip(2) value = 400
skip(2) value = 500
skipLast(2) value = 100
skipLast(2) value = 200
skipLast(2) value = 300

reduce()

: 발행한 데이터를 모두 사용하여 어떤 최종 결과 데이터를 합성할 때 활용

보통 Observable에 입력된 데이터를 필요한 map() 함수로 매핑하고, 원하는 데이터만 추출할 때는 불필요한 데이터를 걸러내는 filter()함수를 호출한다. 또한 상황에 따라 발행된 데이터를 취합하여 어떤 결과를 만들어낼 때는 reduce 계열의 함수를 사용한다고 한다~

				String[] balls = {"1", "3", "5"};
        Maybe<String> source = Observable.fromArray(balls)
                .reduce((ball1, ball2) -> ball2 +"(" + ball1 + ")");
        source.subscribe(System.out::println);

reduce()함수를 호출하면 인자로 넘긴 람다 표현식에 의해 결과 없이 완료될 수도 있기 때문에 Observable이 아니라 Maybe를 사용한다!

5(3(1))
  • reduce() 의 원형
public final Maybe<T> reduce(BiFunction<T, T, T> reducer)

데이터 쿼리하기

: map(), filter(), reduce() 함수를 활용해서 간단한 데이터 쿼리 예제 만들기

  • TV: $2,500
  • Camera: $400
  • TV: $1,600
  • Phone: $800

해당 정보를 가지고 오늘 발생한 TV매출의 총합을 계산

  1. 전체 매출 데이터를 입력
  2. 매출 데이터중 TV 매출을 필터링
  3. TV 매출의 합을 구함
public class FirstExample {
    public static void main(String args[]) {
        List<Pair<String, Integer>> sales = new ArrayList();

        sales.add(new Pair("TV", 2500));
        sales.add(new Pair("Camera", 400));
        sales.add(new Pair("TV", 1600));
        sales.add(new Pair("Phone", 800));

        Maybe<Integer> tvSales = Observable.fromIterable(sales) //Maybe: 최대 데이터 하나 발행, 없어도 됨 
                .filter(sale -> "TV".equals(sale.getFirst())) // -> (TV,2500),(TV, 1600) 만 남음 
                .map(sale -> sale.getSecond()) //2500 1600 
                .reduce((sale1, sale2) -> sale1 + sale2); // 4100 

        tvSales.subscribe(tot -> System.out.println("TV Sales: $" + tot));
    }
}
TV Sales: $4100

리액티브 연산자를 사용하는건 아직 낯설지만 사용방법만 잘 알고 쓴다면 편하고 직관적인 방법인 것 같다 :)

Comments