[작성중][java]CompletableFuture

intro

  • thread, Future, Promise, lamdba 등의 기본지식은 있어야 하는듯
  • https://brush-up.github.io/java/java8-lambda-functional-interface/ 를 먼저 선행해야함.
  • future, promises
    • https://en.wikipedia.org/wiki/Futures_and_promises 참고하자
    • Future
      • a read-only reference(읽기 전용) to a yet-to-be-computed value
      • 이벤트 발생 시 사용할 수(읽을 수) 있는 어떤 값
    • Promise
      • pretty much the same except that you can write to it(쓰기 가능)
      • 연산의 결과를 넣어두고(쓰고) 이 결과 값을 읽을 수 있는 Future를 얻을 수 있습니다. 즉, Promise가 끝나고 나면 성공, 혹은 실패에 연결되어 있는 Future를 통해 특정 동작을 유발시킬 수 있다.
    • 공통점은 yet-to-be-computed value 아직 계산되지 않은 값에 대한 레퍼런스 라는 점
  • Future
      package java.util.concurrent;
    
      public interface Future<V> {
          boolean cancel(boolean var1);
          boolean isCancelled();
          boolean isDone();
          V get() throws InterruptedException, ExecutionException;
          V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
      }
    
  • Future < V > 는 언젠가 사용 가능해지는 타입 V 의 값
    • read-only 하다
  • Promise[ V ] 는 언젠가 얻게 될 타입 V의 값
    • writable 하다 인듯.

sync, async

