004 Impl3 Mutiny

Reactive Streams implementation libraries (3): Mutiny

Mutiny

  • Provided as the asynchronous library in Hibernate Reactive
  • Offers two publisher types: Multi and Uni

Mutiny - Multi

  • Emits 0..n items
  • On error, sends an error signal and terminates
  • Once all items have been delivered, sends a complete signal and terminates
  • Supports backpressure
  • Similar to Reactor's Flux
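The signal order and the backpressure bullet above can be sketched with the JDK's `java.util.concurrent.Flow` interfaces alone. This is not Mutiny code: `RangePublisher` and `BackpressureSketch` are hypothetical names, and the publisher is a simplified, single-threaded stand-in that emits only as many items as the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    /** Hypothetical synchronous publisher: emits 1..max, never more than was requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int max;

        RangePublisher(int max) { this.max = max; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                // Simplified: single-threaded, no validation of n.
                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext: the outer loop sees the new demand
                    emitting = true;
                    while (demand > 0 && next <= max && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > max && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Subscribes with a fixed batch size and returns the recorded events in order. */
    static List<String> run(int max, int batch) {
        List<String> events = new ArrayList<>();
        new RangePublisher(max).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                events.add("request " + batch);
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                events.add("item " + item);
                if (++received % batch == 0) { // batch consumed: ask for the next one
                    events.add("request " + batch);
                    subscription.request(batch);
                }
            }

            @Override
            public void onError(Throwable t) { events.add("error " + t.getMessage()); }

            @Override
            public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        // [request 2, item 1, item 2, request 2, item 3, item 4, request 2, item 5, complete]
        System.out.println(run(5, 2));
    }
}
```

The subscriber controls the pace: the publisher stops after each batch until more demand arrives, and the complete signal is sent only after the last item.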

Multi example

import io.smallrye.mutiny.Multi;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p218_MultiExample {
    public static void main(String[] args) {
        getItems()
                .subscribe()
                // subscribe() takes no subscriber; call it first, then pass the subscriber to withSubscriber()
                .withSubscriber(
                        new p218_SimpleMultiSubscriber<>(Integer.MAX_VALUE)
                );
    }

    private static Multi<Integer> getItems() {
        return Multi.createFrom().items(1, 2, 3, 4, 5);
    }
}

SimpleMultiSubscriber

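import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.concurrent.Flow;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j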
@RequiredArgsConstructor
public class p218_SimpleMultiSubscriber<T> implements MultiSubscriber<T> {
    private final Integer count;

    @Override
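    public void onSubscribe(Flow.Subscription s) {
        // This source emits synchronously inside request(), so every item and the
        // completion signal are logged before the "subscribe" line below.
        s.request(count);
        log.info("subscribe");
    }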

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("fail: {}", failure.getMessage());
    }

    @Override
    public void onCompletion() {
        log.info("completion");
    }
}

Output

10:46:36.701 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 1
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 2
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 3
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 4
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - item: 5
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - completion
10:46:36.702 [main] INFO com.example06.mutiny.p218_SimpleMultiSubscriber - subscribe
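In the log above, "subscribe" comes last because `onSubscribe` logs only after `request()` returns, and a synchronous source emits everything inside that call. The sketch below reproduces the ordering with JDK `Flow` types only. It is not Mutiny code: `SubscribeOrderSketch` is a hypothetical name, and the source assumes unbounded demand and ignores `n`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SubscribeOrderSketch {

    /** Runs one subscription and returns the recorded events in order. */
    static List<String> run(boolean logBeforeRequest) {
        List<String> events = new ArrayList<>();

        // Hypothetical synchronous source: emits 1..3 and completes inside request().
        // Simplification: it ignores n and assumes unbounded demand.
        Flow.Publisher<Integer> source = subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                done = true;
                for (int i = 1; i <= 3; i++) subscriber.onNext(i);
                subscriber.onComplete();
            }

            @Override
            public void cancel() { done = true; }
        });

        source.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                if (logBeforeRequest) events.add("subscribe");
                s.request(Long.MAX_VALUE);
                if (!logBeforeRequest) events.add("subscribe");
            }

            @Override
            public void onNext(Integer item) { events.add("item: " + item); }

            @Override
            public void onError(Throwable t) { events.add("fail: " + t.getMessage()); }

            @Override
            public void onComplete() { events.add("completion"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [item: 1, item: 2, item: 3, completion, subscribe]
        System.out.println(run(true));  // [subscribe, item: 1, item: 2, item: 3, completion]
    }
}
```

Logging before calling `request()` is the usual way to get "subscribe" as the first line.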

Mutiny - Uni

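  • Emits 0..1 item
  • On error, sends an error signal and terminates
  • Terminates after delivering its item; unlike Multi there is no separate completion callback (UniSubscriber defines only onSubscribe, onItem, and onFailure)
  • Similar to Reactor's Mono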
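The item-or-failure contract in the list above can be sketched in plain Java. This does not use Mutiny: `Single` and `SingleItemSketch` are hypothetical names, and the class only models the idea that a subscriber sees exactly one terminal callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class SingleItemSketch {

    /** Hypothetical 0..1 source: delivers either one item or one failure, never both. */
    static final class Single<T> {
        private final Supplier<T> supplier;

        Single(Supplier<T> supplier) { this.supplier = supplier; }

        void subscribe(Consumer<T> onItem, Consumer<Throwable> onFailure) {
            T item;
            try {
                item = supplier.get();
            } catch (RuntimeException e) {
                onFailure.accept(e); // terminal: no item follows a failure
                return;
            }
            onItem.accept(item);     // terminal: no separate completion signal
        }
    }

    /** Subscribes once and returns the recorded events. */
    static List<String> run(Supplier<Integer> supplier) {
        List<String> events = new ArrayList<>();
        new Single<>(supplier).subscribe(
                item -> events.add("item: " + item),
                failure -> events.add("error: " + failure.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(() -> 1));                                            // [item: 1]
        System.out.println(run(() -> { throw new IllegalStateException("boom"); })); // [error: boom]
    }
}
```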

Uni example

import io.smallrye.mutiny.Uni;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p220_UniExample {
    public static void main(String[] args) {
        getItem()
                .subscribe()
                .withSubscriber(new p220_SimpleUniSubscriber<>(Integer.MAX_VALUE));
    }

    private static Uni<Integer> getItem() {
        return Uni.createFrom().item(1);
    }
}

SimpleUniSubscriber

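import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j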
@RequiredArgsConstructor
public class p220_SimpleUniSubscriber<T> implements UniSubscriber<T> {
    private final Integer count;
    private UniSubscription subscription;

    @Override
    public void onSubscribe(UniSubscription s) {
        this.subscription = s;
        // A Uni delivers its single item without demand signalling, so this call is not required.
        s.request(1);
        log.info("subscribe");
    }

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("error: {}", failure.getMessage());
    }
}

Output

10:47:27.202 [main] INFO com.example06.mutiny.p220_SimpleUniSubscriber - subscribe
10:47:27.208 [main] INFO com.example06.mutiny.p220_SimpleUniSubscriber - item: 1

  1. Course: Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
  2. GitHub example code: https://github.com/seohaebada/webflux