max.in.flight.request.per.connection

분류없음 2018.06.18 14:51 posted by dev.bistro

필요한 수준까지 이해한듯 해서, 메모로 남김

원래의 카프카는 데이터 주입에 대한 순서가 중요하지 않았다.  (정확히는 어느 정도의 버퍼를 카프카 브로커가 가지면서 Event Time에 대한 조정 작업을 알아서 해준다)

하지만 producer의 명등성 옵션과 순서 보장에 대한 이슈가 중요해졌고 max.in.flight.request.per.connection = 1 로 셋팅하면서 순서 꼬임을 좀 방어하고자 했는데...

1로 설정하고 idempotence 옵션이 켜져있고 retry가 가능한 상태일때 
전송하다가 OutOfOrderSequence 예외가 발생한다면 클라이언트 영역에서 이 Sequence Number를 잘 처리해야한다. (재전송 하거나, 다음 idempotence 전송에서 써야한다)


이게 어렵다... 그래서 그냥 하드 코딩으로 각 Topic Partition의 5개의 메타데이터를 들고 있게한것이다. 

if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {
@Override
public int compare(ProducerBatch o1, ProducerBatch o2) {
return o1.baseSequence() - o2.baseSequence();
}
}));

그래서 kafka 문서의 "enable.idempotence" 항목을 보면 Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 라고 나온다.


idempotence 켜져있고 나부의 매직 넘버인 5보다 크게 셋팅한다면 sequence를 보장하지 못할 수 있기 때문이다.

실제로도 

        if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {

            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +

                    " to use the idempotent producer.");

        }

코드에 의해서 실행 자체를 막아놨다.



요약
1. 그냥 디폴트로 쓰자.
2. 프로듀서에서 동기식으로 사용한다면 (.get() 호출) 더더욱 상관하지 않아도 된다.


Schema Registry , Schema Evolution

분류없음 2018.06.11 21:45 posted by dev.bistro

몇 달전에 메모장에 적어놨던 내용인데, 지우면 또 까먹을 듯 해서 기록으로 남김


* 기존 Java Class에서 Avro Schema 획득하기

ReflectData.get().getSchema 를 이용하여, 쉽게 json형식의 AvroSchema를 획득할 수 있다. (검증은 꼭 할것)
아쉽지만 record 타입을 자동으로 찾아서 해준다거나, vargs 형식으로 getSchema를 사용할 수는 없다.

Schema schema = ReflectData.get().getSchema(Order.class);
System.out.println(schema.toString(true));


* Schema Evolution

1. backward : 새스키마로 이전 데이터를 읽을 수 있다.
 ex) 새 field를 추가할때 default value를 넣는다. 필드가 없다면 default value를 사용한다.  즉 데이터는 old type인데 읽는 쪽이 new type

2. forward : 스키마가 변하지 않는데, 새로운 스키마의 데이터가 들어오면 ignore 한다.
 'forward' : we want forward compatible when we want to make a data stream evolve without chaning our downstream consumers . 즉, 데이터는 new type인데 읽는 쪽이 old type

3. full : 1,2번 두개 합친거
- 필드를 추가할 때에는 default를 무조건 넣어야 한다(backward position)
- default가 있는 필드만 remove 할 수 있다.


* avro schema 를 사용하는 추천 가이드

1. PK를 사용해라
2. remove될수 있는 필드라면 default를 제공하라
3. enum은 주의해서 사용하라. - 시간이 흘러도 evolve 할 수 없다?
4. rename 하지 말것, 필요하다면 다른 필드를 넣어라
5. 항상 default value를 쓰고, required field는 삭제하지 마라

subject란?

- Schema Registry는 Schema를 Subject 단위로 관리되어지는 듯 하다. 디폴트로는 <topc>-key, <topic>-value (ex: data.order-value) 형식으로 사용되지만 confluent 4.1 부터 이 부분을 커스텀마이징 할 수 있는 SubjectNameStrategy가 추가되었다.

