b

StreamsBuilder의 table 메소드는 항상 statestore와 changelog 토픽을 만든다. 본문

카테고리 없음

StreamsBuilder의 table 메소드는 항상 statestore와 changelog 토픽을 만든다.

dev.bistro 2018. 4. 6. 07:24

어제의 이슈 StreamsBuilder.table은 과연 changelog topic을 만드는가 ?

https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String- 문서상에 의하면 '쿼리 불가능한 내부 store-name을 만들고, internal changelog topic은 만들어 지지 않는다고 한다.

The resulting KTable will be materialized in a local KeyValueStore with an internal store name. 
Note that store name may not be queriable through Interactive Queries. No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).


우선 table 메소드를 비교해보자.


- <K,V> KTable<K,V> table​(java.lang.String topic)

- <K,V> KTable<K,V> table​(java.lang.String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

둘다 내부에서는 internalStreamsBuilder 을 이용해서 MaterializedInternal 를 생성하고 있다. 만약 내가 명시적으로 Materialized.as 를 해서 인자를 넘긴다면 MaterializedInternal 역시 그 인자를 활용한다.
이 인자는 단순히 MaterializedInternal 를 만들때 사용자의 의도를 반영 할 수 있게 한 것이다.


protected Materialized(final Materialized<K, V, S> materialized) {
this.storeSupplier = materialized.storeSupplier;
this.storeName = materialized.storeName;
this.keySerde = materialized.keySerde;
this.valueSerde = materialized.valueSerde;
this.loggingEnabled = materialized.loggingEnabled;
this.cachingEnabled = materialized.cachingEnabled;
this.topicConfig = materialized.topicConfig;
}

내가 명시적으로 Materialized 를 넘기면 이 생성자를 통해서 몇가지 셋팅 되고, KTable 자체도 쿼리가능한 상태라고기록해 놓는다 ( queryable = true )

만약 위의 인자가 없다면

@Override
public String newStoreName(final String prefix) {
return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
}

이러한 메소드를 통해   data.order-STATE-STORE-0000000000 라는 state-store가 생성되었다.


그런데 internal-topic을 안만든다는 javadoc의 문서와 달리 appName-order-STATE-STORE-0000000000-changelog 이라는 토픽이 하나 생겼다. (Configs:cleanup.policy=compact)

이 내용은 지난번에 읽은 https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 의 fail-over 부분에서 약간 언급이 되는데

That’s why by default persistent state stores are logged: that is, all changes to the store are additionally written to a changelog-topic. This topic is compacted (we only need the latest entry for each ID, without the history of changes, as the history is kept in the events) and hence is as small as possible. Thanks to that, re-creating the store on another node can be much faster.

즉, store는 changelog-topic (compact mode)를 쓴다는 얘기다. 

그럼 StreamsBuilder.table을 쓰면 내부적으로 항상 state-store를 사용하고 그 말은 항상 changelog-topic이 생성된다는 뜻 아닌가? 


kafka javadoc이 말한 changelog-topic이 생기지 않는다라는건  KTable이 compact된 changelog-topic을 안 만든다는 것이지 MaterializedInternal (statestore)의 changelog-topic을 뜻한건 아니지 않았을까?


추가) statestore가 내부적으로 changelog-topic을 쓰는건 확실하다.  대표적인 StateStore인 KeyValue타입의 RocksDBStore를 보면 openDB를 할 때 아래의 코드를 통해서 changelog-topic 이름을 결정해서 StateRestorer , ProcessorStateManager 를 topic을 업데이트 한다.

this.serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
public static String storeChangelogTopic(final String applicationId, 
                                         final String storeName) {
return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}



추가2) 팀장님이 아침에 많이 봐주셔서 이유를 찾아냈다.  버그였다-_-; 실제로 0.11 까지는 logConfig를 false로 넘기면서 ChangeLoggingKeyValueBytesStore 를 사용하지 않았었는데 1.0에서 대규모 리팩토링을 거치면서 이슈가 발생하였다.

 https://issues.apache.org/jira/browse/KAFKA-6729 

Comments