Table des matières
Contenu de la question
Solution
Problème
Conception fiable
manager.go
main.go
manager_test.go
Maison développement back-end Golang Problèmes de synchronisation dans ce programme Golang

Problèmes de synchronisation dans ce programme Golang

Feb 10, 2024 am 09:06 AM
Mécanisme de synchronisation

Problèmes de synchronisation dans ce programme Golang

L'éditeur PHP Apple vous présentera aujourd'hui un sujet intéressant - "Problèmes de synchronisation dans ce programme Golang". Lors de l'écriture de programmes concurrents, nous rencontrons souvent des problèmes de synchronisation, c'est-à-dire de compétition et de coordination entre plusieurs threads. En tant que langage de programmation concurrent, Golang fournit une multitude de mécanismes et d'outils de synchronisation, mais il existe également certains problèmes de synchronisation courants qui nécessitent notre attention et nos solutions. Cet article explorera ces problèmes en détail et proposera les solutions correspondantes pour aider chacun à mieux comprendre et gérer les défis de synchronisation dans Golang. Explorons ensemble !

Contenu de la question

J'essaie de créer un programme qui agit comme un serveur proxy et peut basculer dynamiquement vers un nouveau point de terminaison. Mais j'ai rencontré un problème, lors de l'appel switchovertonewendpoint() 后,仍然有一些代理对象连接到原始端点 8.8.8.8, il devrait être éteint.

package main

import (
    "net"
    "sync"
    "sync/atomic"
    "time"
)

type proxy struct {
    id       int32
    from, to *net.tcpconn
}

var switchover int32 = 0

func setswitchover() {
    atomic.storeint32((*int32)(&switchover), 1)
}

func switchoverenabled() bool {
    return atomic.loadint32((*int32)(&switchover)) == 1
}

var proxies map[int32]*proxy = make(map[int32]*proxy, 0)
var proxyseq int32 = 0
var mu sync.rwmutex

func addproxy(from *net.tcpconn) {
    mu.lock()
    proxyseq += 1
    proxy := &proxy{id: proxyseq, from: from}
    proxies[proxyseq] = proxy
    mu.unlock()

    var toaddr string
    if switchoverenabled() {
        toaddr = "1.1.1.1"
    } else {
        toaddr = "8.8.8.8"
    }
    tcpaddr, _ := net.resolvetcpaddr("tcp4", toaddr)
    toconn, err := net.dialtcp("tcp", nil, tcpaddr)
    if err != nil {
        panic(err)
    }
    proxy.to = toconn
}

func switchovertonewendpoint() {
    mu.rlock()
    closedproxies := proxies
    mu.runlock()

    setswitchover()
    for _, proxy := range closedproxies {
        proxy.from.close()
        proxy.to.close()
        mu.lock()
        delete(proxies, proxy.id)
        mu.unlock()
    }
}

func main() {
    tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432")
    ln, _ := net.listentcp("tcp", tcpaddr)
    go func() {
        time.sleep(time.second * 30)
        switchovertonewendpoint()
    }()
    for {
        clientconn, err := ln.accepttcp()
        if err != nil {
            panic(err)
        }
        go addproxy(clientconn)
    }
}
Copier après la connexion

Après y avoir réfléchi un moment, je suppose que le problème est

mu.rlock()
    closedproxies := proxies
    mu.runlock()
Copier après la connexion

Mais je ne sais pas si c'est la cause première et si elle peut être corrigée en la remplaçant par ce qui suit :

closedProxies := make([]*Proxy, 0)
    mu.RLock()
    for _, proxy := range proxies {
        closedProxies = append(closedProxies, proxy)
    }
    mu.RUnlock()
Copier après la connexion

Ce cas étant difficile à reproduire, des professionnels peuvent-ils apporter des idées ou des conseils ? Tous les commentaires sont les bienvenus. Merci d'avance.

Solution

Problème

Le changement est nécessaire. Dans l'implémentation originale, latedproxies contenait le même mappage. Voir cette démo :

package main

