Existe atualmente um interesse cada vez maior na chamada Programação Reativa. Esse paradigma pode ser visto, numa visão bem resumida, como sendo a criação de aplicações baseadas em eventos (ou mensagens) e totalmente assíncrona, sem bloqueios, fornecendo uma melhor experiência ao usuário e aproveitamento dos recursos de hardware.

O famoso Manifesto Reativo (vide seção Links) ilustra os pontos principais dessa filosofia, que podem ser vistos na Figura 1.

Os pilares da Programação Reativa
Figura 1. Os pilares da Programação Reativa

Devido à natureza das linguagens funcionais, que evitam estados mutáveis e facilitam a criação de aplicações concorrentes, o casamento entre o paradigma funcional e reativo é o caminho natural para a criação de aplicações reativas altamente responsivas. No Scala, por exemplo, temos diversos frameworks que suportam esse paradigma:

  • Play! Framework (disponível também para Java);
  • Akka Framework e seu modelo de atores (disponível também para Java);
  • Slick 3.0 (API de persistência Scala);
  • RxScala (baseado no RxJava);
  • Spray 2.0;
  • Reactive MongoDB, e etc.

Um dos pontos principais do paradigma reativo é o conceito de programação assíncrona e callbacks. Nesse artigo, iremos explorar a programação assíncrona propiciada pela mônada CompletableFuture no Java 8. Mônada são contêiners que encapsulam valores e/ou computações.

Antes disso, será necessário rever o conceito de Future em Java.

Future em Java

A interface Future faz parte do pacote de concorrência java.util.concurrent e foi criada no Java 5 para representar o resultado de uma computação assíncrona. Future também pode ver vista como uma “promessa” de entrega de um resultado que será computado no futuro.

Veremos na Listagem 1 um exemplo de utilização de Future em Java.


import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Exemplo1 {

   private static final ExecutorService pool = Executors.newFixedThreadPool(10);

   public static Callable<String> getData(final int index, final int time) {
       return new Callable<String>() {
           @Override
           public String call() throws Exception {
               Thread.sleep(time);                
               return "TESTE-" + index;
           }
       };
   }
   
   public static void main(String[] args) throws InterruptedException, ExecutionException {
       long start = System.nanoTime();
       Callable<String> c1 = getData(0, 5000);
       Future<String> f1 = pool.submit(c1);
       System.out.println(f1.get());
       long end = System.nanoTime();
       System.out.println("Tempo decorrido (segundos) = " + ((end - start)/1.0E9));
       pool.shutdown();
   }
}
Listagem 1. Criando uma Future

O código da Listagem 1 aguarda aproximadamente cinco segundos e imprime a mensagem “TESTE-0”.

A interface Callable é similar a interface Runnable:

  • Em Runnable, sobrecarrega-se o método run e dentro dele definimos o trecho de código a ser processado concorrentemente por uma thread. O método run não tem retorno (tipo void);
  • Para Callable, que é uma interface parametrizada, sobrecarregamos o método call com o trecho de código que será executado concorrentemente e que retornará uma resultado T.

No método getData (linhas 11 a 19 da Listagem 1), definimos uma classe anônima que estende Callable, cujo método call aguarda n milissegundos e retorna uma String. Callable, portanto, é uma promessa de resultado de uma String a ser processada no futuro.

Em seguida passamos a instância de Callable c1 (linha 23) para o pool de threads (linha 24) através do método submit, que retorna uma interface Future, no nosso exemplo, uma Future.

O método mais importante da Future é o método get, uma chamada bloqueante que aguarda o trecho de código definido no método call de Callable ser executado e retorna o resultado dessa computação.

Alguém poderia argumentar: qual a diferença entre usar Future/Callable e criar uma aplicação que simplesmente chama o código dentro de call? Ambos iriam demorar aproximadamente cinco segundos.


public class Exemplo2 {
 
