Maison > développement back-end > Golang > le corps du texte

Résoudre le défi du milliard de lignes en Go (de à s)

王林
Libérer: 2024-08-05 20:04:32
original
565 Les gens l'ont consulté

Resolvendo o desafio de um bilhão de linhas em Go (de para s)

Il y a quelque temps, un ami m'a parlé d'un défi qui consistait à lire un fichier de 1 milliard de lignes. J'ai trouvé l'idée très intéressante, mais comme c'était la semaine des examens à la fac, j'ai fini par laisser ça pour voir plus tard. Des mois plus tard, j'ai vu une vidéo de Theo sur le défi et j'ai décidé d'y regarder de plus près.

L'objectif du One Billion Row Challenge est de calculer la température minimale, maximale et moyenne d'une liste de villes - le détail est qu'il y a 1 milliard d'éléments dans cette liste, où chaque élément est constitué du nom d'une ville. et une température, chaque ville peut apparaître plus d'une fois. Enfin, le programme doit afficher ces valeurs par ordre alphabétique par nom de ville.

Je pensais que ce serait amusant d'essayer de résoudre le défi, même s'il n'y avait pas de récompense. Quoi qu'il en soit, j'ai écrit ce texte décrivant mon processus.

Première tentative : faire fonctionner le code

Chaque fois que j'ai besoin de résoudre un problème plus compliqué, mon premier objectif est de faire fonctionner le programme. Ce n'est peut-être pas le code le plus rapide ou le plus propre, mais c'est un code qui fonctionne.

En gros, j'ai créé la structure Location pour représenter chaque ville de la liste, contenant la température minimale et maximale, la somme des températures et combien de fois la ville apparaît dans la liste (ces deux dernières sont nécessaires pour calculer la moyenne) . Je sais qu'il existe un moyen de calculer directement la moyenne, sans avoir à stocker le nombre de températures et leur somme. Mais comme je l'ai mentionné plus tôt, l'objectif était de faire fonctionner le code.

La liste des données est composée du nom de la ville suivi de la température, séparé par un point-virgule. Par exemple :

Antananarivo;15.6
Iqaluit;-20.7
Dolisie;25.8
Kuopio;-6.8
Copier après la connexion

Le moyen le plus simple de lire des données consiste à utiliser Scan, qui vous permet de lire une ligne à la fois. Avec la ligne, vous pouvez le diviser en deux parties : les valeurs avant et après le point-virgule. Pour obtenir la température, vous pouvez utiliser strconv.ParseFloat, qui convertit une chaîne en float. Le code complet de la première implémentation peut être consulté ci-dessous :

package main

import (
    "bufio"
    "fmt"
    "math"
    "os"
    "sort"
    "strconv"
    "strings"
)

type Location struct {
    min   float64
    max   float64
    sum   float64
    count int
}

func NewLocation() *Location {
    return &Location{
        min:   math.MaxInt16,
        max:   math.MinInt16,
        sum:   0,
        count: 0,
    }
}

func (loc *Location) Add(temp float64) {
    if temp < loc.min {
        loc.min = temp
    } else if temp > loc.max {
        loc.max = temp
    }

    loc.sum += temp
    loc.count += 1
}

var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")

func main() {
    flag.Parse()
    if *cpuprofile != "" {
        f, err := os.Create(*cpuprofile)
        if err != nil {
            log.Fatal(err)
        }
        pprof.StartCPUProfile(f)
        defer pprof.StopCPUProfile()
    }

    file, _ := os.Open("./measurements.txt")
    defer file.Close()

    m := map[string]*Location{}

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := scanner.Text()
        name, tempStr, _ := strings.Cut(line, ";")
        temp, _ := strconv.ParseFloat(tempStr, 32)

        loc, ok := m[name]
        if !ok {
            loc = NewLocation()
            m[name] = loc
        }
        loc.Add(temp)
    }

    keys := make([]string, 0, len(m))
    for k := range m {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    for _, name := range keys {
        loc := m[name]
        mean := loc.sum / float64(loc.count)
        fmt.Printf("%s: %.1f/%.1f/%.1f\n", name, loc.min, mean, loc.max)
    }
}
Copier après la connexion

Cette version plus simple a pris environ 97 secondes à exécuter.

Optimiser la conversion des chaînes en flottants

