분류없음2018.02.27 19:25

Spring Application 에서 Stream을 materialize 하기 위해서는 StateStore를 bean으로 접근 가능하게 해야한다. (xxxxRepository를 @Repository로 사용하는것과 같게 생각하면 된다)

그러기 위해서는 https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html 에서 store method를 이용해서 (최소한) KeyValueStore 형태의instance를 가지고 있어야 한다. 당연히 이 코드는 Spring Bean 내부에서 만들것이고, 그러면 제목과 같은 에러 메시지를 50프로 이상 볼 수 있다.

confluent의 FAQ에서 해답을 찾을수 있는데  KafkaStream의 state와 StateStore를 만드는 타이밍의 문제이다.
문제를 해결하기 위해서는 ApplicationReadyEvent 에서 처리를 하거나, 아니면 KafkaStream이  Running 상태로 바뀌기를 기다리거나 해야하는데, confluent 문서처럼 retry 형태로 구현하기로 했다.

sleep도 찝찝하지만 while true는 더 찝찝하기에 retry를 추가하였고, 그 횟수동안 store를 못 만든다면 그냥 Application 자체가 실행이 안되도록 RuntimeException을 일으키도록 하였다.



Posted by dev.bistro
spring framework2018.02.21 21:04


현재 request/reply EDA 를 구축하기 위한 보일플레이트 프로젝트 작성중이다. 와꾸는 Spring Boot 2.0 / Spring 5.0 / Spring Cloud Finchley.M6 를 셋팅하고 있다.


역시나 불안전한 모습을 많이 보여주고 있다. 우선 오늘 발생한 이슈 2가지,

compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
compile('org.springframework.cloud:spring-cloud-stream-binder-kstream')

디펜던시에 binder-kstream와 binder-kafka를 동시에 넣으면 

Field configurationProperties in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration required a single bean, but 2 were found:

오류가 발생한다. 어느 개발자의 PR로 다음에는 해결 되는 이슈이다. 

PR : https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/315/files


다음 이슈로 binding channel 마다.. 각각의 consumer와, producer 설정을 해야 한다는 점. 설정을 안하면 NPE 를 볼 수 있다.

bindings:
output:
destination: "logging.notification.shipping.result"
producer:
headerMode: raw
input:
destination: "data.order.sheet"
group: "in-grp"
consumer:
headerMode: raw

처럼 각각의 채널마다 producer consumer를 설정해야한다....  

Posted by dev.bistro
spring framework2018.02.05 22:05

