003 Impl2 Rxjava

Reactive Streams Implementation Libraries (2): RxJava #

RxJava #

  • Developed by Netflix
  • A port of Reactive Extensions (Rx), which was originally built for the .NET Framework
  • Provides the Flowable, Observable, Single, Maybe, and Completable publisher types

img.png

RxJava - Flowable #

  • Delivers 0..n items
  • If an error occurs, delivers an error signal and terminates
  • Once all items have been delivered, delivers a complete signal and terminates
  • Supports backpressure
  • Similar to Reactor's Flux

img_1.png
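
The bullets above describe a signal contract: zero or more items, then either complete or error, with the subscriber's demand limiting how much is sent. The sketch below models that contract with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones, so it runs without RxJava on the classpath. RangeSource, failAt, and run are hypothetical names made up for this illustration, and this is not how Flowable itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Sketch only: RangeSource is a hypothetical stand-in, not an RxJava class.
final class FlowableContractSketch {

    /** Emits 1..count, or signals an error instead of the value failAt (0 = never fail). */
    static final class RangeSource implements Flow.Publisher<Integer> {
        private final int count;
        private final int failAt;

        RangeSource(int count, int failAt) {
            this.count = count;
            this.failAt = failAt;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    // Accumulate demand, capping at Long.MAX_VALUE instead of overflowing.
                    demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                    if (emitting) return; // re-entrant request from onNext: the loop below picks it up
                    emitting = true;
                    while (!done && demand > 0 && next <= count) {
                        if (next == failAt) {
                            done = true; // an error is terminal: nothing follows it
                            subscriber.onError(new IllegalStateException("failed at " + next));
                            break;
                        }
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true; // all items delivered: complete is terminal
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with a single up-front request and records every signal received. */
    static List<String> run(int count, int failAt, long initialRequest) {
        List<String> events = new ArrayList<>();
        new RangeSource(count, failAt).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                events.add("subscribe");
                s.request(initialRequest);
            }
            @Override public void onNext(Integer item) { events.add("item: " + item); }
            @Override public void onError(Throwable t) { events.add("error: " + t.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 0, Long.MAX_VALUE)); // all items, then complete
        System.out.println(run(5, 3, Long.MAX_VALUE)); // two items, then error
        System.out.println(run(5, 0, 2));              // only the two requested items
    }
}
```

In the third call the subscriber asks for two items and never asks again, so the source stops after item 2 and sends no terminal signal. That is the backpressure part of the contract.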

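Flowable example #

import com.example06.reactor.p181_SimpleSubscriber;
import io.reactivex.rxjava3.core.Flowable;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j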
public class p199_FlowableExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flowable<Integer> getItems() {
        return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

Output

09:53:02.296 [main] INFO com.example06.rxjava.p199_FlowableExample - start main
09:53:02.339 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:53:02.339 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:53:02.340 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:53:02.450 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:53:02.622 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
09:53:02.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
09:53:02.933 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
09:53:03.112 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:53:03.114 [main] INFO com.example06.rxjava.p199_FlowableExample - end main

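Flowable - backpressure example #

import com.example06.reactor.p185_ContinuousRequestSubscriber;
import io.reactivex.rxjava3.core.Flowable;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j
public class p200_FlowableContinuousRequestSubscriberExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                // Process one item at a time (backpressure)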
                .subscribe(new p185_ContinuousRequestSubscriber<>());
        log.info("end main");
    }

    private static Flowable<Integer> getItems() {
        return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

ContinuousRequestSubscriber

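import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Slf4j
public class p185_ContinuousRequestSubscriber<T>
        implements Subscriber<T> {
    private final long count = 1; // Subscription.request takes a long
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        log.info("subscribe");
        s.request(count); // request the initial number of items
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);

        Thread.sleep(1000); // simulate slow processing
        // request the next item only after this one has been handled
        subscription.request(count);
        log.info("request: {}", count);
    }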

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

Output

09:54:17.223 [main] INFO com.example06.rxjava.p200_FlowableContinuousRequestSubscriberExample - start main
09:54:17.258 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - subscribe
09:54:17.259 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:17.260 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 1
09:54:18.335 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:18.335 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 2
09:54:19.408 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:19.409 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 3
09:54:20.483 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:20.484 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 4
09:54:21.544 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:21.545 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 5
09:54:22.618 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:22.623 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - complete
09:54:22.623 [main] INFO com.example06.rxjava.p200_FlowableContinuousRequestSubscriberExample - end main
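
The log above alternates request and item lines because the subscriber only asks for the next item after it has handled the current one. The same pattern can be reproduced with JDK classes alone. This is a minimal sketch that uses java.util.concurrent.SubmissionPublisher as the source in place of Flowable and leaves out the one-second sleep; OneAtATimeSketch and run are hypothetical names for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Sketch only: uses the JDK's SubmissionPublisher instead of an RxJava Flowable.
final class OneAtATimeSketch {

    /** Publishes the items to a subscriber that requests exactly one item at a time. */
    static List<String> run(List<Integer> items) {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private volatile Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    events.add("subscribe");
                    s.request(1); // initial demand: one item
                    events.add("request: 1");
                }

                @Override public void onNext(Integer item) {
                    events.add("item: " + item);
                    subscription.request(1); // ask for the next item only now
                    events.add("request: 1");
                }

                @Override public void onError(Throwable t) {
                    events.add("error: " + t.getMessage());
                    finished.countDown();
                }

                @Override public void onComplete() {
                    events.add("complete");
                    finished.countDown();
                }
            });
            items.forEach(publisher::submit); // buffered until the subscriber requests them
        } // close() leads to onComplete once the buffered items have been delivered

        try {
            // Delivery happens on another thread, so wait for the terminal signal.
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for a terminal signal");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        run(List.of(1, 2, 3, 4, 5)).forEach(System.out::println);
    }
}
```

The final request: 1 before complete matches the log above: the subscriber cannot know that item 5 was the last one, so it still asks for more, and the source answers with the complete signal.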

RxJava - Observable #

  • Delivers 0..n items
  • If an error occurs, delivers an error signal and terminates
  • Once all items have been delivered, delivers a complete signal and terminates
  • Does not support backpressure

img_2.png

Observable vs Flowable #

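| Observable | Flowable |
| --- | --- |
| Push-based | Pull-based |
| Delivers items even if the subscriber cannot keep up | The subscriber controls the number of requested items |
| Only partially follows the message-driven principle of the Reactive Manifesto | Fully follows the message-driven principle of the Reactive Manifesto |
| Passes a Disposable in onSubscribe | Passes a Subscription in onSubscribe |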
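
The push versus pull distinction can be shown without any library. The following is a minimal plain-Java sketch, not RxJava code: push hands over every item as soon as the consumer is attached, while the hypothetical PullSource hands over only as many items as were requested. All names in it are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: a library-free model of push-based vs pull-based delivery.
final class PushVsPullSketch {

    /** Push: the source sets the pace and delivers everything on subscribe. */
    static List<Integer> push(List<Integer> items) {
        List<Integer> received = new ArrayList<>();
        Consumer<Integer> observer = received::add;
        for (Integer item : items) {
            observer.accept(item); // delivered whether or not the consumer is ready
        }
        return received;
    }

    /** Pull: the source delivers at most as many items as the consumer requested. */
    static final class PullSource {
        private final List<Integer> items;
        private final Consumer<Integer> subscriber;
        private int index;

        PullSource(List<Integer> items, Consumer<Integer> subscriber) {
            this.items = items;
            this.subscriber = subscriber;
        }

        void request(long n) {
            for (long i = 0; i < n && index < items.size(); i++) {
                subscriber.accept(items.get(index++));
            }
        }
    }

    /** Attaches a consumer and issues the given requests in order. */
    static List<Integer> pull(List<Integer> items, long... requests) {
        List<Integer> received = new ArrayList<>();
        PullSource source = new PullSource(items, received::add);
        for (long n : requests) {
            source.request(n);
        }
        return received;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5);
        System.out.println(push(items));       // everything arrives immediately
        System.out.println(pull(items));       // nothing arrives without a request
        System.out.println(pull(items, 2, 1)); // three items arrive after request(2) and request(1)
    }
}
```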

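Observable example #

import io.reactivex.rxjava3.core.Observable;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j
public class p203_ObservableExample {
    public static void main(String[] args) {
        // No backpressure control: the Observer cannot limit how many items it receives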
        getItems()
                .subscribe(new p203_SimpleObserver());
    }

    private static Observable<Integer> getItems() {
        return Observable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

SimpleObserver

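import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p203_SimpleObserver implements Observer<Object> {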
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        log.info("subscribe");
        this.disposable = d;
    }

    @Override
    public void onNext(@NonNull Object o) {
        log.info("item: {}", o);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

Output

09:57:21.403 [main] INFO com.example06.rxjava.p203_SimpleObserver - subscribe
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 1
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 2
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 3
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 4
09:57:21.405 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 5
09:57:21.405 [main] INFO com.example06.rxjava.p203_SimpleObserver - complete

RxJava - Single #

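  • Delivers exactly one item through onSuccess, which also terminates the stream (SingleObserver has no onComplete)
  • If there is no item to deliver, an onError signal is delivered
  • If an error occurs, an onError signal is delivered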

img_3.png
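
To make the rules above concrete, here is a small library-free model of a Single-style emitter: the first terminal signal wins, and a null item is turned into an error. Emitter and subscribe are hypothetical names for this sketch, the error message is shortened, and dropping later signals is a simplification of what RxJava's own SingleEmitter does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: Emitter is a hypothetical model, not RxJava's SingleEmitter.
final class SingleContractSketch {

    /** Accepts exactly one terminal signal; anything after it is dropped. */
    static final class Emitter<T> {
        private final List<String> events;
        private boolean terminated;

        Emitter(List<String> events) {
            this.events = events;
        }

        void onSuccess(T value) {
            if (value == null) {
                // A null item is not a valid success value, so it becomes an error.
                onError(new NullPointerException("onSuccess called with a null value"));
                return;
            }
            if (terminated) return;
            terminated = true;
            events.add("item: " + value);
        }

        void onError(Throwable e) {
            if (terminated) return;
            terminated = true;
            events.add("error: " + e.getMessage());
        }
    }

    /** Runs the source against a fresh emitter and returns the recorded signals. */
    static <T> List<String> subscribe(Consumer<Emitter<T>> source) {
        List<String> events = new ArrayList<>();
        events.add("subscribe");
        source.accept(new Emitter<>(events));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(SingleContractSketch.<Integer>subscribe(e -> e.onSuccess(1)));
        System.out.println(SingleContractSketch.<Integer>subscribe(e -> e.onSuccess(null)));
        System.out.println(SingleContractSketch.<Integer>subscribe(e -> {
            e.onSuccess(1);
            e.onError(new IllegalStateException("too late")); // dropped: already terminated
        }));
    }
}
```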

Single - success example #

import io.reactivex.rxjava3.core.Single;

public class p205_SingleExample {
    public static void main(String[] args) {
        getItem()
                .subscribe(new p205_SimpleSingleObserver<>());
    }

    private static Single<Integer> getItem() {
        return Single.create(singleEmitter -> {
            singleEmitter.onSuccess(1);
        });
    }
}

SimpleSingleObserver

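import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p205_SimpleSingleObserver<T> implements SingleObserver<T> {
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.disposable = d;
        log.info("subscribe");
    }

    @Override
    public void onSuccess(@NonNull T t) {
        log.info("item: {}", t);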
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }
}

Output

09:58:59.778 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - subscribe
09:58:59.780 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - item: 1

Single - error (null value) #

import io.reactivex.rxjava3.core.Single;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p206_SingleNullExample {
    public static void main(String[] args) {
        getItem()
                .subscribe(new p205_SimpleSingleObserver<>());
    }

    private static Single<Integer> getItem() {
        return Single.create(singleEmitter -> {
            singleEmitter.onSuccess(null); // null is not allowed, so onError is signaled instead
        });
    }
}

Output

09:59:53.191 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - subscribe
09:59:53.193 [main] ERROR com.example06.rxjava.p205_SimpleSingleObserver - error: onSuccess called with a null value. Null values are generally not allowed in 3.x operators and sources.

RxJava - Maybe #

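  • Delivers at most one item through onSuccess, which also terminates the stream (onComplete is not called after onSuccess)
  • If there is no item, it can still terminate normally with an onComplete signal
  • If an error occurs, an onError signal is delivered
  • Similar to Reactor's Mono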

img_4.png
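
One way to picture the three outcomes listed above is as an Optional-returning call that may also throw. This is only an analogy in plain Java, not RxJava code: a present value stands for onSuccess, an empty Optional for onComplete, and an exception for onError. MaybeOutcomesSketch and signalFor are hypothetical names.

```java
import java.util.Optional;
import java.util.concurrent.Callable;

// Sketch only: an Optional-based analogy for the three signals a Maybe can end with.
final class MaybeOutcomesSketch {

    /** Maps the result of the call to the signal a Maybe-style observer would see. */
    static String signalFor(Callable<Optional<Integer>> source) {
        try {
            Optional<Integer> result = source.call();
            // A value means success; no value means plain completion.
            return result.map(v -> "item: " + v).orElse("complete");
        } catch (Exception e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalFor(() -> Optional.of(1)));
        System.out.println(signalFor(() -> Optional.empty()));
        System.out.println(signalFor(() -> {
            throw new IllegalStateException("boom");
        }));
    }
}
```

Exactly one of the three signals is produced per call, which is the point of the analogy: success and complete never both occur for the same subscription.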

Examples #

SimpleMaybeObserver

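import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.MaybeObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import lombok.extern.slf4j.Slf4j;

@Slf4j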
public class SimpleMaybeObserver<T> implements MaybeObserver<T> {
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.disposable = d;
        log.info("subscribe");
    }

    @Override
    public void onSuccess(@NonNull T t) {
        log.info("item: {}", t);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

Maybe - success example #

import io.reactivex.rxjava3.core.Maybe;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p208_MaybeExample {
    public static void main(String[] args) {
        maybeGetItem()
                .subscribe(new SimpleMaybeObserver<>());
    }

    private static Maybe<Integer> maybeGetItem() {
        return Maybe.create(maybeEmitter -> {
            maybeEmitter.onSuccess(1);
        });
    }
}

Output

10:42:03.579 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:03.581 [main] INFO com.example06.rxjava.SimpleMaybeObserver - item: 1

Maybe - success (empty) example #

import io.reactivex.rxjava3.core.Maybe;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p209_MaybeEmptyValueExample {
    public static void main(String[] args) {
        maybeGetItem()
                .subscribe(new SimpleMaybeObserver<>());
    }

    private static Maybe<Integer> maybeGetItem() {
        return Maybe.create(maybeEmitter -> {
            maybeEmitter.onComplete(); // call only onComplete(), without an item
        });
    }
}

Output

10:42:24.592 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:24.593 [main] INFO com.example06.rxjava.SimpleMaybeObserver - complete

Maybe - error example #

import io.reactivex.rxjava3.core.Maybe;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p209_MaybeNullValueExample {
    public static void main(String[] args) {
        maybeGetItem()
                .subscribe(new SimpleMaybeObserver<>());
    }

    private static Maybe<Integer> maybeGetItem() {
        return Maybe.create(maybeEmitter -> {
            maybeEmitter.onSuccess(null);
        });
    }
}

Output

10:42:48.454 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:48.456 [main] ERROR com.example06.rxjava.SimpleMaybeObserver - error: onSuccess called with a null value. Null values are generally not allowed in 3.x operators and sources.

RxJava - Completable #

  • Delivers only an onComplete or an onError signal
  • Conveys an event (that something finished or failed) rather than a value

img_5.png
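
The idea of signaling an event rather than a value can be reduced to running an action and reporting only whether it finished. Below is a minimal plain-Java sketch of that idea; Action and signalFor are hypothetical names defined here, not RxJava types.

```java
// Sketch only: a library-free model of a Completable-style outcome.
final class CompletableOutcomeSketch {

    /** A unit of work that produces no value and may fail. */
    interface Action {
        void run() throws Exception;
    }

    /** Runs the action and reports only how it ended, never a result value. */
    static String signalFor(Action action) {
        try {
            action.run();
            return "complete";
        } catch (Exception e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalFor(() -> { /* work that returns nothing */ }));
        System.out.println(signalFor(() -> {
            throw new RuntimeException("error in completable");
        }));
    }
}
```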

Examples #

SimpleCompletableObserver

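import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import lombok.extern.slf4j.Slf4j;

@Slf4j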
public class SimpleCompletableObserver implements CompletableObserver {
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        log.info("subscribe");
        this.disposable = d;
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }
}

Completable - success example #

import io.reactivex.rxjava3.core.Completable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p212_CompletableExample {
    public static void main(String[] args) {
        getCompletion()
                .subscribe(new SimpleCompletableObserver());
    }

    private static Completable getCompletion() {
        return Completable.create(completableEmitter -> {
            Thread.sleep(1000);
            completableEmitter.onComplete(); // signals an event, not a value
        });
    }
}

Output

10:43:45.900 [main] INFO com.example06.rxjava.SimpleCompletableObserver - subscribe
10:43:46.924 [main] INFO com.example06.rxjava.SimpleCompletableObserver - complete

Completable - error example #

import io.reactivex.rxjava3.core.Completable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p213_CompletableErrorExample {
    public static void main(String[] args) {
        getCompletion()
                .subscribe(new SimpleCompletableObserver());
    }

    private static Completable getCompletion() {
        return Completable.create(completableEmitter -> {
            Thread.sleep(1000);
            completableEmitter.onError(
                    new RuntimeException("error in completable")
            );
        });
    }
}

Output

10:44:07.096 [main] INFO com.example06.rxjava.SimpleCompletableObserver - subscribe
10:44:08.124 [main] ERROR com.example06.rxjava.SimpleCompletableObserver - error: error in completable

  1. Course: Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
  2. GitHub example code: https://github.com/seohaebada/webflux