Reactive Streams implementation libraries (1) Reactor
Project Reactor
- Developed by Pivotal
- Used by Spring WebFlux
- Provides the Mono and Flux publishers
Project Reactor - Flux
- Emits 0..n items
- If an error occurs, emits an error signal and terminates
- Once every item has been emitted, emits a complete signal and terminates
- Supports backpressure
Flux example
SimpleSubscriber
- Publisher: FluxIterable
- Subscription: StrictSubscriber
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Slf4j
@RequiredArgsConstructor
public class p181_SimpleSubscriber<T> implements Subscriber<T> {
    private final Integer count;

    /**
     * Does not keep requesting: it requests {@code count} items exactly once,
     * then simply receives whatever the publisher delivers.
     * @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count); // request count items
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);
        Thread.sleep(100);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}
FluxSimpleExample
import java.util.List;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
public class p181_FluxSimpleExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        // everything runs on the main thread
        getItems() // subscribe to a fixed number of items
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
        Thread.sleep(1000);
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
Output
13:18:58.672 [main] INFO com.example06.reactor.p181_FluxSimpleExample - start main
13:18:58.733 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
13:18:58.736 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
13:18:58.736 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
13:18:58.737 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
13:18:58.904 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
13:18:59.049 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
13:18:59.233 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
13:18:59.399 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
13:18:59.578 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
13:18:59.578 [main] INFO com.example06.reactor.p181_FluxSimpleExample - end main
Flux - subscribeOn example
FluxSimpleSubscribeOnExample
import java.util.List;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Slf4j
public class p182_FluxSimpleSubscribeOnExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .map(i -> {
                    log.info("map {}", i);
                    return i;
                })
                // run on a thread other than main
                .subscribeOn(Schedulers.single())
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main"); // logged immediately, before the items arrive
        Thread.sleep(1000);
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
Output
- The items are processed on the single-1 thread
13:22:13.042 [main] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - start main
13:22:13.094 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
13:22:13.120 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
13:22:13.120 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
13:22:13.122 [main] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - end main
13:22:13.124 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 1
13:22:13.124 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
13:22:13.264 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 2
13:22:13.264 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
13:22:13.440 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 3
13:22:13.441 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
13:22:13.613 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 4
13:22:13.614 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
13:22:13.789 [single-1] INFO com.example06.reactor.p182_FluxSimpleSubscribeOnExample - map 5
13:22:13.789 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
13:22:13.969 [single-1] INFO com.example06.reactor.p181_SimpleSubscriber - complete
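The thread switch seen in the log can also be checked programmatically. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name SubscribeOnThreadSketch and the helper mapThreads are hypothetical names introduced only for this illustration. It records which thread runs map() and uses blockLast() to wait for completion instead of Thread.sleep.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnThreadSketch {
    // Returns the name of the thread that executed map() for each item.
    static List<String> mapThreads() {
        List<String> threads = new CopyOnWriteArrayList<>();
        Flux.fromIterable(List.of(1, 2, 3, 4, 5))
                .map(i -> {
                    threads.add(Thread.currentThread().getName());
                    return i;
                })
                .subscribeOn(Schedulers.single())
                .blockLast(); // wait for the complete signal instead of sleeping
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("map ran on: " + mapThreads());
    }
}
```

Blocking is only for the demonstration; in the example above the main thread keeps running and merely sleeps long enough for the scheduler thread to finish.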
Flux - subscribe
FluxNoSubscribeExample
- Nothing happens unless you subscribe.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
public class p183_FluxNoSubscribeExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems(); // without a subscribe, the Flux.create callback never runs
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            log.info("start getItems");
            for (int i = 0; i < 5; i++) {
                fluxSink.next(i);
            }
            fluxSink.complete();
            log.info("end getItems");
        });
    }
}
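No output is shown for this example, so here is a small sketch that makes the laziness observable. It assumes reactor-core on the classpath; the class name LazyFluxSketch and the invocations counter are hypothetical names used only for illustration. The counter tracks how often the Flux.create callback runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class LazyFluxSketch {
    // Counts how many times the Flux.create callback has been executed.
    static final AtomicInteger invocations = new AtomicInteger();

    static Flux<Integer> getItems() {
        return Flux.create(sink -> {
            invocations.incrementAndGet();
            for (int i = 0; i < 5; i++) {
                sink.next(i);
            }
            sink.complete();
        });
    }

    public static void main(String[] args) {
        Flux<Integer> items = getItems(); // only assembles the pipeline
        System.out.println("before subscribe: " + invocations.get());

        List<Integer> received = new ArrayList<>();
        items.subscribe(received::add); // the callback runs now
        System.out.println("after subscribe: " + invocations.get() + ", received " + received);
    }
}
```

Each new subscription runs the callback again, which is why the counter is worth watching when a Flux wraps an expensive operation.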
Flux - backpressure
- Example 1
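import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
public class p184_FluxSimpleRequestThreeExample {
    public static void main(String[] args) {
        // Requests 3 items once: 1, 2, 3 are delivered. With no further request,
        // 4, 5 and the complete signal never arrive.
        getItems().subscribe(new p181_SimpleSubscriber<>(3));
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}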
SimpleSubscriber
@Slf4j
@RequiredArgsConstructor
public class p181_SimpleSubscriber<T> implements Subscriber<T> {
    private final Integer count;

    /**
     * Does not keep requesting: it requests {@code count} items exactly once,
     * then simply receives whatever the publisher delivers.
     * @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count); // request count items
        log.info("request: {}", count);
    }
    ...
}
Output
09:24:32.953 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:24:32.957 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:24:32.957 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 3
09:24:32.958 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:24:33.091 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:24:33.232 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
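Note that the log above ends without a complete line: the subscriber asked for three items and never asked again. A minimal sketch of the same behaviour, assuming reactor-core on the classpath and using Reactor's BaseSubscriber instead of a hand-written Subscriber; the Recorder class and the run helper are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BoundedRequestSketch {
    // Hypothetical helper: issues a single request(demand) and records what arrives.
    static class Recorder extends BaseSubscriber<Integer> {
        final List<Integer> items = new ArrayList<>();
        boolean completed;
        private final long demand;

        Recorder(long demand) {
            this.demand = demand;
        }

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(demand); // one-shot demand, never topped up
        }

        @Override
        protected void hookOnNext(Integer value) {
            items.add(value);
        }

        @Override
        protected void hookOnComplete() {
            completed = true;
        }
    }

    static Recorder run(long demand) {
        Recorder recorder = new Recorder(demand);
        Flux.fromIterable(List.of(1, 2, 3, 4, 5)).subscribe(recorder);
        return recorder;
    }

    public static void main(String[] args) {
        Recorder three = run(3);
        System.out.println(three.items + " completed=" + three.completed);

        Recorder all = run(Long.MAX_VALUE);
        System.out.println(all.items + " completed=" + all.completed);
    }
}
```

With a demand of 3 the recorder should hold only the first three items and never see completion; with unbounded demand it receives all five followed by the complete signal.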
- Example 2
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
public class p185_FluxContinuousRequestSubscriberExample {
    public static void main(String[] args) {
        getItems().subscribe(new p185_ContinuousRequestSubscriber<>());
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
ContinuousRequestSubscriber
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Slf4j
public class p185_ContinuousRequestSubscriber<T> implements Subscriber<T> {
    private final Integer count = 1;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s; // keep the subscription so onNext can request more
        log.info("subscribe");
        s.request(count); // request count items
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);
        Thread.sleep(1000);
        // request one more item
        subscription.request(1);
        log.info("request: {}", count);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}
- The following call in onNext keeps the cycle going: each received item triggers a request for one more.
// request one more item
subscription.request(1);
Output
09:33:34.419 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:33:34.424 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - subscribe
09:33:34.424 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:34.425 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 1
09:33:35.445 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:35.445 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 2
09:33:36.518 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:36.518 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 3
09:33:37.589 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:37.589 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 4
09:33:38.655 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:38.655 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - item: 5
09:33:39.718 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - request: 1
09:33:39.727 [main] INFO com.example06.reactor.p185_ContinuousRequestSubscriber - complete
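The interleaving of request and item lines in this log can be reproduced without sleeping. The sketch below assumes reactor-core on the classpath and is not the author's code: it swaps in Reactor's BaseSubscriber and adds doOnRequest so that the demand reaching the source is recorded. OneByOneRequestSketch and trace are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class OneByOneRequestSketch {
    // Records every request seen by the source and every signal seen by the subscriber.
    static List<String> trace() {
        List<String> trace = new ArrayList<>();
        Flux.range(1, 3)
                .doOnRequest(n -> trace.add("request(" + n + ")"))
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(1); // initial demand of one item
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        trace.add("item " + value);
                        request(1); // ask for the next item only after handling this one
                    }

                    @Override
                    protected void hookOnComplete() {
                        trace.add("complete");
                    }
                });
        return trace;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

As in the log above, the subscriber issues one more request after the last item, and the publisher answers it with complete because nothing is left to emit.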
Flux - error
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
public class p186_FluxErrorExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            fluxSink.next(0);
            fluxSink.next(1);
            var error = new RuntimeException("error in flux");
            fluxSink.error(error); // emit the error signal; the stream terminates here
        });
    }
}
Output
09:36:46.692 [main] INFO com.example06.reactor.p186_FluxErrorExample - start main
09:36:46.756 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:36:46.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:36:46.762 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:36:46.764 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 0
09:36:46.885 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:36:47.028 [main] ERROR com.example06.reactor.p181_SimpleSubscriber - error: error in flux
09:36:47.028 [main] INFO com.example06.reactor.p186_FluxErrorExample - end main
Flux - complete
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
public class p187_FluxCompleteExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            fluxSink.complete(); // emit the complete signal without any item
        });
    }
}
Output
09:37:31.038 [main] INFO com.example06.reactor.p187_FluxCompleteExample - start main
09:37:31.100 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:37:31.106 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:37:31.106 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:37:31.109 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:37:31.109 [main] INFO com.example06.reactor.p187_FluxCompleteExample - end main
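Project Reactor - Mono
- Emits 0..1 items
- If an error occurs, emits an error signal and terminates
- Once its item has been emitted, emits a complete signal and terminates
Mono example
- Because a Mono emits at most one item, a single next is guaranteed to be followed by complete
- Completing without emitting an item means there is no value: a Mono holds either one value or none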
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Slf4j
public class p190_MonoSimpleExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
        Thread.sleep(1000);
    }

    private static Mono<Integer> getItems() {
        return Mono.create(monoSink -> {
            monoSink.success(1);
        });
    }
}
Output
09:39:02.445 [main] INFO com.example06.reactor.p190_MonoSimpleExample - start main
09:39:02.481 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:39:02.484 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:39:02.658 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:39:02.658 [main] INFO com.example06.reactor.p190_MonoSimpleExample - end main
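The run above covers the one-value case. The empty case mentioned earlier (completing without a value) can be sketched as follows, assuming reactor-core on the classpath. MonoZeroOrOneSketch, one and none are hypothetical names; blockOptional() is used only to make the presence or absence of a value visible.

```java
import java.util.Optional;
import reactor.core.publisher.Mono;

public class MonoZeroOrOneSketch {
    // Emits a single value, then completes.
    static Mono<Integer> one() {
        return Mono.create(sink -> sink.success(1));
    }

    // Completes without emitting a value.
    static Mono<Integer> none() {
        return Mono.create(sink -> sink.success());
    }

    public static void main(String[] args) {
        Optional<Integer> present = one().blockOptional();
        Optional<Integer> absent = none().blockOptional();
        System.out.println(present); // Optional[1]
        System.out.println(absent);  // Optional.empty
    }
}
```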
Mono and Flux
Mono
- Zero or one value
- A Mono can also simply mark the point at which some event completes
Flux
- Multiple values, finite or infinite
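Two points in this comparison have no example of their own: a Mono that only signals completion, and an infinite Flux. A minimal sketch under the assumption that reactor-core is on the classpath; MonoVoidAndInfiniteFluxSketch, save and naturals are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoVoidAndInfiniteFluxSketch {
    // A Mono<Void> carries no value; its complete signal means "the work is done".
    static Mono<Void> save(List<String> store, String value) {
        return Mono.fromRunnable(() -> store.add(value));
    }

    // An unbounded Flux: 1, 2, 3, ... generated on demand.
    static Flux<Integer> naturals() {
        return Flux.<Integer, Integer>generate(
                () -> 1,
                (state, sink) -> {
                    sink.next(state);
                    return state + 1;
                });
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        Mono<Void> done = save(store, "a");
        System.out.println("before subscribe: " + store); // []
        done.block(); // returns once the complete signal arrives
        System.out.println("after completion: " + store); // [a]

        // take(3) bounds the infinite source so that collectList() can complete.
        System.out.println(naturals().take(3).collectList().block()); // [1, 2, 3]
    }
}
```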
Converting a Flux to a Mono
- Mono.from converts a Flux to a Mono
- Only the first value is emitted
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class p192_FluxToMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        // Of 1, 2, 3, 4, 5 only the first value, 1, reaches onNext, followed by complete.
        // The remaining values are ignored.
        Mono.from(getItems())
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
Output
09:40:37.275 [main] INFO com.example06.reactor.p192_FluxToMonoExample - start main
09:40:37.340 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:40:37.363 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:40:37.363 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:40:37.365 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:40:37.473 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:40:37.473 [main] INFO com.example06.reactor.p192_FluxToMonoExample - end main
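"Ignored" here does not mean the remaining values are emitted and thrown away. As far as I understand the Mono.from contract, the source is cancelled on its first onNext. The sketch below, which assumes reactor-core on the classpath, counts how many items the source actually emits; MonoFromCancelSketch and its counters are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFromCancelSketch {
    // How many items the source emitted, and how often it was cancelled.
    static final AtomicInteger emitted = new AtomicInteger();
    static final AtomicInteger cancelled = new AtomicInteger();

    static Mono<Integer> first() {
        Flux<Integer> source = Flux.fromIterable(List.of(1, 2, 3, 4, 5))
                .doOnNext(i -> emitted.incrementAndGet())
                .doOnCancel(cancelled::incrementAndGet);
        return Mono.from(source);
    }

    public static void main(String[] args) {
        Integer value = first().block();
        System.out.println("value=" + value
                + ", emitted=" + emitted.get()
                + ", cancelled=" + cancelled.get());
    }
}
```

If the source were expensive (a database cursor, for example), this cancellation is what keeps Mono.from from reading past the first element.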
Converting a Flux to a Mono (Mono<List>)
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
public class p193_FluxToListMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                // collectList buffers 1, 2, 3, 4, 5 in an internal list and,
                // when the source completes, emits that list in a single onNext.
                // The result is a Mono that emits once: [1, 2, 3, 4, 5]
                .collectList()
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
Output
09:41:24.680 [main] INFO com.example06.reactor.p193_FluxToListMonoExample - start main
09:41:24.743 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:41:24.766 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:41:24.766 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:41:24.767 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: [1, 2, 3, 4, 5]
09:41:24.940 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:41:24.940 [main] INFO com.example06.reactor.p193_FluxToListMonoExample - end main
Converting a Mono to a Flux
- flux()
- Converts the Mono into a Flux that calls onNext once and then onComplete
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Slf4j
public class p194_MonoToFluxExample {
    public static void main(String[] args) {
        log.info("start main");
        // flux() converts the Mono into a Flux that calls onNext once and then onComplete.
        // The whole list [1, 2, 3, 4, 5] arrives as a single item.
        getItems().flux()
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Mono<List<Integer>> getItems() {
        return Mono.just(List.of(1, 2, 3, 4, 5));
    }
}
Output
09:42:16.606 [main] INFO com.example06.reactor.p194_MonoToFluxExample - start main
09:42:16.650 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:42:16.694 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:42:16.695 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:42:16.695 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: [1, 2, 3, 4, 5]
09:42:16.802 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:42:16.802 [main] INFO com.example06.reactor.p194_MonoToFluxExample - end main
Converting a Mono to a Flux (Mono<List> -> Flux)
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class p195_ListMonoToFluxExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                // Map the Mono's value to a Flux and subscribe to that Flux,
                // so 1, 2, 3, 4, 5 are delivered one at a time.
                .flatMapMany(value -> Flux.fromIterable(value))
                .subscribe(new p181_SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Mono<List<Integer>> getItems() {
        return Mono.just(List.of(1, 2, 3, 4, 5));
    }
}
Output
09:43:07.895 [main] INFO com.example06.reactor.p195_ListMonoToFluxExample - start main
09:43:07.931 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - subscribe
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - request: 2147483647
09:43:07.972 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 1
09:43:08.082 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 2
09:43:08.239 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 3
09:43:08.414 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 4
09:43:08.588 [main] INFO com.example06.reactor.p181_SimpleSubscriber - item: 5
09:43:08.724 [main] INFO com.example06.reactor.p181_SimpleSubscriber - complete
09:43:08.724 [main] INFO com.example06.reactor.p195_ListMonoToFluxExample - end main
- Course: Spring Webflux 완전 정복 : 코루틴부터 리액티브 MSA 프로젝트까지
- GitHub example code: https://github.com/seohaebada/webflux