b

max.in.flight.request.per.connection 본문

카테고리 없음

max.in.flight.request.per.connection

dev.bistro 2018. 6. 18. 14:51

필요한 수준까지 이해한듯 해서, 메모로 남김

원래의 카프카는 데이터 주입에 대한 순서가 중요하지 않았다.  (정확히는 어느 정도의 버퍼를 카프카 브로커가 가지면서 Event Time에 대한 조정 작업을 알아서 해준다)

하지만 producer의 명등성 옵션과 순서 보장에 대한 이슈가 중요해졌고 max.in.flight.request.per.connection = 1 로 셋팅하면서 순서 꼬임을 좀 방어하고자 했는데...

1로 설정하고 idempotence 옵션이 켜져있고 retry가 가능한 상태일때 
전송하다가 OutOfOrderSequence 예외가 발생한다면 클라이언트 영역에서 이 Sequence Number를 잘 처리해야한다. (재전송 하거나, 다음 idempotence 전송에서 써야한다)


이게 어렵다... 그래서 그냥 하드 코딩으로 각 Topic Partition의 5개의 메타데이터를 들고 있게한것이다. 

if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {
@Override
public int compare(ProducerBatch o1, ProducerBatch o2) {
return o1.baseSequence() - o2.baseSequence();
}
}));

그래서 kafka 문서의 "enable.idempotence" 항목을 보면 Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 라고 나온다.


idempotence 켜져있고 내부의 매직 넘버인 5보다 크게 셋팅한다면 sequence를 보장하지 못할 수 있기 때문이다.

실제로도 

        if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {

            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +

                    " to use the idempotent producer.");

        }

코드에 의해서 실행 자체를 막아놨다.



요약
1. 그냥 디폴트로 쓰자.
2. 프로듀서에서 동기식으로 사용한다면 (.get() 호출) 더더욱 상관하지 않아도 된다.


Comments