Free Lines Arrow
본문 바로가기
Reactive Programing

[Reactive Programming] Thread and Schedulers

by p8labs 2025. 1. 11.
728x90
반응형

Thread and Schedulers

  • 리액티브에서 동작하는 쓰레드를 알아보자.
  • 병렬처리를 하면서 가장 중요한 개념이자 알아야 되는 사항이라고 생각한다.

 

Mono, Flux 는 스스로 쓰레드를 지정하지 않는다.

  • 모노 플럭스는 직접 사용자가 쓰레드를 지정해줘야 해당 쓰레드에서 동작한다.
  • 그리고 이전에 사용했던 쓰레드를 그대로 사용한다.

아래 코드는 쓰레드를 생성해서 플럭스를 수행해보는 예제다.

package com.p8labs.reactive.scheduler_and_threding;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class ThreadEx {
    public void TestMonoInThread() throws InterruptedException {
        final Flux<String> flux = Flux.just("FIRST ","SECOND ","THIRD ");
        String name = Thread.currentThread().getName();
        log.info("THREAD NAME: {}", name);
        Thread t = new Thread(() -> flux
                .map(msg -> msg)
                .subscribe(v ->
                        log.info("THREAD NAME: {} data: {}",Thread.currentThread().getName(),v)

                )
        );
        t.start();
        t.join();
    }
}

 

 

  • 처음으로 Main Thread 인 Test woker 가 수행이된다.
  • 두번째로 Thread-3 안에서 모든 flux 방출이 발생한걸 볼수 있다.
THREAD NAME: Test worker
THREAD NAME: Thread-3 data: FIRST 
THREAD NAME: Thread-3 data: SECOND 
THREAD NAME: Thread-3 data: THIRD

 

 

Scheduler

기본 scheduler에 대해 알아보자 scheduler는

스케줄러는 ExecutorService와 유사한 역할을 하며 더 많은 기능을 제공한다.

 

제공되는 메서드들

  • Schedulers.immediate()
  • Schedulers.single()
  • Schedulers.boundedElastic()

Schedulers.immediate()

  • 호출된 쓰레드에서  작업을 바로 수행한다. 그렇기 때문에 동기로 동작한다.
  • 쓰레드 전환을 하지 않는다.
  • 실제 코드에서는 사용하면 안된다

Schedulers.single()

  • 하나의 쓰레드에서 작업을 수행한다. 그렇기 때문에 순차적으로 처리가 된다.
  • 호출 될때 마다 새로운 쓰레드를 할당하고 싶으면 ISchedulers.newSingle() 을 사용해야 된다.
  • 동기화가 보장된다.

Schedulers.boundedElastic()

  • Schedulers.boundedElastic()는 내부적으로 스레드 풀을 관리 한다.
  • 쓰레드 풀을 기반으로 동작한다.
  • 자체 쓰레드를 제공하여 다른 리소스들에 영향을 주지 않는다.
  • 블로킹 작업에 최적화 되어 있다. (파일 입출력, DB 쿼리, 네트워크 통신)
  • 동기적 함수를 Mono 로 감싸 비동기로 만드는 법
Mono blockingWrapper = Mono.fromCallable(() -> {
    return /* make a remote synchronous call */
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic());

 

    • 3.6.0부터는 설정에 따라 두 가지 다른 구현체를 사용할수 있다.
      • ExecutorService 기반
        • 플랫폼 쓰레드를 재사용한다.
        • 필요에 따라 풀을 생성하거나 idle 풀을 재사용한다.
        • idle 풀은 기본적으로 60초 이상 지나면 폐기 된다.
        • 백업 쓰레드의 상한선이 있다 (기본값은 CPU - core * 10)
        • 상한선에 도달하면 최대 10,000개의 작업까지 큐에 들어간다.
      • Thread-per-task 기반 (VirtualThread(JDK21) 인스턴스에서 실행되도록 설계되었습니다.)
        • eactor.schedulers.defaultBoundedElasticOnVirtualThreads 시스템 속성을 true로 해야한다.
        • boundedElastic()는 VirtualThread 클래스의 새 인스턴스에서 모든 작업을 실행하도록 맞춤화된 UnitedElasticScheduler의 특정 구현을 반환 한다.
        • idel 풀이 없으면 요청마다 새로운 VirtualThread 를 생성한다.

 

publishOn, subscribeOn

위에서 간단하게 스케쥴러와 쓰레드에 대해 살펴 봤는데 Reactor 는 Reactive chain 안에서 
스케쥴러와 컨텐스트를 전환하는 2가지 방법 publisOn, subscribeOn 을 제공한다.
 

publishOn

해당 메소드를 만난 순간 부터 지정한 스케쥴러 기반으로 수행된다.

업스트림에서 시그널을 받고 할당된 스케쥴러를 기반으로 다운스트림을 실행시킨다.

 

예제

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

final Flux<String> flux = Flux
        .range(1, 2)
        .map(i -> {
            System.out.println("First map: " + Thread.currentThread().getName());
            return  10 + i;
        })
        .publishOn(s)
        .map(i -> {
            System.out.println("Second map: " + Thread.currentThread().getName());
            return  "value " + i;
        });

Thread thread = new Thread(() -> flux.subscribe(v -> System.out.println("END!!")));

thread.run();

 

결과

First map: Test worker
First map: Test worker
Second map: parallel-scheduler-1
END!!
Second map: parallel-scheduler-1
END!!

 

  • First map 은 Test worker 쓰레드에서 수행이 되었다.
  • Second map 은 publishOn 을 만난뒤 지정된 쓰레드로 수행이 된걸 확인 할 수 있다.
  • 그러다 보니 First map 이 처리가 된 뒤에 각각 Second map 이 병렬처리로된 쓰레드로 분리되어 수행이 되었다. 

 

subscribeOn

  • 위치와 상관없이 구독될때 지정한 쓰레드가 동작한다.
  • publishOn 이 선언된 이후는 영향을 주지 않는다.
  • 데이터 소스뒤에 선언하는걸 추천한다.
  • 이유는 중간 연산자가 있다면 해당 쓰레드에 영향을 주기 때문이다.

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

final Flux<String> flux = Flux
        .range(1, 2)
        .map(i -> {
            System.out.println("First map: " + Thread.currentThread().getName());
            return  10 + i;
        })
        .subscribeOn(s)
        .map(i -> {
            System.out.println("Second map: " + Thread.currentThread().getName());
            return  "value " + i;
        });



Thread thread = new Thread(() -> flux.subscribe(v -> System.out.println("END!!")));

thread.run();

 

결과

- 구독 시작점 부터 지정된 스케쥴러가 동작했다.

- 여기서 한가지 포인트를 집고 넘어가면
스케쥴러가 병렬처리여도 순차처리가 되었다. 그 이유는 flux 의 방출하는 순간은 순차적으로 수행이 되기 때문이다.

First map: parallel-scheduler-1
Second map: parallel-scheduler-1
END!!
First map: parallel-scheduler-1
Second map: parallel-scheduler-1
END!!

 

 

 

결론

리액티브 프로그래밍에서 스케쥴러를 변환하는 방법을 알아 보았다.

다음에는 두개를 혼합해서 사용해보는 것을 공부해 보려고 한다.

 

728x90
반응형

'Reactive Programing' 카테고리의 다른 글

[Reactive Programming] zipWith  (0) 2024.11.09
[Reactive Programming] 속도비교  (1) 2024.11.02
[Reactive Programming] Flux, Mono 개념  (0) 2024.10.14
[Reactive Programming] 개념  (0) 2024.10.06

댓글