'spring-kafka'에 해당되는 글 2건

  1. 2018.03.11 spring-kafka의 KafkaListener
  2. 2018.03.10 spring-kafka에서 구현되어 있는 MessageListenerContainer

spring-kafka의 KafkaListener

분류없음 2018.03.11 22:28 posted by dev.bistro

KafkaListener annotation

토픽의 메시지를 소비할 메소드에 붙이는 어노테이션이다. 이 어노테이션을 붙인다면, KafkaListenerContainerFactory에 의해서 MessageListenerContainer가 기동된다. 현재로서는 ConcurrentKafkaListenerContainerFactory 클래스만 존재 하니까, 기본적으로 KafkaListener을 붙인다는 것은 ConcurrentMessageListenerContainer 리스너 컨테이너를 사용한다는 의미이다.

그리고 @ConditionalOnClass(EnableKafka.class) 에 의해서 KafkaAnnotationDrivenConfiguration 설정 클래스가 로드 되는대, 여기서 "kafkaListenerContainerFactory" 이라는 bean을 가지는ConcurrentKafkaListenerContainerFactory 가 생성이 된다. (즉 spring-kafka만 디펜던시에 추가된다면, 이 클래스가 발생된다)

 @KafkaListener 에 'containerFactory'를 지정 할 수 없지만, 기본적으로 위에서 언급한 'kafkaListenerContainerFactory' 이름을 가지는 컨테이너 팩토리를 사용한다.  이 메소드의 인자로는 @Payload, @Header, @Headers, MessageHeaders등등을 사용하여 메시지를 수신할 수 있다.  


 메소드에 정의된다면 각 메소드별로 '리스너컨테이너'가 만들어진다. - (컨테이너 팩토리가 아니다!)  만약 1개의 컨테이너 팩토리에서 2개의 @KafkaListener을 사용한다면  각각의 beanName은 org.springframework.kafka.KafkaListenerEndpointContainer#1, #2 이다.  만약 동일한 팩토리와 topic을 쓴다면 setConcurrency=2 의 효과만 가질 것이다.

클래스에 정의된다면 하나의 메시지 컨테이너가! @KafkaHandler가 있는 모든 메소드에 적용한다. 모두 실행 한다는 의미가 아닌, payload타입에 따라 적절하게 메소드를 실행해준다는 뜻이다.  @KafkaHandler가 붙은 메소드는 InvocableHandlerMethod 클래스에 감싸지고, DelegatingInvocableHandler 에 List 형으로 들어가서  DelegatingInvocableHandler.findHandlerForPayload 의해서 실행되어 진다.    payload 매칭이 2개 이상이 된다면 'Ambiguous methods...' throw를 발생시킨다.


MessageListenerContainer 인터페이스

* spring-kakfa 에서 사용되는 최상단 '메시지 리스너 컨테이너'의 인터페이스 형태이다. 구현체는 Single-Thread 형태의 KafkaMessageListenerContainer 와 KafkaMessageListenerContainer 를 여러개 붙인 형태인 ConcurrentMessageListenerContainer 이렇게 2개가 기본 제공된다.

* Message Listener 이므로 '메시지의 수신' 관점에서 볼 때 필요하다. Kafka Cluster 의 gateway를 담당하는 application 을 만들 때는 'producer' 쪽이 필요하다.

* pause(), resume()을 인터페이스 레벨에서 가지고 있다.


ㅁ AbstractMessageListenerContainer<K,V> 추상 클래스

** 아래에서 설명할 2개의 실제 구현체 클래스의 상위 추상 클래스이다.(스프링의 다른 패키지에도 많으니 kafka 패키지인지 확인 해야한다)

** doStart, doStop만 위임한채, 나머지는 공통화 시키는 목적이다.

** offset commit을 어떻게 할 것인가에 대한 ACK_MODE 가 여기에서 정의되어 있고 종류는 7개가 있다 (BATCH, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE, RECORD, TIME)


ㅁ KafkaMessageListenerContainer<K,V> 클래스

** Java Consumer를 이용하여, 자동/수동 파티션을 할당 받아, Single-Threaded 형태로 동작하는 메시지 리스너 컨테이너

** 생성자 중에서 TopicPartitionInitialOffset 를 인자로 줄 수 있는데, 이를 이용해서 초기 Offset을 지정 할 수 있다. (absolute offset은 물론, relativeToCurrent 인자를 이용해서 relative offset도 지정가능하다)

** 스프링의 Lifecycle 인터페이스를 상속받아 구현된 start()가 시작 포인트이다.

** doStart 안에서 listenerConsumer 라는 컨슈머를 내부에서 만들어 데이터를 수신하게 된다.

** ListenerConsumer 를 Runnable을 상속받는 데몬 스레드 역할을 한다. while(true) 를 하면서 consumer.poll을 하는 것이다. 


ㅁ ConcurrentMessageListenerContainer<K,V> 클래스

** concurrency 와 실제 topic-partition의 갯수에 따라서 위에서 설명한 KafkaMessageListenerContainer 를 N개 만든다. 그리고 스스로 start를 호출한다. 즉, 이 클래스는 KafkaMessageListenerContainer 를 가지는 또 하나의 컨테이너이다.

** stop/resume/start 등등 전부 순차적으로 명령어를 수행한다. (forEach)

** 그 외 추가 구현 내용없는 단순한 형태이다.


그럼 KafkaConsumer 를 그냥 쓰는 것보다 무엇이 더 편할까?

- pause/resume 의 쉬운 사용
- KafkaConsumer는 thread-safe을 보장 하지 않아, 잘 구현해서 만들어야 하지만, 위의 컨테이너를 쓰면 쉽다.
- 좀 더 풍부한 ack모드 지원
- 메뉴얼 토픽파티션 어사인
- 오프셋 점프  등등...