Free Lines Arrow
본문 바로가기
Reactive Programing

[Reactive Programming] Flux, Mono 개념

by skahn1215 2024. 10. 14.
728x90
반응형

서론

  • 리액티브 프로그래밍 패러다임은 종종 옵저버 설계 패턴의 확장으로 표현됩니다.
  •  Observer Pattern 에서 방출, 구독 이라는 개념을 익혔습니다.
  • 리액터는 좀더 많은 연산을 가능하게 해주는 퍼블리셔를 구현하며 결합을 가능하게 합니다.

 

Flux

플럭스는 0~N개의 방출된 항목의 비동기 시퀀스를 나타낸다.

선택적으로 완료 신호나 오류에 의해 종료되는 표준 퍼블리셔이다.

플럭스의 흐름도

 

Flux String 복사 예제

  • 리스트를 Flux 로 변환한뒤 Flux에서 다시 List<String> 을 반환하는 예제이다.
  • List<string> -> Flux<String> -> List<String>
  • 아래 예제는 Flux가 어떻게 동작하는지만 확인해 본다.
1. List<String> copyList = new ArrayList<>();
2. List<String> originList = Arrays.asList("foo", "bar", "foobar");
3. Flux<String> flux = Flux.fromIterable(originList);
4. flux.subscribe(i -> copyList.add(i));
3. orginList 를 Flux로 반환한다.
4. flux에서 연산을 마친 방출된 데이터를 copyList에 담는다.

 

 

 

Mono

  • Mono 는 0~1개의 방출된 항목의 비동기 시퀀스를 나타낸다.
  • Mono는 onNext 신호를 통해 최대 한 개의 항목을 방출한다.
  • onComplete 신호(값이 있든 없든 성공한 Mono)로 종료하거나
    단일 onError 신호(실패한 Mono)만 방출하는 특수 퍼블리셔입니다.
  • OnNext를 호출한뒤 즉시 OnComplete 를 호출될것으로 예상된다.
  • Mono는 Flux 연산자의 일부를 제공한다

1. List<String> copyList = new ArrayList<>();
2. Mono<String> mono = Mono.just("foo");
3. mono.subscribe(i -> copyList.add(i));
4. String a = mono.block();
mono 는 하나의 데이터만 방출 한다.
3. 비동기 처리로 방출된 데이터를 담는다.
4. 동기로 작업이 끝날때 까지 기다린다.

 

 

Subscribe, block

  • Flux 와 Mono는 그냥 동작하지 않는다.
  • subscribe: 비동기로 수행
  • block: 동기로 완료될때 까지 기다린다.

Block 사용법

// mono
Mono<String> mono = Mono.just("foo");
String string = mono.block(); // foo 리턴

// flux
List<String> originList = Arrays.asList("foo", "bar", "foobar");
Flux<String> flux = Flux.fromIterable(originList);
flux.blockFirst(); // 먼저 방출된 stirng 리턴

 

Subscribe 사용법

1. subscribe(); // 구독하고, 트리거만 시킨다.

2. subscribe(Consumer<? super T> consumer); // 생산된 값을 처리한다.

3. subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); // 오류처리

4. subscribe(Consumer<? super T> consumer,             
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer);   // 에러 없이 완료 되었을때 추가 처리

5. subscribe(Consumer<? super T> consumer,        
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);  // 구독자에게 반응

 

예제

Flux를 이용하여 테스트할 함수

// 정상 함수
public String upper(String word) {
    log.info("Call upper function, INPUT VALUE {}", word);
    return word.toUpperCase(Locale.ROOT);
}



// 익셉션 터지는 함수
public String upperWithException(String word) {
    if (word.equalsIgnoreCase("bar")) {
        throw new IllegalArgumentException("Throw error!!!!");
    }
    log.info("Call upper function, INPUT VALUE {}", word);
    return word.toUpperCase(Locale.ROOT);
}

 

 

 

1번 예제

