분류없음2018.03.22 09:37

https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 를 읽고 대충 필요한 것만 적음.

추가적으로 알게된 내용보다는 다시 한번 지식을 되새겨볼 수 있던 기회.


Storing events in Kafka


첫 번째 문제는 이벤트를 어떻게 저장하는가이다. 3개의 방법으로 얘기할 수 있다.

1. 모든 타입의 모든 이벤트를 하나의 토픽에 저장하는 방법 (물론 멀티 파티션)

2. Topic-per-entity-type : entity별로 관련된 이벤트들을 분리된 토픽에 저장하는 방법 (예를 들어 user관련 토픽, product관련 토픽)

3. topic-per-entity : 각각의 user, product 처럼 각 entity별로 별도의 토픽을 할당해서 저장하는 방법

low-cardinality entity를 제외하고는 3번째 전략은 실현 가능한 방법은 아니고 1,2번중 선택해야 한다.

1번은 모든 이벤트를 볼 수 있는 장점이 있고 2번은 entity type별로 분리하고 scale 할 수 있다.  아래으  내용은 대부분 1번을 기반으로 얘기를 이어나간다. 2번에 대해 더 알고 싶다면 https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 를 참고하면 된다.


Basic event-sourcing storage operations

데이터 저장소에서 기대하는 가장 기본적인 명령은 '특정 엔티티의 현재 상태를 얻는 것'이다. 일반적으로 각 엔티티는 ID가 있고, ID가 주어지면 저장 시스템은 현재 상태를 반환해 줘야한다.

이벤트 로그는 primary source of truth 이고, '현재 상태'는 엔티티의 이벤트 스트림에서 파생 될 수 있다. 이를 위해서 스토리지 엔진은 Event => State => State를 반환하는 순수 함수가 필요하다.

카프카에서 'read current state'를 native하게 구현해본다면 카프카는 토픽의 모든 이벤트를 필터링 하고, 주어진 function으로 부터 상태를 구해야한다. 만약 많은 이벤트가 있다면 느리고 자원을 많이 소모할 것이다. 그리고 장애나, 캐시 클리어 등으로 인해서 다시 해야할 수도 있다.

그래서 더 좋은 방법이 필요한다. KafkaStreams와 StateStore를 이용하는 것이다. 

Kafka Streams는 고수준의 오퍼레이션을 제공하는데 그 중 하나는 스트림을 localstate로 folding하는 기능이다.

각 local store는 노드에서 사용하는 파티션의 데이터만 포함된다. 기본적으로 RocksDB(디폴트), inMemoryStore 2개의 구현체가 있다. 둘 다 fault-tolerant 이고, in-memory를 사용하더라도 데이터가 손실되지 않는다(주: local store topic을 현실화 하는 것이기 때문이다)

이렇게 이벤트 소싱에서 이벤트 스트림을 state store로 변경해서 각 엔티티의 '현재 상태'를 구할 수 있다. 


Looking up the current state

같은 노드에 있는 데이터를 쿼리하는 건 쉽다. 하지만 다른 노드에 있는 데이터를 쿼리하려면 kafka의 메타 데이터에 대해 먼저 질의한 다음 특정 ID에 대해 처리를 담당하는 노드를 찾을 수 있다. ( streamsMetadataForStoreAndKey ) 그런 다음 rest, akka-remoting 등의 방법으로 해당 노드로 요청을 하면 된다. 


Fail-over

State Store가 좋지만, 노드가 장애라면? 새로 만든다는 것은 비용이 많이 드는 작업이다.  KafkaStreams는 리밸런싱(노드의 추가/삭제) 때문에 레이턴시가 증가하거나, 요청 자체가 실패할 수 있다.  

이 이유 때문에 state-store가  changelog-topic에 로깅된다. 이 topic은 모든 이벤트가 필요없고 최신분만 필요하므로 compacted(압축) 되어 있다. 덕분에 다른 노드에서 다시 store를 구축하는게 빠르다. 하지만 여전히 리밸런싱에 대한 latency는 증가될 수 있다.  이에 대한 해결방법으로 standby 를 제공한다. primary 가 실패한다면 바로 standby 로 장애를 처리한다.


Consistency

