Maison > Java > javaDidacticiel > Comment utiliser l'outil multithread Java CompletableFuture

Comment utiliser l'outil multithread Java CompletableFuture

WBOY
Libérer: 2023-04-29 08:34:15
avant
1983 Les gens l'ont consulté

    Avant-propos

    Problèmes avec Future

    Lors de l'écriture d'un programme multi-thread, vous pouvez utiliser Future pour obtenir les résultats d'un thread asynchrone, mais vous rencontrerez quelques problèmes lors de l'utilisation :

    • Si vous le souhaitez Pour effectuer d'autres opérations sur les résultats de Future, vous devez bloquer le thread actuel

    • Plusieurs Futures ne peuvent pas être exécutés dans une chaîne. Les résultats de chaque Future sont indépendants. Il est prévu de faire une autre chose asynchrone sur le résultat. d'un futur ;

    • Il n'y a pas de stratégie de gestion des exceptions. Si l'exécution du futur échoue, elle doit être capturée manuellement

    CompletableFuture est né

    Afin de résoudre le problème du futur, JDK nous a fourni un classe d'outils utile CompletableFuture dans 1.8 ;

    Il implémente les interfaces Future et CompletionStage et fournit les méthodes de traitement correspondantes pour les lacunes de Future.

    • Notre nouvelle logique de traitement peut être automatiquement rappelée une fois l'exécution du thread asynchrone terminée, sans blocage

    • Plusieurs tâches asynchrones peuvent être organisées, combinées ou triées

    • Gestion des exceptions

    Le cœur de CompletableFuture L'idée est que chaque tâche asynchrone peut être considérée comme une étape (CompletionStage), puis d'autres tâches asynchrones peuvent faire ce qu'elles veulent en fonction de cette étape.

    CompletionStage définit de nombreuses méthodes de traitement par étapes, qui sont très puissantes. Voici seulement quelques méthodes couramment utilisées dans la vie quotidienne pour votre référence.

    Utilisation

    Utilisation de base - Soumettre une tâche asynchrone

    Utilisation simple

    Exécution asynchrone, aucun résultat requis :

    // 可以执行Executors异步执行,如果不指定,默认使用ForkJoinPool
    CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));
    Copier après la connexion

    Exécution asynchrone, tout en renvoyant les résultats :

    // 同样可以指定线程池
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!");
    System.out.println(stringCompletableFuture.get());
    Copier après la connexion

    Traitement du dernier résultat de la tâche asynchrone

    • thenRun : Non besoin des résultats de l'étape précédente, diriger de nouvelles opérations

    • puisAccepter : obtenir le contenu du traitement asynchrone précédent et effectuer de nouvelles opérations

    • puisAppliquer : obtenir le contenu de l'étape précédente, puis générer un nouveau contenu

    Tout ce qui porte le suffixe Async signifie que la nouvelle opération de traitement est toujours asynchrone. Les opérations asynchrones peuvent spécifier des exécuteurs pour le traitement

    Comment utiliser loutil multithread Java CompletableFuture

    // Demo
           CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    // 针对上一步的结果做处理,产生新的结果
                    .thenApplyAsync(s -> s.toUpperCase())
                    // 针对上一步的结果做处理,不返回结果
                    .thenAcceptAsync(s -> System.out.println(s))
                    // 不需要上一步返回的结果,直接进行操作
                    .thenRunAsync(() -> System.out.println("end"));
            ;
    Copier après la connexion

    Sélectionnez les deux résultats -acceptEither

    Lorsque nous avons deux traitements de rappel, n'importe quelle complétion peut être utilisée, les deux résultats n'ont aucune relation, puis utilisez acceptEither.

    Celui qui termine en premier l'exécution des deux threads asynchrones utilisera le résultat Il en va de même pour les autres types de méthodes.

    Comment utiliser loutil multithread Java CompletableFuture

    Comment utiliser loutil multithread Java CompletableFuture

    // 返回abc
    CompletableFuture
                    .supplyAsync(() -> {
                        SleepUtils.sleep(100);
                        return "Hello CompletableFuture!";
                    })
                    .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            System.out.println(s);
                        }
                    });
    // 返回Hello CompletableFuture!       
    CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    .acceptEither(CompletableFuture.supplyAsync(() -> {
                        SleepUtils.sleep(100);
                        return "abc";
                    }), new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            System.out.println(s);
                        }
                    });
    Copier après la connexion

    Combinez les deux résultats -thenCombine, thenAcceptBoth

    thenCombine

    Lorsque nous avons deux CompletionStage, nous devons intégrer les deux résultats, puis calculer un nouveau résultat.

    • thenCompose traite le résultat du CompletionStage précédent et renvoie le résultat, et le type de retour doit être CompletionStage.

    • thenCombine obtient le résultat du premier CompletionStage, puis obtient le CompletionStage actuel et traite les résultats des deux.

            CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172);
    
            CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65)
                    .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() {
                        @Override
                        public Double apply(Integer wight, Integer height) {
                            return wight * 10000.0 / (height * height);
                        }
                    })
                    ;
    Copier après la connexion

    thenAcceptBoth

    nécessite les résultats de deux CompleteableFutures asynchrones. Lorsque les deux sont terminés, le rappel thenAcceptBoth est entré.

    Comment utiliser loutil multithread Java CompletableFuture

    Comment utiliser loutil multithread Java CompletableFuture

    // thenAcceptBoth案例:
            CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() {
                    		// 参数一为我们刚开始运行时的CompletableStage,新传入的作为第二个参数
                        @Override
                        public void accept(String s, String s2) {
                            System.out.println("param1=" + s + ", param2=" + s2);
                        }
                    });
    // 结果:param1=Hello CompletableFuture!, param2=abc
    Copier après la connexion

    Gestion des exceptions

    Lorsque nous utilisons CompleteFuture pour effectuer des appels en chaîne, si l'un des multiples rappels asynchrones a un problème d'exécution, alors les rappels suivants s'arrêteront, une exception est donc nécessaire pour les stratégies de traitement.

    exceptionnellement

    exceptionnellement, c'est lorsqu'une erreur survient, cela nous donne la possibilité de récupérer et de personnaliser le contenu renvoyé.

            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("发生错误");
            }).exceptionally(throwable -> {
                log.error("调用错误 {}", throwable.getMessage(), throwable);
                return "异常处理内容";
            });
    Copier après la connexion

    handle

    exceptionnellement ne sera exécuté que lorsqu'une exception se produit, tandis que handle sera exécuté indépendamment du fait qu'une erreur se produise ou non.

    CompletableFuture.supplyAsync(() -> {
        return "abc";
    })
    .handle((r,err) -> {
        log.error("调用错误 {}", err.getMessage(), err);
        // 对结果做额外的处理
        return r;
    })
    ;
    Copier après la connexion

    Cas

    Un grand nombre d'utilisateurs envoient des messages texte|messages

    L'exigence est d'informer les utilisateurs avec des conditions spécifiques dans un tableau via des messages texte. Cependant, il existe des millions d'utilisateurs de messages texte si la lecture en un seul thread est possible. utilisé, l’efficacité de lecture sera très lente. À ce stade, vous pouvez envisager d'utiliser le multithread pour lire :

    1. Divisez la tâche de lecture en plusieurs sous-tâches différentes et spécifiez le décalage et le nombre de lectures

      // 假设有500万条记录
            long recordCount = 500 * 10000;
            int subTaskRecordCount = 10000;
            // 对记录进行分片
            List<Map> subTaskList = new LinkedList<>();
            for (int i = 0; i < recordCount / 500; i++) {
                // 如果子任务结构复杂,建议使用对象
                HashMap<String, Integer> subTask = new HashMap<>();
                subTask.put("index", i);
                subTask.put("offset", i * subTaskRecordCount);
                subTask.put("count", subTaskRecordCount);
                subTaskList.add(subTask);
            }
    Copier après la connexion

    2. Utilisez la lecture par lots multi-thread

      // 进行subTask批量处理,拆分为不同的任务
            subTaskList.stream()
                    .map(subTask -> CompletableFuture.runAsync(()->{
                        // 读取数据,然后处理
                        // dataTunel.read(subTask);
                    },excuturs))   // 使用应用的通用任务线程池
                    .map(c -> ((CompletableFuture<?>) c).join());
    Copier après la connexion

    3. Effectuer un traitement de logique métier ou effectuer directement un traitement de logique métier après la lecture ;

    并发获取商品不同信息

    在系统拆分比较细的时候,价格,优惠券,库存,商品详情等信息分散在不同的系统中,有时候需要同时获取商品的所有信息, 有时候可能只需要获取商品的部分信息。

    当然问题点在于要调用多个不同的系统,需要将RT降低下来,那么需要进行并发调用;

         List<Task> taskList = new ArrayList<>();
            List<Object> result = taskList.stream()
                    .map(task -> CompletableFuture.supplyAsync(()->{
    //                    handlerMap.get(task).query();
                        return "";
                    }, executorService))
                    .map(c -> c.join())
                    .collect(Collectors.toList());
    Copier après la connexion

    问题

    thenRun和thenRunAsync有什么区别

    • 如果不使用传入的线程池,大家用默认的线程池ForkJoinPool

    • thenRun用的默认和上一个任务使用相同的线程池

    • thenRunAsync在执行新的任务的时候可以接受传入一个新的线程池,使用新的线程池执行任务;

    handle和exceptional有什么区别

    exceptionally是只有发生异常时才会执行,而handle则是不管是否发生错误都会执行。

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Étiquettes associées:
    source:yisu.com
    Déclaration de ce site Web
    Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
    Tutoriels populaires
    Plus>
    Derniers téléchargements
    Plus>
    effets Web
    Code source du site Web
    Matériel du site Web
    Modèle frontal