Avro GenericRecord,SpecificRecord

분류없음 2018.04.16 09:47 posted by dev.bistro

GenericRecord

File/String 기반의 Schema 에서 Avro Object를 생성하는 것을 말한다.  이 방법은 runtime 에서 실패할 수 있기 때문에 사용에서는 추천되는 방법은 아니지만, 쉽게 사용 할 수 있는 장점이 있다.

Schema

{
"type": "record",
"namespace": "com.example",
"name": "user",
"fields": [
{
"name": "name",
"type": "string"
}
]
}다시피 


GenericRecordBuilder builder = new GenericRecordBuilder(schemaString);
builder.set("name", "bistros");
GenericData.Record bistros = builder.build();
System.out.println(bistros);
System.out.println(bistros.getSchema());
// {"name": "bistros"}
// {"type":"record","name":"user","namespace":"com.example", "fields":[{"name":"name","type":"string"}]}

보시다시피  GenericRecordBuilder set(String fieldName, Object value) 를 통해서 값을 셋팅하기 때문에 불안하다.


SpecificRecord

그에 반해서 SpecificRecord 역시 Avro Object 이지만  Avro Schema에 의해 성성되는 코드가 좀 더 포함되어 있다.
GenericRecordBuilder builder()는 Record type을 반환하는 데  그 내부를 보면, Map<String, Object>와 다름이 없다는 것을 확인 할 수 있다.
- GenericRecord 인터페이스를 구현 ( get/put만 있다)
- 내부적으로 Object[] 만을 가지고 있다.
링크 : https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java#L178

하지만 SpecificRecord는 좀 다르다.  avro schema를 기반으로 maven/gradle 플러기인으로 resources를 생성해보면  extends org.apache.avro.specific.SpecificRecordBase 를 통해서 똑같이 GenericData를 상속하고 있지만  6개 필드를 가진 Avro Schema 파일에서 약 550라인의 Class파일이생성된다. getter/setter/builder 등이 구현되어 있다.


요약)

Gradle Plugin을 이용해서 Class 파일을 만들어야 하지만 SpecificRecord를 사용하는 게 당연하다.


사족)

Confluent의 Kafka-Avro-Serializer는 default로 generic type을 읽고 쓰게 되어 있다.

properties.setProperty("specific.avro.reader", "true");

을 통해서 specific 모드를 활성화 해야 하고,  이것을 활성화 하면 내부적으로 사용하는 DatumReader를  SpecificDatumReader 으로 셋팅하다.

추가적으로 Streams 모드를 위해서는 아예  GenericAvroSerde, SpecificAvroSerde 2개의 Serde를 따로 제공해주고 있다.


어제의 이슈 StreamsBuilder.table은 과연 changelog topic을 만드는가 ?

https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String- 문서상에 의하면 '쿼리 불가능한 내부 store-name을 만들고, internal changelog topic은 만들어 지지 않는다고 한다.

The resulting KTable will be materialized in a local KeyValueStore with an internal store name. 
Note that store name may not be queriable through Interactive Queries. No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).


우선 table 메소드를 비교해보자.


- <K,V> KTable<K,V> table​(java.lang.String topic)