카프카는 기본은 at-least-once 정책이다. exactly-once 정책도 지원하지만 topic을 읽고/쓰기 할 때에만 보장해준다. processing.guarantee 옵션을 켜서 exactly-once를 사용 할 수 있으며, 성능은 저하되지만 batch message의 집합이 한번의 커밋에 되므로 큰 비용은 아니다.


Listening for events

지금 까지 'current state'를 업데이트 하고 질의하는 걸 알아봤고, 이제 사이드 이펙트에 대해서 알아 볼 차례이다( 예를 들어 rest 등의 외부 호출이네 이메일 보내기)

이러한 작업들은 blocking이 일어나며 I/O를 발생하므로 state-update의 로직에 포함되는건 종은 생각이 아니다. 그리고 main 스레드에서 처리를 하는것은 성능상의 병목을 일으킨다. 그리고 state-update 자체가 여러번 실행 될 수 있다.

다행히 카프카 토픽을 처리할 많은 유연성을 가질 수 있다. state를 업데이트 전후에 메시지를 소비할 수 있고, 이 메시지 소비 전후에 원하는 방식으로 사용 할 수 있다.




Posted by dev.bistro
분류없음2018.03.19 15:51

카프카의 Header 지원

- 0.11 버전이후 user(custom) header를 지원하기 시작하였고 아래 위키와 이슈를 참고

- https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

- https://issues.apache.org/jira/browse/KAFKA-4208


실제로 저장되는 형식과 구현체는 아래를 참고

- https://kafka.apache.org/documentation/#record

- https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L175



Producer 코드를 따라가며 Key, Header가 어떻게 저장이 되는거 확인 

- Producer는 2개의 구현체가 있지만 MockProducer는 테스트 용이기 때문에 KafkaProducer만 확인하면 된다.

- 실제로 KafkaProducer에서 doSend를 보면 인자로 받는 ProducerRecord 안에 headers 정보가 있다. 

- 이 정보를 이용하여 byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); '시리얼라이징 된 객체 데이터'를 생성한다. 하지만 기본 제공 하는 Key Serializer인 ExtendedSerializer은 Header를  무시한채 Key Data만을 가지고 시리얼라이징 한다. 


Posted by dev.bistro
분류없음2018.03.18 12:08

방법) KafkaStreams#close 호출로서 정상적인 종료가 가능하다고 한다. (참고 : https://kafka.apache.org/10/documentation/streams/developer-guide/write-streams )

그럼 이 close를 어떻게 호출 해야하는지를 찾아보자.


1) spring-cloud-stream 를 사용할 경우

- (나같은 경우에는 order-eda-notification prj)

- general 한 방법으로 사용을 한다면 StreamsBuilderFactoryManager bean이 자동등록 되고 이 매니저 빈은 SmartLifecycle 를 상속받기 때문에 spring application이 종료 시그널을 받으면 stop을 시작하는 시작점이 된다. 

- 이 매니저빈이 실제로 Set<StreamsBuilderFactoryBean>를 차례로 종료 메소드를 호출하고, 그 안에 존재하는 KafkaStream 맴버 인스턴스도 종료 명령을 받게 된다.


2) 메뉴얼하게 KafkaStream 를 생성하는 경우

- (나같은 경우에는 order-eda-gateway prj)

- StreamsBuilderFactoryBean가 포함되어 있고 이 클래스는 명시적으로 KafkaStreams#close()를 호출하지만, 난 이 Factory자체를 쓰지 않는다.

- 하지만 gateway application 내에서 여러 KafkaStreams를 이용하여 독립적인 토폴로지를 구성하고 싶었기 때문에 AbstractOperationStream 클래스를 사용하고 있었다. 이 클래스는  몇몇 인터페이스와, StreamsBuilderFactoryBean의 running과 같은 기능만을 구현한 추상 클래스이다. 여기에 DisposableBean 를 추가하였다.

//중간부분
@Override
public Topology getTopology() {
return topology;
}

@Override
public boolean canQueryable() {
return ready;
}

@Override
public void destroy() throws Exception {
this.kafkaStreams.close();
}



추가) KafkaStreams#close는 어떤 작업을 할까?

- 상태를 PENDING_SHUTDOWN 로 바꾼다.

