这是给您的另一个代码审查面试问题。这个问题比前面的问题更高级,针对的是更资深的受众。该问题需要了解切片以及在并行进程之间共享数据。
如果您不熟悉切片及其构造方式,请查看我之前关于切片标题的文章
当两个或多个线程(或 goroutine,在 Go 的情况下)同时访问共享内存时,就会发生数据竞争,并且这些访问中至少有一个是写操作。如果没有适当的同步机制(例如锁或通道)来管理访问,结果可能是不可预测的行为,包括数据损坏、状态不一致或崩溃。
本质上,数据竞争发生在以下情况:
因此,线程或 goroutine 访问或修改共享内存的顺序是不可预测的,从而导致运行之间可能会发生变化的不确定行为。
+----------------------+ +---------------------+ | Thread A: Write | | Thread B: Read | +----------------------+ +---------------------+ | 1. Reads x | | 1. Reads x | | 2. Adds 1 to x | | | | 3. Writes new value | | | +----------------------+ +---------------------+ Shared variable x (Concurrent access without synchronization)
这里,线程A正在修改x(写入它),而线程B同时正在读取它。如果两个线程同时运行并且没有同步,线程 B 可以在线程 A 完成更新之前读取 x。因此,数据可能不正确或不一致。
问题:您的一位队友提交了以下代码进行代码审查。请仔细检查代码并找出任何潜在的问题。
这是您必须查看的代码:
package main import ( "bufio" "bytes" "io" "math/rand" "time" ) func genData() []byte { r := rand.New(rand.NewSource(time.Now().Unix())) buffer := make([]byte, 512) if _, err := r.Read(buffer); err != nil { return nil } return buffer } func publish(input []byte, output chan<- []byte) { reader := bytes.NewReader(input) bufferSize := 8 buffer := make([]byte, bufferSize) for { n, err := reader.Read(buffer) if err != nil || err == io.EOF { return } if n == 0 { break } output <- buffer[:n] } } func consume(input []byte) { scanner := bufio.NewScanner(bytes.NewReader(input)) for scanner.Scan() { b := scanner.Bytes() _ = b // does the magic } } func main() { data := genData() workersCount := 4 chunkChannel := make(chan []byte, workersCount) for i := 0; i < workersCount; i++ { go func() { for chunk := range chunkChannel { consume(chunk) } }() } publish(data, chunkChannel) close(chunkChannel) }
我们这里有什么?
publish() 函数负责逐块读取输入数据并将每个块发送到输出通道。首先使用 bytes.NewReader(input) 从输入数据创建一个读取器,这允许顺序读取数据。创建大小为 8 的缓冲区来保存从输入读取的每个数据块。在每次迭代期间, reader.Read(buffer) 从输入中读取最多 8 个字节,然后该函数将包含最多 8 个字节的缓冲区 (buffer[:n]) 的切片发送到输出通道。循环继续,直到 reader.Read(buffer) 遇到错误或到达输入数据的末尾。
consume()函数处理从通道接收到的数据块。它使用 bufio.Scanner 处理这些数据块,该扫描仪会扫描每个数据块,并可能根据其配置方式将其分解为行或标记。变量 b := Scanner.Bytes() 检索当前正在扫描的令牌。该函数代表基本的输入处理。
main() 创建一个缓冲通道 chunkChannel,其容量等于workersCount,在本例中设置为 4。然后该函数启动 4 个工作协程,每个协程都会同时从 chunkChannel 读取数据。每次工作进程接收到一块数据时,它都会通过调用 Consumer() 函数来处理该数据块。 publish() 函数读取生成的数据,将其分成最多 8 个字节的块,并将它们发送到通道。
该程序使用 goroutine 创建多个消费者,允许并发数据处理。每个消费者都在单独的 goroutine 中运行,独立处理数据块。
如果您运行此代码,将会发生可疑情况:
[Running] go run "main.go" [Done] exited with code=0 in 0.94 seconds
但是有一个问题。我们存在数据争用风险。在此代码中,存在潜在的数据竞争,因为publish()函数为每个块重用相同的缓冲区切片。消费者同时从此缓冲区中读取数据,并且由于切片共享底层内存,因此多个消费者可能会读取相同的内存,从而导致数据竞争。让我们尝试使用竞争检测。 Go 提供了一个内置工具来检测数据争用:竞赛检测器。您可以通过使用 -race 标志运行程序来启用它:
go run -race main.go
如果我们将 -race 标志添加到运行命令中,我们将收到以下输出:
[Running] go run -race "main.go" ================== WARNING: DATA RACE Read at 0x00c00011e018 by goroutine 6: runtime.slicecopy() /GOROOT/go1.22.0/src/runtime/slice.go:325 +0x0 bytes.(*Reader).Read() /GOROOT/go1.22.0/src/bytes/reader.go:44 +0xcc bufio.(*Scanner).Scan() /GOROOT/go1.22.0/src/bufio/scan.go:219 +0xef4 main.consume() /GOPATH/example/main.go:40 +0x140 main.main.func1() /GOPATH/example/main.go:55 +0x48 Previous write at 0x00c00011e018 by main goroutine: runtime.slicecopy() /GOROOT/go1.22.0/src/runtime/slice.go:325 +0x0 bytes.(*Reader).Read() /GOROOT/go1.22.0/src/bytes/reader.go:44 +0x168 main.publish() /GOPATH/example/main.go:27 +0xe4 main.main() /GOPATH/example/main.go:60 +0xdc Goroutine 6 (running) created at: main.main() /GOPATH/example/main.go:53 +0x50 ================== Found 1 data race(s) exit status 66 [Done] exited with code=0 in 0.94 seconds
The warning you’re seeing is a classic data race detected by Go’s race detector. The warning message indicates that two goroutines are accessing the same memory location (0x00c00011e018) concurrently. One goroutine is reading from this memory, while another goroutine is writing to it at the same time, without proper synchronization.
The first part of the warning tells us that Goroutine 6 (which is one of the worker goroutines in your program) is reading from the memory address 0x00c00011e018 during a call to bufio.Scanner.Scan() inside the consume() function.
Read at 0x00c00011e018 by goroutine 6: runtime.slicecopy() /GOROOT/go1.22.0/src/runtime/slice.go:325 +0x0 bytes.(*Reader).Read() /GOROOT/go1.22.0/src/bytes/reader.go:44 +0xcc bufio.(*Scanner).Scan() /GOROOT/go1.22.0/src/bufio/scan.go:219 +0xef4 main.consume() /GOPATH/example/main.go:40 +0x140 main.main.func1() /GOPATH/example/main.go:55 +0x48
The second part of the warning shows that the main goroutine previously wrote to the same memory location (0x00c00011e018) during a call to bytes.Reader.Read() inside the publish() function.
Previous write at 0x00c00011e018 by main goroutine: runtime.slicecopy() /GOROOT/go1.22.0/src/runtime/slice.go:325 +0x0 bytes.(*Reader).Read() /GOROOT/go1.22.0/src/bytes/reader.go:44 +0x168 main.publish() /GOPATH/example/main.go:27 +0xe4 main.main() /GOPATH/example/main.go:60 +0xdc
The final part of the warning explains that Goroutine 6 was created in the main function.
Goroutine 6 (running) created at: main.main() /GOPATH/example/main.go:53 +0x50
In this case, while one goroutine (Goroutine 6) is reading from the buffer in consume(), the publish() function in the main goroutine is simultaneously writing to the same buffer, leading to the data race.
+-------------------+ +--------------------+ | Publisher | | Consumer | +-------------------+ +--------------------+ | | v | 1. Read data into buffer | | | v | 2. Send slice of buffer to chunkChannel | | | v | +----------------+ | | chunkChannel | | +----------------+ | | | v | 3. Consume reads from slice | v 4. Concurrent access (Data Race occurs)
The data race in this code arises because of how Go slices work and how memory is shared between goroutines when a slice is reused. To fully understand this, let’s break it down into two parts: the behavior of the buffer slice and the mechanics of how the race occurs. When you pass a slice like buffer[:n] to a function or channel, what you are really passing is the slice header which contains a reference to the slice’s underlying array. Any modifications to the slice or the underlying array will affect all other references to that slice.
buffer = [ a, b, c, d, e, f, g, h ] <- Underlying array ↑ Slice: buffer[:n]
func publish(input []byte, output chan<- []byte) { reader := bytes.NewReader(input) bufferSize := 8 buffer := make([]byte, bufferSize) for { // .... output <- buffer[:n] // <-- passing is a reference to the underlying array } }
If you send buffer[:n] to a channel, both the publish() function and any consumer goroutines will be accessing the same memory. During each iteration, the reader.Read(buffer) function reads up to 8 bytes from the input data into this buffer slice. After reading, the publisher sends buffer[:n] to the output channel, where n is the number of bytes read in the current iteration.
The problem here is that buffer is reused across iterations. Every time reader.Read() is called, it overwrites the data stored in buffer.
At this point, if one of the worker goroutines is still processing the first chunk, it is now reading stale or corrupted data because the buffer has been overwritten by the second chunk. Reusing a slice neans sharing the same memory.
To avoid the race condition, we must ensure that each chunk of data sent to the channel has its own independent memory. This can be achieved by creating a new slice for each chunk and copying the data from the buffer to this new slice. The key fix is to copy the contents of the buffer into a new slice before sending it to the chunkChannel:
chunk := make([]byte, n) // Step 1: Create a new slice with its own memory copy(chunk, buffer[:n]) // Step 2: Copy data from buffer to the new slice output <- chunk // Step 3: Send the new chunk to the channel
Why this fix works? By creating a new slice (chunk) for each iteration, you ensure that each chunk has its own memory. This prevents the consumers from reading from the buffer that the publisher is still modifying. copy() function copies the contents of the buffer into the newly allocated slice (chunk). This decouples the memory used by each chunk from the buffer. Now, when the publisher reads new data into the buffer, it doesn’t affect the chunks that have already been sent to the channel.
+-------------------------+ +------------------------+ | Publisher (New Memory) | | Consumers (Read Copy) | | [ a, b, c ] --> chunk1 | | Reading: chunk1 | | [ d, e, f ] --> chunk2 | | Reading: chunk2 | +-------------------------+ +------------------------+ ↑ ↑ (1) (2) Publisher Creates New Chunk Consumers Read Safely
This solution works is that it breaks the connection between the publisher and the consumers by eliminating shared memory. Each consumer now works on its own copy of the data, which the publisher does not modify. Here’s how the modified publish() function looks:
func publish(input []byte, output chan<- []byte) { reader := bytes.NewReader(input) bufferSize := 8 buffer := make([]byte, bufferSize) for { n, err := reader.Read(buffer) if err != nil || err == io.EOF { return } if n == 0 { break } // Create a new slice for each chunk and copy the data from the buffer chunk := make([]byte, n) copy(chunk, buffer[:n]) // Send the newly created chunk to the channel output <- chunk } }
Slices Are Reference Types:
As mentioned earlier, Go slices are reference types, meaning they point to an underlying array. When you pass a slice to a channel or a function, you’re passing a reference to that array, not the data itself. This is why reusing a slice leads to a data race: multiple goroutines end up referencing and modifying the same memory.
메모리 할당:
make([]byte, n)을 사용하여 새 슬라이스를 생성하면 Go는 해당 슬라이스에 대해 별도의 메모리 블록을 할당합니다. 이는 새 슬라이스(청크)가 버퍼와 관계없이 자체 백업 배열을 갖는다는 것을 의미합니다. 버퍼[:n]의 데이터를 청크로 복사하여 각 청크에 고유한 개인 메모리 공간이 있는지 확인합니다.
메모리 분리:
버퍼에서 각 청크의 메모리를 분리함으로써 게시자는 이미 채널로 전송된 청크에 영향을 주지 않고 버퍼로 새 데이터를 계속 읽을 수 있습니다. 이제 각 청크에는 자체적인 독립적인 데이터 복사본이 있으므로 소비자는 게시자의 간섭 없이 청크를 처리할 수 있습니다.
데이터 경합 방지:
데이터 경쟁의 주요 원인은 공유 버퍼에 대한 동시 액세스였습니다. 새로운 슬라이스를 생성하고 데이터를 복사함으로써 공유 메모리를 제거하고 각 고루틴은 자체 데이터에서 작동합니다. 이는 동일한 메모리에 대해 더 이상 경합이 없기 때문에 경쟁 조건의 가능성을 제거합니다.
수정의 핵심은 간단하지만 강력합니다. 즉, 각 데이터 덩어리에 고유한 메모리가 있도록 하여 데이터 경합을 유발하는 공유 리소스(버퍼)를 제거합니다. 이는 데이터를 채널로 보내기 전에 버퍼의 데이터를 새 슬라이스로 복사함으로써 달성됩니다. 이 접근 방식을 사용하면 각 소비자는 게시자의 작업과 관계없이 자체 데이터 복사본으로 작업하여 경합 조건 없이 안전한 동시 처리를 보장합니다. 공유 메모리를 분리하는 이 방법은 동시 프로그래밍의 기본 전략입니다. 이는 경쟁 조건으로 인해 발생하는 예측할 수 없는 동작을 방지하고 여러 고루틴이 동시에 데이터에 액세스하는 경우에도 Go 프로그램이 안전하고 예측 가능하며 올바른 상태를 유지하도록 보장합니다.
정말 쉽죠!
以上是棘手的 Golang 面试问题 - 部分数据竞赛的详细内容。更多信息请关注PHP中文网其他相关文章!