Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- confluent
- Slick
- aws
- play framework
- 플레이 프레임워크
- enablekafkastreams
- kafkastream
- springboot
- scala
- statestore
- Spring
- 한빛미디어
- spring-batch
- Elk
- kafkastreams
- scala 2.10
- Logstash
- Elasticsearch
- kafka interactive query
- 카프카
- avo
- coursera
- schema registry
- spring-cloud-stream
- kafka streams
- Kafka
- spring-kafka
- reactive
- gradle
- RabbitMQ
Archives
- Today
- Total
b
Kafka Streams 로 Interactive Query 를 위한 초기화 본문
카프카 스트림은으로 StateStore를 구성하여 서비스에 투입하는 것은 튜토리얼과 상용은 하늘과 땅차이다. 특히 데이터가 어느 정도 적재되는 상황에서는 그 문제는 더 커진다. 일반적으로 Application 이 standby가 되면 트래픽을 받을 수 있는 것과는 달리, Kafka Streams로 Query를 받아 내기 위해서는 StateStore가 초기화 되어야 한다.
그래서 아래와 같은 코드를 준비하여야 한다. (시대가 어느시대인데 무한 루프냐 ㅠ)
public class StoreHelper {
public static <T> T waitUntilStoreIsQueryable(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final KafkaStreams streams) throws InterruptedException {
int retry = 60;
while (retry-- > 0) {
try {
T store = streams.store(storeName, queryableStoreType);
log.info("Success build Store {}", storeName);
return store;
} catch (InvalidStateStoreException ignored) {
log.info("Ready kafkaStream for {}/{} , {}", retry, 60, storeName);
Thread.sleep(1000);
}
}
throw new InvalidStateStoreException("Wasn't ready to Query : " + storeName);
}
}
그 다음에서야 비로서 Repository를 빈을 생성해야만, 아래처럼 뭐라도 Query 할 수 있을 것이다-_-;;;
@Component
public class OrderRepository {
public OrderRepository(OrderDataStream stream) throws InterruptedException {
this.stream = stream;
this.orderStore = StateStoreHelper
.waitUntilStoreIsQueryable(ORDER_STATE_VIEW,
QueryableStoreTypes.<Long, Order>keyValueStore(), stream.getKafkaStreams());
}
public Order getOrder(long id) {
if (!stream.isQueryable()) {
throw new RuntimeException("Stores not ready for service, probably re-balancing");
}
return orderStore.get(id);
}
}
만약 이 기능이 Application 에 운영에 중요하다면, Eureka에 OUT_OF_SERVICE 인 상태로 등록 한 이후, 상태를 바꿔야한다.
ps. 1년전의 썻던글의 다음 내용이었다-_-; https://bistros.tistory.com/entry/kafka-stream-%EC%9D%98-%EB%8C%80%ED%99%94%ED%98%95-%EC%BF%BC%EB%A6%AC-interactive-query-%EA%B5%AC%ED%98%84
Comments