Should You Put Several Event Types in the Same Kafka Topic?

링크 : https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 에서 필요한 부분만 정리

카프카를 쓸 때 가장 중요한 것 중 하나는 토픽을 어떻게 쓸 것인가이다. 만약 여러 이벤트 묶음이 있다면 한 토픽에 넣을 것인가, 여러 토픽에 넣을 것인가? 극단적으로 하나의 토픽에 넣는것은 좋은 생각이 아니다. 컨슈머가 관심있는 이벤트를 선택해서 소비할 수 있는 방법이 없기 때문이다. 반대로 너무 많은 것도 좋은 건 아니다.

사실 성능관점에서 중요한것은 파티션의 갯수이다. 경험에 비추어 보면 레이턴시가 중요하다면 브로커 노드당 수백의 토픽-파티션을 가질 것이고, 아마 많을 수록 레이턴시 관점에서는 대기 시간이 줄어들 것이다.


Topic = collection of events of the same type?

일반적으로는 같은 타입의 이벤트는 같은 토픽을 사용하고, 다른 타입은 다른 토픽을 사용 하는 것이다.

Confluent 스키마 레지스트리는 이 패턴을 사용한다. topic의 모든 메시지에 대해서 동일한 Avro 스키마를 사용하도록 구너장하기 떄문이다. (옵션을 추가해서) 호환성을 유지할 수는 있지만, 궁극적으로 모든 메시지는 레코드 타입을 준수해야 한다. 일반적으로 이러면 되는데, 이벤트 소싱이나 MSA에서 데이터 교환같은 목적으로 사용 하는 사람들도 있다.

이때에는 topic이 같은 스키마를 가지는것이 덜 중요해질수는 있다 그것보다는 topic-partition 내에서 순서를 보장하는게 더 중요하다.

예를 들어 고객이 주소변경/새신용카드추가/조회/지불/계정파기... 이런것들은 순서가 중요하다. 이 순서는 '같은 파티션'을 사용함으로서 보장받을 수 있고, 고객 ID를 파티션 키로 사용한다면 동일한 topic-partiton에 들어갈 것이다.



Ordering problems

customerCreated, customerAddressChanged, and customerInvoicePaid 를 서로 다른 토픽으로 사용한다면 컨슈머는 순서의 의미없는 이벤트라 볼 것이다. 예를 들어 컨슈머는 존재하지 않는 고객에 대해서 주소 변경 이벤트를 받을 수 있을 것이다. 만약 이렇게 다른 토픽을 사용한다면 이벤트를 시간 동기화 처리를 해서 사용할 수 있겠지만 악몽이다. 


When to split topics, when to combine

몇가지 제안 할 것이다.

1. 가장 중요한것은 '고정된 순서로 머물러야 하는 모든 이벤트는' 동일한 토픽을 사용해야 한다.

2. 한 엔티티가 다른 엔티티에 의존하거나, 함께 자주 변경이 된다면 같은 토픽을 사용할수도 있다. (예를 들어 주소가 고객에게 속하는 것처럼) 한편으로 다른 팀과 관련이 없다거나, 다른 팀에서 관리하는 경우에는 토픽을 분리하라 또 하나의 엔티티 타입이 과도하게 비율이 높다면 별도로 분리하는 것을 고민하라 (다른 이벤트 타입들을 위해서이다)

3. 이벤트에 여러 엔티티가 관련된 경우 (예를 들어 구매-제품-고객) 처음에는 이벤트를 원자 단위로 기록하고, 여러 토픽으로 분할 하지 마라. 가능한 받은 그대로 이벤트를 기록하라 스트림을 이용해서 분리할 수 있지만 어려워질 것이다.

4. 컨슈머를 살펴보고 여러 토픽을 구독한다면 그 토픽들은 합쳐야 될 수도 있다. 그럴 경우 원치 않은 이벤트를 소비할 수도 있지만 그것은 큰 문제가 아니다. 이벤트를 소비하는 것은 매우 싸기 때문에 이벤트의 절반을 무시해도 큰 문제는 아니다. 대부분의 이벤트를 무시할 경우만 (99프로) 분리를 고민해라.

5. 카프카스트림이 사용하는 changelog topic은 다른 토픽에서 분리되어야 한다.


Schema management

