SpringKafka의 EnableKafkaStreams

분류없음 2018.02.28 22:58 posted by dev.bistro

spring-kafka 를 디펜던시에 추가하면 사용할 수 있는 @EnableKafkaStreams 은 2017년 5월에 구현되었다. (https://github.com/spring-projects/spring-kafka/pull/238)

JavaDoc에 의하면 기본 Kafka Stream 컴포넌트를 활성화 해준다고 한다. 

정확히 설명하면  defaultKafkaStreamsConfig 라는 StreamsConfig bean이 있을 경우 StreamsBuilderFactoryBean 빈을 만들어 준다. (https://goo.gl/WaCFQ6)


* 먼저 InitializingBean를 상속을 받았기 때문에 afterPropertiesSet가 호출이 될 것이고 new StreamsBuilder() 가 자동으로 생성된다 (여기까진 괜찮다, 만들기만 하고 뭘 하지는 않으니까, 그리고 이것까지는 누구나 필요하다)

* 이후 step도 너무 자동화되었다는게 문제다.
interface Lifecycle 를 상속받고 있기 때문에  스프링의 DefaultLifeCycleProcessor에 의해 https://goo.gl/xSSasV 가 호출이 되고 이 때 default StreamConfig를 가지고 바로 KafkaStreams 를 생성 & 시작해버린다.
난 뭔가 토폴로지를 만들지도 않은 상태에서 internalTopologyBuilder에 의해 빌드가 되버리는 것이다...

즉, 뭔가 뭔가 좀 세밀한 작업을 하고 싶다면 EnableKafkaStream에 기대기 보다는 직접 StreamBuilder를 작성하자... 이 모들 내용은 https://goo.gl/kvjGNn 에 있다.



EnableKafkaStream annotation

분류없음 2018.02.28 20:29 posted by dev.bistro

springboot는 spring framework 를 좀 더 빠르고 쉽게 사용할 수 있는 목적을 가진 영역이다. 하지만 어느 순간부터 Spring 이외에도 Boot 까지 '제대로' 이해하지 못하면 엉망이 될 수 있는 존재가 되어버렸다. 특히, Condition annotation이 추가되면서 내부적으로 자세히 보고 적용을 해야한다.


예를 들어보면,

1. KafkaStream을 쓰기 위해 spring-kafka를 import 할경우 (cloud-stream-xxx역시 마찬가지)  간단한 String 이나,  추천되어지는 Avro 가 아니고 Json으로 (디)시리얼라이저를 지정한다고 할 경우, 안에서 어디에서 new ObjectMapper가 만들어지고, 어디 설정은 application.yml의 spring.kafka를 보는지, 계속 주의해야 한다. 간단한 rocksdb 기반의 StateStore를 API로 expose하기 위해서는

- Consumed.with(Serdes.Long(), orderSerdes), (ktable로 컨슈밍 할때)

- .withValueSerde(orderSerdes)); (Materialized를 지정할 때 )

- KafkaTemplate에 ProducerFactory를 주입할 때 등등... 

이렇게 하더라도 KafkaHeader에서도 new ObjectMapper를 했지만, 어차피 다음주엔 byte로 바꿀꺼라 pass..


2. 방금전에 겪은 하나 더  @EnableKafkaStreams 를 붙일 경우 StreamConfig가 spring bean으로 떠 있다면, StreamsBuilderFactoryBean 를 통해, KafkaStreams가 만들어지고 .start된다. 

아주 간단한 Cloud Native에서 , 하나의 role을 가지는 endpoint application을 만들때는 문제가 없으나, gateway, aggregator 역할 등등, 하나의 어플리케이션에서 2개의 StreamThread 를 메뉴얼 하게 제어하려면, 제대로 이해하고 있어야 한다.....

그냥 그랬다...

Spring Application 에서 Stream을 materialize 하기 위해서는 StateStore를 bean으로 접근 가능하게 해야한다. (xxxxRepository를 @Repository로 사용하는것과 같게 생각하면 된다)

그러기 위해서는 https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html 에서 store method를 이용해서 (최소한) KeyValueStore 형태의instance를 가지고 있어야 한다. 당연히 이 코드는 Spring Bean 내부에서 만들것이고, 그러면 제목과 같은 에러 메시지를 50프로 이상 볼 수 있다.

confluent의 FAQ에서 해답을 찾을수 있는데  KafkaStream의 state와 StateStore를 만드는 타이밍의 문제이다.
문제를 해결하기 위해서는 ApplicationReadyEvent 에서 처리를 하거나, 아니면 KafkaStream이  Running 상태로 바뀌기를 기다리거나 해야하는데, confluent 문서처럼 retry 형태로 구현하기로 했다.

sleep도 찝찝하지만 while true는 더 찝찝하기에 retry를 추가하였고, 그 횟수동안 store를 못 만든다면 그냥 Application 자체가 실행이 안되도록 RuntimeException을 일으키도록 하였다.



Spring Cloud Finchley.M6 issue

spring framework 2018.02.21 21:04 posted by dev.bistro


현재 request/reply EDA 를 구축하기 위한 보일플레이트 프로젝트 작성중이다. 와꾸는 Spring Boot 2.0 / Spring 5.0 / Spring Cloud Finchley.M6 를 셋팅하고 있다.


역시나 불안전한 모습을 많이 보여주고 있다. 우선 오늘 발생한 이슈 2가지,

compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
compile('org.springframework.cloud:spring-cloud-stream-binder-kstream')

디펜던시에 binder-kstream와 binder-kafka를 동시에 넣으면 

Field configurationProperties in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration required a single bean, but 2 were found:

오류가 발생한다. 어느 개발자의 PR로 다음에는 해결 되는 이슈이다. 

PR : https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/315/files


다음 이슈로 binding channel 마다.. 각각의 consumer와, producer 설정을 해야 한다는 점. 설정을 안하면 NPE 를 볼 수 있다.

bindings:
output:
destination: "logging.notification.shipping.result"
producer:
headerMode: raw
input:
destination: "data.order.sheet"
group: "in-grp"
consumer:
headerMode: raw

처럼 각각의 채널마다 producer consumer를 설정해야한다....  

springboot 문서를 참고 하면 /metrics 에 cache hit ratio 가 노출된다고 나온다 ( https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-metrics.html#production-ready-datasource-cache ) 하지만 그 리스트에 Caffeine 을 지원한 다는 얘기는 없다.  기본적으로 chche size는 metrics 에 노출이 되지만, hit/miss ratio는 노출이 되지 않으므로 구현이 필요하다.

현재 상황 : 

지원 목록 : EhCache, Hazelcast, Infinispan, JCache and Guava


확인 : 문서에 나온대로 CacheStatisticsAutoConfiguration, CacheStatisticsProvider 를 사용하면 가능 할 것 같다.

그래서 아래와 같은 Spring Bean을 등록

@Bean
public CacheStatisticsProvider<CaffeineCache> getCaffeineCacheStatisticsProvider() {
return (cacheManager, cache) -> {
DefaultCacheStatistics statistics = new DefaultCacheStatistics();
CacheStats status = cache.getNativeCache().stats();
long result = status.requestCount();
if (result > 0) {
statistics.setSize(cache.getNativeCache().estimatedSize());
statistics.setHitRatio(status.hitRate());
statistics.setMissRatio(status.missRate());
}
return statistics;
};
}

 

이후 실행을 해보았지만, 원하는 대로 노출이 안되었다. Caffeine 쪽에서 해답을 얻을 수 있었다. 아래 내용을 참고 하여 기존 Cache를 만드는 부분에  아래 부분을 추가하였다.

.recordStats()

참고 링크 : https://github.com/ben-manes/caffeine/wiki/Statistics


결과 : http://localhost:8080/metrics

  • cache.product.miss.ratio0.42857142857142855,
  • cache.product.hit.ratio0.5714285714285714,
  • cache.product.size3,


추가 : SpringBoot 2.0 부터는 micrometer 기반으로 caffeine로 지원을 한다.