WEBZINE

Comment programmer en asynchrone sur Java avec les CompletableFuture

Les systèmes d’informations sont à l’image de notre monde : de plus en plus complexe. Ils stockent de plus en plus de données et la recoupe, la croise constamment.

Lorsque l’on aborde ces systèmes complexes, composés d’un grand nombre de services, bases de données, référentiels, etc.… il devient très vite compliqué d’utiliser en parallèle chacune de ces différentes sources de données. Les traitements sont sans cesse plus complexes à effectuer, mais demandent toujours un temps de réponse constant.

Interroger les sources de données les unes après les autres n’est plus suffisant : le temps de traitement devenant bien trop long.

Aujourd’hui, ce que l’on demande au SI d’une entreprise c’est d’optimiser les différents traitements en parallélisant le plus de tâches possibles au sein d’un même service sans parvenir à bloquer celui-ci lorsqu’une demande de données est plus longue que les autres.

Un moyen d’y parvenir, c’est d’exploiter Java, en utilisant l’API (*Application Programming Interface) CompletableFuture qui va nous permettre de simplifier la gestion des appels asynchrone et d’ainsi paralléliser les appels pour ne pas attendre la réponse à la requête précédente avant d’en faire une nouvelle.

La librairie CompletableFuture

La librairie implémente l’interface des CompletionStage qui existe depuis Java 8. Elle met à disposition tout un ensemble de méthodes pour créer et gérer les appels et ayant pour force la possibilité d’être chaînées les unes après les autres. Voici une liste des fonctions disponibles les plus utiles.

CompletedFuture() :

La méthode suivante permet de créer un CompletableFuture avec un résultat prédéfinit :

final CompletableFuture<String> message =  CompletableFuture.completedFuture("message").exceptionally(throwable -> {
        return "Nouveau completable";
    });
}


La librairie met à disposition deux types de méthode, une première qui sera exécutée dans le même thread que le Future précédent et une seconde dites « async » qui sera exécutée par défaut dans l’implémentation commune de ForkJoinPool si aucun Executor n’est spécifié.

thenApply(),  thenApplyAsync():

Applique une fonction au Future précédent et retourne comme résultat un nouveau CompletionStage:

    final CompletableFuture<String> message = CompletableFuture.completedFuture("message").thenApply(m -> {
        return m + "nouveau message";
    });
    final CompletableFuture<String> messageAsync = CompletableFuture.completedFuture("message").thenApplyAsync(m -> {
        return m + "nouveau message";
    });
}


thenAccept(), thenAcceptAsync()

Applique une fonction au Future précédent et ne retourne pas de résultat.

    final CompletableFuture<Void> voidMessage = CompletableFuture.completedFuture("message").thenAccept(m -> {
        // traitement
    });
    final  CompletableFuture<Void> voidAsync = CompletableFuture.completedFuture("message").thenAcceptAsync(m -> {
        // traitement
    });
}


thenCompose(), thenComposeAsync()

Utilise le résultat d’un CompletableFuture et renvoi un nouveau CompletableFuture

    final CompletableFuture<String> message = CompletableFuture.completedFuture("mon").thenApply(s -> s + " nouveau ")
            .thenCompose(m1 -> CompletableFuture.completedFuture(" completable").thenApply(m2 -> m1 + m2).thenApply(s -> s));
    // message = mon nouveau completable
}


thenCombine(), thenCombineAsync()

Permet de combiner le résultat de deux CompletableFuture de même type

 public static void main(String[] args) {
        CompletableFuture<String> message = CompletableFuture.completedFuture("Hello")
                .thenCombine(CompletableFuture.completedFuture(" World")
                                     .thenApply(s -> s), (s, s2) -> s + s2);g
        // message = Hello World
    }
}


allOf()

Permet d’attendre la fin de chaque future avant de continuer. Cette fonction ne renvoie pas le résultat des completables dans thenApply.

        final CompletableFuture<String> hello = CompletableFuture.completedFuture("Hello");
        final CompletableFuture<String> world = CompletableFuture.completedFuture("World");
        final CompletableFuture<String> nouveauCompletable = CompletableFuture.allOf(hello, world).thenApply(unused -> {
            return hello.join() + " " + world.join();
        });
    }
}


Il existe bien d’autres fonctions disponibles, vous en trouverez la liste complète ici :