정적 스키마가 없는 json으로 인코딩을 한다면, 쉽게 다양한 이벤트 타입을 하나의 토픽에 넣을 수 있다. 하지만 avro 를 사용할 경우 여러 이벤트 타입을 처리하는데 좀 더 많은 작업을 해야한다. 위에 언급한 것처럼 현재의 컨플런트 스키마 레지스트리는 각 토픽마다 하나의 스키마가 있다고 가정한다. 새 버전의 스키마를 등록할 수 있고, 호환되는지 체크한다. 좋은 점은 서로 다른 스키마 버전을 동시에 사용하는 프로듀서/컨슈머를 가질 수 있고, 서호 호환 가능하다는 점이다. 

컨플런트 Avro 시리얼라이저에 key.subject.name.strategy 를 추가하였으니 이걸 좀 더 참고하면 된다.


Event Sourcing Using Apache Kafka

분류없음 2018.03.22 09:37 posted by dev.bistro

https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 를 읽고 대충 필요한 것만 적음.

추가적으로 알게된 내용보다는 다시 한번 지식을 되새겨볼 수 있던 기회.


Storing events in Kafka


첫 번째 문제는 이벤트를 어떻게 저장하는가이다. 3개의 방법으로 얘기할 수 있다.

1. 모든 타입의 모든 이벤트를 하나의 토픽에 저장하는 방법 (물론 멀티 파티션)

2. Topic-per-entity-type : entity별로 관련된 이벤트들을 분리된 토픽에 저장하는 방법 (예를 들어 user관련 토픽, product관련 토픽)

3. topic-per-entity : 각각의 user, product 처럼 각 entity별로 별도의 토픽을 할당해서 저장하는 방법

low-cardinality entity를 제외하고는 3번째 전략은 실현 가능한 방법은 아니고 1,2번중 선택해야 한다.

1번은 모든 이벤트를 볼 수 있는 장점이 있고 2번은 entity type별로 분리하고 scale 할 수 있다.  아래으  내용은 대부분 1번을 기반으로 얘기를 이어나간다. 2번에 대해 더 알고 싶다면 https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 를 참고하면 된다.


Basic event-sourcing storage operations

데이터 저장소에서 기대하는 가장 기본적인 명령은 '특정 엔티티의 현재 상태를 얻는 것'이다. 일반적으로 각 엔티티는 ID가 있고, ID가 주어지면 저장 시스템은 현재 상태를 반환해 줘야한다.

이벤트 로그는 primary source of truth 이고, '현재 상태'는 엔티티의 이벤트 스트림에서 파생 될 수 있다. 이를 위해서 스토리지 엔진은 Event => State => State를 반환하는 순수 함수가 필요하다.

카프카에서 'read current state'를 native하게 구현해본다면 카프카는 토픽의 모든 이벤트를 필터링 하고, 주어진 function으로 부터 상태를 구해야한다. 만약 많은 이벤트가 있다면 느리고 자원을 많이 소모할 것이다. 그리고 장애나, 캐시 클리어 등으로 인해서 다시 해야할 수도 있다.

그래서 더 좋은 방법이 필요한다. KafkaStreams와 StateStore를 이용하는 것이다. 

Kafka Streams는 고수준의 오퍼레이션을 제공하는데 그 중 하나는 스트림을 localstate로 folding하는 기능이다.

각 local store는 노드에서 사용하는 파티션의 데이터만 포함된다. 기본적으로 RocksDB(디폴트), inMemoryStore 2개의 구현체가 있다. 둘 다 fault-tolerant 이고, in-memory를 사용하더라도 데이터가 손실되지 않는다(주: local store topic을 현실화 하는 것이기 때문이다)

이렇게 이벤트 소싱에서 이벤트 스트림을 state store로 변경해서 각 엔티티의 '현재 상태'를 구할 수 있다. 


Looking up the current state

같은 노드에 있는 데이터를 쿼리하는 건 쉽다. 하지만 다른 노드에 있는 데이터를 쿼리하려면 kafka의 메타 데이터에 대해 먼저 질의한 다음 특정 ID에 대해 처리를 담당하는 노드를 찾을 수 있다. ( streamsMetadataForStoreAndKey ) 그런 다음 rest, akka-remoting 등의 방법으로 해당 노드로 요청을 하면 된다. 


Fail-over

