L'exécuteur peut découpler les stratégies de soumission et d'exécution des tâches
Seules les tâches sont du même type et ont des temps d'exécution différents Uniquement lorsque le pool de threads est petit, les performances maximales peuvent être atteintes. Sinon, si certaines tâches longues et courtes sont placées dans un pool de threads, à moins que le pool de threads ne soit très grand, cela provoquera des blocages et d'autres problèmes
Similaire à : Soumettez deux tâches à un pool à un seul thread, et les deux tâches dépendent l'une de l'autre. Si une tâche attend une autre tâche, une impasse se produira ; que le pool n'est pas suffisant
Définition : une tâche doit attendre les résultats en cours d'exécution d'autres tâches dans le pool, et une impasse de famine peut se produire
Remarque : La taille du pool de threads est également soumise à d'autres restrictions, comme d'autres pools de ressources : pool de connexion à la base de données Si chaque tâche est une connexion, alors la taille du pool de threads est soumis à la taille du pool de connexion à la base de données 3. Configurer le pool de threads ThreadPoolExecutor
instance :
1. Renvoie certaines implémentations par défaut. via la méthode d'usine des exécuteurs 2. Personnalisez l'implémentation de la file d'attentedu pool de threads en instanciant ThreadPoolExecutor(. ....) 1. File d'attente illimitée : lorsque la tâche arrive et que le pool de threads est plein, la tâche attend dans la file d'attente. Si la tâche atteint l'infini, la file d'attente s'étendra à l'infini
Par exemple : ceci est. ce qu'utilisent les singletons et les pools de threads de taille fixe
2. File d'attente limitée : si une nouvelle tâche arrive et que la file d'attente est pleine, utilisez la
stratégie de saturation 3. Transfert synchrone : si le pool de threads est volumineux, placez la tâche. Après avoir été mise dans la file d'attente, il y aura un retard dans le transfert. Si le producteur de tâches est très rapide, cela entraînera également l'exécution de la tâche. queued
Syn
chronousQueue remettra directement la tâche au thread de travail Mécanisme : Lorsqu'une tâche est placée, il doit y avoir un thread en attente pour l'accepter. Sinon, alors
ajoutez unthread si le thread est saturé, rejetez la tâche Par exemple :
CacheThreadPool utilise cette stratégie.
Stratégie de saturation :
setRejectedExecutionHandl euh pour modifier la stratégie de saturation 1. Terminer
Abandonner(par défaut) : lancer une exception gérée par l'appelant 2. Abandonner
Rejeter3. Rejeter
DiscardOldest: supprimez la tâche la plus ancienne, remarque : si elle est prioritaire la file d'attente supprimera la tâche la plus prioritaire 4.
CtouterRuns : tâche de rollback, le thread appelant la gère tout seul 4. Thread factory ThreadFactoy
Fabrique de threads personnalisée : implémente ThreadFactory
Vous pouvez personnaliser le
comportementde la fabrique de threads : comme Uncaught ExceptionGestionnaire, etc.
public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable runnable, String name) { super(runnable, name + "-" + created.incrementAndGet()); //设置该线程工厂创建的线程的 未捕获异常的行为 setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } public void run() { // Copy debug flag to ensure consistent value throughout. boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName()); } } public static int getThreadsCreated() { return created.get(); } public static int getThreadsAlive() { return alive.get(); } public static boolean getDebug() { return debugLifecycle; } public static void setDebug(boolean b) { debugLifecycle = b; } }
5. Étendre ThreadPoolExecutor
1.afterExecute : Après la fin, si une exception Run
timeest levée, la méthode ne sera pas exécutée 2.
avantExécuter : Avant de démarrer, si une RuntimeException est levée, la tâche ne sera pas exécutée 3.terminée : Lorsque le pool de threads est fermé, il peut être utilisé pour libérer des ressources, etc.
2. Parallélisation de l'algorithme
récursif//串行化 void processSequentially(List<Element> elements) { for (Element e : elements) process(e); } //并行化 void processInParallel(Executor exec, List<Element> elements) { for (final Element e : elements) exec.execute(new Runnable() { public void run() { process(e); } }); }
Si chaque opération d'itération est indépendante les unes des autres, elle peut être exécutée en série
Par exemple : algorithme de
rechercheen profondeur d'abord ; remarque : la récursivité est toujours en série, Cependant, le calcul de chaque nœud est parallèle
//串行 计算compute 和串行迭代 public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) { for (Node<T> n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } } //并行 计算compute 和串行迭代 public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) { for (final Node<T> n : nodes) { exec.execute(() -> results.add(n.compute())); parallelRecursive(exec, n.getChildren(), results); } } //调用并行方法的操作 public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue; }
实例:
public class ConcurrentPuzzleSolver <P, M> { private final Puzzle<P, M> puzzle; private final ExecutorService exec; private final ConcurrentMap<P, Boolean> seen; protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>(); public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; this.exec = initThreadPool(); this.seen = new ConcurrentHashMap<P, Boolean>(); if (exec instanceof ThreadPoolExecutor) { ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec; tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); } } private ExecutorService initThreadPool() { return Executors.newCachedThreadPool(); } public List<M> solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); // 等待ValueLatch中闭锁解开,则表示已经找到答案 PuzzleNode<P, M> solnPuzzleNode = solution.getValue(); return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList(); } finally { exec.shutdown();//最终主线程关闭线程池 } } protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) { return new SolverTask(p, m, n); } protected class SolverTask extends PuzzleNode<P, M> implements Runnable { SolverTask(P pos, M move, PuzzleNode<P, M> prev) { super(pos, move, prev); } public void run() { //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现; //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环 if (solution.isSet() || seen.putIfAbsent(pos, true) != null){ return; // already solved or seen this position } if (puzzle.isGoal(pos)) { solution.setValue(this); } else { for (M m : puzzle.legalMoves(pos)) exec.execute(newTask(puzzle.move(pos, m), m, this)); } } } }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!