서론
https://javabom.tistory.com/96
지난 자바봄 포스팅에서 Future Interface 을 간단히 알아보았다. 원문을 번역한 포스팅에 불과하지만 어느정도 Future 인터페이스가 왜 있는지 정도는 파악했던 것 같아. 지금보니 Limitations of the Futre 파트는 정말 성의가 없다. 요 포스탕에서 면죄부를 얻도록 열심히 써보고자 한다 ,, ㅎㅅㅎ
CompletableFuture
일련의 Dependency 한 과정을 Independency 하게 처리하고 싶다면 어떻게 해야할까?
예를들어, 요청에 따른 주문 > 결제 > 정산
의 과정은 서로 Dependency 해야하지만(즉, 순서가 보장되어야하지만) A 고객의 주문 > 결제 > 정산
과 B고객의 주문 > 결제 > 정산
은 서로 Independency 해도 된다면?
그럴때 CompletableFuture를 사용할 수 있다.
예시는 요 링크를 참고했다.
https://www.youtube.com/watch?v=ImtZgX1nmr8
다수의 고객이 주문 > 결제 > 완료의 과정을 거친다고 가정하자.
package com.java.future.demo;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FutureSample {
public void process() {
List<String> users = Arrays.asList("A", "B", "C");
ExecutorService executorService1 = Executors.newFixedThreadPool(3);
ExecutorService executorService2 = Executors.newFixedThreadPool(3);
for (String user : users) {
CompletableFuture.supplyAsync(() -> order(user))
.thenApply(this::calculate)
.thenApply(this::finish);//같은 쓰레드에서 수행한다
// thenApplyAsync -> 같은 쓰레드풀의 다른 쓰레드에서 수행. 다른 쓰레드풀을 사용하고싶다면, 인자로 쓰레드풀을 각각 명시해두어야 함
}
}
public String order(final String user) {
System.out.println("Order: " + user + " Thread: " + Thread.currentThread().getName());
return user;
}
public String calculate(final String user) {
System.out.println("Calculate: " + user + " Thread: " + Thread.currentThread().getName());
return user;
}
public String finish(final String user) {
System.out.println("Finish: " + user + " Thread: " + Thread.currentThread().getName());
return user;
}
}
Thread Pool 을 지정하지 않으면 기본 ForkJoin Common Pool을 사용한다. 위 코드를 실행하기 전에 결과를 예상해봤다.
예상결과 thread-1 이 A의 요청 다 수행
예상결과 thread-2 가 B의 요청 다 수행
예상결과 thread-3 이 C의 요청 다 수행
하지만 보란듯이 틀렸다.
아래와 같이 첫번째 Order만 ForkJoin pool 을 사용하고, 나머지는 메인 쓰레드가 수행하고 있었다. 물론 두 번째 인자로 쓰레드풀을 명시해주면 이런일은 발생하지 않지만 왜이런지 너무 궁금했다.
아래 링크에서 해답을 찾을 수 있었다.
https://stackoverflow.com/questions/27723546/completablefuture-supplyasync-and-thenapply
thenApply 는 이전 메서드와 같은 쓰레드풀을 "무조건" 사용해야 할 경우에는 쓰면 안된다는 말이었다. 그리고 같은 쓰레드 풀을 사용할 수 있는 한가지 방법을 제시해주었는데, (두번째 파라미터로 넘겨주는 것 말고)
이렇게 order 메서드를 수행하는데 시간이 더 소요되게 만들었더니,
public String order(final String user) {
System.out.println("Order: " + user + " Thread: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return user;
}
내가 원하는 결과가 나왔다. 아직도 왜 첫번째 때 C 만 ForkJoinPool 을 공유했는지는 알 수 없지만 말이다 : (
supplyAsync 내의 메서드가 Completed 된 상태로 thenApply 로 넘어가면 caller thread 가(여기서는 main) 해당 function 을 수행하게된다. 왜냐하면 Main Thread가 Blocking 하지 않기 때문이다.
하지만 미처 끝나지 않은 function 이 넘어가면 같은 쓰레드(ForkJoin CommonPool)에서 수행하게된다.
Main 메서드에서 다음 작업을 수행하게되면 이전 작업이 끝나기 전까지 Blocking 되기 때문이다. Non Blocking 의 목적은 이전 작업에 의해 Blocking 되지 않는 것에 있다.
만약 다른 Task 를 서로 다른 Thread Pool 에서 사용하고 싶다면 thenApplyAsync를 사용하면된다.
(그렇지 않으면 친절하게 컴파일 에러도 난다)
어떤 Task 는 더 많은 자원이 필요할 것이고, 어떤 Task 는 그렇지 않을 것이다. 그럴 때 유용하게 사용될 수 있는 메서드인 것 같다.
public void process() {
List<String> users = Arrays.asList("A", "B", "C");
ExecutorService executorService1 = Executors.newFixedThreadPool(100);
ExecutorService executorService2 = Executors.newFixedThreadPool(5);
for (String user : users) {
CompletableFuture.supplyAsync(() -> order(user), executorService1)
.thenApplyAsync(this::calculate, executorService2)
.thenApplyAsync(this::finish, executorService1);
}
}
위에서 계속 봤다시피 thenApply 는 메서드체이닝으로 콜백되는 CompletableFuture 로 작업을 이어갈 수 있다. 그럴 필요가 없다면 thenAccept를 쓰면된다.
public void process() {
List<String> users = Arrays.asList("A", "B", "C");
ExecutorService executorService1 = Executors.newFixedThreadPool(100);
ExecutorService executorService2 = Executors.newFixedThreadPool(5);
for (String user : users) {
CompletableFuture.supplyAsync(() -> order(user), executorService1)
.thenApplyAsync(this::calculate, executorService2)
.thenAccept(this::finish); // finish 메서드는 void 타입으로 바꾸었다.
}
}
결론
Future Interface 의 구현체를 알고 사용하자! 라는 생각으로 가볍게 생각했던거였는데, 생각보다 삽질도 많았고 좋은 포스팅도 많아서 재밌으면서 고통스럽게 공부했다 ㅋ_ㅋ
앞으로 더 공부하고 싶은 내용도 생겼는데, CompletableFuture 로도 충분히 Non-blocking, Asynchronous한 작업이 가능한데, WebFlux 가 도입된 이유가 무엇일지 굉장히 궁금해졌다. React, Stream API 등 사전에 알아야할 것들이 많아서 당장 의지가 막 샘솟진 않지만 ㅋ_ㅋ 업무에서 WebFlux 를 도입하고 있기도 하고 이대로 멈춰있기는 조금 쫄려졌달까 ,,
'개발 > Java & Spring ' 카테고리의 다른 글
[번역정리] Java Concurrency:Java Memory Model(write by Dmytro Timchenko) (0) | 2020.10.06 |
---|---|
[Spring] HttpServletRequest, HttpSession 을 필요로하는 로직을 테스트하는 방법 (0) | 2020.04.22 |
[자바봄] producer - consumer 과제일기 : 1단계 (0) | 2020.04.13 |
[기록] JUnit5 자주사용하는 코드 모음 (0) | 2020.04.01 |
[Java/Spring] javax.validation @Size vs @NotBlank과 Validation Test (2) | 2020.03.02 |