springboot 문서를 참고 하면 /metrics 에 cache hit ratio 가 노출된다고 나온다 ( https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-metrics.html#production-ready-datasource-cache ) 하지만 그 리스트에 Caffeine 을 지원한 다는 얘기는 없다.  기본적으로 chche size는 metrics 에 노출이 되지만, hit/miss ratio는 노출이 되지 않으므로 구현이 필요하다.

현재 상황 : 

지원 목록 : EhCache, Hazelcast, Infinispan, JCache and Guava


확인 : 문서에 나온대로 CacheStatisticsAutoConfiguration, CacheStatisticsProvider 를 사용하면 가능 할 것 같다.

그래서 아래와 같은 Spring Bean을 등록

@Bean
public CacheStatisticsProvider<CaffeineCache> getCaffeineCacheStatisticsProvider() {
return (cacheManager, cache) -> {
DefaultCacheStatistics statistics = new DefaultCacheStatistics();
CacheStats status = cache.getNativeCache().stats();
long result = status.requestCount();
if (result > 0) {
statistics.setSize(cache.getNativeCache().estimatedSize());
statistics.setHitRatio(status.hitRate());
statistics.setMissRatio(status.missRate());
}
return statistics;
};
}

 

이후 실행을 해보았지만, 원하는 대로 노출이 안되었다. Caffeine 쪽에서 해답을 얻을 수 있었다. 아래 내용을 참고 하여 기존 Cache를 만드는 부분에  아래 부분을 추가하였다.

.recordStats()

참고 링크 : https://github.com/ben-manes/caffeine/wiki/Statistics


결과 : http://localhost:8080/metrics

  • cache.product.miss.ratio0.42857142857142855,
  • cache.product.hit.ratio0.5714285714285714,
  • cache.product.size3,


추가 : SpringBoot 2.0 부터는 micrometer 기반으로 caffeine로 지원을 한다. 

Posted by dev.bistro
Cloud2018.01.31 20:06

현재 팀은 SpringBoot application 을  좀 더 운영하기 쉽도록 하기 위해서 starter를 하나 생성해서 사용하고 있다.

cloud-netflix 셋팅과, 그 많고 많은 zuul, eureka 버그 픽스를 위한 몇몇 코드, 그리고 config server를 기반으로 한 다양한 설정들을 공통화 하기 위한 목적이다. 현재 진행 하는 프로젝트는 이 starter 를 쓰지 않고 순수하게 spring/kafka 만을 사용 하고 있다.

이 프로젝트를 위해 별도의 metrics 시스템을 구축하기 보다는 기존의 sleuth-zipkin 인프라를 활용하기 위해서 spring-cloud-sleuth-stream을 사용하려 한다.


1. gradle 설정 추가

['stream-kafka'].collect {
compile "org.springframework.cloud:spring-cloud-starter-$it"
}
compile 'org.springframework.cloud:spring-cloud-stream'

에서 

compile 'org.springframework.cloud:spring-cloud-sleuth-stream'
compile 'org.springframework.cloud:spring-cloud-stream-rabbit'
compile "org.springframework.cloud:spring-cloud-starter-sleuth'

를 추가하였다.


2. 설정 추가

spring:
application:
name: pacman
rabbitmq:
addresses: 172.18.176.196:5672
username: rabbitmq
password: rabbitmq
sleuth:
sampler:
percentage: 1.0
stream:
enabled: true
zipkin:
service:
name: vine-event-pacman

* zipkin.service.name 을 추가해야만 했다. application.name 을 그대로 사용 할 줄 알았는데.. 어디선가 자동으로 해주고 있는 거였나보다.

이렇게만 설정하면 아래와 같은 에러를 볼 수 있다.  

현재 우리의 zipkin은 rabbitmq 기반인데, 하필이면 이 프로젝트는 binder로 kafka를 사용하고 있었다.
당연하게 stream binder로 kafka, rabbit를 동시에 쓰고 있어서 발생한 문제였고, DefaultBinderFactory 를 확인하여 default binder 를 설정함으로서 해결 할 수 있었다.

Caused by: java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for 'org.springframework.integration.channel.DirectChannel' : kafka,rabbit, and no default binder has been set

cloud:
stream:
default-binder: "rabbit"



Posted by dev.bistro
Cloud2018.01.30 19:55

18/01/30 idea


카프카 자체의 data recovery는 훌륭하다. 당연한게 파일로 저장하고 심지어 여러 셋트로 저장한다. 유실되는 경우는 저장 기간을 넘기거나, 1개의 IDC에만 설치한 후 IDC가 물에 잠기는 방법 뿐이다. 이건 다 아는 얘기고, 문제는 이 kafka 를 기반으로 한 streams 를 서비스 할 때이다.


1. Kafka는 기본적으로 1 개의 TopicPartion 에서 1 개의 Consumer Group의 노드가 접속 한다.

2. Kafka Topic은 KStream/KTsble/StateStore 등을 통해서 각각의 topic-partition을  local DB에 싱크를 맞춘후 materilized 한다  (비동기/latency 존재)


우선 1번의 문제부터....

1. 해당 TopicPartion의 정보를 단 1개의 VM(instance)만이 가지고 있을 수 있다 (Application Cluster 자체를 active-standby로 구성하지 않는이상)

 이 얘기는 해당 Node가 죽어버리면 '그 순간' 그 데이터들은 VM instnace안에서 존재 하지 않는 다는 것이다. 

 물론 Kafka Broker 내에는 존재하기 때문에 일정 시간의 기다린다면 새로운 Instacne가 해당 데이터를 살릴 수 있다. 하지만 그 '일정 시간의 텀'이 문제이다.

 위에서 얘기한 데로 Application 자체를 standby로 준비하지 않는 이상 힘들다. kafka 역시 standby 로 해결한다.

 링크 : https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#id10

 동일한 셋을 하나 더 구성하지만 그 녀석은 말그대로 standby이다. 실제 서비스에는 투입이 되지 않으며, resource 역시 많이 차지하지만, 장애 시간을 짧게 가져가는데 필수적인 요소이다.

 StreamThread 내에서 task를 active 와 standby로 가지고 있다. 


 이 부분은 ... 계속 파야할 것 같다.



2. Event가 topic으로 전송이 된다면 해당 topic-partition을 materialized view 로 가지고 있는 consuer group 의 instance는 consumer로 붙는다.

   이 때 topic과 state-store의 데이터 불일치는 필연적이다.

   이 때 state-store에서 get을 하는 '재고차감' 같은 Domain은 어떤 식으로 처리 할 것인가?

   A. 모든것을 sync로 한다 - 의미 없으니 pass

   B. instance의 state-store를 ReadyOnlyQueryType으로 사용하는게 아닌 WriteableQueryType으로 사용하여, put도 할 수 있게 한다. 그리고 이 changelog는 

      remote kafka cluster와 연동한다. 

      atmoic을 보장 할 수 있는 방법이고, 서버가 죽었을때도 문제가 되지 않는다. 내부 rocksdb에 데이터는 유지되므로 다시 살리기만 한다면 cluster와 동기화를 할 수 있을테니까..

      문제는 server가 살릴수 없을 때이다. 이 이슈는 바로 데이터 유실로 이어진다.



적당한 방법들은 보이지만, 정답인지 아닌지 확신이 안선다.

      



Posted by dev.bistro
Cloud2018.01.28 21:53

밑에는 잡담이니 결론 부터

* 결론 Ditmars.SR3 사용,  Spring 4, Boot 1.5.x는 기존대로 유지
* Kafka 는 0.1x 유지
* spring-cloud-stream-reactive & reactor 3.1.X 사용 ( rxjava2 버림 )




* 우리가 사용중인 Spring Cloud Dalston SR5는 Ditmars 버전 매트릭스에 맞지 않다 이전 버전인 Chelsea.SR2를 고려해야한다.

* 하지만 Ditmars.SR3 버전으로 리서치를 진행 할 예정이다. 

* Ditmars.SR3 은 spring-kafka 1.1.7 이므로 @EnableStream 사용등은 커스텀마이징을 해야 하기 때문에 추천버전은 Kafka 0.10을 그대로 사용 해서 진행 한다


Ditmars.SR3 의존성 정보

Spring Cloud Stream Core 1.3.2.RELEASE
Spring Cloud Stream Binder Kafka 1.3.2.RELEASE
Spring Boot : Spring Boot 1.5.7 or a later 1.5.x release. It is not compatible with Spring Boot 2.x.
Spring Cloud : Spring Cloud Edgware.SR1


라이브 러리 버전 상세 정보

- 링크 : https://github.com/spring-cloud/spring-cloud-stream-starters/blob/vDitmars.SR3/spring-cloud-stream-dependencies/pom.xml

- 링크 :  https://github.com/spring-cloud/spring-cloud-stream/blob/v1.3.2.RELEASE/pom.xml

- reactor : 2.0.8.RELEASE (default)

- rxjava : 1.1.10 (optional 이므로 spring-cloud-stream-rxjava 를 추가)



spring-cloud-stream-reactive


* reference : https://docs.spring.io/spring-cloud-stream/docs/Ditmars.SR3/reference/htmlsingle/#_reactive_programming_support

* spring-cloud-stream 에 reactive API를 사용하기 위한 지원용 라이브러리이다.

* 기본은 reactor 이며, ConditionalOnClass 에 의해 rxjava 를 지원하지만 가장 최신 버전까지도 rxjava 1.x 만을 지원한다.

* 추가로 @StreamEmitter 어노테이션이 추가된다. 

* ReactiveSupportAutoConfiguration 가 존재하며, rx.Observable 클래스를 확인 한후 rxjava 를 지원한다

* 아래의 Adapter 들을 Spring bean으로 등록시켜서 Observable type을 처리해준다.

  - MessageChannelToInputObservableParameterAdapter

  - MessageChannelToObservableSenderParameterAdapter

  - ObservableToMessageChannelResultAdapter

* rxjava를 굳이 쓰겠지만 rxjava1을 쓰는게 속편하다, 하지만 rxjava2 를 현재 쓰고 있는데 down-grade할 정도로 필요성은 있어 보이지 않고

그냥 이번기회에 (어차피 옮길거) reactor를 써보기로 함

Posted by dev.bistro
Cloud2018.01.28 12:58


18/01/26 idea


구현해야 할 도메인은 https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/

필요한 내용은

- 정확한 request/ responses
- scale out
- high availability
- ktable & state-store를 기반으로 한 local - storage의 사용

블로그 글과 예제에 대해서는 100퍼센트 공감이 되지만, 몇몇 부분에서는 '구현' 자체가 고민스러움.
나 뿐 아니라 다른 이의 구현을 살펴보아도 동일하다.  아래의 github 개발자도 위의 블로깅 내용을 보고 example project를 작성하였지만 같은 문제점을 그대로 노출하고 있다..  

- https://github.com/hpgrahsl/kafka-streams-emojitracker
- 사용 : confluent connect, stream, statestore, reactive(boot2.0)
- 특징 : 트랜잭션 사용안함(at_least_once) / 

- 데이터 서칭

더보기

이 분은 그리고 request/reply 모델이 아니다.  그래서 아래와 같은 부분에 대해서는 구현이 빠져있다.
https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/microservices/OrdersService.java#L232 

reply에 대한 고민은 
- 채번을 하고, 해당 Order Id의 reply를 consumer로 받을 수 있는 Node으로 요청을 보낸다.
  이슈) zuul, client load loadbalancer가 주키퍼를 기반으로 브로커 파티션/파티셔너 정보를 알고 있어야 한다.
