1085 단어
5 분
webflux
2025-03-15

스프링에서 외부 API를 가져오기 위해서는 RestTemplateWebClient가 있다. 전자의 경우 Depreated 된다는 소문도 있고, 스프링 자체에서 WebClinet를 권장하고 있기에 WebClient를 사용하기로 했다.

WebClient를 이번에 처음 사용 해보기도 하고 리액티브 프로그래밍을 해본 적이 없어서 더 어려웠다. 그래서 열심히 구글링한 내용들을 정리하려고 한다.

Flux 정렬#

.collectSortedList(
            Comparator.comparing(SyncJobDto::id, Comparator.nullsLast(Comparator.naturalOrder()))
                .thenComparing(SyncJobDto::targetDate, Comparator.nullsLast(Comparator.naturalOrder()))
        );

collectSortedList()의 경우 Flux 발행자의 데이터 스트림을 List로 생성하고 Mono로 반환하는 메서드이다.

Mono<List<T>> collectSortedList() 매개 변수가 없으면 자연스러운 순서로 정렬된다.

API 요구사항에 맞춰 지수, 날짜 순으로 정렬했다.

Comparator.nullLast()
  • null 값인 경우 항상 다른 값보다 뒤로 가기
Comparator.naturalOrder()
  • 객체의 자연스러운 순서에 따라 정렬하는 비교자 반환
    • 예를 들어 숫자는 오름차순(1,2,3,…), 문자열(a,b,c,…)은 사전 순으로 정렬

flatMapMany()#

flatMapMany()
  • Mono<List<T>>의 Mono 단일 항목을 publisher로 변환 후 publisher의 push된 데이터를 하나의 Flux 형태의 스트림으로 변환

    • 비동기적으로 1변환 연산자
    • Mono에서 Flux로 타입 변환도 가능함
  • flatMap()은 각 요소를 다른 Mono로 변환

    • Mono:arrow_right: Mono
Mono<Integer> numbers = Mono.just(3); // just 정적 메서드로 Mono publisher 생성
Flux<Integer> result = numbers.flatMapMany(number ->
                                          Flux.just(
                                              num,
                                              num+2,
                                              num*2,
                                              num*num
                                          )
                                         );
result.subscribe(System.out::println);
// 3 5 6 9

fromIterable()#

Flux.fromIterable()
  • Iterable 인터페이스를 구현한 컬렉션(List,Set)을 Flux 스트림으로 변환
  • 컬렉션이나 Iterable 구현체의 데이터를 순차적으로 push하는 Flux Publisher로 변환
  • Flux.fromIterable()Iterable.iterator() 메서드를 호출해서 각 Subscriber에게 push 해준다.
Flux<Integer> flux = Flux.fromIterable(Arrays.asList(1,2,3,4,5))
    .subscribe(System.out::println);
조건사용법
이미 존재하는 데이터Flux.just(), Flux.fromIterable()
범위나 횟수Flux.range(start, count)

Mono.fromCallable()#

Callable로부터 Mono를 생성하는 메서드이다.

특징#

  1. 지연 평가(Lazy Evaluation): Subscribe가 발생할 때까지 실행하지 않음
  2. 블로킹 작업 처리: 블로킹 작업을 비동기적으로 처리할 때 사용
Mono blockingMono = Mono.fromCallable(() -> {
    return blocking(); // 기존의 blocking 메서드 호출
});

// boundedElastic 스케줄러에서 실행하여 메인 쓰레드 블로킹 방지
blockingMono.subscribeOn(Schedulers.boundedElastic());

결국 블로킹 작업을 별도의 쓰레드에서 실행하기 위해서 사용한다.


subscribeOn(Schedulers.boundedElastic())#

리액티브 스트림을 boundedElastic 스케줄러로 전환하는 기능을 한다. 주로 블로킹된 작업을 처리할 때 사용한다.

schedulers.boundedElastic()#

제한된 수의 쓰레드를 동적으로 생성하는 스케줄러이다. 블로킹 작업에 적합하도록 설계 되었다.

Flux.fromIterable(urls)
    .flatMap(url -> 
            Mono.fromCallable() ->
            blocking.get(url)) // blocking이라는 객체가 있다고 가정
    .subscribeOn(Schedulers.boundedElastic())

이렇게 되면 블로킹 API 호출을 리액티브 스트림으로 감싸고 각각의 호출을 boundedElastic 쓰레드에서 실행하여 메인 쓰레드를 방해하지 않도록 한다.


thenReturn(V)#

Mono 클래스에서 제공하는 연산자로 Mono 시퀀스가 끝난 후 지정된 값을 반환하는 새로운 Mono를 반환한다.

Mono<Integer> monoInt = Mono.fromCallable(() -> {
    System.out.println("blocking task");
    return 1;
});

Mono<String> monoString = monoInt.thenReturn("good");
// 이렇게 해서 출력하면 "good"이라는 문자열이 나옴

concatWith()#

리액티브 스트림을 순차적으로 연결할 때 사용한다. MonoFlux 인스턴스에서 사용할 수 있다.

완료된 스트림 시퀀스에 다른 Publisher의 스트림을 연결하는 것이다. 따라서 두 스트림을 순서대로 연결해서 하나의 스트림으로 만든다.

이때 FluxMono는 불변성을 가지기 때문에 concatWith()는 원본을 수정하지 않고 새로운 인스턴스를 반환한다.

Flux<Integer> flux1 = Flux.just(1,2,3);
Flux<Integer> flux2 = Flux.just(4,5,6);
Flux<Integer> flux3 = Flux.just(7,8,9);

flux1.concatWith(flux2).concatWith(flux3)
    .subscribe(flux -> System.out.println(flux + "->"));

// 결과: 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9 ->

Mono.empty()#

어떤 데이터도 반환하지 않고 완료 신호만 보내는 Mono 객체만을 생성한다. 즉 onSubscribe()onComplete() 신호만 보낸다.

Mono는 기본적으로 0개 또는 1개의 데이터를 반환할 수 있는 Publisher다. Mono.empty()를 사용하면 0개의 데이터를 반환하는 경우다.


공식 문서

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html

webflux
https://realits.me/posts/webflux/
저자
realitsyourman
게시일
2025-03-15
라이선스
CC BY-NC-SA 4.0