    public static String getData(final int index, final int time) throws InterruptedException {
        Thread.sleep(time);                
        return "TESTE-" + index;
    }
    
    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        System.out.println(getData(0, 5000));
        long end = System.nanoTime();
        System.out.println("Tempo decorrido (segundos) = " + ((end - start)/1.0E9));
    }
}
Listagem 2. Código sem Future

Veja que o código da Listagem 2 também leva aproximadamente cinco segundos para ser executado.

Para ver a vantagem de se usar Future, vamos alterar o código da Listagem 1 adicionando outra Future, como mostra a Listagem 3.


import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Exemplo3 {

   private static final ExecutorService pool = Executors.newFixedThreadPool(10);

   public static Callable<String> getData(final int index, final int time) {
       return new Callable<String>() {
           @Override
           public String call() throws Exception {
               Thread.sleep(time);                
               return "TESTE-" + index;
           }
       };
   }
   
   public static void main(String[] args) throws InterruptedException, 
   ExecutionException {
       System.out.println("Processors = " + Runtime.getRuntime()
       .availableProcessors());
       long start = System.nanoTime();
       Callable<String> c1 = getData(0, 5000);
       Callable<String> c2 = getData(1, 5000);
       Future<String> f1 = pool.submit(c1);
       Future<String> f2 = pool.submit(c2);
       System.out.println(f1.get());
       System.out.println(f2.get());        
       long end = System.nanoTime();
       System.out.println("Tempo decorrido (segundos) = " 
       + ((end - start)/1.0E9));
       pool.shutdown();
   }
}
Listagem 3. Usando duas Futures

Qual o tempo de execução final do programa da Listagem 3? Como cada Future leva cinco segundos para processar e há dois métodos get em sequência, alguns poderiam pensar que levaria algo em torno de dez segundos. Ao executar esse código, temos:


Processors = 2
TESTE-0
TESTE-1
Tempo decorrido (segundos) = 5.003425092

O código levou aproximadamente cinco segundos para executar as duas Futures. Notem que enquanto a Future f1 estava bloqueada (através do método get), o código da Future f2 estava sendo executado pelo sistema dentro do Pool de Threads. Portanto, ao terminar o primeiro método get bloqueante da Future f1, o método get de f2 retornou quase que imediatamente, com a computação do trecho já processada, o que indica que o processamento concorrente das duas tarefas realmente aconteceu.

E se o pool de threads tivesse apenas uma thread? Se você pegar o código da linha 9 da Listagem 3 e modificar para:


private static final ExecutorService pool = Executors.newFixedThreadPool(1);

Verá que o programa levará em torno de 10 segundos para executar (observem a influência do Pool de Threads), pois só há uma thread disponível para processar as duas futures:


Processors = 2
TESTE-0
TESTE-1
Tempo decorrido (segundos) = 10.002935669

Com esses exemplos, vimos que a combinação Future e Callable permite a execução concorrente de trechos de código e gerenciados pelo Pool de Threads que definimos dentro da aplicação. Porém, a interface Future não é perfeita, pois o método get é bloqueante. Isso fere um dos pilares da programação reativa, que é o de não permitir o bloqueio de threads.

A interface Future também define o método isDone para verificar se o resultado do processamento ainda não está disponível. Vejamos o seguinte trecho de código da Listagem 4.


import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Exemplo4 {

   private static final ExecutorService pool = Executors.newFixedThreadPool(10);

   public static Callable<String> getData(final int index, final int time) {
       return new Callable<String>() {
           @Override
           public String call() throws Exception {
               Thread.sleep(time);                
               return "TESTE-" + index;
           }
       };
   }
   
   public static void main(String[] args) throws InterruptedException, ExecutionException {
      long start = System.nanoTime();
       Callable<String> c1 = getData(0, 5000);
       Future<String> f1 = pool.submit(c1);
       while(!f1.isDone()) {
           System.out.println("Aguardando...");
       }
       System.out.println(f1.get());
       long end = System.nanoTime();
       System.out.println("Tempo decorrido (segundos) = " + ((end - start)/1.0E9));
       pool.shutdown();
   }
}
Listagem 4. Usando isDone