- zuul 에서 기존 routing 정책에 의해 요청을 보낸후, 실제 Node에서 '응답을 받을 수 있는' 채번을 한다.
  이슈) Range(0,PartionNum).boxed().filter(local-state::matchingkey).findFirst();
     그럼 아마 ID가 1,7,11 이런식으로 생기겠지. (중간에 사용못하는 애들 생김)

- 또는... 정말 옛날로 돌아간다. (생각도 안해봤지만, 팀원분께서 차라리 이럴꺼면 2번 호출해라고)
  ex) 채번 자체는 Node 에서 하고, 이벤트를 발생한다. 그리고 그 노드가 '응답'을 받는 다는 보장이 없으므로 Mono로 FT로 바로 리턴, FT는 응답을 바로 받고 'OrderId'를 가지고 다시 한번 zuul 영역을 호출한다. -_-;

- 또는 제일 심플하게 각각의 Node를 Consumer Group으로 사용하지 않는다!
  이슈) 트래픽이 WAS 만큼 '배수'로 든다. 즉 기존에 WAS 50대로 10MB/SEC 트래픽이었다면 이렇게 하면 500MB/SEC -_-;;;

Posted by dev.bistro
life/book2017.08.27 20:10


Java 에 대한 아쉬움으로 Scala 나 Haskell 을 본지는 몇년 되었고, 최근 들어 RxJava 프로젝트를 몇 달 째 참여하고 있으면서 이 책을 기다렸다.  번역 하신분( 오현석님 책은 다 구입을 한 것 같다)이나 주제가 분명히 나에게 도움이 될 거라고 생각했기 때문이였다.
(뭔가 어색한 제목은... 기분탓이다. 저자분도 이거에 대한 고민을 하셨다고... - 어디서 봤지?Slack이었나...)


 

