Home > Java > javaTutorial > body text

A detailed introduction to the summary and thinking of Java concurrent programming

黄舟
Release: 2017-03-20 10:41:10
Original
1574 people have browsed it

Writing high-quality concurrent code is an extremely difficult task. The Java language has built-in support for multi-threading since the first version, which was very remarkable at the time. However, when we have a deeper understanding and more practice of concurrencyprogramming, the implementation Concurrent programming has more solutions and better choices. This article is a summary and reflection on concurrent programming. It also shares some experience on how to write concurrent code in Java 5 and later versions.

Why concurrency is needed

Concurrency is actually a decoupling strategy, which helps us separate what to do (goal) and when to do it (timing). Doing so can significantly improve the application's throughput (more CPU scheduling time) and structure (the program has multiple parts working together). Anyone who has done Java Web development knows that the Servlet program in Java Web adopts a single-instance multi-thread working mode with the support of the Servlet container. The Servlet container handles concurrency issues for you.

Misunderstandings and Answers

The most common misunderstandings about concurrent programming are the following:

-Concurrency always improves performance (concurrency has a lot of idle time on the CPU) can significantly improve the performance of the program, but when the number of threads is large, frequent scheduling switching between threads will degrade the performance of the system)

-Writing concurrent programs does not require modifying the original design (the purpose is the same as Timing of decoupling often has a huge impact on the system structure)

-You don’t need to pay attention to concurrency issues when using Web or EJB containers (only by understanding what the container is doing can you use the container better)

The following statements are an objective understanding of concurrency:

-Writing concurrent programs will add additional overhead to the code

-Correct concurrency It is very complex, even for very simple problems

-Defects in concurrency are not easy to find because they are not easy to reproduce

-Concurrency often requires fundamental modifications to the design strategy

Principles and techniques of concurrent programming

Single responsibility principle

Separate concurrency-related code and other code (concurrency-related code has its own development, modification and tuning lifecycle).

Limit data scope

Two threads may interfere with each other when modifying the same field of a shared object, resulting in unpredictable behavior. One solution is to construct a critical areas, but the number of critical areas must be limited.

Use data copies

Data copies are a good way to avoid sharing data. The copied objects are only treated in a read-only manner. A class named CopyOnWriteArrayList was added to the java.util.concurrent package of Java 5. It is a subtype of the List interface, so you can think of it as a thread-safe version of ArrayList. It uses copy-on-write to create a copy of the data. Perform operations to avoid problems caused by concurrent access to shared data.

Threads should be as independent as possible

Let threads exist in their own world and not share data with other threads. Anyone who has experience in Java Web development knows that Servlet works in a single instance and multi-threaded manner. The data related to each request is passed in through the parameters of the service method (or doGet or doPost method) of the Servlet subclass. of. As long as the code in the servlet only uses local variables, the servlet will not cause synchronization problems. The springMVC controller does the same thing. The objects obtained from the request are passed in as method parameters rather than as members of the class. Obviously, Struts 2 does exactly the opposite, so the Action class as the controller in Struts 2 Each request corresponds to an instance.

Concurrent programming before Java 5

Java’s thread model is based on preemptive thread scheduling, that is to say:

  • All threads can easily share objects in the same process.

  • Any thread that has a reference to these objects can modify these objects.

  • To protect data, objects can be locked.

Java's concurrency based on threads and locks is too low-level, and using locks is often very evil, because it is equivalent to turning all concurrency into queue waiting.

Before Java 5, the synchronized keyword can be used to implement the lock function. It can be used in code blocks and methods, indicating that the thread must obtain the appropriate lock before executing the entire code block or method. For non-static methods (member methods) of a class, this means obtaining the lock of the object instance. For static methods (class methods) of the class, this means obtaining the lock of the Class object of the class. For synchronized code blocks, ProgrammerYou can specify the lock of that object to be obtained.

