'confluent'에 해당되는 글 2건

  1. 2018.05.31 Schema Registry 는 어떻게 Schema Version 을 확인 하는가?
  2. 2018.03.05 적정한 파티션 갯수

subject란?

- Schema Registry는 Schema를 Subject 단위로 관리되어지는 듯 하다. 디폴트로는 <topc>-key, <topic>-value (ex: data.order-value) 형식으로 사용되지만 confluent 4.1 부터 이 부분을 커스텀마이징 할 수 있는 SubjectNameStrategy가 추가되었다.

기본적으로 사용하는 토픽이 동일하다면 동일한 subject를 보지만, 커스텀마이징 한다면 Record 에 따라 subject로 변경 가능하다.

Get the subject name for the given topic and value type.




1. 최초에 Schema Registry에 등록하고 Schema ID 를 가져오는 동작

- SR은 Cache 기반으로 동작한다 어플리케이션이 시작하면 Application SchemaRegistry는 빈 데이터만을 들고 있다.

- 처음으로 Producer 를 통해서 메시지를 발송하면 Schema 정보를 instance.getSchema() 로 가져온다.

- Registry Server로 요청을 보낸고 응답은 Schema ID로 받는다.

- Map<String, Map<Schema, Integer>> schemaCache 캐시에 등록하고 사용 한다.

- schemaCache 에서 remove 되는 endpoint 는 없다. 즉, 한 번 캐싱이 된다면 다시 호출 되지는 않을 것이다. (어플리케이션에 실행중 instance.getSchema가 변경이 될리는 없기 때문이다)


2. Application이 자동으로 Schema ID를 올리는 것을 막는 방법

- SR 서버 자체의 호환 모드를 NONE 으로 해 되지만 클라이언트 설정에서 auto.register.schemas(default true)를 false로 해도 된다. 이럴 경우 SchemaRegistry RestClientException 이 발생되면서 정상적인 produce가 안된다.




결론

- (avsc에 의해 생성되는) org.apache.avro.specific.SpecificRecord 를 구현하는 인스턴스의 SCHEMA$ 를 가지고 Schema ID를 등록하거나, 내부적으로 캐싱해서 사용한다.

- 이 문자열이 캐싱된다면, 최초의 Produce 할 때만 SR 서버를 호출한다. (즉 이 문자열이 바뀔일이 없기 때문에 1개의 subject당 1번이 호출될 것이다)

- BACKWARD 인지 FORWARD 인지는 순수하게 SR 서버쪽의 담당이다. 클라이언트는 단지 auto.register.schemas: false로 설정함으로써 자동 등록을 해제 할 수 있을 뿐이다.



적정한 파티션 갯수

분류없음 2018.03.05 08:56 posted by dev.bistro

몇개의 파티션으로 구성할 것인가에 대한 도움 글

https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/


읽고 나서 내 맘대로 정리한 내용

- '파티션 갯수가 변경이 된다면, 제대로 메시지가 전달 안될 수 있다. 그래서 현재 필요보다 파티션 수를 많이 구성하라. 파티션 갯수를 변경할 필요가 없도록 해라.  (향후 1-2년치 트래픽도 고민해서 충분한 파티션 수를 고민할것)

- 그때 문제점은 OS의 open file handle이 커지게 된다. (모든 세그먼트 * 2)

- 리더 선출은 주키퍼와 관련된 작업이 포함되고 이것은 선행적이다. 그래서 한 브로커에 파티션 리더가 많다면 선행적으로 파티션 리더를 변경하는 시간이 길어지고, 혹시나 컨트롤러 역할을 하는 브로커가 죽는다면 더 심각해진다.

- 레이턴시가 고려되야 한다면 브로커당 파티션 수를 100x클러스터의 브로커수*레플리카팩터 이하로 제한하라.

- 컨플런트 카프카는 java producer로 커스텀 마이징한다. (기존에는 metrics 부분만 수정하는 줄 알았다), kafka-cpXXX 버전을 쓰는것은 가능한 배제하자.


* 파티션 갯수가 많을 수록 처리량(Throughput)도 높아진다.

처음으로 이해해야 할 것은 Kafka에서는 topic partition이 병렬처리의 단위라는 것이다. producer와 broker 측면에서 본다면 서로 다른 파티션에 데이터를 쓴다는 것은 완전히 병렬로 할 수 있다. 그래서 압축과 같은 비싼 작업을 하드웨어 리소스에 활용할 수 있다. consumer 측면에서 카프카는 하나의 파티션의 데이터가 하나의 컨슈머 스레드에 할당된다. 따라서 consumer(group)의 병럴 처리 단위는 소비하는 파티션 갯수에 의해 제한된다. 따라서 많은 파티션 갯수가 더 많은 처리량을 준다.

