Home > Operation and Maintenance > Safety > How to solve the pitfalls of using ParallelStream

How to solve the pitfalls of using ParallelStream

PHPz
Release: 2023-05-21 21:08:21
forward
1973 people have browsed it

For example, the following code snippet makes people read it like reading poetry. But if used incorrectly, it can be fatal.

List<Integer> transactionsIds = widgets.stream()              .filter(b -> b.getColor() == RED)              .sorted((x,y) -> x.getWeight() - y.getWeight())              .mapToInt(Widget::getWeight)              .sum();
Copy after login

This code has a key function, which is stream. Using it, we can convert an ordinary list into a stream, and then use pipelines to process the list. All in all, everything I’ve used says it’s good.

Not too familiar with these functions? You can refer to: "Map and flatMap are everywhere, what do they mean?"

Here comes the question

If What will happen if we replace stream with parallelStream?

According to the literal meaning, the stream will change from serial to parallel.

Considering that this is a parallel situation, it is obvious that there will be thread safety issues. However, what we are discussing here is not the need to use thread-safe collections, as this topic is too basic. In this day and age, learning to use thread-safe collections in thread-unsafe situations has become an essential skill.

The pitfall this time is the performance issue of parallel streaming.

We let code speak.

The following code enables 8 threads at the same time, and all threads are using parallel streams for data calculation. In the execution logic, we let each task sleep for 1 second, so that we can simulate the time-consuming waiting of some I/O requests.

Using stream, the program will return after 30 seconds, but we expect the program to return in more than 1 second, because it is a parallel stream and deserves this title.

The test found that we waited for a long time before the task was completed.

static void paralleTest() {     List<Integer> numbers = Arrays.asList(             0, 1, 2, 3, 4, 5, 6, 7, 8, 9,             10, 11, 12, 13, 14, 15, 16, 17, 18, 19,             20, 21, 22, 23, 24, 25, 26, 27, 28, 29     );     final long begin = System.currentTimeMillis();     numbers.parallelStream().map(k -> {         try {             Thread.sleep(1000);             System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());         } catch (InterruptedException e) {             e.printStackTrace();         }         return k;     }).collect(Collectors.toList()); }  public static void main(String[] args) { //    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start();     new Thread(() -> paralleTest()).start(); }
Copy after login

pit

In fact, this code takes different times when executed on different machines.

Since it is parallelism, there must be a degree of parallelism. If the degree of parallelism is too low, its capabilities will not be exerted; if the degree of parallelism is too high, context switching time will be wasted. I was very frustrated to find that many senior developers, who know the various parameters of the thread pool by heart, and various tunings, dare to turn a blind eye and use parallelStream in I/O-intensive business.

To understand this degree of parallelism, we need to look at the specific construction method. Find code like this in the ForkJoinPool class.

try {  // ignore exceptions in accessing/parsing properties     String pp = System.getProperty         ("java.util.concurrent.ForkJoinPool.common.parallelism");     if (pp != null)         parallelism = Integer.parseInt(pp);     fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.threadFactory");     handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(         "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { }  if (fac == null) {     if (System.getSecurityManager() == null)         fac = defaultForkJoinWorkerThreadFactory;     else // use security-managed default         fac = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores     (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)     parallelism = 1; if (parallelism > MAX_CAP)     parallelism = MAX_CAP;
Copy after login

As you can see, the degree of parallelism is controlled by the following parameters. If this parameter cannot be obtained, the parallelism of the number of CPUs - 1 will be used by default.

As you can see, this function is designed for computing-intensive business. When you assign too many tasks to it, its parallel execution degrades to a serial-like effect.

-Djava.util.concurrent.ForkJoinPool.common.parallelism=N
Copy after login

Even if you set an initial size using -Djava.util.concurrent.ForkJoinPool.common.parallelism=N, it still has problems.

Once set, the parallelism variable is set to final and modification is prohibited. In other words, the above parameters will only take effect once.

Zhang San may use the following code and set the parallelism size to 20.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
Copy after login

Li Si may have set this value to 30 in the same way. To determine which value is used in your project, you need to ask the JVM how to load the class information.

This method is not very reliable.

A solution

We can achieve different types of task separation by providing an external forkjoinpool, that is, changing the submission method.

The code is as follows. Task separation can be achieved through explicit code submission.

ForkJoinPool pool = new ForkJoinPool(30);  final long begin = System.currentTimeMillis(); try {     pool.submit(() ->             numbers.parallelStream().map(k -> {                 try {                     Thread.sleep(1000);                     System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 return k;             }).collect(Collectors.toList())).get(); } catch (InterruptedException e) {     e.printStackTrace(); } catch (ExecutionException e) {     e.printStackTrace(); }
Copy after login

In this way, different scenarios can have different degrees of parallelism. There are similarities between manual resource management in this way and CountDownLatch, they have the same purpose.

The above is the detailed content of How to solve the pitfalls of using ParallelStream. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:yisu.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template