글쓰는 개발자

[RxJava] Reactive Streams 본문

Development/Java

[RxJava] Reactive Streams

개발하자 2022. 8. 25. 23:14

목차

  1. Reactive Streams?
  2. Cold Publisher & Hot Publisher
  3. Cold Publisher & Hot Publisher 예제로 살펴보기

Reactive Streams?

  • 리액티브 프로그래밍 라이브러리의 표준 사양(https://github.com/reactive-streams/reactive-streams-jvm)
  • 리액티브 프로그래밍에 대한 인터페이스만을 제공함
  • RxJava는 Reactive Streams의 인터페이스들을 구현한 구현체이다.
  • Reactive Streams는 다음 4개의 인터페이스를 제공 

    • Publisher
    • Subscriber
    • Subscription
    • Processor

Publisher

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Publisher 인터페이스는 데이터를 생성 / 통지하는 역할을 하며, subscribe 메서드를 구현하도록 강제한다.

Subscriber

public interface Subscriber<T> {
    public void onSubscribe(Subscription s); // 데이터를 최초 통지할 때 호출
    public void onNext(T t); // 데이터를 통지할 때마다 호출
    public void onError(Throwable t); // 데이터 통지에 에러가 발생할 때 호출
    public void onComplete(); // 데이터 통지가 끝났을 때 호출
}

Subscriber 인터페이스는 통지된 데이터를 전달받아 처리하는 역할을 하며, 4개의 메서드를 구현하도록 강제한다.

기능이 많아, 주석으로 각 기능이 호출되는 시점을 명시해두었다.

Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Subscription 인터페이스는 전달 받을 데이터의 개수를 요청하고 구독을 해지하는 역할을 하며, 2개의 메서드를 구현하도록 강제한다.

Processor

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Processor 인터페이스는 Publisher와 Subscriber의 기능을 모두 가지고 있다. 위의 코드에서 Subscriber와 Publisher를 상속하는 것을 확인할 수 있다.

Publisher와 Subscriber간의 프로세스 흐름

publisher - subscriber간 프로세스 흐름

 

위와 같은 흐름으로 Reactive Streams 인터페이스가 활용되니, 인터페이스 스펙과 다이어그램을 비교해서 살펴보도록 하자.

Cold Publisher & Hot Publisher

Cold Publisher

Cold Publisher

  • Publisher는 Subscriber가 구독할 때마다 데이터를 새로 통지함
  • 즉, 데이터를 통지하는 새로운 타임라인이 생성됨
  • Subscriber는 구독 시점과 상관없이, 통지된 데이터를 처음부터 전달받을 수 있음

Hot Publisher

Hot Publisher

  • Publisher는 Subscriber의 수와 관계없이 데이터를 한번만 통지함
  • 즉, 데이터를 통지하는 타임라인은 하나임
  • Subscriber는 발행된 데이터를 처음부터 전달 받는 것이 아니라, 구독한 시점에 통지된 데이터들만 전달 받을 수 있음

Cold Publisher & Hot Publisher 예제로 살펴보기

Cold Publisher

/* ColdPublisherExample.java */
import io.reactivex.Flowable;

public class ColdPublisherExample {

    public static void main(String[] args) {
        Flowable<Integer> flowable = Flowable.just(1, 3, 5, 7);

        flowable.subscribe(ColdPublisherExample::print);
        flowable.subscribe(ColdPublisherExample::print);
    }
    
    public static void print(Integer data) {
        System.out.println("data : " + data);
    }
}

위 코드는 ColdPublisher인 Flowable을 사용한 예시이다. 결과를 살펴보자.

ColdPublisherExample 실행 결과

보다시피, flowable을 구독한 시점과 관계없이 1,3,5,7을 반복해서 출력한다.

Hot Publisher

/* HotPublisherExample.java */
import io.reactivex.processors.PublishProcessor;

public class HotPublisherExample {

    public static void main(String[] args) {
        PublishProcessor<Integer> processor = PublishProcessor.create();
        processor.subscribe(data -> System.out.println("processor 1: " + data));
        processor.onNext(1);
        processor.onNext(3);

        processor.subscribe(data -> System.out.println("processor 2: " + data));
        processor.onNext(5);
        processor.onNext(7);
        processor.onComplete();
    }

}

위 코드는 HotPublisher인 Flowable을 사용한 예시이다. 결과를 살펴보자.

HotPublisherExample 실행 결과

HotPublisherExample에서는 processor1은 1,3,5,7 데이터를 모두 전달받았고, processor2는 데이터 1,3이 발행된 이후에 구독하였기 때문에, 5,7 두 개의 데이터만 전달받았다.

 

참고: https://www.inflearn.com/course/자바-리액티브프로그래밍-1

 

Kevin의 알기 쉬운 RxJava 1부 - 인프런 | 강의

리액티브 프로그래밍이라는 진입 장벽을 넘고 싶으신가요? Kevin의 알기 쉬운 RxJava가 그 벽을 넘을 수 있는 힘을 키워드리겠습니다., - 강의 소개 | 인프런...

www.inflearn.com

 

반응형

'Development > Java' 카테고리의 다른 글

[RxJava] Single, Maybe, Completable  (0) 2022.09.26
[RxJava] Flowable과 Observable  (0) 2022.09.25
[RxJava] 리액티브 프로그래밍?  (1) 2022.08.21
Comments