J'ai un programme qui fait essentiellement trois choses : définir la valeur d'une clé, obtenir la valeur si elle existe ou attendre que la valeur d'une clé donnée soit disponible. Ma pensée initiale : lors de la création d'une méthode avec map[string]interface{}
的新类型 - 其中存储“持久”值。除此之外,为了等待一个值,我计划使用 map[string](chan struct{})
。当调用 set()
, j'écrirais sur cette chaîne et tous ceux qui l'attendraient sauraient que la valeur est là.
Je ne connais pas les clés à l'avance, elles sont aléatoires. Je ne sais pas comment implémenter correctement la méthode wait()
.
type Map struct { sync.Mutex m map[string]interface{} wait map[string]chan (struct{}) } func (m *Map) Set(key string, value interface{}) { m.ensureWaitChan(key) m.Lock() defer m.Unlock() m.m[key] = value // Signal to all waiting. m.wait[key] <- struct{}{} } func (m *Map) Wait(key string) interface{} { m.ensureWaitChan(key) m.Lock() value, ok := m.m[key] if ok { m.Unlock() return value } m.Unlock() // <------ Unlocked state where something might happen. <-m.wait[key] value := m.m[key] return value } // If the channel does not exist for those waiting - create it. func (m *Map) ensureWaitChan(key string) { m.Lock() defer m.Unlock() _, ok := m.wait[key] if ok { return } m.wait[key] = make(chan struct{}, 100) }
Le problème est - il y a une condition de concurrence critique dans wait()
- après avoir libéré le mutex, mais avant de commencer à écouter sur la chaîne les valeurs entrantes.
Quelle est la meilleure façon de résoudre ce problème ? Toute autre suggestion sur la façon d'y parvenir est la bienvenue, je suis sûr qu'il doit y avoir une meilleure façon de procéder. Je n'interroge pas la valeur à un intervalle fixe ou quelque chose comme ça.
Ce que vous recherchez est un hybride entre une carte de synchronisation et un courtier de messages. Nous pouvons y parvenir en tirant parti des canaux de communication et de synchronisation afin que les abonnés puissent recevoir le message dès sa publication (s'il n'est pas déjà dans le cache).
type Map struct { sync.Mutex m map[string]any subs map[string][]chan any } func (m *Map) Set(key string, value any) { m.Lock() defer m.Unlock() m.m[key] = value // Send the new value to all waiting subscribers of the key for _, sub := range m.subs[key] { sub <- value } delete(m.subs, key) } func (m *Map) Wait(key string) any { m.Lock() // Unlock cannot be deferred so we can unblock Set() while waiting value, ok := m.m[key] if ok { m.Unlock() return value } // if there is no value yet, subscribe to any new values for this key ch := make(chan any) m.subs[key] = append(m.subs[key], ch) m.Unlock() return <-ch }
Étant donné que les abonnés doivent déverrouiller le mutex de la carte en attendant, ils ne peuvent pas accéder en toute sécurité aux nouveaux messages ajoutés à la carte. Nous envoyons les nouvelles valeurs directement à tous les abonnés via leurs propres canaux, nous n'avons donc pas besoin d'ajouter d'autres synchronisations dans set
pour nous assurer que tous les abonnés sont satisfaits avant de déverrouiller la carte elle-même. Déverrouiller la carte plus tôt permettra aux abonnés de la lire directement, mais permettra également d'insérer de nouvelles valeurs en même temps, ce qui entraînera des résultats incohérents.
Version en cours d'exécution, comprend également une implémentation générique map
avec des paramètres de type : https://go.dev/play/p/an7vrspdgmo
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!