002 Impl1 Reactor

Reactive Streams 구현 라이브러리 (1) Reactor #

Project reactor #

  • Pivotal 사에서 개발
  • Spring reactor에서 사용
  • Mono와 Flux publisher 제공

img.png

Project reactor - Flux #

  • 0..n개의 item을 전달
  • 에러가 발생하면 error signal 전달하고 종료
  • 모든 item을 전달했다면 complete signal 전달 하고 종료
  • backPressure 지원

img_1.png

Flux 예제 #

SimpleSubscriber

  • FluxIterable publisher
  • Subscription : StrictSubscriber
@Slf4j
@RequiredArgsConstructor
public class p181_SimpleSubscriber<T> implements Subscriber<T> {
    private final Integer count;

    /**
     * 지속적으로 요청을 하는게 아니라, 딱 한번 N개의 요청을 받고 그 이후로 값을 계속 받음
     * @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count); // count만큼 request
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);
        Thread.sleep(100);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

FluxSimpleExample

@Slf4j
public class p181_FluxSimpleExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        // main 쓰레드에서 수행
        getItems() // 고정된 개수를 subscribe
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");

        Thread.sleep(1000);
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

실행결과

13:18:58.672 [main] INFO com.example06.reactor.p181_FluxSimpleExample - start main
13:18:58.733 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
13:18:58.736 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
13:18:58.736 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
13:18:58.737 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
13:18:58.904 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
13:18:59.049 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
13:18:59.233 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
13:18:59.399 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
13:18:59.578 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
13:18:59.578 [main] INFO com.example06.reactor.p181_FluxSimpleExample - end main

Flux - subscribeOn 예제 #

FluxSimpleSubscribeOnExample

@Slf4j
public class p182_FluxSimpleSubscribeOnExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .map(i -> {
                    log.info("map {}", i);
                    return i;
                })
                // main 쓰레드가 아닌 다른 쓰레드에서 수행
                .subscribeOn(Schedulers.single())
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main"); // 바로 호출

        Thread.sleep(1000);
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

실행결과

  • single-1 쓰레드에서 수행
13:22:13.042 [main] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - start main
13:22:13.094 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
13:22:13.120 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
13:22:13.120 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
13:22:13.122 [main] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - end main
13:22:13.124 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 1
13:22:13.124 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
13:22:13.264 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 2
13:22:13.264 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
13:22:13.440 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 3
13:22:13.441 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
13:22:13.613 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 4
13:22:13.614 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
13:22:13.789 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 5
13:22:13.789 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
13:22:13.969 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - complete

Flux - subscribe #

FluxNoSubscribeExample

  • subscribe하지 않으면, 아무 일도 일어나지 않는다.
@Slf4j
public class p183_FluxNoSubscribeExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems(); // subscribe 하지않으면 아무일도 일어나지 않는다.
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            log.info("start getItems");
            for (int i = 0; i < 5; i++) {
                fluxSink.next(i);
            }
            fluxSink.complete();
            log.info("end getItems");
        });
    }
}

Flux - backPressure #

  1. 1번째 예제
@Slf4j
public class p184_FluxSimpleRequestThreeExample {
    public static void main(String[] args) {
        // 3개 요청 (1, 2, 3 이후 종료) , 추가적인 요청 없음 
        getItems().subscribe(new p181_SimpleSubscriber<>(3));
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

SimpleSubscriber

@Slf4j
@RequiredArgsConstructor
public class p181_SimpleSubscriber<T> implements Subscriber<T> {
    private final Integer count;

    /**
     * 지속적으로 요청을 하는게 아니라, 딱 한번 N개의 요청을 받고 그 이후로 값을 계속 받음
     * @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count); // count만큼 request
        log.info("request: {}", count);
    }
    
    ...
}

실행결과

09:24:32.953 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:24:32.957 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:24:32.957 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 3
09:24:32.958 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:24:33.091 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:24:33.232 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
  1. 2번째 예제
@Slf4j
public class p185_FluxContinuousRequestSubscriberExample {
    public static void main(String[] args) {
        getItems().subscribe(new p185_ContinuousRequestSubscriber<>());
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

ContinuousRequestSubscriber

@Slf4j
public class p185_ContinuousRequestSubscriber<T>
        implements Subscriber<T> {
    private final Integer count = 1;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        log.info("subscribe");
        s.request(count); // 개수만큼 요청
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);

        Thread.sleep(1000);
        // 1개를 또 호출
        subscription.request(1);
        log.info("request: {}", count);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}
  1. 아래 로직으로 계속 반복 수행한다.
// 1개를 또 호출
subscription.request(1);

실행결과

09:33:34.419 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:33:34.424 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - subscribe
09:33:34.424 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:34.425 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 1
09:33:35.445 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:35.445 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 2
09:33:36.518 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:36.518 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 3
09:33:37.589 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:37.589 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 4
09:33:38.655 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:38.655 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 5
09:33:39.718 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:39.727 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - complete

Flux - error #

@Slf4j
public class p186_FluxErrorExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            fluxSink.next(0);
            fluxSink.next(1);
            var error = new RuntimeException("error in flux");
            fluxSink.error(error); // 에러 전달
        });
    }
}

실행결과

09:36:46.692 [main] INFO com.example06.reactor.p186_FluxErrorExample - start main
09:36:46.756 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:36:46.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:36:46.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:36:46.764 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 0
09:36:46.885 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:36:47.028 [main] ERROR com.example06.reactor.p181_SimpleSubscriber - error: error in flux
09:36:47.028 [main] INFO com.example06.reactor.p186_FluxErrorExample - end main

Flux - complete #

@Slf4j
public class p187_FluxCompleteExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            fluxSink.complete(); // complete 전달
        });
    }
}

실행결과

09:37:31.038 [main] INFO com.example06.reactor.p187_FluxCompleteExample - start main
09:37:31.100 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:37:31.106 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:37:31.106 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:37:31.109 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:37:31.109 [main] INFO com.example06.reactor.p187_FluxCompleteExample - end main

Project reactor - Mono #

  • 0..1개의 item을 전달
  • 에러가 발생하면 error signal 전달하고 종료
  • 모든 item을 전달했다면 complete signal 전달 하고 종료

img_2.png

Mono 예제 #

  • 1개의 item만 전달하기 때문에 next 하나만 실행하면 complete가 보장
  • 혹은 전달하지 않고 complete를 하면 값이 없다는 것을 의미 - 하나의 값이 있거나 없다
@Slf4j
public class p190_MonoSimpleExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");

        Thread.sleep(1000);
    }

    private static Mono<Integer> getItems() {
        return Mono.create(monoSink -> {
            monoSink.success(1);
        });
    }
}

실행결과

09:39:02.445 [main] INFO com.example06.reactor.p190_MonoSimpleExample - start main
09:39:02.481 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:39:02.658 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:39:02.658 [main] INFO com.example06.reactor.p190_MonoSimpleExample - end main

Mono와 Flux #

Mono : Optional

  • 없거나 혹은 하나의 값
  • Mono로 특정 사건이 완료되는 시점을 가리킬 수도 있다

Flux: List

  • 무한하거나 유한한 여러 개의 값

Flux를 Mono로 변환 #

  • Mono.from으로 Flux를 Mono로 변환
  • 첫 번째 값만 전달
@Slf4j
public class p192_FluxToMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        // 1,2,3,4,5 중 첫번째값 1이 onNext로 전달되고 complete
        // 뒤에 있는 값들은 모두 무시
        Mono.from(getItems())
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

실행결과

09:40:37.275 [main] INFO com.example06.reactor.p192_FluxToMonoExample - start main
09:40:37.340 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:40:37.363 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:40:37.363 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:40:37.365 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:40:37.473 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:40:37.473 [main] INFO com.example06.reactor.p192_FluxToMonoExample - end main

Flux를 Mono로 변환 (Mono<List>) #

@Slf4j
public class p193_FluxToListMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                // collect 하고 complete 이벤트 발생 시점에 모은 값들을 모두 전달
                // 1, 2, 3, 4, 5를 내부 배열에 저장하고, 가지고있던 값들을 모두 onNext() 한다.
                // 하나로 합쳐져서 Mono로 한번 요청됨 ([1,2,3,4,5])
                .collectList()
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

실행결과

09:41:24.680 [main] INFO com.example06.reactor.p193_FluxToListMonoExample - start main
09:41:24.743 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:41:24.766 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:41:24.766 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:41:24.767 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: [1, 2, 3, 4, 5]
09:41:24.940 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:41:24.940 [main] INFO com.example06.reactor.p193_FluxToListMonoExample - end main

Mono를 Flux로 변환 #

  • flux()
  • Mono를 next 한 번 호출하고 onComplete를 호출하는 Flux로 변환
@Slf4j
public class p194_MonoToFluxExample {
    public static void main(String[] args) {
        log.info("start main");

        // flux() - Mono를 next 한번 호출하고 onComplete를 호출하는 Flux로 변환
        // [1,2,3,4,5]
        getItems().flux()
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Mono<List<Integer>> getItems() {
        return Mono.just(List.of(1, 2, 3, 4, 5));
    }
}

실행결과

09:42:16.606 [main] INFO com.example06.reactor.p194_MonoToFluxExample - start main
09:42:16.650 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:42:16.694 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:42:16.695 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:42:16.695 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: [1, 2, 3, 4, 5]
09:42:16.802 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:42:16.802 [main] INFO com.example06.reactor.p194_MonoToFluxExample - end main

Mono를 Flux로 변환 (Mono<List> -> Flux) #

@Slf4j
public class p195_ListMonoToFluxExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                // Mono의 결과를 Flux 형태로 바꾸고, flux를 받아서 처리
                // 1, 2, 3, 4, 5 하나씩 처리 
                .flatMapMany(value -> Flux.fromIterable(value))
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Mono<List<Integer>> getItems() {
        return Mono.just(List.of(1, 2, 3, 4, 5));
    }
}

실행결과

09:43:07.895 [main] INFO com.example06.reactor.p195_ListMonoToFluxExample - start main
09:43:07.931 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:43:08.082 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:43:08.239 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
09:43:08.414 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
09:43:08.588 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
09:43:08.724 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:43:08.724 [main] INFO com.example06.reactor.p195_ListMonoToFluxExample - end main

  1. 강의 : Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
  2. github 예제코드 : https://github.com/seohaebada/webflux