- <K,V> KTable<K,V> table​(java.lang.String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

둘다 내부에서는 internalStreamsBuilder 을 이용해서 MaterializedInternal 를 생성하고 있다. 만약 내가 명시적으로 Materialized.as 를 해서 인자를 넘긴다면 MaterializedInternal 역시 그 인자를 활용한다.
이 인자는 단순히 MaterializedInternal 를 만들때 사용자의 의도를 반영 할 수 있게 한 것이다.


protected Materialized(final Materialized<K, V, S> materialized) {
this.storeSupplier = materialized.storeSupplier;
this.storeName = materialized.storeName;
this.keySerde = materialized.keySerde;
this.valueSerde = materialized.valueSerde;
this.loggingEnabled = materialized.loggingEnabled;
this.cachingEnabled = materialized.cachingEnabled;
this.topicConfig = materialized.topicConfig;
}

내가 명시적으로 Materialized 를 넘기면 이 생성자를 통해서 몇가지 셋팅 되고, KTable 자체도 쿼리가능한 상태라고기록해 놓는다 ( queryable = true )

만약 위의 인자가 없다면

@Override
public String newStoreName(final String prefix) {
return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
}

이러한 메소드를 통해   data.order-STATE-STORE-0000000000 라는 state-store가 생성되었다.


그런데 internal-topic을 안만든다는 javadoc의 문서와 달리 appName-order-STATE-STORE-0000000000-changelog 이라는 토픽이 하나 생겼다. (Configs:cleanup.policy=compact)

이 내용은 지난번에 읽은 https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 의 fail-over 부분에서 약간 언급이 되는데

That’s why by default persistent state stores are logged: that is, all changes to the store are additionally written to a changelog-topic. This topic is compacted (we only need the latest entry for each ID, without the history of changes, as the history is kept in the events) and hence is as small as possible. Thanks to that, re-creating the store on another node can be much faster.

즉, store는 changelog-topic (compact mode)를 쓴다는 얘기다. 

그럼 StreamsBuilder.table을 쓰면 내부적으로 항상 state-store를 사용하고 그 말은 항상 changelog-topic이 생성된다는 뜻 아닌가? 


kafka javadoc이 말한 changelog-topic이 생기지 않는다라는건  KTable이 compact된 changelog-topic을 안 만든다는 것이지 MaterializedInternal (statestore)의 changelog-topic을 뜻한건 아니지 않았을까?


추가) statestore가 내부적으로 changelog-topic을 쓰는건 확실하다.  대표적인 StateStore인 KeyValue타입의 RocksDBStore를 보면 openDB를 할 때 아래의 코드를 통해서 changelog-topic 이름을 결정해서 StateRestorer , ProcessorStateManager 를 topic을 업데이트 한다.

this.serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
public static String storeChangelogTopic(final String applicationId, 
                                         final String storeName) {
return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}



추가2) 팀장님이 아침에 많이 봐주셔서 이유를 찾아냈다.  버그였다-_-; 실제로 0.11 까지는 logConfig를 false로 넘기면서 ChangeLoggingKeyValueBytesStore 를 사용하지 않았었는데 1.0에서 대규모 리팩토링을 거치면서 이슈가 발생하였다.

 https://issues.apache.org/jira/browse/KAFKA-6729 


Spring 4.X 에서 Conditional.... 시리즈의 어노테이션이 추가 되면서 다양한 환경에서 Bean을 등록 할 수 있게 되었다. 현재 작업중인 Event Driven MSA (이름 참 길다...) 의 한 프로젝트에서는  상용과, 개발서버의 Bean이 달라야 하는 니즈가 있다.


@Configuration
@EnableConfigurationProperties(SolProperties.class)
public class SolAutoConfiguration {
private SolProperties solProperties;

@Autowired
public SolAutoConfiguration(SolProperties solProperties) {
this.solProperties = solProperties;
}

private RestTemplate solMailClientOperation() {
}

@Bean
@ConditionalOnMissingBean(MailClient.class)
public MailClient solMailClient() {
}

@Bean
@ConditionalOnProperty(name = "mail.type", havingValue = "fake")
public MailClient fakeMailClient() {
return new FakeMailClient();
}
}

뭐 대충 이런식으로  작성한다면 원하는대로 동작 가능하지만, Spring 5 에서는 좀 더 특별한 방법으로 할 수 있다. 이 내용은 이전 내용인 링크 의 연장 선이다.



@SpringBootApplication
public class EventApplication {
private static final String OA = "mail.type";
private static final String OS = "sms.type";

public static void main(String[] args) {
ApplicationArguments aa = new DefaultApplicationArguments(args);
new SpringApplicationBuilder()
.sources(EventApplication.class)
.initializers(
(ApplicationContextInitializer<GenericApplicationContext>)
context -> {
if (aa.containsOption(OA)
&& aa.getOptionValues(OA).contains("fake")) {
context.registerBean(MailClient.class, FakeMailClient::new);
}
if (aa.containsOption(OS)
&& aa.getOptionValues(OS).contains("fake")) {
context.registerBean(SmsClient.class, FakeSmsClient::new);
}
}).run(args);
}
}

대충 뭐 이런식으로 작성할 수 있다.

실제로 실행 할 때는 java -jar app.jar --sms.type=fake --mail.type=fake  이렇게 된다.