Whether it is a synchronized code block or a synchronized method, only one thread can enter at a time. If other threads try to enter (whether it is the same synchronized block or a different synchronized block), the JVM will suspend them (put them in the wait lock) in the pool). This structure is called a critical section in concurrency theory. Here we can make a summary of the functions of synchronization and locking using synchronized in Java:

  • can only lock objects, not basic data types

  • A single object in the locked object array will not be locked

  • A synchronized method can be regarded as a synchronized(this) { … } code block that contains the entire method

  • The static synchronization method will lock its Class object

  • The synchronization of the inner class is independent of the outer class

  • The synchronized modifier is not part of the method signature, so it cannot appear in the method declaration of the interface

  • Asynchronous methods do not care about the state of the lock, they are in Synchronized methods can still run while they are running.

  • The lock implemented by synchronized is a reentrant lock.

Inside the JVM, in order to improve efficiency, each thread running at the same time will have a cached copy of the data it is processing. When we use synchronzied for synchronization, what is actually synchronized It is a memory block that represents the locked object in different threads (the copy data will remain synchronized with the main memory. Now you know why the word synchronization is used). Simply put, after the synchronization block or synchronization method is executed, the Any modifications made to the locked object must be written back to the main memory before releasing the lock; after entering the synchronization block to obtain the lock, the data of the locked object is read from the main memory, and a copy of the data of the thread holding the lock is It must be synchronized with the data view in main memory.

In the original version of Java, there was a keyword called Volatile, which is a simple synchronization processing mechanism, because variables modified by volatile follow the following rules:

  • The value of a variable is always read from the main memory before use.

  • Modifications to variable values ​​will always be written back to main memory after completion.

Using the volatile keyword can prevent the compiler from incorrect optimization assumptions in a multi-threaded environment (the compiler may optimize variables whose values ​​will not change in one thread into constants) ), but only variables that when modified do not depend on the current state (value when read) should be declared volatile.

The immutable mode is also a design that can be considered during concurrent programming. Let the object's state remain unchanged. If you want to modify the object's state, you will create a copy of the object and write the changes to the copy without changing the original object, so that there will be no state inconsistency. , so immutable objects are thread-safe. The String class that we use very frequently in Java adopts this design. If you are not familiar with immutable patterns, you can read Chapter 34 of Dr. Yan Hong's book "Java and Patterns". At this point you may also realize the importance of the final keyword.

Concurrent Programming in Java 5

No matter which direction Java develops or dies in the future, Java 5 is definitely an extremely important version in the history of Java development. This version provides various languages We will not discuss the features here (if you are interested, you can read my other article "The 20th Year of Java: Looking at the Development of Programming Technology from the Evolution of Java Versions"), but we must thank Doug Lea for providing him in Java 5 The landmark masterpiece java.util.concurrent package, its emergence gives Java concurrent programming more choices and better ways of working. Doug Lea's masterpieces mainly include the following content:

  • Better thread-safe containers

  • Thread pools and related tool classes

  • Optional non-blocking solution

  • Exhibited lock and semaphore mechanism

Below we will discuss Explain these things one by one.

Atomic class

There is an atomic sub-package under the java.util.concurrent package in Java 5, which has several classes starting with Atomic, such as AtomicInteger and AtomicLong. They take advantage of the characteristics of modern processors and can complete atomic operations in a non-blocking manner. The code is as follows:

/**
 ID序列生成器
*/
public class IdGenerator {
    private final AtomicLong sequenceNumber = new AtomicLong(0);

    public long next() {
        return sequenceNumber.getAndIncrement(); 
    }
}
Copy after login

Display lock

The lock mechanism based on the synchronized keyword has the following problems:

  • There is only one type of lock, and it has the same effect on all synchronization operations

  • The lock can only be placed at the beginning of a code block or method Acquire at the place and release at the end

  • The thread either gets the lock or is blocked, there is no other possibility

Java 5 locking mechanism We have reconstructed and provided a displayed lock, which can improve the locking mechanism in the following aspects:

  • 可以添加不同类型的锁,例如读取锁和写入锁

  • 可以在一个方法中加锁,在另一个方法中解锁

  • 可以使用tryLock方式尝试获得锁,如果得不到锁可以等待、回退或者干点别的事情,当然也可以在超时之后放弃操作

显示的锁都实现了java.util.concurrent.Lock接口,主要有两个实现类:

  • ReentrantLock – 比synchronized稍微灵活一些的重入锁

  • ReentrantReadWriteLock – 在读操作很多写操作很少时性能更好的一种重入锁

