kafka stream의 fail-over & high-availability

Cloud 2018.01.30 19:55 posted by dev.bistro

18/01/30 idea


카프카 자체의 data recovery는 훌륭하다. 당연한게 파일로 저장하고 심지어 여러 셋트로 저장한다. 유실되는 경우는 저장 기간을 넘기거나, 1개의 IDC에만 설치한 후 IDC가 물에 잠기는 방법 뿐이다. 이건 다 아는 얘기고, 문제는 이 kafka 를 기반으로 한 streams 를 서비스 할 때이다.


1. Kafka는 기본적으로 1 개의 TopicPartion 에서 1 개의 Consumer Group의 노드가 접속 한다.

2. Kafka Topic은 KStream/KTsble/StateStore 등을 통해서 각각의 topic-partition을  local DB에 싱크를 맞춘후 materilized 한다  (비동기/latency 존재)


우선 1번의 문제부터....

1. 해당 TopicPartion의 정보를 단 1개의 VM(instance)만이 가지고 있을 수 있다 (Application Cluster 자체를 active-standby로 구성하지 않는이상)

 이 얘기는 해당 Node가 죽어버리면 '그 순간' 그 데이터들은 VM instnace안에서 존재 하지 않는 다는 것이다. 

 물론 Kafka Broker 내에는 존재하기 때문에 일정 시간의 기다린다면 새로운 Instacne가 해당 데이터를 살릴 수 있다. 하지만 그 '일정 시간의 텀'이 문제이다.

 위에서 얘기한 데로 Application 자체를 standby로 준비하지 않는 이상 힘들다. kafka 역시 standby 로 해결한다.

 링크 : https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#id10

 동일한 셋을 하나 더 구성하지만 그 녀석은 말그대로 standby이다. 실제 서비스에는 투입이 되지 않으며, resource 역시 많이 차지하지만, 장애 시간을 짧게 가져가는데 필수적인 요소이다.

 StreamThread 내에서 task를 active 와 standby로 가지고 있다. 


 이 부분은 ... 계속 파야할 것 같다.



2. Event가 topic으로 전송이 된다면 해당 topic-partition을 materialized view 로 가지고 있는 consuer group 의 instance는 consumer로 붙는다.

   이 때 topic과 state-store의 데이터 불일치는 필연적이다.

   이 때 state-store에서 get을 하는 '재고차감' 같은 Domain은 어떤 식으로 처리 할 것인가?

   A. 모든것을 sync로 한다 - 의미 없으니 pass

   B. instance의 state-store를 ReadyOnlyQueryType으로 사용하는게 아닌 WriteableQueryType으로 사용하여, put도 할 수 있게 한다. 그리고 이 changelog는 

      remote kafka cluster와 연동한다. 

      atmoic을 보장 할 수 있는 방법이고, 서버가 죽었을때도 문제가 되지 않는다. 내부 rocksdb에 데이터는 유지되므로 다시 살리기만 한다면 cluster와 동기화를 할 수 있을테니까..

      문제는 server가 살릴수 없을 때이다. 이 이슈는 바로 데이터 유실로 이어진다.



적당한 방법들은 보이지만, 정답인지 아닌지 확신이 안선다.

      



Spring Cloud Stream 의 버전 선택

Cloud 2018.01.28 21:53 posted by dev.bistro

밑에는 잡담이니 결론 부터

* 결론 Ditmars.SR3 사용,  Spring 4, Boot 1.5.x는 기존대로 유지
* Kafka 는 0.1x 유지
* spring-cloud-stream-reactive & reactor 3.1.X 사용 ( rxjava2 버림 )




* 우리가 사용중인 Spring Cloud Dalston SR5는 Ditmars 버전 매트릭스에 맞지 않다 이전 버전인 Chelsea.SR2를 고려해야한다.

* 하지만 Ditmars.SR3 버전으로 리서치를 진행 할 예정이다. 

* Ditmars.SR3 은 spring-kafka 1.1.7 이므로 @EnableStream 사용등은 커스텀마이징을 해야 하기 때문에 추천버전은 Kafka 0.10을 그대로 사용 해서 진행 한다


Ditmars.SR3 의존성 정보

Spring Cloud Stream Core 1.3.2.RELEASE
Spring Cloud Stream Binder Kafka 1.3.2.RELEASE
Spring Boot : Spring Boot 1.5.7 or a later 1.5.x release. It is not compatible with Spring Boot 2.x.
Spring Cloud : Spring Cloud Edgware.SR1


