일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- scala
- 카프카
- 플레이 프레임워크
- spring-batch
- spring-kafka
- gradle
- Kafka
- coursera
- kafkastreams
- schema registry
- confluent
- kafka interactive query
- play framework
- avo
- reactive
- Slick
- statestore
- kafka streams
- springboot
- Elasticsearch
- aws
- Spring
- kafkastream
- Logstash
- scala 2.10
- 한빛미디어
- enablekafkastreams
- spring-cloud-stream
- RabbitMQ
- Elk
- Today
- Total
b
max.in.flight.request.per.connection 본문
필요한 수준까지 이해한듯 해서, 메모로 남김
원래의 카프카는 데이터 주입에 대한 순서가 중요하지 않았다. (정확히는 어느 정도의 버퍼를 카프카 브로커가 가지면서 Event Time에 대한 조정 작업을 알아서 해준다)
하지만 producer의 명등성 옵션과 순서 보장에 대한 이슈가 중요해졌고 max.in.flight.request.per.connection = 1 로 셋팅하면서 순서 꼬임을 좀 방어하고자 했는데...
1로 설정하고 idempotence 옵션이 켜져있고 retry가 가능한 상태일때
전송하다가 OutOfOrderSequence 예외가 발생한다면 클라이언트 영역에서 이 Sequence Number를 잘 처리해야한다. (재전송 하거나, 다음 idempotence 전송에서 써야한다)
이게 어렵다... 그래서 그냥 하드 코딩으로 각 Topic Partition의 5개의 메타데이터를 들고 있게한것이다.
if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {
@Override
public int compare(ProducerBatch o1, ProducerBatch o2) {
return o1.baseSequence() - o2.baseSequence();
}
}));
그래서 kafka 문서의 "enable.idempotence" 항목을 보면 Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 라고 나온다.
idempotence 켜져있고 내부의 매직 넘버인 5보다 크게 셋팅한다면 sequence를 보장하지 못할 수 있기 때문이다.
실제로도
if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer.");
}
코드에 의해서 실행 자체를 막아놨다.