import "fmt"

func main() {
    proxies := make(map[int]int, 0)
    for i := 0; i < 10; i++ {
        proxies[i] = i
    }

    closeproxies := proxies

    proxies[10] = 10
    proxies[11] = 11

    for k := range closeproxies {
        delete(proxies, k)
    }

    fmt.printf("items left: %d\n", len(proxies))
    // output:
    //   items left: 0
}
Copier après la connexion

Mais ce n’est pas la cause profonde. Peut être copié closeproxies 之后但在调用 setswitchover 之前添加新代理。在这种情况下,新代理连接到旧地址,但不在 closeproxies. Je pense que c'est la cause profonde.

Encore une question. Fermez ce proxy avant de définir le champ to 字段之前,将向 proxies 添加新代理。程序可能希望在设置 to, provoquant une panique.

Conception fiable

L'idée est de regrouper tous les points de terminaison dans une tranche et de laisser chaque point de terminaison gérer sa propre liste de proxy. Il nous suffit donc de garder une trace de l'index du point de terminaison actuel. Lorsque nous voulons passer à un autre point de terminaison, nous modifions simplement l'index et demandons au point de terminaison obsolète d'effacer son proxy. La seule complication restante consiste à garantir que les points de terminaison obsolètes effacent tous leurs proxys. Voir la mise en œuvre ci-dessous :

manager.go

C'est la concrétisation de cette idée.

package main

import (
    "sync"
)

// conn is abstraction of a connection to make manager easy to test.
type conn interface {
    close() error
}

// dialer is abstraction of a dialer to make manager easy to test.
type dialer interface {
    dial(addr string) (conn, error)
}

type manager struct {
    // mucurrent protects the "current" member.
    mucurrent sync.rwmutex
    current   int // when current is -1, the manager is shuted down.
    endpoints []*endpoint

    // mu protects the whole switch action.
    mu sync.mutex
}

func newmanager(dialer dialer, addresses ...string) *manager {
    if len(addresses) < 2 {
        panic("a manger should handle at least 2 addresses")
    }

    endpoints := make([]*endpoint, len(addresses))
    for i, addr := range addresses {
        endpoints[i] = &endpoint{
            address: addr,
            dialer:  dialer,
        }
    }
    return &manager{
        endpoints: endpoints,
    }
}

func (m *manager) addproxy(from conn) {
    // 1. addproxy will wait when the write lock of m.mucurrent is taken.
    // once the write lock is released, addproxy will connect to the new endpoint.
    // switch only holds the write lock for a short time, and switch is called
    // not so frequently, so addproxy won't wait too much.
    // 2. switch will wait if there is any addproxy holding the read lock of
    // m.mucurrent. that means switch waits longer. the advantage is that when
    // e.clear is called in switch, all addproxy requests to the old endpoint
    // are done. so it's safe to call e.clear then.
    m.mucurrent.rlock()
    defer m.mucurrent.runlock()

    current := m.current

    // do not accept any new connection when m has been shutdown.
    if current == -1 {
        from.close()
        return
    }

    m.endpoints[current].addproxy(from)
}

func (m *manager) switch() {
    // in a real world, switch is called not so frequently.
    // so it's ok to add a lock here.
    // and it's necessary to make sure the old endpoint is cleared and ready
    // for use in the future.
    m.mu.lock()
    defer m.mu.unlock()

    // take the write lock of m.mucurrent.
    // it waits for all the addproxy requests holding the read lock to finish.
    m.mucurrent.lock()
    old := m.current

    // do nothing when m has been shutdown.
    if old == -1 {
        m.mucurrent.unlock()
        return
    }
    next := old + 1
    if next >= len(m.endpoints) {
        next = 0
    }
    m.current = next
    m.mucurrent.unlock()

    // when it reaches here, all addproxy requests to the old endpoint are done.
    // and it's safe to call e.clear now.
    m.endpoints[old].clear()
}