// 1번 케이스
public void sub1Ex() {
    List<String> originList = Arrays.asList("foo", "bar", "foobar");
    Flux<String> flux = Flux.fromIterable(originList);
    flux.map(this::upper)
        .subscribe();
}

------------------------------------------------------------------------------------
함수만 실행이 된다.
// 출력값
Call upper function, INPUT VALUE foo
Call upper function, INPUT VALUE bar
Call upper function, INPUT VALUE foobar
-------------------------------------------------------------------------------------

 

 

2번 예제

// 2번 케이스
public void sub2Ex() {
    List<String> originList = Arrays.asList("foo", "bar", "foobar");
    Flux<String> flux = Flux.fromIterable(originList);
    flux.map(this::upper)
            .subscribe(word -> log.info("PRINT ::: {}", word));
}
------------------------------------------------------------------------------------
함수가 실행이 되고 방출된 값이 출력된다.
// 출력값
Call upper function, INPUT VALUE foo
PRINT ::: FOO
Call upper function, INPUT VALUE bar
PRINT ::: BAR
Call upper function, INPUT VALUE foobar
PRINT ::: FOOBAR
-------------------------------------------------------------------------------------

 

 

3번예제

익셉션이 발생한경우

2 번 케이스에서 익셉션이 발생 했을경우
public void sub2ExWithException() {
     List<String> originList = Arrays.asList("foo", "bar", "foobar");
     Flux<String> flux = Flux.fromIterable(originList);
     flux.map(this::upperWithException)
            .subscribe(word -> log.info("PRINT ::: {}", word));
}

------------------------------------------------------------------------------------
정상으로 처리되다가 익셉션이 터지는 경우 에러가 발생한다.
다음 값들이 정상적으로 처리가 안된다.


// 출력값
Call upper function, INPUT VALUE foo
PRINT ::: FOO
called default onErrorDropped
java.lang.IllegalArgumentException: Throw error!!!!
Caused by: java.lang.IllegalArgumentException: Throw error!!!!
	at com.p8labs.reactive.monoflux.FluxSubscribe.upperWithException(FluxSubscribe.java:45)
-------------------------------------------------------------------------------------


3번 익셉션 발생시 처리
public void sub3ex() {
    List<String> originList = Arrays.asList("foo", "bar", "foobar");
    Flux<String> flux = Flux.fromIterable(originList);
    flux.map(this::upperWithException)
            .subscribe(word -> log.info("PRINT ::: {}", word),
                       word -> log.info("Find error!!!!!"));
}


------------------------------------------------------------------------------------
정상으로 처리되다가 익셉션이 터지는 경우 에러가 발생한다.
Find Error 를 찍는다.
다음 값들이 정상적으로 처리가 안된다.

// 출력값
Call upper function, INPUT VALUE foo
PRINT ::: FOO
Find error!!!!!
-------------------------------------------------------------------------------------

 

 

4번예제

4번 예제
public void sub4ex() {
    List<String> originList = Arrays.asList("foo", "bar", "foobar");
    Flux<String> flux = Flux.fromIterable(originList);
    flux.map(this::upper)
            .subscribe(word -> log.info("PRINT ::: {}", word),
                   word -> log.info("Find error!!!!!"),
                        () -> log.info("SUCCESS!!!"));
}


//출력값
------------------------------------------------------------------------------------
Call upper function, INPUT VALUE foo
PRINT ::: FOO
Call upper function, INPUT VALUE bar
PRINT ::: BAR
Call upper function, INPUT VALUE foobar
PRINT ::: FOOBAR
SUCCESS!!!
------------------------------------------------------------------------------------

 

 

5 번은 deprecate 예정으로 작성하지 않겠습니다.

728x90
반응형

'Reactive Programing' 카테고리의 다른 글

[Reactive Programming] zipWith  (0) 2024.11.09
[Reactive Programming] 속도비교  (1) 2024.11.02
[Reactive Programming] 개념  (0) 2024.10.06

댓글