어제의 이슈 StreamsBuilder.table은 과연 changelog topic을 만드는가 ?

https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String- 문서상에 의하면 '쿼리 불가능한 내부 store-name을 만들고, internal changelog topic은 만들어 지지 않는다고 한다.

The resulting KTable will be materialized in a local KeyValueStore with an internal store name. 
Note that store name may not be queriable through Interactive Queries. No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).


우선 table 메소드를 비교해보자.


- <K,V> KTable<K,V> table​(java.lang.String topic)

- <K,V> KTable<K,V> table​(java.lang.String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

둘다 내부에서는 internalStreamsBuilder 을 이용해서 MaterializedInternal 를 생성하고 있다. 만약 내가 명시적으로 Materialized.as 를 해서 인자를 넘긴다면 MaterializedInternal 역시 그 인자를 활용한다.
이 인자는 단순히 MaterializedInternal 를 만들때 사용자의 의도를 반영 할 수 있게 한 것이다.


protected Materialized(final Materialized<K, V, S> materialized) {
this.storeSupplier = materialized.storeSupplier;
this.storeName = materialized.storeName;
this.keySerde = materialized.keySerde;
this.valueSerde = materialized.valueSerde;
this.loggingEnabled = materialized.loggingEnabled;
this.cachingEnabled = materialized.cachingEnabled;
this.topicConfig = materialized.topicConfig;
}

내가 명시적으로 Materialized 를 넘기면 이 생성자를 통해서 몇가지 셋팅 되고, KTable 자체도 쿼리가능한 상태라고기록해 놓는다 ( queryable = true )

만약 위의 인자가 없다면

@Override
public String newStoreName(final String prefix) {
return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
}

이러한 메소드를 통해   data.order-STATE-STORE-0000000000 라는 state-store가 생성되었다.


그런데 internal-topic을 안만든다는 javadoc의 문서와 달리 appName-order-STATE-STORE-0000000000-changelog 이라는 토픽이 하나 생겼다. (Configs:cleanup.policy=compact)

이 내용은 지난번에 읽은 https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 의 fail-over 부분에서 약간 언급이 되는데

That’s why by default persistent state stores are logged: that is, all changes to the store are additionally written to a changelog-topic. This topic is compacted (we only need the latest entry for each ID, without the history of changes, as the history is kept in the events) and hence is as small as possible. Thanks to that, re-creating the store on another node can be much faster.

즉, store는 changelog-topic (compact mode)를 쓴다는 얘기다. 

그럼 StreamsBuilder.table을 쓰면 내부적으로 항상 state-store를 사용하고 그 말은 항상 changelog-topic이 생성된다는 뜻 아닌가? 


kafka javadoc이 말한 changelog-topic이 생기지 않는다라는건  KTable이 compact된 changelog-topic을 안 만든다는 것이지 MaterializedInternal (statestore)의 changelog-topic을 뜻한건 아니지 않았을까?


추가) statestore가 내부적으로 changelog-topic을 쓰는건 확실하다.  대표적인 StateStore인 KeyValue타입의 RocksDBStore를 보면 openDB를 할 때 아래의 코드를 통해서 changelog-topic 이름을 결정해서 StateRestorer , ProcessorStateManager 를 topic을 업데이트 한다.

this.serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
public static String storeChangelogTopic(final String applicationId, 
                                         final String storeName) {
return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}



추가2) 팀장님이 아침에 많이 봐주셔서 이유를 찾아냈다.  버그였다-_-; 실제로 0.11 까지는 logConfig를 false로 넘기면서 ChangeLoggingKeyValueBytesStore 를 사용하지 않았었는데 1.0에서 대규모 리팩토링을 거치면서 이슈가 발생하였다.

 https://issues.apache.org/jira/browse/KAFKA-6729 

Event Sourcing Using Apache Kafka

분류없음 2018.03.22 09:37 posted by dev.bistro

https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 를 읽고 대충 필요한 것만 적음.

추가적으로 알게된 내용보다는 다시 한번 지식을 되새겨볼 수 있던 기회.


Storing events in Kafka


첫 번째 문제는 이벤트를 어떻게 저장하는가이다. 3개의 방법으로 얘기할 수 있다.

1. 모든 타입의 모든 이벤트를 하나의 토픽에 저장하는 방법 (물론 멀티 파티션)

2. Topic-per-entity-type : entity별로 관련된 이벤트들을 분리된 토픽에 저장하는 방법 (예를 들어 user관련 토픽, product관련 토픽)

3. topic-per-entity : 각각의 user, product 처럼 각 entity별로 별도의 토픽을 할당해서 저장하는 방법

