少し前、友人が 10 億行のファイルを読み取るという課題について話してくれました。このアイデアはとても面白いと思いましたが、大学の試験週間だったので、結局後で見ることにしました。数か月後、私はこの挑戦に関する Theo のビデオを見て、詳しく見てみることにしました。
One Billion Row Challenge の目的は、都市のリストの最低気温、最高気温、平均気温を計算することです。詳細は、このリストには 10 億の項目があり、各項目は都市の名前で構成されています。各都市は複数回出現する可能性があります。最後に、プログラムはこれらの値を都市名のアルファベット順に表示する必要があります。
たとえ報酬がなくても、課題を解決してみるのは楽しいだろうと思いました。とにかく、私は自分のプロセスを説明するこのテキストを書きました。
より複雑な問題を解決する必要があるとき、私の最初の目標はプログラムを動作させることです。これは最速または最もクリーンなコードではないかもしれませんが、機能するコードです。
基本的に、リスト内の各都市を表す Location 構造を作成しました。これには、最低気温と最高気温、気温の合計、およびその都市がリストに表示される回数が含まれます (これら最後の 2 つは平均を計算するために必要です)。 。温度の数とその合計を保存せずに、平均を直接計算する方法があることは知っています。しかし、前に述べたように、目標はコードを機能させることでした。
データ リストは、都市名とその後に続く気温をセミコロンで区切って構成されます。例:
Antananarivo;15.6 Iqaluit;-20.7 Dolisie;25.8 Kuopio;-6.8
データを読み取る最も簡単な方法は、一度に 1 行ずつ読み取ることができるスキャンを使用することです。この線を使用すると、セミコロンの前後の値の 2 つの部分に分けることができます。温度を取得するには、文字列を浮動小数点に変換する strconv.ParseFloat を使用できます。最初の実装の完全なコードは以下にあります:
package main import ( "bufio" "fmt" "math" "os" "sort" "strconv" "strings" ) type Location struct { min float64 max float64 sum float64 count int } func NewLocation() *Location { return &Location{ min: math.MaxInt16, max: math.MinInt16, sum: 0, count: 0, } } func (loc *Location) Add(temp float64) { if temp < loc.min { loc.min = temp } else if temp > loc.max { loc.max = temp } loc.sum += temp loc.count += 1 } var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") func main() { flag.Parse() if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { log.Fatal(err) } pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } file, _ := os.Open("./measurements.txt") defer file.Close() m := map[string]*Location{} scanner := bufio.NewScanner(file) for scanner.Scan() { line := scanner.Text() name, tempStr, _ := strings.Cut(line, ";") temp, _ := strconv.ParseFloat(tempStr, 32) loc, ok := m[name] if !ok { loc = NewLocation() m[name] = loc } loc.Add(temp) } keys := make([]string, 0, len(m)) for k := range m { keys = append(keys, k) } sort.Strings(keys) for _, name := range keys { loc := m[name] mean := loc.sum / float64(loc.count) fmt.Printf("%s: %.1f/%.1f/%.1f\n", name, loc.min, mean, loc.max) } }
この単純なバージョンの実行には約 97 秒かかりました。
実行プロファイルを分析すると、最大のボトルネックの 1 つが strconv.ParseFloat 関数であることがわかりました。基本的に、その合計実行時間は 23 秒 (合計時間の約 23%) でした。
この関数の問題は、汎用であること、つまり、有効な浮動小数点数で動作するように作られていることです。ただし、私たちのデータは非常に特殊な温度形式を持っています。以下の例を参照してください:
Antananarivo;15.6 Iqaluit;-20.7 Dolisie;5.8 Kuopio;-6.8
温度の形式は常に同じです。ドットの前に 1 桁または 2 桁、ドットの後に 1 桁あり、先頭にマイナス記号が含まれる場合があります。したがって、ParseFloat の一般的なチェックをすべて実行することなく、プロセスを最適化して、特定の方法で値を変換する関数を作成できます。
func bytesToTemp(b []byte) float64 { var v int16 var isNeg int16 = 1 for i := 0; i < len(b)-1; i++ { char := b[i] if char == '-' { isNeg = -1 } else if char == '.' { digit := int16(b[i+1] - '0') v = v*10 + digit } else { digit := int16(char - '0') v = v*10 + digit } } return float64(v*isNeg) / 10 }
文字列ではなくバイト形式でデータを読み取るために、スキャナの戻り値を文字列からバイトに変更しました
line := scanner.Bytes() before, after, _ := bytes.Cut(line, []byte{';'}) name := string(before) temp := bytesToTemp(after)
これらの小さな変更により、実行時間が 75 秒に短縮されました。
Scan を使用する最大の利点は、プログラムがファイル全体を一度にメモリにロードする必要がないことです。代わりに、メモリを犠牲にしてパフォーマンスを犠牲にして、行ごとに読み取ることができます。
一度に 1 行を読み取ることと、一度に 14 GB のデータをロードすることの間には妥協点があることに注意することが重要です。この中間点は、メモリの断片であるチャンクの読み取りです。こうすることで、ファイル全体を一度に読み取るのではなく、128 MB のブロックを読み取ることができます。
buf := make([]byte, chunkSize) reader := bufio.NewReader(file) var leftData []byte for { n, err := reader.Read(buf) if err != nil { if err == io.EOF { break } panic(err) } chunk := append(leftData, buf[:n]...) lastIndex := bytes.LastIndex(chunk, []byte{'\n'}) leftData = chunk[lastIndex+1:] lines := bytes.Split(chunk[:lastIndex], []byte{'\n'}) for _, line := range lines { before, after, _ := bytes.Cut(line, []byte{';'}) name := string(before) temp := bytesToTemp(after) loc, ok := m[name] if !ok { loc = NewLocation() m[name] = loc } loc.Add(temp) } }
その結果、実行時間は 70 秒まで短縮されました。以前よりは良くなりましたが、まだ改善の余地があります。
課題全体が小数点以下の数字を中心に展開しているのは事実です。ただし、浮動小数点の処理は常に大きな課題です (IEEE-754 を参照)。その場合、温度を表すために整数を使用してみてはいかがでしょうか?
type Location struct { min int16 max int16 sum int32 count int32 }
前に定義したように、温度は常に最大 3 桁で表されます。したがって、カンマを削除すると、値は -999 から 999 の間で変化する可能性があるため、それらを表すには int16 で十分です。この型の範囲は -2147483648 ~ 2147483647 であるため、合計とカウントには int32 で十分です。
Dado que agora esperamos um valor inteiro de 16 bits para a temperatura, precisamos modificar a função bytesToTemp. Para isso, mudamos o retorno para int16 e removemos a divisão no final. Assim, a função vai sempre vai retornar um número inteiro.
func bytesToTemp(b []byte) int16 { var v int16 var isNeg int16 = 1 for i := 0; i < len(b)-1; i++ { char := b[i] if char == '-' { isNeg = -1 } else if char == '.' { digit := int16(b[i+1] - '0') v = v*10 + digit } else { digit := int16(char - '0') v = v*10 + digit } } return v * isNeg }
Para finalizar, modifiquei a função Add para aceitar os valores inteiros e ajustei o print para dividir os valores antes de mostrá-los na tela. Com isso, o tempo caiu três segundos, indo para 60 segundos. Não é muito, mas uma vitória é uma vitória.
Novamente analisando o profile, vi que tinha uma certa função chamada slicebytetostring que custava 13,5 segundos de tempo de execução. Analisando, descobri que essa função é a responsável por converter um conjunto de bytes em uma string (o próprio nome da função deixa claro isso). No caso, essa é a função chamada internamente quando se usa a função string(bytes).
Em Go, assim como na maioria das linguagens, strings são imutáveis, o que significa que não podem ser modificadas após serem criadas (normalmente, quando se faz isso, uma nova string é criada). Por outro lado, listas são mutáveis. Ou seja, quando se faz uma conversão de uma lista de bytes para string, é preciso criar uma cópia da lista para garantir que a string não mude se a lista mudar.
Para evitar o custo adicional de alocação de memória nessas conversões, podemos utilizar a biblioteca unsafe para realizar a conversão de bytes para string (Nota: ela é chamada de unsafe por um motivo).
name := unsafe.String(unsafe.SliceData(before), len(before))
Diferente do caso anterior, a função acima reutiliza os bytes passados para gerar a string. O problema disso é que, se a lista original mudar, a string resultante também será afetada. Embora possamos garantir que isso não ocorrerá neste contexto específico, em aplicações maiores e mais complexas, o uso de unsafe pode se tornar bem inseguro.
Com essa mudança, reduzimos o tempo de execução para 51 segundos. Nada mal.
Lembra que eu mencionei que as temperaturas sempre tinham formatos específicos? Então, segundo o profile da execução, que separa a linha em duas partes (nome da cidade e temperatura), custa 5.38 segundos para rodar. E refizermos ela na mão?
Para separar os dois valores, precisamos encontrar onde está o ";". Como a gente já sabe, os valores da temperatura podem ter entre três e cinco caracteres. Assim, precisamos verificar se o caractere anterior aos dígitos é um ";". Simples, não?
idx := 0 if line[len(line)-4] == ';' { idx = len(line) - 4 } else if line[len(line)-5] == ';' { idx = len(line) - 5 } else { idx = len(line) - 6 } before := line[:idx] after := line[idx+1:]
Com isso, o tempo de execução foi para 46 segundos, cerca de 5 segundos a menos que antes (quem diria, não é?).
Todo esse tempo, nosso objetivo foi tornar o código o mais rápido possível em um núcleo. Mudando coisas aqui e ali, diminuímos o tempo de 97 segundos para 46 segundos. Claro, ainda daria para melhorar o tempo sem ter que lidar com paralelismo, mas a vida é curta demais para se preocupar com isso, não é?
Para poder rodar o código em vários núcleos, decidi usar a estrutura de canais nativa do Go. Além disso, também criei um grupo de espera que vai indicar quando o processamento dos dados terminaram.
Vale destacar que workers, nesse caso, é uma constante que define quantas goroutines serão criadas para processar os dados. No meu caso, são 12, visto que eu tenho um processador com 6 núcleos e 12 threads.
chunkChan := make(chan []byte, workers) var wg sync.WaitGroup wg.Add(workers)
O próximo passo foi criar as goroutines que serão responsável por receber os dados do canal e processá-los. A lógica de processamento dos dados é semelhante ao modelo single thread.
for i := 0; i < workers; i++ { go func() { for chunk := range chunkChan { lines := bytes.Split(chunk, []byte{'\n'}) for _, line := range lines { before, after := parseLine(line) name := unsafe.String(unsafe.SliceData(before), len(before)) temp := bytesToTemp(after) loc, ok := m[name] if !ok { loc = NewLocation() m[name] = loc } loc.Add(temp) } } wg.Done() }() }
Por fim, o código responsável por ler os dados do disco e enviá-los ao canal:
for { n, err := reader.Read(buf) if err != nil { if err == io.EOF { break } panic(err) } chunk := append(leftData, buf[:n]...) lastIndex := bytes.LastIndex(chunk, []byte{'\n'}) leftData = chunk[lastIndex+1:] chunkChan <- chunk[:lastIndex] } close(chunkChan) wg.Wait()
Vale ressaltar que os mapas em Go não são thread-safe. Isso significa que acessar ou alterar dados no mesmo mapa de forma concorrente pode levar a problemas de consistência ou erros. Embora não tenha observado problemas durante meus testes, vale a pena tratar esse problema.
Uma das maneiras de resolver esse problema seria criar um mecanismo de trava para o mapa, permitindo que somente uma goroutine consiga utilizá-lo por vez. Isso, claro, poderia tornar a execução um pouco mais lenta.
A segunda alternativa consiste em criar um mapa para cada uma das goroutines, de modo que não vai existir concorrência entre elas. Por fim, os mapas são colocados em um novo canal e os valores do mapa principal calculados a partir deles. Essa solução ainda vai ter um custo, mas vai ser menor que a anterior.
close(chunkChan) go func() { wg.Wait() close(mapChan) }() keys := make([]string, 0, 825) m := map[string]*Location{} for lm := range mapChan { for lk, lLoc := range lm { loc, ok := m[lk] if !ok { keys = append(keys, lk) m[lk] = lLoc continue } if lLoc.min < loc.min { loc.min = lLoc.min } if lLoc.max > loc.max { loc.max = lLoc.max } loc.sum += lLoc.sum loc.count += lLoc.count } }
Além disso, como o processamento passou a ser distribuído entre diferentes núcleos, diminui o tamanho do chunk de 128 MB para 2 MB. Cheguei nesse número testando vários valores, tendo entre 1 MB e 5 MB os melhores resultando. Na média, 2 MB obteve o melhor desempenho.
Enfim, o nosso tempo de processamento caiu de 46 segundos para... 12 segundos.
Todas as vezes que eu analisava o profile, a função bytes.Split chamava a minha atenção. O tempo de execução dela era de 16 segundos (tempo total, considerando todas as goroutines), o que parece justo, visto que ela é responsável por quebrar um chunk em linhas. No entanto, parecia um trabalho dobrado, dado que ela primeiro quebra as linhas para, em seguida, as linhas serem lidas uma a uma. Por que não fazer ambos ao mesmo tempo?
Minha abordagem foi percorrer o chunk e verificar se o byte atual correspondia a um \n. Dessa forma, consegui percorrer todas as linhas ao mesmo tempo em que as quebrava, processando em seguida.
start := 0 start := 0 for end, b := range chunk { if b != '\n' { continue } before, after := parseLine(chunk[start:end]) // ... start = end + 1 }
Com essa simples mudança, o tempo de execução caiu para aproximadamente 9 segundos.
Executed in 8.45 secs fish external usr time 58.47 secs 152.00 micros 58.47 secs sys time 4.52 secs 136.00 micros 4.52 secs
Atualmente, o maior gargalo da aplicação é o mapa. Somando todas as operações de leitura e escrita, são 32 segundos (de longe, o tempo mais alto). Talvez criar um algoritmo de hash mais eficiente resolva? Fica como ideia para o futuro.
No mais, conseguimos diminuir o tempo de 1 minuto e 40 segundos para quase 8 segundos, sem usar qualquer biblioteca externa. Além disso, tentando fazer a aplicação ficar cada vez mais rápida, me fez aprender muita coisa.
以上がGo での 10 億行の課題の解決 (から s)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。