En analysant le profil d'exécution, j'ai réalisé que l'un des plus gros goulots d'étranglement était la fonction strconv.ParseFloat. En gros, son temps d'exécution total était de 23 secondes (environ 23 % du temps total).

Le problème avec cette fonction est qu'elle est générique, c'est-à-dire qu'elle est conçue pour fonctionner avec n'importe quel float valide. Cependant, nos données ont un format de température très spécifique. Voir l'exemple ci-dessous :

Antananarivo;15.6
Iqaluit;-20.7
Dolisie;5.8
Kuopio;-6.8
Copier après la connexion

Le format de la température est toujours le même : un ou deux chiffres avant le point et un chiffre après le point, et peut inclure un signe moins au début. Ainsi, nous pouvons créer une fonction qui convertit les valeurs d'une manière spécifique, optimisant le processus sans avoir besoin d'effectuer toutes les vérifications génériques de ParseFloat.

func bytesToTemp(b []byte) float64 {
    var v int16
    var isNeg int16 = 1

    for i := 0; i < len(b)-1; i++ {
        char := b[i]
        if char == '-' {
            isNeg = -1
        } else if char == '.' {
            digit := int16(b[i+1] - '0')
            v = v*10 + digit
        } else {
            digit := int16(char - '0')
            v = v*10 + digit
        }
    }

    return float64(v*isNeg) / 10
}
Copier après la connexion

Pour lire les données au format octet au lieu de chaîne, j'ai modifié le retour du scanner de chaîne en octets

line := scanner.Bytes()
before, after, _ := bytes.Cut(line, []byte{';'})
name := string(before)
temp := bytesToTemp(after)
Copier après la connexion

Ces petits changements ont réduit le temps d'exécution à 75 secondes.

Lire de plus gros morceaux de données

Le plus grand avantage de l'utilisation de Scan est que le programme n'a pas besoin de charger l'intégralité du fichier en mémoire en une seule fois. Au lieu de cela, il vous permet de lire ligne par ligne, en échangeant les performances contre de la mémoire.

Il est important de noter qu'il existe un compromis entre lire une ligne à la fois et charger 14 Go de données à la fois. Ce juste milieu consiste à lire des morceaux, qui sont des morceaux de mémoire. De cette façon, au lieu de lire l'intégralité du fichier d'un coup, nous pouvons lire des blocs de 128 Mo.

buf := make([]byte, chunkSize)
reader := bufio.NewReader(file)
var leftData []byte
for {
    n, err := reader.Read(buf)
    if err != nil {
        if err == io.EOF {
            break
        }
        panic(err)
    }

    chunk := append(leftData, buf[:n]...)
    lastIndex := bytes.LastIndex(chunk, []byte{'\n'})
    leftData = chunk[lastIndex+1:]

    lines := bytes.Split(chunk[:lastIndex], []byte{'\n'})

    for _, line := range lines {
        before, after, _ := bytes.Cut(line, []byte{';'})
        name := string(before)
        temp := bytesToTemp(after)

        loc, ok := m[name]
        if !ok {
            loc = NewLocation()
            m[name] = loc
        }
        loc.Add(temp)
    }
}

Copier après la connexion

En conséquence, le temps d'exécution est tombé à 70 secondes. Mieux qu'avant, mais il y a encore place à l'amélioration.

Modification des types de données

C'est un fait que tout le défi tourne autour de nombres avec décimales. Cependant, gérer les virgules flottantes constitue toujours un défi de taille (voir IEEE-754). Dans ce cas, pourquoi ne pas utiliser des nombres entiers pour représenter la température ?

type Location struct {
    min   int16
    max   int16
    sum   int32
    count int32
}
Copier après la connexion

Comme défini précédemment, une température est toujours représentée par jusqu'à trois chiffres. Par conséquent, en supprimant la virgule, les valeurs peuvent varier entre -999 et 999, donc int16 suffit à les représenter. Pour additionner et compter, int32 est largement suffisant, car ce type peut varier entre -2147483648 et 2147483647.

Dado que agora esperamos um valor inteiro de 16 bits para a temperatura, precisamos modificar a função bytesToTemp. Para isso, mudamos o retorno para int16 e removemos a divisão no final. Assim, a função vai sempre vai retornar um número inteiro.

