Implementierung einer sperrenfreien, unbegrenzten Warteschlange mit dem neuen Typ atomic.Pointer

PHPz
Freigeben: 2024-02-09 11:30:20
nach vorne
847 Leute haben es durchsucht

Implementierung einer sperrenfreien, unbegrenzten Warteschlange mit dem neuen Typ atomic.Pointer

php-Editor Strawberry stellt Ihnen heute eine neue Technologie vor – die Verwendung des neuen Typs atomic.Pointer zur Implementierung einer sperrenfreien und unbegrenzten Warteschlange. Bei der gleichzeitigen Programmierung sind Warteschlangen eine übliche Datenstruktur. Bei herkömmlichen Warteschlangenimplementierungen ist jedoch normalerweise die Verwendung von Sperren erforderlich, um die Thread-Sicherheit zu gewährleisten, was zu Leistungseinbußen führt. Der neue Typ atomic.Pointer bietet eine sperrenfreie Lösung, mit der effiziente gleichzeitige Warteschlangenoperationen erreicht werden können. Im Folgenden stellen wir diese neue Implementierung im Detail vor, sowie ihre Vorteile und deren Verwendung.

Frageninhalt

Ich versuche, diese nicht blockierende Warteschlange von Michael und Scott zu implementieren.

Ich versuche, den neuen Typ atomic.pointer zu verwenden, der in go 1.19 eingeführt wurde, aber in meiner Anwendung treten Datenrennen auf.

Das ist meine Umsetzung:

package queue

import (
    "errors"
    "sync/atomic"
)

// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
    head atomic.Pointer[node[T]]
    tail atomic.Pointer[node[T]]
}

// node represents a node in the queue
type node[T any] struct {
    value T
    next  atomic.Pointer[node[T]]
}

// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
    return &node[T]{value: v}
}

// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
    var head atomic.Pointer[node[T]]
    var tail atomic.Pointer[node[T]]
    n := &node[T]{}
    head.Store(n)
    tail.Store(n)
    return &LockFreeQueue[T]{
        head: head,
        tail: tail,
    }
}

// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
    n := newNode(v)
    for {
        tail := q.tail.Load()
        next := tail.next.Load()
        if tail == q.tail.Load() {
            if next == nil {
                if tail.next.CompareAndSwap(next, n) {
                    q.tail.CompareAndSwap(tail, n)
                    return
                }
            } else {
                q.tail.CompareAndSwap(tail, next)
            }
        }
    }
}

// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() (T, error) {
    for {
        head := q.head.Load()
        tail := q.tail.Load()
        next := head.next.Load()
        if head == q.head.Load() {
            if head == tail {
                if next == nil {
                    return head.value, errors.New("queue is empty")
                }
                q.tail.CompareAndSwap(tail, next)
            } else {
                v := next.value
                if q.head.CompareAndSwap(head, next) {
                    return v, nil
                }
            }
        }
    }
}

// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
        return q.head.Load() == q.tail.Load()
}
Nach dem Login kopieren

Ich habe hier eine andere Implementierung gefunden, die in meiner App ohne Datenrennen funktioniert, aber ich kann anscheinend nicht herausfinden, was genau der Unterschied zwischen den beiden ist.

Danke für jede Hilfe oder Rückmeldung!

Workaround

Es stellt sich heraus, dass die Änderung einiger Dinge das Problem lösen kann.

Erste Änderung:

var n = node[t]{}
head.store(&n)
tail.store(&n)
Nach dem Login kopieren

Die zweite Änderung besteht darin, die dequeue() Rücksendesignatur zu ändern.

Die endgültige Datei sieht so aus:

package queue

import (
    "sync/atomic"
)

// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
    head atomic.Pointer[node[T]]
    tail atomic.Pointer[node[T]]
}

// node represents a node in the queue
type node[T any] struct {
    value T
    next  atomic.Pointer[node[T]]
}

// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
    return &node[T]{value: v}
}

// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
    var head atomic.Pointer[node[T]]
    var tail atomic.Pointer[node[T]]
    var n = node[T]{}
    head.Store(&n)
    tail.Store(&n)
    return &LockFreeQueue[T]{
        head: head,
        tail: tail,
    }
}

// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
    n := newNode(v)
    for {
        tail := q.tail.Load()
        next := tail.next.Load()
        if tail == q.tail.Load() {
            if next == nil {
                if tail.next.CompareAndSwap(next, n) {
                    q.tail.CompareAndSwap(tail, n)
                    return
                }
            } else {
                q.tail.CompareAndSwap(tail, next)
            }
        }
    }
}

// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() T {
    var t T
    for {
        head := q.head.Load()
        tail := q.tail.Load()
        next := head.next.Load()
        if head == q.head.Load() {
            if head == tail {
                if next == nil {
                    return t
                }
                q.tail.CompareAndSwap(tail, next)
            } else {
                v := next.value
                if q.head.CompareAndSwap(head, next) {
                    return v
                }
            }
        }
    }
}

// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
    return q.head.Load() == q.tail.Load()
}
Nach dem Login kopieren

Das obige ist der detaillierte Inhalt vonImplementierung einer sperrenfreien, unbegrenzten Warteschlange mit dem neuen Typ atomic.Pointer. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:stackoverflow.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage
Über uns Haftungsausschluss Sitemap
Chinesische PHP-Website:Online-PHP-Schulung für das Gemeinwohl,Helfen Sie PHP-Lernenden, sich schnell weiterzuentwickeln!