首頁 > 後端開發 > Golang > 外部合併問題 - Gophers 完整指南

外部合併問題 - Gophers 完整指南

Susan Sarandon
發布: 2025-01-12 08:09:42
原創
373 人瀏覽過

外部排序問題是電腦科學課程中的一個眾所周知的話題,並且經常被用作教學工具。然而,很少有人能夠在特定技術場景的程式碼中實際實現此問題的解決方案,更不用說解決所需的最佳化了。在一次黑客馬拉松中遇到這個挑戰激發了我寫這篇文章的靈感。

所以,這是黑客馬拉松任務:

您有一個包含 IPv4 位址的簡單文字檔案。一行是一個位址,逐行:

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 
登入後複製
登入後複製
登入後複製
登入後複製

檔案大小無限制,可以佔用數十、數百GB。

您應該使用盡可能少的記憶體和時間來計算該檔案中唯一位址的數量。有一個「天真的」演算法可以解決這個問題(逐行讀取,將行放入 HashSet)。如果您的實作比這個簡單的演算法更複雜、更快,那就更好了。

提交了一個 120GB、80 億行的檔案進行解析。

對於程式執行速度沒有具體要求。然而,在快速查看有關該主題的線上可用資訊後,我得出結論,標準硬體(例如家用 PC)可接受的執行時間約為一小時或更短。

由於顯而易見的原因,除非系統至少有 128GB 可用內存,否則無法完整讀取和處理文件。但是使用區塊和合併是不可避免的嗎?

如果您不習慣實施外部合併,我建議您首先熟悉一個可以接受的替代解決方案,儘管遠非最佳。

主意

  • 建立 2^32 位元位圖。這是一個 uint64 數組,因為 uint64 包含 64 位元。

  • 對每個 IP:

  1. 將字串位址解析為四個八位元組:A.B.C.D.
  2. 將其轉換為數字 ipNum = (A
  3. 設定位圖中對應的位元。
  • 1.讀取所有位址後,遍歷位圖並計算設定位的數量。

優點:
非常快速的唯一性檢測:設定位 O(1),無需檢查,只需設定即可。

沒有雜湊、排序等開銷
缺點:
龐大的記憶體消耗(整個 IPv4 空間需要 512 MB,不考慮開銷)。

如果檔案很大,但小於完整的 IPv4 空間,這在時間方面仍然具有優勢,但在記憶體方面並不總是合理。

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}
登入後複製
登入後複製
登入後複製

這種方法簡單可靠,在沒有替代方案時成為可行的選擇。然而,在生產環境中,尤其是當旨在實現最佳效能時,開發更有效率的解決方案至關重要。

因此,我們的方法涉及分塊、內部合併排序和重複資料刪除。

外部排序的平行化原理

  1. 讀取與轉換區塊:

檔案被分割成相對較小的部分(區塊),例如幾百兆位元組或幾千兆位元組。對於每個區塊:

  • 啟動一個 goroutine(或一個 goroutine 池),它讀取區塊,將 IP 位址解析為數字並將它們儲存在記憶體中的臨時數組中。

  • 然後對該陣列進行排序(例如,使用標準 sort.Slice),並在刪除重複項後將結果寫入臨時檔案。

由於每個部分都可以獨立處理,因此如果您有多個 CPU 核心和足夠的磁碟頻寬,您可以並行運行多個此類處理程序。這將使您能夠盡可能有效地使用資源。

  1. 合併排序的區塊(合併步驟):

所有區塊都排序並寫入臨時檔案後,您需要將這些排序清單合併到單一排序流中,刪除重複項:

  • 與外部排序過程類似,可以透過將多個臨時檔案分組,並行合併並逐漸減少檔案數量來並行合併。

  • 這會留下一個大的已排序和去重的輸出流,您可以從中計算唯一 IP 的總數。

平行化的優點:

  • 使用多個CPU核心:
    對非常大的陣列進行單執行緒排序可能會很慢,但如果您有多核心處理器,則可以並行對多個區塊進行排序,從而將過程加快數倍。

  • 負載平衡:

如果明智地選擇區塊大小,則可以在大約相同的時間內處理每個區塊。如果某些區塊更大/更小或更複雜,您可以在不同的 goroutine 之間動態分配它們的處理。

  • IO 最佳化:

並行化允許讀取一個區塊,同時對另一個區塊進行排序或寫入,從而減少空閒時間。

底線

外部排序自然適合透過檔案分塊進行並行化。這種方法可以有效利用多核心處理器並最大限度地減少 IO 瓶頸,與單執行緒方法相比,排序和重複資料刪除速度顯著加快。透過有效地分配工作負載,即使在處理海量資料集時也可以獲得高效能。

重要考慮因素:

在逐行讀取檔案的同時,我們也可以統計總行數。在此過程中,我們分兩個階段執行重複資料刪除:首先是在分塊期間,然後在合併期間。因此,無需計算最終輸出檔中的行數。相反,唯一行的總數可以計算為:

finalCount :=totalLines - (DeletedInChunks DeletedInMerge)

這種方法避免了冗餘操作,並透過在重複資料刪除的每個階段追蹤刪除操作來提高計算效率。這為我們節省了幾分鐘。

還有一件事:

由於任何小的效能提升對大量資料都很重要,我建議使用自行編寫的字串加速模擬。 Slice()

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 
登入後複製
登入後複製
登入後複製
登入後複製

此外,採用了工作範本來管理並行處理,執行緒數量是可設定的。預設情況下,執行緒數設定為 runtime.NumCPU(),讓程式有效利用所有可用的 CPU 核心。這種方法確保了最佳的資源使用,同時也提供了根據環境的特定要求或限制來靈活調整線程數量的能力。