- TaskId의 디렉토리를 삭제하는 스케줄링 메소드 'stateDirCleaner'의 데몬 서비스를 중단한다.

- List<StreamThread>를 순차적으로 중단하라고 명령한다.

- GlobalStreamThread  스레드를 중단하라고 명령한다.

- close를 위한 shutdown 스레드를 만들어서 위의 stream-thread, global-stream-thread가 전부 종료되어 PENDING_SHUTDOWN, DEAD 상태가 될 때까지 동기적으로 기다린다.

- 전부가 정상적으로 멈춘다면 KafkaStream 상태를 NOT_RUNNING 으로 바꾼다. 

- 기존적으로 10초를 기다린다. 

- 만약 정상적인 종료가 되었다는 것을 확인 하고 싶으면  close(final long timeout, final TimeUnit timeUnit) 메소드를 이용하여 boolean type의 return을 확인해야 한다.


Posted by dev.bistro
분류없음2018.03.11 22:28

KafkaListener annotation

토픽의 메시지를 소비할 메소드에 붙이는 어노테이션이다. 이 어노테이션을 붙인다면, KafkaListenerContainerFactory에 의해서 MessageListenerContainer가 기동된다. 현재로서는 ConcurrentKafkaListenerContainerFactory 클래스만 존재 하니까, 기본적으로 KafkaListener을 붙인다는 것은 ConcurrentMessageListenerContainer 리스너 컨테이너를 사용한다는 의미이다.

그리고 @ConditionalOnClass(EnableKafka.class) 에 의해서 KafkaAnnotationDrivenConfiguration 설정 클래스가 로드 되는대, 여기서 "kafkaListenerContainerFactory" 이라는 bean을 가지는ConcurrentKafkaListenerContainerFactory 가 생성이 된다. (즉 spring-kafka만 디펜던시에 추가된다면, 이 클래스가 발생된다)

 @KafkaListener 에 'containerFactory'를 지정 할 수 없지만, 기본적으로 위에서 언급한 'kafkaListenerContainerFactory' 이름을 가지는 컨테이너 팩토리를 사용한다.  이 메소드의 인자로는 @Payload, @Header, @Headers, MessageHeaders등등을 사용하여 메시지를 수신할 수 있다.  


 메소드에 정의된다면 각 메소드별로 '리스너컨테이너'가 만들어진다. - (컨테이너 팩토리가 아니다!)  만약 1개의 컨테이너 팩토리에서 2개의 @KafkaListener을 사용한다면  각각의 beanName은 org.springframework.kafka.KafkaListenerEndpointContainer#1, #2 이다.  만약 동일한 팩토리와 topic을 쓴다면 setConcurrency=2 의 효과만 가질 것이다.

클래스에 정의된다면 하나의 메시지 컨테이너가! @KafkaHandler가 있는 모든 메소드에 적용한다. 모두 실행 한다는 의미가 아닌, payload타입에 따라 적절하게 메소드를 실행해준다는 뜻이다.  @KafkaHandler가 붙은 메소드는 InvocableHandlerMethod 클래스에 감싸지고, DelegatingInvocableHandler 에 List 형으로 들어가서  DelegatingInvocableHandler.findHandlerForPayload 의해서 실행되어 진다.    payload 매칭이 2개 이상이 된다면 'Ambiguous methods...' throw를 발생시킨다.


Posted by dev.bistro
분류없음2018.03.10 18:46

MessageListenerContainer 인터페이스

* spring-kakfa 에서 사용되는 최상단 '메시지 리스너 컨테이너'의 인터페이스 형태이다. 구현체는 Single-Thread 형태의 KafkaMessageListenerContainer 와 KafkaMessageListenerContainer 를 여러개 붙인 형태인 ConcurrentMessageListenerContainer 이렇게 2개가 기본 제공된다.

* Message Listener 이므로 '메시지의 수신' 관점에서 볼 때 필요하다. Kafka Cluster 의 gateway를 담당하는 application 을 만들 때는 'producer' 쪽이 필요하다.

* pause(), resume()을 인터페이스 레벨에서 가지고 있다.


ㅁ AbstractMessageListenerContainer<K,V> 추상 클래스

