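I'm trying to delete some data based on search results. I'm using Elasticsearch 2.3 with an embedded server. I have managed to do this with the delete-by-query plugin on a standalone server, but it seems that plugins cannot be used on an embedded server.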
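    import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
    import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
    import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
    import org.elasticsearch.index.query.QueryBuilders;

    // Works on a standalone server that has the delete-by-query plugin installed
    DeleteByQueryResponse delete = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
            .setIndices(ES_INDEX_NAME)
            .setTypes(ES_RECORD_TYPE)
            .setQuery(QueryBuilders.boolQuery()
                    .filter(QueryBuilders.termQuery(ENTITY_ID, entityId)))
            .execute().actionGet();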
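Is there any way to use the delete-by-query plugin on an embedded server in v2.3? If not, how can I delete data based on some matching criteria?
Any help would be appreciated.
I found a solution here for earlier versions of Elasticsearch. This is my solution: scroll through the matching documents, add a delete for each hit to a bulk request, execute it, and then force-merge the index.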
    import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
    import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.sort.SortOrder;

    public void resetStatistics(String entityId) {
        // Nothing to do if the index does not exist
        if (client.admin().indices().exists(new IndicesExistsRequest(ES_INDEX_NAME)).actionGet().isExists()) {
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            try {
                logger.info("Elasticsearch cleaning task execution started");
                // Open a scroll over the matching documents; only the IDs are needed
                SearchResponse scrollResp = client.prepareSearch(ES_INDEX_NAME)
                        .setTypes(ES_RECORD_TYPE)
                        .setScroll(TIME_VALUE)
                        .setQuery(
                                QueryBuilders.boolQuery()
                                        .filter(QueryBuilders.termQuery(ENTITY_ID, entityId)))
                        .addSort("_doc", SortOrder.ASC)
                        .setNoFields()
                        .setSize(100).execute().actionGet();
                while (true) {
                    // Queue one delete per hit on the current scroll page
                    for (SearchHit hit : scrollResp.getHits().getHits()) {
                        bulkRequest.add(client.prepareDelete()
                                .setIndex(ES_INDEX_NAME)
                                .setType(ES_RECORD_TYPE)
                                .setId(hit.getId()));
                    }
                    // Fetch the next page; an empty page means the scroll is exhausted
                    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                            .setScroll(TIME_VALUE).execute().actionGet();
                    if (scrollResp.getHits().getHits().length == 0) {
                        break;
                    }
                }
                if (!bulkRequest.request().requests().isEmpty()) {
                    // All queued deletes are sent in a single bulk request
                    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
                    if (bulkResponse.hasFailures()) {
                        logger.error("Elasticsearch cleaning task execution failed: {}",
                                bulkResponse.buildFailureMessage());
                    } else {
                        logger.info("Elasticsearch cleaning task finished execution deleting {} documents",
                                bulkResponse.getItems().length);
                    }
                } else {
                    logger.info("Elasticsearch cleaning task finished execution without deleting any documents");
                }
            } catch (Exception e) {
                logger.error("Unable to delete data", e);
            }
            try {
                logger.info("Elasticsearch index optimization started");
                // Force-merge the index segments after the deletes
                ForceMergeResponse forceMergeResponse = client.admin().indices()
                        .prepareForceMerge(ES_INDEX_NAME)
                        .setFlush(true)
                        .setOnlyExpungeDeletes(false)
                        .execute().actionGet();
                logger.info("Elasticsearch index optimization finished with {} successful and {} failed shards out of " +
                                "{} total shards", forceMergeResponse.getSuccessfulShards(), forceMergeResponse.getFailedShards(),
                        forceMergeResponse.getTotalShards());
            } catch (Exception e) {
                logger.error("Error while optimizing Elasticsearch index", e);
            }
        }
    }
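One thing to watch with the approach above: every delete is added to a single bulk request, which can grow large when many documents match. Below is a minimal sketch of flushing in fixed-size batches instead. It uses plain Java collections as stand-ins for the scroll pages and for the bulk call, so it runs without a cluster. The class name BatchedDeleteSketch, the method drainInBatches, and the batch size are hypothetical names chosen for illustration and are not part of the Elasticsearch API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class BatchedDeleteSketch {

    /**
     * Walks pages of document IDs (stand-ins for scroll pages) and hands them
     * to flush in groups of at most batchSize. Returns the number of IDs seen.
     */
    public static int drainInBatches(Iterable<List<String>> pages, int batchSize,
                                     Consumer<List<String>> flush) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<String> pending = new ArrayList<>();
        int total = 0;
        for (List<String> page : pages) {
            for (String id : page) {
                pending.add(id);
                total++;
                if (pending.size() == batchSize) {
                    // Hand over a copy so that clearing pending does not affect the consumer
                    flush.accept(new ArrayList<>(pending));
                    pending.clear();
                }
            }
        }
        // Flush whatever is left after the last page
        if (!pending.isEmpty()) {
            flush.accept(new ArrayList<>(pending));
        }
        return total;
    }

    public static void main(String[] args) {
        // Two hypothetical scroll pages of document IDs
        List<List<String>> pages = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e"));
        int total = drainInBatches(pages, 2,
                batch -> System.out.println("would delete " + batch));
        System.out.println("total ids: " + total);
    }
}
```

In the real method, the flush callback would presumably build a fresh bulk request from client.prepareBulk() for each batch and execute it, so that no single request has to hold every matching ID.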