Lectures et mises à jour atomiques avec des threads de travail simultanés dans MySQL
P粉239164234
P粉239164234 2023-12-21 13:38:51
0
2
525

Supposons que plusieurs travailleurs puissent lire et écrire sur une table MySQL en même temps (par exemple jobs). La tâche de chaque travailleur est :

  1. Trouver les 已排队 emplois les plus anciens
  2. Définissez son statut sur RUNNING
  3. Renvoyer l'identifiant correspondant.

Veuillez noter que lorsque le travailleur exécute l'étape 1, il peut ne pas avoir d'emplois éligibles (c'est-à-dire QUEUED).

J'ai jusqu'à présent le pseudocode suivant. Je crois que si l'étape n°1 ne renvoie pas le travail, je dois annuler (ROLLBACK) la transaction. Comment pourrais-je procéder dans le code ci-dessous ?

BEGIN TRANSACTION;

# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED" 
ORDER BY created_at ASC LIMIT 1;

# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>

COMMIT;

P粉239164234
P粉239164234

répondre à tous(2)
P粉536909186

Ce que vous voulez n’est pas encore tout à fait clair. Mais disons que votre tâche est la suivante : trouver le prochain QUEUED 作业。将其状态设置为RUNNING et sélectionner l'ID correspondant.

Dans un environnement monothread, vous pouvez simplement utiliser votre code. Extrayez l'ID sélectionné dans une variable du code de l'application et transmettez-le à la requête UPDATE dans la clause WHERE. Vous n'avez même pas besoin d'une transaction car il n'y a qu'une seule instruction d'écriture. Vous pouvez imiter cela dans SQLscript.

Supposons que ceci soit votre statut actuel :

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | QUEUED   |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

Vous souhaitez démarrer le prochain travail en file d'attente (id=2).

SET @id_for_update = (
  SELECT id
  FROM jobs
  WHERE status = 'QUEUED'
  ORDER BY id
  LIMIT 1
);

UPDATE jobs
SET status="RUNNING"
WHERE id = @id_for_update;

SELECT @id_for_update;

Vous obtiendrez

@id_for_update
2

Départ de la dernière sélection. Le tableau aura le statut suivant :

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

Vue sur DB Fiddle

Si plusieurs processus lancent des tâches, vous devez utiliser FOR UPDATE 锁定该行。但可以使用LAST_INSERT_ID() pour éviter cela :

À partir de l'état ci-dessus, la tâche 2 est déjà en cours d'exécution :

UPDATE jobs
SET status = 'RUNNING',
    id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

SELECT LAST_INSERT_ID();

Vous obtiendrez :

| LAST_INSERT_ID() | ROW_COUNT() |
| ---------------- | ----------- |
| 3                | 1           |

Le nouveau statut est :

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | RUNNING  |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

Vue sur DB Fiddle

Si l'instruction UPDATE n'a affecté aucune ligne (aucune ligne en file d'attente), ROW_COUNT() 将为 0.

Il peut y avoir certains risques dont je ne suis pas conscient - mais ce n'est pas vraiment comme ça que je l'aborde non plus. Je préfère stocker plus d'informations dans le tableau jobs. Exemple simple :

CREATE TABLE jobs (
  id INT auto_increment primary key,
  created_at timestamp not null default now(),
  updated_at timestamp not null default now() on update now(),
  status varchar(50) not null default 'QUEUED',
  process_id varchar(50) null default null
);

et

UPDATE jobs
SET status = 'RUNNING',
    process_id = 'some_unique_pid'    
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

Maintenant, le travail en cours appartient à un processus spécifique, vous pouvez le sélectionner simplement en utilisant

SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

Vous voudrez peut-être même en savoir plus - par exemple. queued_atstarted_atfinished_at.

P粉635509719

Cette semaine, je mets en œuvre quelque chose de très similaire à votre cas. Plusieurs travailleurs, chacun saisissant la « ligne suivante » dans un ensemble de lignes sur lesquelles travailler.

Le pseudo code est comme ceci :

BEGIN;

SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;

UPDATE mytable SET status = 'RUNNING' WHERE id = @id;

COMMIT;

L'utilisation de FOR UPDATE est importante pour éviter les conditions de concurrence (c'est-à-dire plusieurs travailleurs essayant de récupérer la même ligne).

Voir https://dev.mysql.com/doc/refman/8.0/en/select-into.html pour plus d'informations sur SELECT ... INTO.

Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal