首頁 > 後端開發 > Golang > go語言同步機制有哪些

go語言同步機制有哪些

青灯夜游
發布: 2022-12-26 17:45:28
原創
4870 人瀏覽過

go同步機制有:1、channel,著重並發問題中的資料流動,把流動的資料放到channel中,就能使用channel解決這個並發;2、Sync.Mutex,擁有Lock、Unlock兩個方法,主要實現思想體現在Lock函數中;3、Sync.waitGroup;4、Sync.Once;5、Sync.context;6、Sync.pool;7、atomic包,針對變數進行操作。

go語言同步機制有哪些

本教學操作環境:windows7系統、GO 1.18版本、Dell G3電腦。

Golang的提供的同步機制有sync模組下的Mutex、WaitGroup以及語言本身提供的chan等。

1.channel 

概述

#Golang以如此明顯的方式告訴我們:

優點:channel的核心是資料流動,關注到並發問題中的資料流動,把流動的資料放到channel中,就能使用channel解決這個並發

           問題,而且使用channel是線程安全的並且不會有資料衝突,比鎖好用多了

##缺點:不太適應同步太複雜的場景,例如多協程的同步等待問題,而且有死鎖問題 ,channel死鎖問題:死鎖問題連結

分類

channel類型:無緩衝和緩衝類型

channel有兩種形式的,一種是無緩衝的,一個線程向這個channel發送了訊息後,會阻塞當前的這個線程,知道其他線程去接收這個channel的消息。無緩衝的形式如下:

intChan := make(chan int)
 
带缓冲的channel,是可以指定缓冲的消息数量,当消息数量小于指定值时,不会出现阻塞,超过之后才会阻塞,需要等待其他线程去接收channel处理,带缓冲的形式如下:
 
//3为缓冲数量
intChan := make(chan int, 3)
登入後複製

舉例

 type Person struct {
	Name    string
	Age     uint8
	Address Addr
}
 
type Addr struct {
	city     string
	district string
}
 
/*
测试channel传输复杂的Struct数据
 */
func testTranslateStruct() {
	personChan := make(chan Person, 1)
 
	person := Person{"xiaoming", 10, Addr{"shenzhen", "longgang"}}
	personChan <- person
 
	person.Address = Addr{"guangzhou", "huadu"}
	fmt.Printf("src person : %+v \n", person)
 
	newPerson := <-personChan
	fmt.Printf("new person : %+v \n", newPerson)
}
登入後複製

在實際應用過程中,等待channel 結束訊號的過程可能不是無期限的,一般會伴隨一個timer ,超時時間如下面所示:

/*
检查channel读写超时,并做超时的处理
 */
func testTimeout() {
	g := make(chan int)
	quit := make(chan bool)
 
	go func() {
		for {
			select {
			case v := <-g:
				fmt.Println(v)
			case <-time.After(time.Second * time.Duration(3)):
				quit <- true
				fmt.Println("超时,通知主线程退出")
				return
			}
		}
	}()
 
	for i := 0; i < 3; i++ {
		g <- i
	}
 
	<-quit
	fmt.Println("收到退出通知,主线程退出")
}
登入後複製

2.Sync.Mutex

Mutex擁有Lock、Unlock兩個方法,主要的實作思想都體現在Lock函數中。

Lock執行時,分成三種情況:

  • 無衝突透過CAS運算把目前狀態設定為加鎖狀態;

  • #有衝突開始自旋,並等待鎖釋放,如果其他Goroutine在這段時間內釋放了該鎖, 直接獲得該鎖;如果沒有釋放,進入3;

  • # #有衝突,且已經過了自旋階段透過呼叫semacquire函數來讓目前Goroutine進入等待狀態。
  •     無衝突時是最簡單的情況;有衝突時,首先進行自旋,是從效率方面考慮的, 因為大多數的Mutex保護的程式碼段都很短,經過短暫的自旋就可以獲得;如果自旋等待無果,就只好透過信號量來讓當前Goroutine進入等待了。

3. Sync.waitGroup

Channel在某些同步場景下,使用略顯複雜,不管是使用多個channel或使用channel數組,如下:

func coordinateWithChan() {
 sign := make(chan struct{}, 2)
 num := int32(0)
 fmt.Printf("The number: %d [with chan struct{}]\n", num)
 max := int32(10)
 go addNum(&num, 1, max, func() {
  sign <- struct{}{}
 })
 go addNum(&num, 2, max, func() {
  sign <- struct{}{}
 })
 <-sign
 <-sign
}
登入後複製

所以Sync.waitGroup 就顯得更優雅,Sync.waitGroup 用來等待一組goroutines的結束,在主Goroutine裡聲明,並且設置要等待的goroutine的個數,每個goroutine執行完成之後呼叫Done,最後在主Goroutines 裡Wait即可。類似於JAVA中的CountDownLatch或循環屏障,並且Sync.waitGroup可以被重複使用,提供瞭如下API:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
登入後複製

