Reactive Streams Implementation Libraries (2): RxJava #
RxJava #
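- Developed by Netflix
- A JVM port of Reactive Extensions (Rx), which originated on the .NET Framework
- Provides the Flowable, Observable, Single, Maybe, and Completable types; of these, only Flowable implements the Reactive Streams Publisher interface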
RxJava - Flowable #
- Delivers 0..n items
- If an error occurs, sends an error signal and terminates
- Once every item has been delivered, sends a complete signal and terminates
- Supports backpressure
- Similar to Reactor's Flux
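The signal order described above (onSubscribe, then 0..n onNext calls, then a single terminal signal) can be sketched without RxJava by using the JDK's `java.util.concurrent.Flow` interfaces. This is only a minimal model under stated assumptions: `SignalOrderSketch`, `fromList`, and `run` are hypothetical names, not part of RxJava, and the publisher is synchronous and single-threaded.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class SignalOrderSketch {

    /** Hypothetical synchronous publisher: emits list items only as they are requested. */
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final Iterator<T> it = items.iterator();
            private long requested;   // outstanding demand
            private boolean emitting; // guards against re-entrant request() calls
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                requested = (requested + n < 0) ? Long.MAX_VALUE : requested + n; // cap on overflow
                if (emitting) return;
                emitting = true;
                while (!done && requested > 0 && it.hasNext()) {
                    requested--;
                    subscriber.onNext(it.next());
                }
                emitting = false;
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete(); // terminal signal, sent at most once
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes with a fixed initial request and records every signal that arrives. */
    static List<String> run(List<Integer> items, long initialRequest) {
        List<String> signals = new ArrayList<>();
        fromList(items).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                signals.add("onSubscribe");
                s.request(initialRequest);
            }
            @Override public void onNext(Integer item) { signals.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { signals.add("onError"); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3), Long.MAX_VALUE));
        System.out.println(run(List.of(1, 2, 3), 2));
    }
}
```

With an unbounded request every item arrives and onComplete follows. With a request of 2, only two items arrive and no terminal signal is sent, because the subscriber never asks for the rest.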
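Flowable example #
import com.example06.reactor.p181_SimpleSubscriber;
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p199_FlowableExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                // request Integer.MAX_VALUE items up front
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flowable<Integer> getItems() {
        return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
Output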
09:53:02.296 [main] INFO com.example06.rxjava.p199_FlowableExample - start main
09:53:02.339 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:53:02.339 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:53:02.340 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:53:02.450 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:53:02.622 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
09:53:02.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
09:53:02.933 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
09:53:03.112 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:53:03.114 [main] INFO com.example06.rxjava.p199_FlowableExample - end main
Flowable - backpressure example #
import com.example06.reactor.p185_ContinuousRequestSubscriber;
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p200_FlowableContinuousRequestSubscriberExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                // process one item at a time (backpressure)
                .subscribe(new p185_ContinuousRequestSubscriber<>());
        log.info("end main");
    }

    private static Flowable<Integer> getItems() {
        return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
ContinuousRequestSubscriber
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Slf4j
public class p185_ContinuousRequestSubscriber<T>
        implements Subscriber<T> {
    // number of items to request each time
    private final long count = 1;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        log.info("subscribe");
        s.request(count); // request the first batch
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);
        Thread.sleep(1000); // simulate slow processing
        // request the next item only after this one is handled
        subscription.request(count);
        log.info("request: {}", count);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}
Output
09:54:17.223 [main] INFO com.example06.rxjava.p200_FlowableContinuousRequestSubscriberExample - start main
09:54:17.258 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - subscribe
09:54:17.259 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:17.260 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 1
09:54:18.335 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:18.335 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 2
09:54:19.408 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:19.409 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 3
09:54:20.483 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:20.484 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 4
09:54:21.544 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:21.545 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 5
09:54:22.618 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:54:22.623 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - complete
09:54:22.623 [main] INFO com.example06.rxjava.p200_FlowableContinuousRequestSubscriberExample - end main
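Requesting one item at a time is the simplest form of backpressure; a common variation is to request a small batch and ask again once the batch has been consumed. The sketch below models that with the JDK's `java.util.concurrent.Flow` interfaces rather than RxJava. `BatchRequestSketch`, `BatchingSubscriber`, and `range` are hypothetical names introduced for illustration, and the source is a simplified synchronous one that ignores non-positive requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BatchRequestSketch {

    /** Hypothetical subscriber: asks for batchSize items, then asks again once the batch is consumed. */
    static final class BatchingSubscriber<T> implements Flow.Subscriber<T> {
        final List<String> log = new ArrayList<>();
        private final int batchSize;
        private int receivedInBatch;
        private Flow.Subscription subscription;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            log.add("request(" + batchSize + ")");
            s.request(batchSize);
        }

        @Override public void onNext(T item) {
            log.add("item " + item);
            if (++receivedInBatch == batchSize) { // batch finished: signal new demand
                receivedInBatch = 0;
                log.add("request(" + batchSize + ")");
                subscription.request(batchSize);
            }
        }

        @Override public void onError(Throwable t) { log.add("error"); }
        @Override public void onComplete() { log.add("complete"); }
    }

    /** Minimal synchronous source for 1..count that emits only what was requested. */
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long requested;
            private boolean emitting;
            private boolean done;

            @Override public void request(long n) {
                if (done || n <= 0) return; // sketch: non-positive requests are ignored
                requested += n;
                if (emitting) return;       // re-entrant call from onNext: the loop below picks it up
                emitting = true;
                while (!done && requested > 0 && next <= count) {
                    requested--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (!done && next > count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { done = true; }
        });
    }

    static List<String> run(int count, int batchSize) {
        BatchingSubscriber<Integer> subscriber = new BatchingSubscriber<>(batchSize);
        range(count).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

For five items and a batch size of 2, the recorded log alternates between a request and the two items it allows, ending with a single item and the complete signal.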
RxJava - Observable #
- Delivers 0..n items
- If an error occurs, sends an error signal and terminates
- Once every item has been delivered, sends a complete signal and terminates
- Does not support backpressure
Observable vs Flowable #
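| Observable | Flowable |
| --- | --- |
| Push-based | Pull-based |
| Delivers items even when the subscriber cannot keep up | The subscriber controls how many items it receives via request |
| Only partially follows the Reactive Manifesto's message-driven principle | Fully follows the Reactive Manifesto's message-driven principle |
| Passes a Disposable in onSubscribe | Passes a Subscription in onSubscribe |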
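The push versus pull distinction can be illustrated with a plain-Java model. This is not RxJava code: `PushVsPullSketch`, `pushAll`, and `pull` are hypothetical helpers, and the bounded buffer stands in for a consumer that can only hold a limited number of unprocessed items.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

class PushVsPullSketch {

    /**
     * Push: the producer hands over every item regardless of the consumer's state.
     * A consumer with a bounded buffer has no choice but to drop the excess.
     * Returns the number of dropped items.
     */
    static int pushAll(List<Integer> items, Deque<Integer> buffer, int capacity) {
        int dropped = 0;
        for (Integer item : items) {
            if (buffer.size() < capacity) {
                buffer.add(item);
            } else {
                dropped++;
            }
        }
        return dropped;
    }

    /**
     * Pull: the consumer states its demand (the free space in its buffer),
     * and the producer delivers at most that many items.
     * Returns the number of delivered items.
     */
    static int pull(Iterator<Integer> source, Deque<Integer> buffer, int capacity) {
        int demand = capacity - buffer.size();
        int delivered = 0;
        while (demand > 0 && source.hasNext()) {
            buffer.add(source.next());
            demand--;
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5);

        Deque<Integer> pushBuffer = new ArrayDeque<>();
        System.out.println("push dropped: " + pushAll(items, pushBuffer, 2));

        Deque<Integer> pullBuffer = new ArrayDeque<>();
        Iterator<Integer> source = items.iterator();
        int rounds = 0;
        while (source.hasNext()) {
            pull(source, pullBuffer, 2); // ask only for what fits
            pullBuffer.clear();          // "process" the buffered items
            rounds++;
        }
        System.out.println("pull rounds: " + rounds + ", nothing dropped");
    }
}
```

In the push case the consumer loses items it has no room for; in the pull case the same five items arrive over several rounds because each round is sized by the consumer's demand.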
Observable example #
import io.reactivex.rxjava3.core.Observable;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p203_ObservableExample {
    public static void main(String[] args) {
        // backpressure cannot be controlled here
        getItems()
                .subscribe(new p203_SimpleObserver());
    }

    private static Observable<Integer> getItems() {
        return Observable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
SimpleObserver
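import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p203_SimpleObserver implements Observer<Object> {
    // kept so the subscription can be disposed later
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        log.info("subscribe");
        this.disposable = d;
    }

    @Override
    public void onNext(@NonNull Object o) {
        log.info("item: {}", o);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}
Output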
09:57:21.403 [main] INFO com.example06.rxjava.p203_SimpleObserver - subscribe
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 1
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 2
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 3
09:57:21.404 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 4
09:57:21.405 [main] INFO com.example06.rxjava.p203_SimpleObserver - item: 5
09:57:21.405 [main] INFO com.example06.rxjava.p203_SimpleObserver - complete
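RxJava - Single #
- Delivers exactly one item through onSuccess, which also terminates the stream (there is no separate onComplete signal)
- If there is no item to deliver, sends an onError signal
- If an error occurs, sends an onError signal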
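As a rough analogy in plain JDK terms, a Single behaves much like a `CompletableFuture<T>`: it ends with either one value or one error, never both and never neither. The sketch below is only an analogy, not RxJava code; `SingleAnalogySketch` and `describe` are hypothetical names. One difference worth noting is that a `CompletableFuture` accepts a null value, whereas a Single does not.

```java
import java.util.concurrent.CompletableFuture;

class SingleAnalogySketch {

    /** Maps the two possible outcomes onto labels that mirror onSuccess and onError. */
    static String describe(CompletableFuture<Integer> source) {
        return source
                .handle((value, error) -> error == null
                        ? "success: " + value
                        : "error: " + error.getMessage())
                .join();
    }

    public static void main(String[] args) {
        // one value: the counterpart of onSuccess
        System.out.println(describe(CompletableFuture.completedFuture(1)));
        // no value available: the counterpart of onError
        System.out.println(describe(
                CompletableFuture.failedFuture(new IllegalStateException("no item"))));
    }
}
```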
Single - success example #
import io.reactivex.rxjava3.core.Single;

public class p205_SingleExample {
    public static void main(String[] args) {
        getItem()
                .subscribe(new p205_SimpleSingleObserver<>());
    }

    private static Single<Integer> getItem() {
        return Single.create(singleEmitter -> {
            singleEmitter.onSuccess(1); // deliver the item and terminate
        });
    }
}
SimpleSingleObserver
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p205_SimpleSingleObserver<T> implements SingleObserver<T> {
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.disposable = d;
        log.info("subscribe");
    }

    @Override
    public void onSuccess(@NonNull T t) {
        log.info("item: {}", t);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }
}
Output
09:58:59.778 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - subscribe
09:58:59.780 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - item: 1
Single - error (empty value) example #
import io.reactivex.rxjava3.core.Single;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p206_SingleNullExample {
    public static void main(String[] args) {
        getItem()
                .subscribe(new p205_SimpleSingleObserver<>());
    }

    private static Single<Integer> getItem() {
        return Single.create(singleEmitter -> {
            singleEmitter.onSuccess(null); // null is not allowed, so this triggers onError
        });
    }
}
Output
09:59:53.191 [main] INFO com.example06.rxjava.p205_SimpleSingleObserver - subscribe
09:59:53.193 [main] ERROR com.example06.rxjava.p205_SimpleSingleObserver - error: onSuccess called with a null value. Null values are generally not allowed in 3.x operators and sources.
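RxJava - Maybe #
- Delivers at most one item through onSuccess, which also terminates the stream (onComplete is not called afterwards)
- When there is no item, it can finish with only an onComplete signal
- If an error occurs, sends an onError signal
- Similar to Reactor's Mono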
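The three mutually exclusive outcomes of a Maybe (a value, an empty completion, or an error) can be modeled in plain JDK terms as a `CompletableFuture<Optional<T>>`. This is only an analogy under that assumption, not RxJava code; `MaybeAnalogySketch` and `describe` are hypothetical names.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class MaybeAnalogySketch {

    /** Maps the three outcomes onto labels that mirror onSuccess, onComplete, and onError. */
    static String describe(CompletableFuture<Optional<Integer>> source) {
        return source
                .handle((value, error) -> {
                    if (error != null) {
                        return "error: " + error.getMessage();
                    }
                    // a present value corresponds to onSuccess, an empty one to onComplete
                    return value.map(v -> "success: " + v).orElse("complete");
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.completedFuture(Optional.of(1))));
        System.out.println(describe(CompletableFuture.completedFuture(Optional.empty())));
        System.out.println(describe(
                CompletableFuture.failedFuture(new IllegalStateException("boom"))));
    }
}
```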
Example #
SimpleMaybeObserver
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.MaybeObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SimpleMaybeObserver<T> implements MaybeObserver<T> {
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.disposable = d;
        log.info("subscribe");
    }

    @Override
    public void onSuccess(@NonNull T t) {
        log.info("item: {}", t);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}
Maybe - success example #
import io.reactivex.rxjava3.core.Maybe;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p208_MaybeExample {
    public static void main(String[] args) {
        maybeGetItem()
                .subscribe(new SimpleMaybeObserver<>());
    }

    private static Maybe<Integer> maybeGetItem() {
        return Maybe.create(maybeEmitter -> {
            maybeEmitter.onSuccess(1); // deliver the item and terminate
        });
    }
}
Output
10:42:03.579 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:03.581 [main] INFO com.example06.rxjava.SimpleMaybeObserver - item: 1
Maybe - success (empty value) example #
import io.reactivex.rxjava3.core.Maybe;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p209_MaybeEmptyValueExample {
    public static void main(String[] args) {
        maybeGetItem()
                .subscribe(new SimpleMaybeObserver<>());
    }

    private static Maybe<Integer> maybeGetItem() {
        return Maybe.create(maybeEmitter -> {
            maybeEmitter.onComplete(); // only onComplete() is called, no item
        });
    }
}
Output
10:42:24.592 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:24.593 [main] INFO com.example06.rxjava.SimpleMaybeObserver - complete
Maybe - error example #
import io.reactivex.rxjava3.core.Maybe;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p209_MaybeNullValueExample {
    public static void main(String[] args) {
        maybeGetItem()
                .subscribe(new SimpleMaybeObserver<>());
    }

    private static Maybe<Integer> maybeGetItem() {
        return Maybe.create(maybeEmitter -> {
            maybeEmitter.onSuccess(null); // null is not allowed, so this triggers onError
        });
    }
}
Output
10:42:48.454 [main] INFO com.example06.rxjava.SimpleMaybeObserver - subscribe
10:42:48.456 [main] ERROR com.example06.rxjava.SimpleMaybeObserver - error: onSuccess called with a null value. Null values are generally not allowed in 3.x operators and sources.
RxJava - Completable #
- Sends only an onComplete or an onError signal
- Conveys an event (that something finished) rather than a value
Example #
SimpleCompletableObserver
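import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SimpleCompletableObserver implements CompletableObserver {
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        log.info("subscribe");
        this.disposable = d;
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }
}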
Completable - success example #
import io.reactivex.rxjava3.core.Completable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p212_CompletableExample {
    public static void main(String[] args) {
        getCompletion()
                .subscribe(new SimpleCompletableObserver());
    }

    private static Completable getCompletion() {
        return Completable.create(completableEmitter -> {
            Thread.sleep(1000); // simulate work
            completableEmitter.onComplete(); // conveys an event, not a value
        });
    }
}
Output
10:43:45.900 [main] INFO com.example06.rxjava.SimpleCompletableObserver - subscribe
10:43:46.924 [main] INFO com.example06.rxjava.SimpleCompletableObserver - complete
Completable - error example #
import io.reactivex.rxjava3.core.Completable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class p213_CompletableErrorExample {
    public static void main(String[] args) {
        getCompletion()
                .subscribe(new SimpleCompletableObserver());
    }

    private static Completable getCompletion() {
        return Completable.create(completableEmitter -> {
            Thread.sleep(1000); // simulate work that then fails
            completableEmitter.onError(
                    new RuntimeException("error in completable")
            );
        });
    }
}
Output
10:44:07.096 [main] INFO com.example06.rxjava.SimpleCompletableObserver - subscribe
10:44:08.124 [main] ERROR com.example06.rxjava.SimpleCompletableObserver - error: error in completable
- Course: Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
- GitHub example code: https://github.com/seohaebada/webflux