MySQL 中並發工作執行緒的原子讀取與更新
P粉239164234
P粉239164234 2023-12-21 13:38:51
0
2
438

假設我有多位工作人員可以同時讀取和寫入 MySQL 表(例如 jobs)。每個工人的任務是:

  1. 尋找最舊的 已排隊 作業
  2. 將其狀態設為 RUNNING
  3. 傳回對應的ID。

請注意,當工作人員執行步驟 #1 時,可能有任何符合條件的作業(即 QUEUED)。

到目前為止我有以下偽代碼。我相信如果步驟 #1 沒有返回作業,我需要取消 (ROLLBACK) 事務。我將如何在下面的程式碼中做到這一點?

BEGIN TRANSACTION;

# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED" 
ORDER BY created_at ASC LIMIT 1;

# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>

COMMIT;

P粉239164234
P粉239164234

全部回覆(2)
P粉536909186

目前還不太清楚你想要什麼。但假設您的任務是:尋找下一個QUEUED 作業。將其狀態設為RUNNING並選擇對應的ID。

在單執行緒環境中,您可以只使用您的程式碼。將選定的 ID 提取到應用程式程式碼中的變數中,並將其傳遞給 WHERE 子句中的 UPDATE 查詢。您甚至不需要事務,因為只有一個寫入語句。您可以在 SQLscript 中進行模仿。

假設這是您目前的狀態:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | QUEUED   |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

您想要啟動下一個排隊作業(id=2)。

SET @id_for_update = (
  SELECT id
  FROM jobs
  WHERE status = 'QUEUED'
  ORDER BY id
  LIMIT 1
);

UPDATE jobs
SET status="RUNNING"
WHERE id = @id_for_update;

SELECT @id_for_update;

你會得到

@id_for_update
2

從上次選擇開始。該表將具有以下狀態:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

在 DB Fiddle 上查看

如果您有多個啟動作業的進程,則需要使用 FOR UPDATE 鎖定該行。但可以使用LAST_INSERT_ID()來避免這種情況:

從上面的狀態開始,作業 2 已經在運作:

UPDATE jobs
SET status = 'RUNNING',
    id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

SELECT LAST_INSERT_ID();

您將得到:

| LAST_INSERT_ID() | ROW_COUNT() |
| ---------------- | ----------- |
| 3                | 1           |

新的狀態是:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | RUNNING  |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

在 DB Fiddle 上查看

如果 UPDATE 語句沒有影響任何行(沒有排隊的行),ROW_COUNT() 將為 0

可能存在一些我不知道的風險 - 但這也不是我真正的處理方式。我寧願在 jobs 表中存儲更多資訊。簡單的例子:

CREATE TABLE jobs (
  id INT auto_increment primary key,
  created_at timestamp not null default now(),
  updated_at timestamp not null default now() on update now(),
  status varchar(50) not null default 'QUEUED',
  process_id varchar(50) null default null
);

UPDATE jobs
SET status = 'RUNNING',
    process_id = 'some_unique_pid'    
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

現在正在執行的作業屬於特定進程,您只需使用

即可選擇它
SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

您甚至可能想了解更多 - 例如。 queued_atstarted_atfinished_at

P粉635509719

本週我正在實施與您的案例非常相似的事情。多個工作人員,每個工作人員抓取一組行中的「下一行」進行工作。

偽程式碼是這樣的:

BEGIN;

SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;

UPDATE mytable SET status = 'RUNNING' WHERE id = @id;

COMMIT;

使用 FOR UPDATE 對於避免競爭條件(即多個工作人員試圖取得同一行)非常重要。

請參閱https://dev.mysql.com/ doc/refman/8.0/en/select-into.html 以了解有關 SELECT ... INTO 的資訊。

熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!