001 Reactive Streams Component

리액티브 스트림즈란? #

리액티브한 코드 작성을 위한 구성을 도와주는 리액티브 라이브러리가 있다. 이 리액티브 라이브러리를 어떻게 구현해야할지 정의해놓은 별도의 표준 사양을 리액티브 스트림즈(Reactive Streams)라고 한다.

리액티브 스트림즈는 ‘데이터 스트림을 Non-Blocking이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리 표준 사양’이라고 표현할 수 있다. 이를 구현한 구현체로는 RxJava, Reactor, Akka Streams, Java 9 Flow API 등이 있고, 그 중에 Spring framework와 가장 궁합이 잘 맞는 구현체는 Reactor이다.

리액티브 구성요소 #

리액티브 스트림즈를 통해 구현해야 되는 API 컴포넌트에는 Publisher, Subscriber, Subscription, Processor 가 있다. 이 4개의 컴포넌트를 반드시 기억해야한다. 아무래도 이 4개의 역할이 헷갈리면 전체적인 동작과정까지도 헷갈리게되기 때문에 확실히 짚고 넘어가자.

컴포넌트 설명
Publisher 데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다.
Subsriber 구독한 Publisher로부터 통지된 데이터를 전달받아서 처리한다.
Subscription Publisher에 요청할 데이터의 개수를 지정하고, 데이터의 구독을 취소하는 역할을 한다.
Processor Publisher와 Subscriber의 기능을 모두 가지고 있다. 즉, Subscriber로서 다른 Publisher를 구독할 수 있고, Publisher로서 다른 Subscriber가 구독할 수 있다.

Publisher와 Subscriber의 동작과정을 나타내는 그림 #

img.png

1. 데이터를 구독한다. (subscribe)

먼저 Subscriber는 전달받을 데이터를 구독한다.

2. 데이터를 통지할 준비가 되었음을 알린다. (onSubscribe)

Publisher는 데이터를 통지할 준비가 되었음을 Subscriber에 알린다.

3. 전달 받을 통지 데이터 개수를 요청한다. (Subscription.request)

Publisher가 데이터를 통지할 준비가 되었다는 알림을 받은 Subscriber는 전달받기를 원하는 데이터의 개수를 Publisher에게 요청한다.

▶ 데이터의 요청 개수를 지정하는 이유가 뭘까?

Subscriber가 Subscription.reuqest를 통해 데이터의 요청 개수를 지정한다. 이는 실제로 Publisher와 Subscriber는 각각 다른 스레드에서 비동기적으로 상호작용하는 경우가 대부분이기 때문이다.

이럴 경우 Publisher가 통지하는 속도가 Publisher로부터 통지된 데이터를 처리하는 Subscriber가 처리하는 속도보다 더 빠르면 처리를 기다리는 데이터가 쌓이게되어 시스템 부하가 커질 수 있다. 이러한 결과를 방지하기 위한 행위다.

Publisher의 데이터 통지 속도 > 통지된 데이터를 처리하는 Subscriber 속도 

간단하게 생각해보면 데이터가 전송되는 속도가 데이터가 처리되는 속도보다 빠르면 계속해서 처리해야하는 데이터가 밀리기 때문인 것이다.

4. 데이터를 생성한다.

5. 요청 받은 개수만큼 데이터를 통지한다. (onNext)

Publisher는 Subscriber로부터 요청받은 만큼의 데이터를 통지한다.

6. 데이터 처리를 완료할때까지 위 3~5번의 과정을 반복한다.

이렇게 Publisehr와 Subscriber 간에 데이터 통지, 데이터 수신, 데이터 요청의 과정을 반복한다.

7. 완료 또는 에러가(onError) 발생할때까지 데이터 생성, 통지, 요청을 계속한다.

8. 데이터 통지가 완료되었음을 알린다. (onComplete)

반복하다가 Publisher가 모든 데이터를 통지하게 되면 마지막으로 데이터 전송이 완료되었음을 Subscriber에게 알린다. 만약에 Publisher가 데이터를 처리하는 과정에서 에러가 발생하면 에러가 발생했음을 Subscriber에게 알린다.

위 글의 내용으로 이해가 되지 않아도 계속해서 포스팅을 읽자. 추후에 예제코드를 보고나서 다시 위 글을 읽으면 이해가 될것이다.

코드로 보는 리액티브 스트림즈 컴포넌트 #

리액티브 스트림즈 컴포넌트의 동작과정을 이미지와 순서 설명으로 알아보았다. 처리 과정은 이해가 되지만 역시나 Publisher, Subscriber, Subscription, Processor의 코드 흐름이 머릿속으로 그려지지는 않는다. 이제 실제 코드를 보면서 조금씩 머릿속으로 그림을 그려보자.

Publisher.java #

public interface Publisher<T> {
    public void subscribe(Subscriber <? super T> subscriber);
}