Sodium 이라는 프레임웍을 기반으로 Swing을 확장한 swidget.* 으로 설명을 시작한다. 처음에는 웬 스윙..? 이라고 생각했으나 중간 중간 나오는 그림과 스윙 기반의 예제는 나름 도움이 되는 보조역을 해주었다. 하지만 너무 많은 UI 쪽 코드가 집중력이 분산된 건 좀 아쉬웠다. 그리고 처음 보는 Cell, Stream 단어를 가지고 설명을 해서 좀 어려웠던 것 같다  

(오히려 나는 기본에 내가 알던 지식에 이 책을 끼워 맞추려고 노력하느라 더 좀 혼란 스러웠던 것 같다. 오른쪽 사진이 책의 일부분인데 실제로 익숙한 건 map , orElse 정도였다)






개인적으로는 초반의 개념이나 기초 펑션에 대한 설명보다는 오히려 10장이후의 내용들이 많이 도움이 되었다. 고민되는 부분이나, 평소에 궁금했던 내용들을 언급해 주어서 많은 생각을 해주게 했다.

책 자체는 400 page 정도로 두꺼운 편은 아니지만,  빨간책 스칼라 만큼이나 천천히 읽고, 다시 읽고, 읽어도 뭔소리야? 라고 많이  생각한 것 같다. 한 2-3번 더 읽어보면 놓쳤던 부분을 좀 더 내것으로 만들 수 있지 않을까? 라고 생각이 든다.

Posted by dev.bistro
life/book2017.08.20 12:36


개인적으로는 스칼라나 하스켈 못지 않게 어려운 (그리고 생각과는 많이 다른) 언어가 자바스크립트인 것 같다. 그 난해함에 추가되는 문제는 생태계가 가장 빠르게 변화하는 언어라 단순히 언어 스펙을 공부하는 것만으로는 충분하지 못하다는 것이다. 이 책은 이러한 문제를 조금은 해결 할 수 있는 책이 아닌가 생각이 된다.