func (m *manager) shutdown() {
    m.mu.lock()
    defer m.mu.unlock()

    m.mucurrent.lock()
    current := m.current
    m.current = -1
    m.mucurrent.unlock()

    m.endpoints[current].clear()
}

type proxy struct {
    from, to conn
}

type endpoint struct {
    address string
    dialer  dialer

    mu      sync.mutex
    proxies []*proxy
}

func (e *endpoint) clear() {
    for _, p := range e.proxies {
        p.from.close()
        p.to.close()
    }

    // assign a new slice to e.proxies, and the gc will collect the old one.
    e.proxies = []*proxy{}
}

func (e *endpoint) addproxy(from conn) {
    toconn, err := e.dialer.dial(e.address)
    if err != nil {
        // close the from connection so that the client will reconnect?
        from.close()
        return
    }

    e.mu.lock()
    defer e.mu.unlock()
    e.proxies = append(e.proxies, &proxy{from: from, to: toconn})
}
Copier après la connexion

main.go

Cette démo montre comment utiliser le managerType :

précédemment implémenté
package main

import (
    "net"
    "time"
)

type realdialer struct{}

func (d realdialer) dial(addr string) (conn, error) {
    tcpaddr, err := net.resolvetcpaddr("tcp4", addr)
    if err != nil {
        return nil, err
    }
    return net.dialtcp("tcp", nil, tcpaddr)
}

func main() {
    manager := newmanager(realdialer{}, "1.1.1.1", "8.8.8.8")

    tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432")
    ln, _ := net.listentcp("tcp", tcpaddr)

    go func() {
        for range time.tick(30 * time.second) {
            manager.switch()
        }
    }()
    for {
        clientconn, err := ln.accepttcp()
        if err != nil {
            panic(err)
        }
        go manager.addproxy(clientconn)
    }
}
Copier après la connexion

manager_test.go

Exécutez le test en utilisant la commande suivante : go test ./... -race -count 10

package main

import (
    "errors"
    "math/rand"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    "github.com/google/uuid"
)

func TestManager(t *testing.T) {
    addresses := []string{"1.1.1.1", "8.8.8.8"}
    dialer := newDialer(addresses...)
    manager := NewManager(dialer, addresses...)

    ch := make(chan int, 1)

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        for range ch {
            manager.Switch()
        }
        wg.Done()
    }()

    count := 1000
    total := count * 10

    wg.Add(total)

    fromConn := &fakeFromConn{}
    for i := 0; i < total; i++ {
        if i%count == count-1 {
            ch <- 0
        }
        go func() {
            manager.AddProxy(fromConn)
            wg.Done()
        }()
    }
    close(ch)

    wg.Wait()

    manager.Shutdown()

    for _, s := range dialer.servers {
        left := len(s.conns)
        if left != 0 {
            t.Errorf("server %s, unexpected connections left: %d", s.addr, left)
        }
    }

    closedCount := fromConn.closedCount.Load()

    if closedCount != int32(total) {
        t.Errorf("want closed count: %d, got: %d", total, closedCount)
    }
}

type fakeFromConn struct {
    closedCount atomic.Int32
}

func (c *fakeFromConn) Close() error {
    c.closedCount.Add(1)

    return nil
}

type fakeToConn struct {
    id     uuid.UUID
    server *fakeServer
}

func (c *fakeToConn) Close() error {
    if c.id == uuid.Nil {
        return nil
    }

    c.server.removeConn(c.id)

    return nil
}

type fakeServer struct {
    addr  string
    mu    sync.Mutex
    conns map[uuid.UUID]bool
}

func (s *fakeServer) addConn() (uuid.UUID, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    id, err := uuid.NewRandom()
    if err == nil {
        s.conns[id] = true
    }
    return id, err
}

func (s *fakeServer) removeConn(id uuid.UUID) {
    s.mu.Lock()
    defer s.mu.Unlock()

    delete(s.conns, id)
}

type fakeDialer struct {
    servers map[string]*fakeServer
}

