일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- Spring
- Logstash
- coursera
- avo
- scala 2.10
- confluent
- kafkastreams
- enablekafkastreams
- Elasticsearch
- spring-cloud-stream
- kafkastream
- Slick
- scala
- springboot
- reactive
- 플레이 프레임워크
- kafka streams
- spring-batch
- Elk
- 한빛미디어
- statestore
- schema registry
- spring-kafka
- RabbitMQ
- 카프카
- Kafka
- aws
- play framework
- kafka interactive query
- gradle
- Today
- Total
목록분류 전체보기 (167)
b
처음에는 이 메소드의 효용성? 사용범위? 에 대해서 이해하지 못했다. 그냥 최초에 Streams Application이 기동될때, State 가 migration 되는 동안 Application 의 health 를 DOWN 정도으로 바꾸는 정도? (즉 Created -> Running) 최근에 하나 더 든생각은 isRunning 상태에서 다른 상태로 빠질 경우이다. (isRunning은 RUNNING or REBALCING 이다) 이 때에는 API로 제공되는 KeyStore를 query하면 에러가 발생 할 것이다. 그러므로this.kafkaStreams.setStateListener((newState, oldState) -> { if(newState.isRunning() ){ this.ready = tr..
몇개의 파티션으로 구성할 것인가에 대한 도움 글https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/ 읽고 나서 내 맘대로 정리한 내용- '파티션 갯수가 변경이 된다면, 제대로 메시지가 전달 안될 수 있다. 그래서 현재 필요보다 파티션 수를 많이 구성하라. 파티션 갯수를 변경할 필요가 없도록 해라. (향후 1-2년치 트래픽도 고민해서 충분한 파티션 수를 고민할것)- 그때 문제점은 OS의 open file handle이 커지게 된다. (모든 세그먼트 * 2)- 리더 선출은 주키퍼와 관련된 작업이 포함되고 이것은 선행적이다. 그래서 한 브로커에 파티션 리더가 많다면 선행적으로 파티션 리더를 변경..
spring-kafka 를 디펜던시에 추가하면 사용할 수 있는 @EnableKafkaStreams 은 2017년 5월에 구현되었다. (https://github.com/spring-projects/spring-kafka/pull/238)JavaDoc에 의하면 기본 Kafka Stream 컴포넌트를 활성화 해준다고 한다. 정확히 설명하면 defaultKafkaStreamsConfig 라는 StreamsConfig bean이 있을 경우 StreamsBuilderFactoryBean 빈을 만들어 준다. (https://goo.gl/WaCFQ6) * 먼저 InitializingBean를 상속을 받았기 때문에 afterPropertiesSet가 호출이 될 것이고 new StreamsBuilder() 가 자동으로 ..
springboot는 spring framework 를 좀 더 빠르고 쉽게 사용할 수 있는 목적을 가진 영역이다. 하지만 어느 순간부터 Spring 이외에도 Boot 까지 '제대로' 이해하지 못하면 엉망이 될 수 있는 존재가 되어버렸다. 특히, Condition annotation이 추가되면서 내부적으로 자세히 보고 적용을 해야한다. 예를 들어보면,1. KafkaStream을 쓰기 위해 spring-kafka를 import 할경우 (cloud-stream-xxx역시 마찬가지) 간단한 String 이나, 추천되어지는 Avro 가 아니고 Json으로 (디)시리얼라이저를 지정한다고 할 경우, 안에서 어디에서 new ObjectMapper가 만들어지고, 어디 설정은 application.yml의 spring.k..
Spring Application 에서 Stream을 materialize 하기 위해서는 StateStore를 bean으로 접근 가능하게 해야한다. (xxxxRepository를 @Repository로 사용하는것과 같게 생각하면 된다)그러기 위해서는 https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html 에서 store method를 이용해서 (최소한) KeyValueStore 형태의instance를 가지고 있어야 한다. 당연히 이 코드는 Spring Bean 내부에서 만들것이고, 그러면 제목과 같은 에러 메시지를 50프로 이상 볼 수 있다.confluent의 FAQ에서 해답을 찾을수 있는데 Kafk..
현재 request/reply EDA 를 구축하기 위한 보일플레이트 프로젝트 작성중이다. 와꾸는 Spring Boot 2.0 / Spring 5.0 / Spring Cloud Finchley.M6 를 셋팅하고 있다. 역시나 불안전한 모습을 많이 보여주고 있다. 우선 오늘 발생한 이슈 2가지,compile('org.springframework.cloud:spring-cloud-stream-binder-kafka') compile('org.springframework.cloud:spring-cloud-stream-binder-kstream')디펜던시에 binder-kstream와 binder-kafka를 동시에 넣으면 Field configurationProperties in org.springframewo..
springboot 문서를 참고 하면 /metrics 에 cache hit ratio 가 노출된다고 나온다 ( https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-metrics.html#production-ready-datasource-cache ) 하지만 그 리스트에 Caffeine 을 지원한 다는 얘기는 없다. 기본적으로 chche size는 metrics 에 노출이 되지만, hit/miss ratio는 노출이 되지 않으므로 구현이 필요하다.현재 상황 : 지원 목록 : EhCache, Hazelcast, Infinispan, JCache and Guava 확인 : 문서에 나온대로 CacheStatisticsAu..
현재 팀은 SpringBoot application 을 좀 더 운영하기 쉽도록 하기 위해서 starter를 하나 생성해서 사용하고 있다.cloud-netflix 셋팅과, 그 많고 많은 zuul, eureka 버그 픽스를 위한 몇몇 코드, 그리고 config server를 기반으로 한 다양한 설정들을 공통화 하기 위한 목적이다. 현재 진행 하는 프로젝트는 이 starter 를 쓰지 않고 순수하게 spring/kafka 만을 사용 하고 있다.이 프로젝트를 위해 별도의 metrics 시스템을 구축하기 보다는 기존의 sleuth-zipkin 인프라를 활용하기 위해서 spring-cloud-sleuth-stream을 사용하려 한다. 1. gradle 설정 추가['stream-kafka'].collect { com..