Atomare Lesevorgänge und Aktualisierungen mit gleichzeitigen Arbeitsthreads in MySQL
P粉239164234
P粉239164234 2023-12-21 13:38:51
0
2
477

Angenommen, ich habe mehrere Worker, die gleichzeitig in einer MySQL-Tabelle lesen und schreiben können (zum Beispiel jobs). Die Aufgabe jedes Arbeiters ist:

  1. Finden Sie die ältesten 已排队 Jobs
  2. Setzen Sie den Status auf RUNNING
  3. Geben Sie die entsprechende ID zurück.

Bitte beachten Sie, dass es möglicherweise keine qualifizierten Jobs gibt (d. h. QUEUED), wenn der Arbeitnehmer Schritt Nr. 1 ausführt.

Ich habe bisher den folgenden Pseudocode. Ich glaube, wenn Schritt 1 den Auftrag nicht zurückgibt, muss ich die Transaktion abbrechen (ROLLBACK). Wie würde ich das im folgenden Code machen?

BEGIN TRANSACTION;

# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED" 
ORDER BY created_at ASC LIMIT 1;

# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>

COMMIT;

P粉239164234
P粉239164234

Antworte allen(2)
P粉536909186

目前还不太清楚你想要什么。但假设您的任务是:查找下一个QUEUED 作业。将其状态设置为RUNNING并选择相应的ID。

在单线程环境中,您可以只使用您的代码。将选定的 ID 提取到应用程序代码中的变量中,并将其传递给 WHERE 子句中的 UPDATE 查询。您甚至不需要事务,因为只有一个写入语句。您可以在 SQLscript 中进行模仿。

假设这是您当前的状态:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | QUEUED   |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

您想要启动下一个排队作业(id=2)。

SET @id_for_update = (
  SELECT id
  FROM jobs
  WHERE status = 'QUEUED'
  ORDER BY id
  LIMIT 1
);

UPDATE jobs
SET status="RUNNING"
WHERE id = @id_for_update;

SELECT @id_for_update;

你会得到

@id_for_update
2

从上次选择开始。该表将具有以下状态:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

在 DB Fiddle 上查看

如果您有多个启动作业的进程,则需要使用 FOR UPDATE 锁定该行。但可以使用LAST_INSERT_ID()来避免这种情况:

从上面的状态开始,作业 2 已经在运行:

UPDATE jobs
SET status = 'RUNNING',
    id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

SELECT LAST_INSERT_ID();

您将得到:

| LAST_INSERT_ID() | ROW_COUNT() |
| ---------------- | ----------- |
| 3                | 1           |

新的状态是:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | RUNNING  |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

在 DB Fiddle 上查看

如果 UPDATE 语句没有影响任何行(没有排队的行),ROW_COUNT() 将为 0

可能存在一些我不知道的风险 - 但这也不是我真正的处理方式。我宁愿在 jobs 表中存储更多信息。简单的例子:

CREATE TABLE jobs (
  id INT auto_increment primary key,
  created_at timestamp not null default now(),
  updated_at timestamp not null default now() on update now(),
  status varchar(50) not null default 'QUEUED',
  process_id varchar(50) null default null
);

UPDATE jobs
SET status = 'RUNNING',
    process_id = 'some_unique_pid'    
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

现在正在运行的作业属于特定进程,您只需使用

即可选择它
SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

您甚至可能想了解更多信息 - 例如。 queued_atstarted_atfinished_at

P粉635509719

本周我正在实施与您的案例非常相似的事情。多个工作人员,每个工作人员抓取一组行中的“下一行”进行工作。

伪代码是这样的:

BEGIN;

SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;

UPDATE mytable SET status = 'RUNNING' WHERE id = @id;

COMMIT;

使用 FOR UPDATE 对于避免竞争条件(即多个工作人员试图获取同一行)非常重要。

参见https://dev.mysql.com/ doc/refman/8.0/en/select-into.html 了解有关 SELECT ... INTO 的信息。

Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage