PHP-Editor Apple stellt Ihnen häufige Probleme bei ES-Batch-Anfragen vor: „es_rejected_execution_Exception“. Wenn Sie Elasticsearch zum Erstellen von Batch-Anfragen verwenden, tritt diese Ausnahme manchmal auf. Diese Ausnahme weist normalerweise darauf hin, dass die Anzahl der gleichzeitigen Anforderungen die Verarbeitungskapazität des Elasticsearch-Servers überschreitet, was dazu führt, dass die Ausführung der Anforderung verweigert wird. Dieser Artikel analysiert die Ursache dieser Ausnahme und bietet Ihnen eine Lösung, die Ihnen hilft, das Problem reibungslos zu lösen.
Ich habe ein Segment mit etwa 5 Millionen Einträgen (der Einfachheit halber nehmen wir an, dass jeder Eintrag ein Byte-Slice ist, die Funktion getIndexerItem
函数映射到索引器项),我将其平均分配给 200 个 go 例程。然后每个go例程调用push
verwendet und die Slice-Länge 5 M/200 beträgt.
Nach meinem Verständnis Refresh的理解:wait_for
wird eine Anfrage an Elastic erst dann abgeschlossen, wenn die durch diese Anfrage vorgenommenen Änderungen für die Suche sichtbar sind (IMO verwandelt dies in eine Batch-Anfragewarteschlange, die diese spezifische Anfrage nicht mehr enthält). Warum erhalte ich diesen Fehler?
error indexing item: es_rejected_execution_exception: rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh, target allocation id: someId, primary term: 1 on EsThreadPoolExecutor [ name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1 [Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708] ]
Alle Einträge werden in denselben Index aufgenommen, ankit-test
.
func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem { return esutil.BulkIndexerItem{ Index: index, DocumentID: id, Body: bytes.NewReader(body), Action: "index", DocumentType: "logs", OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { if err != nil { fmt.Printf("error indexing item: %s\n", err.Error()) } else { fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason) } }, } } func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) { indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: e.client, Refresh: "wait_for", NumWorkers: 1, OnError: func(ctx context.Context, err error) { fmt.Printf("received onError %s\n", err.Error()) }, }) if err != nil { return nil, fmt.Errorf("error creating bulk indexer: %s", err) } ctx := context.Background() for _, d := range data { if err := indexer.Add(ctx, d); err != nil { fmt.Printf("error adding data to indexer: %s\n", err) } } if err := indexer.Close(ctx); err != nil { fmt.Printf("error flushing and closing indexer: %s\n", err) } indexerStats := indexer.Stats() return &indexerStats, nil }
Gehen Sie davon aus, dass keine anderen Prozesse in irgendeiner Weise mit dem Index interagieren.
Durch die Verwendung mehrerer ES-Dokumente konnte ich eine Lösung für das oben genannte Problem finden. Die folgende Antwort basiert auf meinem Verständnis. Wenn Sie etwas sehen, das verbessert/korrigiert werden könnte, hinterlassen Sie bitte einen Kommentar.
Dies ist der Anforderungslebenszyklus:
Mein Problem ist, dass ich refresh = false
(默认)发送请求。相反,应该使用 refresh = wait_for
verwende. Warum? Refresh bietet 3 Modi:
refresh = true
(im Hinblick auf die Serverlast), stellt aber dennoch sicher, dass die Anfrage abgeschlossen wird, bevor eine Antwort zurückgesendet wird. Die Anfrage wurde aus der Knotenwarteschlange entfernt. Alle Daten wurden an denselben Knoten umgeleitet und aufgrund von refresh = false
wurde die Antwort zurückgegeben, bevor die vorhandene Anforderung aus der Warteschlange gelöscht wurde, was den Überlauf verursachte.
Das obige ist der detaillierte Inhalt vonES-Batch-Anfrage „es_rejected_execution_Exception'.. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!