기본적으로 사용하는 토픽이 동일하다면 동일한 subject를 보지만, 커스텀마이징 한다면 Record 에 따라 subject로 변경 가능하다.

Get the subject name for the given topic and value type.




1. 최초에 Schema Registry에 등록하고 Schema ID 를 가져오는 동작

- SR은 Cache 기반으로 동작한다 어플리케이션이 시작하면 Application SchemaRegistry는 빈 데이터만을 들고 있다.

- 처음으로 Producer 를 통해서 메시지를 발송하면 Schema 정보를 instance.getSchema() 로 가져온다.

- Registry Server로 요청을 보낸고 응답은 Schema ID로 받는다.

- Map<String, Map<Schema, Integer>> schemaCache 캐시에 등록하고 사용 한다.

- schemaCache 에서 remove 되는 endpoint 는 없다. 즉, 한 번 캐싱이 된다면 다시 호출 되지는 않을 것이다. (어플리케이션에 실행중 instance.getSchema가 변경이 될리는 없기 때문이다)


2. Application이 자동으로 Schema ID를 올리는 것을 막는 방법

- SR 서버 자체의 호환 모드를 NONE 으로 해 되지만 클라이언트 설정에서 auto.register.schemas(default true)를 false로 해도 된다. 이럴 경우 SchemaRegistry RestClientException 이 발생되면서 정상적인 produce가 안된다.




결론

- (avsc에 의해 생성되는) org.apache.avro.specific.SpecificRecord 를 구현하는 인스턴스의 SCHEMA$ 를 가지고 Schema ID를 등록하거나, 내부적으로 캐싱해서 사용한다.

- 이 문자열이 캐싱된다면, 최초의 Produce 할 때만 SR 서버를 호출한다. (즉 이 문자열이 바뀔일이 없기 때문에 1개의 subject당 1번이 호출될 것이다)

- BACKWARD 인지 FORWARD 인지는 순수하게 SR 서버쪽의 담당이다. 클라이언트는 단지 auto.register.schemas: false로 설정함으로써 자동 등록을 해제 할 수 있을 뿐이다.



nebula 를 이용한 integration test 분리

분류없음 2018.05.18 07:57 posted by dev.bistro