但是Sync.waitGroup的使用需要遵循一些規則,避免拋出Panic:

a. 錯誤調用Done方法,導致waitGroup內部計數值出現負數的情況


b.錯誤的調用Add方法,在waitGroup內部計數值到達0的時候,Add方法被調用,導致應該被喚起的goroutine沒有被喚起,就開始了新的一輪計數週期

所以在調用的時候,就要遵循一下原則:

      先統一Add,再並發Done,最後Wait

4. Sync.Once

      Sync.once實作方式是內部包含一個int32位元的標誌,用來判斷方式是否被執行過,標誌值變更的時機為方法執行完之後,當有多個goroutine進行呼叫的時候,使用double-check方式進行驗證,首先在沒有同步方式的情況下,進行標誌值的判定,為0則競爭獲取mutex鎖,進入臨界區內,此時會在此進行標誌值的判斷,確保方法真的被執行一次。 double-check第一次是為了更快的進行判斷,但是存在錯誤的情況,第二次check是為了正確的確定標誌值此時的狀態。

使用:

func main() {
    var once sync.Once
    onceBody := func() {
        time.Sleep(3e9)
        fmt.Println("Only once")
    }
    done := make(chan bool)
    for i := 0; i < 10; i++ {
        j := i
        go func(int) {
            once.Do(onceBody)
            fmt.Println(j)
            done <- true
        }(j)
    }
    //给一部分时间保证能够输出完整【方法一】
    //for i := 0; i < 10; i++ {
    //    <-done
    //}

    //给一部分时间保证能够输出完整【方法二】
    <-done
    time.Sleep(3e9)
}
登入後複製

5. Sync.context

場景

當需要進行多批次的計算任務同步,或是需要一對多的協作流程的時候

#使用範例

func coordinateWithContext() {
 total := 12
 var num int32
 fmt.Printf("The number: %d [with context.Context]\n", num)
 cxt, cancelFunc := context.WithCancel(context.Background())
 for i := 1; i <= total; i++ {
  go addNum(&num, i, func() {
   if atomic.LoadInt32(&num) == int32(total) {
    cancelFunc()
   }
  })
 }
 <-cxt.Done()
 fmt.Println("End.")
}
登入後複製

注意事項

#

a.如何生成自己的context

通过WithCancel、WithDeadline、WithTimeout和WithValue四个方法从context.Background中派生出自己的子context

注意context.background这个上下文根节点仅仅是一个最基本的支点,它不提供任何额外的功能,也就是说,它既不可以被撤销(cancel),也不能携带任何数据,在使用是必须通过以上4种方法派生出自己的context

b.子context是会继承父context的值

c.撤销消息的传播

撤销消息会按照深度遍历的方式传播给子context(注意因为多routine调用的原因,最终的撤销顺序可能不会是深度遍历的顺序)

,在遍历的过程中,通过WithCancel、WithDeadline、WithTimeout派生的context会被撤销,但是通过WithValue方法派生的context不会被撤销

6. Sync.pool

7.atomic包,针对变量进行操作

我们调用sync/atomic中的几个函数可以对几种简单的类型进行原子操作。这些类型包括int32,int64,uint32,uint64,uintptr,unsafe.Pointer,共6个。这些函数的原子操作共有5种:增或减,比较并交换、载入、存储和交换它们提供了不同的功能,切使用的场景也有区别。

增或减

   顾名思义,原子增或减即可实现对被操作值的增大或减少。因此该操作只能操作数值类型。

   被用于进行增或减的原子操作都是以“Add”为前缀,并后面跟针对具体类型的名称。

//方法源码
func AddUint32(addr *uint32, delta uint32) (new uint32)
登入後複製

栗子:(在原来的基础上加n)

atomic.AddUint32(&addr,n)
登入後複製

栗子:(在原来的基础上加n(n为负数))

atomic.AddUint32(*addr,uint32(int32(n)))
//或
atomic.AddUint32(&addr,^uint32(-n-1))
登入後複製

比较并交换

   比较并交换----Compare And Swap 简称CAS

   他是假设被操作的值未曾被改变(即与旧值相等),并一旦确定这个假设的真实性就立即进行值替换

   如果想安全的并发一些类型的值,我们总是应该优先使用CAS

//方法源码
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
登入後複製

栗子:(如果addr和old相同,就用new代替addr)

ok:=atomic.CompareAndSwapInt32(&addr,old,new)
登入後複製

载入

   如果一个写操作未完成,有一个读操作就已经发生了,这样读操作使很糟糕的。

   为了原子的读取某个值sync/atomic代码包同样为我们提供了一系列的函数。这些函数都以"Load"为前缀,意为载入。

//方法源码
func LoadInt32(addr *int32) (val int32)
登入後複製

栗子

fun addValue(delta int32){
    for{
        v:=atomic.LoadInt32(&addr)
        if atomic.CompareAndSwapInt32(&v,addr,(delta+v)){
            break;
        }
    }
}
登入後複製