Petit bonus, voici une fonction pour pouvoir attendre l’exécution d’une liste de Completable et renvoyer leur résultat sous forme d’une nouvelle liste :

    public static <T> CompletableFuture<List<T>> customAllOf(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
                .thenApply(unused -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }


Cas concret

Afin de mieux comprendre l’application de ces fonctions, nous allons nous intéresser aux possibilités de la librairie au travers d’un cas concret :

On veut via un webservice REST récupérer des données d’une base de données, tout en enrichissant le résultat avec les réponses de deux webservices. Dans l’ordre ça donne le flux suivant:

– Requête en base de données

– Appel du premier webservice

– Appel du deuxième webservice.

(Par simplicité on va seulement simuler les différents appels)

Prenons une application simple utilisant la dépendance Spring « spring-boot-starter-web », rien de très exotique, du classique.

Nous allons voir maintenant un exemple de mise en œuvre de manière synchrone et asynchrone en utilisant l’api CompletableFuture

Code commun

Les trois objets suivants sont communs aux deux versions :

ProjectDto: L’objet du résultat final.

AgentDto, ClientDto: Les objets Agent et Client représentant les résultats des webservices.

ProjectEntity: L’objet représentant le projet en base de données.

public class ProjectDto {
    private Integer        id;
    private String         name;
    private String         clientName;
    private String         clientAdresse;
    private List<AgentDto> agents;
}
public class AgentDto {
    private Integer id;
    private String name;
    private String surname;
}
public class ClientDto {
    private Integer id;
    private String  name;
    private String  adresse;
    private String  phoneNumber;
}
public class ProjectEntity {
    private Integer       id;
    private String        name;
    private Integer       clientId;
    private List<Integer> agents;
    private Integer       budget;
}


Version synchrone

Par simplicité on simule dans les services le temps de réponse de la base de données et des deux services.

ProjectDatabaseMockService.java

@Service
public class ProjectDatabaseMockService {
    @SneakyThrows
    public List<ProjectEntity> getAll() {
        Thread.sleep(50);
        return Arrays.asList(new ProjectEntity(1, "Awsome Project", 1, Arrays.asList(1, 2, 3), 1000000),
                             new ProjectEntity(2, "IA Project", 2, Arrays.asList(4, 5, 6), 10000),
                             ….);
    }
}


AgentMockWebService.java

@Service
public class AgentMockWebService {
    ….
    @SneakyThrows
    public List<AgentDto> findAllIn(final List<Integer> ids) {
        Thread.sleep(150);
        return users.stream().filter(userDto -> ids.contains(userDto.getId())).collect(Collectors.toList());
    }
}


ClientMockWebService.java

@Service
public class ClientMockWebService {
    ….
    @SneakyThrows
    public List<ClientDto> findAllIn(final List<Integer> ids) {
        Thread.sleep(100);
        return clients.stream().filter(clientDto -> ids.contains(clientDto.getId())).collect(Collectors.toList());
    }
}


ProjetController.java

@RestController
public class ProjectController {
    private final ProjectDatabaseMockService database;
    private final ClientMockWebService clientMockWebService;
    private final AgentMockWebService  userMockWebService;
    public ProjectController(ProjectDatabaseMockService database, ClientMockWebService clientMockWebService, AgentMockWebService userMockWebService) {
        this.database             = database;
        this.clientMockWebService = clientMockWebService;
        this.userMockWebService   = userMockWebService;
    }
    @GetMapping(value = "/projects-sync")
    public List<ProjectDto> getProjets() {
        final List<ProjectEntity> projets = database.getAll(); //1
        final List<ClientDto> clients = clientMockWebService.findAllIn(projets.stream().map(ProjectEntity::getClientId).collect(Collectors.toList())); // 2
        final List<AgentDto>  agents  = userMockWebService.findAllIn(projets.stream().map(ProjectEntity::getAgents).flatMap(Collection::stream).collect(Collectors.toList())); //3
        return projets.stream()
                .map(projectEntity -> createProjectDto(projectEntity, clients, agents))
                .collect(Collectors.toList()); // 4, 5
    }

    private ProjectDto createProjectDto(ProjectEntity projectEntity, List<ClientDto> clientDtos, List<AgentDto> agentDtos) {
        final ClientDto clientDto = clientDtos
                .stream()
                .filter(client -> client.getId().equals(projectEntity.getClientId()))
                .findFirst()
                .orElse(new ClientDto());
        final List<AgentDto> agents = agentDtos
                .stream()
                .filter(agentDto -> projectEntity.getAgents().contains(agentDto.getId()))
                .collect(Collectors.toList());
        return new ProjectDto(projectEntity.getId(), projectEntity.getName(), clientDto.getAdresse(), clientDto.getAdresse(), agents);
    }
}


  1. On appelle la base de données des projets
  2. On appelle le web service des clients pour récupérer les informations des clients relatifs au projet.
  3. On appelle le web service des agents pour récupérer les informations des agents liés au projet.
  4. On regroupe les données du projet dans un objet qui contient les informations du projet, des clients et des agents.
  5. On renvoie le résultat.

Tout le traitement est fait de manière synchrone : les appels se font les uns après les autres avant le croisement des données et le renvoi du résultat. Un traitement exécuté ainsi sera trop lent lorsqu’il y aura besoin de croiser un grand nombre de données, le temps de traitement se cumulant.

Version asynchrone

Pour la version asynchrone il est nécessaire d’ajouter sur Application.java l’annotation @EnableAsync

@EnableAsync
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}