테스트를 진행함에서 있어서 unit test와 integration test를 분리하고 싶은 니즈가 많다. junit5를 이용한다면,  Tag로 쉽게 처리 가능하지만 ( https://junit.org/junit5/docs/5.0.2/api/org/junit/jupiter/api/Tag.html )  레거시 프로젝트들 대다수가 junit5를 사용하지 못한다.

그래서 보통은 https://selimober.com/gradle_unit_integration/ 이러한 방법으로 처리하는데  nebula plugin을 이용하면 쉽게 처리 할 수 있다.


nebula facet : https://github.com/nebula-plugins/nebula-project-plugin


1. src/test/java 에는 unit-test만 남겨놓고 나머지는 src/integTest/java 에는 인테그레이션 테스트를 위치시킨다.
(디렉토리 변경 가능할줄 알았으나 고정되어 있다)

2. buildScript 부분에 아래와 같이 nebula.facet 의존성 추가와 plugin 설정을 한다.

buildscript {
...
dependencies {
...
classpath "com.netflix.nebula:nebula-project-plugin:3.4.1"
}
} apply plugin: 'nebula.facet'


3. 다음과 같은 가장 기본적인 설정을 추가한다. 

facets {
integTest {
}
}


4. 프로젝트를 reload 하면 'integTest' 라는 gradle-task가 생긴것을 확인 할 수 있다.



이 때의 문제점은 test-report 가 2개가 생긴다는 것이다. (test와 integTest) 이 2개의 리포트를 합치고자 하는 이슈가 있을때에는 아래처럼 TestReport 관련 task를 하나 추가하면 된다.

task testReport(type: TestReport) {
destinationDir = file("$buildDir/reports/combined")
reportOn test
reportOn integTest
}



Avro GenericRecord,SpecificRecord

분류없음 2018.04.16 09:47 posted by dev.bistro

GenericRecord

File/String 기반의 Schema 에서 Avro Object를 생성하는 것을 말한다.  이 방법은 runtime 에서 실패할 수 있기 때문에 사용에서는 추천되는 방법은 아니지만, 쉽게 사용 할 수 있는 장점이 있다.

Schema

{
"type": "record",
"namespace": "com.example",
"name": "user",
"fields": [
{
"name": "name",
"type": "string"
}
]
}다시피 


GenericRecordBuilder builder = new GenericRecordBuilder(schemaString);
builder.set("name", "bistros");
GenericData.Record bistros = builder.build();
System.out.println(bistros);
System.out.println(bistros.getSchema());
// {"name": "bistros"}
// {"type":"record","name":"user","namespace":"com.example", "fields":[{"name":"name","type":"string"}]}

보시다시피  GenericRecordBuilder set(String fieldName, Object value) 를 통해서 값을 셋팅하기 때문에 불안하다.


SpecificRecord

그에 반해서 SpecificRecord 역시 Avro Object 이지만  Avro Schema에 의해 성성되는 코드가 좀 더 포함되어 있다.
GenericRecordBuilder builder()는 Record type을 반환하는 데  그 내부를 보면, Map<String, Object>와 다름이 없다는 것을 확인 할 수 있다.
- GenericRecord 인터페이스를 구현 ( get/put만 있다)
- 내부적으로 Object[] 만을 가지고 있다.
링크 : https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java#L178

하지만 SpecificRecord는 좀 다르다.  avro schema를 기반으로 maven/gradle 플러기인으로 resources를 생성해보면  extends org.apache.avro.specific.SpecificRecordBase 를 통해서 똑같이 GenericData를 상속하고 있지만  6개 필드를 가진 Avro Schema 파일에서 약 550라인의 Class파일이생성된다. getter/setter/builder 등이 구현되어 있다.


요약)

Gradle Plugin을 이용해서 Class 파일을 만들어야 하지만 SpecificRecord를 사용하는 게 당연하다.


사족)

Confluent의 Kafka-Avro-Serializer는 default로 generic type을 읽고 쓰게 되어 있다.

properties.setProperty("specific.avro.reader", "true");

을 통해서 specific 모드를 활성화 해야 하고,  이것을 활성화 하면 내부적으로 사용하는 DatumReader를  SpecificDatumReader 으로 셋팅하다.

추가적으로 Streams 모드를 위해서는 아예  GenericAvroSerde, SpecificAvroSerde 2개의 Serde를 따로 제공해주고 있다.


어제의 이슈 StreamsBuilder.table은 과연 changelog topic을 만드는가 ?

https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String- 문서상에 의하면 '쿼리 불가능한 내부 store-name을 만들고, internal changelog topic은 만들어 지지 않는다고 한다.

The resulting KTable will be materialized in a local KeyValueStore with an internal store name. 
Note that store name may not be queriable through Interactive Queries. No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).


우선 table 메소드를 비교해보자.


- <K,V> KTable<K,V> table​(java.lang.String topic)

