글쓰는 개발자

[RxJava] Flowable과 Observable 본문

Development/Java

[RxJava] Flowable과 Observable

개발하자 2022. 9. 25. 20:46

Flowable과 Observable의 비교

Flowable은 Reactive Streams의 인터페이스를 구현하고, Observable은 그렇지 않다.

아래의 Flowable<T>는 Publisher<T>를 구현하고 있다.

/* Flowable.java */
public abstract class Flowable<T> implements Publisher<T> {
	...
}

/* Publisher.java */
package org.reactivestreams; // reactive streams 패키지

public interface Publisher<T> {

    public void subscribe(Subscriber<? super T> s);
}

 

반면 아래의 Observable<T>는 ObservableSource<T>를 구현하고 있다.

package io.reactivex; // reactivex 패키지
...

public abstract class Observable<T> implements ObservableSource<T> {
    ...
}

Floable은 배압(Back pressure) 기능을 지원하고, Observable은 지원하지 않는다.

배압이란, Flowable에서 데이터를 통지하는 속도가 Subscriber에서 통지된 데이터를 전달받아 처리하는 속도보다 빠를 때, 밸런스를 맞추기 위해 데이터 통지량을 제어하는 기능을 말한다.

Flowable 통지 속도 > Subscriber 처리 속도

위와 같이, Flowable이 데이터 100개를 전달하는 동안 Subscriber는 데이터 1개를 처리한다면 어떤 일이 발생할까?

아래의 코드를 실행해보자.

public class MissingBackpressureExceptionExample {

    public static void main(String[] args) throws InterruptedException {
        Flowable.interval(1L, TimeUnit.MILLISECONDS)
            .doOnNext(data -> System.out.println(Thread.currentThread().getName() + " - 데이터 통지 : " + data))
            .observeOn(Schedulers.computation())
            .subscribe(
                data -> {
                    System.out.println("# 소비자 처리 대기중..");
                    Thread.sleep(1000L);
                    System.out.println(Thread.currentThread().getName() + " - 데이터 처리 : " + data);
                },
                err -> System.err.println(err),
                () -> System.out.println("complete")
            );

        Thread.sleep(2000L);
    }

}

실행 결과

RxComputationThreadPool-1에서 데이터를 127까지 통지했지만, RxComputationThreadPool-2에서는 데이터를 하나밖에 처리하지 못하고, 이후 MissingBackpressureException이 발생하였다.

 

MissingBackpressureException이 발생하는 이유가 무엇일까? 해답은 Flowable의 배압 전략(Backpressure Strategy)에서 찾을 수 있다.

배압 전략(Backpressure Strategy)

RxJava에서는 배압 전략을 통해, Flowable이 통지 대기 중인 데이터를 어떻게 다룰 지에 대한 스펙을 제공한다. 배압 전략에는 다음과 같은 것들이 있다.

  • Missing 전략
    • 배압을 적용하지 않음
    • 나중에 onBackpressureXXX()로 배압을 적용할 수 있다.
  • Error 전략
    • 통지된 데이터가 버퍼의 크기를 초과하면 MissingBackpressureException 에러를 통지한다.
    • 소비자가 생산자의 통지 속도를 따라잡지 못할 때 발생한다.
    • 즉, 위의 코드에서 MissingBackpressureException이 발생한 것은 Flowable의 기본 배압 전략이 Error 전략임을 의미한다.
  • Buffer 전략 - buffer 전략에는 2가지 전략이 있다.
    • DROP_LATEST
      • 버퍼가 가득 찬 시점에, 버퍼 내에서 가장 최근에 버퍼로 들어온 데이터를 DROP
      • DROP된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채운다.
    • DROP_OLDEST
      • 버퍼가 가득 찬 시점에, 버퍼 내에서 가장 먼저 버퍼로 들어온 데이터를 DROP
      • DROP된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채운다.
    • DROP
      • 버퍼가 가득 찬 시점에, 버퍼에 들어오려는 모든 데이터를 버림
    • LATEST
    • OLDEST
DROP_LATEST

아래는 DROP_LATEST Backpressure Strategy를 사용하는 코드이다.

public static void main(String[] args) throws InterruptedException {
    System.out.println("# start: " + Thread.currentThread().getName());
    Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .doOnNext(data -> System.out.println("# publish : " + data))
        .onBackpressureBuffer(
            2, // capacity == 2
            () -> System.out.println("overflow!"),
            BackpressureOverflowStrategy.DROP_LATEST
        )
        .doOnNext(data -> System.out.println("# buffer publish : " + data))
        .observeOn(Schedulers.computation(), false, 1) // subscriber가 데이터를 1개씩 요청
        .subscribe(
            data -> {
                Thread.sleep(1000L);
                System.out.println("subscribe : " + data);
            },
            err -> System.out.println("error : " + err)
        );

    Thread.sleep(2800);
}

DROP_LATEST 실행 결과

DROP_LATEST 전략은 Buffer 전략의 하위 전략이기 때문에, buffer에서도 publish라는 개념이 존재한다.

buffer publish

Buffer의 capacity가 2이고, subscriber가 데이터를 처리할 때마다 buffer에서 subscriber에게 데이터를 publish하는 것을 볼 수 있다. 또한, Flowable에서 Buffer에 데이터를 전송할 때, capacity를 넘어서면 overflow가 발생하면서 buffer의 최신 데이터가 pop되어 사라지는 것을 확인할 수 있다. 즉, 아래와 같은 그림으로 나타낼 수 있을 것이다.

DROP_LATEST

DROP_OLDEST는 위 그림과 대부분의 과정이 비슷하고, 버퍼가 가득 찼을 때 DROP되는 아이템이 더 먼저 들어온 녀석이라는 차이만을 가진다.

DROP

아래는 DROP Backpressure Strategy를 사용하는 코드이다.

public static void main(String[] args) throws InterruptedException {
    System.out.println("# start: " + Thread.currentThread().getName());
    Flowable.interval(300L, TimeUnit.MILLISECONDS)
        .doOnNext(data -> System.out.println("# publish : " + data))
        .onBackpressureDrop(data -> System.out.println("drop : " + data))
        .observeOn(Schedulers.computation(), false, 1) // buffer size를 1로 지정
        .subscribe(
            data -> {
                Thread.sleep(1000L);
                System.out.println("subscribe : " + data);
            },
            err -> System.out.println("error : " + err)
        );

    Thread.sleep(5500L);
}

DROP 실행 결과

DROP 전략의 경우, onBackPressureBuffer가 아닌 onBackPressureDrop 메서드를 Flowable에 체이닝하면 된다.

 

참고: https://www.inflearn.com/course/자바-리액티브프로그래밍-1

 

Kevin의 알기 쉬운 RxJava 1부 - 인프런 | 강의

리액티브 프로그래밍이라는 진입 장벽을 넘고 싶으신가요? Kevin의 알기 쉬운 RxJava가 그 벽을 넘을 수 있는 힘을 키워드리겠습니다., - 강의 소개 | 인프런...

www.inflearn.com

 

반응형

'Development > Java' 카테고리의 다른 글

[RxJava] Single, Maybe, Completable  (0) 2022.09.26
[RxJava] Reactive Streams  (0) 2022.08.25
[RxJava] 리액티브 프로그래밍?  (1) 2022.08.21
Comments