동기방식

  • 아래와 같이 시간이 많이 걸리는 method가 있을때
    .
    .
    .
    public static void main(String[] args){
      TestInterface test = new TestInterface() {
          @Override
          public String TEST_Sync(String name) {
              System.out.println(getCurrentTime() + " TEST_Sync: thread: " + Thread.currentThread().getName());
              try {
                  Thread.sleep(2000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return name;
          }
    .
    .
    .
    System.out.println(getCurrentTime() + "======== before TEST_Sync ========");
    test.TEST_Sync("test");
    System.out.println(getCurrentTime() + "======== after TEST_Sync ========");
    

    결과값은 당연히 아래처럼 2초가 걸려 다음 스텝으로 넘어간다.

    [10:38:25] ======== before TEST_Sync ========
    [10:38:25]  TEST_Sync: thread: main
    [10:38:27] ======== after TEST_Sync ========
    

    비동기 방식

    이러한 호출을 비동기로 하기 위해서 이전까지는 아래처럼 내부적으로 thread 하나를 생성해서 동작하도록 하면

@Override
public CompletableFuture<String> TEST_ASync1(String name) {
    CompletableFuture<String> future1 = new CompletableFuture<>();
    new Thread(() -> {
        String output = TEST_Sync(name);
        future1.complete(output);
    }).start();;
    return future1;
}
.
.
.
System.out.println(getCurrentTime() + "======== before TEST_ASync1 ========");
CompletableFuture<String> future = (CompletableFuture<String>) test.TEST_ASync1("test2");
System.out.println(getCurrentTime() + "======== after1 TEST_ASync1 ========");
String aaa = future.join();//blocking
System.out.println(getCurrentTime() + "======== after2 TEST_ASync1 ========");

결과 값은 당연히 시간 지연 없이 바로 응답을 받게 된다. 물론 결과 최종 데이터를 조회하기 위해 CompletableFuture의 join이나 get 을 수행할때 blocking 이 되는데…

[10:38:27] ======== before TEST_ASync1 ========
[10:38:27] ======== after1 TEST_ASync1 ========
[10:38:27]  TEST_Sync: thread: Thread-0
[10:38:29] ======== after2 TEST_ASync2 ========

위의 비동기 method를 좀더 깔끔하게 해보자

CompletableFuture 에 제공하는 supplyAsync, runAync 라는 것이 있다.

supplyAsync 는 Supplier Functional Interface를 파라미터로 받아 Supplie는 인자는 받지 않고 리턴 타입만 존재 https://brush-up.github.io/java/java8-lambda-functional-interface/

@FunctionalInterface
public interface Supplier<T> {
    T get();
}
//supplyAsync
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(ASYNC_POOL, supplier);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

runAync 는 Runnable Functional Interface를 파라미터로 받아

@FunctionalInterface
public interface Runnable {
    void run();
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(ASYNC_POOL, runnable);
}
//runAync
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

이 기본지식을 가지고 아래처러 supplyAsync 를 이용해서 호출하면

@Override
public CompletableFuture<String> TEST_ASync2(String name) {

    return CompletableFuture.supplyAsync(() -> {
        System.out.println(getCurrentTime() + " thread: " + Thread.currentThread().getName());
        return TEST_Sync(name);
    });
}
.
.
.
System.out.println(getCurrentTime() + "======== before TEST_ASync2 ========");
CompletableFuture<String> future2 = (CompletableFuture<String>) test.TEST_ASync2("test2");
System.out.println(getCurrentTime() + "======== after1 TEST_ASync2 ========");
String aaa2 = future2.join();//blocking
System.out.println(getCurrentTime() + "======== after2 TEST_ASync2 ========");

결과값은

[11:52:27] ======== before TEST_ASync2 ========
[11:52:27] ======== after1 TEST_ASync2 ========
[11:52:27]  thread: ForkJoinPool.commonPool-worker-3
[11:52:27]  TEST_Sync: thread: ForkJoinPool.commonPool-worker-3
[11:52:29] ======== after2 TEST_ASync2 ========

결과값을 보면 ForkJoinPool 를 이용하는데, supplyAsync를 실행할때 아래처럼 Executor를 파라미터로 주면 별도의 thread pool에서 동작하게 한다.

static Executor executor1 = Executors.newFixedThreadPool(3);
.
.
.
//user? thread pool
@Override
public CompletableFuture<String> TEST_ASync3(String name) {

    return CompletableFuture.supplyAsync(() -> {
        System.out.println(getCurrentTime() + " thread: " + Thread.currentThread().getName());
        return TEST_Sync(name); },
            executor1
    );
}
.
.
.
System.out.println(getCurrentTime() + "======== before TEST_ASync3 ========");
CompletableFuture<String> future3 = (CompletableFuture<String>) test.TEST_ASync3("test2");
System.out.println(getCurrentTime() + "======== after1 TEST_ASync3 ========");
String aaa3 = future3.join();//blocking
System.out.println(getCurrentTime() + "======== after2 TEST_ASync3 ========");

결과 값을 보면. 별도의 thread pool를 이용하는것을 볼수있따.

[11:52:29] ======== before TEST_ASync3 ========
[11:52:29] ======== after1 TEST_ASync3 ========
[11:52:29]  thread: pool-1-thread-1
[11:52:29]  TEST_Sync: thread: pool-1-thread-1
[11:52:31] ======== after2 TEST_ASync3 ========

좀더 개선해 보자

CompletableFuture의 get, join 메소드를 사용하면 순간 blocking이 발생하는데 이를 개선해보자. 이제 blocking 현상을 제거하기 위해. CompletableFuture의 thenApply, thenAccept 를 이용해서 조금더 수정해보자.

thenAccept 는 CompletableFuture < U > 를 반환한다.(결과반환이 없다.) thenApply는 CompletableFuture < T > 데이터를 포함하는 Future를 반환한다.

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
    return this.uniApplyStage((Executor)null, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
    return this.uniApplyStage(this.defaultExecutor(), fn);
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) {
    return this.uniApplyStage(screenExecutor(executor), fn);
}

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return this.uniAcceptStage((Executor)null, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return this.uniAcceptStage(this.defaultExecutor(), action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
    return this.uniAcceptStage(screenExecutor(executor), action);
}

thenAccept method를 이용해서 코드 작성해보자.

//use thenAccept
@Override
public void TEST_ASync4(String name) {

    System.out.println(getCurrentTime() + "======== TEST_ASync4 1========");
    CompletableFuture<Void> future = TEST_ASync3(name).thenAccept(p->
        {
            System.out.println(getCurrentTime() + " thread: " + Thread.currentThread().getName() + "TEST_ASync4 : call back1" + p);
        });
    System.out.println(getCurrentTime() + "======== TEST_ASync4 2========");
}

  • 결과값
    [09:26:42] ======== before TEST_ASync4 ========
    [09:26:42] ======== TEST_ASync4 1========
    [09:26:42] ======== TEST_ASync4 2========
    [09:26:42] ======== after TEST_ASync4 ========
    [09:26:42]  thread: pool-1-thread-1
    [09:26:42]  TEST_Sync: thread: pool-1-thread-1
    [09:26:44]  thread: pool-1-thread-1TEST_ASync4 : call back1test2
    
//use thenApply
@Override
public void TEST_ASync5(String name) {

    System.out.println(getCurrentTime() + "======== TEST_ASync5 1========");
    CompletableFuture<Void> future = TEST_ASync3(name)
            .thenApply(p->{
                System.out.println(getCurrentTime() + " thread: " + Thread.currentThread().getName() + "TEST_ASync5 call back1: " + p); return p;
            })
            .thenAccept(p->{
                System.out.println(getCurrentTime() + " thread: " + Thread.currentThread().getName() + "TEST_ASync5 call back2: " + p);
            });

    System.out.println(getCurrentTime() + "======== TEST_ASync5 2========");
}

  • 결과값
    [09:28:24] ======== before TEST_ASync5 ========
    [09:28:24] ======== TEST_ASync5 1========
    [09:28:24] ======== TEST_ASync5 2========
    [09:28:24] ======== after TEST_ASync5 ========
    [09:28:24]  thread: pool-1-thread-1
    [09:28:24]  TEST_Sync: thread: pool-1-thread-1
    [09:28:26]  thread: pool-1-thread-1TEST_ASync5 call back1: test2
    [09:28:26]  thread: pool-1-thread-1TEST_ASync5 call back2: test2
    
  • 위와 마찬가지로 별로의 thread pool을 이용하고 싶으면 thenAcceptAsync, thenApplyAsync 를 이용하면 된다.

  • 추가로 확인해야 함
    • https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool
    • https://c10106.tistory.com/5214
    • https://www.baeldung.com/java-fork-join
    • https://m.blog.naver.com/PostView.nhn?blogId=tmondev&logNo=220945933678&proxyReferer=https://www.google.com/
    • https://blog.naver.com/2feelus/220732310413
  • 참고 https://brunch.co.kr/@springboot/267

.