subscribe() 메서드 1개만 존재한다. subscribe() 메서드는 파라미터로 전달받은 Subscriber를 등록하는 역할을 한다.

‘Publisher는 데이터를 생성하고 통지하는 역할을 하고, Subscriber는 Publisher가 통지하는 데이터를 전달받기 위해 구독을 한다.’ 라는 내용으로 봤을때 우리는 구독을 처리하는 subscribe() 메서드가 당연히 Subscriber에 있을거라고 오해할 수 있다. 

▶ 왜 Subscriber가 아닌 Publisher에 subscribe() 메서드가 정의되어 있을까?

리액티브 스트림즈에서의 Publisher/Subscriber는 개념상으로는 Subscriber가 구독을 하는게 맞다. 하지만 실제 코드상으로는 Publisher가 subscribe() 메서드의 파라미터인 Subscriber를 등록하는 형태로 구독이 이뤄진다. 우선 아래의 호출로직으로 Publisher 객체의 subscribe() 메서드를 호출할때 Subscriber 객체를 파라미터로 넘긴다고 이해하자.

// 구독
pub.subscribe(sub);

Subscriber.java #

public interface Subscriber<T> {
    //구독 시작 처리
    public void onSubscribe(Subscription subscription); // 아래 Subscription을 인자로 전달
    
    //데이터 통지시 처리
    public void onNext(T item);
    
    //에러 통지시 처리
    public void onError(Throwable error);

    //완료 통지시 처리
    public void onComplete();
}
메서드 설명
onSubscribe 구독 시작 시점에 어떤 처리를 하는 역할을 한다. 여기서의 처리는 Publisher에게 요청할 데이터의 개수를 지정하거나 구독을 해지하는 것을 의미한다. 이것은 onSubscribe 메서드의 파라미터로 전달되는 Subscription 객체를 통해서 이뤄진다.
onNext Publisher가 통지한 데이터를 처리하는 역할을 한다.
onError Publisher가 데이터를 통지를 위한 처리 과정에서 에러가 발생했을때 해당 에러를 처리하는 역할을 한다.
onComplete Publisher가 데이터 통지를 완료했음을 알릴 때 호출되는 메서드다. 데이터 통지가 정상적으로 완료될 경우에 어떤 후처리를 해야한다면 omComplete 메서드에서 처리 코드를 작성하면 된다.

Subscripton.java #

public interface Subscription { // 통지받을 데이터 개수를 지정해 데이터 통지를 요청하거나 통지받지 않게 구독을 해지할때 사용하는 인터페이스
    //통지받을 데이터 개수 요청
    public void request(long num);
    
    //구독 해지
    public void cancel(); // Subscriber에서 호출함 (Subscription을 받은 Subscriber에서 구독 해지를 위해 호출)
}
메서드 설명
request Subscriber가 구독한 데이터의 개수를 요청한다. Publisher에게 데이터의 개수를 요청할 수 있다.
cancel 구독을 해지한다.

실제 구독하는 구현코드! 위의 설명을 돕기위한 예제 코드다.

// Publisher
Publisher<Integer> pub = new Publisher<Integer>() {
    @Override
    public void subscribe(Subscriber<? super Integer> sub) {
        sub.onSubscribe(new Subscription() {
            @Override
            public void request(long n) {
                ...
            }

            @Override
            public void cancel() {
                ...
            }
        });
    }
};

// Subscriber
Subscriber<Integer> sub = new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
        log.debug("onSubscribe");
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer i) {
        log.debug("onNext:{}", i);
    }

    @Override
    public void onError(Throwable t) {
        log.debug("onError:{}", t);
    }

    @Override
    public void onComplete() {
        log.debug("onComplete");
    }
};


// 구독
pub.subscribe(sub);

결국 Publisher의 subscribe() 메서드를 호출하여 구독하는데, 이때 파라미터 Subscriber 객체를 넘기면 된다.

위 코드의 흐름

Publisher와 Subscriber의 동작과정을 리액티브 스트림즈의 컴포넌트 코드 관점에서 다시 이해해보자.

순서 과정
1 Publisher가 Subscriber 인터페이스 구현 객체를 subscribe 메서드의 파라미터로 전달한다.
2 Publisher 냅에서는 전달받은 Subscriber 인터페이스 구현 객체의 onSubscribe 메서드를 호출하면서 Subscriber의 구독을 의미하는 Subscription 인터페이스 구현 객체를 Subscriber에게 전달한다.
3 호출된 Subscriber 인터페이스 구현 객체의 onSubscribe() 메서드에서 전달받은 Subscription 객체를 통해 전달받을 데이터의 개수를 Publisher 에게 요청한다.
4 Publisher는 Subscriber로부터 전달받은 요청 개수만큼의 데이터를 onNext() 메서드를 호출해서 Subsriber에게 전달한다.
5 Publisher는 통지할 데이터가 더이상 없을 경우 onComplete 메서드를 호출해서 Subscriber에게 데이터 처리 종료를 알린다.

