I have a program that basically does three things - set the value of a key, get the value if it exists, or wait until a given The value of the key is available. My initial idea - create a new type with map[string]interface{}
- where the "persistent" value is stored. Other than that, to wait for a value, I plan to use map[string](chan struct{})
. When the set()
method is called, I write to the channel and anyone waiting on it will know the value is there.
I don't know the keys beforehand - they are random. I'm not sure how to implement the wait()
method correctly.
type Map struct { sync.Mutex m map[string]interface{} wait map[string]chan (struct{}) } func (m *Map) Set(key string, value interface{}) { m.ensureWaitChan(key) m.Lock() defer m.Unlock() m.m[key] = value // Signal to all waiting. m.wait[key] <- struct{}{} } func (m *Map) Wait(key string) interface{} { m.ensureWaitChan(key) m.Lock() value, ok := m.m[key] if ok { m.Unlock() return value } m.Unlock() // <------ Unlocked state where something might happen. <-m.wait[key] value := m.m[key] return value } // If the channel does not exist for those waiting - create it. func (m *Map) ensureWaitChan(key string) { m.Lock() defer m.Unlock() _, ok := m.wait[key] if ok { return } m.wait[key] = make(chan struct{}, 100) }
The problem is - there is a race condition in wait()
- after I release the mutex, but before I start listening on the channel for incoming values.
What is the best way to handle this problem? Any other suggestions on how to achieve this are welcome, I'm sure there must be a better way to do this. I'm not polling the value at a fixed interval or anything like that.
What you are looking for is a hybrid between a sync map and a message broker. We can achieve this by leveraging communication and synchronization channels so that subscribers can receive the message as soon as it is published (if it is not already in the cache).
type Map struct { sync.Mutex m map[string]any subs map[string][]chan any } func (m *Map) Set(key string, value any) { m.Lock() defer m.Unlock() m.m[key] = value // Send the new value to all waiting subscribers of the key for _, sub := range m.subs[key] { sub <- value } delete(m.subs, key) } func (m *Map) Wait(key string) any { m.Lock() // Unlock cannot be deferred so we can unblock Set() while waiting value, ok := m.m[key] if ok { m.Unlock() return value } // if there is no value yet, subscribe to any new values for this key ch := make(chan any) m.subs[key] = append(m.subs[key], ch) m.Unlock() return <-ch }
Because subscribers must unlock the map mutex while waiting, they cannot safely access new messages added to the map. We send the new values directly to all subscribers via their own channels so that we don't need to add more syncs in the set
to make sure all subscribers are happy before unlocking the map itself. Unlocking the map early will allow subscribers to read it directly, but will also allow new values to be inserted at the same time, resulting in inconsistent results.
Running version, also includes generic map
implementation with type parameters: https://go.dev /play/p/an7vrspdgmo
The above is the detailed content of Wait for a value in a map to be available in Go. For more information, please follow other related articles on the PHP Chinese website!