Pegamos o exemplo da Listagem 1 e modificamos para usar o método isDone dentro de um laço (linhas 25 a 27). Enquanto a Future f1 não for processada, o método retorna false. Isso permite enviar feedbacks sobre o andamento do processamento ao usuário e até permitir que ele cancele o processo. Quando isDone for igual a true, ao chamarmos o método get, obteremos imediatamente o valor do trecho computado.

Ainda assim não é bom o suficiente, pois temos que escrever manualmente o código para ver quando o código está terminado, para que get não bloqueie. Por exemplo, na linguagem Scala temos também o conceito de Future, que representa um processamento assíncrono, mas a mesma possui callbacks onde um método é chamado quando o processamento termina, sem precisar chamar um método bloqueante como o get ou fazer verificações periódicas com isDone. Isso está muito mais em linha com a programação reativa, pois evita o uso de bloqueios.


import scala.concurrent.Future 
import scala.util.{Failure, Success} 
import scala.concurrent.ExecutionContext.Implicits.global 
 
object FutureSample extends App { 
  val f1 = Future { 
    Thread.sleep(5000) 
    "TESTE-0" 
  } 
  f1.onComplete { 
    case Success(result) => println(result) 
    case Failure(e) => e.printStackTrace() 
  } 
  Thread.sleep(10000) 
}
Listagem 5. Future em Scala

Conforme a Listagem 5, no Scala, uma Future fornece o método onComplete, que recebe um trecho de código a ser executado (callback) quando a Future tiver processado o código definido dentro dela sem nunca bloquear o código (por isso foi usado uma Thread.sleep ao final do código para travar o programa até a Future ser processada). No caso do processamento ser executado com sucesso, a case Success(result) => println(result) será executada. No caso de exceção no trecho de código da Future, a case Failure será invocada e será chamado o método printStackTrace da exceção capturada.

ListenableFuture (Guava)

Felizmente, a biblioteca Guava da Google fornece uma alternativa totalmente assíncrona e similar ao Future do Scala através da classe ListenableFuture para Java.

Vamos reescrever o código da Listagem 1 usando a biblioteca Guava e a classe ListenableFuture, como mostra a Listagem 6.


import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

public class Exemplo5 {

   private static ListeningExecutorService pool = MoreExecutors
   .listeningDecorator(Executors.newFixedThreadPool(10));

   public static Callable<String> getData(final int index, final int time) {
       return new Callable<String>() {
           @Override
           public String call() throws Exception {
               Thread.sleep(time);                
               return "TESTE-" + index;
           }
       };
   }
   
   public static void main(String[] args) throws InterruptedException, 
   ExecutionException {
       final long start = System.nanoTime();
       Callable<String> c1 = getData(0, 5000);
       ListenableFuture<String> f1 = pool.submit(c1);
       Futures.addCallback(f1, new FutureCallback<String>() {
           @Override
           public void onSuccess(String contents) {
               System.out.println(contents);
               long end = System.nanoTime();
               System.out.println("Tempo decorrido (segundos) = " + 
               ((end - start)/1.0E9));
           }

           @Override
           public void onFailure(Throwable throwable) {
               throwable.printStackTrace();
           }
       });        
       Thread.sleep(10000);        
       pool.shutdown();
   }
}
Listagem 6. Usando Guava

Vejamos as principais diferenças no código da Listagem 6 com o código da Listagem 1:

  • Linha 12: devemos colocar o pool de threads num decorator através da classe MoreExecutors. Com isso, poderemos usar os recursos da biblioteca Guava, no caso o método submit, que retorna um ListenableFuture ao invés da Future;
  • Linha 25: foi transformada numa variável final para que a inner class tenha acesso a ela (restrição do Java). Com ListenableFuture temos a disposição uma solução totalmente assíncrona, não-bloqueante e baseada em eventos (callbacks) no Java;
  • Linha 27: ao invés de retornar uma Future, por usarmos o pool decorator do Guava, recebemos um ListenableFuture;
  • Linha 28 a 40: registramos no ListanableFuture o código de callback a ser executado, no caso, a classe anônima que estende FutureCallback. Para isso usamos o método estático addCallback da classe utilitária Futures do Guava. É o mesmo estilo da programação de eventos do Swing quando registramos um listener para os eventos de tela.

