대화식 쿼리를 사용하려면 hostname:port 형식의 StreamsConfig.APPLICATION_SERVER_CONFIG 를 설정하면 된다. 카프카 스트림 인스턴스가 키를 기반으로 한 쿼리를 받으면 현재 인스턴스의 로컬 저장소에 포함되는지를 찾는다. 더 중요한 것은 로컬 저장소에 없을 경우 키가 포함된 Store를 찾고 싶은 것이다. 카프카 스트림은 동일한 application ID와 APPLICATION_SERVER_CONFIG 가 정의된 인스턴스에 정보를 검색할 수 있는 몇가지 메소드가 제공된다.

아래의 메소드들은 KafkaStreams instance의 method이다.

- allMetadata : 모든 StreamsMetadata 목록을 반환
- allMetadataForStore : StoreName 기반으로 StreamsMetadata 목록을 반환
- metadataForKey : 해당 Key 기반으로 StreamsMetadata 를 반환

위 메소들에서 allMetaData를 이용해서 대화형 쿼리에 적합한 모든 인스턴스 정보를 얻을 수 있다.  



각 카프카 스트림 인스턴스는 경량화된 서버가 내부적으로 떠 있게 된다.
여기서 중요한 것은 '카프카 스트림 인스턴스 중에서 하나만 쿼리를 하면 되고 어디에 해야하는지는 중요하지 않다'

RPC 메커니즘과 메타 데이터를 기반으로 인스턴스에 원하는 데이터가 없다면, 카프카 스트림 인스턴스는 위치를 찾아내서 쿼리후 반환한다. 예를 들어 A 인스턴스에는 key1가 포함되어 있지는 않지만 B 인스턴스테 포함됨을 인지하고, B를 호출하여 데이터를 검색하고 반환한다.