存储

   与读操作对应的是写入操作,sync/atomic也提供了与原子的值载入函数相对应的原子的值存储函数。这些函数的名称均以“Store”为前缀

   在原子的存储某个值的过程中,任何cpu都不会进行针对进行同一个值的读或写操作。如果我们把所有针对此值的写操作都改为原子操作,那么就不会出现针对此值的读操作读操作因被并发的进行而读到修改了一半的情况。

   原子操作总会成功,因为他不必关心被操作值的旧值是什么。

//方法源码
func StoreInt32(addr *int32, val int32)
登入後複製

栗子

atomic.StoreInt32(被操作值的指针,新值)
atomic.StoreInt32(&value,newaddr)
登入後複製

交换

   原子交换操作,这类函数的名称都以“Swap”为前缀。

   与CAS不同,交换操作直接赋予新值,不管旧值。

   会返回旧值

//方法源码
func SwapInt32(addr *int32, new int32) (old int32)
登入後複製

栗子

atomic.SwapInt32(被操作值的指针,新值)(返回旧值)
oldval:=atomic.StoreInt32(&value,newaddr)
登入後複製

扩展知识:Sync包简述

1. 什么是Sync包?

Package sync provides basic synchronization primitives such as mutual exclusion locks. Other than the Once and WaitGroup types, most are intended for use by low-level library routines. Higher-level synchronization is better done via channels and communication.

Values containing the types defined in this package should not be copied.

这句话大意是说:
Sync包同步提供基本的同步原语,如互斥锁。 除了Once和WaitGroup类型之外,大多数类型都是供低级库例程使用的。 通过Channel和沟通可以更好地完成更高级别的同步。并且此包中的值在使用过后不要拷贝。

从描述中可以看到的是,golang 并不推荐这个包中的大多数并发控制方法,但还是提供了相关方法,主要原因是golang中提倡以共享内存的方式来通信:

不要以共享内存的方式来通信,作为替代,我们应该以通信的手段来共享内存

共享内存的方式使得多线程中的通信变得简单,但是在并发的安全性控制上将变得异常繁琐。
正确性不是我们唯一想要的,我们想要的还有系统的可伸缩性,以及可理解性,我觉得这点非常重要,比如现在广泛使用的Raft算法。

2. 包中的Type

包中主要有: Locker, Cond, Map, Mutex, Once, Pool,
RWMutex, WaitGroup

type Locker interface {
        Lock()
        Unlock()
}
type Cond struct {
        // L is held while observing or changing the condition
        L Locker
}
登入後複製

3. 什么是锁,为什么需要锁?

锁是sync包中的核心,他主要有两个方法,加锁和解锁。
在单线程运行的时候程序是顺序执行的,程序对数据的访问也是:
读取 => 一顿操作(加减乘除之类的) => 写回原地址
但是一旦程序中进行了并发编程,也就是说,某一个函数可能同时被不同的线程执行的时候,以时间为维度会发生以下情况:

go語言同步機制有哪些

可以看到的是,A地址的数字被执行了两次自增,若A=5,我们在执行完成后预期的A值是7,但是在这种情况下我们得到的A却是6,bug了~
还有很多类似的并发错误,所以才有锁的引入。若是我们在线程2读取A的值的时候对A进行加锁,让线程2等待,线程1执行完成之后在执行线程2,这样就能够保证数据的正确性。但是正确性不是我们唯一想要的。

4 写更优雅的代码

在很多语言中我们经常为了保证数据安全正确,会在并发的时候对数据加锁

Lock()
doSomething()
Unlock()
登入後複製

Golang在此包中也提供了相关的锁,但是标明了"most are intended for use by low-level library routines" 所以我这里只对 Once and WaitGroup types做简述。

5.Once 对象

Once 是一个可以被多次调用但是只执行一次,若每次调用Do时传入参数f不同,但是只有第一个才会被执行。

func (o *Once) Do(f func())
登入後複製
    var once sync.Once
    onceBody := func() {
        fmt.Println("Only once")
    }
    done := make(chan bool)
    for i := 0; i < 10; i++ {
        go func() {
            once.Do(onceBody)
            done <- true
        }()
    }
    for i := 0; i < 10; i++ {
        <-done
    }
登入後複製

如果你执行这段代码会发现,虽然调用了10次,但是只执行了1次。BTW:这个东西可以用来写单例。

6. WaitGroup

下面是个官方的例子:

var wg sync.WaitGroup
var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
}
for _, url := range urls {
        // Increment the WaitGroup counter.
        wg.Add(1)
        // Launch a goroutine to fetch the URL.
        go func(url string) {
                // Decrement the counter when the goroutine completes.
                defer wg.Done()
                // Fetch the URL.
                http.Get(url)
        }(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
登入後複製

7. 简述

Golang中高级的并发可以通过channel来实现,这是golang所倡导的,但是go也提供了锁等先关操作。

【相关推荐:Go视频教程编程教学

以上是go語言同步機制有哪些的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
最新問題
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板