func bytesToTemp(b []byte) int16 {
    var v int16
    var isNeg int16 = 1

    for i := 0; i < len(b)-1; i++ {
        char := b[i]
        if char == '-' {
            isNeg = -1
        } else if char == '.' {
            digit := int16(b[i+1] - '0')
            v = v*10 + digit
        } else {
            digit := int16(char - '0')
            v = v*10 + digit
        }
    }

    return v * isNeg
}
Copier après la connexion

Para finalizar, modifiquei a função Add para aceitar os valores inteiros e ajustei o print para dividir os valores antes de mostrá-los na tela. Com isso, o tempo caiu três segundos, indo para 60 segundos. Não é muito, mas uma vitória é uma vitória.

Melhorando a Performance da Conversão de Bytes para String

Novamente analisando o profile, vi que tinha uma certa função chamada slicebytetostring que custava 13,5 segundos de tempo de execução. Analisando, descobri que essa função é a responsável por converter um conjunto de bytes em uma string (o próprio nome da função deixa claro isso). No caso, essa é a função chamada internamente quando se usa a função string(bytes).

Em Go, assim como na maioria das linguagens, strings são imutáveis, o que significa que não podem ser modificadas após serem criadas (normalmente, quando se faz isso, uma nova string é criada). Por outro lado, listas são mutáveis. Ou seja, quando se faz uma conversão de uma lista de bytes para string, é preciso criar uma cópia da lista para garantir que a string não mude se a lista mudar.

Para evitar o custo adicional de alocação de memória nessas conversões, podemos utilizar a biblioteca unsafe para realizar a conversão de bytes para string (Nota: ela é chamada de unsafe por um motivo).

name := unsafe.String(unsafe.SliceData(before), len(before))
Copier après la connexion

Diferente do caso anterior, a função acima reutiliza os bytes passados para gerar a string. O problema disso é que, se a lista original mudar, a string resultante também será afetada. Embora possamos garantir que isso não ocorrerá neste contexto específico, em aplicações maiores e mais complexas, o uso de unsafe pode se tornar bem inseguro.

Com essa mudança, reduzimos o tempo de execução para 51 segundos. Nada mal.

Reimplementando bytes.Cut

Lembra que eu mencionei que as temperaturas sempre tinham formatos específicos? Então, segundo o profile da execução, que separa a linha em duas partes (nome da cidade e temperatura), custa 5.38 segundos para rodar. E refizermos ela na mão?

Para separar os dois valores, precisamos encontrar onde está o ";". Como a gente já sabe, os valores da temperatura podem ter entre três e cinco caracteres. Assim, precisamos verificar se o caractere anterior aos dígitos é um ";". Simples, não?

idx := 0
if line[len(line)-4] == ';' {
    idx = len(line) - 4
} else if line[len(line)-5] == ';' {
    idx = len(line) - 5
} else {
    idx = len(line) - 6
}

before := line[:idx]
after := line[idx+1:]
Copier après la connexion

Com isso, o tempo de execução foi para 46 segundos, cerca de 5 segundos a menos que antes (quem diria, não é?).

Paralelismo para acelerar o processamento

Todo esse tempo, nosso objetivo foi tornar o código o mais rápido possível em um núcleo. Mudando coisas aqui e ali, diminuímos o tempo de 97 segundos para 46 segundos. Claro, ainda daria para melhorar o tempo sem ter que lidar com paralelismo, mas a vida é curta demais para se preocupar com isso, não é?

Para poder rodar o código em vários núcleos, decidi usar a estrutura de canais nativa do Go. Além disso, também criei um grupo de espera que vai indicar quando o processamento dos dados terminaram.

Vale destacar que workers, nesse caso, é uma constante que define quantas goroutines serão criadas para processar os dados. No meu caso, são 12, visto que eu tenho um processador com 6 núcleos e 12 threads.

chunkChan := make(chan []byte, workers)
var wg sync.WaitGroup
wg.Add(workers)
Copier après la connexion

O próximo passo foi criar as goroutines que serão responsável por receber os dados do canal e processá-los. A lógica de processamento dos dados é semelhante ao modelo single thread.

