처음에는 이 메소드의 효용성? 사용범위? 에 대해서 이해하지 못했다. 그냥 최초에 Streams Application이 기동될때, State 가 migration 되는 동안  Application 의 health 를 DOWN 정도으로 바꾸는 정도?  (즉 Created -> Running) 

최근에 하나 더 든생각은 isRunning 상태에서 다른 상태로 빠질 경우이다. (isRunning은 RUNNING or REBALCING 이다)  이 때에는 API로 제공되는 KeyStore를 query하면 에러가 발생 할 것이다. 그러므로

this.kafkaStreams.setStateListener((newState, oldState) -> {
if(newState.isRunning() ){
this.ready = true;
}else {
this.ready = false;// State is not RUNNING,
}
});
this.kafkaStreams.cleanUp();
this.kafkaStreams.start();

이와 같은 리스너를 등록하여,  queryable 상태를 체크하는 부분이 있어야 할 것 같았고, 그렇게 구현을 했다.

그럼  언제 isRunning 에서 다른 상태로 변경이 될까? 

가장 쉽게 찾을 수 있는 경우는 StreamThread가 shutdown이 될 때 PENDING_SHUTDOWN 으로 상태가 바뀐다. cleanup()을 호출하거나, 여러경우/다양한 곳에서 shutdown/close를 호출하는 경우이다.

그 외에는... DEAD 나 PENDING_SHUTDOWN 으로 변경하는 케이스를 찾지 못함 ㅠ ( 더 있을 줄 알았다!!) 

EnableKafkaStream annotation

분류없음 2018.02.28 20:29 posted by dev.bistro

springboot는 spring framework 를 좀 더 빠르고 쉽게 사용할 수 있는 목적을 가진 영역이다. 하지만 어느 순간부터 Spring 이외에도 Boot 까지 '제대로' 이해하지 못하면 엉망이 될 수 있는 존재가 되어버렸다. 특히, Condition annotation이 추가되면서 내부적으로 자세히 보고 적용을 해야한다.


예를 들어보면,

1. KafkaStream을 쓰기 위해 spring-kafka를 import 할경우 (cloud-stream-xxx역시 마찬가지)  간단한 String 이나,  추천되어지는 Avro 가 아니고 Json으로 (디)시리얼라이저를 지정한다고 할 경우, 안에서 어디에서 new ObjectMapper가 만들어지고, 어디 설정은 application.yml의 spring.kafka를 보는지, 계속 주의해야 한다. 간단한 rocksdb 기반의 StateStore를 API로 expose하기 위해서는

- Consumed.with(Serdes.Long(), orderSerdes), (ktable로 컨슈밍 할때)

- .withValueSerde(orderSerdes)); (Materialized를 지정할 때 )

- KafkaTemplate에 ProducerFactory를 주입할 때 등등... 

이렇게 하더라도 KafkaHeader에서도 new ObjectMapper를 했지만, 어차피 다음주엔 byte로 바꿀꺼라 pass..


2. 방금전에 겪은 하나 더  @EnableKafkaStreams 를 붙일 경우 StreamConfig가 spring bean으로 떠 있다면, StreamsBuilderFactoryBean 를 통해, KafkaStreams가 만들어지고 .start된다. 

아주 간단한 Cloud Native에서 , 하나의 role을 가지는 endpoint application을 만들때는 문제가 없으나, gateway, aggregator 역할 등등, 하나의 어플리케이션에서 2개의 StreamThread 를 메뉴얼 하게 제어하려면, 제대로 이해하고 있어야 한다.....

그냥 그랬다...

Spring Application 에서 Stream을 materialize 하기 위해서는 StateStore를 bean으로 접근 가능하게 해야한다. (xxxxRepository를 @Repository로 사용하는것과 같게 생각하면 된다)

그러기 위해서는 https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html 에서 store method를 이용해서 (최소한) KeyValueStore 형태의instance를 가지고 있어야 한다. 당연히 이 코드는 Spring Bean 내부에서 만들것이고, 그러면 제목과 같은 에러 메시지를 50프로 이상 볼 수 있다.

confluent의 FAQ에서 해답을 찾을수 있는데  KafkaStream의 state와 StateStore를 만드는 타이밍의 문제이다.
문제를 해결하기 위해서는 ApplicationReadyEvent 에서 처리를 하거나, 아니면 KafkaStream이  Running 상태로 바뀌기를 기다리거나 해야하는데, confluent 문서처럼 retry 형태로 구현하기로 했다.

sleep도 찝찝하지만 while true는 더 찝찝하기에 retry를 추가하였고, 그 횟수동안 store를 못 만든다면 그냥 Application 자체가 실행이 안되도록 RuntimeException을 일으키도록 하였다.