- <K,V> KTable<K,V> table​(java.lang.String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

둘다 내부에서는 internalStreamsBuilder 을 이용해서 MaterializedInternal 를 생성하고 있다. 만약 내가 명시적으로 Materialized.as 를 해서 인자를 넘긴다면 MaterializedInternal 역시 그 인자를 활용한다.
이 인자는 단순히 MaterializedInternal 를 만들때 사용자의 의도를 반영 할 수 있게 한 것이다.


protected Materialized(final Materialized<K, V, S> materialized) {
this.storeSupplier = materialized.storeSupplier;
this.storeName = materialized.storeName;
this.keySerde = materialized.keySerde;
this.valueSerde = materialized.valueSerde;
this.loggingEnabled = materialized.loggingEnabled;
this.cachingEnabled = materialized.cachingEnabled;
this.topicConfig = materialized.topicConfig;
}

내가 명시적으로 Materialized 를 넘기면 이 생성자를 통해서 몇가지 셋팅 되고, KTable 자체도 쿼리가능한 상태라고기록해 놓는다 ( queryable = true )

만약 위의 인자가 없다면

@Override
public String newStoreName(final String prefix) {
return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
}

이러한 메소드를 통해   data.order-STATE-STORE-0000000000 라는 state-store가 생성되었다.


그런데 internal-topic을 안만든다는 javadoc의 문서와 달리 appName-order-STATE-STORE-0000000000-changelog 이라는 토픽이 하나 생겼다. (Configs:cleanup.policy=compact)

이 내용은 지난번에 읽은 https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 의 fail-over 부분에서 약간 언급이 되는데

That’s why by default persistent state stores are logged: that is, all changes to the store are additionally written to a changelog-topic. This topic is compacted (we only need the latest entry for each ID, without the history of changes, as the history is kept in the events) and hence is as small as possible. Thanks to that, re-creating the store on another node can be much faster.

즉, store는 changelog-topic (compact mode)를 쓴다는 얘기다. 

그럼 StreamsBuilder.table을 쓰면 내부적으로 항상 state-store를 사용하고 그 말은 항상 changelog-topic이 생성된다는 뜻 아닌가? 


kafka javadoc이 말한 changelog-topic이 생기지 않는다라는건  KTable이 compact된 changelog-topic을 안 만든다는 것이지 MaterializedInternal (statestore)의 changelog-topic을 뜻한건 아니지 않았을까?


추가) statestore가 내부적으로 changelog-topic을 쓰는건 확실하다.  대표적인 StateStore인 KeyValue타입의 RocksDBStore를 보면 openDB를 할 때 아래의 코드를 통해서 changelog-topic 이름을 결정해서 StateRestorer , ProcessorStateManager 를 topic을 업데이트 한다.

this.serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
public static String storeChangelogTopic(final String applicationId, 
                                         final String storeName) {
return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}



추가2) 팀장님이 아침에 많이 봐주셔서 이유를 찾아냈다.  버그였다-_-; 실제로 0.11 까지는 logConfig를 false로 넘기면서 ChangeLoggingKeyValueBytesStore 를 사용하지 않았었는데 1.0에서 대규모 리팩토링을 거치면서 이슈가 발생하였다.

 https://issues.apache.org/jira/browse/KAFKA-6729 


Spring 4.X 에서 Conditional.... 시리즈의 어노테이션이 추가 되면서 다양한 환경에서 Bean을 등록 할 수 있게 되었다. 현재 작업중인 Event Driven MSA (이름 참 길다...) 의 한 프로젝트에서는  상용과, 개발서버의 Bean이 달라야 하는 니즈가 있다.


@Configuration
@EnableConfigurationProperties(SolProperties.class)
public class SolAutoConfiguration {
private SolProperties solProperties;

@Autowired
public SolAutoConfiguration(SolProperties solProperties) {
this.solProperties = solProperties;
}

private RestTemplate solMailClientOperation() {
}

@Bean
@ConditionalOnMissingBean(MailClient.class)
public MailClient solMailClient() {
}

@Bean
@ConditionalOnProperty(name = "mail.type", havingValue = "fake")
public MailClient fakeMailClient() {
return new FakeMailClient();
}
}

뭐 대충 이런식으로  작성한다면 원하는대로 동작 가능하지만, Spring 5 에서는 좀 더 특별한 방법으로 할 수 있다. 이 내용은 이전 내용인 링크 의 연장 선이다.