对于如何使用显示锁,可以参考我的Java面试系列文章《Java面试题集51-70》中第60题的代码。只有一点需要提醒,解锁的方法unlock的调用最好能够在finally块中,因为这里是释放外部资源最好的地方,当然也是释放锁的最佳位置,因为不管正常异常可能都要释放掉锁来给其他线程以运行的机会。

CountDownLatch

CountDownLatch是一种简单的同步模式,它让一个线程可以等待一个或多个线程完成它们的工作从而避免对临界资源并发访问所引发的各种问题。下面借用别人的一段代码(我对它做了一些重构)来演示CountDownLatch是如何工作的。

import java.util.concurrent.CountDownLatch;

/**
 * 工人类
 * @author 骆昊
 *
 */
class Worker {
    private String name;        // 名字
    private long workDuration;  // 工作持续时间

    /**
     * 构造器
     */
    public Worker(String name, long workDuration) {
        this.name = name;
        this.workDuration = workDuration;
    }

    /**
     * 完成工作
     */
    public void doWork() {
        System.out.println(name + " begins to work...");
        try {
            Thread.sleep(workDuration); // 用休眠模拟工作执行的时间
        } catch(InterruptedException ex) {
            ex.printStackTrace();
        }
        System.out.println(name + " has finished the job...");
    }
}

/**
 * 测试线程
 * @author 骆昊
 *
 */
class WorkerTestThread implements Runnable {
    private Worker worker;
    private CountDownLatch cdLatch;

    public WorkerTestThread(Worker worker, CountDownLatch cdLatch) {
        this.worker = worker;
        this.cdLatch = cdLatch;
    }

    @Override
    public void run() {
        worker.doWork();        // 让工人开始工作
        cdLatch.countDown();    // 工作完成后倒计时次数减1
    }
}

class CountDownLatchTest {

    private static final int MAX_WORK_DURATION = 5000;  // 最大工作时间
    private static final int MIN_WORK_DURATION = 1000;  // 最小工作时间

    // 产生随机的工作时间
    private static long getRandomWorkDuration(long min, long max) {
        return (long) (Math.random() * (max - min) + min);
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);   // 创建倒计时闩并指定倒计时次数为2
        Worker w1 = new Worker("骆昊", getRandomWorkDuration(MIN_WORK_DURATION, MAX_WORK_DURATION));
        Worker w2 = new Worker("王大锤", getRandomWorkDuration(MIN_WORK_DURATION, MAX_WORK_DURATION));

        new Thread(new WorkerTestThread(w1, latch)).start();
        new Thread(new WorkerTestThread(w2, latch)).start();

