일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- reactive
- statestore
- Slick
- schema registry
- play framework
- kafkastream
- spring-cloud-stream
- aws
- scala 2.10
- kafkastreams
- springboot
- scala
- gradle
- enablekafkastreams
- 카프카
- 한빛미디어
- spring-kafka
- Logstash
- spring-batch
- Elasticsearch
- Spring
- Kafka
- RabbitMQ
- kafka interactive query
- coursera
- 플레이 프레임워크
- kafka streams
- avo
- Elk
- confluent
- Today
- Total
b
spring-kafka에서 구현되어 있는 MessageListenerContainer 본문
MessageListenerContainer 인터페이스
* spring-kakfa 에서 사용되는 최상단 '메시지 리스너 컨테이너'의 인터페이스 형태이다. 구현체는 Single-Thread 형태의 KafkaMessageListenerContainer 와 KafkaMessageListenerContainer 를 여러개 붙인 형태인 ConcurrentMessageListenerContainer 이렇게 2개가 기본 제공된다.
* Message Listener 이므로 '메시지의 수신' 관점에서 볼 때 필요하다. Kafka Cluster 의 gateway를 담당하는 application 을 만들 때는 'producer' 쪽이 필요하다.
* pause(), resume()을 인터페이스 레벨에서 가지고 있다.
ㅁ AbstractMessageListenerContainer<K,V> 추상 클래스
** 아래에서 설명할 2개의 실제 구현체 클래스의 상위 추상 클래스이다.(스프링의 다른 패키지에도 많으니 kafka 패키지인지 확인 해야한다)
** doStart, doStop만 위임한채, 나머지는 공통화 시키는 목적이다.
** offset commit을 어떻게 할 것인가에 대한 ACK_MODE 가 여기에서 정의되어 있고 종류는 7개가 있다 (BATCH, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE, RECORD, TIME)
ㅁ KafkaMessageListenerContainer<K,V> 클래스
** Java Consumer를 이용하여, 자동/수동 파티션을 할당 받아, Single-Threaded 형태로 동작하는 메시지 리스너 컨테이너
** 생성자 중에서 TopicPartitionInitialOffset 를 인자로 줄 수 있는데, 이를 이용해서 초기 Offset을 지정 할 수 있다. (absolute offset은 물론, relativeToCurrent 인자를 이용해서 relative offset도 지정가능하다)
** 스프링의 Lifecycle 인터페이스를 상속받아 구현된 start()가 시작 포인트이다.
** doStart 안에서 listenerConsumer 라는 컨슈머를 내부에서 만들어 데이터를 수신하게 된다.
** ListenerConsumer 를 Runnable을 상속받는 데몬 스레드 역할을 한다. while(true) 를 하면서 consumer.poll을 하는 것이다.
ㅁ ConcurrentMessageListenerContainer<K,V> 클래스
** concurrency 와 실제 topic-partition의 갯수에 따라서 위에서 설명한 KafkaMessageListenerContainer 를 N개 만든다. 그리고 스스로 start를 호출한다. 즉, 이 클래스는 KafkaMessageListenerContainer 를 가지는 또 하나의 컨테이너이다.
** stop/resume/start 등등 전부 순차적으로 명령어를 수행한다. (forEach)
** 그 외 추가 구현 내용없는 단순한 형태이다.
그럼 KafkaConsumer 를 그냥 쓰는 것보다 무엇이 더 편할까?
- pause/resume 의 쉬운 사용
- KafkaConsumer는 thread-safe을 보장 하지 않아, 잘 구현해서 만들어야 하지만, 위의 컨테이너를 쓰면 쉽다.
- 좀 더 풍부한 ack모드 지원
- 메뉴얼 토픽파티션 어사인
- 오프셋 점프 등등...