일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Spring
- RabbitMQ
- play framework
- kafkastream
- avo
- scala 2.10
- confluent
- spring-batch
- enablekafkastreams
- 한빛미디어
- 플레이 프레임워크
- Elasticsearch
- statestore
- Slick
- springboot
- kafkastreams
- reactive
- Elk
- kafka streams
- schema registry
- Logstash
- 카프카
- gradle
- Kafka
- spring-kafka
- aws
- kafka interactive query
- spring-cloud-stream
- scala
- coursera
- Today
- Total
b
elasticsearch 에서 많은 문서를 순차 검색 하기 본문
엘라스틱서치는 한번에 볼 수 있는 데이터의 건수가 제한적이다. 예를 들어
GET INDEX_NAME/_search
{
"size" :15000
}
//결과
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Result window is too large,
from + size must be less than or equal to: [10000] but was [15000].
See the scroll api for a more efficient way to request large data sets.
This limit can be set by changing the [index.max_result_window] i
ndex level setting."
}
],....
처럼 호출을 한다면 illegal exception 이 일어나고 max_result_window 값을 변경하라고 한다.
어느 정도는 이 값을 늘여서 효과를 볼 수 있지만, heap memory 나 elasped time 등의 이슈로 무제한으로 늘일수 없다 ( https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html )
다음 대안으로 Scroll 이나 search after를 이용하라고 하는데, 이중 search -after를 이용하였다.
QueryBuilder qb = getDailyQueryBuilder(statDate);
String[] includeFields = new String[]{"@timestamp", "usn", "j.room"};
Object[] searchAfter = null;
while (true) {
SearchSourceBuilder searchSourceBuilder =
new SearchSourceBuilder()
.size(1000)
.query(qb)
.sort(SortBuilders.fieldSort("@timestamp")) //defautl ASC
.sort(SortBuilders.fieldSort("usn"))
.fetchSource(includeFields, null)
.timeout(new TimeValue(60, TimeUnit.SECONDS));
if (searchAfter != null) {
// 바로 직전 쿼리의 마지막 Docuemnt의 sort_value를 셋팅한다.
// 동일한 쿼리에 'sort_value' 의 조건식이 더 추가된다.
searchSourceBuilder.searchAfter(searchAfter);
}
SearchRequest searchRequest =
new SearchRequest()
.indices(ElasticConstant.Indices.INDEX_NAME)
.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = searchResponse.getHits().getHits();
if (hits.length > 0) {
// foreach 각 문서 처리
SearchHit lastHitDocument = hits[hits.length - 1];
searchAfter = lastHitDocument.getSortValues();
} else {
return;
}
}
SearchSourceBuilder
코드를 보면 바로 이해할 수 있다.
- ( 1000건의 데이터를 데이터를 불러온다 -> 각 문서 처리 -> 마지막 문서의 sort value를 가지고 다시 한번 호출한다. ) * 반복
- 쿼리된 결과 문서가 0 건이면, 전체 다 불러온것이므로 로직을 종료한다.
주의 사항
1. hits.length 대신 total hit 는 사용하면 안된다. total hit은 질의에 대한 전체 건수이므로 이 값으로 iteration 를 하기에는 부적절 했다.
2. search_after를 쓸때는 해당 document를 unque 하게 추출 할 수 있어야 한다. 즉 order by timestamp로 했을때 모든 문서의 timestamp가 달라야 한다는 것이다. 그래서 위의 예제에서도 sort 조건식을 2개 (timestamp, usn) 으로 하여 확실한 결과를 받을 수 있도록 했다.