일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- kafkastream
- kafka streams
- Spring
- Kafka
- scala
- kafkastreams
- spring-kafka
- 카프카
- confluent
- 플레이 프레임워크
- gradle
- Elk
- Elasticsearch
- spring-batch
- scala 2.10
- enablekafkastreams
- play framework
- reactive
- RabbitMQ
- Slick
- schema registry
- kafka interactive query
- avo
- coursera
- 한빛미디어
- aws
- springboot
- statestore
- Logstash
- spring-cloud-stream
- Today
- Total
b
Event Sourcing Using Apache Kafka 본문
https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 를 읽고 대충 필요한 것만 적음.
추가적으로 알게된 내용보다는 다시 한번 지식을 되새겨볼 수 있던 기회.
Storing events in Kafka
첫 번째 문제는 이벤트를 어떻게 저장하는가이다. 3개의 방법으로 얘기할 수 있다.
1. 모든 타입의 모든 이벤트를 하나의 토픽에 저장하는 방법 (물론 멀티 파티션)
2. Topic-per-entity-type : entity별로 관련된 이벤트들을 분리된 토픽에 저장하는 방법 (예를 들어 user관련 토픽, product관련 토픽)
3. topic-per-entity : 각각의 user, product 처럼 각 entity별로 별도의 토픽을 할당해서 저장하는 방법
low-cardinality entity를 제외하고는 3번째 전략은 실현 가능한 방법은 아니고 1,2번중 선택해야 한다.
1번은 모든 이벤트를 볼 수 있는 장점이 있고 2번은 entity type별로 분리하고 scale 할 수 있다. 아래으 내용은 대부분 1번을 기반으로 얘기를 이어나간다. 2번에 대해 더 알고 싶다면 https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 를 참고하면 된다.
Basic event-sourcing storage operations
데이터 저장소에서 기대하는 가장 기본적인 명령은 '특정 엔티티의 현재 상태를 얻는 것'이다. 일반적으로 각 엔티티는 ID가 있고, ID가 주어지면 저장 시스템은 현재 상태를 반환해 줘야한다.
이벤트 로그는 primary source of truth 이고, '현재 상태'는 엔티티의 이벤트 스트림에서 파생 될 수 있다. 이를 위해서 스토리지 엔진은 Event => State => State를 반환하는 순수 함수가 필요하다.
카프카에서 'read current state'를 native하게 구현해본다면 카프카는 토픽의 모든 이벤트를 필터링 하고, 주어진 function으로 부터 상태를 구해야한다. 만약 많은 이벤트가 있다면 느리고 자원을 많이 소모할 것이다. 그리고 장애나, 캐시 클리어 등으로 인해서 다시 해야할 수도 있다.
그래서 더 좋은 방법이 필요한다. KafkaStreams와 StateStore를 이용하는 것이다.
Kafka Streams는 고수준의 오퍼레이션을 제공하는데 그 중 하나는 스트림을 localstate로 folding하는 기능이다.
각 local store는 노드에서 사용하는 파티션의 데이터만 포함된다. 기본적으로 RocksDB(디폴트), inMemoryStore 2개의 구현체가 있다. 둘 다 fault-tolerant 이고, in-memory를 사용하더라도 데이터가 손실되지 않는다(주: local store topic을 현실화 하는 것이기 때문이다)
이렇게 이벤트 소싱에서 이벤트 스트림을 state store로 변경해서 각 엔티티의 '현재 상태'를 구할 수 있다.
Looking up the current state
같은 노드에 있는 데이터를 쿼리하는 건 쉽다. 하지만 다른 노드에 있는 데이터를 쿼리하려면 kafka의 메타 데이터에 대해 먼저 질의한 다음 특정 ID에 대해 처리를 담당하는 노드를 찾을 수 있다. ( streamsMetadataForStoreAndKey ) 그런 다음 rest, akka-remoting 등의 방법으로 해당 노드로 요청을 하면 된다.
Fail-over
State Store가 좋지만, 노드가 장애라면? 새로 만든다는 것은 비용이 많이 드는 작업이다. KafkaStreams는 리밸런싱(노드의 추가/삭제) 때문에 레이턴시가 증가하거나, 요청 자체가 실패할 수 있다.
이 이유 때문에 state-store가 changelog-topic에 로깅된다. 이 topic은 모든 이벤트가 필요없고 최신분만 필요하므로 compacted(압축) 되어 있다. 덕분에 다른 노드에서 다시 store를 구축하는게 빠르다. 하지만 여전히 리밸런싱에 대한 latency는 증가될 수 있다. 이에 대한 해결방법으로 standby 를 제공한다. primary 가 실패한다면 바로 standby 로 장애를 처리한다.
Consistency
카프카는 기본은 at-least-once 정책이다. exactly-once 정책도 지원하지만 topic을 읽고/쓰기 할 때에만 보장해준다. processing.guarantee 옵션을 켜서 exactly-once를 사용 할 수 있으며, 성능은 저하되지만 batch message의 집합이 한번의 커밋에 되므로 큰 비용은 아니다.
Listening for events
지금 까지 'current state'를 업데이트 하고 질의하는 걸 알아봤고, 이제 사이드 이펙트에 대해서 알아 볼 차례이다( 예를 들어 rest 등의 외부 호출이네 이메일 보내기)
이러한 작업들은 blocking이 일어나며 I/O를 발생하므로 state-update의 로직에 포함되는건 종은 생각이 아니다. 그리고 main 스레드에서 처리를 하는것은 성능상의 병목을 일으킨다. 그리고 state-update 자체가 여러번 실행 될 수 있다.
다행히 카프카 토픽을 처리할 많은 유연성을 가질 수 있다. state를 업데이트 전후에 메시지를 소비할 수 있고, 이 메시지 소비 전후에 원하는 방식으로 사용 할 수 있다.