Reactive Programming
일반적으로 자바 코드는 쓰레드 풀을 이용해서 비동기성을 구현한다. 웹 서버의 경우 쓰레드 풀을 사용해 요청한 사용자에게 쓰레드를 할당한다. 데이터베이스 연결에서도 마찬가지로 쓰레드를 사용하게 된다. 이때 데이터베이스가 질의에 응답해서 데이터를 가져오고 웹 서버가 이를 연산하기까지의 대기가 생기게 된다. 따라서 블로킹이다.
비동기
Srping WebFlux는 기본적으로 Project Reactor를 기반으로 Reactive Programming을 구현한다. Reactive stream은 데이터 소스인 publisher가 subscriber에게 데이터를 보내는 발행자 구독 모델(publisher-subscriber model)이다. 이러한 Reactive stream에서 발행자는 데이터를 보내기만 한다.
Reactive Streams
reactive stream에서 데이터 스트림은 비동기적, 논블로킹, 백프레셔(back pressure)를 지원한다.
발행자(Publisher)
구독자(Subscriber)
구독(Subscription)
프로세서(Processor)
1. Publisher
발행자는 한 명 이상의 구독자에게 데이터 스트림을 보낸다. 구독자는 subscribe() 메서드를 통해 발행자를 구독한다. 이때 발행자는 아무때나 데이터를 보내는 것이 아니라 구독자의 요청에 따라 보내게 된다.
public inferface Publisher<T> {
public void subscribe(Subscription<? super T> s);
}2. Subscriber
구독자는 발행자가 보낸 데이터를 직접 사용한다.
발행자와 구독자 통신 방법
Subscriber객체가Publisher.subscribe()메서드에 넘어가게 되면onSubscribe()메서드가 실행된다. 이때Subscription객체도 같이 보낸다.Publisher는SubScriber가 데이터를 요청할 때만 데이터 스트림을 보낼 수 있다. 따라서Subscription.request(long)호출을 기다린다. 이때 매개변수로Publisher가 보낼 데이터의 양을 지정할 수 있다.Publicsher는Subscriber가 데이터를 잘 받았는지 상관 없이 그냥 데이터를 보내기만 한다. 하지만Subscriber는 자신에게 필요한 데이터가 몇 개인지 알 수 있다. 따라서Subscription객체에Publisher에게 요구할 데이터의 양이 지정하고 보낸다. 이러한 과정을 백프레셔(back pressure)라고 한다.Publisher에게 요청이 잘 도착하면 데이터 스트림을 보내기 시작한다. 이때Subscriber는onNext()라는 메서드를 사용해서 데이터를 사용한다.- 마지막으로
onError()나onCompletion()를 사용해서 통신을 끝낸다. 이때는Subscription.request()메서드를 실행해도Publisher에게 가지 않는다.
3. Subscription
구독자와 발행자 사이의 중재 역할을 한다. 발행자에게 데이터 요구를 하는 것은 구독자의 역할이다.
4. Processing
Flux<Integer> flux = Flux.just(1, 10, 100).log();
flux.reduce(Integer::sum).subscribe(f -> System.out.println("f = " + f));여기서 Flux.just()는 Flux 객체를 생성한다. Flux는 발행자이다.
13:47:33.988 [main] INFO reactor.Flux.Array.1 -- | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
13:47:33.991 [main] INFO reactor.Flux.Array.1 -- | request(unbounded)
13:47:33.991 [main] INFO reactor.Flux.Array.1 -- | onNext(1)
13:47:33.991 [main] INFO reactor.Flux.Array.1 -- | onNext(10)
13:47:33.991 [main] INFO reactor.Flux.Array.1 -- | onNext(100)
13:47:33.992 [main] INFO reactor.Flux.Array.1 -- | onComplete()
f = 111Flux 발행자를 구독할때 request(unbounded)처럼 unbounded(무제한)의 수를 같이 보냈다. 이후 Flux 발행자는 데이터 스트림을 보내고 이를 받은 Subscriber는 onNext() 메서드를 호출한다.