Should You Put Several Event Types in the Same Kafka Topic?

링크 : https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 에서 필요한 부분만 정리

카프카를 쓸 때 가장 중요한 것 중 하나는 토픽을 어떻게 쓸 것인가이다. 만약 여러 이벤트 묶음이 있다면 한 토픽에 넣을 것인가, 여러 토픽에 넣을 것인가? 극단적으로 하나의 토픽에 넣는것은 좋은 생각이 아니다. 컨슈머가 관심있는 이벤트를 선택해서 소비할 수 있는 방법이 없기 때문이다. 반대로 너무 많은 것도 좋은 건 아니다.

사실 성능관점에서 중요한것은 파티션의 갯수이다. 경험에 비추어 보면 레이턴시가 중요하다면 브로커 노드당 수백의 토픽-파티션을 가질 것이고, 아마 많을 수록 레이턴시 관점에서는 대기 시간이 줄어들 것이다.


Topic = collection of events of the same type?

일반적으로는 같은 타입의 이벤트는 같은 토픽을 사용하고, 다른 타입은 다른 토픽을 사용 하는 것이다.

Confluent 스키마 레지스트리는 이 패턴을 사용한다. topic의 모든 메시지에 대해서 동일한 Avro 스키마를 사용하도록 구너장하기 떄문이다. (옵션을 추가해서) 호환성을 유지할 수는 있지만, 궁극적으로 모든 메시지는 레코드 타입을 준수해야 한다. 일반적으로 이러면 되는데, 이벤트 소싱이나 MSA에서 데이터 교환같은 목적으로 사용 하는 사람들도 있다.

이때에는 topic이 같은 스키마를 가지는것이 덜 중요해질수는 있다 그것보다는 topic-partition 내에서 순서를 보장하는게 더 중요하다.

예를 들어 고객이 주소변경/새신용카드추가/조회/지불/계정파기... 이런것들은 순서가 중요하다. 이 순서는 '같은 파티션'을 사용함으로서 보장받을 수 있고, 고객 ID를 파티션 키로 사용한다면 동일한 topic-partiton에 들어갈 것이다.



Ordering problems

customerCreated, customerAddressChanged, and customerInvoicePaid 를 서로 다른 토픽으로 사용한다면 컨슈머는 순서의 의미없는 이벤트라 볼 것이다. 예를 들어 컨슈머는 존재하지 않는 고객에 대해서 주소 변경 이벤트를 받을 수 있을 것이다. 만약 이렇게 다른 토픽을 사용한다면 이벤트를 시간 동기화 처리를 해서 사용할 수 있겠지만 악몽이다. 


When to split topics, when to combine

몇가지 제안 할 것이다.

1. 가장 중요한것은 '고정된 순서로 머물러야 하는 모든 이벤트는' 동일한 토픽을 사용해야 한다.

2. 한 엔티티가 다른 엔티티에 의존하거나, 함께 자주 변경이 된다면 같은 토픽을 사용할수도 있다. (예를 들어 주소가 고객에게 속하는 것처럼) 한편으로 다른 팀과 관련이 없다거나, 다른 팀에서 관리하는 경우에는 토픽을 분리하라 또 하나의 엔티티 타입이 과도하게 비율이 높다면 별도로 분리하는 것을 고민하라 (다른 이벤트 타입들을 위해서이다)

3. 이벤트에 여러 엔티티가 관련된 경우 (예를 들어 구매-제품-고객) 처음에는 이벤트를 원자 단위로 기록하고, 여러 토픽으로 분할 하지 마라. 가능한 받은 그대로 이벤트를 기록하라 스트림을 이용해서 분리할 수 있지만 어려워질 것이다.

4. 컨슈머를 살펴보고 여러 토픽을 구독한다면 그 토픽들은 합쳐야 될 수도 있다. 그럴 경우 원치 않은 이벤트를 소비할 수도 있지만 그것은 큰 문제가 아니다. 이벤트를 소비하는 것은 매우 싸기 때문에 이벤트의 절반을 무시해도 큰 문제는 아니다. 대부분의 이벤트를 무시할 경우만 (99프로) 분리를 고민해라.

5. 카프카스트림이 사용하는 changelog topic은 다른 토픽에서 분리되어야 한다.


Schema management

정적 스키마가 없는 json으로 인코딩을 한다면, 쉽게 다양한 이벤트 타입을 하나의 토픽에 넣을 수 있다. 하지만 avro 를 사용할 경우 여러 이벤트 타입을 처리하는데 좀 더 많은 작업을 해야한다. 위에 언급한 것처럼 현재의 컨플런트 스키마 레지스트리는 각 토픽마다 하나의 스키마가 있다고 가정한다. 새 버전의 스키마를 등록할 수 있고, 호환되는지 체크한다. 좋은 점은 서로 다른 스키마 버전을 동시에 사용하는 프로듀서/컨슈머를 가질 수 있고, 서호 호환 가능하다는 점이다. 

컨플런트 Avro 시리얼라이저에 key.subject.name.strategy 를 추가하였으니 이걸 좀 더 참고하면 된다.


Event Sourcing Using Apache Kafka

분류없음 2018.03.22 09:37 posted by dev.bistro

https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 를 읽고 대충 필요한 것만 적음.

추가적으로 알게된 내용보다는 다시 한번 지식을 되새겨볼 수 있던 기회.


Storing events in Kafka


첫 번째 문제는 이벤트를 어떻게 저장하는가이다. 3개의 방법으로 얘기할 수 있다.

1. 모든 타입의 모든 이벤트를 하나의 토픽에 저장하는 방법 (물론 멀티 파티션)

2. Topic-per-entity-type : entity별로 관련된 이벤트들을 분리된 토픽에 저장하는 방법 (예를 들어 user관련 토픽, product관련 토픽)

