PHP editor Apple introduces a common problem with Elasticsearch (ES) bulk requests: `es_rejected_execution_exception`. When sending bulk requests to Elasticsearch, you may occasionally hit this exception. It usually means that a node's thread pool and its queue are full: more requests arrived concurrently than the node could execute or buffer, so it refused to execute the new ones. This article analyzes the cause of the exception and presents a solution.
I have a slice of about 5M entries (for simplicity, assume each entry is a byte slice that is mapped to an indexer item by the `getIndexerItem` function). I divided it evenly among 200 goroutines, and each goroutine calls the `push` function with its share of 5M/200 entries.
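The fan-out described above can be sketched as follows. This is a minimal illustration assuming a hypothetical generic helper `splitEvenly`; the question does not show how the slice was actually divided. With 5M entries and 200 goroutines, each goroutine receives 25,000 entries.

```go
package main

import "fmt"

// splitEvenly divides items into n contiguous chunks whose lengths differ by
// at most one. Hypothetical helper: the question does not show its own split.
func splitEvenly[T any](items []T, n int) [][]T {
	if n <= 0 {
		return nil
	}
	chunks := make([][]T, 0, n)
	base, extra := len(items)/n, len(items)%n
	start := 0
	for i := 0; i < n; i++ {
		size := base
		if i < extra {
			size++ // spread the remainder over the first chunks
		}
		chunks = append(chunks, items[start:start+size])
		start += size
	}
	return chunks
}

func main() {
	items := make([]int, 5_000_000)
	chunks := splitEvenly(items, 200)
	fmt.Println(len(chunks), len(chunks[0])) // 200 25000
}
```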
From my understanding of `Refresh: wait_for`, a request sent to Elasticsearch only completes once the changes it made are visible to search (which, in my view, means the request is no longer in the bulk request queue). So why am I getting this error?
```text
error indexing item: es_rejected_execution_exception: rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh, target allocation id: someId, primary term: 1 on EsThreadPoolExecutor [ name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1 [Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708] ]
```
All entries go into the same index, `ankit-test`.
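The numbers in the error above describe a bounded executor: `pool size = 32` threads can run shard-level bulk tasks at once and `queue capacity = 200` more can wait, so the node holds at most 232 such tasks before it starts rejecting. The following is a simplified, single-threaded model of that behavior; the `boundedExecutor` type is hypothetical and is not Elasticsearch's `EsThreadPoolExecutor`.

```go
package main

import "fmt"

// boundedExecutor is a simplified model of a write thread pool: poolSize
// tasks can run at once and up to queueCap more can wait. It is NOT
// Elasticsearch code; it only illustrates why a full queue causes rejections.
type boundedExecutor struct {
	poolSize, queueCap int
	active, queued     int
	rejected           int
}

// submit accepts a task if a thread is free or the queue has room;
// otherwise it counts a rejection (the es_rejected_execution_exception case).
func (e *boundedExecutor) submit() bool {
	switch {
	case e.active < e.poolSize:
		e.active++
	case e.queued < e.queueCap:
		e.queued++
	default:
		e.rejected++
		return false
	}
	return true
}

// complete finishes one running task and, if a task is waiting,
// moves it from the queue onto the freed thread.
func (e *boundedExecutor) complete() {
	if e.active == 0 {
		return
	}
	e.active--
	if e.queued > 0 {
		e.queued--
		e.active++
	}
}

func main() {
	// Values taken from the error message: pool size = 32, queue capacity = 200.
	pool := &boundedExecutor{poolSize: 32, queueCap: 200}
	for i := 0; i < 300; i++ { // 300 shard-level tasks arrive before any finishes
		pool.submit()
	}
	fmt.Println(pool.active, pool.queued, pool.rejected) // 32 200 68
}
```

Once `complete` frees a slot, the next submission is accepted again; rejections only happen while new tasks arrive faster than running ones finish.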
```go
import (
	"bytes"
	"context"
	"fmt"

	"github.com/elastic/go-elasticsearch/v7"
	"github.com/elastic/go-elasticsearch/v7/esutil"
)

// esClient wraps the Elasticsearch client (assumed definition; not shown in the question).
type esClient struct {
	client *elasticsearch.Client
}

// getIndexerItem maps one raw document to a bulk "index" action.
func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem {
	return esutil.BulkIndexerItem{
		Index:        index,
		DocumentID:   id,
		Body:         bytes.NewReader(body),
		Action:       "index",
		DocumentType: "logs",
		// OnFailure is called for each item that could not be indexed.
		OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
			if err != nil {
				fmt.Printf("error indexing item: %s\n", err.Error())
			} else {
				fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason)
			}
		},
	}
}

// push indexes one batch through its own BulkIndexer with a single worker.
func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) {
	indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client:     e.client,
		Refresh:    "wait_for",
		NumWorkers: 1,
		OnError: func(ctx context.Context, err error) {
			fmt.Printf("received onError %s\n", err.Error())
		},
	})
	if err != nil {
		return nil, fmt.Errorf("error creating bulk indexer: %s", err)
	}

	ctx := context.Background()
	for _, d := range data {
		if err := indexer.Add(ctx, d); err != nil {
			fmt.Printf("error adding data to indexer: %s\n", err)
		}
	}

	// Close flushes any buffered items and waits for the worker to finish.
	if err := indexer.Close(ctx); err != nil {
		fmt.Printf("error flushing and closing indexer: %s\n", err)
	}

	indexerStats := indexer.Stats()
	return &indexerStats, nil
}
```
Assume no other processes interact with the index in any way.
Using several pages of the ES documentation, I was able to find a solution to the above problem. The answer below is based on my understanding; if you see something that could be improved or corrected, please leave a comment.
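This is the request life cycle:

1. The Go client's bulk indexer batches items and sends them to the cluster as a `_bulk` request.
2. The node that receives it splits the bulk request into one shard-level request per target shard (`BulkShardRequest [[ankit-test][3]]` in the error above).
3. Each shard-level request is queued on the `write` thread pool of the node holding that shard's primary.
4. When all threads are busy and the queue is full (`pool size = 32`, `queue capacity = 200` in the error), new shard requests are rejected with `es_rejected_execution_exception`.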
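Elasticsearch turns each bulk request into one shard-level request per target shard, which is why the error mentions `BulkShardRequest [[ankit-test][3]]`. The sketch below groups document IDs by shard to show that split. Elasticsearch actually routes documents using a Murmur3 hash of the `_routing` value (the document ID by default) and the number of primary shards; here Go's FNV-1a from `hash/fnv` is swapped in, so the shard numbers will not match a real cluster, and the shard count of 5 is a hypothetical value.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor picks a shard for a document ID. Elasticsearch uses Murmur3 on the
// routing value; FNV-1a is swapped in here purely for illustration.
func shardFor(docID string, numShards int) int {
	h := fnv.New32a()
	h.Write([]byte(docID))
	return int(h.Sum32() % uint32(numShards))
}

// splitByShard groups the IDs of one bulk request into per-shard
// sub-requests, roughly what happens before each shard-level request is
// sent to the node holding that shard's primary.
func splitByShard(docIDs []string, numShards int) map[int][]string {
	perShard := make(map[int][]string)
	for _, id := range docIDs {
		s := shardFor(id, numShards)
		perShard[s] = append(perShard[s], id)
	}
	return perShard
}

func main() {
	ids := make([]string, 0, 1000)
	for i := 0; i < 1000; i++ {
		ids = append(ids, fmt.Sprintf("doc-%d", i))
	}
	for shard, group := range splitByShard(ids, 5) { // map order is random
		fmt.Printf("shard %d: %d docs\n", shard, len(group))
	}
}
```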
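My problem was that I sent the requests with `refresh=false` (the default). `refresh=wait_for` should have been used instead. Why? `refresh` provides three modes:

- `false` (default): take no refresh-related action; the changes become visible to search at some point after the request returns.
- `true`: refresh the affected primary and replica shards immediately after the operation.
- `wait_for`: does not force a refresh like `refresh=true`, but waits for the changes to be made visible by a refresh before replying. It therefore still ensures that the request has completed, and has been removed from the node's queue, before a response is sent back.

All of my data was routed to the same node, and because of `refresh=false`, responses were returned before the earlier requests had been cleared from the queue, which caused the overflow.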
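The refresh mode travels on the wire as the `refresh` query parameter of the `_bulk` endpoint. The sketch below shows what such a request URL looks like; `bulkURL` is a hypothetical helper and `http://localhost:9200` an assumed cluster address. A client library such as go-elasticsearch builds this parameter for you from its own configuration.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// bulkURL builds the _bulk endpoint URL for an index and a refresh mode.
// Hypothetical helper: it accepts only the three documented modes.
func bulkURL(baseURL, index, refresh string) (string, error) {
	switch refresh {
	case "true", "false", "wait_for":
	default:
		return "", fmt.Errorf("unsupported refresh mode %q", refresh)
	}
	u, err := url.Parse(baseURL)
	if err != nil {
		return "", err
	}
	u.Path = path.Join("/", u.Path, index, "_bulk")
	q := u.Query()
	q.Set("refresh", refresh)
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	u, err := bulkURL("http://localhost:9200", "ankit-test", "wait_for")
	if err != nil {
		panic(err)
	}
	fmt.Println(u) // http://localhost:9200/ankit-test/_bulk?refresh=wait_for
}
```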