low-cardinality entity를 제외하고는 3번째 전략은 실현 가능한 방법은 아니고 1,2번중 선택해야 한다.

1번은 모든 이벤트를 볼 수 있는 장점이 있고 2번은 entity type별로 분리하고 scale 할 수 있다.  아래으  내용은 대부분 1번을 기반으로 얘기를 이어나간다. 2번에 대해 더 알고 싶다면 https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 를 참고하면 된다.


Basic event-sourcing storage operations

데이터 저장소에서 기대하는 가장 기본적인 명령은 '특정 엔티티의 현재 상태를 얻는 것'이다. 일반적으로 각 엔티티는 ID가 있고, ID가 주어지면 저장 시스템은 현재 상태를 반환해 줘야한다.

이벤트 로그는 primary source of truth 이고, '현재 상태'는 엔티티의 이벤트 스트림에서 파생 될 수 있다. 이를 위해서 스토리지 엔진은 Event => State => State를 반환하는 순수 함수가 필요하다.

카프카에서 'read current state'를 native하게 구현해본다면 카프카는 토픽의 모든 이벤트를 필터링 하고, 주어진 function으로 부터 상태를 구해야한다. 만약 많은 이벤트가 있다면 느리고 자원을 많이 소모할 것이다. 그리고 장애나, 캐시 클리어 등으로 인해서 다시 해야할 수도 있다.

그래서 더 좋은 방법이 필요한다. KafkaStreams와 StateStore를 이용하는 것이다. 

Kafka Streams는 고수준의 오퍼레이션을 제공하는데 그 중 하나는 스트림을 localstate로 folding하는 기능이다.

각 local store는 노드에서 사용하는 파티션의 데이터만 포함된다. 기본적으로 RocksDB(디폴트), inMemoryStore 2개의 구현체가 있다. 둘 다 fault-tolerant 이고, in-memory를 사용하더라도 데이터가 손실되지 않는다(주: local store topic을 현실화 하는 것이기 때문이다)

이렇게 이벤트 소싱에서 이벤트 스트림을 state store로 변경해서 각 엔티티의 '현재 상태'를 구할 수 있다. 


Looking up the current state

같은 노드에 있는 데이터를 쿼리하는 건 쉽다. 하지만 다른 노드에 있는 데이터를 쿼리하려면 kafka의 메타 데이터에 대해 먼저 질의한 다음 특정 ID에 대해 처리를 담당하는 노드를 찾을 수 있다. ( streamsMetadataForStoreAndKey ) 그런 다음 rest, akka-remoting 등의 방법으로 해당 노드로 요청을 하면 된다. 


Fail-over

State Store가 좋지만, 노드가 장애라면? 새로 만든다는 것은 비용이 많이 드는 작업이다.  KafkaStreams는 리밸런싱(노드의 추가/삭제) 때문에 레이턴시가 증가하거나, 요청 자체가 실패할 수 있다.  

이 이유 때문에 state-store가  changelog-topic에 로깅된다. 이 topic은 모든 이벤트가 필요없고 최신분만 필요하므로 compacted(압축) 되어 있다. 덕분에 다른 노드에서 다시 store를 구축하는게 빠르다. 하지만 여전히 리밸런싱에 대한 latency는 증가될 수 있다.  이에 대한 해결방법으로 standby 를 제공한다. primary 가 실패한다면 바로 standby 로 장애를 처리한다.


Consistency

카프카는 기본은 at-least-once 정책이다. exactly-once 정책도 지원하지만 topic을 읽고/쓰기 할 때에만 보장해준다. processing.guarantee 옵션을 켜서 exactly-once를 사용 할 수 있으며, 성능은 저하되지만 batch message의 집합이 한번의 커밋에 되므로 큰 비용은 아니다.


Listening for events

지금 까지 'current state'를 업데이트 하고 질의하는 걸 알아봤고, 이제 사이드 이펙트에 대해서 알아 볼 차례이다( 예를 들어 rest 등의 외부 호출이네 이메일 보내기)

이러한 작업들은 blocking이 일어나며 I/O를 발생하므로 state-update의 로직에 포함되는건 종은 생각이 아니다. 그리고 main 스레드에서 처리를 하는것은 성능상의 병목을 일으킨다. 그리고 state-update 자체가 여러번 실행 될 수 있다.

다행히 카프카 토픽을 처리할 많은 유연성을 가질 수 있다. state를 업데이트 전후에 메시지를 소비할 수 있고, 이 메시지 소비 전후에 원하는 방식으로 사용 할 수 있다.