Quando o processamento terminar, no caso de sucesso, será chamado o método onSuccess com o valor da computação. No caso de erro, o método invocado será o onFailure com a exceção gerada. Isso é bem similar ao código Scala apresentado na Listagem 5;

  • Linha 41: como tudo agora é não bloqueante, foi colocado um sleep (o dobro do valor necessário que a Future leva), para não finalizar o programa antes da Future terminar o processamento.

Existem atualmente aplicações Java que usam a especificação Servlet 3.0 para criar servlets assíncronas, melhorando o desempenho do servidor como um todo. Em algumas dessas soluções, a FutureCallback é utilizada.

CompletableFuture (Java 8)

A partir do Java 8, não precisamos mais usar a biblioteca Guava para obter uma Future assíncrona, pois o mesmo fornece a classe CompletableFuture para esse objetivo, que é muito mais poderosa, sendo considerada como uma Future com esteroides, bastando comparar a quantidade de métodos dessa classe com a interface Future de Java.

A quantidade de funcionalidades disponíveis em CompletableFuture é grande em relação a Future (que só tem cinco métodos disponíveis) e, além disso, suporta lambdas e fornece métodos não-bloqueantes através de callbacks, assim como a Future do Scala e o ListenableFuture do Guava. Não bastasse tudo isso, CompletableFuture é uma mônada, o que significa que podemos fazer encadeamento de métodos e composição.

Antes de reescrevermos o código da Listagem 1, vamos ver como usar a classe CompletableFuture com o exemplo da Listagem 7.


import java.util.concurrent.CompletableFuture;

public class CompletableExemplo1 {
   
   public static void main(String ...args) throws InterruptedException {
       CompletableFuture<String> c1 = new CompletableFuture<>();
       c1.thenAccept(str -> System.out.println(str));
       Thread.sleep(1000);
   }
}
Listagem 7. Criando um simples CompletableFuture

Nesse exemplo criamos uma CompletableFuture na linha 6, e repare que não definimos nenhum trecho de código para ele executar, ou seja, podemos definir isso como sendo literalmente uma “promessa vazia”.

O método thenAccept recebe uma interface Consumer, ou seja, podemos passar uma função lambda que recebe um argumento e não devolve resultado. No nosso caso, definimos strSystem.out.println(str). Essa função lambda é o callback que será chamado quando CompletableFuture terminar seu processamento. No caso do código da Listagem 7, não definimos nada para a CompletableFuture fazer, logo, thenAccept nunca será executada.

Uma das formas de se definir um valor é através do método complete, que recebe um parâmetro T (o mesmo tipo que foi definido na classe parametrizada), como mostra a Listagem 8.


import java.util.concurrent.CompletableFuture;
 
public class CompletableExemplo1 {
    
    public static void main(String ...args) throws InterruptedException {
        CompletableFuture<String> c1 = new CompletableFuture<>();
        c1.complete("Ola Mundo");
        c1.thenAccept(str -> System.out.println(str));
        Thread.sleep(1000);
    }
}
Listagem 8. Usando método complete

Veja que a única novidade é o método complete, que define um valor para c1, com isso, o método thenAccept será chamado. Esse exemplo parece ser simplório, mas ele ganha muita importância quando threads entram no jogo, como mostra a Listagem 9.


import java.util.concurrent.CompletableFuture;
 
public class CompletableExemplo2 {
    
    public static void main(String ...args) throws InterruptedException {
        final CompletableFuture<String> c1 = new CompletableFuture<>();
        new Thread(() -> c1.complete("Ola Mundo")).start();
        c1.thenAccept(str -> System.out.println(str));
        Thread.sleep(1000);
    }
}
Listagem 9. Threads e complete