        try {
            latch.await();  // 等待倒计时闩减到0
            System.out.println("All jobs have been finished!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Copy after login

ConcurrentHashMap

ConcurrentHashMap是HashMap在并发环境下的版本,大家可能要问,既然已经可以通过Collections.synchronizedMap获得线程安全的映射型容器,为什么还需要ConcurrentHashMap呢?因为通过Collections工具类获得的线程安全的HashMap会在读写数据时对整个容器对象上锁,这样其他使用该容器的线程无论如何也无法再获得该对象的锁,也就意味着要一直等待前一个获得锁的线程离开同步代码块之后才有机会执行。实际上,HashMap是通过哈希函数来确定存放键值对的桶(桶是为了解决哈希冲突而引入的),修改HashMap时并不需要将整个容器锁住,只需要锁住即将修改的“桶”就可以了。HashMap的数据结构如下图所示。

A detailed introduction to the summary and thinking of Java concurrent programming

此外,ConcurrentHashMap还提供了原子操作的方法,如下所示:

  • putIfAbsent:如果还没有对应的键值对映射,就将其添加到HashMap中。

  • remove:如果键存在而且值与当前状态相等(equals比较结果为true),则用原子方式移除该键值对映射

  • replace:替换掉映射中元素的原子操作

CopyOnWriteArrayList

CopyOnWriteArrayList是ArrayList在并发环境下的替代品。CopyOnWriteArrayList通过增加写时复制语义来避免并发访问引起的问题,也就是说任何修改操作都会在底层创建一个列表的副本,也就意味着之前已有的迭代器不会碰到意料之外的修改。这种方式对于不要严格读写同步的场景非常有用,因为它提供了更好的性能。记住,要尽量减少锁的使用,因为那势必带来性能的下降(对数据库中数据的并发访问不也是如此吗?如果可以的话就应该放弃悲观锁而使用乐观锁),CopyOnWriteArrayList很明显也是通过牺牲空间获得了时间(在计算机的世界里,时间和空间通常是不可调和的矛盾,可以牺牲空间来提升效率获得时间,当然也可以通过牺牲时间来减少对空间的使用)。

A detailed introduction to the summary and thinking of Java concurrent programming

可以通过下面两段代码的运行状况来验证一下CopyOnWriteArrayList是不是线程安全的容器。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AddThread implements Runnable {
    private List<Double> list;

    public AddThread(List<Double> list) {
        this.list = list;
    }

    @Override
    public void run() {
        for(int i = 0; i < 10000; ++i) {
            list.add(Math.random());
        }
    }
}

public class Test05 {
    private static final int THREAD_POOL_SIZE = 2;

    public static void main(String[] args) {
        List<Double> list = new ArrayList<>();
        ExecutorService es = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        es.execute(new AddThread(list));
        es.execute(new AddThread(list));
        es.shutdown();
    }
}
Copy after login

上面的代码会在运行时产生ArrayIndexOutOfBoundsException,试一试将上面代码25行的ArrayList换成CopyOnWriteArrayList再重新运行。

List<Double> list = new CopyOnWriteArrayList<>();
Copy after login

Queue

队列是一个无处不在的美妙概念,它提供了一种简单又可靠的方式将资源分发给处理单元(也可以说是将工作单元分配给待处理的资源,这取决于你看待问题的方式)。实现中的并发编程模型很多都依赖队列来实现,因为它可以在线程之间传递工作单元。

Java 5中的BlockingQueue就是一个在并发环境下非常好用的工具,在调用put方法向队列中插入元素时,如果队列已满,它会让插入元素的线程等待队列腾出空间;在调用take方法从队列中取元素时,如果队列为空,取出元素的线程就会阻塞。

A detailed introduction to the summary and thinking of Java concurrent programming

可以用BlockingQueue来实现生产者-消费者并发模型(下一节中有介绍),当然在Java 5以前也可以通过wait和notify来实现线程调度,比较一下两种代码就知道基于已有的并发工具类来重构并发代码到底好在哪里了。

基于wait和notify的实现

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 公共常量
 * @author 骆昊
 *
 */
class Constants {
    public static final int MAX_BUFFER_SIZE = 10;
    public static final int NUM_OF_PRODUCER = 2;
    public static final int NUM_OF_CONSUMER = 3;
}

/**
 * 工作任务
 * @author 骆昊
 *
 */
class Task {
    private String id;  // 任务的编号

    public Task() {
        id = UUID.randomUUID().toString();
    }

    @Override
    public String toString() {
        return "Task[" + id + "]";
    }
}

/**
 * 消费者
 * @author 骆昊
 *
 */
class Consumer implements Runnable {
    private List<Task> buffer;

    public Consumer(List<Task> buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while(true) {
            synchronized(buffer) {
                while(buffer.isEmpty()) {
                    try {
                        buffer.wait();
                    } catch(InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Task task = buffer.remove(0);
                buffer.notifyAll();
                System.out.println("Consumer[" + Thread.currentThread().getName() + "] got " + task);
            }
        }
    }
}

/**
 * 生产者
 * @author 骆昊
 *
 */
class Producer implements Runnable {
    private List<Task> buffer;

    public Producer(List<Task> buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while(true) {
            synchronized (buffer) {
                while(buffer.size() >= Constants.MAX_BUFFER_SIZE) {
                    try {
                        buffer.wait();
                    } catch(InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Task task = new Task();
                buffer.add(task);
                buffer.notifyAll();
                System.out.println("Producer[" + Thread.currentThread().getName() + "] put " + task);
            }
        }
    }

}

public class Test06 {

    public static void main(String[] args) {
        List<Task> buffer = new ArrayList<>(Constants.MAX_BUFFER_SIZE);
        ExecutorService es = Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER + Constants.NUM_OF_PRODUCER);
        for(int i = 1; i <= Constants.NUM_OF_PRODUCER; ++i) {
            es.execute(new Producer(buffer));
        }
        for(int i = 1; i <= Constants.NUM_OF_CONSUMER; ++i) {
            es.execute(new Consumer(buffer));
        }
    }
}
Copy after login

基于BlockingQueue的实现

import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 公共常量
 * @author 骆昊
 *
 */
class Constants {
    public static final int MAX_BUFFER_SIZE = 10;
    public static final int NUM_OF_PRODUCER = 2;
    public static final int NUM_OF_CONSUMER = 3;
}

/**
 * 工作任务
 * @author 骆昊
 *
 */
class Task {
    private String id;  // 任务的编号

    public Task() {
        id = UUID.randomUUID().toString();
    }

    @Override
    public String toString() {
        return "Task[" + id + "]";
    }
}

/**
 * 消费者
 * @author 骆昊
 *
 */
class Consumer implements Runnable {
    private BlockingQueue<Task> buffer;

    public Consumer(BlockingQueue<Task> buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while(true) {
            try {
                Task task = buffer.take();
                System.out.println("Consumer[" + Thread.currentThread().getName() + "] got " + task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 生产者
 * @author 骆昊
 *
 */
class Producer implements Runnable {
    private BlockingQueue<Task> buffer;

    public Producer(BlockingQueue<Task> buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while(true) {
            try {
                Task task = new Task();
                buffer.put(task);
                System.out.println("Producer[" + Thread.currentThread().getName() + "] put " + task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

}

public class Test07 {

    public static void main(String[] args) {
        BlockingQueue<Task> buffer = new LinkedBlockingQueue<>(Constants.MAX_BUFFER_SIZE);
        ExecutorService es = Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER + Constants.NUM_OF_PRODUCER);
        for(int i = 1; i <= Constants.NUM_OF_PRODUCER; ++i) {
            es.execute(new Producer(buffer));
        }
        for(int i = 1; i <= Constants.NUM_OF_CONSUMER; ++i) {
            es.execute(new Consumer(buffer));
        }
    }
}
Copy after login

使用BlockingQueue后代码优雅了很多。

并发模型

在继续下面的探讨之前,我们还是重温一下几个概念:

概念解释
临界资源并发环境中有着固定数量的资源
互斥对资源的访问是排他式的
饥饿一个或一组线程长时间或永远无法取得进展
死锁两个或多个线程相互等待对方结束
活锁想要执行的线程总是发现其他的线程正在执行以至于长时间或永远无法执行

重温了这几个概念后,我们可以探讨一下下面的几种并发模型。

生产者-消费者

一个或多个生产者创建某些工作并将其置于缓冲区或队列中,一个或多个消费者会从队列中获得这些工作并完成之。这里的缓冲区或队列是临界资源。当缓冲区或队列放满的时候,生产这会被阻塞;而缓冲区或队列为空的时候,消费者会被阻塞。生产者和消费者的调度是通过二者相互交换信号完成的。

读者-写者

当存在一个主要为读者提供信息的共享资源,它偶尔会被写者更新,但是需要考虑系统的吞吐量,又要防止饥饿和陈旧资源得不到更新的问题。在这种并发模型中,如何平衡读者和写者是最困难的,当然这个问题至今还是一个被热议的问题,恐怕必须根据具体的场景来提供合适的解决方案而没有那种放之四海而皆准的方法(不像我在国内的科研文献中看到的那样)。

哲学家进餐

1965年,荷兰计算机科学家图灵奖得主Edsger Wybe Dijkstra提出并解决了一个他称之为哲学家进餐的同步问题。这个问题可以简单地描述如下:五个哲学家围坐在一张圆桌周围,每个哲学家面前都有一盘通心粉。由于通心粉很滑,所以需要两把叉子才能夹住。相邻两个盘子之间放有一把叉子如下图所示。哲学家的生活中有两种交替活动时段:即吃饭和思考。当一个哲学家觉得饿了时,他就试图分两次去取其左边和右边的叉子,每次拿一把,但不分次序。如果成功地得到了两把叉子,就开始吃饭,吃完后放下叉子继续思考。

把上面问题中的哲学家换成线程,把叉子换成竞争的临界资源,上面的问题就是线程竞争资源的问题。如果没有经过精心的设计,系统就会出现死锁、活锁、吞吐量下降等问题。

A detailed introduction to the summary and thinking of Java concurrent programming

下面是用信号量原语来解决哲学家进餐问题的代码,使用了Java 5并发工具包中的Semaphore类(代码不够漂亮但是已经足以说明问题了)。

//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 存放线程共享信号量的上下问
 * @author 骆昊
 *
 */
class AppContext {
    public static final int NUM_OF_FORKS = 5;   // 叉子数量(资源)
    public static final int NUM_OF_PHILO = 5;   // 哲学家数量(线程)

    public static Semaphore[] forks;    // 叉子的信号量
    public static Semaphore counter;    // 哲学家的信号量

    static {
        forks = new Semaphore[NUM_OF_FORKS];

        for (int i = 0, len = forks.length; i < len; ++i) {
            forks[i] = new Semaphore(1);    // 每个叉子的信号量为1
        }

        counter = new Semaphore(NUM_OF_PHILO - 1);  // 如果有N个哲学家,最多只允许N-1人同时取叉子
    }

    /**
     * 取得叉子
     * @param index 第几个哲学家
     * @param leftFirst 是否先取得左边的叉子
     * @throws InterruptedException
     */
    public static void putOnFork(int index, boolean leftFirst) throws InterruptedException {
        if(leftFirst) {
            forks[index].acquire();
            forks[(index + 1) % NUM_OF_PHILO].acquire();
        }
        else {
            forks[(index + 1) % NUM_OF_PHILO].acquire();
            forks[index].acquire();
        }
    }

    /**
     * 放回叉子
     * @param index 第几个哲学家
     * @param leftFirst 是否先放回左边的叉子
     * @throws InterruptedException
     */
    public static void putDownFork(int index, boolean leftFirst) throws InterruptedException {
        if(leftFirst) {
            forks[index].release();
            forks[(index + 1) % NUM_OF_PHILO].release();
        }
        else {
            forks[(index + 1) % NUM_OF_PHILO].release();
            forks[index].release();
        }
    }
}

/**
 * 哲学家
 * @author 骆昊
 *
 */
class Philosopher implements Runnable {
    private int index;      // 编号
    private String name;    // 名字

    public Philosopher(int index, String name) {
        this.index = index;
        this.name = name;
    }

    @Override
    public void run() {
        while(true) {
            try {
                AppContext.counter.acquire();
                boolean leftFirst = index % 2 == 0;
                AppContext.putOnFork(index, leftFirst);
                System.out.println(name + "正在吃意大利面(通心粉)...");   // 取到两个叉子就可以进食
                AppContext.putDownFork(index, leftFirst);
                AppContext.counter.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class Test04 {

    public static void main(String[] args) {
        String[] names = { "骆昊", "王大锤", "张三丰", "杨过", "李莫愁" };   // 5位哲学家的名字
//      ExecutorService es = Executors.newFixedThreadPool(AppContext.NUM_OF_PHILO); // 创建固定大小的线程池
//      for(int i = 0, len = names.length; i < len; ++i) {
//          es.execute(new Philosopher(i, names[i]));   // 启动线程
//      }
//      es.shutdown();
        for(int i = 0, len = names.length; i < len; ++i) {
            new Thread(new Philosopher(i, names[i])).start();
        }
    }

}
Copy after login

现实中的并发问题基本上都是这三种模型或者是这三种模型的变体。

测试并发代码

对并发代码的测试也是非常棘手的事情,棘手到无需说明大家也很清楚的程度,所以这里我们只是探讨一下如何解决这个棘手的问题。我们建议大家编写一些能够发现问题的测试并经常性的在不同的配置和不同的负载下运行这些测试。不要忽略掉任何一次失败的测试,线程代码中的缺陷可能在上万次测试中仅仅出现一次。具体来说有这么几个注意事项

  • 不要将系统的失效归结于偶发事件,就像拉不出屎的时候不能怪地球没有引力。

  • 先让非并发代码工作起来,不要试图同时找到并发和非并发代码中的缺陷。

  • 编写可以在不同配置环境下运行的线程代码。

  • 编写容易调整的线程代码,这样可以调整线程使性能达到最优。

  • 让线程的数量多于CPU或CPU核心的数量,这样CPU调度切换过程中潜在的问题才会暴露出来。

  • 让并发代码在不同的平台上运行。

  • 通过自动化或者硬编码的方式向并发代码中加入一些辅助测试的代码。

Java 7的并发编程

Java 7中引入了TransferQueue,它比BlockingQueue多了一个叫transfer的方法,如果接收线程处于等待状态,该操作可以马上将任务交给它,否则就会阻塞直至取走该任务的线程出现。可以用TransferQueue代替BlockingQueue,因为它可以获得更好的性能。

刚才忘记了一件事情,Java 5中还引入了Callable接口、Future接口和FutureTask接口,通过他们也可以构建并发应用程序,代码如下所示。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test07 {
    private static final int POOL_SIZE = 10;

    static class CalcThread implements Callable<Double> {
        private List<Double> dataList = new ArrayList<>();

        public CalcThread() {
            for(int i = 0; i < 10000; ++i) {
                dataList.add(Math.random());
            }
        }

        @Override
        public Double call() throws Exception {
            double total = 0;
            for(Double d : dataList) {
                total += d;
            }
            return total / dataList.size();
        }

    }

    public static void main(String[] args) {
        List<Future<Double>> fList = new ArrayList<>();
        ExecutorService es = Executors.newFixedThreadPool(POOL_SIZE);
        for(int i = 0; i < POOL_SIZE; ++i) {
            fList.add(es.submit(new CalcThread()));
        }

        for(Future<Double> f : fList) {
            try {
                System.out.println(f.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        es.shutdown();
    }
}
Copy after login

Callable接口也是一个单方法接口,显然这是一个回调方法,类似于函数式编程中的回调函数,在Java 8 以前,Java中还不能使用Lambda表达式来简化这种函数式编程。和Runnable接口不同的是Callable接口的回调方法call方法会返回一个对象,这个对象可以用将来时的方式在线程执行结束的时候获得信息。上面代码中的call方法就是将计算出的10000个0到1之间的随机小数的平均值返回,我们通过一个Future接口的对象得到了这个返回值。目前最新的Java版本中,Callable接口和Runnable接口都被打上了@FunctionalInterface的注解,也就是说它可以用函数式编程的方式(Lambda表达式)创建接口对象。

下面是Future接口的主要方法:

  • get():获取结果。如果结果还没有准备好,get方法会阻塞直到取得结果;当然也可以通过参数设置阻塞超时时间。

  • cancel():在运算结束前取消。

  • isDone():可以用来判断运算是否结束。

Java 7中还提供了分支/合并(fork/join)框架,它可以实现线程池中任务的自动调度,并且这种调度对用户来说是透明的。为了达到这种效果,必须按照用户指定的方式对任务进行分解,然后再将分解出的小型任务的执行结果合并成原来任务的执行结果。这显然是运用了分治法(pide-and-conquer)的思想。下面的代码使用了分支/合并框架来计算1到10000的和,当然对于如此简单的任务根本不需要分支/合并框架,因为分支和合并本身也会带来一定的开销,但是这里我们只是探索一下在代码中如何使用分支/合并框架,让我们的代码能够充分利用现代多核CPU的强大运算能力。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

class Calculator extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 7333472779649130114L;

    private static final int THRESHOLD = 10;
    private int start;
    private int end;

    public Calculator(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public Integer compute() {
        int sum = 0;
        if ((end - start) < THRESHOLD) {    // 当问题分解到可求解程度时直接计算结果
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            int middle = (start + end) >>> 1;
            // 将任务一分为二
            Calculator left = new Calculator(start, middle);
            Calculator right = new Calculator(middle + 1, end);
            left.fork();
            right.fork();
            // 注意:由于此处是递归式的任务分解,也就意味着接下来会二分为四,四分为八...

            sum = left.join() + right.join();   // 合并两个子任务的结果
        }
        return sum;
    }

}

public class Test08 {

    public static void main(String[] args) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future<Integer> result = forkJoinPool.submit(new Calculator(1, 10000));
        System.out.println(result.get());
    }
}
Copy after login

伴随着Java 7的到来,Java中默认的数组排序算法已经不再是经典的快速排序(双枢轴快速排序)了,新的排序算法叫TimSort,它是归并排序和插入排序的混合体,TimSort可以通过分支合并框架充分利用现代处理器的多核特性,从而获得更好的性能(更短的排序时间)。

The above is the detailed content of A detailed introduction to the summary and thinking of Java concurrent programming. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!