func newDialer(addresses ...string) *fakeDialer {
    servers := make(map[string]*fakeServer)
    for _, addr := range addresses {
        servers[addr] = &fakeServer{
            addr:  addr,
            conns: make(map[uuid.UUID]bool),
        }
    }
    return &fakeDialer{
        servers: servers,
    }
}

func (d *fakeDialer) Dial(addr string) (Conn, error) {
    n := rand.Intn(100)
    if n == 0 {
        return nil, errors.New("fake network error")
    }
    // Simulate network latency.
    time.Sleep(time.Duration(n) * time.Millisecond)

    s := d.servers[addr]
    id, err := s.addConn()
    if err != nil {
        return nil, err
    }
    conn := &fakeToConn{
        id:     id,
        server: s,
    }
    return conn, nil
}
Copier après la connexion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn

Outils d'IA chauds

Undresser.AI Undress

Undresser.AI Undress

Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover

AI Clothes Remover

Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool

Undress AI Tool

Images de déshabillage gratuites

Clothoff.io

Clothoff.io

Dissolvant de vêtements AI

AI Hentai Generator

AI Hentai Generator

Générez AI Hentai gratuitement.

Article chaud

R.E.P.O. Crystals d'énergie expliqués et ce qu'ils font (cristal jaune)
3 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Meilleurs paramètres graphiques
3 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Comment réparer l'audio si vous n'entendez personne
3 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: Comment déverrouiller tout dans Myrise
4 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌

Outils chauds

Bloc-notes++7.3.1

Bloc-notes++7.3.1

Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise

SublimeText3 version chinoise

Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1

Envoyer Studio 13.0.1

Puissant environnement de développement intégré PHP

Dreamweaver CS6

Dreamweaver CS6

Outils de développement Web visuel

SublimeText3 version Mac

SublimeText3 version Mac

Logiciel d'édition de code au niveau de Dieu (SublimeText3)

La relation parent-enfant entre les fonctions golang et goroutine La relation parent-enfant entre les fonctions golang et goroutine Apr 25, 2024 pm 12:57 PM

Il existe une relation parent-enfant entre les fonctions et les goroutines dans Go. La goroutine parent crée la goroutine enfant, et la goroutine enfant peut accéder aux variables de la goroutine parent mais pas l'inverse. Créez une goroutine enfant à l'aide du mot-clé go, et la goroutine enfant est exécutée via une fonction anonyme ou une fonction nommée. La goroutine parent peut attendre que la goroutine enfant se termine via sync.WaitGroup pour s'assurer que le programme ne se termine pas avant que toutes les goroutines enfants ne soient terminées.

Comparaison des avantages et des inconvénients des fonctions golang et goroutine Comparaison des avantages et des inconvénients des fonctions golang et goroutine Apr 25, 2024 pm 12:30 PM

Les fonctions sont utilisées pour effectuer des tâches de manière séquentielle et sont simples et faciles à utiliser, mais elles présentent des problèmes de blocage et de contraintes de ressources. Goroutine est un thread léger qui exécute des tâches simultanément. Il possède des capacités élevées de simultanéité, d'évolutivité et de traitement des événements, mais il est complexe à utiliser, coûteux et difficile à déboguer. En combat réel, Goroutine a généralement de meilleures performances que les fonctions lors de l'exécution de tâches simultanées.

Programmation simultanée C++ : comment gérer la communication inter-thread ? Programmation simultanée C++ : comment gérer la communication inter-thread ? May 04, 2024 pm 12:45 PM

Les méthodes de communication inter-thread en C++ incluent : la mémoire partagée, les mécanismes de synchronisation (verrous mutex, variables de condition), les canaux et les files d'attente de messages. Par exemple, utilisez un verrou mutex pour protéger un compteur partagé : déclarez un verrou mutex (m) et une variable partagée (counter) ; chaque thread met à jour le compteur en verrouillant (lock_guard) ; pour éviter les conditions de course.

Comment se comportent les fonctions PHP dans un environnement multithread ? Comment se comportent les fonctions PHP dans un environnement multithread ? Apr 16, 2024 am 10:48 AM

