MySQL 中并发工作线程的原子读取和更新
P粉239164234
P粉239164234 2023-12-21 13:38:51
0
2
535

假设我有多个工作人员可以同时读取和写入 MySQL 表(例如 jobs)。每个工人的任务是:

  1. 查找最旧的 已排队 作业
  2. 将其状态设置为 RUNNING
  3. 返回对应的ID。

请注意,当工作人员运行步骤 #1 时,可能有任何符合条件的作业(即 QUEUED)。

到目前为止我有以下伪代码。我相信如果步骤 #1 没有返回作业,我需要取消 (ROLLBACK) 事务。我将如何在下面的代码中做到这一点?

BEGIN TRANSACTION;

# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED" 
ORDER BY created_at ASC LIMIT 1;

# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>

COMMIT;

P粉239164234
P粉239164234

全部回复(2)
P粉536909186

目前还不太清楚你想要什么。但假设您的任务是:查找下一个QUEUED 作业。将其状态设置为RUNNING并选择相应的ID。

在单线程环境中,您可以只使用您的代码。将选定的 ID 提取到应用程序代码中的变量中,并将其传递给 WHERE 子句中的 UPDATE 查询。您甚至不需要事务,因为只有一个写入语句。您可以在 SQLscript 中进行模仿。

假设这是您当前的状态:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | QUEUED   |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

您想要启动下一个排队作业(id=2)。

SET @id_for_update = (
  SELECT id
  FROM jobs
  WHERE status = 'QUEUED'
  ORDER BY id
  LIMIT 1
);

UPDATE jobs
SET status="RUNNING"
WHERE id = @id_for_update;

SELECT @id_for_update;

你会得到

@id_for_update
2

从上次选择开始。该表将具有以下状态:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

在 DB Fiddle 上查看

如果您有多个启动作业的进程,则需要使用 FOR UPDATE 锁定该行。但可以使用LAST_INSERT_ID()来避免这种情况:

从上面的状态开始,作业 2 已经在运行:

UPDATE jobs
SET status = 'RUNNING',
    id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

SELECT LAST_INSERT_ID();

您将得到:

| LAST_INSERT_ID() | ROW_COUNT() |
| ---------------- | ----------- |
| 3                | 1           |

新的状态是:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | RUNNING  |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

在 DB Fiddle 上查看

如果 UPDATE 语句没有影响任何行(没有排队的行),ROW_COUNT() 将为 0

可能存在一些我不知道的风险 - 但这也不是我真正的处理方式。我宁愿在 jobs 表中存储更多信息。简单的例子:

CREATE TABLE jobs (
  id INT auto_increment primary key,
  created_at timestamp not null default now(),
  updated_at timestamp not null default now() on update now(),
  status varchar(50) not null default 'QUEUED',
  process_id varchar(50) null default null
);

UPDATE jobs
SET status = 'RUNNING',
    process_id = 'some_unique_pid'    
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

现在正在运行的作业属于特定进程,您只需使用

即可选择它
SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

您甚至可能想了解更多信息 - 例如。 queued_atstarted_atfinished_at

P粉635509719

本周我正在实施与您的案例非常相似的事情。多个工作人员,每个工作人员抓取一组行中的“下一行”进行工作。

伪代码是这样的:

BEGIN;

SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;

UPDATE mytable SET status = 'RUNNING' WHERE id = @id;

COMMIT;

使用 FOR UPDATE 对于避免竞争条件(即多个工作人员试图获取同一行)非常重要。

参见https://dev.mysql.com/ doc/refman/8.0/en/select-into.html 了解有关 SELECT ... INTO 的信息。

热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板