파티션 갯수를 선택하는 공식은 처리량을 기반으로 한다. 하나의 파티션에서 데이터를 넣을 수 있는 p, 소비할 수 있는 스루풋을 c 라고 하고 목표 처리량을 t 라고 하자. 그러면 최소  t/p, t/c의 파티션이 필요로 하다.

producer의 파티션별 스루풋은  배치사잊, 압축코덱, ack 쇼ㅔㄷ, replica-factor등의 설정에 따라 달라진다. 그러나 일반적으로 벤치마크에서 보여지는 것처럼 하나의 파티션에서 10MB/sec의 데이터를 넣을 수 있다. 컨슈머의 처리량은 컨슈머가 얼마나 빨리 메시지를 처리할 수 있느냐에 따라 다르다. 실제로 측정해 봐야 한다.

시간이 흘러 파티션 갯수를 증가 시킬 수 있지만 한가지 주의할 점은 메세지가 key를 가지고 생성될 때이다. 키를 가진 메시지가 publishing 될때 카프카는 hash key기반으로 파티션을 매핑하낟. 이렇게 하면 동일한 키가 항상 동일한 파티션으로 가도록 보장해준다. 이 것은 어플리케이션에서 중요할 수 있는데, 만약 파티션 갯수가 변경이 된다면 이것을 보장되지 않을 수도 있다.

이러한 상황을 피하기 위해 과도하게 파티션을 만드는것이 일반적이다. 기본적으로 향후 1-2년치를 고민하고 파티션 갯수를 결정한다. 초기에는 작은 카프카 클러스터를 만들것이고, 시간이 지나 브로커를 추가하며 기존 브로커의 일부를 새 브로커로 옮길 수 있다.(온라인으로도 가능하다) 이 방법은 어플리케이션의 break 없이 스루풋을 증가 시킬 수 있는 방법이다.처리량 이외에도 몇몇 고려해야할 요소가 있다. 많은 파티션 갯수가 부정적인 영향을 줄 수도 있다.


* 많은 파티션 갯수는 더 많은 Open FIle Handles를 필요로 한다.

각 파티션은 브로커내의 파일 스스템 디렉토리에 할당된다.  해당 로그 디렉토리에는 세그먼트당 2개의 파일(색인, 실제 데이터용)이 존재한다. 현재 카프카는 모든(every) 로그 세그먼트에 대해서 인덱스, 데이터 전부다 파일 핸들을 연다. 따라서, 파티션이 많아 질 수록 open file handle이 많이 필요하게 된다. 이건 단지 설정 문제이고, 우리는 브로커당 3천개의 open file handle을 사용하는 클러스터를 확인했다. (1500개 세그먼트? - 세그먼트당 10메가 잡으면 ? - 15기가?)


* 많은 파티션은 비가용성을 증가 시킬수 있다.