라이브 러리 버전 상세 정보

- 링크 : https://github.com/spring-cloud/spring-cloud-stream-starters/blob/vDitmars.SR3/spring-cloud-stream-dependencies/pom.xml

- 링크 :  https://github.com/spring-cloud/spring-cloud-stream/blob/v1.3.2.RELEASE/pom.xml

- reactor : 2.0.8.RELEASE (default)

- rxjava : 1.1.10 (optional 이므로 spring-cloud-stream-rxjava 를 추가)



spring-cloud-stream-reactive


* reference : https://docs.spring.io/spring-cloud-stream/docs/Ditmars.SR3/reference/htmlsingle/#_reactive_programming_support

* spring-cloud-stream 에 reactive API를 사용하기 위한 지원용 라이브러리이다.

* 기본은 reactor 이며, ConditionalOnClass 에 의해 rxjava 를 지원하지만 가장 최신 버전까지도 rxjava 1.x 만을 지원한다.

* 추가로 @StreamEmitter 어노테이션이 추가된다. 

* ReactiveSupportAutoConfiguration 가 존재하며, rx.Observable 클래스를 확인 한후 rxjava 를 지원한다

* 아래의 Adapter 들을 Spring bean으로 등록시켜서 Observable type을 처리해준다.

  - MessageChannelToInputObservableParameterAdapter

  - MessageChannelToObservableSenderParameterAdapter

  - ObservableToMessageChannelResultAdapter

* rxjava를 굳이 쓰겠지만 rxjava1을 쓰는게 속편하다, 하지만 rxjava2 를 현재 쓰고 있는데 down-grade할 정도로 필요성은 있어 보이지 않고

그냥 이번기회에 (어차피 옮길거) reactor를 써보기로 함

Kafka 기반의 event driven stateful microservices

Cloud 2018.01.28 12:58 posted by dev.bistro


18/01/26 idea


구현해야 할 도메인은 https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/

필요한 내용은

- 정확한 request/ responses
- scale out
- high availability
- ktable & state-store를 기반으로 한 local - storage의 사용

블로그 글과 예제에 대해서는 100퍼센트 공감이 되지만, 몇몇 부분에서는 '구현' 자체가 고민스러움.
나 뿐 아니라 다른 이의 구현을 살펴보아도 동일하다.  아래의 github 개발자도 위의 블로깅 내용을 보고 example project를 작성하였지만 같은 문제점을 그대로 노출하고 있다..  

- https://github.com/hpgrahsl/kafka-streams-emojitracker
- 사용 : confluent connect, stream, statestore, reactive(boot2.0)
- 특징 : 트랜잭션 사용안함(at_least_once) / 

- 데이터 서칭

더보기

이 분은 그리고 request/reply 모델이 아니다.  그래서 아래와 같은 부분에 대해서는 구현이 빠져있다.
https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/microservices/OrdersService.java#L232 

reply에 대한 고민은 
- 채번을 하고, 해당 Order Id의 reply를 consumer로 받을 수 있는 Node으로 요청을 보낸다.
  이슈) zuul, client load loadbalancer가 주키퍼를 기반으로 브로커 파티션/파티셔너 정보를 알고 있어야 한다.
- zuul 에서 기존 routing 정책에 의해 요청을 보낸후, 실제 Node에서 '응답을 받을 수 있는' 채번을 한다.
  이슈) Range(0,PartionNum).boxed().filter(local-state::matchingkey).findFirst();
     그럼 아마 ID가 1,7,11 이런식으로 생기겠지. (중간에 사용못하는 애들 생김)

- 또는... 정말 옛날로 돌아간다. (생각도 안해봤지만, 팀원분께서 차라리 이럴꺼면 2번 호출해라고)
  ex) 채번 자체는 Node 에서 하고, 이벤트를 발생한다. 그리고 그 노드가 '응답'을 받는 다는 보장이 없으므로 Mono로 FT로 바로 리턴, FT는 응답을 바로 받고 'OrderId'를 가지고 다시 한번 zuul 영역을 호출한다. -_-;

- 또는 제일 심플하게 각각의 Node를 Consumer Group으로 사용하지 않는다!
  이슈) 트래픽이 WAS 만큼 '배수'로 든다. 즉 기존에 WAS 50대로 10MB/SEC 트래픽이었다면 이렇게 하면 500MB/SEC -_-;;;



티스토리 툴바