Il est également aussi nécessaire d’ajouter la configuration permettant de paramétrer le nombre de thread, sinon on reste à la configuration par défaut quelle que soit la machine et on ne profite pas de la puissance de calcul de celle-ci à son maximum.

AsynConfiguration.java

@Configuration
@NoArgsConstructor
public class AsyncConfiguration {
    private final int    corePoolSize       = 12;
    private final int    maxPoolSize       = 24;
    private final int    queueCapacity    = 42;
    private final String threadNamePrefix = "Demo-async";
    @Bean
    public Executor asyncExecutor() {
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize); // 1
        executor.setMaxPoolSize(maxPoolSize); // 2
        executor.setQueueCapacity(queueCapacity); // 3
        executor.setThreadNamePrefix(threadNamePrefix);
        executor.initialize();
        return executor;
    }
}


  1. Nombre de thread minimum exécutés en parallèle.
  2. Nombre de thread maximum exécutés en parallèle.
  3. Nombre de thread en attente.

Les trois services de simulation restent les mêmes, à ceci-près qu’ils retournent maintenant des CompletableFuture.

ProjectDatabaseMockService.java

public CompletableFuture<List<ProjectEntity>> getAll() {
    Thread.sleep(50);
    return CompletableFuture.completedFuture(Arrays.asList(new ProjectEntity(1, "Awsome Project", 1, Arrays.asList(1, 2, 3), 1000000),
                                                           new ProjectEntity(2, "IA Project", 2, Arrays.asList(4, 5, 6), 10000), ...);
}


AgentMockWebService.java

public CompletableFuture<List<AgentDto>> findAllIn(final List<Integer> ids) {
    Thread.sleep(150);
    return CompletableFuture.completedFuture(agents.stream().filter(userDto -> ids.contains(userDto.getId())).collect(Collectors.toList()));
}


ClientMockWebService.java

public CompletableFuture<List<ClientDto>> findAllIn(final List<Integer> ids) {
    Thread.sleep(100);
    return CompletableFuture.completedFuture(clients.stream().filter(clientDto -> ids.contains(clientDto.getId())).collect(Collectors.toList()));
}


A présent ce qui va particulièrement nous intéresser : le Controller.

ProjectController.java

@RestController
public class ProjectController {
    private final ProjectDatabaseMockService database;
    private final ClientMockWebService clientMockWebService;
    private final AgentMockWebService  userMockWebService;
    public ProjectController(ProjectDatabaseMockService database, ClientMockWebService clientMockWebService, AgentMockWebService userMockWebService) {
        this.database             = database;
        this.clientMockWebService = clientMockWebService;
        this.userMockWebService   = userMockWebService;
    }

    @SneakyThrows
    @Async  // 1
    @GetMapping(value = "/projects-async")
    public CompletableFuture<List<ProjectDto>> getProjets() { // 2
        // join() make blocking call, but we need to retrieve datas from project
        final List<ProjectEntity>                projectEntities = database.getAll().join(); // 3
        final CompletableFuture<List<ClientDto>> clients = clientMockWebService.findAllIn(projectEntities.stream().map(ProjectEntity::getClientId).collect(Collectors.toList())); // 4
        final CompletableFuture<List<AgentDto>>  agents  = userMockWebService.findAllIn(projectEntities.stream().map(ProjectEntity::getAgents).flatMap(Collection::stream).collect(Collectors.toList())); // 5
        return CompletableFuture
                // wait for both
                .allOf(clients, agents).thenApply(unused -> { // 6
                    // retrieve result with join()
                    final List<ClientDto> clientDtos = clients.join(); //7
                    final List<AgentDto>  userDtos   = agents.join();
                    return projectEntities.stream().map(projectEntity -> createProjectDto(projectEntity, clientDtos, userDtos)).collect(Collectors.toList());  //8
                });
    }

    private ProjectDto createProjectDto(ProjectEntity projectEntity, List<ClientDto> clientDtos, List<AgentDto> userDtos) {
        final ClientDto clientDto = clientDtos
                .stream()
                .filter(client -> client.getId().equals(projectEntity.getClientId()))
                .findFirst()
                .orElse(new ClientDto());
        final List<AgentDto> agents = userDtos
                .stream()
                .filter(userDto -> projectEntity.getAgents().contains(userDto.getId()))
                .collect(Collectors.toList());
        return new ProjectDto(projectEntity.getId(), projectEntity.getName(), clientDto.getAdresse(), clientDto.getAdresse(), agents);
    }


  1. La méthode est annotée @Async. Cette annotation va indiquer à Spring de lancer le traitement en mode asynchrone
  2. La méthode retourne un CompletableFuture et non plus une simple liste.
  3. Le premier appel à la base de données est bloquant. En effet le join() rend le traitement bloquant, mais comme il nous faut le résultat de la base pour poursuivre notre traitement et faire nos autres requêtes cela ne pose pas problème.
  4. On appelle le webservice clients avec les identifiants de tous les clients des projets
  5. On appelle le webservice des agents avec les identifiants de tous les agents des projets.  
  6. On attend la fin du traitement des deux webservices pour poursuivre.
  7. On récupère le résultat des deux webservices.
  8. On traite les résultats pour construire la donnée.

Comme vous pouvez le remarquer, les points 3 et 6 rendent le traitement bloquant. On attend le résultat de la base avant de poursuivre les appels puis on attend le résultat des deux appels pour construire la donnée. Ne pourrait-on pas rendre tout cela beaucoup plus fluide ?

Plusieurs possibilités s’offrent à nous. Nous ne pourrons pas déroger à l’appel bloquant de la base (étape 3) mais nous pouvons par la suite traiter le résultat de manière différente, voici une des possibilités :

ProjectController.java v2

@Async
@GetMapping(value = "/projects-async-v2")
public CompletableFuture<List<ProjectDto>>getProjetsV2() {
    List<ProjectEntity> entities = new ArrayList<>();
    List<ClientDto> clientDtos = new ArrayList<>();
    List<AgentDto> agentDtos = new ArrayList<>();
    final CompletableFuture<List<ProjectEntity>> futureProjets = database.getAll().thenApply(projectEntities -> {
        entities.addAll(projectEntities);
        return projectEntities;
    }); // 1
    final CompletableFuture<Void> futureClients = futureProjets
            .thenApply(projectEntities -> projectEntities.stream().map(ProjectEntity::getClientId).collect(Collectors.toList()))
            .thenCompose(clientMockWebService::findAllIn)
            .thenAccept(clientDtos::addAll); //2
    final CompletableFuture<Void> futureAgents = futureProjets
            .thenApply(projectEntities -> projectEntities.stream().map(ProjectEntity::getAgents).flatMap(Collection::stream).collect(Collectors.toList()))
            .thenCompose(agentMockWebService::findAllIn)
            .thenAccept(agentDtos::addAll); // 3
    return CompletableFuture
            .allOf(futureClients, futureAgents) // 4
            .thenApply(unused -> entities
                    .stream()
                    .map(projectEntity -> createProjectDto(projectEntity, clientDtos, agentDtos))
                    .collect(Collectors.toList())
            );
}


  1. On lance l’appel en base de données et on ajoute les résultats dans une liste externe
  2. On récupère le résultat du premier appel pour récupérer les ids puis faire l’appel
  3. On récupère le résultat du premier appel pour récupérer les ids puis faire l’appel
  4. On attend les résultats des deux webservices on traite le résultat et on retourne celui-ci.

Voilà deux manières d’écrire le même appel, mais il en existe bien d’autres : on peut tout à fait par exemple compléter l’objet ProjectDto dans chaque retour de base de données et de webservices en conservant l’id du client et des agents dans la ProjectDto mais sans l’exposer au client appelant. Cela nous permettrait de ne pas avoir de liste de client et d’agent intermédiaires et de ne plus faire le traitement final dans le allOf.

Exemple :

ProjectDtoV2.java

public class ProjectV2Dto {
    private Integer        id;
    private String         name;
    private String         clientName;
    private String         clientAdresse;
    private List<AgentDto> agents;
    @JsonIgnore
    private List<Integer>  agentIds;
    @JsonIgnore
    private Integer        cliendId;
}


ProjectController.java v3

@Async
@GetMapping(value = "/projects-async-v3")
public CompletableFuture<List<ProjectV2Dto>> getProjetsV3() {
    List<ProjectV2Dto> projectV2Dtos = new ArrayList<>();
    final CompletableFuture<Void> futureProjets = database.getAll()
            .thenAccept(projectEntities -> projectV2Dtos.addAll(projectEntities.stream().map(projectEntity ->
                                                                      new ProjectV2Dto().setId(projectEntity.getId())
                                                                              .setName(projectEntity.getName())
                                                                              .setAgentIds(projectEntity.getAgents())
                                                                              .setCliendId(projectEntity.getClientId()))
                                         .collect(Collectors.toList()))); // 1
    final CompletableFuture<Void> futureClients = futureProjets
            .thenApply(unused -> projectV2Dtos.stream().map(ProjectV2Dto::getCliendId).collect(Collectors.toList()))
            .thenCompose(clientMockWebService::findAllIn)
            .thenAccept(clientDtos -> projectV2Dtos.forEach(projectV2Dto -> {
                final ClientDto clientDto = clientDtos
                        .stream()
                        .filter(client -> client.getId().equals(projectV2Dto.getCliendId()))
                        .findFirst()
                        .orElse(new ClientDto());
                projectV2Dto.setClientAdresse(clientDto.getAdresse());
                projectV2Dto.setClientName(clientDto.getName());
            })); // 2
    final CompletableFuture<Void> futureAgents = futureProjets
            .thenApply(unused -> projectV2Dtos.stream().map(ProjectV2Dto::getAgentIds).flatMap(Collection::stream).collect(Collectors.toList()))
            .thenCompose(agentMockWebService::findAllIn)
            .thenAccept(agentDtos -> projectV2Dtos.forEach(projectV2Dto -> {
                final List<AgentDto> agents = agentDtos
                        .stream()
                        .filter(agentDto -> projectV2Dto.getAgentIds().contains(agentDto.getId()))
                        .collect(Collectors.toList());
                projectV2Dto.setAgents(agents);
            })); // 3
    return CompletableFuture
            .allOf(futureClients, futureAgents)
            .thenApply(unused -> projectV2Dtos); //4
}


  1. Appel de la base de données et construction du ProjectDtoV2 dès le retour de la base.
  2. Au retour du Completable « futureProjet » on appelle le webservice client et on poursuit la construction du ProjectDtoV2 à la suite avec les informations des clients
  3. Au retour du completable « futureClient » on appelle le webservice agent et on poursuit la construction du ProjectDtoV2 à la suite avec les informations des agents
  4. On retourne la liste déjà construite dès que « futureClient » et « futureAgent » ont finis

Les points 2 et 3 permettent de récupérer les données et construire l’objet en parallèle pour les deux webservices.

Pour résumer

Cet article permet de définir la manière dont fonctionne l’API des CompletableFuture et la façon dont s’en servir. On a pu voir qu’il existe de nombreuses façons d’arriver au même résultat en utilisant l’API et que cette API permet d’utiliser l’a-synchronicité de manière beaucoup plus simple et plus fluide que de gérer manuellement les thread en Java.

Il faut avoir à l’esprit que comme toute technologie, cette api n’est pas la solution à tout, elle introduit une complexité supplémentaire à votre application notamment au niveau du contexte, des caches, gestion des logs ou encore backpressure.

Merci à Paul-Arnaud Portefaix, notre développeur HCube Conseil, Tech Lead sur Java – Angular, pour la réalisation de cet article.

Sources

Vous pouvez retrouver les sources des projets ici :

https://github.com/paportefaix/demo-asynchrone

https://github.com/paportefaix/demo-synchrone

Pour approfondir :

https://medium.com/trendyol-tech/spring-boot-async-executor-management-with-threadpooltaskexecutor-f493903617d

https://www.baeldung.com/java-completablefuture

https://dzone.com/articles/20-examples-of-using-javas-completablefuture

Partager sur facebook
Facebook
Partager sur google
Google+
Partager sur twitter
Twitter
Partager sur linkedin
LinkedIn
Partager sur email
Email

Parce que lire, c’est fondamental !

Conseils, innovation ou success stories: découvrez les articles écrits ou mis en avant par nos Hcubiens. Des nouveautés vous attendent chaque semaine ! Bonne lecture !

Parce que lire, c’est fondamental !

Conseils, innovation ou success stories: découvrez les articles écrits ou mis en avant par nos Hcubiens. Des nouveautés vous attendent chaque semaine ! Bonne lecture !

CONTACTEZ NOUS

contact@hcube-conseil.fr
04 42 39 66 98
Nous trouver

INFORMATIONS

Nous contacter
Ils parlent de nous
Travailler avec nous
Conditions d’utilisation
RGPD

HCube Conseil 2019