Zum Beispiel sorgt das Code-Snippet unten dafür, dass die Leute es lesen, als würden sie Gedichte lesen. Doch bei falscher Anwendung kann es tödlich sein.
List<Integer> transactionsIds = widgets.stream() .filter(b -> b.getColor() == RED) .sorted((x,y) -> x.getWeight() - y.getWeight()) .mapToInt(Widget::getWeight) .sum();
Dieser Code hat eine Schlüsselfunktion, nämlich Stream. Damit können wir eine gewöhnliche Liste in einen Stream umwandeln und dann Pipelines verwenden, um die Liste zu verarbeiten. Alles in allem sagt alles, was ich verwendet habe, dass es gut ist.
Sie sind mit diesen Funktionen nicht so vertraut? Sie können sich auf Folgendes beziehen: „Karte und FlatMap sind überall, was bedeuten sie?“ 🎜🎜## 🎜🎜#
Was passiert, wenn wir Stream in ParallelStream ändern?Entsprechend der wörtlichen Bedeutung ändert sich der Stream von seriell zu parallel.
Angesichts der Tatsache, dass dies eine parallele Situation ist, ist es offensichtlich, dass es Probleme mit der Thread-Sicherheit geben wird. Allerdings sprechen wir hier nicht über die Notwendigkeit, threadsichere Sammlungen zu verwenden, da dieses Thema zu grundlegend ist. Heutzutage ist das Erlernen der Verwendung threadsicherer Sammlungen in Thread-unsicheren Situationen zu einer wesentlichen Fähigkeit geworden. Die Falle ist dieses Mal das Leistungsproblem des parallelen Streamings. Wir lassen Code sprechen. Der folgende Code aktiviert 8 Threads gleichzeitig, und alle Threads verwenden parallele Streams zur Datenberechnung. In der Ausführungslogik lassen wir jede Aufgabe 1 Sekunde lang ruhen, um das zeitaufwändige Warten einiger E/A-Anfragen zu simulieren. Bei Verwendung von Stream kehrt das Programm nach 30 Sekunden zurück. Wir gehen jedoch davon aus, dass das Programm in mehr als 1 Sekunde zurückkehrt, da es sich um einen parallelen Stream handelt und diesen Titel verdient. Der Test ergab, dass wir lange gewartet haben, bis die Aufgabe abgeschlossen war.static void paralleTest() { List<Integer> numbers = Arrays.asList( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29 ); final long begin = System.currentTimeMillis(); numbers.parallelStream().map(k -> { try { Thread.sleep(1000); System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } return k; }).collect(Collectors.toList()); } public static void main(String[] args) { // System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); }
pit
Tatsächlich dauert dieser Code unterschiedlich lange, wenn er auf verschiedenen Maschinen ausgeführt wird.Da es parallel ist, muss ein gewisses Maß an Parallelität vorhanden sein. Wenn der Grad der Parallelität zu niedrig ist, können seine Fähigkeiten nicht genutzt werden; ist der Grad der Parallelität zu hoch, wird Zeit für den Kontextwechsel verschwendet. Ich war sehr frustriert, als ich feststellte, dass viele erfahrene Entwickler, die die verschiedenen Parameter des Thread-Pools und alle Arten von Optimierungen auswendig kennen, es wagen, ein Auge zuzudrücken und parallelStream in E/A-intensiven Unternehmen zu verwenden.
Um diesen Grad der Parallelität zu verstehen, müssen wir uns die spezifische Konstruktionsmethode ansehen. Finden Sie Code wie diesen in der ForkJoinPool-Klasse.try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); if (pp != null) parallelism = Integer.parseInt(pp); fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.threadFactory"); handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { } if (fac == null) { if (System.getSecurityManager() == null) fac = defaultForkJoinWorkerThreadFactory; else // use security-managed default fac = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP;
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
Eine Lösung
Wir können verschiedene Arten der Aufgabentrennung erreichen, indem wir einen externen Forkjoinpool bereitstellen, also die Übermittlungsmethode ändern.Der Code ist wie unten gezeigt und kann durch explizite Codeübermittlung erreicht werden.
ForkJoinPool pool = new ForkJoinPool(30); final long begin = System.currentTimeMillis(); try { pool.submit(() -> numbers.parallelStream().map(k -> { try { Thread.sleep(1000); System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } return k; }).collect(Collectors.toList())).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
Das obige ist der detaillierte Inhalt vonSo lösen Sie die Fallstricke bei der Verwendung von ParallelStream. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!