Agora, complete é chamado dentro de uma Thread, ou seja, podemos ter código concorrente onde o mesmo definirá um valor e o setará em c1. Quando isso ocorrer, o método thenAccept é disparado. Portanto, uma outra thread, que não a thread onde CompletableFuture foi criada, pode definir seu valor e ativar o callback.

Imagine que a thread que irá definir o valor de c1 lance uma exceção. Podemos usar os métodos completeExceptionally e exceptionally para manipular essa situação, como mostra a Listagem 10.


import java.util.concurrent.CompletableFuture;

public class CompletableExemplo3 {
   
   public static void main(String ...args) throws InterruptedException {
       final CompletableFuture<String> c1 = new CompletableFuture<>();
       new Thread(() -> {
          try {
               throw new InterruptedException("Excecao");
           } catch(Exception ex) {
               c1.completeExceptionally(ex);
           }
       }).start();
       c1.exceptionally(ex -> {
           System.out.println(“Erro = “ + ex.getMessage());
           return "Erro";
       });
       c1.thenAccept(str -> System.out.println(“Ok = “ + str));
       Thread.sleep(1000);
   }
}
Listagem 10. Lidando com exceçõesE

No caso de exceção, podemos passá-la a c1 através do método completeExceptionally (linha 11). Na linha 14 definimos o callback a ser executado quando houver exceção, através do método exceptionally, e na linha 18 o callback em caso de sucesso (thenAccept).

Como houve erro na thread, o lambda definido nas linhas 14 a 17 será executado.

Ao invés de definir um callback para exceções e outro para execuções normais em dois métodos diferentes, podemos usar o método handle, como mostra a Listagem 11.


import java.util.Random;
import java.util.concurrent.CompletableFuture;
 
public class CompletableExemplo4 {
 
    public static void main(String... args) throws InterruptedException {
        final CompletableFuture<String> c1 = new CompletableFuture<>();
        new Thread(() -> {
            try {
                if(new Random().nextInt(2) == 0) {
                    throw new InterruptedException("Excecao");
                } else {
                    c1.complete("OK");
                }
            } catch (Exception ex) {
                c1.completeExceptionally(ex);
            }
        }).start();
 
        c1.handle((content, ex) -> {
            if (ex == null) {
                System.out.println("Conteudo = " + content);
            } else {                
                ex.printStackTrace();
            }
            return "";
        });
        Thread.sleep(1000);
    }
}
Listagem 11. Método handle

O código chama um único método handle para definir o call-back, tanto para a correta execução do código, como para o tratamento de exceção.

Podemos também criar um método “recover” a partir da combinação de exceptionally e thenAccept, como mostra a Listagem 12.


import java.util.Random;
import java.util.concurrent.CompletableFuture;
 
public class CompletableExemplo5 {
    
    public static void main(String ...args) throws InterruptedException {
        final CompletableFuture<String> c1 = new CompletableFuture<>();
        new Thread(() -> {
            try {
                if(new Random().nextInt(2) == 0) {
                    throw new InterruptedException("Excecao");
                } else {
                    c1.complete("OK");
                }
            } catch (Exception ex) {
                c1.completeExceptionally(ex);
            }
        }).start();
        
        c1.exceptionally(ex -> {
            System.out.println("Erro = " + ex.getMessage());
            return "Erro";
        }).thenAccept(str -> System.out.println("Ok = " + str));
        
        Thread.sleep(1000);
    }
}
Listagem 12. Definindo recover

Desta vez usamos exceptionally e thenAccept de forma encadeada no código da Listagem 12. No caso de não haver erro, apenas thenAccept será chamado e, em caso de erro, o método exceptionally será chamado, definirá a String “Erro” como valor default e o método thenAccept será chamado para esse valor, ou seja, exceptionally serve para definir valores default em caso de erros.

Além de complete, podemos usar os métodos supplyAsync e runAsync para fornecer lambdas com o código a ser computado, como mostra a Listagem 13.


import java.util.concurrent.CompletableFuture;
 
public class CompletableExemplo6 {
 
