Cet article vous apporte des connaissances pertinentes sur java, qui résout principalement les problèmes liés à la mise en œuvre d'un pool de coroutines basé sur quasar. Un thread peut avoir plusieurs coroutines, et un processus peut également avoir plusieurs coroutines indépendamment. Les processus de thread sont tous des mécanismes synchrones. , alors que les coroutines sont asynchrones, examinons-les ensemble, j'espère que cela sera utile à tout le monde.
Étude recommandée : "Tutoriel vidéo Java"
Scénario commercial : Golang et Swoole adoptent les coroutines Avec le même nombre de tâches simultanées, les coroutines peuvent être plusieurs fois plus nombreuses que les threads. Récemment, lorsque j'interrogeais Java, j'ai appris que Java lui-même n'avait pas de coroutines, mais qu'une certaine vache implémentait elle-même des coroutines, qui est le protagoniste de cet article, quasar (processus fibre) ! Mais je n'ai vu personne révéler le fonctionnement intéressant du pool de coroutines manuscrites (Qui l'utiliserait directement ? Cela signifie qu'ils n'ont pas été sévèrement battus par la société ~)
Un thread peut avoir plusieurs coroutines, et un processus peut également posséder plusieurs coroutines.
Les processus thread sont tous des mécanismes synchrones, tandis que les coroutines sont asynchrones.
La coroutine peut conserver l'état du dernier appel. Chaque fois que le processus rentre, cela équivaut à entrer dans l'état du dernier appel.
Les threads sont préemptifs, tandis que les coroutines ne sont pas préemptives, les utilisateurs doivent donc libérer leurs droits d'utilisation pour passer à d'autres coroutines. Par conséquent, une seule coroutine a réellement le droit de s'exécuter en même temps, ce qui équivaut à la capacité de. un seul fil.
Les coroutines ne remplacent pas les threads, mais sont abstraites des threads. Les threads sont des ressources CPU divisées. Les coroutines sont des processus de code organisés. Les threads sont des ressources de coroutines, mais les coroutines n'utilisent pas directement les threads. La coroutine utilise directement l'exécuteur (Intercepteur). L'exécuteur peut être associé à n'importe quel thread ou pool de threads, et peut être le thread actuel, le thread de l'interface utilisateur ou créer un nouveau processus.
Les threads sont des ressources de coroutines. Les coroutines utilisent la ressource thread indirectement via Interceptor.
Sans plus attendre, passons directement au code :
Package d'import :
<dependency> <groupId>co.paralleluniverse</groupId> <artifactId>quasar-core</artifactId> <version>0.7.9</version> <classifier>jdk8</classifier> </dependency>
WorkTools工具类:
package com.example.ai; import co.paralleluniverse.fibers.Fiber; import co.paralleluniverse.fibers.SuspendExecution; import co.paralleluniverse.strands.SuspendableRunnable; import java.util.concurrent.ArrayBlockingQueue; public class WorkTools { //协程池中默认协程的个数为5 private static int WORK_NUM = 5; //队列默认任务为100 private static int TASK_COUNT = 100; //工做协程数组 private Fiber[] workThreads; //等待队列 private final ArrayBlockingQueue<SuspendableRunnable> taskQueue; //用户在构造这个协程池时,但愿启动的协程数 private final int workerNum; //构造方法:建立具备默认协程个数的协程池 public WorkTools() { this(WORK_NUM,TASK_COUNT); } //建立协程池,workNum为协程池中工做协程的个数 public WorkTools(int workerNum, int taskCount) { if (workerNum <= 0) { workerNum = WORK_NUM; } if (taskCount <= 0) { taskCount = TASK_COUNT; } this.workerNum = workerNum; taskQueue = new ArrayBlockingQueue(taskCount); workThreads = new Fiber[workerNum]; for (int i = 0; i < workerNum; i++) { int finalI = i; workThreads[i] = new Fiber<>(new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { SuspendableRunnable runnable = null; while (true){ try{ //取任务,没有则阻塞。 runnable = taskQueue.take(); }catch (Exception e){ System.out.println(e.getMessage()); } //存在任务则运行。 if(runnable != null){ runnable.run(); } runnable = null; } } }); //new一个工做协程 workThreads[i].start(); //启动工做协程 } Runtime.getRuntime().availableProcessors(); } //执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定 public void execute(SuspendableRunnable task) { try { taskQueue.put(task); //put:阻塞接口的插入 } catch (Exception e) { // TODO: handle exception System.out.println("阻塞"); } } //销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁 public void destory() { //工做协程中止工做,且置为null System.out.println("ready close thread..."); for (int i = 0; i < workerNum; i++) { workThreads[i] = null; //help gc } taskQueue.clear(); //清空等待队列 } //覆盖toString方法,返回协程信息:工做协程个数和已完成任务个数 @Override public String toString() { return "WorkThread number:" + workerNum + " ==分割线== wait task number:" + taskQueue.size(); } }
Code de test :
package com.example.ai; import co.paralleluniverse.strands.SuspendableRunnable; import lombok.SneakyThrows; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.concurrent.CountDownLatch; @SpringBootApplication public class AiApplication { @SneakyThrows public static void main(String[] args) { //等待协程任务完毕后再结束主线程 CountDownLatch cdl = new CountDownLatch(50); //开启5个协程,50个任务列队。 WorkTools myThreadPool = new WorkTools(5, 50); for (int i = 0; i< 50; i++){ int finalI = i; myThreadPool.execute(new SuspendableRunnable() { @Override public void run() { System.out.println(finalI); try { //延迟1秒 Thread.sleep(1000); cdl.countDown(); } catch (InterruptedException e) { System.out.println("阻塞中"); } } }); } //阻塞 cdl.await(); } }
Le code spécifique est commenté, afin que vous puissiez le comprendre vous-même. Je l'ai également implémenté en utilisant l'écriture de pool de threads.
Actuellement, nous essayons de résoudre le problème : pendant le processus de blocage de la coroutine, la classe Fiber signalera un avertissement de blocage, ce qui est déroutant et ennuyeux à regarder. Il n’y a aucun moyen de résoudre ce problème pour le moment. Voyons si l’un de vous, maîtres, a des idées dans les commentaires ci-dessous. Merci beaucoup~
Apprentissage recommandé : "Tutoriel vidéo Java"
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!