본문 바로가기

개발/Java & Spring

Completable Future

서론

https://javabom.tistory.com/96

 

Java의 Future Interface 란?

이펙티브자바에서 동시성 부분을 다루다가 Future 인터페이스가 잠깐 언급됐었습니다. 실행자 서비스의 submit 메서드의 반환값이 Future 타입이었고, 이 반환값의 get 메서드를 호출하니 Runnable 또는

javabom.tistory.com

 

지난 자바봄 포스팅에서 Future Interface 을 간단히 알아보았다. 원문을 번역한 포스팅에 불과하지만 어느정도 Future 인터페이스가 왜 있는지 정도는 파악했던 것 같아. 지금보니 Limitations of the Futre 파트는 정말 성의가 없다. 요 포스탕에서 면죄부를 얻도록 열심히 써보고자 한다 ,, ㅎㅅㅎ

 

 

CompletableFuture

일련의 Dependency 한 과정을 Independency 하게 처리하고 싶다면 어떻게 해야할까?

 

예를들어, 요청에 따른 주문 > 결제 > 정산 의 과정은 서로 Dependency 해야하지만(즉, 순서가 보장되어야하지만) A 고객의 주문 > 결제 > 정산 과 B고객의 주문 > 결제 > 정산 은 서로 Independency 해도 된다면?

 

그럴때 CompletableFuture를 사용할 수 있다.

 

예시는 요 링크를 참고했다.

https://www.youtube.com/watch?v=ImtZgX1nmr8

 

 

다수의 고객이 주문 > 결제 > 완료의 과정을 거친다고 가정하자.

package com.java.future.demo;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FutureSample {

    public void process() {
        List<String> users = Arrays.asList("A", "B", "C");
        ExecutorService executorService1 = Executors.newFixedThreadPool(3);
        ExecutorService executorService2 = Executors.newFixedThreadPool(3);
        for (String user : users) {
            CompletableFuture.supplyAsync(() -> order(user))
                    .thenApply(this::calculate)
                    .thenApply(this::finish);//같은 쓰레드에서 수행한다
            // thenApplyAsync -> 같은 쓰레드풀의 다른 쓰레드에서 수행. 다른 쓰레드풀을 사용하고싶다면, 인자로 쓰레드풀을 각각 명시해두어야 함
        }
    }

    public String order(final String user) {
        System.out.println("Order: " + user + " Thread: " + Thread.currentThread().getName());
        return user;
    }

    public String calculate(final String user) {
        System.out.println("Calculate: " + user + " Thread: " + Thread.currentThread().getName());
        return user;
    }

    public String finish(final String user) {
        System.out.println("Finish: " + user + " Thread: " + Thread.currentThread().getName());
        return user;
    }
}

 

 

Thread Pool 을 지정하지 않으면 기본 ForkJoin Common Pool을 사용한다. 위 코드를 실행하기 전에 결과를 예상해봤다.

예상결과 thread-1 이 A의 요청 다 수행

예상결과 thread-2 가 B의 요청 다 수행

예상결과 thread-3 이 C의 요청 다 수행

 

 

하지만 보란듯이 틀렸다.

아래와 같이 첫번째 Order만 ForkJoin pool 을 사용하고, 나머지는 메인 쓰레드가 수행하고 있었다. 물론 두 번째 인자로 쓰레드풀을 명시해주면 이런일은 발생하지 않지만 왜이런지 너무 궁금했다.

 

 

아래 링크에서 해답을 찾을 수 있었다.

https://stackoverflow.com/questions/27723546/completablefuture-supplyasync-and-thenapply

 

CompletableFuture, supplyAsync() and thenApply()

Need to confirm something. The following code: CompletableFuture .supplyAsync(() -> {return doSomethingAndReturnA();}) .thenApply(a -> convertToB(a)); would be the same as:

stackoverflow.com

 

 

thenApply 는 이전 메서드와 같은 쓰레드풀을 "무조건" 사용해야 할 경우에는 쓰면 안된다는 말이었다. 그리고 같은 쓰레드 풀을 사용할 수 있는 한가지 방법을 제시해주었는데, (두번째 파라미터로 넘겨주는 것 말고)

 

이렇게 order 메서드를 수행하는데 시간이 더 소요되게 만들었더니,

    public String order(final String user) {
        System.out.println("Order: " + user + " Thread: " + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return user;
    }

 

 

내가 원하는 결과가 나왔다. 아직도 왜 첫번째 때 C 만 ForkJoinPool 을 공유했는지는 알 수 없지만 말이다 : (

 

 

supplyAsync 내의 메서드가 Completed 된 상태로 thenApply 로 넘어가면 caller thread 가(여기서는 main) 해당 function 을 수행하게된다. 왜냐하면 Main Thread가 Blocking 하지 않기 때문이다.

 

하지만 미처 끝나지 않은 function 이 넘어가면 같은 쓰레드(ForkJoin CommonPool)에서 수행하게된다.

 

Main 메서드에서 다음 작업을 수행하게되면 이전 작업이 끝나기 전까지 Blocking 되기 때문이다. Non Blocking 의 목적은 이전 작업에 의해 Blocking 되지 않는 것에 있다.

 

만약 다른 Task 를 서로 다른 Thread Pool 에서 사용하고 싶다면 thenApplyAsync를 사용하면된다.

(그렇지 않으면 친절하게 컴파일 에러도 난다)

 

 

 

어떤 Task 는 더 많은 자원이 필요할 것이고, 어떤 Task 는 그렇지 않을 것이다. 그럴 때 유용하게 사용될 수 있는 메서드인 것 같다.

    public void process() {
        List<String> users = Arrays.asList("A", "B", "C");
        ExecutorService executorService1 = Executors.newFixedThreadPool(100);
        ExecutorService executorService2 = Executors.newFixedThreadPool(5);
        for (String user : users) {
            CompletableFuture.supplyAsync(() -> order(user), executorService1)
                    .thenApplyAsync(this::calculate, executorService2)
                    .thenApplyAsync(this::finish, executorService1);
        }
    }

 

 

 

위에서 계속 봤다시피 thenApply 는 메서드체이닝으로 콜백되는 CompletableFuture 로 작업을 이어갈 수 있다. 그럴 필요가 없다면 thenAccept를 쓰면된다.

public void process() {
        List<String> users = Arrays.asList("A", "B", "C");
        ExecutorService executorService1 = Executors.newFixedThreadPool(100);
        ExecutorService executorService2 = Executors.newFixedThreadPool(5);
        for (String user : users) {
            CompletableFuture.supplyAsync(() -> order(user), executorService1)
                    .thenApplyAsync(this::calculate, executorService2)
                    .thenAccept(this::finish); // finish 메서드는 void 타입으로 바꾸었다.
        }
    }

 

 

결론

Future Interface 의 구현체를 알고 사용하자! 라는 생각으로 가볍게 생각했던거였는데, 생각보다 삽질도 많았고 좋은 포스팅도 많아서 재밌으면서 고통스럽게 공부했다 ㅋ_ㅋ

앞으로 더 공부하고 싶은 내용도 생겼는데, CompletableFuture 로도 충분히 Non-blocking, Asynchronous한 작업이 가능한데, WebFlux 가 도입된 이유가 무엇일지 굉장히 궁금해졌다. React, Stream API 등 사전에 알아야할 것들이 많아서 당장 의지가 막 샘솟진 않지만 ㅋ_ㅋ 업무에서 WebFlux 를 도입하고 있기도 하고 이대로 멈춰있기는 조금 쫄려졌달까 ,,