Like procedural programming and object-oriented programming, a good concurrent programming model needs an extremely simple core plus rich extensions built on top of it that can solve all kinds of real-world problems. This article uses the Go language as an example to explain that core and its extensions.
The core of the concurrency model
The core of this concurrency model requires only coroutines and channels: coroutines execute code, and channels pass events between coroutines.
Concurrent programming has always been difficult. To write a correct concurrent program we have to understand threads, locks, semaphores, barriers, and even how the CPU updates its caches, and all of them are temperamental and full of traps. I never touch these low-level concurrency primitives myself unless absolutely necessary. A concise concurrency model does not need these complex low-level elements; coroutines and channels are enough.
A coroutine is a lightweight thread. In procedural programming, when you call a procedure you must wait for it to finish before control returns. When you start a coroutine, you do not wait for it to finish; the call returns immediately.
Coroutines are very lightweight: the Go language can run hundreds of thousands of coroutines in a single process and still maintain high performance. On an ordinary platform, a process with thousands of threads keeps its CPU busy with context switching, and performance drops sharply. Creating threads casually is not a good idea, but we can use coroutines freely.
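In Go this looks as follows; a minimal sketch, where the final sleep is only there so the process does not exit before the coroutine gets a chance to run:

package main

import (
    "fmt"
    "time"
)

func main() {
    // "go" launches the function as a coroutine and returns immediately.
    go func() {
        fmt.Println("running in a coroutine")
    }()
    fmt.Println("main was not blocked")
    // Give the coroutine a moment to run before the process exits.
    time.Sleep(100 * time.Millisecond)
}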
A channel is a conduit for data between coroutines. A channel can pass data, either values or references, among many coroutines. Channels are used in two ways.
A coroutine can try to send data into a channel. If the channel is full, the coroutine is suspended until the channel has room for the data.
A coroutine can try to receive data from a channel. If the channel has no data, the coroutine is suspended until the channel delivers data.
In this way a channel controls the execution of coroutines while transmitting data. It is a bit like event-driven programming and a bit like a blocking queue. These two concepts are very simple, and every language platform has a corresponding implementation; there are libraries for both in Java and C as well.
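A minimal sketch of these two rules with an unbuffered Go channel (the value is only illustrative):

package main

import "fmt"

func main() {
    ch := make(chan int) // unbuffered: every send must meet a receive

    go func() {
        ch <- 42 // the coroutine suspends here until main receives
    }()

    fmt.Println(<-ch) // main suspends here until the coroutine sends
}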
With just coroutines and channels, concurrency problems can be solved elegantly; no other concurrency-related concepts are needed. So how do we use these two sharp blades to solve real-world problems?
Extensions of the concurrency model
Unlike threads, coroutines can be created in large numbers, and that opens the door to new idioms: we can build generators, let functions return "services", run loops concurrently, and share variables. But new idioms bring thorny new problems as well: coroutines can leak, and inappropriate use hurts performance. The idioms and their pitfalls are introduced one by one below. The demonstration code is written in Go because it is concise and clear and supports all of these features.
1. Generator
Sometimes we need a function that continuously produces data; it might read files, read from the network, generate a self-incrementing sequence, or generate random numbers. What these behaviors have in common is that the function knows some variables, such as a file path, and we then keep calling it to get new data.
The following example generates random numbers. Let us build a random number generator that runs concurrently.
package main

import (
    "fmt"
    "math/rand"
)

// Function rand_generator_1 returns an int.
func rand_generator_1() int {
    return rand.Int()
}

// The above is an ordinary function returning an int. If the call to rand.Int()
// took a long time, the caller of this function would be suspended along with it.
// So we can create a coroutine dedicated to executing rand.Int().

// Function rand_generator_2 returns a channel.
func rand_generator_2() chan int {
    // Create the channel.
    out := make(chan int)
    // Create the coroutine.
    go func() {
        for {
            // Write data into the channel; blocks if nobody reads.
            out <- rand.Int()
        }
    }()
    return out
}

func main() {
    // The random number generator runs as a service.
    rand_service_handler := rand_generator_2()
    // Read a random number from the service and print it.
    fmt.Printf("%d\n", <-rand_service_handler)
}
In the function above, rand.Int() executes concurrently with the caller. It is worth noting that the function's return value can be understood as a "service". Whenever we need random data we can fetch it from this service; it has already prepared data for us, so there is no waiting and it is available at any time.
If we call this service infrequently, one coroutine is enough to meet our needs. But what if we need heavy access? We can use the multiplexing technique introduced below to start several generators and combine them into one larger service.
Calling a generator returns a "service" that can be used wherever data must be obtained continuously. It has a wide range of uses: reading data, generating IDs, even timers. It is a very concise way to make a program concurrent.
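For instance, here is a sketch of an ID generator built with the same pattern (id_generator is a name made up for illustration); because a single coroutine owns the counter, no lock is needed:

// id_generator returns a channel that yields 1, 2, 3, ...
func id_generator() chan int {
    out := make(chan int)
    go func() {
        for i := 1; ; i++ {
            out <- i // blocks until a caller asks for the next ID
        }
    }()
    return out
}

Each receive from the returned channel hands out the next ID.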
2. Multiplexing
Multiplexing is a technique for handling multiple queues at once. Apache uses one process per connection, so its concurrency is not very good; Nginx uses multiplexing so that one process handles many connections, which gives better concurrency.
Similarly, multiplexing is also useful with coroutines, although it works a little differently: it merges several similar small services into one larger service.
So let us use multiplexing to build a random number generator with higher concurrency.
// Function rand_generator_3 returns a channel.
func rand_generator_3() chan int {
    // Create two random number generator services.
    rand_generator_1 := rand_generator_2()
    rand_generator_2 := rand_generator_2()
    // Create the output channel.
    out := make(chan int)
    // Create one coroutine per generator.
    go func() {
        for {
            // Read from generator 1 and merge into the output.
            out <- <-rand_generator_1
        }
    }()
    go func() {
        for {
            // Read from generator 2 and merge into the output.
            out <- <-rand_generator_2
        }
    }()
    return out
}
The above is a higher-concurrency version of the random number generator built with multiplexing. By combining two generators, this version has twice the capacity of the previous one. Although coroutines can be created in large numbers, the many coroutines still contend for the output channel.
Go provides the select keyword for this, and every platform has its own tricks; enlarging the buffer of the output channel is a general solution.
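As a sketch (not from the original text), the same multiplexer can be written with select, so that a single coroutine serves both generators instead of two coroutines competing to send on the output channel:

// rand_generator_select merges two generator services using select.
func rand_generator_select() chan int {
    gen1 := rand_generator_2()
    gen2 := rand_generator_2()
    out := make(chan int)
    go func() {
        for {
            // select takes whichever generator has a value ready.
            select {
            case v := <-gen1:
                out <- v
            case v := <-gen2:
                out <- v
            }
        }
    }()
    return out
}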
Multiplexing can be used to merge multiple channels, improving performance and convenience. Combined with the other patterns it is very powerful.
3. Futures
The future is a very useful technique; we often use futures when working with threads. With threads, we can create a thread that returns a future and later wait on the future for the result. Under coroutines, however, futures can be taken further: the input parameters can be futures too.
When calling a function, the arguments are usually prepared in advance, and the same goes for starting a coroutine. But if we make the incoming parameter a channel, we can call the function before its arguments are ready. This design offers great freedom and a high degree of concurrency: calling a function and preparing its arguments are completely decoupled. Below is an example of using this technique to access a database.
package main

import "fmt"

// A query struct.
type query struct {
    // Parameter channel.
    sql chan string
    // Result channel.
    result chan string
}

// Execute the query.
func execQuery(q query) {
    // Start the coroutine.
    go func() {
        // Fetch the input.
        sql := <-q.sql
        // Access the database and write to the result channel.
        q.result <- "get " + sql
    }()
}

func main() {
    // Initialize the query.
    q := query{make(chan string, 1), make(chan string, 1)}
    // Execute the query; note that no arguments need to be ready yet.
    execQuery(q)
    // Prepare the argument.
    q.sql <- "select * from table"
    // Fetch the result.
    fmt.Println(<-q.result)
}
With the future technique above, not only the result but also the parameters are obtained through futures: once the parameters are ready, execution proceeds automatically. The difference between a future and a generator is that a future returns one result, while a generator can be called repeatedly. Another point worth noting is that the parameter channel and the result channel are defined in one struct that is passed as an argument, rather than returning the result channel; this increases cohesion, and the benefit is that it combines well with multiplexing.
Futures can be combined with the other techniques. With multiplexing we can listen on several result channels and return automatically as soon as a result arrives. Combined with a generator, the generator keeps producing data while futures process it item by item. Futures can also be chained end to end to form a concurrent pipe filter, which can be used to read, write, and transform data streams.
The future is a very powerful tool. At call time we need not care whether the data is ready or whether the return value has been computed; the components of the program simply start running as soon as their data is ready.
4. Concurrent loops
Loops are often performance hotspots. If the bottleneck is in the CPU, nine times out of ten the hotspot is inside a loop body, so executing the loop body concurrently can improve performance considerably.
Making a loop concurrent is simple: start a coroutine for each iteration, and the loop bodies execute concurrently. Set up a counter before launching; each loop body puts one element onto the counter when it finishes, and after launching we wait on the counter until all loop coroutines are done.
// Set up the counter channel.
sem := make(chan int, N)
// The for loop over the data.
for i, xi := range data {
    // Start a coroutine for each iteration.
    go func(i int, xi float64) {
        doSomething(i, xi)
        // Count this iteration as finished.
        sem <- 0
    }(i, xi)
}
// Wait until the whole loop is done.
for i := 0; i < N; i++ {
    <-sem
}
The above is an example of a concurrent loop, using a counter to wait until all iterations have finished. Combined with the future technique mentioned earlier, the wait is not even necessary: we can postpone checking whether the data is complete until the point where the results are actually needed.
Concurrent loops improve performance, exploit multiple cores, and relieve CPU hotspots. It is precisely because coroutines can be created in large numbers that they can be used in loop bodies like this; with threads we would need something like a thread pool to avoid creating too many threads, whereas coroutines are much simpler.
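As a side note, standard Go usually expresses this counter with sync.WaitGroup. A sketch of the same loop, assuming data and doSomething are as in the snippet above and that the sync package is imported:

var wg sync.WaitGroup
for i, xi := range data {
    wg.Add(1)
    go func(i int, xi float64) {
        defer wg.Done() // mark this iteration as finished
        doSomething(i, xi)
    }(i, xi)
}
wg.Wait() // block until every iteration has called Done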
5. Chain filters
As mentioned earlier, futures chained end to end form a concurrent pipe filter, and this arrangement can do a great deal. If every filter consists of the same function, there is also a simple way to link them together.
Since every filter coroutine can run concurrently, this structure is very favorable for multi-core environments. Below is an example that uses this pattern to generate prime numbers.
// A concurrent prime sieve.
package main

// Send the sequence 2, 3, 4, ... to channel 'ch'.
func Generate(ch chan<- int) {
    for i := 2; ; i++ {
        ch <- i // Send 'i' to channel 'ch'.
    }
}

// Copy the values from channel 'in' to channel 'out',
// removing those divisible by 'prime'.
func Filter(in <-chan int, out chan<- int, prime int) {
    for {
        i := <-in // Receive value from 'in'.
        if i%prime != 0 {
            out <- i // Send 'i' to 'out'.
        }
    }
}

// The prime sieve: Daisy-chain Filter processes.
func main() {
    ch := make(chan int) // Create a new channel.
    go Generate(ch)      // Launch Generate goroutine.
    for i := 0; i < 10; i++ {
        prime := <-ch
        print(prime, "\n")
        ch1 := make(chan int)
        go Filter(ch, ch1, prime)
        ch = ch1
    }
}
The program above creates ten filters, each sieving out the multiples of one prime, so it prints the first ten primes.
The chain filter builds a concurrent chain of filters with simple code. The approach has a further advantage: each channel is accessed by only two coroutines, so there is no fierce contention and performance is good.
6. Shared variables
Communication between coroutines happens only through channels, yet we are used to shared variables, and in many cases shared variables make code more concise. For example, a server has two states, on and off, and other coroutines merely want to read or change that state. How should this be done? The variable can be placed behind channels and maintained by a single coroutine.
The following example shows how to implement a shared variable this way.
package main

import "fmt"

// A shared variable consists of a read channel and a write channel.
type shared_var struct {
    reader chan int
    writer chan int
}

// The coroutine that maintains the shared variable.
func shared_var_watchdog(v shared_var) {
    go func() {
        // Initial value.
        var value int = 0
        for {
            // Listen on both channels and serve requests.
            select {
            case value = <-v.writer:
            case v.reader <- value:
            }
        }
    }()
}

func main() {
    // Initialize the variable and start the maintaining coroutine.
    v := shared_var{make(chan int), make(chan int)}
    shared_var_watchdog(v)
    // Read the initial value.
    fmt.Println(<-v.reader)
    // Write a new value.
    v.writer <- 1
    // Read the newly written value.
    fmt.Println(<-v.reader)
}
In this way, a coroutine-safe shared variable is implemented on top of coroutines and channels: define a write channel and send a new value into it whenever the variable needs updating, define a read channel and receive from it whenever the value is needed, and let a single dedicated coroutine maintain both channels to guarantee the consistency of the data.
Generally speaking, shared variables are not the recommended way for coroutines to interact, but with this approach they are acceptable in some situations. Many platforms have more native support for shared variables, and which implementation is better is a matter of taste. Coroutines and channels can also be used to build all sorts of common concurrent data structures, such as locks, which we will not enumerate one by one here.
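For example, a lock can be sketched as a channel with a buffer of one (an illustration only, not a replacement for the standard library's sync.Mutex):

// chan_mutex is a lock built from a buffered channel.
type chan_mutex chan struct{}

func new_chan_mutex() chan_mutex { return make(chan_mutex, 1) }

// lock blocks while another coroutine holds the slot.
func (m chan_mutex) lock() { m <- struct{}{} }

// unlock frees the slot for the next locker.
func (m chan_mutex) unlock() { <-m }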
7. Coroutine leaks
Like memory, coroutines are a system resource. Memory has automatic garbage collection, but there is no corresponding reclamation mechanism for coroutines. Some years from now, once coroutines are ubiquitous, will coroutine leaks become as eternal a source of pain for programmers as memory leaks?
Normally a coroutine is destroyed once it finishes executing. Coroutines also occupy memory, so a coroutine leak is as serious as a memory leak: at best it slows the program down, at worst it brings the machine to its knees.
C and C++ are both languages without automatic memory reclamation, yet with good programming habits the problem can be avoided. The same is true for coroutines: good habits are enough.
Only two situations prevent a coroutine from ever finishing. In the first, the coroutine wants to read from a channel, but nobody ever writes to that channel; perhaps the channel has simply been forgotten. In the second, the coroutine wants to write to a channel, but because nobody is listening on it, the coroutine can never proceed. How to avoid each situation is discussed below.
For the case where a coroutine wants to read from a channel to which nobody writes, the solution is simple: add a timeout. Whenever it is uncertain whether a response will ever come, a timeout must be added to avoid waiting forever.
A timer is not the only way to end a coroutine; we can also expose a quit channel, through which any other coroutine can tell this one to stop.
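A minimal sketch of such a quit channel (the names here are made up for illustration):

// start_worker reads from in until some other coroutine signals quit.
func start_worker(in chan int, quit chan struct{}) {
    go func() {
        for {
            select {
            case v := <-in:
                fmt.Println("received", v)
            case <-quit:
                // Any other coroutine can close(quit) to end this one.
                return
            }
        }
    }()
}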
For the case where a coroutine wants to write to a channel but the channel blocks the write, the solution is also simple: give the channel a buffer. The precondition is that the channel will only ever receive a bounded number of writes.
For example, if a channel is known to receive at most N values, set its buffer size to N. The channel will then never block, and the coroutine cannot leak. The buffer could also be made effectively unlimited, but then we take on the risk of a memory leak. Once the coroutine finishes, the channel's memory loses its references and is reclaimed automatically by the garbage collector.
func never_leak(ch chan int) {
    // Initialize the timeout channel with a buffer of 1.
    timeout := make(chan bool, 1)
    // Start the timeout coroutine; because the buffer is 1, it cannot leak.
    go func() {
        time.Sleep(1 * time.Second)
        timeout <- true
    }()
    // Listen on the channels; with the timeout in place, no leak is possible.
    select {
    case <-ch:
        // A read from ch has occurred.
    case <-timeout:
        // The read from ch has timed out.
    }
}
The above is an example of avoiding leaks: a timeout avoids blocking on a read, and a buffer avoids blocking on a write.
As with objects in memory, we need not worry about long-lived coroutines leaking: they exist for the life of the program, and there are few of them. The ones to watch are coroutines created temporarily, in large numbers and with short lifetimes, typically inside loops; apply the techniques above to keep them from leaking. Coroutines are a double-edged sword: when something goes wrong they not only fail to improve performance, they can bring the program down. But just as with memory, the risk of leaks is there, and the more you use them the more fluent you become.
Implementations of the concurrency model
Now that concurrent programming is everywhere, support for coroutines and channels has become an indispensable part of every platform. Although each has its own names for them, they all satisfy the basic requirements of coroutines: concurrent execution and the ability to be created in large numbers. I have summarized how they are implemented.
Below are some common languages and platforms that already support coroutines.
Go and Scala, as the newest languages, were born with complete coroutine-based concurrency. Erlang, the most senior concurrent programming language, has found a second youth. Almost all other second-tier languages have added coroutines in recent versions.
It is surprising that C/C++ and Java, the world's three most mainstream platforms, provide no language-level native support for coroutines. They all carry a heavy legacy that cannot and need not be changed, but they have other ways of using coroutines.
The Java platform has many ways to implement coroutines:
· Modify the virtual machine: patch the JVM to implement coroutines. This works well, but it gives up the benefit of cross-platform portability.
· Modify bytecode: enhance the bytecode after compilation, or use a new JVM language. This slightly increases the complexity of the build.
· Use JNI: ship JNI code inside the JAR; easy to use, but not cross-platform.
· Simulate coroutines with threads: this makes coroutines heavyweight and relies entirely on the JVM's thread implementation.
Among these, modifying bytecode is relatively common, because it balances performance and portability. Scala, the most representative JVM language, supports coroutine-style concurrency well, and Akka, the popular actor-model library for Java, also implements its coroutines by modifying bytecode.
For C, coroutines, like threads, can be built on a variety of system calls. Being a relatively high-level concept, coroutines have too many implementations to discuss here; the more mainstream ones include libpcl, coro, and lthread.
For C++, there is the Boost implementation as well as other open-source libraries. There is also μC++, which provides concurrency extensions on top of C++.
As you can see, this programming model is already widely supported across language platforms and is no longer niche. If you want to use it, you can add it to your toolbox at any time.