'enablekafkastreams'에 해당되는 글 2건

  1. 2018.02.28 SpringKafka의 EnableKafkaStreams
  2. 2018.02.28 EnableKafkaStream annotation

SpringKafka의 EnableKafkaStreams

분류없음 2018.02.28 22:58 posted by dev.bistro

spring-kafka 를 디펜던시에 추가하면 사용할 수 있는 @EnableKafkaStreams 은 2017년 5월에 구현되었다. (https://github.com/spring-projects/spring-kafka/pull/238)

JavaDoc에 의하면 기본 Kafka Stream 컴포넌트를 활성화 해준다고 한다. 

정확히 설명하면  defaultKafkaStreamsConfig 라는 StreamsConfig bean이 있을 경우 StreamsBuilderFactoryBean 빈을 만들어 준다. (https://goo.gl/WaCFQ6)


* 먼저 InitializingBean를 상속을 받았기 때문에 afterPropertiesSet가 호출이 될 것이고 new StreamsBuilder() 가 자동으로 생성된다 (여기까진 괜찮다, 만들기만 하고 뭘 하지는 않으니까, 그리고 이것까지는 누구나 필요하다)

* 이후 step도 너무 자동화되었다는게 문제다.
interface Lifecycle 를 상속받고 있기 때문에  스프링의 DefaultLifeCycleProcessor에 의해 https://goo.gl/xSSasV 가 호출이 되고 이 때 default StreamConfig를 가지고 바로 KafkaStreams 를 생성 & 시작해버린다.
난 뭔가 토폴로지를 만들지도 않은 상태에서 internalTopologyBuilder에 의해 빌드가 되버리는 것이다...

즉, 뭔가 뭔가 좀 세밀한 작업을 하고 싶다면 EnableKafkaStream에 기대기 보다는 직접 StreamBuilder를 작성하자... 이 모들 내용은 https://goo.gl/kvjGNn 에 있다.



EnableKafkaStream annotation

분류없음 2018.02.28 20:29 posted by dev.bistro

springboot는 spring framework 를 좀 더 빠르고 쉽게 사용할 수 있는 목적을 가진 영역이다. 하지만 어느 순간부터 Spring 이외에도 Boot 까지 '제대로' 이해하지 못하면 엉망이 될 수 있는 존재가 되어버렸다. 특히, Condition annotation이 추가되면서 내부적으로 자세히 보고 적용을 해야한다.


예를 들어보면,

1. KafkaStream을 쓰기 위해 spring-kafka를 import 할경우 (cloud-stream-xxx역시 마찬가지)  간단한 String 이나,  추천되어지는 Avro 가 아니고 Json으로 (디)시리얼라이저를 지정한다고 할 경우, 안에서 어디에서 new ObjectMapper가 만들어지고, 어디 설정은 application.yml의 spring.kafka를 보는지, 계속 주의해야 한다. 간단한 rocksdb 기반의 StateStore를 API로 expose하기 위해서는

- Consumed.with(Serdes.Long(), orderSerdes), (ktable로 컨슈밍 할때)

- .withValueSerde(orderSerdes)); (Materialized를 지정할 때 )

- KafkaTemplate에 ProducerFactory를 주입할 때 등등... 

이렇게 하더라도 KafkaHeader에서도 new ObjectMapper를 했지만, 어차피 다음주엔 byte로 바꿀꺼라 pass..


2. 방금전에 겪은 하나 더  @EnableKafkaStreams 를 붙일 경우 StreamConfig가 spring bean으로 떠 있다면, StreamsBuilderFactoryBean 를 통해, KafkaStreams가 만들어지고 .start된다. 

아주 간단한 Cloud Native에서 , 하나의 role을 가지는 endpoint application을 만들때는 문제가 없으나, gateway, aggregator 역할 등등, 하나의 어플리케이션에서 2개의 StreamThread 를 메뉴얼 하게 제어하려면, 제대로 이해하고 있어야 한다.....

그냥 그랬다...