일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Logstash
- kafkastream
- aws
- avo
- reactive
- schema registry
- kafka interactive query
- scala 2.10
- confluent
- springboot
- 한빛미디어
- Elasticsearch
- 플레이 프레임워크
- spring-cloud-stream
- statestore
- play framework
- gradle
- kafkastreams
- 카프카
- spring-batch
- Spring
- scala
- spring-kafka
- Slick
- Kafka
- Elk
- RabbitMQ
- coursera
- kafka streams
- enablekafkastreams
- Today
- Total
b
Kafka idempotent producer - 멱등성에 관해서 본문
Kafka 는 기본적으로 1번 이상의 메시지 전송을 보장 하는 At Least Once Delivery 정책이었다. 메시지가 전송되었다는 ack를 확실히 받지 못할 경우 retry send를 하는 방법으로 최소 1회의 전송을 보장 하였던 것이다. 하지만 0.11 버전부터 exactly once delivery 를 위해서 idempotent producer가 지원되기 시작하였다 (함께 transactional producer도 포함된다)
메시지가 at least once delivery (즉 duplicate send)는 링크의 글과 이미지를 참고하면 쉽게 이해할 수 있다.
https://dzone.com/articles/exactly-once-semantics-with-apache-kafka-1
사족으로 consumer 에서의 중복 메시지 수신은 auto commit 을 false로 정의하고, 메시지 처리가 완벽히 성공한 경우에만 commit offset을 하는 방법등으로 처리가 가능하다.
idempotent , transaction producer 는 KIP-98 에서 정의되었다.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
Idempotent Producer Guarantees 를 보면 어떻게 exactly once delivery를 보장하는지에 대한 개념이 설명된다. 요약하자면 각 Producer는 Unique한 ID를 가지고 메시지를 발행할따 Sequence Number를 함께 보낸다. 그리고 Broker는 PID, TopicPartition 별로 해당 Sequenece Number를 Memory상으로 기억한다. 그래서 Sequence Number를 보고 동일 메시지를 추적하는 것이다. 여기의 ProducerId는 최초에 InitProducerIdRequest API를 통해서 Broker로 요청을 하며, 그 응답값을 사용한다.
PID를 생성하는 로직은 broker 에서 ProducerIdManager object class 에 의해서 생성된다. 로직은 단순히 increment 이다. 그래서 계속 producer 를 만들어 보면 PID 는 숫자1만 증가하는것을 볼 수 있다.
만약 Producer Application이 재시작된다면 PID도 새로 할당이 되므로, 다른 값을 가진다. 그렇기 때문에 session 단위에서만 유효한 기능이다.
설정 참고
- enable.idempotence 를 true로 설정한다.
- 특별한 설정이 없다면 다음의 코드에 의해 acks, retries 등의 값이 설정이 된다.
- acks 는 반드시 all(-1) 이어야 한다.
- retries 는 0보다 커야 한다.
- 등등 옵션의 의존성이 있으니 잘 확인하고 설정해야한다.
- 주요한건은 transaction.id 를 설정하면 transaction producer 로 동작한다는 점이다. ( 이 값을 설정안해야만 idempotence 으로 동작한다)
enable.idempotence = true를 설정하면 KafkaProducer는 어떻게 적용되는가?
KafkaProducer가 만들어질때 TransactionManager 가 만들어진다 코드 즉, idempotent producer 또는 transcation producer는 TransactionManager 에 의해 처리된다. 그리고 이 TransactionManager는 2곳에서 사용이 된다. 바로 Sender 와 RecordAccumulator 이다.
Sender : Kafka Cluster에 메시지를 보내는 background thread class 이다. 해당 Sender는 코드 에서 ioThread로 Naming 되어 Thread 가 시작된다.
RecordAccumulator : Kafka 는 메시지를 보내거나 받을때 message 단위로 처리하지 않는다. 메시지를 받을 때는 fetch 한 다음 메시지를 하나씩 건내주고, 메시지를 발행 할때는 accumulator 에 임시로 저장한 다음 한번에 batch 단위로 발송한다. 그래서 accumulator instance를 생성할 때에는 batch.size, linger.ms, buffer.memory 등의 설정값을 가지고 만들어 진다.
실제 retry logic 이나 transaction producer 는 다음에...