3. topic-per-entity : 각각의 user, product 처럼 각 entity별로 별도의 토픽을 할당해서 저장하는 방법

low-cardinality entity를 제외하고는 3번째 전략은 실현 가능한 방법은 아니고 1,2번중 선택해야 한다.

1번은 모든 이벤트를 볼 수 있는 장점이 있고 2번은 entity type별로 분리하고 scale 할 수 있다.  아래으  내용은 대부분 1번을 기반으로 얘기를 이어나간다. 2번에 대해 더 알고 싶다면 https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 를 참고하면 된다.


Basic event-sourcing storage operations

데이터 저장소에서 기대하는 가장 기본적인 명령은 '특정 엔티티의 현재 상태를 얻는 것'이다. 일반적으로 각 엔티티는 ID가 있고, ID가 주어지면 저장 시스템은 현재 상태를 반환해 줘야한다.

이벤트 로그는 primary source of truth 이고, '현재 상태'는 엔티티의 이벤트 스트림에서 파생 될 수 있다. 이를 위해서 스토리지 엔진은 Event => State => State를 반환하는 순수 함수가 필요하다.

카프카에서 'read current state'를 native하게 구현해본다면 카프카는 토픽의 모든 이벤트를 필터링 하고, 주어진 function으로 부터 상태를 구해야한다. 만약 많은 이벤트가 있다면 느리고 자원을 많이 소모할 것이다. 그리고 장애나, 캐시 클리어 등으로 인해서 다시 해야할 수도 있다.

그래서 더 좋은 방법이 필요한다. KafkaStreams와 StateStore를 이용하는 것이다. 

Kafka Streams는 고수준의 오퍼레이션을 제공하는데 그 중 하나는 스트림을 localstate로 folding하는 기능이다.

각 local store는 노드에서 사용하는 파티션의 데이터만 포함된다. 기본적으로 RocksDB(디폴트), inMemoryStore 2개의 구현체가 있다. 둘 다 fault-tolerant 이고, in-memory를 사용하더라도 데이터가 손실되지 않는다(주: local store topic을 현실화 하는 것이기 때문이다)

이렇게 이벤트 소싱에서 이벤트 스트림을 state store로 변경해서 각 엔티티의 '현재 상태'를 구할 수 있다. 


Looking up the current state

같은 노드에 있는 데이터를 쿼리하는 건 쉽다. 하지만 다른 노드에 있는 데이터를 쿼리하려면 kafka의 메타 데이터에 대해 먼저 질의한 다음 특정 ID에 대해 처리를 담당하는 노드를 찾을 수 있다. ( streamsMetadataForStoreAndKey ) 그런 다음 rest, akka-remoting 등의 방법으로 해당 노드로 요청을 하면 된다. 


Fail-over

State Store가 좋지만, 노드가 장애라면? 새로 만든다는 것은 비용이 많이 드는 작업이다.  KafkaStreams는 리밸런싱(노드의 추가/삭제) 때문에 레이턴시가 증가하거나, 요청 자체가 실패할 수 있다.  

이 이유 때문에 state-store가  changelog-topic에 로깅된다. 이 topic은 모든 이벤트가 필요없고 최신분만 필요하므로 compacted(압축) 되어 있다. 덕분에 다른 노드에서 다시 store를 구축하는게 빠르다. 하지만 여전히 리밸런싱에 대한 latency는 증가될 수 있다.  이에 대한 해결방법으로 standby 를 제공한다. primary 가 실패한다면 바로 standby 로 장애를 처리한다.


Consistency

카프카는 기본은 at-least-once 정책이다. exactly-once 정책도 지원하지만 topic을 읽고/쓰기 할 때에만 보장해준다. processing.guarantee 옵션을 켜서 exactly-once를 사용 할 수 있으며, 성능은 저하되지만 batch message의 집합이 한번의 커밋에 되므로 큰 비용은 아니다.


Listening for events

지금 까지 'current state'를 업데이트 하고 질의하는 걸 알아봤고, 이제 사이드 이펙트에 대해서 알아 볼 차례이다( 예를 들어 rest 등의 외부 호출이네 이메일 보내기)

이러한 작업들은 blocking이 일어나며 I/O를 발생하므로 state-update의 로직에 포함되는건 종은 생각이 아니다. 그리고 main 스레드에서 처리를 하는것은 성능상의 병목을 일으킨다. 그리고 state-update 자체가 여러번 실행 될 수 있다.

다행히 카프카 토픽을 처리할 많은 유연성을 가질 수 있다. state를 업데이트 전후에 메시지를 소비할 수 있고, 이 메시지 소비 전후에 원하는 방식으로 사용 할 수 있다.




Kafka Header

분류없음 2018.03.19 15:51 posted by dev.bistro

카프카의 Header 지원

- 0.11 버전이후 user(custom) header를 지원하기 시작하였고 아래 위키와 이슈를 참고

- https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

- https://issues.apache.org/jira/browse/KAFKA-4208


실제로 저장되는 형식과 구현체는 아래를 참고

- https://kafka.apache.org/documentation/#record

- https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L175



Producer 코드를 따라가며 Key, Header가 어떻게 저장이 되는거 확인 

- Producer는 2개의 구현체가 있지만 MockProducer는 테스트 용이기 때문에 KafkaProducer만 확인하면 된다.

- 실제로 KafkaProducer에서 doSend를 보면 인자로 받는 ProducerRecord 안에 headers 정보가 있다. 

- 이 정보를 이용하여 byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); '시리얼라이징 된 객체 데이터'를 생성한다. 하지만 기본 제공 하는 Key Serializer인 ExtendedSerializer은 Header를  무시한채 Key Data만을 가지고 시리얼라이징 한다. 




티스토리 툴바