카프카는 높은 가용성/내구성을 제공하는 intra-cluster replication(https://goo.gl/r77Lxg)을 지원한다. 각 파티션은 다수의 레플리카를 가질수 있고, 각각 다른 브로커에 저장된다. 레플리카 중 하나는 'leader'가 되고 나머지는 'follower'가 된다. 카프카는 모든 레플리카를 자동으로 관리하고, 동기 상태를 유지한다. 프로듀서/컨슈머의 요청들은 모두다 리더 레플리카에서 처리된다. 만약 브로커가 fail 상태라면, 해당 브로커의 리더가 있는 파티션은 일시적으로 사용 할 수 없게 된다. 카프카는 이 서비스 불가상태의 파티션 리더를 다른 레플리카로 이동시켜 클라이언트 요청을 계속 처리 할 수 있게 한다. 이 처리는 카프카 브로커 중 하나인 '컨트롤러'에 의해 수행된다. 그리고 주키퍼에 메타 데이터를 읽고 쓰는 작업이 포함되고, 컨트롤러에 의해 순차적으로 수행이 된다.

일반적인 경우, 정상 종료일때에는 컨트롤러가 한번에 하나씩 리더를 이동시킨다.  하나의 리더의 이동은 몇ms밖에 걸리지 않는다 그래서 클라이언트 관점에서는 정상적인 브로커 종료일때는 비가용상태가 작다.

하지만 브로커가 불확실하게 죽어버린다면(ex kill -9), 비가용성은 파티션 갯수에 비례 할 수 있다. 

한 브로커에 2000개의 파티션이 있고 각각 2개의 레플리카가 있다고 가정하자. 그러면 브로커너는 약 1000개의 파티션의 리더를 가지고 있을 것이다. 이 브로커가 죽어버린다면 동시에 1000개의 파티션이 사용할 수 없게 된다. 단일 파티션에 대해 새로운 리더를 선출하는데 5ms가 걸린다고 가정하면, 1000개의 파티션의 리더를 선출하는데는 5초가 걸린다. 그래서, 일부 파티션의 비가용성은 5초에 실패를 감지한 시간이 더해진 시간동안이 될 것이다.

혹시나 실패한 브로커가 '컨트롤러'일수도 있다. 이 경우에는 새 브로커가 컨트롤러가 되기 전까지 리더를 선출하는 작업이 진행되지 않을수도 있다. 컨트롤러의 페일오버는 자동이지만, 새 컨트롤러는 메타 데이터를 초기화 단계에서 주키퍼에서 읽어야 한다. 예를 들어 1만개의 파티션이 있고 개당 2ms가 걸린다고 할때, 20초 이상 장애가 발생할 수 있다. 

일반적으로 이러한 unclean 장애는 드믈지만, 그것도 신경쓰인다면 브로커당 파티션 수를 2-4천개로 제한하고 클러스터의 총 파티션 갯수를 몇 만개 단위로 제한하는게 좋다.


* 파티션 수가 많을 수록 e2e 레이턴시가 증가한다.

카프카에서 e2e latency 는 프로듀서에 의해 메시지가 생성되고, 컨슈머가 메시지를 읽는 그 시간으로 정의한다. 카프카는 메시지가 모든 ISR에 복제된 이후, 즉 커밋이 된 이후 메시지를 노출시킨다. 그래서 메시지를 커밋하는 시간은 e2e 레이턴시의 많은 부분을 차지할 수 있다. 기본적으로  카프카 브로커는 두 브로커간에 복제본을 공유하는 모든 파티션에 대해서 싱글 스레드로 데이터를 복제한다. 우리의 실험에 따르면 브로커간에 1000개의 파티션을 복제하면 약 20ms의 응답시간이 증가한다. 이것은 일부 리얼타임 어플리케이션에서 너무 높은 값이 될 수 있다.

하지만 이 문제는 클러스터가 크다면 좀 더 완화되어질 수 있다. 예를 들어 브로커에 1000 개의 파티션 리더가 있고, 클러스터에 10개의 브로커가 더 있다고 예를 들자. 각 브로커는 100개의 파티션의 데이터만을 가져오면 되고, 추가되는 커밋 시간은 10ms단위가 아닌 ms단위일 것이다.

레이턴시를 고려해야 한다면 브로커당 파티션 수를 100xbxr로 제한하라. b는 클러스터의 브로커 수이고 r는 레플리카 팩터이다.


* 파티션 수가 많을 수록 client 메모리를 더 요구한다.

최근 컨플런트1.0(0.8.2기반0) Java 프로듀서를 좀 더 효과적으로 향상시켯다. 새 기능중 하나는 사용자가 메시지를 버퍼링하는데 사용하는 메모리 상한을 설정할 수 있다는 것이다. 내부적으로 프로듀서는 파티션당 메시지를 버퍼링한다. 데이터가 축적되거나, 충분한 시간이 지나면 메시지는 버퍼에서 브로커로 전송된다.

파티션수가 늘어난다면, 프로듀서의 파티션에 더 많은 메시지가 누적된다. 사용된 메모리 총 합이 설정값을 초과할 수 있다. 이 때 프로듀서는 새 메시지를 차단하거나, 버려야 한다. 어느쪽도 좋은 것은 아니다. 이걸 막으려면 메모리를 설정을 더 크게 해야한다.

좋은 스루풋을 얻으며녀 적어도 파티션당 수십kb를 할 당하고, 파티션 갯수가 늘어날 수록 메모리 총 합도 조정해라.

유사한 이슈가 컨슈머에도 있다. 컨슈머는 파티션 단위로 메시지를 가져온다. 파티션이 많을수록 메모리가 많이 필요하라. 하지만 이것은 일반적이 아닌, 컨슈머의 문제이다.


* 요약

일반적으로 파티션 수가 많으면 카프카 클러스터는 높은 스루풋을 보여준다.  하지만 가용성, 레이턴시와 같은 잠재적인 impact를 인지하고 있어야 한다. 앞으로는 이러한 제약 사항을 개선할 계획이다.



티스토리 툴바