'2018/11'에 해당되는 글 1건

  1. 2018.11.09 Kafka Message를 logstash를 이용하여 Elasticsearch에 저장하기


logstash 에는 이미 kafka input 모듈이 있기 때문에 쉽게 카프카 메시지를 엘라스틱에 저장할 수 있다.
( https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html )

몇가지 주의할 점


1) 주문번호등을 Partition Key로 이용할 경우에, 데이터가 어느 파티션에 들어가 있는지, Offset은 얼마인지도 중요한 디버깅 요소가 된다. 이러한 데이터도 함께 elasticsearch 에 넣으려면 add_field를 이용한다.


우선 input 쪽에 아래의 설정을 추가하여 메타 정보를 사용할 수 있게 한다

input {

kafka {

decoreate_events => true

}

}


그리고 filter 영역에 아래처럼 추가하여 Kafka Message의 Offset, Partition, EventTime을 ES document에 추가한다.

filter {

mutate {

"partition" => "%{[@metadata][kafka][partition]}"

"offset" => "%{[@metadata][kafka][offset]}"

"eventtime" => "%{[@metadata][kafka][timestamp]}"

}

}



2)  logstash를 이용하여 데이터를 주입 받을 경우, Document의 생성시간은 logstash의 서버시간에 의존적이다. 하지만 Kafka Message를 모니터링 하기 위해서는 EventTime을 기반으로 index time을 설정하는 것이 편리했다.
아래처럼 Kafka Header에 존재하는 EventTime을 @timestamp 로 변경한다.

filter {

data {

match => ["eventtime", "UNIX_MS"]

target => "@timestmap"

}

}

이제 kibana에서 @timestamp로 index-pattern을 생성한다면, 그것은 kafka의 eventtime을 기반으로 보는것이다.


3) 카프카를 사용하다보면 실제로는 Avro Protocols을 사용하는 경우가 많다. 이 경우에는 외부 플러그인을 통해서 엘라스틱 서치로 데이터를 저장 할 수 있다.

https://github.com/revpoint/logstash-codec-avro_schema_registry 를 설치한다.

kafka input을 아래처럼 수정한다.

input {

kafka {

...

codec => avro_schema_registry {

endpoint => "http://registry:8081"

}

value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"

}

}