'2018/06'에 해당되는 글 2건

  1. 2018.06.18 max.in.flight.request.per.connection
  2. 2018.06.11 Schema Registry , Schema Evolution

max.in.flight.request.per.connection

분류없음 2018.06.18 14:51 posted by dev.bistro

필요한 수준까지 이해한듯 해서, 메모로 남김

원래의 카프카는 데이터 주입에 대한 순서가 중요하지 않았다.  (정확히는 어느 정도의 버퍼를 카프카 브로커가 가지면서 Event Time에 대한 조정 작업을 알아서 해준다)

하지만 producer의 명등성 옵션과 순서 보장에 대한 이슈가 중요해졌고 max.in.flight.request.per.connection = 1 로 셋팅하면서 순서 꼬임을 좀 방어하고자 했는데...

1로 설정하고 idempotence 옵션이 켜져있고 retry가 가능한 상태일때 
전송하다가 OutOfOrderSequence 예외가 발생한다면 클라이언트 영역에서 이 Sequence Number를 잘 처리해야한다. (재전송 하거나, 다음 idempotence 전송에서 써야한다)


이게 어렵다... 그래서 그냥 하드 코딩으로 각 Topic Partition의 5개의 메타데이터를 들고 있게한것이다. 

if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {
@Override
public int compare(ProducerBatch o1, ProducerBatch o2) {
return o1.baseSequence() - o2.baseSequence();
}
}));

그래서 kafka 문서의 "enable.idempotence" 항목을 보면 Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 라고 나온다.


idempotence 켜져있고 나부의 매직 넘버인 5보다 크게 셋팅한다면 sequence를 보장하지 못할 수 있기 때문이다.

실제로도 

        if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {

            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +

                    " to use the idempotent producer.");

        }

코드에 의해서 실행 자체를 막아놨다.



요약
1. 그냥 디폴트로 쓰자.
2. 프로듀서에서 동기식으로 사용한다면 (.get() 호출) 더더욱 상관하지 않아도 된다.


Schema Registry , Schema Evolution

분류없음 2018.06.11 21:45 posted by dev.bistro

몇 달전에 메모장에 적어놨던 내용인데, 지우면 또 까먹을 듯 해서 기록으로 남김


* 기존 Java Class에서 Avro Schema 획득하기

ReflectData.get().getSchema 를 이용하여, 쉽게 json형식의 AvroSchema를 획득할 수 있다. (검증은 꼭 할것)
아쉽지만 record 타입을 자동으로 찾아서 해준다거나, vargs 형식으로 getSchema를 사용할 수는 없다.

Schema schema = ReflectData.get().getSchema(Order.class);
System.out.println(schema.toString(true));


* Schema Evolution

1. backward : 새스키마로 이전 데이터를 읽을 수 있다.
 ex) 새 field를 추가할때 default value를 넣는다. 필드가 없다면 default value를 사용한다.  즉 데이터는 old type인데 읽는 쪽이 new type

2. forward : 스키마가 변하지 않는데, 새로운 스키마의 데이터가 들어오면 ignore 한다.
 'forward' : we want forward compatible when we want to make a data stream evolve without chaning our downstream consumers . 즉, 데이터는 new type인데 읽는 쪽이 old type

3. full : 1,2번 두개 합친거
- 필드를 추가할 때에는 default를 무조건 넣어야 한다(backward position)
- default가 있는 필드만 remove 할 수 있다.


* avro schema 를 사용하는 추천 가이드

1. PK를 사용해라
2. remove될수 있는 필드라면 default를 제공하라
3. enum은 주의해서 사용하라. - 시간이 흘러도 evolve 할 수 없다?
4. rename 하지 말것, 필요하다면 다른 필드를 넣어라
5. 항상 default value를 쓰고, required field는 삭제하지 마라



티스토리 툴바