Atomic reads and updates with concurrent worker threads in MySQL
P粉239164234
P粉239164234 2023-12-21 13:38:51
0
2
434

Suppose I have multiple workers that can read and write to a MySQL table at the same time (e.g. jobs). The task of each worker is:

  1. Find the oldest Queued Job
  2. Set its status to RUNNING
  3. Return the corresponding ID.

Please note that when the worker runs step #1, there may not be any eligible jobs (i.e. QUEUED).

I have the following pseudocode so far. I believe if step #1 does not return the job, I need to cancel the (ROLLBACK) transaction. How would I do this in the code below?

BEGIN TRANSACTION;

# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED" 
ORDER BY created_at ASC LIMIT 1;

# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>

COMMIT;

P粉239164234
P粉239164234

reply all(2)
P粉536909186

It's not quite clear what you want. But let's say your task is: find the next QUEUED job. Set its status to RUNNING and select the appropriate ID.

In a single-threaded environment, you can just use your code. Extract the selected ID into a variable in the application code and pass it to the UPDATE query in the WHERE clause. You don't even need a transaction as there is only one write statement. You can imitate this in SQLscript.

Assume this is your current status:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | QUEUED   |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

You want to start the next queued job (id=2).

SET @id_for_update = (
  SELECT id
  FROM jobs
  WHERE status = 'QUEUED'
  ORDER BY id
  LIMIT 1
);

UPDATE jobs
SET status="RUNNING"
WHERE id = @id_for_update;

SELECT @id_for_update;

You will get

@id_for_update
2

Start from last selection. The table will have the following status:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

View on DB Fiddle

If you have multiple processes launching jobs, you will need to lock the row using FOR UPDATE. But you can use LAST_INSERT_ID() to avoid this situation:

Starting from the above status, job 2 is already running:

UPDATE jobs
SET status = 'RUNNING',
    id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

SELECT LAST_INSERT_ID();

You will get:

| LAST_INSERT_ID() | ROW_COUNT() |
| ---------------- | ----------- |
| 3                | 1           |

The new status is:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | RUNNING  |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

View on DB Fiddle

If the UPDATE statement did not affect any rows (no queued rows), ROW_COUNT() will be 0.

There may be some risks that I'm not aware of - but that's not really how I approach it either. I would rather store more information in the jobs table. Simple example:

CREATE TABLE jobs (
  id INT auto_increment primary key,
  created_at timestamp not null default now(),
  updated_at timestamp not null default now() on update now(),
  status varchar(50) not null default 'QUEUED',
  process_id varchar(50) null default null
);

and

UPDATE jobs
SET status = 'RUNNING',
    process_id = 'some_unique_pid'    
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

The job that is running now belongs to a specific process, you can select it simply using

SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

You might even want to know more - eg. queued_at, started_at, finished_at.

P粉635509719

This week I'm implementing something very similar to your case. Multiple workers, each grabbing the "next row" in a set of rows to work on.

The pseudo code is like this:

BEGIN;

SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;

UPDATE mytable SET status = 'RUNNING' WHERE id = @id;

COMMIT;

Using FOR UPDATE is important to avoid race conditions (i.e. multiple workers trying to get the same row).

See https://dev.mysql.com/ doc/refman/8.0/en/select-into.html for information about SELECT ... INTO.

Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!