'2018/03/10'에 해당되는 글 1건

  1. 2018.03.10 spring-kafka에서 구현되어 있는 MessageListenerContainer

MessageListenerContainer 인터페이스

* spring-kakfa 에서 사용되는 최상단 '메시지 리스너 컨테이너'의 인터페이스 형태이다. 구현체는 Single-Thread 형태의 KafkaMessageListenerContainer 와 KafkaMessageListenerContainer 를 여러개 붙인 형태인 ConcurrentMessageListenerContainer 이렇게 2개가 기본 제공된다.

* Message Listener 이므로 '메시지의 수신' 관점에서 볼 때 필요하다. Kafka Cluster 의 gateway를 담당하는 application 을 만들 때는 'producer' 쪽이 필요하다.

* pause(), resume()을 인터페이스 레벨에서 가지고 있다.


ㅁ AbstractMessageListenerContainer<K,V> 추상 클래스

** 아래에서 설명할 2개의 실제 구현체 클래스의 상위 추상 클래스이다.(스프링의 다른 패키지에도 많으니 kafka 패키지인지 확인 해야한다)

** doStart, doStop만 위임한채, 나머지는 공통화 시키는 목적이다.

** offset commit을 어떻게 할 것인가에 대한 ACK_MODE 가 여기에서 정의되어 있고 종류는 7개가 있다 (BATCH, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE, RECORD, TIME)


ㅁ KafkaMessageListenerContainer<K,V> 클래스

** Java Consumer를 이용하여, 자동/수동 파티션을 할당 받아, Single-Threaded 형태로 동작하는 메시지 리스너 컨테이너

** 생성자 중에서 TopicPartitionInitialOffset 를 인자로 줄 수 있는데, 이를 이용해서 초기 Offset을 지정 할 수 있다. (absolute offset은 물론, relativeToCurrent 인자를 이용해서 relative offset도 지정가능하다)

** 스프링의 Lifecycle 인터페이스를 상속받아 구현된 start()가 시작 포인트이다.

** doStart 안에서 listenerConsumer 라는 컨슈머를 내부에서 만들어 데이터를 수신하게 된다.

** ListenerConsumer 를 Runnable을 상속받는 데몬 스레드 역할을 한다. while(true) 를 하면서 consumer.poll을 하는 것이다. 


ㅁ ConcurrentMessageListenerContainer<K,V> 클래스

** concurrency 와 실제 topic-partition의 갯수에 따라서 위에서 설명한 KafkaMessageListenerContainer 를 N개 만든다. 그리고 스스로 start를 호출한다. 즉, 이 클래스는 KafkaMessageListenerContainer 를 가지는 또 하나의 컨테이너이다.

** stop/resume/start 등등 전부 순차적으로 명령어를 수행한다. (forEach)

** 그 외 추가 구현 내용없는 단순한 형태이다.


그럼 KafkaConsumer 를 그냥 쓰는 것보다 무엇이 더 편할까?

- pause/resume 의 쉬운 사용
- KafkaConsumer는 thread-safe을 보장 하지 않아, 잘 구현해서 만들어야 하지만, 위의 컨테이너를 쓰면 쉽다.
- 좀 더 풍부한 ack모드 지원
- 메뉴얼 토픽파티션 어사인
- 오프셋 점프  등등...



티스토리 툴바