    public static void main(String... args) throws InterruptedException {
        final CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> "OK");
        c1.handle((content, ex) -> {
            if (ex == null) {
                System.out.println("Conteudo = " + content);
            } else {
                ex.printStackTrace();
            }
            return "";
        });
        Thread.sleep(1000);
    }
}
Listagem 13. Usando supplyAsync

No código usamos supplyAsync para executar um lambda (do tipo Supplier), que simplesmente retorna uma String “OK”.

Lambdas não gostam muito de exceções verificadas. Por exemplo, o compilador reclamará se escrevermos esse código da Listagem 14.


import java.util.Random;
import java.util.concurrent.CompletableFuture;

public class CompletableExemplo7 {

   public static String getValor() throws InterruptedException {
       if (new Random().nextInt(2) == 0) {
           throw new InterruptedException("Erro");
       } else {
           return "OK";
       }
   }

   public static void main(String... args) throws InterruptedException {
       final CompletableFuture<String> c1 = 
       CompletableFuture.supplyAsync(() -> getValor());
       c1.exceptionally(ex -> {
           System.out.println("Erro = " + ex.getMessage());
          return "Erro";
       }).thenAccept(str -> System.out.println("Ok = " + str));
       Thread.sleep(1000);
   }
}
Listagem 14. Exceções verificadas

O compilador irá reclamar na linha 15 da Listagem 14 (unreported exception InterrupedException...). Uma das soluções para esse problema seria usando runAsync.


import java.util.Random;
import java.util.concurrent.CompletableFuture;
 
public class CompletableExemplo8 {
 
    public static String getValor() throws InterruptedException {
        if (new Random().nextInt(2) == 0) {
            throw new InterruptedException("Erro");
        } else {
            return "OK";
        }
    }
 
    public static void main(String... args) throws 
    InterruptedException {
        final CompletableFuture<String> c1 = 
        new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
              c1.complete(getValor());
            } catch(InterruptedException ex) {
              c1.completeExceptionally(ex);
            }
        });
        c1.exceptionally(ex -> {
            System.out.println("Erro = " + ex.getMessage());
            return "Erro";
        }).thenAccept(str -> System.out.println("Ok = " + str));
        Thread.sleep(1000);
    }
}
Listagem 15. Usando runAsync

Definimos dois CompletableFutures no código da Listagem 15: o primeiro é uma “promessa vazia” c1, cuja valor será definido dentro do segundo CompletableFuture, que executa runAsync. Como vimos nos exemplos iniciais, podemos definir o valor de CompletableFuture dentro de outra thread, usando os métodos complete ou completeExceptionally.