State Store가 좋지만, 노드가 장애라면? 새로 만든다는 것은 비용이 많이 드는 작업이다.  KafkaStreams는 리밸런싱(노드의 추가/삭제) 때문에 레이턴시가 증가하거나, 요청 자체가 실패할 수 있다.  

이 이유 때문에 state-store가  changelog-topic에 로깅된다. 이 topic은 모든 이벤트가 필요없고 최신분만 필요하므로 compacted(압축) 되어 있다. 덕분에 다른 노드에서 다시 store를 구축하는게 빠르다. 하지만 여전히 리밸런싱에 대한 latency는 증가될 수 있다.  이에 대한 해결방법으로 standby 를 제공한다. primary 가 실패한다면 바로 standby 로 장애를 처리한다.


Consistency

카프카는 기본은 at-least-once 정책이다. exactly-once 정책도 지원하지만 topic을 읽고/쓰기 할 때에만 보장해준다. processing.guarantee 옵션을 켜서 exactly-once를 사용 할 수 있으며, 성능은 저하되지만 batch message의 집합이 한번의 커밋에 되므로 큰 비용은 아니다.


Listening for events

지금 까지 'current state'를 업데이트 하고 질의하는 걸 알아봤고, 이제 사이드 이펙트에 대해서 알아 볼 차례이다( 예를 들어 rest 등의 외부 호출이네 이메일 보내기)

이러한 작업들은 blocking이 일어나며 I/O를 발생하므로 state-update의 로직에 포함되는건 종은 생각이 아니다. 그리고 main 스레드에서 처리를 하는것은 성능상의 병목을 일으킨다. 그리고 state-update 자체가 여러번 실행 될 수 있다.

다행히 카프카 토픽을 처리할 많은 유연성을 가질 수 있다. state를 업데이트 전후에 메시지를 소비할 수 있고, 이 메시지 소비 전후에 원하는 방식으로 사용 할 수 있다.




spring-kafka의 KafkaListener

분류없음 2018.03.11 22:28 posted by dev.bistro

KafkaListener annotation

토픽의 메시지를 소비할 메소드에 붙이는 어노테이션이다. 이 어노테이션을 붙인다면, KafkaListenerContainerFactory에 의해서 MessageListenerContainer가 기동된다. 현재로서는 ConcurrentKafkaListenerContainerFactory 클래스만 존재 하니까, 기본적으로 KafkaListener을 붙인다는 것은 ConcurrentMessageListenerContainer 리스너 컨테이너를 사용한다는 의미이다.

그리고 @ConditionalOnClass(EnableKafka.class) 에 의해서 KafkaAnnotationDrivenConfiguration 설정 클래스가 로드 되는대, 여기서 "kafkaListenerContainerFactory" 이라는 bean을 가지는ConcurrentKafkaListenerContainerFactory 가 생성이 된다. (즉 spring-kafka만 디펜던시에 추가된다면, 이 클래스가 발생된다)

 @KafkaListener 에 'containerFactory'를 지정 할 수 없지만, 기본적으로 위에서 언급한 'kafkaListenerContainerFactory' 이름을 가지는 컨테이너 팩토리를 사용한다.  이 메소드의 인자로는 @Payload, @Header, @Headers, MessageHeaders등등을 사용하여 메시지를 수신할 수 있다.  


 메소드에 정의된다면 각 메소드별로 '리스너컨테이너'가 만들어진다. - (컨테이너 팩토리가 아니다!)  만약 1개의 컨테이너 팩토리에서 2개의 @KafkaListener을 사용한다면  각각의 beanName은 org.springframework.kafka.KafkaListenerEndpointContainer#1, #2 이다.  만약 동일한 팩토리와 topic을 쓴다면 setConcurrency=2 의 효과만 가질 것이다.

클래스에 정의된다면 하나의 메시지 컨테이너가! @KafkaHandler가 있는 모든 메소드에 적용한다. 모두 실행 한다는 의미가 아닌, payload타입에 따라 적절하게 메소드를 실행해준다는 뜻이다.  @KafkaHandler가 붙은 메소드는 InvocableHandlerMethod 클래스에 감싸지고, DelegatingInvocableHandler 에 List 형으로 들어가서  DelegatingInvocableHandler.findHandlerForPayload 의해서 실행되어 진다.    payload 매칭이 2개 이상이 된다면 'Ambiguous methods...' throw를 발생시킨다.