** 아래에서 설명할 2개의 실제 구현체 클래스의 상위 추상 클래스이다.(스프링의 다른 패키지에도 많으니 kafka 패키지인지 확인 해야한다)

** doStart, doStop만 위임한채, 나머지는 공통화 시키는 목적이다.

** offset commit을 어떻게 할 것인가에 대한 ACK_MODE 가 여기에서 정의되어 있고 종류는 7개가 있다 (BATCH, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE, RECORD, TIME)


ㅁ KafkaMessageListenerContainer<K,V> 클래스

** Java Consumer를 이용하여, 자동/수동 파티션을 할당 받아, Single-Threaded 형태로 동작하는 메시지 리스너 컨테이너

** 생성자 중에서 TopicPartitionInitialOffset 를 인자로 줄 수 있는데, 이를 이용해서 초기 Offset을 지정 할 수 있다. (absolute offset은 물론, relativeToCurrent 인자를 이용해서 relative offset도 지정가능하다)

** 스프링의 Lifecycle 인터페이스를 상속받아 구현된 start()가 시작 포인트이다.

** doStart 안에서 listenerConsumer 라는 컨슈머를 내부에서 만들어 데이터를 수신하게 된다.

** ListenerConsumer 를 Runnable을 상속받는 데몬 스레드 역할을 한다. while(true) 를 하면서 consumer.poll을 하는 것이다. 


ㅁ ConcurrentMessageListenerContainer<K,V> 클래스

** concurrency 와 실제 topic-partition의 갯수에 따라서 위에서 설명한 KafkaMessageListenerContainer 를 N개 만든다. 그리고 스스로 start를 호출한다. 즉, 이 클래스는 KafkaMessageListenerContainer 를 가지는 또 하나의 컨테이너이다.

** stop/resume/start 등등 전부 순차적으로 명령어를 수행한다. (forEach)

** 그 외 추가 구현 내용없는 단순한 형태이다.


그럼 KafkaConsumer 를 그냥 쓰는 것보다 무엇이 더 편할까?

- pause/resume 의 쉬운 사용
- KafkaConsumer는 thread-safe을 보장 하지 않아, 잘 구현해서 만들어야 하지만, 위의 컨테이너를 쓰면 쉽다.
- 좀 더 풍부한 ack모드 지원
- 메뉴얼 토픽파티션 어사인
- 오프셋 점프  등등...

Posted by dev.bistro
분류없음2018.03.06 20:49

대화식 쿼리를 사용하려면 hostname:port 형식의 StreamsConfig.APPLICATION_SERVER_CONFIG 를 설정하면 된다. 카프카 스트림 인스턴스가 키를 기반으로 한 쿼리를 받으면 현재 인스턴스의 로컬 저장소에 포함되는지를 찾는다. 더 중요한 것은 로컬 저장소에 없을 경우 키가 포함된 Store를 찾고 싶은 것이다. 카프카 스트림은 동일한 application ID와 APPLICATION_SERVER_CONFIG 가 정의된 인스턴스에 정보를 검색할 수 있는 몇가지 메소드가 제공된다.

아래의 메소드들은 KafkaStreams instance의 method이다.

- allMetadata : 모든 StreamsMetadata 목록을 반환
- allMetadataForStore : StoreName 기반으로 StreamsMetadata 목록을 반환
- metadataForKey : 해당 Key 기반으로 StreamsMetadata 를 반환

위 메소들에서 allMetaData를 이용해서 대화형 쿼리에 적합한 모든 인스턴스 정보를 얻을 수 있다.  



각 카프카 스트림 인스턴스는 경량화된 서버가 내부적으로 떠 있게 된다.
여기서 중요한 것은 '카프카 스트림 인스턴스 중에서 하나만 쿼리를 하면 되고 어디에 해야하는지는 중요하지 않다'

RPC 메커니즘과 메타 데이터를 기반으로 인스턴스에 원하는 데이터가 없다면, 카프카 스트림 인스턴스는 위치를 찾아내서 쿼리후 반환한다. 예를 들어 A 인스턴스에는 key1가 포함되어 있지는 않지만 B 인스턴스테 포함됨을 인지하고, B를 호출하여 데이터를 검색하고 반환한다.

