php小编苹果为您介绍ES批量请求中的常见问题:`es_rejected_execution_exception`。在使用Elasticsearch进行批量请求时,有时会遇到这个异常。这个异常通常表示请求的并发数超过了Elasticsearch服务器的处理能力,导致请求被拒绝执行。本文将为您解析这个异常的原因,并给出解决方案,帮助您顺利处理该问题。
我有一个大约 5M 条目的切片(为简单起见,假设每个条目都是一个字节切片,它使用 getIndexerItem
函数映射到索引器项),我将其平均分配给 200 个 go 例程。然后每个go例程调用push
函数,切片长度为5M/200。
根据我对Refresh的理解:wait_for
,每当向elastic发出请求时,只有当该请求所做的更改对搜索可见时才会完成(IMO将其转换为不再有此特定请求的批量请求队列) )。那为什么我会收到此错误?
error indexing item: es_rejected_execution_exception: rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh, target allocation id: someId, primary term: 1 on EsThreadPoolExecutor [ name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1 [Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708] ]
所有条目都将进入相同的索引,ankit-test
。
func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem { return esutil.BulkIndexerItem{ Index: index, DocumentID: id, Body: bytes.NewReader(body), Action: "index", DocumentType: "logs", OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { if err != nil { fmt.Printf("error indexing item: %s\n", err.Error()) } else { fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason) } }, } } func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) { indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: e.client, Refresh: "wait_for", NumWorkers: 1, OnError: func(ctx context.Context, err error) { fmt.Printf("received onError %s\n", err.Error()) }, }) if err != nil { return nil, fmt.Errorf("error creating bulk indexer: %s", err) } ctx := context.Background() for _, d := range data { if err := indexer.Add(ctx, d); err != nil { fmt.Printf("error adding data to indexer: %s\n", err) } } if err := indexer.Close(ctx); err != nil { fmt.Printf("error flushing and closing indexer: %s\n", err) } indexerStats := indexer.Stats() return &indexerStats, nil }
假设没有其他进程以任何方式与索引交互。
使用多个 ES 文档,我能够找到上述问题的解决方案。下面的回答是基于我的理解。如果您发现可以改进/纠正的内容,请发表评论。
这是请求生命周期:
我的问题是我使用 refresh = false
(默认)发送请求。相反,应该使用 refresh = wait_for
。为什么?刷新提供了3种模式:
refresh = true
便宜(就服务器负载而言),但仍然确保在发送回响应之前请求已完成。请求已从节点队列中删除。所有数据都被重定向到同一节点,并且由于 refresh = false
,在现有请求从队列中清除之前返回了响应,这导致了溢出。
以上是ES批量请求`es_rejected_execution_exception`的详细内容。更多信息请关注PHP中文网其他相关文章!