Processor.java #

public abstract interface Processor<T, R> extends Subscriber<T>, Publichser<R> {}

Processor 인터페이스는 별도로 구현해야할 메서드가 없다. 대신 Subscriber, Publisher 인터페이스를 상속하고있다. 리액티브 스트림즈 컴포넌트에서 설명한대로 Processor가 Publisher과 Subscriber 기능을 모두 가지고있기 때문이다.

구현 예제코드 #

@Slf4j
public class E05_PubSub_2 {
    public static void main(String[] args) {
        Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));

        // 구독자
        Subscriber<Integer> sub = logSub();

        // 구독 시작
        pub.subscribe(sub);
    }

    private static Subscriber<Integer> logSub() {
        Subscriber<Integer> sub = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                // Subscription 의 request 를 요청해야한다.
                log.debug("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer i) {
                log.debug("onNext:{}", i);
            }

            @Override
            public void onError(Throwable t) {
                log.debug("onError:{}", t);
            }

            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        };

        return sub;
    }

    private static Publisher<Integer> iterPub(List<Integer> iter) {
        Publisher<Integer> pub = new Publisher<Integer>() {
            // Publisher 의 구현해야하는 메서드
            @Override
            public void subscribe(Subscriber<? super Integer> sub) { // 호출하면 그때부터 데이터를 통지
                // Subscription : Publisher, Subscriber 둘 사이의 구독이 한번 일어난다는 의미
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            // iterable 의 원소를 통지한다.
                            iter.forEach(s -> sub.onNext(s));
                            // 여기서 멈추면 안되고, publisher 가 데이터 통지가 완료했으면 완료됨을 호출해야한다.
                            sub.onComplete();
                        } catch (Throwable t) {
                            // 에러 처리
                            sub.onError(t);
                        }
                    }

                    /**
                     * Subscriber 에서 Subscription 객체의 cancel()을 호출할 수 있다.
                     * 더이상 데이터를 통지받지 않겠다고 알림
                     */
                    @Override
                    public void cancel() {

                    }
                });
            }
        };
        return pub;
    }
}
  1. Publisher 객체 생성
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));

img_1.png

  1. Subscriber 객체 생성
Subscriber<Integer> sub = logSub();
  1. 구독 실행!
pub.subscribe(sub);

여기서부터 실행 흐름을 들여다보자. #

  1. Subscriber 객체의 onSubscribe() 호출
메서드 설명
onSubscribe 구독 시작 시점에 어떤 처리를 하는 역할을 한다. 여기서의 처리는 Publisher에게 요청할 데이터의 개수를 지정하거나 구독을 해지하는 것을 의미한다. 이것은 onSubscribe 메서드의 파라미터로 전달되는 Subscription 객체를 통해서 이뤄진다.

img_2.png

  1. Subscriber 객체의 onNext() 호출
메서드 설명
onNext Publisher가 통지한 데이터를 처리하는 역할을 한다.

img_3.png!

  1. 위 1)~2)번 반복

  2. 1, 2, 3 ~ 9까지 반복 완료 후 마지막 데이터인 10의 onNext()가 호출된 시점이 왔다.

img_4.png

  1. 모든 데이터 통지가 완료되었으므로 onComplete() 메서드 호출
메서드 설명
onComplete Publisher가 데이터 통지를 완료했음을 알릴 때 호출되는 메서드다. 데이터 통지가 정상적으로 완료될 경우에 어떤 후처리를 해야한다면 omComplete 메서드에서 처리 코드를 작성하면 된다.

img_5.png

▶ Subscriber 객체의 onComplete() 호출된 모습

img_6.png

실행결과

[main] DEBUG com.reactive.step02.E05_PubSub_2 - onSubscribe
[main] DEBUG com.reactive.step02.E05_PubSub_2 - onNext:1
[main] DEBUG com.reactive.step02.E05_PubSub_2 - onNext:2
[main] DEBUG com.reactive.step02.E05_PubSub_2 - onNext:3
[main] DEBUG com.reactive.step02.E05_PubSub_2 - onNext:4
[main] DEBUG com.reactive.step02.E05_PubSub_2 - onNext:5
[main] DEBUG com.reactive.step02.E05_PubSub_2 - onNext:6
[main] DEBUG com.reactive.step02.E05_PubSub_2 - onNext:7
[main] DEBUG com.reactive.step02.E05_PubSub_2 - onNext:8
[main] DEBUG com.reactive.step02.E05_PubSub_2 - onNext:9
[main] DEBUG com.reactive.step02.E05_PubSub_2 - onNext:10
[main] DEBUG com.reactive.step02.E05_PubSub_2 - onComplete