
External Merge Problem - Complete Guide for Gophers

Jan 12, 2025, 08:09 AM

The external sorting problem is a well-known topic in computer science courses and is often used as a teaching tool. However, it's rare to meet someone who has actually implemented a solution to this problem in code for a specific technical scenario, let alone tackled the required optimizations. Encountering this challenge during a hackathon inspired me to write this article.

So, here is the hackathon task:

You have a simple text file with IPv4 addresses. One line is one address, line by line:

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

The file is unlimited in size and can occupy tens and hundreds of gigabytes.

You should calculate the number of unique addresses in this file using as little memory and time as possible. There is a "naive" algorithm for solving this problem (read line by line, put the lines into a HashSet). Your implementation should be smarter and faster than this naive algorithm.

A 120GB file with 8 billion lines was submitted for parsing.

There were no specific requirements regarding the speed of program execution. However, after quickly reviewing available information on the topic online, I concluded that an acceptable execution time for standard hardware (such as a home PC) would be approximately one hour or less.

For obvious reasons, the file cannot be read and processed in its entirety unless the system has at least 128GB of memory available. But is working with chunks and merging inevitable?

If you are not comfortable implementing an external merge, I suggest you first familiarize yourself with an alternative solution that is acceptable, although far from optimal.

Idea

  • Create a 2^32-bit bitmap, stored as an array of uint64 (each uint64 holds 64 bits).

  • For each IP:

  1. Parse the string address into four octets: A.B.C.D.
  2. Convert it to a number: ipNum = (A << 24) | (B << 16) | (C << 8) | D.
  3. Set the corresponding bit in the bitmap.

  • After reading all the addresses, run through the bitmap and count the number of set bits.

Pros:

  • Very fast uniqueness detection: setting a bit is O(1); there is no need to check anything first, just set it.

  • No overhead for hashing, sorting, etc.

Cons:

  • Huge memory consumption: 512 MB for the full IPv4 space, without taking overhead into account.

If the file is huge, but smaller than the full IPv4 space, this can still be advantageous in terms of time, but not always reasonable in terms of memory.

package main

import (
    "bufio"
    "fmt"
    "math/bits"
    "os"
    "strconv"
    "strings"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}

This approach is straightforward and reliable, making it a viable option when no alternatives are available. However, in a production environment—especially when aiming to achieve optimal performance—it's essential to develop a more efficient solution.

Thus, our approach involves chunking, internal merge sorting, and deduplication.

The Principle of Parallelization in External Sorting

  1. Reading and transforming chunks:

The file is split into relatively small parts (chunks), say a few hundred megabytes or a few gigabytes. For each chunk:

  • A goroutine (or a pool of goroutines) is launched, which reads the chunk, parses the IP addresses into numbers and stores them in a temporary array in memory.

  • Then this array is sorted (for example, with the standard sort.Slice), and the result, after removing duplicates, is written to a temporary file (a sketch of this step is shown right after this list).

Since each part can be processed independently, you can run several such handlers in parallel, if you have several CPU cores and sufficient disk bandwidth. This will allow you to use resources as efficiently as possible.

  2. Merge sorted chunks (merge step):

Once all chunks are sorted and written to temporary files, you need to merge these sorted lists into a single sorted stream, removing duplicates:

  • Similar to the external sorting process, you can parallelize the merge by dividing multiple temporary files into groups, merging them in parallel and gradually reducing the number of files.

  • This leaves one large sorted and deduplicated output stream, from which you can calculate the total number of unique IPs.
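
As an illustration of the per-chunk step, a handler might look like the sketch below. The function name, temp-file layout and text output format are illustrative assumptions, not the final solution's code:

package main

import (
    "bufio"
    "fmt"
    "os"
    "sort"
)

// processChunk sorts one chunk of parsed IPs, drops duplicates and writes the
// survivors to a temporary file, one number per line. It returns how many
// duplicates were removed; we will need that counter later.
func processChunk(ips []uint32, tmpPath string) (removed int, err error) {
    sort.Slice(ips, func(i, j int) bool { return ips[i] < ips[j] })

    out, err := os.Create(tmpPath)
    if err != nil {
        return 0, err
    }
    defer out.Close()

    w := bufio.NewWriter(out)
    defer w.Flush()

    for i, ip := range ips {
        if i > 0 && ip == ips[i-1] {
            removed++ // duplicate inside this chunk
            continue
        }
        fmt.Fprintln(w, ip) // text format for now; a binary variant is discussed later
    }
    return removed, nil
}

func main() {
    removed, err := processChunk([]uint32{42, 7, 42, 1}, "chunk_0000.tmp")
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println("duplicates removed:", removed)
}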

Advantages of parallelization:

  • Use of multiple CPU cores:
    Single-threaded sorting of a very large array can be slow, but if you have a multi-core processor, you can sort multiple chunks in parallel, speeding up the process several times.

  • Load balancing:
    If the chunk sizes are chosen wisely, each chunk can be processed in approximately the same amount of time. If some chunks are larger/smaller or more complex, you can dynamically distribute their processing across different goroutines.

  • IO optimization:
    Parallelization allows one chunk to be read while another is being sorted or written, reducing idle time.

Bottom Line

External sorting naturally lends itself to parallelization through file chunking. This approach enables the efficient use of multi-core processors and minimizes IO bottlenecks, resulting in significantly faster sorting and deduplication compared to a single-threaded approach. By distributing the workload effectively, you can achieve high performance even when dealing with massive datasets.

Important consideration:

While reading the file line by line, we can also count the total number of lines. During the process, we perform deduplication in two stages: first during chunking and then during merging. As a result, there’s no need to count the lines in the final output file. Instead, the total number of unique lines can be calculated as:

finalCount := totalLines - (DeletedInChunks + DeletedInMerge)

This approach avoids redundant operations and makes the computation more efficient by keeping track of deletions during each stage of deduplication. This saves us several minutes.

One more thing:

Since any small performance gain matters on huge amounts of data, I suggest using a self-written, accelerated analogue of strings.Split(), shown below.

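The fastSplit helper splits on a single known byte without the extra work strings.Split does for arbitrary separators. Here, DelimiterByte is assumed to be the '.' separating IPv4 octets:

// DelimiterByte is assumed to be the octet separator in an IPv4 string.
const DelimiterByte byte = '.'

func fastSplit(s string) []string {
    n := 1
    c := DelimiterByte

    for i := 0; i < len(s); i++ {
        if s[i] == c {
            n++
        }
    }

    out := make([]string, n)
    count := 0
    begin := 0
    length := len(s) - 1

    for i := 0; i <= length; i++ {
        if s[i] == c {
            out[count] = s[begin:i]
            count++
            begin = i + 1
        }
    }
    out[count] = s[begin : length+1]

    return out
}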

Additionally, a worker-pool pattern was adopted to manage parallel processing, with a configurable number of workers. By default, the number of workers is set to runtime.NumCPU(), allowing the program to utilize all available CPU cores. This ensures efficient resource usage while providing the flexibility to adjust the number of workers to the requirements or limitations of the environment.

Important Note: When using multithreading, it is crucial to protect shared data to prevent race conditions and ensure the correctness of the program. This can be achieved by using synchronization mechanisms such as mutexes, channels (in Go), or other concurrency-safe techniques, depending on the specific requirements of your implementation.
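
A minimal sketch of such a worker pool, reusing the processChunk helper sketched earlier; the job structure, file naming and channel layout are illustrative assumptions:

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// job carries one chunk of parsed IPs together with its sequence number,
// which is used to name the temporary file.
type job struct {
    id  int
    ips []uint32
}

func main() {
    workers := runtime.NumCPU() // default; can be overridden by a flag or config

    jobs := make(chan job)
    results := make(chan int) // duplicates removed per chunk

    var wg sync.WaitGroup
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := range jobs {
                removed, err := processChunk(j.ips, fmt.Sprintf("chunk_%04d.tmp", j.id))
                if err != nil {
                    fmt.Println("chunk error:", err)
                    continue
                }
                results <- removed // results go through a channel, so no mutex is needed
            }
        }()
    }

    // Close the results channel once every worker has finished.
    go func() {
        wg.Wait()
        close(results)
    }()

    // Producer: in the real program, the file reader would send chunks here.
    go func() {
        jobs <- job{id: 0, ips: []uint32{42, 7, 42, 1}}
        close(jobs)
    }()

    deletedInChunks := 0
    for removed := range results {
        deletedInChunks += removed
    }
    fmt.Println("duplicates removed in chunks:", deletedInChunks)
}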

Summary so far

The implementation of these ideas resulted in code that, when executed on a Ryzen 7700 processor paired with an M.2 SSD, completed the task in approximately 40 minutes.

Considering compression

The next consideration, based on the volume of data and hence the presence of significant disk operations, was the use of compression. The Brotli algorithm was chosen for compression. Its high compression ratio and efficient decompression make it a suitable choice for reducing disk IO overhead while maintaining good performance during intermediate storage and processing.

Here is the example of chunking with Brotli:
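
A minimal sketch of the idea, assuming the github.com/andybalholm/brotli package; the function name, compression level and file layout are illustrative:

package main

import (
    "fmt"
    "os"
    "sort"

    "github.com/andybalholm/brotli"
)

// writeCompressedChunk sorts and deduplicates one chunk, then streams it to
// disk through a Brotli compressor, one number per line.
func writeCompressedChunk(ips []uint32, path string) error {
    sort.Slice(ips, func(i, j int) bool { return ips[i] < ips[j] })

    f, err := os.Create(path)
    if err != nil {
        return err
    }
    defer f.Close()

    // The level is a trade-off: higher levels compress better but cost more CPU.
    bw := brotli.NewWriterLevel(f, 5)

    for i, ip := range ips {
        if i > 0 && ip == ips[i-1] {
            continue // drop duplicates within the chunk
        }
        if _, err := fmt.Fprintln(bw, ip); err != nil {
            return err
        }
    }
    return bw.Close() // flushes the remaining compressed data to the file
}

func main() {
    if err := writeCompressedChunk([]uint32{42, 7, 42, 1}, "chunk_0000.br"); err != nil {
        fmt.Println("error:", err)
    }
}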


Results of Using Compression

The effectiveness of compression is debatable and highly dependent on where the solution runs. Higher compression reduces disk usage but adds CPU time to every write and read. On slow HDDs, where disk I/O is the bottleneck, compression can provide a significant speed boost; on fast SSDs, it may actually slow execution down.

In tests conducted on a system with M.2 SSDs, compression showed no performance improvement. As a result, I ultimately decided to forgo it. However, if you're willing to risk adding complexity to your code and potentially reducing its readability, you could implement compression as an optional feature, controlled by a configurable flag.
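
One way to keep compression optional is to hide the choice behind an io.Writer. A sketch, again assuming github.com/andybalholm/brotli and an illustrative flag name:

package main

import (
    "bufio"
    "flag"
    "fmt"
    "io"
    "os"

    "github.com/andybalholm/brotli"
)

var useCompression = flag.Bool("compress", false, "compress temporary chunk files with Brotli")

// newChunkWriter returns the writer chunk data should go through, plus a
// finish function that flushes and closes the layers stacked on top of f.
func newChunkWriter(f *os.File) (io.Writer, func() error) {
    buffered := bufio.NewWriter(f)
    if *useCompression {
        bw := brotli.NewWriterLevel(buffered, 5)
        return bw, func() error {
            if err := bw.Close(); err != nil {
                return err
            }
            return buffered.Flush()
        }
    }
    return buffered, buffered.Flush
}

func main() {
    flag.Parse()

    f, err := os.Create("chunk_0000.tmp")
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    defer f.Close()

    w, finish := newChunkWriter(f)
    fmt.Fprintln(w, "145.67.23.4") // whatever the chunk produces goes through w
    if err := finish(); err != nil {
        fmt.Println("error:", err)
    }
}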

What to do next

In pursuit of further optimization, we turn our attention to the binary representation of our intermediate data. Once the text-based IP addresses are converted into uint32 values, all subsequent operations can be performed in binary format.


Advantages of the Binary Format

  • Compactness:

Each number occupies a fixed size (e.g., uint32 = 4 bytes).
For 1 million IP addresses, the file size will be only ~4 MB.

  • Fast Processing:

There's no need to parse strings, which speeds up reading and writing operations.

  • Cross-Platform Compatibility:

By using a consistent byte order (either LittleEndian or BigEndian), files can be read across different platforms.

Conclusion
Storing data in binary format is a more efficient method for writing and reading numbers. For complete optimization, convert both the data writing and reading processes to binary format. Use binary.Write for writing and binary.Read for reading.

Here's what the processChunk function might look like to work with binary format:
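
A sketch of such a binary chunk handler, writing each value with binary.Write; the names are illustrative, and, as the next section shows, this straightforward version is not the fastest:

package main

import (
    "encoding/binary"
    "fmt"
    "os"
    "sort"
)

// processChunk sorts one chunk of parsed IPs, drops duplicates and writes the
// survivors as little-endian uint32 values, 4 bytes each.
func processChunk(ips []uint32, tmpPath string) (removed int, err error) {
    sort.Slice(ips, func(i, j int) bool { return ips[i] < ips[j] })

    out, err := os.Create(tmpPath)
    if err != nil {
        return 0, err
    }
    defer out.Close()

    for i, ip := range ips {
        if i > 0 && ip == ips[i-1] {
            removed++ // duplicate inside this chunk
            continue
        }
        // One binary.Write call per value: simple, but it issues one tiny
        // write for every IP, which is exactly what makes this version slow.
        if err := binary.Write(out, binary.LittleEndian, ip); err != nil {
            return removed, err
        }
    }
    return removed, nil
}

func main() {
    removed, err := processChunk([]uint32{42, 7, 42, 1}, "chunk_0000.bin")
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println("duplicates removed:", removed)
}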

WTF?! It became much slower!!

The binary version turned out to be slower: a file with 100 million lines (IP addresses) is processed in binary form in 4.5 minutes, versus 25 seconds in text form, with the same chunk size and number of workers. Why?

Working with Binary Format May Be Slower than Text Format
Using binary format can sometimes be slower than text format due to the specifics of how binary.Read and binary.Write operate, as well as potential inefficiencies in their implementation. Here are the main reasons why this might happen:

I/O Operations

  • Text Format:

Works with larger data blocks using bufio.Scanner, which is optimized for reading lines.
Reads entire lines and parses them, which can be more efficient for small conversion operations.

  • Binary Format:

binary.Read reads 4 bytes at a time, resulting in more frequent small I/O operations.
Frequent calls to binary.Read increase overhead from switching between user and system space.

Solution: Use a buffer to read multiple numbers at once.
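
A sketch of that idea: wrap the file in a bufio.Reader with a large buffer and pull the 4-byte values out of it, instead of calling binary.Read against the file directly (the function name and buffer size are illustrative):

package main

import (
    "bufio"
    "encoding/binary"
    "fmt"
    "io"
    "os"
)

// readUint32s streams little-endian uint32 values from a binary chunk file and
// calls fn for each of them. A single large bufio buffer turns the many 4-byte
// reads into a few big disk reads.
func readUint32s(path string, fn func(uint32)) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close()

    r := bufio.NewReaderSize(f, 1<<20) // 1 MB read buffer
    var buf [4]byte
    for {
        if _, err := io.ReadFull(r, buf[:]); err != nil {
            if err == io.EOF {
                return nil // clean end of file
            }
            return err
        }
        fn(binary.LittleEndian.Uint32(buf[:]))
    }
}

func main() {
    count := 0
    if err := readUint32s("chunk_0000.bin", func(ip uint32) { count++ }); err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println("values read:", count)
}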


Why Does Buffering Improve Performance?

  • Fewer I/O Operations:
    Instead of writing each number directly to disk, data is accumulated in a buffer and written in larger blocks.

  • Reduced Overhead:
    Each disk write operation incurs overhead due to context switching between the process and the operating system. Buffering reduces the number of such calls.

We also need code for the binary multi-phase merge:
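
A minimal single-phase sketch of a heap-based k-way merge over sorted binary chunk files that counts unique values as it goes; the type names and file layout are illustrative, and the real solution merges in several phases, as described above:

package main

import (
    "bufio"
    "container/heap"
    "encoding/binary"
    "fmt"
    "io"
    "os"
)

// source is one sorted, deduplicated binary chunk file.
type source struct {
    r   *bufio.Reader
    cur uint32
    ok  bool
}

// next loads the following value from the chunk, if any.
func (s *source) next() {
    var buf [4]byte
    if _, err := io.ReadFull(s.r, buf[:]); err != nil {
        s.ok = false
        return
    }
    s.cur = binary.LittleEndian.Uint32(buf[:])
    s.ok = true
}

// minHeap orders sources by their current value.
type minHeap []*source

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].cur < h[j].cur }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(*source)) }
func (h *minHeap) Pop() interface{} {
    old := *h
    x := old[len(old)-1]
    *h = old[:len(old)-1]
    return x
}

// mergeCountUnique merges the sorted chunk files and counts unique values.
func mergeCountUnique(paths []string) (uint64, error) {
    h := &minHeap{}
    for _, p := range paths {
        f, err := os.Open(p)
        if err != nil {
            return 0, err
        }
        defer f.Close()
        s := &source{r: bufio.NewReaderSize(f, 1<<20)}
        s.next()
        if s.ok {
            heap.Push(h, s)
        }
    }

    var unique uint64
    var last uint32
    first := true
    for h.Len() > 0 {
        s := (*h)[0] // the smallest current value across all chunks
        if first || s.cur != last {
            unique++
            last, first = s.cur, false
        }
        s.next()
        if s.ok {
            heap.Fix(h, 0) // the source has a new value, restore heap order
        } else {
            heap.Pop(h) // this chunk is exhausted
        }
    }
    return unique, nil
}

func main() {
    n, err := mergeCountUnique([]string{"chunk_0000.bin", "chunk_0001.bin"})
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println("unique IPs:", n)
}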


The result is fantastic: 14 minutes for a 110 GB file with 8 billion lines!


That's an outstanding result! Processing a 110 GB file with 8 billion lines in 14 minutes is indeed impressive. It demonstrates the power of:

  • Buffered I/O:

By processing large chunks of data in memory instead of line-by-line or value-by-value, you drastically reduce the number of I/O operations, which are often the bottleneck.

  • Optimized Binary Processing:

Switching to binary reading and writing minimizes parsing overhead, reduces the size of intermediate data, and improves memory efficiency.

  • Efficient Deduplication:

Using memory-efficient algorithms for deduplication and sorting ensures that CPU cycles are utilized effectively.

  • Parallelism:

Leveraging goroutines and channels to parallelize the workload across workers balances CPU and disk utilization.

Conclusion

Finally, here is the complete code for the final solution. Feel free to use it and adapt it to your needs!

External merge solution for Gophers

Good luck!