@SpringBootApplication
public class EventApplication {
private static final String OA = "mail.type";
private static final String OS = "sms.type";

public static void main(String[] args) {
ApplicationArguments aa = new DefaultApplicationArguments(args);
new SpringApplicationBuilder()
.sources(EventApplication.class)
.initializers(
(ApplicationContextInitializer<GenericApplicationContext>)
context -> {
if (aa.containsOption(OA)
&& aa.getOptionValues(OA).contains("fake")) {
context.registerBean(MailClient.class, FakeMailClient::new);
}
if (aa.containsOption(OS)
&& aa.getOptionValues(OS).contains("fake")) {
context.registerBean(SmsClient.class, FakeSmsClient::new);
}
}).run(args);
}
}

대충 뭐 이런식으로 작성할 수 있다.

실제로 실행 할 때는 java -jar app.jar --sms.type=fake --mail.type=fake  이렇게 된다.

https://spring.io/blog/2018/03/07/testing-auto-configurations-with-spring-boot-2-0 지지난주 spring.io 에  올라온 글이다. 

스프링부트 2.0에  ApplicationContextRunner 가 추가 되었고 이에 대한 사용법을 설명하는 내용인데, 실제로 활용해보니 기존의 static class 로 Spring 설정을 slicing 하는 것보다 가독성도 좋고, 사용도 편하다.


1. SolMailAutoConfiguration 빈을 생성할때 JSR380에 의해 validate한지 확인하는 테스트

new ApplicationContextRunner()
.withConfiguration(of(SolMailAutoConfiguration.class))
.run((context) -> {
assertThat(context.getStartupFailure())
.isNotNull();
assertThat(context.getStartupFailure())
.isInstanceOf(UnsatisfiedDependencyException.class);
});


2. SolMailAutoConfiguration 빈을 생성 할 때 제대로 값이 주입되는지 확인하는 테스트

new ApplicationContextRunner()
.withConfiguration(of(SolMailAutoConfiguration.class))
.withPropertyValues(
"app.sol.mail.connection-timeout:10000",
"app.sol.mail.api-url:http://localhost/api/v1/sol/"
).run((context) -> {
SolMailProperties properties = context.getBean(SolMailProperties.class);
assertThat(properties.getApiUrl()).isEqualTo("http://localhost/api/v1/sol/");
assertThat(properties.getConnectionTimeout()).isEqualTo(10000);
});


3. 커맨드라인이나, 프로퍼티로 값을 받을때 원하는 Bean이 로드 되는지 확인하는 테스트

new ApplicationContextRunner()
.withInitializer(new ConfigFileApplicationContextInitializer())
.withConfiguration(of(FakeMailClient.class, SolMailAutoConfiguration.class))
.withPropertyValues("mail.type=fake")
.run((context) ->
assertThat(context).hasSingleBean(FakeMailClient.class));

legacy 에서는  initializers = ConfigFileApplicationContextInitializer 라고 initializers 를 정의해서 썻지만 이렇게 코드 레벨에서 사용할 수 있다. 


ApplicationContextRunner 를 before로 올려서 DRY를 준수할 수도 있지만, 그렇게 올려서 얻는  중복 코드의 제거보다, 지금 처럼 각각 초기화 하는게 더 나은 가독성과 사용성을 주는 것 같아서, 위처럼 작성함 이게 대한 좋은 글로

https://stackoverflow.com/questions/6453235/what-does-damp-not-dry-mean-when-talking-about-unit-tests

Working Effectively with Unit Tests

분류없음 2018.03.23 14:41 posted by dev.bistro

Working Effectively with Unit Tests #1


https://leanpub.com/wewut 를 읽고 정리한 내용 첫번째 단락


Unit Testing, a First Example


* Replace Loop with individual Test

- https://github.com/bistros/wewut-code#replace-loop-with-individual-tests

- 읽기 쉬운 테스트를 위한 첫 번째 단계는 반복 테스트를 각각의 테스트로 변경하는 것이다.

ex) for-each를 돌면서 여러값을 테스트 하는 것을 제거한다


* Expect Literals

- https://github.com/bistros/wewut-code#expect-literals

그 다음 가독성을 높이는 단계로는 literal 값을 예상하는 것이다. 여전히 테스트는 실패 하겠지만, value를 직접적으로 확인 할 수 있게 된다.


