일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- spring boot
- 마블 다이어그램
- 원격 저장소
- 건국대학교
- flab
- 자바스크립트
- CLI
- 큐시즘
- OOAD
- RxJava
- time slice
- Cold Publisher
- github
- Round Robin
- Observable
- Hot Publish
- OS
- CPU Scheduling
- Depromeet
- 에프랩
- Hot Publisher
- spring
- 생활코딩
- Git
- 멘토링 후기
- js 개발자라면 알아야하는 핵심 컨셉
- js
- 버전관리
- 디프만
- 파이썬
- Today
- Total
글쓰는 개발자
[RxJava] Flowable과 Observable 본문
Flowable과 Observable의 비교
Flowable은 Reactive Streams의 인터페이스를 구현하고, Observable은 그렇지 않다.
아래의 Flowable<T>는 Publisher<T>를 구현하고 있다.
/* Flowable.java */
public abstract class Flowable<T> implements Publisher<T> {
...
}
/* Publisher.java */
package org.reactivestreams; // reactive streams 패키지
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
반면 아래의 Observable<T>는 ObservableSource<T>를 구현하고 있다.
package io.reactivex; // reactivex 패키지
...
public abstract class Observable<T> implements ObservableSource<T> {
...
}
Floable은 배압(Back pressure) 기능을 지원하고, Observable은 지원하지 않는다.
배압이란, Flowable에서 데이터를 통지하는 속도가 Subscriber에서 통지된 데이터를 전달받아 처리하는 속도보다 빠를 때, 밸런스를 맞추기 위해 데이터 통지량을 제어하는 기능을 말한다.

위와 같이, Flowable이 데이터 100개를 전달하는 동안 Subscriber는 데이터 1개를 처리한다면 어떤 일이 발생할까?
아래의 코드를 실행해보자.
public class MissingBackpressureExceptionExample {
public static void main(String[] args) throws InterruptedException {
Flowable.interval(1L, TimeUnit.MILLISECONDS)
.doOnNext(data -> System.out.println(Thread.currentThread().getName() + " - 데이터 통지 : " + data))
.observeOn(Schedulers.computation())
.subscribe(
data -> {
System.out.println("# 소비자 처리 대기중..");
Thread.sleep(1000L);
System.out.println(Thread.currentThread().getName() + " - 데이터 처리 : " + data);
},
err -> System.err.println(err),
() -> System.out.println("complete")
);
Thread.sleep(2000L);
}
}

RxComputationThreadPool-1에서 데이터를 127까지 통지했지만, RxComputationThreadPool-2에서는 데이터를 하나밖에 처리하지 못하고, 이후 MissingBackpressureException이 발생하였다.
MissingBackpressureException이 발생하는 이유가 무엇일까? 해답은 Flowable의 배압 전략(Backpressure Strategy)에서 찾을 수 있다.
배압 전략(Backpressure Strategy)
RxJava에서는 배압 전략을 통해, Flowable이 통지 대기 중인 데이터를 어떻게 다룰 지에 대한 스펙을 제공한다. 배압 전략에는 다음과 같은 것들이 있다.
- Missing 전략
- 배압을 적용하지 않음
- 나중에 onBackpressureXXX()로 배압을 적용할 수 있다.
- Error 전략
- 통지된 데이터가 버퍼의 크기를 초과하면 MissingBackpressureException 에러를 통지한다.
- 소비자가 생산자의 통지 속도를 따라잡지 못할 때 발생한다.
- 즉, 위의 코드에서 MissingBackpressureException이 발생한 것은 Flowable의 기본 배압 전략이 Error 전략임을 의미한다.
- Buffer 전략 - buffer 전략에는 2가지 전략이 있다.
- DROP_LATEST
- 버퍼가 가득 찬 시점에, 버퍼 내에서 가장 최근에 버퍼로 들어온 데이터를 DROP
- DROP된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채운다.
- DROP_OLDEST
- 버퍼가 가득 찬 시점에, 버퍼 내에서 가장 먼저 버퍼로 들어온 데이터를 DROP
- DROP된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채운다.
- DROP
- 버퍼가 가득 찬 시점에, 버퍼에 들어오려는 모든 데이터를 버림
- LATEST
- OLDEST
- DROP_LATEST
DROP_LATEST
아래는 DROP_LATEST Backpressure Strategy를 사용하는 코드이다.
public static void main(String[] args) throws InterruptedException {
System.out.println("# start: " + Thread.currentThread().getName());
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> System.out.println("# publish : " + data))
.onBackpressureBuffer(
2, // capacity == 2
() -> System.out.println("overflow!"),
BackpressureOverflowStrategy.DROP_LATEST
)
.doOnNext(data -> System.out.println("# buffer publish : " + data))
.observeOn(Schedulers.computation(), false, 1) // subscriber가 데이터를 1개씩 요청
.subscribe(
data -> {
Thread.sleep(1000L);
System.out.println("subscribe : " + data);
},
err -> System.out.println("error : " + err)
);
Thread.sleep(2800);
}

DROP_LATEST 전략은 Buffer 전략의 하위 전략이기 때문에, buffer에서도 publish라는 개념이 존재한다.

Buffer의 capacity가 2이고, subscriber가 데이터를 처리할 때마다 buffer에서 subscriber에게 데이터를 publish하는 것을 볼 수 있다. 또한, Flowable에서 Buffer에 데이터를 전송할 때, capacity를 넘어서면 overflow가 발생하면서 buffer의 최신 데이터가 pop되어 사라지는 것을 확인할 수 있다. 즉, 아래와 같은 그림으로 나타낼 수 있을 것이다.

DROP_OLDEST는 위 그림과 대부분의 과정이 비슷하고, 버퍼가 가득 찼을 때 DROP되는 아이템이 더 먼저 들어온 녀석이라는 차이만을 가진다.
DROP
아래는 DROP Backpressure Strategy를 사용하는 코드이다.
public static void main(String[] args) throws InterruptedException {
System.out.println("# start: " + Thread.currentThread().getName());
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(data -> System.out.println("# publish : " + data))
.onBackpressureDrop(data -> System.out.println("drop : " + data))
.observeOn(Schedulers.computation(), false, 1) // buffer size를 1로 지정
.subscribe(
data -> {
Thread.sleep(1000L);
System.out.println("subscribe : " + data);
},
err -> System.out.println("error : " + err)
);
Thread.sleep(5500L);
}

DROP 전략의 경우, onBackPressureBuffer가 아닌 onBackPressureDrop 메서드를 Flowable에 체이닝하면 된다.
참고: https://www.inflearn.com/course/자바-리액티브프로그래밍-1
Kevin의 알기 쉬운 RxJava 1부 - 인프런 | 강의
리액티브 프로그래밍이라는 진입 장벽을 넘고 싶으신가요? Kevin의 알기 쉬운 RxJava가 그 벽을 넘을 수 있는 힘을 키워드리겠습니다., - 강의 소개 | 인프런...
www.inflearn.com
'Development > Java' 카테고리의 다른 글
[RxJava] Single, Maybe, Completable (0) | 2022.09.26 |
---|---|
[RxJava] Reactive Streams (0) | 2022.08.25 |
[RxJava] 리액티브 프로그래밍? (1) | 2022.08.21 |