Dans un environnement multi-thread, le comportement des fonctions PHP dépend de leur type : Fonctions normales : thread-safe, peuvent être exécutées simultanément. Fonctions qui modifient les variables globales : dangereuses, doivent utiliser un mécanisme de synchronisation. Fonction d'opération de fichier : dangereuse, nécessité d'utiliser un mécanisme de synchronisation pour coordonner l'accès. Fonction d'exploitation de la base de données : dangereux, le mécanisme du système de base de données doit être utilisé pour éviter les conflits.

Sous le boom de l'IA, quels changements se sont produits dans le domaine de l'infrastructure de la chaîne publique ? Sous le boom de l'IA, quels changements se sont produits dans le domaine de l'infrastructure de la chaîne publique ? Apr 13, 2024 pm 04:49 PM

Sur le marché primaire ces derniers temps, la piste la plus en vogue est sans aucun doute l'IA, suivie du BTC. 80 % des projets discutés chaque jour sont concentrés dans ces deux pistes, au maximum, je peux parler de 5 ou 6 projets d'IA par jour. Il est prévisible que la bulle de l'IA atteindra son apogée l'année prochaine. Avec la mise en ligne de centaines de nouveaux projets d'IA, la valeur marchande de la piste IA atteindra son apogée lorsque la bulle éclatera enfin. est perdu, une véritable industrie va naître. La licorne qui trouve le point d'adaptation d'AIXCrypto continuera à faire avancer cette piste et l'ensemble de l'industrie. Ainsi, dans l'environnement surchauffé actuel de l'IA, nous devons nous calmer et examiner les changements qui ont eu lieu au niveau Infra ces derniers mois, en particulier dans la piste Infra de la chaîne publique. Certaines des nouveautés méritent d'être mentionnées. 1.ET

Quels sont les frameworks et bibliothèques de programmation concurrente en C++ ? Quels sont leurs avantages et limites respectifs ? Quels sont les frameworks et bibliothèques de programmation concurrente en C++ ? Quels sont leurs avantages et limites respectifs ? May 07, 2024 pm 02:06 PM

Le cadre de programmation simultanée C++ propose les options suivantes : threads légers (std::thread) ; conteneurs et algorithmes de concurrence Boost sécurisés pour les threads ; OpenMP pour les multiprocesseurs à mémoire partagée ; bibliothèque d'opérations d'interaction simultanée C++ multiplateforme ; (cpp-Concur).

Comment utiliser volatile en Java Comment utiliser volatile en Java May 01, 2024 pm 06:42 PM

Le mot-clé volatile est utilisé pour modifier les variables afin de garantir que tous les threads peuvent voir la dernière valeur de la variable et de garantir que la modification de la variable est une opération ininterrompue. Les principaux scénarios d'application incluent des variables partagées multithread, des barrières de mémoire et une programmation simultanée. Cependant, il convient de noter que volatile ne garantit pas la sécurité des threads et peut réduire les performances. Il ne doit être utilisé qu'en cas d'absolue nécessité.

Quelles sont les méthodes courantes d'optimisation des performances des programmes ? Quelles sont les méthodes courantes d'optimisation des performances des programmes ? May 09, 2024 am 09:57 AM

Les méthodes d'optimisation des performances du programme comprennent : Optimisation de l'algorithme : choisissez un algorithme avec une complexité temporelle moindre et réduisez les boucles et les instructions conditionnelles. Sélection de structure de données : sélectionnez les structures de données appropriées en fonction des modèles d'accès aux données, telles que les arbres de recherche et les tables de hachage. Optimisation de la mémoire : évitez de créer des objets inutiles, libérez la mémoire qui n'est plus utilisée et utilisez la technologie des pools de mémoire. Optimisation des threads : identifiez les tâches pouvant être parallélisées et optimisez le mécanisme de synchronisation des threads. Optimisation de la base de données : créez des index pour accélérer la récupération des données, optimisez les instructions de requête et utilisez des bases de données en cache ou NoSQL pour améliorer les performances.

See all articles