* Inline Setup

- https://github.com/bistros/wewut-code#inline-setup

- 실패한 테스트의 원인을 쉽게 찾을 수 있어야한다. Template Method Pattern을 이용하면, 중복을 약간 줄일 수있는 있지만 다른 장점은 없다. 그리고 오버헤드가 증가할 수도 있다. 이러한 @Before setup에서 일어나는 이러한 부분을 없앤다면, 실패 했을 경우 setup을 안봐도 된다. 즉 code search가 거의 필요없게 되는 것이다. 

코드 중복에 대해서는 밑의 Final Thoughts on our Tests 에서 책 이외의 내용을 좀 더 추가하였다.


* Replace ObjectMother with DataBuilder

- https://github.com/bistros/wewut-code#replace-objectmother-with-databuilder

- 테스트를 위해 mock instance를 만드는 ObjectMother가 제한적일때는 유리하지만, 다른 시나리오(케이스)가 필요할때는 난감하다. 이 시점에서는 ObjectMother style 보다는 new Domain Model Instance 스타일로 변경되는 것이 실용적일 것이다. 대신에 Builder 가 변경이 필요하다면, 사용되는 모든 곳의 코드를 업데이트 쳐야하는 문제가 있는데 그건 다른 방법으로 해결 할 수 있다 (주: 챕터6 TestDataBuilder를 참고하면 된다) 물론 기존의 ObjectMother 보다는 코딩해야 할 내용이 많다. 하지만 좀 더 일반적이고 독립적으로 객체를 만들어 낼 수 있다. 


* Comparing the Results

- https://github.com/bistros/wewut-code#comparing-the-results

- 쉽게 assertion 하게 하라 (좋은 테스트는 나뿐 아니라 미래의 팀원들을 위해서이기도 하다)


* Final Thoughts on our Tests

- https://github.com/bistros/wewut-code#final-thoughts-on-our-tests

- 참고1) DAMP  https://stackoverflow.com/questions/6453235/what-does-damp-not-dry-mean-when-talking-about-unit-tests

프로덕션 레벨과는 달리 유닛 테스트에서는 파일/단일 테스트 내에서만 코드가 적용된다. 이 때문에 중복 코드에 대한 minimal and obvious 다.  또한 이러한 중복을 제거해 버린다면 테스트의 가독성이 떨어진다. 그래서 DRY를 프로덕션에서 선호하고 테스트에는 DAMP를 우선하여 균형을 이뤄야 한다

참고2) http://blog.jayfields.com/2006/05/dry-code-damp-dsls.html

DRY를 DAMP로 바꾸더라도 크게 라인수가 늘어나지는 않음을 보여줬고, 대신 가독성 측면 등에서 가치를 높일 수 있었다.

Should You Put Several Event Types in the Same Kafka Topic?

링크 : https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 에서 필요한 부분만 정리

카프카를 쓸 때 가장 중요한 것 중 하나는 토픽을 어떻게 쓸 것인가이다. 만약 여러 이벤트 묶음이 있다면 한 토픽에 넣을 것인가, 여러 토픽에 넣을 것인가? 극단적으로 하나의 토픽에 넣는것은 좋은 생각이 아니다. 컨슈머가 관심있는 이벤트를 선택해서 소비할 수 있는 방법이 없기 때문이다. 반대로 너무 많은 것도 좋은 건 아니다.

사실 성능관점에서 중요한것은 파티션의 갯수이다. 경험에 비추어 보면 레이턴시가 중요하다면 브로커 노드당 수백의 토픽-파티션을 가질 것이고, 아마 많을 수록 레이턴시 관점에서는 대기 시간이 줄어들 것이다.


Topic = collection of events of the same type?

일반적으로는 같은 타입의 이벤트는 같은 토픽을 사용하고, 다른 타입은 다른 토픽을 사용 하는 것이다.

