일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- spring-cloud-stream
- 플레이 프레임워크
- Kafka
- schema registry
- reactive
- avo
- Elasticsearch
- statestore
- kafka interactive query
- coursera
- kafka streams
- aws
- spring-batch
- scala 2.10
- enablekafkastreams
- play framework
- confluent
- RabbitMQ
- 한빛미디어
- Elk
- Logstash
- Slick
- kafkastreams
- Spring
- scala
- springboot
- 카프카
- kafkastream
- spring-kafka
- gradle
- Today
- Total
b
Kafka Message를 logstash를 이용하여 Elasticsearch에 저장하기 본문
logstash 에는 이미 kafka input 모듈이 있기 때문에 쉽게 카프카 메시지를 엘라스틱에 저장할 수 있다.
( https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html )
몇가지 주의할 점
1) 주문번호등을 Partition Key로 이용할 경우에, 데이터가 어느 파티션에 들어가 있는지, Offset은 얼마인지도 중요한 디버깅 요소가 된다. 이러한 데이터도 함께 elasticsearch 에 넣으려면 add_field를 이용한다.
우선 input 쪽에 아래의 설정을 추가하여 메타 정보를 사용할 수 있게 한다
input {
kafka {
decoreate_events => true
}
}
그리고 filter 영역에 아래처럼 추가하여 Kafka Message의 Offset, Partition, EventTime을 ES document에 추가한다.
filter {
mutate {
"partition" => "%{[@metadata][kafka][partition]}"
"offset" => "%{[@metadata][kafka][offset]}"
"eventtime" => "%{[@metadata][kafka][timestamp]}"
}
}
2) logstash를 이용하여 데이터를 주입 받을 경우, Document의 생성시간은 logstash의 서버시간에 의존적이다. 하지만 Kafka Message를 모니터링 하기 위해서는 EventTime을 기반으로 index time을 설정하는 것이 편리했다.
아래처럼 Kafka Header에 존재하는 EventTime을 @timestamp 로 변경한다.
filter {
date {
match => ["eventtime", "UNIX_MS"]
target => "@timestmap"
}
}
이제 kibana에서 @timestamp로 index-pattern을 생성한다면, 그것은 kafka의 eventtime을 기반으로 보는것이다.
3) 카프카를 사용하다보면 실제로는 Avro Protocols을 사용하는 경우가 많다. 이 경우에는 외부 플러그인을 통해서 엘라스틱 서치로 데이터를 저장 할 수 있다.
https://github.com/revpoint/logstash-codec-avro_schema_registry 를 설치한다.
kafka input을 아래처럼 수정한다.
input {
kafka {
...
codec => avro_schema_registry {
endpoint => "http://registry:8081"
}
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}
}