이전 2개의 포스팅에서는 Fanout과 Fanin을 따로 살펴보았습니다. 항목에 대해 개별적으로 작업하고 동시성을 사용하여 안전하게 작업할 수 있는 단일 데이터 스트림이 있는 경우 이를 함께 사용하는 경우가 많습니다. 따라서 우리는 여러 작업자 스레드로 팬아웃한 다음 다시 단일 스트림으로 팬아웃합니다.
예를 들어 대용량 로그 파일이 있다고 가정해 보겠습니다. 파일을 여러 개의 덩어리로 나누어 각 작업자가 파일의 다른 부분을 동시에 작업한 다음 결과를 결합할 수 있습니다.
이전 두 게시물을 따라오셨다면 이 패턴은 명백합니다. 잘 모르겠으면 위의 링크를 참조하세요.
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; i < 10; i++ { ch <- rand.Intn(50) } fmt.Printf("producer done\n") close(ch) // this is important!!! }() return ch } func worker(id int, jobs chan int, out chan OddEven, wg *sync.WaitGroup) { for value := range jobs { odd := "even" if (value & 1) == 1 { odd = "odd" } out <- OddEven{ Number: value, OddEven: odd, } } close(out) // remember this wg.Done() } // OddEven struct will be the result of the work done by each fanout thread // and be the fanin data type OddEven struct { Number int OddEven string } func fanin(inputs []chan OddEven) chan OddEven { output := make(chan OddEven) var wg sync.WaitGroup for i, input := range inputs { wg.Add(1) // explicit params to capture loop vars go func(id int, input chan OddEven, output chan OddEven, wg *sync.WaitGroup) { for value := range input { output <- value } fmt.Printf("done merging source %d\n", id) wg.Done() }(i, input, output, &wg) } go func() { wg.Wait() close(output) // this is important!!! }() return output } func main() { // simulate the input data stream inputCh := produce() numWorkers := 3 // fan-out to send data items to workers as individual jobs var wg sync.WaitGroup workerResults := make([]chan OddEven, numWorkers) for i := 0; i < numWorkers; i++ { wg.Add(1) workerResults[i] = make(chan OddEven) go worker(i, inputCh, workerResults[i], &wg) } go func() { wg.Wait() }() // fan-in the results results := fanin(workerResults) done := make(chan bool) go func() { for value := range results { fmt.Printf("got %d is %s\n", value.Number, value.OddEven) } close(done) }() <-done fmt.Println("done") }
시뮬레이션된 숫자 입력 스트림을 생성하는 producer() 함수가 있습니다.
더 이상 데이터가 없을 때까지 입력 채널에서 작동하는 작업자 기능이 있습니다. 각 값에 대해 입력 데이터를 '처리'한 다음(값이 홀수인지 짝수인지 결정) 결과 구조체를 출력 채널로 보냅니다.
각 작업자가 완료되면 결과 채널을 닫습니다. 팬인 작업이 채널에 대한 추가 데이터를 기다리며 절전 모드로 전환되므로 교착 상태를 방지하는 데 필요합니다.
메인 스레드는 생산에서 입력 스트림을 가져온 다음 각 작업자에게 결과를 보낼 자체 채널을 제공하는 여러 작업자를 시작합니다.
이러한 결과 채널은 fanin 작업으로 전송됩니다. 팬인(fanin)하려면 출력을 수신할 채널을 만든 다음 각 작업자 채널에 대해 고루틴을 시작합니다. 각 고루틴은 더 이상 데이터가 없을 때까지 채널을 반복한 후 종료됩니다. 작업 스레드에서 결과 채널을 닫았으므로 for 루프가 종료될 수 있다는 점을 기억하세요.
팬인 프로세스에 WaitGroup을 사용한다는 점에 유의하세요. 이를 통해 모든 결과 채널의 모든 결과가 언제 출력 채널에 결합되었는지 알 수 있습니다. 이런 일이 발생하면 출력을 소비하는 모든 다운스트림 스레드가 종료될 수 있도록 출력 채널을 닫습니다.
출력 채널의 모든 데이터를 사용하여 메인 스레드가 진행되어 결과를 표시할 수 있습니다. 모든 작업이 완료될 때까지 메인 스레드가 종료되는 것을 방지하기 위해 부울 채널을 사용한다는 점에 유의하세요. 그렇지 않으면 프로세스가 종료됩니다.
select 문을 사용하여 팬인을 수행하는 또 다른 방법이 있습니다. 여기서 사용된 기술은 작업자 수를 늘리거나 줄일 수 있기 때문에 조금 더 깔끔합니다.
또한 SIGTERM 또는 SIGINT와 같은 조기 종료와 관련하여 아무 것도 다루지 않았습니다. 그러면 좀 더 복잡해집니다.
이것을 어떻게 구현하시겠습니까? 팬아웃/팬인 패턴의 다른 구현이 있습니다. 아래에 의견과 생각을 남겨주세요.
감사합니다!
이 게시물과 이 시리즈의 모든 게시물에 대한 코드는 여기에서 확인할 수 있습니다
위 내용은 Go의 팬아웃-패닌 패턴의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!