Depois de todos esses exemplos, podemos finalmente escrever o código da Listagem 1 usando CompletableFuture.


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class Exemplo6 {
 
    private static final ExecutorService pool = Executors.newFixedThreadPool(1);
 
    public static String getData(final int index, final int time) throws InterruptedException {
        Thread.sleep(time);
        return "TESTE-" + index;
    }
 
    public static void main(String[] args) throws InterruptedException, ExecutionException {
 
        System.out.println("Processors = " + Runtime.getRuntime().availableProcessors());
 
        final long start = System.nanoTime();
        final CompletableFuture<String> future = new CompletableFuture();
 
        CompletableFuture.runAsync(() -> {
            try {
                future.complete(getData(0, 5000));
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        }, pool
        );
 
        future.handle((content, ex) -> {
            if (ex == null) {
                System.out.println(content);
                long end = System.nanoTime();
                System.out.println("Tempo decorrido (segundos) = " + ((end - start) / 1.0E9));
            } else {
                ex.printStackTrace();
            }
            return null;
        });
 
        pool.shutdown();
    }
}
Listagem 16. Usando CompletableFuture

Depois de vermos todos os exemplos anteriores, fica fácil reescrever o código da Listagem 1 usando CompletableFuture, como demonstrado na Listagem 16.

Dissemos anteriormente que CompletableFuture é uma mônada. Mas não existe um método flatMap nessa classe. Então, como ela pode ser uma mônada?

De fato, não existe, porém, o método thenCompose faz esse papel. Vejamos um caso: suponha que você deseja executar uma operação num CompletableFuture após ele ter sido processado. Por exemplo, dado um CompletableFuture que retorna um valor Integer, queremos multiplicar esse valor por 2, sendo que a multiplicação também deve ser assíncrona (executado num outro CompletableFuture).


import java.util.Random;
import java.util.concurrent.CompletableFuture;
 
public class CompletableExemplo9 {
 
    public static int getValor() {
        Integer i = new Random().nextInt(1000);
        System.out.println("valor original = " + i);
        return i;
    }
 
    public static void main(String... args) throws InterruptedException {
        final CompletableFuture<Integer> c1 =  CompletableFuture
                .supplyAsync(()->getValor())
                .thenCompose(i -> CompletableFuture.supplyAsync(()->i*2));
        
        c1.exceptionally(ex -> {
            System.out.println("Erro = " + ex.getMessage());
            return -1;
        }).thenAccept(num -> System.out.println("Operacao = " + num));
        Thread.sleep(1000);
    }
}
Listagem 17. Aplicando thenCompose

No código da Listagem 17 aplicamos encadeamento de métodos com thenCompose, mantendo o código totalmente assíncrono, sendo que o CompletableFuture mais interno só será executado quando o primeiro CompletableFuture c1 terminar.

Podemos usar também o método thenApply, que equivale ao conhecido método map() das linguagens funcionais, que aplica uma transformação simples através de uma função lambda.


import java.util.Random;
import java.util.concurrent.CompletableFuture;
 
public class CompletableExemplo10 {
 
    public static int getValor() {
        Integer i = new Random().nextInt(1000);
        System.out.println("valor original = " + i);
        return i;
    }
 
    public static void main(String... args) throws InterruptedException {
        final CompletableFuture<String> c1 =  CompletableFuture
                .supplyAsync(()->getValor())
                .thenApply(i -> String.valueOf(i));
        
        c1.exceptionally(ex -> {
            System.out.println("Erro = " + ex.getMessage());
            return "Erro";
        }).thenAccept(str -> System.out.println("String = " + str));
        Thread.sleep(1000);
    }
Listagem 18. Usando thenApply

No código da Listagem 18 estamos usando o thenApply para transformar o valor int devolvido pelo método getValor num valor String. Ou seja, convertemos um CompletableFuture para um CompletableFuture (que é o papel de uma função map), e thenApply não tem nenhum efeito bloqueante, ou seja, é tudo de forma assíncrona. Por fim, o código da Listagem 18 é reescrito na Listagem 19 usando encadeamento total.


import java.util.Random;
import java.util.concurrent.CompletableFuture;
 
public class CompletableExemplo11 {
 
    public static int getValor() {
        return new Random().nextInt(1000);
    }
 
    public static void main(String... args) throws InterruptedException {
        CompletableFuture
            .supplyAsync(()->getValor())
            .thenApply(i -> String.valueOf(i))        
            .exceptionally(ex -> {
                System.out.println("Erro = " + ex.getMessage());
                return "Erro";
            }).thenAccept(str -> System.out.println("String = " + str));
        Thread.sleep(1000);
    }
}
Listagem 19. Encadeamento

Existem outros métodos não explorados em CompletableFuture nesse artigo, além de diversas combinações que podemos fazer com seus métodos. Ela é incomparavelmente mais poderosa que os seus pares Java Future e ListenableFuture (e muito similar à dupla Future/Promise do Scala) e deve ser usada no lugar desses dois para a escrita de código assíncrono envolvendo Futures.

A Programação Reativa tem se tornado cada vez mais presente nas novas API's que surgem e na evolução de API's consolidadas. Um bom exemplo é a biblioteca RxJava e o suporte a Servlets assíncronas na especificação 3.0.

Portanto, ter em mente como Future e CompletableFuture funcionam é o primeiro passo para entender esse novo paradigma.