(중요!: RPC 메커니즘은 기본적으로 제공되지 않으므로 직접 구현해야한다. ex: https://github.com/davidandcode/hackathon_kafka/blob/master/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/MusicPlaysRestService.java )


ps. 결국엔 ben stopord 의 예제처럼 HostInfo를 이용해서 라우팅을 하거나, 처음부터, Key기반의 요청/응답을 특정 인스턴스로 제한 하는 수 밖에 없다. ... (원래 하던데로다.ㅋ )

Posted by dev.bistro
분류없음2018.03.06 20:44


처음에는 이 메소드의 효용성? 사용범위? 에 대해서 이해하지 못했다. 그냥 최초에 Streams Application이 기동될때, State 가 migration 되는 동안  Application 의 health 를 DOWN 정도으로 바꾸는 정도?  (즉 Created -> Running) 

최근에 하나 더 든생각은 isRunning 상태에서 다른 상태로 빠질 경우이다. (isRunning은 RUNNING or REBALCING 이다)  이 때에는 API로 제공되는 KeyStore를 query하면 에러가 발생 할 것이다. 그러므로

this.kafkaStreams.setStateListener((newState, oldState) -> {
if(newState.isRunning() ){
this.ready = true;
}else {
this.ready = false;// State is not RUNNING,
}
});
this.kafkaStreams.cleanUp();
this.kafkaStreams.start();

이와 같은 리스너를 등록하여,  queryable 상태를 체크하는 부분이 있어야 할 것 같았고, 그렇게 구현을 했다.

그럼  언제 isRunning 에서 다른 상태로 변경이 될까? 

가장 쉽게 찾을 수 있는 경우는 StreamThread가 shutdown이 될 때 PENDING_SHUTDOWN 으로 상태가 바뀐다. cleanup()을 호출하거나, 여러경우/다양한 곳에서 shutdown/close를 호출하는 경우이다.

그 외에는... DEAD 나 PENDING_SHUTDOWN 으로 변경하는 케이스를 찾지 못함 ㅠ ( 더 있을 줄 알았다!!) 

Posted by dev.bistro
분류없음2018.03.05 08:56

몇개의 파티션으로 구성할 것인가에 대한 도움 글

https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/


읽고 나서 내 맘대로 정리한 내용

- '파티션 갯수가 변경이 된다면, 제대로 메시지가 전달 안될 수 있다. 그래서 현재 필요보다 파티션 수를 많이 구성하라. 파티션 갯수를 변경할 필요가 없도록 해라.  (향후 1-2년치 트래픽도 고민해서 충분한 파티션 수를 고민할것)

- 그때 문제점은 OS의 open file handle이 커지게 된다. (모든 세그먼트 * 2)

- 리더 선출은 주키퍼와 관련된 작업이 포함되고 이것은 선행적이다. 그래서 한 브로커에 파티션 리더가 많다면 선행적으로 파티션 리더를 변경하는 시간이 길어지고, 혹시나 컨트롤러 역할을 하는 브로커가 죽는다면 더 심각해진다.

- 레이턴시가 고려되야 한다면 브로커당 파티션 수를 100x클러스터의 브로커수*레플리카팩터 이하로 제한하라.

- 컨플런트 카프카는 java producer로 커스텀 마이징한다. (기존에는 metrics 부분만 수정하는 줄 알았다), kafka-cpXXX 버전을 쓰는것은 가능한 배제하자.


* 파티션 갯수가 많을 수록 처리량(Throughput)도 높아진다.

처음으로 이해해야 할 것은 Kafka에서는 topic partition이 병렬처리의 단위라는 것이다. producer와 broker 측면에서 본다면 서로 다른 파티션에 데이터를 쓴다는 것은 완전히 병렬로 할 수 있다. 그래서 압축과 같은 비싼 작업을 하드웨어 리소스에 활용할 수 있다. consumer 측면에서 카프카는 하나의 파티션의 데이터가 하나의 컨슈머 스레드에 할당된다. 따라서 consumer(group)의 병럴 처리 단위는 소비하는 파티션 갯수에 의해 제한된다. 따라서 많은 파티션 갯수가 더 많은 처리량을 준다.

파티션 갯수를 선택하는 공식은 처리량을 기반으로 한다. 하나의 파티션에서 데이터를 넣을 수 있는 p, 소비할 수 있는 스루풋을 c 라고 하고 목표 처리량을 t 라고 하자. 그러면 최소  t/p, t/c의 파티션이 필요로 하다.

producer의 파티션별 스루풋은  배치사잊, 압축코덱, ack 쇼ㅔㄷ, replica-factor등의 설정에 따라 달라진다. 그러나 일반적으로 벤치마크에서 보여지는 것처럼 하나의 파티션에서 10MB/sec의 데이터를 넣을 수 있다. 컨슈머의 처리량은 컨슈머가 얼마나 빨리 메시지를 처리할 수 있느냐에 따라 다르다. 실제로 측정해 봐야 한다.

시간이 흘러 파티션 갯수를 증가 시킬 수 있지만 한가지 주의할 점은 메세지가 key를 가지고 생성될 때이다. 키를 가진 메시지가 publishing 될때 카프카는 hash key기반으로 파티션을 매핑하낟. 이렇게 하면 동일한 키가 항상 동일한 파티션으로 가도록 보장해준다. 이 것은 어플리케이션에서 중요할 수 있는데, 만약 파티션 갯수가 변경이 된다면 이것을 보장되지 않을 수도 있다.

이러한 상황을 피하기 위해 과도하게 파티션을 만드는것이 일반적이다. 기본적으로 향후 1-2년치를 고민하고 파티션 갯수를 결정한다. 초기에는 작은 카프카 클러스터를 만들것이고, 시간이 지나 브로커를 추가하며 기존 브로커의 일부를 새 브로커로 옮길 수 있다.(온라인으로도 가능하다) 이 방법은 어플리케이션의 break 없이 스루풋을 증가 시킬 수 있는 방법이다.처리량 이외에도 몇몇 고려해야할 요소가 있다. 많은 파티션 갯수가 부정적인 영향을 줄 수도 있다.


* 많은 파티션 갯수는 더 많은 Open FIle Handles를 필요로 한다.

각 파티션은 브로커내의 파일 스스템 디렉토리에 할당된다.  해당 로그 디렉토리에는 세그먼트당 2개의 파일(색인, 실제 데이터용)이 존재한다. 현재 카프카는 모든(every) 로그 세그먼트에 대해서 인덱스, 데이터 전부다 파일 핸들을 연다. 따라서, 파티션이 많아 질 수록 open file handle이 많이 필요하게 된다. 이건 단지 설정 문제이고, 우리는 브로커당 3천개의 open file handle을 사용하는 클러스터를 확인했다. (1500개 세그먼트? - 세그먼트당 10메가 잡으면 ? - 15기가?)


* 많은 파티션은 비가용성을 증가 시킬수 있다.

카프카는 높은 가용성/내구성을 제공하는 intra-cluster replication(https://goo.gl/r77Lxg)을 지원한다. 각 파티션은 다수의 레플리카를 가질수 있고, 각각 다른 브로커에 저장된다. 레플리카 중 하나는 'leader'가 되고 나머지는 'follower'가 된다. 카프카는 모든 레플리카를 자동으로 관리하고, 동기 상태를 유지한다. 프로듀서/컨슈머의 요청들은 모두다 리더 레플리카에서 처리된다. 만약 브로커가 fail 상태라면, 해당 브로커의 리더가 있는 파티션은 일시적으로 사용 할 수 없게 된다. 카프카는 이 서비스 불가상태의 파티션 리더를 다른 레플리카로 이동시켜 클라이언트 요청을 계속 처리 할 수 있게 한다. 이 처리는 카프카 브로커 중 하나인 '컨트롤러'에 의해 수행된다. 그리고 주키퍼에 메타 데이터를 읽고 쓰는 작업이 포함되고, 컨트롤러에 의해 순차적으로 수행이 된다.

일반적인 경우, 정상 종료일때에는 컨트롤러가 한번에 하나씩 리더를 이동시킨다.  하나의 리더의 이동은 몇ms밖에 걸리지 않는다 그래서 클라이언트 관점에서는 정상적인 브로커 종료일때는 비가용상태가 작다.

하지만 브로커가 불확실하게 죽어버린다면(ex kill -9), 비가용성은 파티션 갯수에 비례 할 수 있다. 

한 브로커에 2000개의 파티션이 있고 각각 2개의 레플리카가 있다고 가정하자. 그러면 브로커너는 약 1000개의 파티션의 리더를 가지고 있을 것이다. 이 브로커가 죽어버린다면 동시에 1000개의 파티션이 사용할 수 없게 된다. 단일 파티션에 대해 새로운 리더를 선출하는데 5ms가 걸린다고 가정하면, 1000개의 파티션의 리더를 선출하는데는 5초가 걸린다. 그래서, 일부 파티션의 비가용성은 5초에 실패를 감지한 시간이 더해진 시간동안이 될 것이다.

혹시나 실패한 브로커가 '컨트롤러'일수도 있다. 이 경우에는 새 브로커가 컨트롤러가 되기 전까지 리더를 선출하는 작업이 진행되지 않을수도 있다. 컨트롤러의 페일오버는 자동이지만, 새 컨트롤러는 메타 데이터를 초기화 단계에서 주키퍼에서 읽어야 한다. 예를 들어 1만개의 파티션이 있고 개당 2ms가 걸린다고 할때, 20초 이상 장애가 발생할 수 있다. 

일반적으로 이러한 unclean 장애는 드믈지만, 그것도 신경쓰인다면 브로커당 파티션 수를 2-4천개로 제한하고 클러스터의 총 파티션 갯수를 몇 만개 단위로 제한하는게 좋다.


* 파티션 수가 많을 수록 e2e 레이턴시가 증가한다.

카프카에서 e2e latency 는 프로듀서에 의해 메시지가 생성되고, 컨슈머가 메시지를 읽는 그 시간으로 정의한다. 카프카는 메시지가 모든 ISR에 복제된 이후, 즉 커밋이 된 이후 메시지를 노출시킨다. 그래서 메시지를 커밋하는 시간은 e2e 레이턴시의 많은 부분을 차지할 수 있다. 기본적으로  카프카 브로커는 두 브로커간에 복제본을 공유하는 모든 파티션에 대해서 싱글 스레드로 데이터를 복제한다. 우리의 실험에 따르면 브로커간에 1000개의 파티션을 복제하면 약 20ms의 응답시간이 증가한다. 이것은 일부 리얼타임 어플리케이션에서 너무 높은 값이 될 수 있다.

하지만 이 문제는 클러스터가 크다면 좀 더 완화되어질 수 있다. 예를 들어 브로커에 1000 개의 파티션 리더가 있고, 클러스터에 10개의 브로커가 더 있다고 예를 들자. 각 브로커는 100개의 파티션의 데이터만을 가져오면 되고, 추가되는 커밋 시간은 10ms단위가 아닌 ms단위일 것이다.

레이턴시를 고려해야 한다면 브로커당 파티션 수를 100xbxr로 제한하라. b는 클러스터의 브로커 수이고 r는 레플리카 팩터이다.


* 파티션 수가 많을 수록 client 메모리를 더 요구한다.

최근 컨플런트1.0(0.8.2기반0) Java 프로듀서를 좀 더 효과적으로 향상시켯다. 새 기능중 하나는 사용자가 메시지를 버퍼링하는데 사용하는 메모리 상한을 설정할 수 있다는 것이다. 내부적으로 프로듀서는 파티션당 메시지를 버퍼링한다. 데이터가 축적되거나, 충분한 시간이 지나면 메시지는 버퍼에서 브로커로 전송된다.

파티션수가 늘어난다면, 프로듀서의 파티션에 더 많은 메시지가 누적된다. 사용된 메모리 총 합이 설정값을 초과할 수 있다. 이 때 프로듀서는 새 메시지를 차단하거나, 버려야 한다. 어느쪽도 좋은 것은 아니다. 이걸 막으려면 메모리를 설정을 더 크게 해야한다.

좋은 스루풋을 얻으며녀 적어도 파티션당 수십kb를 할 당하고, 파티션 갯수가 늘어날 수록 메모리 총 합도 조정해라.

유사한 이슈가 컨슈머에도 있다. 컨슈머는 파티션 단위로 메시지를 가져온다. 파티션이 많을수록 메모리가 많이 필요하라. 하지만 이것은 일반적이 아닌, 컨슈머의 문제이다.


* 요약

일반적으로 파티션 수가 많으면 카프카 클러스터는 높은 스루풋을 보여준다.  하지만 가용성, 레이턴시와 같은 잠재적인 impact를 인지하고 있어야 한다. 앞으로는 이러한 제약 사항을 개선할 계획이다.

Posted by dev.bistro
분류없음2018.02.28 22:58

spring-kafka 를 디펜던시에 추가하면 사용할 수 있는 @EnableKafkaStreams 은 2017년 5월에 구현되었다. (https://github.com/spring-projects/spring-kafka/pull/238)

JavaDoc에 의하면 기본 Kafka Stream 컴포넌트를 활성화 해준다고 한다. 

정확히 설명하면  defaultKafkaStreamsConfig 라는 StreamsConfig bean이 있을 경우 StreamsBuilderFactoryBean 빈을 만들어 준다. (https://goo.gl/WaCFQ6)


* 먼저 InitializingBean를 상속을 받았기 때문에 afterPropertiesSet가 호출이 될 것이고 new StreamsBuilder() 가 자동으로 생성된다 (여기까진 괜찮다, 만들기만 하고 뭘 하지는 않으니까, 그리고 이것까지는 누구나 필요하다)

* 이후 step도 너무 자동화되었다는게 문제다.
interface Lifecycle 를 상속받고 있기 때문에  스프링의 DefaultLifeCycleProcessor에 의해 https://goo.gl/xSSasV 가 호출이 되고 이 때 default StreamConfig를 가지고 바로 KafkaStreams 를 생성 & 시작해버린다.
난 뭔가 토폴로지를 만들지도 않은 상태에서 internalTopologyBuilder에 의해 빌드가 되버리는 것이다...

즉, 뭔가 뭔가 좀 세밀한 작업을 하고 싶다면 EnableKafkaStream에 기대기 보다는 직접 StreamBuilder를 작성하자... 이 모들 내용은 https://goo.gl/kvjGNn 에 있다.



Posted by dev.bistro
분류없음2018.02.28 20:29

springboot는 spring framework 를 좀 더 빠르고 쉽게 사용할 수 있는 목적을 가진 영역이다. 하지만 어느 순간부터 Spring 이외에도 Boot 까지 '제대로' 이해하지 못하면 엉망이 될 수 있는 존재가 되어버렸다. 특히, Condition annotation이 추가되면서 내부적으로 자세히 보고 적용을 해야한다.


예를 들어보면,

1. KafkaStream을 쓰기 위해 spring-kafka를 import 할경우 (cloud-stream-xxx역시 마찬가지)  간단한 String 이나,  추천되어지는 Avro 가 아니고 Json으로 (디)시리얼라이저를 지정한다고 할 경우, 안에서 어디에서 new ObjectMapper가 만들어지고, 어디 설정은 application.yml의 spring.kafka를 보는지, 계속 주의해야 한다. 간단한 rocksdb 기반의 StateStore를 API로 expose하기 위해서는

- Consumed.with(Serdes.Long(), orderSerdes), (ktable로 컨슈밍 할때)

- .withValueSerde(orderSerdes)); (Materialized를 지정할 때 )

- KafkaTemplate에 ProducerFactory를 주입할 때 등등... 

이렇게 하더라도 KafkaHeader에서도 new ObjectMapper를 했지만, 어차피 다음주엔 byte로 바꿀꺼라 pass..


2. 방금전에 겪은 하나 더  @EnableKafkaStreams 를 붙일 경우 StreamConfig가 spring bean으로 떠 있다면, StreamsBuilderFactoryBean 를 통해, KafkaStreams가 만들어지고 .start된다. 

아주 간단한 Cloud Native에서 , 하나의 role을 가지는 endpoint application을 만들때는 문제가 없으나, gateway, aggregator 역할 등등, 하나의 어플리케이션에서 2개의 StreamThread 를 메뉴얼 하게 제어하려면, 제대로 이해하고 있어야 한다.....

그냥 그랬다...

Posted by dev.bistro