spring-kafka의 KafkaListener

분류없음 2018.03.11 22:28 posted by dev.bistro

KafkaListener annotation

토픽의 메시지를 소비할 메소드에 붙이는 어노테이션이다. 이 어노테이션을 붙인다면, KafkaListenerContainerFactory에 의해서 MessageListenerContainer가 기동된다. 현재로서는 ConcurrentKafkaListenerContainerFactory 클래스만 존재 하니까, 기본적으로 KafkaListener을 붙인다는 것은 ConcurrentMessageListenerContainer 리스너 컨테이너를 사용한다는 의미이다.

그리고 @ConditionalOnClass(EnableKafka.class) 에 의해서 KafkaAnnotationDrivenConfiguration 설정 클래스가 로드 되는대, 여기서 "kafkaListenerContainerFactory" 이라는 bean을 가지는ConcurrentKafkaListenerContainerFactory 가 생성이 된다. (즉 spring-kafka만 디펜던시에 추가된다면, 이 클래스가 발생된다)

 @KafkaListener 에 'containerFactory'를 지정 할 수 없지만, 기본적으로 위에서 언급한 'kafkaListenerContainerFactory' 이름을 가지는 컨테이너 팩토리를 사용한다.  이 메소드의 인자로는 @Payload, @Header, @Headers, MessageHeaders등등을 사용하여 메시지를 수신할 수 있다.  


 메소드에 정의된다면 각 메소드별로 '리스너컨테이너'가 만들어진다. - (컨테이너 팩토리가 아니다!)  만약 1개의 컨테이너 팩토리에서 2개의 @KafkaListener을 사용한다면  각각의 beanName은 org.springframework.kafka.KafkaListenerEndpointContainer#1, #2 이다.  만약 동일한 팩토리와 topic을 쓴다면 setConcurrency=2 의 효과만 가질 것이다.

클래스에 정의된다면 하나의 메시지 컨테이너가! @KafkaHandler가 있는 모든 메소드에 적용한다. 모두 실행 한다는 의미가 아닌, payload타입에 따라 적절하게 메소드를 실행해준다는 뜻이다.  @KafkaHandler가 붙은 메소드는 InvocableHandlerMethod 클래스에 감싸지고, DelegatingInvocableHandler 에 List 형으로 들어가서  DelegatingInvocableHandler.findHandlerForPayload 의해서 실행되어 진다.    payload 매칭이 2개 이상이 된다면 'Ambiguous methods...' throw를 발생시킨다.


MessageListenerContainer 인터페이스

* spring-kakfa 에서 사용되는 최상단 '메시지 리스너 컨테이너'의 인터페이스 형태이다. 구현체는 Single-Thread 형태의 KafkaMessageListenerContainer 와 KafkaMessageListenerContainer 를 여러개 붙인 형태인 ConcurrentMessageListenerContainer 이렇게 2개가 기본 제공된다.

* Message Listener 이므로 '메시지의 수신' 관점에서 볼 때 필요하다. Kafka Cluster 의 gateway를 담당하는 application 을 만들 때는 'producer' 쪽이 필요하다.

* pause(), resume()을 인터페이스 레벨에서 가지고 있다.


ㅁ AbstractMessageListenerContainer<K,V> 추상 클래스

** 아래에서 설명할 2개의 실제 구현체 클래스의 상위 추상 클래스이다.(스프링의 다른 패키지에도 많으니 kafka 패키지인지 확인 해야한다)

** doStart, doStop만 위임한채, 나머지는 공통화 시키는 목적이다.

** offset commit을 어떻게 할 것인가에 대한 ACK_MODE 가 여기에서 정의되어 있고 종류는 7개가 있다 (BATCH, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE, RECORD, TIME)


ㅁ KafkaMessageListenerContainer<K,V> 클래스

** Java Consumer를 이용하여, 자동/수동 파티션을 할당 받아, Single-Threaded 형태로 동작하는 메시지 리스너 컨테이너

