분류없음2018.11.09 12:09


logstash 에는 이미 kafka input 모듈이 있기 때문에 쉽게 카프카 메시지를 엘라스틱에 저장할 수 있다.
( https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html )

몇가지 주의할 점


1) 주문번호등을 Partition Key로 이용할 경우에, 데이터가 어느 파티션에 들어가 있는지, Offset은 얼마인지도 중요한 디버깅 요소가 된다. 이러한 데이터도 함께 elasticsearch 에 넣으려면 add_field를 이용한다.


우선 input 쪽에 아래의 설정을 추가하여 메타 정보를 사용할 수 있게 한다

input {

kafka {

decoreate_events => true

}

}


그리고 filter 영역에 아래처럼 추가하여 Kafka Message의 Offset, Partition, EventTime을 ES document에 추가한다.

filter {

mutate {

"partition" => "%{[@metadata][kafka][partition]}"

"offset" => "%{[@metadata][kafka][offset]}"

"eventtime" => "%{[@metadata][kafka][timestamp]}"

}

}



2)  logstash를 이용하여 데이터를 주입 받을 경우, Document의 생성시간은 logstash의 서버시간에 의존적이다. 하지만 Kafka Message를 모니터링 하기 위해서는 EventTime을 기반으로 index time을 설정하는 것이 편리했다.
아래처럼 Kafka Header에 존재하는 EventTime을 @timestamp 로 변경한다.

filter {

data {

match => ["eventtime", "UNIX_MS"]

target => "@timestmap"

}

}

이제 kibana에서 @timestamp로 index-pattern을 생성한다면, 그것은 kafka의 eventtime을 기반으로 보는것이다.


3) 카프카를 사용하다보면 실제로는 Avro Protocols을 사용하는 경우가 많다. 이 경우에는 외부 플러그인을 통해서 엘라스틱 서치로 데이터를 저장 할 수 있다.

https://github.com/revpoint/logstash-codec-avro_schema_registry 를 설치한다.

kafka input을 아래처럼 수정한다.

input {

kafka {

...

codec => avro_schema_registry {

endpoint => "http://registry:8081"

}

value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"

}

}


Posted by dev.bistro
분류없음2018.04.16 09:47

GenericRecord

File/String 기반의 Schema 에서 Avro Object를 생성하는 것을 말한다.  이 방법은 runtime 에서 실패할 수 있기 때문에 사용에서는 추천되는 방법은 아니지만, 쉽게 사용 할 수 있는 장점이 있다.

Schema

{
"type": "record",
"namespace": "com.example",
"name": "user",
"fields": [
{
"name": "name",
"type": "string"
}
]
}다시피 


GenericRecordBuilder builder = new GenericRecordBuilder(schemaString);
builder.set("name", "bistros");
GenericData.Record bistros = builder.build();
System.out.println(bistros);
System.out.println(bistros.getSchema());
// {"name": "bistros"}
// {"type":"record","name":"user","namespace":"com.example", "fields":[{"name":"name","type":"string"}]}

보시다시피  GenericRecordBuilder set(String fieldName, Object value) 를 통해서 값을 셋팅하기 때문에 불안하다.


SpecificRecord

그에 반해서 SpecificRecord 역시 Avro Object 이지만  Avro Schema에 의해 성성되는 코드가 좀 더 포함되어 있다.
GenericRecordBuilder builder()는 Record type을 반환하는 데  그 내부를 보면, Map<String, Object>와 다름이 없다는 것을 확인 할 수 있다.
- GenericRecord 인터페이스를 구현 ( get/put만 있다)
- 내부적으로 Object[] 만을 가지고 있다.
링크 : https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java#L178

하지만 SpecificRecord는 좀 다르다.  avro schema를 기반으로 maven/gradle 플러기인으로 resources를 생성해보면  extends org.apache.avro.specific.SpecificRecordBase 를 통해서 똑같이 GenericData를 상속하고 있지만  6개 필드를 가진 Avro Schema 파일에서 약 550라인의 Class파일이생성된다. getter/setter/builder 등이 구현되어 있다.


요약)

Gradle Plugin을 이용해서 Class 파일을 만들어야 하지만 SpecificRecord를 사용하는 게 당연하다.


사족)

Confluent의 Kafka-Avro-Serializer는 default로 generic type을 읽고 쓰게 되어 있다.

properties.setProperty("specific.avro.reader", "true");

을 통해서 specific 모드를 활성화 해야 하고,  이것을 활성화 하면 내부적으로 사용하는 DatumReader를  SpecificDatumReader 으로 셋팅하다.

추가적으로 Streams 모드를 위해서는 아예  GenericAvroSerde, SpecificAvroSerde 2개의 Serde를 따로 제공해주고 있다.


Posted by dev.bistro