Confluent 스키마 레지스트리는 이 패턴을 사용한다. topic의 모든 메시지에 대해서 동일한 Avro 스키마를 사용하도록 구너장하기 떄문이다. (옵션을 추가해서) 호환성을 유지할 수는 있지만, 궁극적으로 모든 메시지는 레코드 타입을 준수해야 한다. 일반적으로 이러면 되는데, 이벤트 소싱이나 MSA에서 데이터 교환같은 목적으로 사용 하는 사람들도 있다.

이때에는 topic이 같은 스키마를 가지는것이 덜 중요해질수는 있다 그것보다는 topic-partition 내에서 순서를 보장하는게 더 중요하다.

예를 들어 고객이 주소변경/새신용카드추가/조회/지불/계정파기... 이런것들은 순서가 중요하다. 이 순서는 '같은 파티션'을 사용함으로서 보장받을 수 있고, 고객 ID를 파티션 키로 사용한다면 동일한 topic-partiton에 들어갈 것이다.



Ordering problems

customerCreated, customerAddressChanged, and customerInvoicePaid 를 서로 다른 토픽으로 사용한다면 컨슈머는 순서의 의미없는 이벤트라 볼 것이다. 예를 들어 컨슈머는 존재하지 않는 고객에 대해서 주소 변경 이벤트를 받을 수 있을 것이다. 만약 이렇게 다른 토픽을 사용한다면 이벤트를 시간 동기화 처리를 해서 사용할 수 있겠지만 악몽이다. 


When to split topics, when to combine

몇가지 제안 할 것이다.

1. 가장 중요한것은 '고정된 순서로 머물러야 하는 모든 이벤트는' 동일한 토픽을 사용해야 한다.

2. 한 엔티티가 다른 엔티티에 의존하거나, 함께 자주 변경이 된다면 같은 토픽을 사용할수도 있다. (예를 들어 주소가 고객에게 속하는 것처럼) 한편으로 다른 팀과 관련이 없다거나, 다른 팀에서 관리하는 경우에는 토픽을 분리하라 또 하나의 엔티티 타입이 과도하게 비율이 높다면 별도로 분리하는 것을 고민하라 (다른 이벤트 타입들을 위해서이다)

3. 이벤트에 여러 엔티티가 관련된 경우 (예를 들어 구매-제품-고객) 처음에는 이벤트를 원자 단위로 기록하고, 여러 토픽으로 분할 하지 마라. 가능한 받은 그대로 이벤트를 기록하라 스트림을 이용해서 분리할 수 있지만 어려워질 것이다.

4. 컨슈머를 살펴보고 여러 토픽을 구독한다면 그 토픽들은 합쳐야 될 수도 있다. 그럴 경우 원치 않은 이벤트를 소비할 수도 있지만 그것은 큰 문제가 아니다. 이벤트를 소비하는 것은 매우 싸기 때문에 이벤트의 절반을 무시해도 큰 문제는 아니다. 대부분의 이벤트를 무시할 경우만 (99프로) 분리를 고민해라.

5. 카프카스트림이 사용하는 changelog topic은 다른 토픽에서 분리되어야 한다.


Schema management

정적 스키마가 없는 json으로 인코딩을 한다면, 쉽게 다양한 이벤트 타입을 하나의 토픽에 넣을 수 있다. 하지만 avro 를 사용할 경우 여러 이벤트 타입을 처리하는데 좀 더 많은 작업을 해야한다. 위에 언급한 것처럼 현재의 컨플런트 스키마 레지스트리는 각 토픽마다 하나의 스키마가 있다고 가정한다. 새 버전의 스키마를 등록할 수 있고, 호환되는지 체크한다. 좋은 점은 서로 다른 스키마 버전을 동시에 사용하는 프로듀서/컨슈머를 가질 수 있고, 서호 호환 가능하다는 점이다. 

컨플런트 Avro 시리얼라이저에 key.subject.name.strategy 를 추가하였으니 이걸 좀 더 참고하면 된다.




티스토리 툴바