重要提示:使用多執行緒時,保護共享資料以防止競爭條件並確保程式的正確性至關重要。這可以透過使用同步機制來實現,例如互斥體、通道(在 Go 中)或其他並發安全技術,具體取決於您的實現的特定要求。

到目前為止的總結

這些想法的實現產生了程式碼,當在搭配 M.2 SSD 的 Ryzen 7700 處理器上執行時,可以在大約 40 分鐘內完成任務。

考慮壓縮。

基於資料量以及因此存在的重要磁碟操作,下一個考慮因素是壓縮的使用。選擇 Brotli 演算法進行壓縮。其高壓縮比和高效率解壓縮使其成為減少磁碟IO開銷同時在中間儲存和處理過程中保持良好效能的合適選擇。

這是使用 Brotli 進行分塊的範例:

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}
登入後複製
登入後複製
登入後複製

使用壓縮的結果

壓縮的有效性是有爭議的,並且高度依賴解決方案的使用條件。高壓縮可減少磁碟空間的使用,但會成比例地增加總體執行時間。在慢速 HDD 上,壓縮可以顯著提高速度,因為磁碟 I/O 成為瓶頸。相反,在快速 SSD 上,壓縮可能會導致執行時間變慢。

在配備 M.2 SSD 的系統上進行的測試中,壓縮並未顯示出效能提升。結果,我最終決定放棄它。但是,如果您願意冒增加程式碼複雜度並可能降低其可讀性的風險,則可以將壓縮實現為可選功能,由可配置標誌控制。

接下來做什麼

為了追求進一步最佳化,我們將注意力轉向解決方案的二進位轉換。一旦基於文字的 IP 位址轉換為數字雜湊值,所有後續操作都可以以二進位格式執行。

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 
登入後複製
登入後複製
登入後複製
登入後複製

二進位格式的優點

  • 緊湊性:

每個數字佔固定大小(例如,uint32 = 4 位元組)。
對於 100 萬個 IP 位址,檔案大小僅為 ~4 MB。

  • 快速處理:

無需解析字串,加快讀寫操作

  • 跨平台相容性:

透過使用一致的位元組順序(LittleEndian 或 BigEndian),可以跨不同平台讀取檔案。

結論
以二進位格式儲存資料是一種更有效的寫入和讀取數字的方法。為了完整最佳化,將資料寫入和讀取過程都轉換為二進位格式。使用binary.Write進行寫入,使用binary.Read進行讀取。

以下是 processChunk 函數使用二進位格式時的樣子:

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}
登入後複製
登入後複製
登入後複製

搞什麼? !速度變慢了很多! !

在二進位​​格式下,工作速度變得更慢。包含 1 億行(IP 位址)的檔案以二進位形式處理需要 4.5 分鐘,而文字形式則需要 25 秒。具有相同的區塊大小和工作人員數量。為什麼?

使用二進位格式可能比文字格式慢
由於binary.Read和binary.Write操作方式的具體情況以及其實現中潛在的低效率,使用二進位格式有時可能比文字格式慢。以下是可能發生這種情況的主要原因:

I/O 操作

  • 文字格式:

使用 bufio.Scanner 處理更大的資料區塊,該掃描器針對讀取行進行了最佳化。
讀取整行並解析它們,這對於小型轉換操作來說更有效率。

  • 二進位格式:

binary.Read 一次讀取 4 個位元組,導緻小 I/O 運算更頻繁。
頻繁呼叫binary.Read會增加使用者空間和系統空間之間切換的開銷。

解:使用緩衝區一次讀取多個數字。

func fastSplit(s string) []string {
    n := 1
    c := DelimiterByte

    for i := 0; i < len(s); i++ {
        if s[i] == c {
            n++
        }
    }

    out := make([]string, n)
    count := 0
    begin := 0
    length := len(s) - 1

    for i := 0; i <= length; i++ {
        if s[i] == c {
            out[count] = s[begin:i]
            count++
            begin = i + 1
        }
    }
    out[count] = s[begin : length+1]

    return out
}
登入後複製

為什麼緩衝可以提高效能?

  • 更少的 I/O 操作:
    資料不是直接將每個數字寫入磁碟,而是累積在緩衝區中並寫入更大的區塊中。

  • 減少開銷:

由於進程和作業系統之間的上下文切換,每個磁碟寫入操作都會產生開銷。緩衝可以減少此類呼叫的數量。

我們也展示了二進位多相合併的程式碼:

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 
登入後複製
登入後複製
登入後複製
登入後複製

結果太棒了:110Gb 檔案、80 億行只需 14 分鐘!

Image description

這真是了不起的成績!在 14 分鐘內處理一個包含 80 億行的 110 GB 檔案確實令人印象深刻。它展示了以下功能的力量:

  • 緩衝 I/O:

透過在記憶體中處理大塊資料而不是逐行或逐值處理,可以大幅減少 I/O 操作的數量,而 I/O 操作通常是瓶頸。

  • 最佳化的二進位處理:

切換為二進位讀寫可以最大限度地減少解析開銷,減少中間資料的大小,提高記憶體效率。

  • 高效率重複資料刪除:

使用記憶體高效演算法進行重複資料刪除和排序可確保 CPU 週期有效利用。

  • 並行度:

利用 goroutine 和通道並行處理工作執行緒之間的工作負載,平衡 CPU 和磁碟使用率。

結論

最後,這是最終解決方案的完整程式碼。請隨意使用它並根據您的需求進行調整!

Gophers 的外部合併解決方案

祝你好運!

以上是外部合併問題 - Gophers 完整指南的詳細內容。更多資訊請關注PHP中文網其他相關文章!

來源:dev.to
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板