for i := 0; i < workers; i++ {
    go func() {
        for chunk := range chunkChan {
            lines := bytes.Split(chunk, []byte{'\n'})
            for _, line := range lines {
                before, after := parseLine(line)
                name := unsafe.String(unsafe.SliceData(before), len(before))
                temp := bytesToTemp(after)

                loc, ok := m[name]
                if !ok {
                    loc = NewLocation()
                    m[name] = loc
                }
                loc.Add(temp)
            }
        }
        wg.Done()
    }()
}
Copier après la connexion

Por fim, o código responsável por ler os dados do disco e enviá-los ao canal:

for {
    n, err := reader.Read(buf)
    if err != nil {
        if err == io.EOF {
            break
        }
        panic(err)
    }

    chunk := append(leftData, buf[:n]...)
    lastIndex := bytes.LastIndex(chunk, []byte{'\n'})
    leftData = chunk[lastIndex+1:]
    chunkChan <- chunk[:lastIndex]
}
close(chunkChan)
wg.Wait()
Copier après la connexion

Vale ressaltar que os mapas em Go não são thread-safe. Isso significa que acessar ou alterar dados no mesmo mapa de forma concorrente pode levar a problemas de consistência ou erros. Embora não tenha observado problemas durante meus testes, vale a pena tratar esse problema.

Uma das maneiras de resolver esse problema seria criar um mecanismo de trava para o mapa, permitindo que somente uma goroutine consiga utilizá-lo por vez. Isso, claro, poderia tornar a execução um pouco mais lenta.

A segunda alternativa consiste em criar um mapa para cada uma das goroutines, de modo que não vai existir concorrência entre elas. Por fim, os mapas são colocados em um novo canal e os valores do mapa principal calculados a partir deles. Essa solução ainda vai ter um custo, mas vai ser menor que a anterior.

close(chunkChan)

go func() {
    wg.Wait()
    close(mapChan)
}()

keys := make([]string, 0, 825)

m := map[string]*Location{}
for lm := range mapChan {
    for lk, lLoc := range lm {
        loc, ok := m[lk]
        if !ok {
            keys = append(keys, lk)
            m[lk] = lLoc
            continue
        }

        if lLoc.min < loc.min {
            loc.min = lLoc.min
        }
        if lLoc.max > loc.max {
            loc.max = lLoc.max
        }
        loc.sum += lLoc.sum
        loc.count += lLoc.count
    }
}
Copier après la connexion

Além disso, como o processamento passou a ser distribuído entre diferentes núcleos, diminui o tamanho do chunk de 128 MB para 2 MB. Cheguei nesse número testando vários valores, tendo entre 1 MB e 5 MB os melhores resultando. Na média, 2 MB obteve o melhor desempenho.

Enfim, o nosso tempo de processamento caiu de 46 segundos para... 12 segundos.

Otimizando a quebra de linhas no chunk

Todas as vezes que eu analisava o profile, a função bytes.Split chamava a minha atenção. O tempo de execução dela era de 16 segundos (tempo total, considerando todas as goroutines), o que parece justo, visto que ela é responsável por quebrar um chunk em linhas. No entanto, parecia um trabalho dobrado, dado que ela primeiro quebra as linhas para, em seguida, as linhas serem lidas uma a uma. Por que não fazer ambos ao mesmo tempo?

Minha abordagem foi percorrer o chunk e verificar se o byte atual correspondia a um \n. Dessa forma, consegui percorrer todas as linhas ao mesmo tempo em que as quebrava, processando em seguida.

start := 0
start := 0
for end, b := range chunk {
    if b != '\n' {
        continue
    }
    before, after := parseLine(chunk[start:end])
    // ...
    start = end + 1
}
Copier après la connexion

Com essa simples mudança, o tempo de execução caiu para aproximadamente 9 segundos.

Executed in    8.45 secs    fish           external
   usr time   58.47 secs  152.00 micros   58.47 secs
   sys time    4.52 secs  136.00 micros    4.52 secs
Copier après la connexion

Próximos passos

Atualmente, o maior gargalo da aplicação é o mapa. Somando todas as operações de leitura e escrita, são 32 segundos (de longe, o tempo mais alto). Talvez criar um algoritmo de hash mais eficiente resolva? Fica como ideia para o futuro.

No mais, conseguimos diminuir o tempo de 1 minuto e 40 segundos para quase 8 segundos, sem usar qualquer biblioteca externa. Além disso, tentando fazer a aplicação ficar cada vez mais rápida, me fez aprender muita coisa.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:dev.to
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal