728x90
반응형
서론
- 리액티브 프로그래밍 패러다임은 종종 옵저버 설계 패턴의 확장으로 표현됩니다.
- Observer Pattern 에서 방출, 구독 이라는 개념을 익혔습니다.
- 리액터는 좀더 많은 연산을 가능하게 해주는 퍼블리셔를 구현하며 결합을 가능하게 합니다.
Flux
플럭스는 0~N개의 방출된 항목의 비동기 시퀀스를 나타낸다.
선택적으로 완료 신호나 오류에 의해 종료되는 표준 퍼블리셔이다.
플럭스의 흐름도
Flux String 복사 예제
- 리스트를 Flux 로 변환한뒤 Flux에서 다시 List<String> 을 반환하는 예제이다.
- List<string> -> Flux<String> -> List<String>
- 아래 예제는 Flux가 어떻게 동작하는지만 확인해 본다.
1. List<String> copyList = new ArrayList<>();
2. List<String> originList = Arrays.asList("foo", "bar", "foobar");
3. Flux<String> flux = Flux.fromIterable(originList);
4. flux.subscribe(i -> copyList.add(i));
3. orginList 를 Flux로 반환한다.
4. flux에서 연산을 마친 방출된 데이터를 copyList에 담는다.
Mono
- Mono 는 0~1개의 방출된 항목의 비동기 시퀀스를 나타낸다.
- Mono는 onNext 신호를 통해 최대 한 개의 항목을 방출한다.
- onComplete 신호(값이 있든 없든 성공한 Mono)로 종료하거나
단일 onError 신호(실패한 Mono)만 방출하는 특수 퍼블리셔입니다. - OnNext를 호출한뒤 즉시 OnComplete 를 호출될것으로 예상된다.
- Mono는 Flux 연산자의 일부를 제공한다
1. List<String> copyList = new ArrayList<>();
2. Mono<String> mono = Mono.just("foo");
3. mono.subscribe(i -> copyList.add(i));
4. String a = mono.block();
mono 는 하나의 데이터만 방출 한다.
3. 비동기 처리로 방출된 데이터를 담는다.
4. 동기로 작업이 끝날때 까지 기다린다.
Subscribe, block
- Flux 와 Mono는 그냥 동작하지 않는다.
- subscribe: 비동기로 수행
- block: 동기로 완료될때 까지 기다린다.
Block 사용법
// mono
Mono<String> mono = Mono.just("foo");
String string = mono.block(); // foo 리턴
// flux
List<String> originList = Arrays.asList("foo", "bar", "foobar");
Flux<String> flux = Flux.fromIterable(originList);
flux.blockFirst(); // 먼저 방출된 stirng 리턴
Subscribe 사용법
1. subscribe(); // 구독하고, 트리거만 시킨다.
2. subscribe(Consumer<? super T> consumer); // 생산된 값을 처리한다.
3. subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer); // 오류처리
4. subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer); // 에러 없이 완료 되었을때 추가 처리
5. subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer); // 구독자에게 반응
예제
Flux를 이용하여 테스트할 함수
// 정상 함수
public String upper(String word) {
log.info("Call upper function, INPUT VALUE {}", word);
return word.toUpperCase(Locale.ROOT);
}
// 익셉션 터지는 함수
public String upperWithException(String word) {
if (word.equalsIgnoreCase("bar")) {
throw new IllegalArgumentException("Throw error!!!!");
}
log.info("Call upper function, INPUT VALUE {}", word);
return word.toUpperCase(Locale.ROOT);
}
1번 예제
// 1번 케이스
public void sub1Ex() {
List<String> originList = Arrays.asList("foo", "bar", "foobar");
Flux<String> flux = Flux.fromIterable(originList);
flux.map(this::upper)
.subscribe();
}
------------------------------------------------------------------------------------
함수만 실행이 된다.
// 출력값
Call upper function, INPUT VALUE foo
Call upper function, INPUT VALUE bar
Call upper function, INPUT VALUE foobar
-------------------------------------------------------------------------------------
2번 예제
// 2번 케이스
public void sub2Ex() {
List<String> originList = Arrays.asList("foo", "bar", "foobar");
Flux<String> flux = Flux.fromIterable(originList);
flux.map(this::upper)
.subscribe(word -> log.info("PRINT ::: {}", word));
}
------------------------------------------------------------------------------------
함수가 실행이 되고 방출된 값이 출력된다.
// 출력값
Call upper function, INPUT VALUE foo
PRINT ::: FOO
Call upper function, INPUT VALUE bar
PRINT ::: BAR
Call upper function, INPUT VALUE foobar
PRINT ::: FOOBAR
-------------------------------------------------------------------------------------
3번예제
익셉션이 발생한경우
2 번 케이스에서 익셉션이 발생 했을경우
public void sub2ExWithException() {
List<String> originList = Arrays.asList("foo", "bar", "foobar");
Flux<String> flux = Flux.fromIterable(originList);
flux.map(this::upperWithException)
.subscribe(word -> log.info("PRINT ::: {}", word));
}
------------------------------------------------------------------------------------
정상으로 처리되다가 익셉션이 터지는 경우 에러가 발생한다.
다음 값들이 정상적으로 처리가 안된다.
// 출력값
Call upper function, INPUT VALUE foo
PRINT ::: FOO
called default onErrorDropped
java.lang.IllegalArgumentException: Throw error!!!!
Caused by: java.lang.IllegalArgumentException: Throw error!!!!
at com.p8labs.reactive.monoflux.FluxSubscribe.upperWithException(FluxSubscribe.java:45)
-------------------------------------------------------------------------------------
3번 익셉션 발생시 처리
public void sub3ex() {
List<String> originList = Arrays.asList("foo", "bar", "foobar");
Flux<String> flux = Flux.fromIterable(originList);
flux.map(this::upperWithException)
.subscribe(word -> log.info("PRINT ::: {}", word),
word -> log.info("Find error!!!!!"));
}
------------------------------------------------------------------------------------
정상으로 처리되다가 익셉션이 터지는 경우 에러가 발생한다.
Find Error 를 찍는다.
다음 값들이 정상적으로 처리가 안된다.
// 출력값
Call upper function, INPUT VALUE foo
PRINT ::: FOO
Find error!!!!!
-------------------------------------------------------------------------------------
4번예제
4번 예제
public void sub4ex() {
List<String> originList = Arrays.asList("foo", "bar", "foobar");
Flux<String> flux = Flux.fromIterable(originList);
flux.map(this::upper)
.subscribe(word -> log.info("PRINT ::: {}", word),
word -> log.info("Find error!!!!!"),
() -> log.info("SUCCESS!!!"));
}
//출력값
------------------------------------------------------------------------------------
Call upper function, INPUT VALUE foo
PRINT ::: FOO
Call upper function, INPUT VALUE bar
PRINT ::: BAR
Call upper function, INPUT VALUE foobar
PRINT ::: FOOBAR
SUCCESS!!!
------------------------------------------------------------------------------------
5 번은 deprecate 예정으로 작성하지 않겠습니다.
728x90
반응형
'Reactive Programing' 카테고리의 다른 글
[Reactive Programming] zipWith (0) | 2024.11.09 |
---|---|
[Reactive Programming] 속도비교 (1) | 2024.11.02 |
[Reactive Programming] 개념 (0) | 2024.10.06 |
댓글