對於大多數程式設計師(當然我也基本上是其中一員),並發程式設計幾乎就等價於為相關資料結構加上一個鎖(Mutex)。例如如果我們需要一個支援並發的棧,那最簡單的方法就是給一個單執行緒的棧加上鎖
std::sync::Mutex
##
。 (加上Arc是為了能讓多個執行緒都擁有堆疊的所有權)
<code> <p>use std::sync::{Mutex, Arc};<br><br>#[derive(Clone)]<br>struct ConcurrentStack<T> {<br> inner: Arc<Mutex<Vec<T>>>,<br>}<br><br>impl<T> ConcurrentStack<T> {<br> pub fn new() -> Self {<br> ConcurrentStack {<br> inner: Arc::new(Mutex::new(Vec::new())),<br> }<br> }<br><br> pub fn push(&self, data: T) {<br> let mut inner = self.inner.lock().unwrap();<br> (*inner).push(data);<br> }<br><br> pub fn pop(&self) -> Option<T> {<br> let mut inner = self.inner.lock().unwrap();<br> (*inner).pop()<br> }<br><br>}<br> </p></code>
程式碼寫起來十分方便,因為它幾乎與單執行緒版本相同,這很明顯是一個好處。只需要按照單線程的版本寫完,然後在資料結構上加上鎖,然後在必要的時候取得和釋放(在Rust中基本上是自動的)鎖。
那麼問題是什麼呢?首先不談你可能會忘記獲取和釋放鎖(這一點要感謝Rust,在Rust中幾乎不可能發生),你可能會面臨死鎖的問題(哲學家就餐問題)。然後也不談一些低優先級的任務可能會長期搶佔高優先級任務的資源(因為鎖是第一位的),當線程數量比較大的時候,大部分的時間都被用在了同步上(等待鎖能被取得),效能就會變得非常差。考慮一個大量讀取而偶爾寫入的並發資料庫,如果用鎖去處理,即使資料庫沒有任何更新,每兩個讀取之間也需要做一次同步,代價太大!
於是,大量電腦科學家和程式設計師就把目光轉向了無鎖(lock-free)並發。無鎖物件:如果一個共享物件保證了無論其他執行緒做何種操作,總有一些執行緒會在有限的系統操作步驟後完成一個對其的操作 Her91 。也就是說,至少有一個執行緒對其操作會成效。使用鎖的並發明顯就不屬於這個範疇:如果取得了鎖的執行緒被延遲,那麼這段時間裡沒有任何執行緒能夠完成任何操作。極端情況下如果出現了死鎖,那麼就沒有任何執行緒能夠完成任何操作。
那大家可能會好奇,無鎖並發要怎麼實現呢?有沒有例子呢?在此之前讓我們先來看看一個公認的在無鎖並發中非常重要的原子原語:CAS。 CAS的過程是用指定值去比較一儲存值,只有當他們相同時,才會修改儲存值為新的指定值。 CAS是原子操作(由處理器支持,例如x86的compare and exchange (CMPXCHG)),該原子性保證瞭如果其他執行緒已經改變了儲存值,那麼寫入就會失敗。 Rust標準庫中的
std::sync::atomic
中的類型就提供了CAS操作,例如原子指針
std::sync::atomic::AtomicPtr
<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:php;toolbar:false"><code>
<p>pub fn compare_and_swap(<br> &self,<br> current: *mut T,<br> new: *mut T,<br> order: Ordering<br>) -> *mut T<br>
</p></code></pre><div class="contentsignin">登入後複製</div></div>
(在這裡,不用糾結ordering是什麼,也就是說請儘管忽略忽略
Acquire
,
Release
,
<code> <p>#![feature(box_raw)]<br><br>use std::ptr::{self, null_mut};<br>use std::sync::atomic::AtomicPtr;<br>use std::sync::atomic::Ordering::{Relaxed, Release, Acquire};<br><br>pub struct Stack<T> {<br> head: AtomicPtr<Node<T>>,<br>}<br><br>struct Node<T> {<br> data: T,<br> next: *mut Node<T>,<br>}<br><br>impl<T> Stack<T> {<br> pub fn new() -> Stack<T> {<br> Stack {<br> head: AtomicPtr::new(null_mut()),<br> }<br> }<br><br> pub fn pop(&self) -> Option<T> {<br> loop {<br> // 快照<br> let head = self.head.load(Acquire);<br><br> // 如果栈为空<br> if head == null_mut() {<br> return None<br> } else {<br> let next = unsafe { (*head).next };<br><br> // 如果现状较快照并没有发生改变<br> if self.head.compare_and_swap(head, next, Release) == head {<br><br> // 读取内容并返回<br> return Some(unsafe { ptr::read(&(*head).data) })<br> }<br> }<br> }<br> }<br><br> pub fn push(&self, t: T) {<br> // 创建node并转化为*mut指针<br> let n = Box::into_raw(Box::new(Node {<br> data: t,<br> next: null_mut(),<br> }));<br> loop {<br> // 快照<br> let head = self.head.load(Relaxed);<br><br> // 基于快照更新node<br> unsafe { (*n).next = head; }<br><br> // 如果在此期间,快照仍然没有过时<br> if self.head.compare_and_swap(head, n, Release) == head {<br> break<br> }<br> }<br> }<br>}<br> </p></code>
如果你正在使用Java或其他帶有GC的程式語言,你可能已經完成了大量的工作。現在的問題在於,在Rust這種沒有GC的語言中,pop中<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:php;toolbar:false">return Some(unsafe { ptr::read(&(*head).data) })</pre><div class="contentsignin">登入後複製</div></div>
並沒有人去釋放
以上是Java有鎖並發、無鎖並發和CAS實例分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!