프로그래밍을 전혀 모르는 사람에게는 의미 없는 책이지만, ES3 부터 자바스크립트를 사용한 개발자나, 다시 한번 ES6 수준에서 알고 있는 지식을 정리하려는 사람들에게 추천할 만한 내용을 가지고 있다.

예를 들면 "30"* 3 = 90 이 나온다던가 let a = [1,2,3,] 처럼 마지막 콤마를 허용한다는 것 같은 것... 
arrow function 에서 this나 arguments가 펑션과 좀 다른다는 것은 제대로 정리가 안된다면 매번 헷갈려 하는 이슈 들이다. (물론 "30"*3 처럼 난해한 표현식을 추천하는 것은 아니다)


책을 쭈욱 읽어가면서  이 책의 장점을 정리해 보았다.

1. 30페이지 정도 나뉜 챕터들은 주제가 명확히 분리되어 있고, 그 안에서는 원리/이론 - 기초 - 심화로 이어진다. 처음부터 읽어가지 않아도 필요한 부분만 참고할 수 있다.

2. 중간중간 NOTE 도 도움이 되고, 챕터별 마지막 요약은 한 번 리마인드 할 수 있는 기회를 줬다.

3. 책의 내용뿐 아니라 번역 역시 책의 퀄리티를 결정하는 중요한 요소인데 (개인적으로는 ) 만족스러웠고, 단어 옆에 원래 영어단어가 충분히 표현되어 있어서 헷갈림이 없었다.   


본문 중 일부분


하지만  책의 본래 목적이기도 하지만 타겟 독자층이 애매하지 않나 생각이 들었고,  책 내용적으로는 ES6 부분은 별도 표시를 해 줬더라면 좋지 않았을 까 해본다.  

6개월전 Vue.js를 운영툴에 적용한 이후, Java와 Kotlin, Scala로만 프로젝트를 진행하고 있었는데, 이 책을 한번 읽음으로서 다시 한번 자바스크립트를 복습 할 수 있는 좋은 기회가 되었다. 

Posted by dev.bistro
spring framework2017.07.07 09:52


결론적으로 @SpringBootApplication 과 Main method으로 시작되는 startup은 불가능하지만, 그 부분을 생성자 부분으로 변경한 다면 사용 가능하다. 다만, 첫  시작은 dependency lookup 방식으로 bean을 가져와야 한다.
(이건 올해 초 얘기고, 지금은 https://github.com/spring-cloud/spring-cloud-function 을 이용하면 된다)

테스트환경

  • intellij 2016.3
  • gradle
  • spring-boot 1.4.3

intellij Spring Initializr 의 셋팅

%e1%84%89%e1%85%b3%e1%84%8f%e1%85%b3%e1%84%85%e1%85%b5%e1%86%ab%e1%84%89%e1%85%a3%e1%86%ba-2017-01-14-%e1%84%8b%e1%85%a9%e1%84%92%e1%85%ae-6-38-44

이후 build.gradle은 다음과 같이 셋팅하였다. boot를 이용한 gradle의  task package 나  assemble는 main이 없기때문에 사용이 힘들고 maven shade 플러그인이나 gradle shadow를 이용한다.

참고 build.gradle
( plugin : https://github.com/johnrengelman/shadow 는 몇년전에 PR 1줄 했네…)

이후 https://github.com/bistros/test-springboot-lambda 으로 프로젝트를 구동 시켜봤지만 단순히 String을 리턴하는데도 2초씩 걸린다.  (handleRequest에서 스프링 프로젝트를 초기화 하니 당연히 느릴수밖에… source link )

%e1%84%89%e1%85%b3%e1%84%8f%e1%85%b3%e1%84%85%e1%85%b5%e1%86%ab%e1%84%89%e1%85%a3%e1%86%ba-2017-01-14-%e1%84%8b%e1%85%a9%e1%84%92%e1%85%ae-7-14-56

그래서 다음처럼 생성자에서 초기화하도록 수정 (source link) 하였다.

%e1%84%89%e1%85%b3%e1%84%8f%e1%85%b3%e1%84%85%e1%85%b5%e1%86%ab%e1%84%89%e1%85%a3%e1%86%ba-2017-01-14-%e1%84%8b%e1%85%a9%e1%84%92%e1%85%ae-7-15-55


Posted by dev.bistro