** 생성자 중에서 TopicPartitionInitialOffset 를 인자로 줄 수 있는데, 이를 이용해서 초기 Offset을 지정 할 수 있다. (absolute offset은 물론, relativeToCurrent 인자를 이용해서 relative offset도 지정가능하다)

** 스프링의 Lifecycle 인터페이스를 상속받아 구현된 start()가 시작 포인트이다.

** doStart 안에서 listenerConsumer 라는 컨슈머를 내부에서 만들어 데이터를 수신하게 된다.

** ListenerConsumer 를 Runnable을 상속받는 데몬 스레드 역할을 한다. while(true) 를 하면서 consumer.poll을 하는 것이다. 


ㅁ ConcurrentMessageListenerContainer<K,V> 클래스

** concurrency 와 실제 topic-partition의 갯수에 따라서 위에서 설명한 KafkaMessageListenerContainer 를 N개 만든다. 그리고 스스로 start를 호출한다. 즉, 이 클래스는 KafkaMessageListenerContainer 를 가지는 또 하나의 컨테이너이다.

** stop/resume/start 등등 전부 순차적으로 명령어를 수행한다. (forEach)

** 그 외 추가 구현 내용없는 단순한 형태이다.


그럼 KafkaConsumer 를 그냥 쓰는 것보다 무엇이 더 편할까?

- pause/resume 의 쉬운 사용
- KafkaConsumer는 thread-safe을 보장 하지 않아, 잘 구현해서 만들어야 하지만, 위의 컨테이너를 쓰면 쉽다.
- 좀 더 풍부한 ack모드 지원
- 메뉴얼 토픽파티션 어사인
- 오프셋 점프  등등...

대화식 쿼리를 사용하려면 hostname:port 형식의 StreamsConfig.APPLICATION_SERVER_CONFIG 를 설정하면 된다. 카프카 스트림 인스턴스가 키를 기반으로 한 쿼리를 받으면 현재 인스턴스의 로컬 저장소에 포함되는지를 찾는다. 더 중요한 것은 로컬 저장소에 없을 경우 키가 포함된 Store를 찾고 싶은 것이다. 카프카 스트림은 동일한 application ID와 APPLICATION_SERVER_CONFIG 가 정의된 인스턴스에 정보를 검색할 수 있는 몇가지 메소드가 제공된다.

아래의 메소드들은 KafkaStreams instance의 method이다.

- allMetadata : 모든 StreamsMetadata 목록을 반환
- allMetadataForStore : StoreName 기반으로 StreamsMetadata 목록을 반환
- metadataForKey : 해당 Key 기반으로 StreamsMetadata 를 반환

위 메소들에서 allMetaData를 이용해서 대화형 쿼리에 적합한 모든 인스턴스 정보를 얻을 수 있다.  



각 카프카 스트림 인스턴스는 경량화된 서버가 내부적으로 떠 있게 된다.
여기서 중요한 것은 '카프카 스트림 인스턴스 중에서 하나만 쿼리를 하면 되고 어디에 해야하는지는 중요하지 않다'

RPC 메커니즘과 메타 데이터를 기반으로 인스턴스에 원하는 데이터가 없다면, 카프카 스트림 인스턴스는 위치를 찾아내서 쿼리후 반환한다. 예를 들어 A 인스턴스에는 key1가 포함되어 있지는 않지만 B 인스턴스테 포함됨을 인지하고, B를 호출하여 데이터를 검색하고 반환한다.

(중요!: RPC 메커니즘은 기본적으로 제공되지 않으므로 직접 구현해야한다. ex: https://github.com/davidandcode/hackathon_kafka/blob/master/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/MusicPlaysRestService.java )


ps. 결국엔 ben stopord 의 예제처럼 HostInfo를 이용해서 라우팅을 하거나, 처음부터, Key기반의 요청/응답을 특정 인스턴스로 제한 하는 수 밖에 없다. ... (원래 하던데로다.ㅋ )



티스토리 툴바