(중요!: RPC 메커니즘은 기본적으로 제공되지 않으므로 직접 구현해야한다. ex: https://github.com/davidandcode/hackathon_kafka/blob/master/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/MusicPlaysRestService.java )


ps. 결국엔 ben stopord 의 예제처럼 HostInfo를 이용해서 라우팅을 하거나, 처음부터, Key기반의 요청/응답을 특정 인스턴스로 제한 하는 수 밖에 없다. ... (원래 하던데로다.ㅋ )


처음에는 이 메소드의 효용성? 사용범위? 에 대해서 이해하지 못했다. 그냥 최초에 Streams Application이 기동될때, State 가 migration 되는 동안  Application 의 health 를 DOWN 정도으로 바꾸는 정도?  (즉 Created -> Running) 

최근에 하나 더 든생각은 isRunning 상태에서 다른 상태로 빠질 경우이다. (isRunning은 RUNNING or REBALCING 이다)  이 때에는 API로 제공되는 KeyStore를 query하면 에러가 발생 할 것이다. 그러므로

this.kafkaStreams.setStateListener((newState, oldState) -> {
if(newState.isRunning() ){
this.ready = true;
}else {
this.ready = false;// State is not RUNNING,
}
});
this.kafkaStreams.cleanUp();
this.kafkaStreams.start();

이와 같은 리스너를 등록하여,  queryable 상태를 체크하는 부분이 있어야 할 것 같았고, 그렇게 구현을 했다.

그럼  언제 isRunning 에서 다른 상태로 변경이 될까? 

가장 쉽게 찾을 수 있는 경우는 StreamThread가 shutdown이 될 때 PENDING_SHUTDOWN 으로 상태가 바뀐다. cleanup()을 호출하거나, 여러경우/다양한 곳에서 shutdown/close를 호출하는 경우이다.

그 외에는... DEAD 나 PENDING_SHUTDOWN 으로 변경하는 케이스를 찾지 못함 ㅠ ( 더 있을 줄 알았다!!) 

적정한 파티션 갯수

분류없음 2018.03.05 08:56 posted by dev.bistro

몇개의 파티션으로 구성할 것인가에 대한 도움 글

https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/


읽고 나서 내 맘대로 정리한 내용

- '파티션 갯수가 변경이 된다면, 제대로 메시지가 전달 안될 수 있다. 그래서 현재 필요보다 파티션 수를 많이 구성하라. 파티션 갯수를 변경할 필요가 없도록 해라.  (향후 1-2년치 트래픽도 고민해서 충분한 파티션 수를 고민할것)

- 그때 문제점은 OS의 open file handle이 커지게 된다. (모든 세그먼트 * 2)

- 리더 선출은 주키퍼와 관련된 작업이 포함되고 이것은 선행적이다. 그래서 한 브로커에 파티션 리더가 많다면 선행적으로 파티션 리더를 변경하는 시간이 길어지고, 혹시나 컨트롤러 역할을 하는 브로커가 죽는다면 더 심각해진다.

- 레이턴시가 고려되야 한다면 브로커당 파티션 수를 100x클러스터의 브로커수*레플리카팩터 이하로 제한하라.

- 컨플런트 카프카는 java producer로 커스텀 마이징한다. (기존에는 metrics 부분만 수정하는 줄 알았다), kafka-cpXXX 버전을 쓰는것은 가능한 배제하자.


* 파티션 갯수가 많을 수록 처리량(Throughput)도 높아진다.

처음으로 이해해야 할 것은 Kafka에서는 topic partition이 병렬처리의 단위라는 것이다. producer와 broker 측면에서 본다면 서로 다른 파티션에 데이터를 쓴다는 것은 완전히 병렬로 할 수 있다. 그래서 압축과 같은 비싼 작업을 하드웨어 리소스에 활용할 수 있다. consumer 측면에서 카프카는 하나의 파티션의 데이터가 하나의 컨슈머 스레드에 할당된다. 따라서 consumer(group)의 병럴 처리 단위는 소비하는 파티션 갯수에 의해 제한된다. 따라서 많은 파티션 갯수가 더 많은 처리량을 준다.

파티션 갯수를 선택하는 공식은 처리량을 기반으로 한다. 하나의 파티션에서 데이터를 넣을 수 있는 p, 소비할 수 있는 스루풋을 c 라고 하고 목표 처리량을 t 라고 하자. 그러면 최소  t/p, t/c의 파티션이 필요로 하다.

producer의 파티션별 스루풋은  배치사잊, 압축코덱, ack 쇼ㅔㄷ, replica-factor등의 설정에 따라 달라진다. 그러나 일반적으로 벤치마크에서 보여지는 것처럼 하나의 파티션에서 10MB/sec의 데이터를 넣을 수 있다. 컨슈머의 처리량은 컨슈머가 얼마나 빨리 메시지를 처리할 수 있느냐에 따라 다르다. 실제로 측정해 봐야 한다.

시간이 흘러 파티션 갯수를 증가 시킬 수 있지만 한가지 주의할 점은 메세지가 key를 가지고 생성될 때이다. 키를 가진 메시지가 publishing 될때 카프카는 hash key기반으로 파티션을 매핑하낟. 이렇게 하면 동일한 키가 항상 동일한 파티션으로 가도록 보장해준다. 이 것은 어플리케이션에서 중요할 수 있는데, 만약 파티션 갯수가 변경이 된다면 이것을 보장되지 않을 수도 있다.

이러한 상황을 피하기 위해 과도하게 파티션을 만드는것이 일반적이다. 기본적으로 향후 1-2년치를 고민하고 파티션 갯수를 결정한다. 초기에는 작은 카프카 클러스터를 만들것이고, 시간이 지나 브로커를 추가하며 기존 브로커의 일부를 새 브로커로 옮길 수 있다.(온라인으로도 가능하다) 이 방법은 어플리케이션의 break 없이 스루풋을 증가 시킬 수 있는 방법이다.처리량 이외에도 몇몇 고려해야할 요소가 있다. 많은 파티션 갯수가 부정적인 영향을 줄 수도 있다.


* 많은 파티션 갯수는 더 많은 Open FIle Handles를 필요로 한다.

각 파티션은 브로커내의 파일 스스템 디렉토리에 할당된다.  해당 로그 디렉토리에는 세그먼트당 2개의 파일(색인, 실제 데이터용)이 존재한다. 현재 카프카는 모든(every) 로그 세그먼트에 대해서 인덱스, 데이터 전부다 파일 핸들을 연다. 따라서, 파티션이 많아 질 수록 open file handle이 많이 필요하게 된다. 이건 단지 설정 문제이고, 우리는 브로커당 3천개의 open file handle을 사용하는 클러스터를 확인했다. (1500개 세그먼트? - 세그먼트당 10메가 잡으면 ? - 15기가?)


* 많은 파티션은 비가용성을 증가 시킬수 있다.

카프카는 높은 가용성/내구성을 제공하는 intra-cluster replication(https://goo.gl/r77Lxg)을 지원한다. 각 파티션은 다수의 레플리카를 가질수 있고, 각각 다른 브로커에 저장된다. 레플리카 중 하나는 'leader'가 되고 나머지는 'follower'가 된다. 카프카는 모든 레플리카를 자동으로 관리하고, 동기 상태를 유지한다. 프로듀서/컨슈머의 요청들은 모두다 리더 레플리카에서 처리된다. 만약 브로커가 fail 상태라면, 해당 브로커의 리더가 있는 파티션은 일시적으로 사용 할 수 없게 된다. 카프카는 이 서비스 불가상태의 파티션 리더를 다른 레플리카로 이동시켜 클라이언트 요청을 계속 처리 할 수 있게 한다. 이 처리는 카프카 브로커 중 하나인 '컨트롤러'에 의해 수행된다. 그리고 주키퍼에 메타 데이터를 읽고 쓰는 작업이 포함되고, 컨트롤러에 의해 순차적으로 수행이 된다.

일반적인 경우, 정상 종료일때에는 컨트롤러가 한번에 하나씩 리더를 이동시킨다.  하나의 리더의 이동은 몇ms밖에 걸리지 않는다 그래서 클라이언트 관점에서는 정상적인 브로커 종료일때는 비가용상태가 작다.

하지만 브로커가 불확실하게 죽어버린다면(ex kill -9), 비가용성은 파티션 갯수에 비례 할 수 있다. 

한 브로커에 2000개의 파티션이 있고 각각 2개의 레플리카가 있다고 가정하자. 그러면 브로커너는 약 1000개의 파티션의 리더를 가지고 있을 것이다. 이 브로커가 죽어버린다면 동시에 1000개의 파티션이 사용할 수 없게 된다. 단일 파티션에 대해 새로운 리더를 선출하는데 5ms가 걸린다고 가정하면, 1000개의 파티션의 리더를 선출하는데는 5초가 걸린다. 그래서, 일부 파티션의 비가용성은 5초에 실패를 감지한 시간이 더해진 시간동안이 될 것이다.

혹시나 실패한 브로커가 '컨트롤러'일수도 있다. 이 경우에는 새 브로커가 컨트롤러가 되기 전까지 리더를 선출하는 작업이 진행되지 않을수도 있다. 컨트롤러의 페일오버는 자동이지만, 새 컨트롤러는 메타 데이터를 초기화 단계에서 주키퍼에서 읽어야 한다. 예를 들어 1만개의 파티션이 있고 개당 2ms가 걸린다고 할때, 20초 이상 장애가 발생할 수 있다. 

일반적으로 이러한 unclean 장애는 드믈지만, 그것도 신경쓰인다면 브로커당 파티션 수를 2-4천개로 제한하고 클러스터의 총 파티션 갯수를 몇 만개 단위로 제한하는게 좋다.


* 파티션 수가 많을 수록 e2e 레이턴시가 증가한다.

카프카에서 e2e latency 는 프로듀서에 의해 메시지가 생성되고, 컨슈머가 메시지를 읽는 그 시간으로 정의한다. 카프카는 메시지가 모든 ISR에 복제된 이후, 즉 커밋이 된 이후 메시지를 노출시킨다. 그래서 메시지를 커밋하는 시간은 e2e 레이턴시의 많은 부분을 차지할 수 있다. 기본적으로  카프카 브로커는 두 브로커간에 복제본을 공유하는 모든 파티션에 대해서 싱글 스레드로 데이터를 복제한다. 우리의 실험에 따르면 브로커간에 1000개의 파티션을 복제하면 약 20ms의 응답시간이 증가한다. 이것은 일부 리얼타임 어플리케이션에서 너무 높은 값이 될 수 있다.

하지만 이 문제는 클러스터가 크다면 좀 더 완화되어질 수 있다. 예를 들어 브로커에 1000 개의 파티션 리더가 있고, 클러스터에 10개의 브로커가 더 있다고 예를 들자. 각 브로커는 100개의 파티션의 데이터만을 가져오면 되고, 추가되는 커밋 시간은 10ms단위가 아닌 ms단위일 것이다.

레이턴시를 고려해야 한다면 브로커당 파티션 수를 100xbxr로 제한하라. b는 클러스터의 브로커 수이고 r는 레플리카 팩터이다.


* 파티션 수가 많을 수록 client 메모리를 더 요구한다.

최근 컨플런트1.0(0.8.2기반0) Java 프로듀서를 좀 더 효과적으로 향상시켯다. 새 기능중 하나는 사용자가 메시지를 버퍼링하는데 사용하는 메모리 상한을 설정할 수 있다는 것이다. 내부적으로 프로듀서는 파티션당 메시지를 버퍼링한다. 데이터가 축적되거나, 충분한 시간이 지나면 메시지는 버퍼에서 브로커로 전송된다.

파티션수가 늘어난다면, 프로듀서의 파티션에 더 많은 메시지가 누적된다. 사용된 메모리 총 합이 설정값을 초과할 수 있다. 이 때 프로듀서는 새 메시지를 차단하거나, 버려야 한다. 어느쪽도 좋은 것은 아니다. 이걸 막으려면 메모리를 설정을 더 크게 해야한다.

좋은 스루풋을 얻으며녀 적어도 파티션당 수십kb를 할 당하고, 파티션 갯수가 늘어날 수록 메모리 총 합도 조정해라.

유사한 이슈가 컨슈머에도 있다. 컨슈머는 파티션 단위로 메시지를 가져온다. 파티션이 많을수록 메모리가 많이 필요하라. 하지만 이것은 일반적이 아닌, 컨슈머의 문제이다.


* 요약

일반적으로 파티션 수가 많으면 카프카 클러스터는 높은 스루풋을 보여준다.  하지만 가용성, 레이턴시와 같은 잠재적인 impact를 인지하고 있어야 한다. 앞으로는 이러한 제약 사항을 개선할 계획이다.



티스토리 툴바