max.in.flight.request.per.connection

분류없음 2018.06.18 14:51 posted by dev.bistro

필요한 수준까지 이해한듯 해서, 메모로 남김

원래의 카프카는 데이터 주입에 대한 순서가 중요하지 않았다.  (정확히는 어느 정도의 버퍼를 카프카 브로커가 가지면서 Event Time에 대한 조정 작업을 알아서 해준다)

하지만 producer의 명등성 옵션과 순서 보장에 대한 이슈가 중요해졌고 max.in.flight.request.per.connection = 1 로 셋팅하면서 순서 꼬임을 좀 방어하고자 했는데...

1로 설정하고 idempotence 옵션이 켜져있고 retry가 가능한 상태일때 
전송하다가 OutOfOrderSequence 예외가 발생한다면 클라이언트 영역에서 이 Sequence Number를 잘 처리해야한다. (재전송 하거나, 다음 idempotence 전송에서 써야한다)


이게 어렵다... 그래서 그냥 하드 코딩으로 각 Topic Partition의 5개의 메타데이터를 들고 있게한것이다. 

if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {
@Override
public int compare(ProducerBatch o1, ProducerBatch o2) {
return o1.baseSequence() - o2.baseSequence();
}
}));

그래서 kafka 문서의 "enable.idempotence" 항목을 보면 Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 라고 나온다.


idempotence 켜져있고 나부의 매직 넘버인 5보다 크게 셋팅한다면 sequence를 보장하지 못할 수 있기 때문이다.

실제로도 

        if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {

            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +

                    " to use the idempotent producer.");

        }

코드에 의해서 실행 자체를 막아놨다.



요약
1. 그냥 디폴트로 쓰자.
2. 프로듀서에서 동기식으로 사용한다면 (.get() 호출) 더더욱 상관하지 않아도 된다.


subject란?

- Schema Registry는 Schema를 Subject 단위로 관리되어지는 듯 하다. 디폴트로는 <topc>-key, <topic>-value (ex: data.order-value) 형식으로 사용되지만 confluent 4.1 부터 이 부분을 커스텀마이징 할 수 있는 SubjectNameStrategy가 추가되었다.

기본적으로 사용하는 토픽이 동일하다면 동일한 subject를 보지만, 커스텀마이징 한다면 Record 에 따라 subject로 변경 가능하다.

Get the subject name for the given topic and value type.




1. 최초에 Schema Registry에 등록하고 Schema ID 를 가져오는 동작

- SR은 Cache 기반으로 동작한다 어플리케이션이 시작하면 Application SchemaRegistry는 빈 데이터만을 들고 있다.

- 처음으로 Producer 를 통해서 메시지를 발송하면 Schema 정보를 instance.getSchema() 로 가져온다.

- Registry Server로 요청을 보낸고 응답은 Schema ID로 받는다.

- Map<String, Map<Schema, Integer>> schemaCache 캐시에 등록하고 사용 한다.

- schemaCache 에서 remove 되는 endpoint 는 없다. 즉, 한 번 캐싱이 된다면 다시 호출 되지는 않을 것이다. (어플리케이션에 실행중 instance.getSchema가 변경이 될리는 없기 때문이다)


2. Application이 자동으로 Schema ID를 올리는 것을 막는 방법

- SR 서버 자체의 호환 모드를 NONE 으로 해 되지만 클라이언트 설정에서 auto.register.schemas(default true)를 false로 해도 된다. 이럴 경우 SchemaRegistry RestClientException 이 발생되면서 정상적인 produce가 안된다.




결론

- (avsc에 의해 생성되는) org.apache.avro.specific.SpecificRecord 를 구현하는 인스턴스의 SCHEMA$ 를 가지고 Schema ID를 등록하거나, 내부적으로 캐싱해서 사용한다.

- 이 문자열이 캐싱된다면, 최초의 Produce 할 때만 SR 서버를 호출한다. (즉 이 문자열이 바뀔일이 없기 때문에 1개의 subject당 1번이 호출될 것이다)

- BACKWARD 인지 FORWARD 인지는 순수하게 SR 서버쪽의 담당이다. 클라이언트는 단지 auto.register.schemas: false로 설정함으로써 자동 등록을 해제 할 수 있을 뿐이다.



Should You Put Several Event Types in the Same Kafka Topic?

링크 : https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 에서 필요한 부분만 정리

카프카를 쓸 때 가장 중요한 것 중 하나는 토픽을 어떻게 쓸 것인가이다. 만약 여러 이벤트 묶음이 있다면 한 토픽에 넣을 것인가, 여러 토픽에 넣을 것인가? 극단적으로 하나의 토픽에 넣는것은 좋은 생각이 아니다. 컨슈머가 관심있는 이벤트를 선택해서 소비할 수 있는 방법이 없기 때문이다. 반대로 너무 많은 것도 좋은 건 아니다.

사실 성능관점에서 중요한것은 파티션의 갯수이다. 경험에 비추어 보면 레이턴시가 중요하다면 브로커 노드당 수백의 토픽-파티션을 가질 것이고, 아마 많을 수록 레이턴시 관점에서는 대기 시간이 줄어들 것이다.


Topic = collection of events of the same type?

일반적으로는 같은 타입의 이벤트는 같은 토픽을 사용하고, 다른 타입은 다른 토픽을 사용 하는 것이다.

Confluent 스키마 레지스트리는 이 패턴을 사용한다. topic의 모든 메시지에 대해서 동일한 Avro 스키마를 사용하도록 구너장하기 떄문이다. (옵션을 추가해서) 호환성을 유지할 수는 있지만, 궁극적으로 모든 메시지는 레코드 타입을 준수해야 한다. 일반적으로 이러면 되는데, 이벤트 소싱이나 MSA에서 데이터 교환같은 목적으로 사용 하는 사람들도 있다.

이때에는 topic이 같은 스키마를 가지는것이 덜 중요해질수는 있다 그것보다는 topic-partition 내에서 순서를 보장하는게 더 중요하다.

예를 들어 고객이 주소변경/새신용카드추가/조회/지불/계정파기... 이런것들은 순서가 중요하다. 이 순서는 '같은 파티션'을 사용함으로서 보장받을 수 있고, 고객 ID를 파티션 키로 사용한다면 동일한 topic-partiton에 들어갈 것이다.



Ordering problems

customerCreated, customerAddressChanged, and customerInvoicePaid 를 서로 다른 토픽으로 사용한다면 컨슈머는 순서의 의미없는 이벤트라 볼 것이다. 예를 들어 컨슈머는 존재하지 않는 고객에 대해서 주소 변경 이벤트를 받을 수 있을 것이다. 만약 이렇게 다른 토픽을 사용한다면 이벤트를 시간 동기화 처리를 해서 사용할 수 있겠지만 악몽이다. 


When to split topics, when to combine

몇가지 제안 할 것이다.

1. 가장 중요한것은 '고정된 순서로 머물러야 하는 모든 이벤트는' 동일한 토픽을 사용해야 한다.

2. 한 엔티티가 다른 엔티티에 의존하거나, 함께 자주 변경이 된다면 같은 토픽을 사용할수도 있다. (예를 들어 주소가 고객에게 속하는 것처럼) 한편으로 다른 팀과 관련이 없다거나, 다른 팀에서 관리하는 경우에는 토픽을 분리하라 또 하나의 엔티티 타입이 과도하게 비율이 높다면 별도로 분리하는 것을 고민하라 (다른 이벤트 타입들을 위해서이다)

3. 이벤트에 여러 엔티티가 관련된 경우 (예를 들어 구매-제품-고객) 처음에는 이벤트를 원자 단위로 기록하고, 여러 토픽으로 분할 하지 마라. 가능한 받은 그대로 이벤트를 기록하라 스트림을 이용해서 분리할 수 있지만 어려워질 것이다.

4. 컨슈머를 살펴보고 여러 토픽을 구독한다면 그 토픽들은 합쳐야 될 수도 있다. 그럴 경우 원치 않은 이벤트를 소비할 수도 있지만 그것은 큰 문제가 아니다. 이벤트를 소비하는 것은 매우 싸기 때문에 이벤트의 절반을 무시해도 큰 문제는 아니다. 대부분의 이벤트를 무시할 경우만 (99프로) 분리를 고민해라.

5. 카프카스트림이 사용하는 changelog topic은 다른 토픽에서 분리되어야 한다.


Schema management

정적 스키마가 없는 json으로 인코딩을 한다면, 쉽게 다양한 이벤트 타입을 하나의 토픽에 넣을 수 있다. 하지만 avro 를 사용할 경우 여러 이벤트 타입을 처리하는데 좀 더 많은 작업을 해야한다. 위에 언급한 것처럼 현재의 컨플런트 스키마 레지스트리는 각 토픽마다 하나의 스키마가 있다고 가정한다. 새 버전의 스키마를 등록할 수 있고, 호환되는지 체크한다. 좋은 점은 서로 다른 스키마 버전을 동시에 사용하는 프로듀서/컨슈머를 가질 수 있고, 서호 호환 가능하다는 점이다. 

컨플런트 Avro 시리얼라이저에 key.subject.name.strategy 를 추가하였으니 이걸 좀 더 참고하면 된다.


Event Sourcing Using Apache Kafka

분류없음 2018.03.22 09:37 posted by dev.bistro

https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 를 읽고 대충 필요한 것만 적음.

추가적으로 알게된 내용보다는 다시 한번 지식을 되새겨볼 수 있던 기회.


Storing events in Kafka


첫 번째 문제는 이벤트를 어떻게 저장하는가이다. 3개의 방법으로 얘기할 수 있다.

1. 모든 타입의 모든 이벤트를 하나의 토픽에 저장하는 방법 (물론 멀티 파티션)

2. Topic-per-entity-type : entity별로 관련된 이벤트들을 분리된 토픽에 저장하는 방법 (예를 들어 user관련 토픽, product관련 토픽)

3. topic-per-entity : 각각의 user, product 처럼 각 entity별로 별도의 토픽을 할당해서 저장하는 방법

low-cardinality entity를 제외하고는 3번째 전략은 실현 가능한 방법은 아니고 1,2번중 선택해야 한다.

1번은 모든 이벤트를 볼 수 있는 장점이 있고 2번은 entity type별로 분리하고 scale 할 수 있다.  아래으  내용은 대부분 1번을 기반으로 얘기를 이어나간다. 2번에 대해 더 알고 싶다면 https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 를 참고하면 된다.


Basic event-sourcing storage operations

데이터 저장소에서 기대하는 가장 기본적인 명령은 '특정 엔티티의 현재 상태를 얻는 것'이다. 일반적으로 각 엔티티는 ID가 있고, ID가 주어지면 저장 시스템은 현재 상태를 반환해 줘야한다.

이벤트 로그는 primary source of truth 이고, '현재 상태'는 엔티티의 이벤트 스트림에서 파생 될 수 있다. 이를 위해서 스토리지 엔진은 Event => State => State를 반환하는 순수 함수가 필요하다.

카프카에서 'read current state'를 native하게 구현해본다면 카프카는 토픽의 모든 이벤트를 필터링 하고, 주어진 function으로 부터 상태를 구해야한다. 만약 많은 이벤트가 있다면 느리고 자원을 많이 소모할 것이다. 그리고 장애나, 캐시 클리어 등으로 인해서 다시 해야할 수도 있다.

그래서 더 좋은 방법이 필요한다. KafkaStreams와 StateStore를 이용하는 것이다. 

Kafka Streams는 고수준의 오퍼레이션을 제공하는데 그 중 하나는 스트림을 localstate로 folding하는 기능이다.

각 local store는 노드에서 사용하는 파티션의 데이터만 포함된다. 기본적으로 RocksDB(디폴트), inMemoryStore 2개의 구현체가 있다. 둘 다 fault-tolerant 이고, in-memory를 사용하더라도 데이터가 손실되지 않는다(주: local store topic을 현실화 하는 것이기 때문이다)

이렇게 이벤트 소싱에서 이벤트 스트림을 state store로 변경해서 각 엔티티의 '현재 상태'를 구할 수 있다. 


Looking up the current state

같은 노드에 있는 데이터를 쿼리하는 건 쉽다. 하지만 다른 노드에 있는 데이터를 쿼리하려면 kafka의 메타 데이터에 대해 먼저 질의한 다음 특정 ID에 대해 처리를 담당하는 노드를 찾을 수 있다. ( streamsMetadataForStoreAndKey ) 그런 다음 rest, akka-remoting 등의 방법으로 해당 노드로 요청을 하면 된다. 


Fail-over

State Store가 좋지만, 노드가 장애라면? 새로 만든다는 것은 비용이 많이 드는 작업이다.  KafkaStreams는 리밸런싱(노드의 추가/삭제) 때문에 레이턴시가 증가하거나, 요청 자체가 실패할 수 있다.  

이 이유 때문에 state-store가  changelog-topic에 로깅된다. 이 topic은 모든 이벤트가 필요없고 최신분만 필요하므로 compacted(압축) 되어 있다. 덕분에 다른 노드에서 다시 store를 구축하는게 빠르다. 하지만 여전히 리밸런싱에 대한 latency는 증가될 수 있다.  이에 대한 해결방법으로 standby 를 제공한다. primary 가 실패한다면 바로 standby 로 장애를 처리한다.


Consistency

카프카는 기본은 at-least-once 정책이다. exactly-once 정책도 지원하지만 topic을 읽고/쓰기 할 때에만 보장해준다. processing.guarantee 옵션을 켜서 exactly-once를 사용 할 수 있으며, 성능은 저하되지만 batch message의 집합이 한번의 커밋에 되므로 큰 비용은 아니다.


Listening for events

지금 까지 'current state'를 업데이트 하고 질의하는 걸 알아봤고, 이제 사이드 이펙트에 대해서 알아 볼 차례이다( 예를 들어 rest 등의 외부 호출이네 이메일 보내기)

이러한 작업들은 blocking이 일어나며 I/O를 발생하므로 state-update의 로직에 포함되는건 종은 생각이 아니다. 그리고 main 스레드에서 처리를 하는것은 성능상의 병목을 일으킨다. 그리고 state-update 자체가 여러번 실행 될 수 있다.

다행히 카프카 토픽을 처리할 많은 유연성을 가질 수 있다. state를 업데이트 전후에 메시지를 소비할 수 있고, 이 메시지 소비 전후에 원하는 방식으로 사용 할 수 있다.




Kafka Header

분류없음 2018.03.19 15:51 posted by dev.bistro

카프카의 Header 지원

- 0.11 버전이후 user(custom) header를 지원하기 시작하였고 아래 위키와 이슈를 참고

- https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

- https://issues.apache.org/jira/browse/KAFKA-4208


실제로 저장되는 형식과 구현체는 아래를 참고

- https://kafka.apache.org/documentation/#record

- https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L175



Producer 코드를 따라가며 Key, Header가 어떻게 저장이 되는거 확인 

- Producer는 2개의 구현체가 있지만 MockProducer는 테스트 용이기 때문에 KafkaProducer만 확인하면 된다.

- 실제로 KafkaProducer에서 doSend를 보면 인자로 받는 ProducerRecord 안에 headers 정보가 있다. 

- 이 정보를 이용하여 byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); '시리얼라이징 된 객체 데이터'를 생성한다. 하지만 기본 제공 하는 Key Serializer인 ExtendedSerializer은 Header를  무시한채 Key Data만을 가지고 시리얼라이징 한다. 


적정한 파티션 갯수

분류없음 2018.03.05 08:56 posted by dev.bistro

몇개의 파티션으로 구성할 것인가에 대한 도움 글

https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/


읽고 나서 내 맘대로 정리한 내용

- '파티션 갯수가 변경이 된다면, 제대로 메시지가 전달 안될 수 있다. 그래서 현재 필요보다 파티션 수를 많이 구성하라. 파티션 갯수를 변경할 필요가 없도록 해라.  (향후 1-2년치 트래픽도 고민해서 충분한 파티션 수를 고민할것)

- 그때 문제점은 OS의 open file handle이 커지게 된다. (모든 세그먼트 * 2)

- 리더 선출은 주키퍼와 관련된 작업이 포함되고 이것은 선행적이다. 그래서 한 브로커에 파티션 리더가 많다면 선행적으로 파티션 리더를 변경하는 시간이 길어지고, 혹시나 컨트롤러 역할을 하는 브로커가 죽는다면 더 심각해진다.

- 레이턴시가 고려되야 한다면 브로커당 파티션 수를 100x클러스터의 브로커수*레플리카팩터 이하로 제한하라.

- 컨플런트 카프카는 java producer로 커스텀 마이징한다. (기존에는 metrics 부분만 수정하는 줄 알았다), kafka-cpXXX 버전을 쓰는것은 가능한 배제하자.


* 파티션 갯수가 많을 수록 처리량(Throughput)도 높아진다.

처음으로 이해해야 할 것은 Kafka에서는 topic partition이 병렬처리의 단위라는 것이다. producer와 broker 측면에서 본다면 서로 다른 파티션에 데이터를 쓴다는 것은 완전히 병렬로 할 수 있다. 그래서 압축과 같은 비싼 작업을 하드웨어 리소스에 활용할 수 있다. consumer 측면에서 카프카는 하나의 파티션의 데이터가 하나의 컨슈머 스레드에 할당된다. 따라서 consumer(group)의 병럴 처리 단위는 소비하는 파티션 갯수에 의해 제한된다. 따라서 많은 파티션 갯수가 더 많은 처리량을 준다.

파티션 갯수를 선택하는 공식은 처리량을 기반으로 한다. 하나의 파티션에서 데이터를 넣을 수 있는 p, 소비할 수 있는 스루풋을 c 라고 하고 목표 처리량을 t 라고 하자. 그러면 최소  t/p, t/c의 파티션이 필요로 하다.

producer의 파티션별 스루풋은  배치사잊, 압축코덱, ack 쇼ㅔㄷ, replica-factor등의 설정에 따라 달라진다. 그러나 일반적으로 벤치마크에서 보여지는 것처럼 하나의 파티션에서 10MB/sec의 데이터를 넣을 수 있다. 컨슈머의 처리량은 컨슈머가 얼마나 빨리 메시지를 처리할 수 있느냐에 따라 다르다. 실제로 측정해 봐야 한다.

시간이 흘러 파티션 갯수를 증가 시킬 수 있지만 한가지 주의할 점은 메세지가 key를 가지고 생성될 때이다. 키를 가진 메시지가 publishing 될때 카프카는 hash key기반으로 파티션을 매핑하낟. 이렇게 하면 동일한 키가 항상 동일한 파티션으로 가도록 보장해준다. 이 것은 어플리케이션에서 중요할 수 있는데, 만약 파티션 갯수가 변경이 된다면 이것을 보장되지 않을 수도 있다.

이러한 상황을 피하기 위해 과도하게 파티션을 만드는것이 일반적이다. 기본적으로 향후 1-2년치를 고민하고 파티션 갯수를 결정한다. 초기에는 작은 카프카 클러스터를 만들것이고, 시간이 지나 브로커를 추가하며 기존 브로커의 일부를 새 브로커로 옮길 수 있다.(온라인으로도 가능하다) 이 방법은 어플리케이션의 break 없이 스루풋을 증가 시킬 수 있는 방법이다.처리량 이외에도 몇몇 고려해야할 요소가 있다. 많은 파티션 갯수가 부정적인 영향을 줄 수도 있다.


* 많은 파티션 갯수는 더 많은 Open FIle Handles를 필요로 한다.

각 파티션은 브로커내의 파일 스스템 디렉토리에 할당된다.  해당 로그 디렉토리에는 세그먼트당 2개의 파일(색인, 실제 데이터용)이 존재한다. 현재 카프카는 모든(every) 로그 세그먼트에 대해서 인덱스, 데이터 전부다 파일 핸들을 연다. 따라서, 파티션이 많아 질 수록 open file handle이 많이 필요하게 된다. 이건 단지 설정 문제이고, 우리는 브로커당 3천개의 open file handle을 사용하는 클러스터를 확인했다. (1500개 세그먼트? - 세그먼트당 10메가 잡으면 ? - 15기가?)


* 많은 파티션은 비가용성을 증가 시킬수 있다.

카프카는 높은 가용성/내구성을 제공하는 intra-cluster replication(https://goo.gl/r77Lxg)을 지원한다. 각 파티션은 다수의 레플리카를 가질수 있고, 각각 다른 브로커에 저장된다. 레플리카 중 하나는 'leader'가 되고 나머지는 'follower'가 된다. 카프카는 모든 레플리카를 자동으로 관리하고, 동기 상태를 유지한다. 프로듀서/컨슈머의 요청들은 모두다 리더 레플리카에서 처리된다. 만약 브로커가 fail 상태라면, 해당 브로커의 리더가 있는 파티션은 일시적으로 사용 할 수 없게 된다. 카프카는 이 서비스 불가상태의 파티션 리더를 다른 레플리카로 이동시켜 클라이언트 요청을 계속 처리 할 수 있게 한다. 이 처리는 카프카 브로커 중 하나인 '컨트롤러'에 의해 수행된다. 그리고 주키퍼에 메타 데이터를 읽고 쓰는 작업이 포함되고, 컨트롤러에 의해 순차적으로 수행이 된다.

일반적인 경우, 정상 종료일때에는 컨트롤러가 한번에 하나씩 리더를 이동시킨다.  하나의 리더의 이동은 몇ms밖에 걸리지 않는다 그래서 클라이언트 관점에서는 정상적인 브로커 종료일때는 비가용상태가 작다.

하지만 브로커가 불확실하게 죽어버린다면(ex kill -9), 비가용성은 파티션 갯수에 비례 할 수 있다. 

한 브로커에 2000개의 파티션이 있고 각각 2개의 레플리카가 있다고 가정하자. 그러면 브로커너는 약 1000개의 파티션의 리더를 가지고 있을 것이다. 이 브로커가 죽어버린다면 동시에 1000개의 파티션이 사용할 수 없게 된다. 단일 파티션에 대해 새로운 리더를 선출하는데 5ms가 걸린다고 가정하면, 1000개의 파티션의 리더를 선출하는데는 5초가 걸린다. 그래서, 일부 파티션의 비가용성은 5초에 실패를 감지한 시간이 더해진 시간동안이 될 것이다.

혹시나 실패한 브로커가 '컨트롤러'일수도 있다. 이 경우에는 새 브로커가 컨트롤러가 되기 전까지 리더를 선출하는 작업이 진행되지 않을수도 있다. 컨트롤러의 페일오버는 자동이지만, 새 컨트롤러는 메타 데이터를 초기화 단계에서 주키퍼에서 읽어야 한다. 예를 들어 1만개의 파티션이 있고 개당 2ms가 걸린다고 할때, 20초 이상 장애가 발생할 수 있다. 

일반적으로 이러한 unclean 장애는 드믈지만, 그것도 신경쓰인다면 브로커당 파티션 수를 2-4천개로 제한하고 클러스터의 총 파티션 갯수를 몇 만개 단위로 제한하는게 좋다.


* 파티션 수가 많을 수록 e2e 레이턴시가 증가한다.

카프카에서 e2e latency 는 프로듀서에 의해 메시지가 생성되고, 컨슈머가 메시지를 읽는 그 시간으로 정의한다. 카프카는 메시지가 모든 ISR에 복제된 이후, 즉 커밋이 된 이후 메시지를 노출시킨다. 그래서 메시지를 커밋하는 시간은 e2e 레이턴시의 많은 부분을 차지할 수 있다. 기본적으로  카프카 브로커는 두 브로커간에 복제본을 공유하는 모든 파티션에 대해서 싱글 스레드로 데이터를 복제한다. 우리의 실험에 따르면 브로커간에 1000개의 파티션을 복제하면 약 20ms의 응답시간이 증가한다. 이것은 일부 리얼타임 어플리케이션에서 너무 높은 값이 될 수 있다.

하지만 이 문제는 클러스터가 크다면 좀 더 완화되어질 수 있다. 예를 들어 브로커에 1000 개의 파티션 리더가 있고, 클러스터에 10개의 브로커가 더 있다고 예를 들자. 각 브로커는 100개의 파티션의 데이터만을 가져오면 되고, 추가되는 커밋 시간은 10ms단위가 아닌 ms단위일 것이다.

레이턴시를 고려해야 한다면 브로커당 파티션 수를 100xbxr로 제한하라. b는 클러스터의 브로커 수이고 r는 레플리카 팩터이다.


* 파티션 수가 많을 수록 client 메모리를 더 요구한다.

최근 컨플런트1.0(0.8.2기반0) Java 프로듀서를 좀 더 효과적으로 향상시켯다. 새 기능중 하나는 사용자가 메시지를 버퍼링하는데 사용하는 메모리 상한을 설정할 수 있다는 것이다. 내부적으로 프로듀서는 파티션당 메시지를 버퍼링한다. 데이터가 축적되거나, 충분한 시간이 지나면 메시지는 버퍼에서 브로커로 전송된다.

파티션수가 늘어난다면, 프로듀서의 파티션에 더 많은 메시지가 누적된다. 사용된 메모리 총 합이 설정값을 초과할 수 있다. 이 때 프로듀서는 새 메시지를 차단하거나, 버려야 한다. 어느쪽도 좋은 것은 아니다. 이걸 막으려면 메모리를 설정을 더 크게 해야한다.

좋은 스루풋을 얻으며녀 적어도 파티션당 수십kb를 할 당하고, 파티션 갯수가 늘어날 수록 메모리 총 합도 조정해라.

유사한 이슈가 컨슈머에도 있다. 컨슈머는 파티션 단위로 메시지를 가져온다. 파티션이 많을수록 메모리가 많이 필요하라. 하지만 이것은 일반적이 아닌, 컨슈머의 문제이다.


* 요약

일반적으로 파티션 수가 많으면 카프카 클러스터는 높은 스루풋을 보여준다.  하지만 가용성, 레이턴시와 같은 잠재적인 impact를 인지하고 있어야 한다. 앞으로는 이러한 제약 사항을 개선할 계획이다.

stream kafka app 에 rabbitmq sleuth 사용하기

Cloud 2018.01.31 20:06 posted by dev.bistro

현재 팀은 SpringBoot application 을  좀 더 운영하기 쉽도록 하기 위해서 starter를 하나 생성해서 사용하고 있다.

cloud-netflix 셋팅과, 그 많고 많은 zuul, eureka 버그 픽스를 위한 몇몇 코드, 그리고 config server를 기반으로 한 다양한 설정들을 공통화 하기 위한 목적이다. 현재 진행 하는 프로젝트는 이 starter 를 쓰지 않고 순수하게 spring/kafka 만을 사용 하고 있다.

이 프로젝트를 위해 별도의 metrics 시스템을 구축하기 보다는 기존의 sleuth-zipkin 인프라를 활용하기 위해서 spring-cloud-sleuth-stream을 사용하려 한다.


1. gradle 설정 추가

['stream-kafka'].collect {
compile "org.springframework.cloud:spring-cloud-starter-$it"
}
compile 'org.springframework.cloud:spring-cloud-stream'

에서 

compile 'org.springframework.cloud:spring-cloud-sleuth-stream'
compile 'org.springframework.cloud:spring-cloud-stream-rabbit'
compile "org.springframework.cloud:spring-cloud-starter-sleuth'

를 추가하였다.


2. 설정 추가

spring:
application:
name: pacman
rabbitmq:
addresses: 172.18.176.196:5672
username: rabbitmq
password: rabbitmq
sleuth:
sampler:
percentage: 1.0
stream:
enabled: true
zipkin:
service:
name: vine-event-pacman

* zipkin.service.name 을 추가해야만 했다. application.name 을 그대로 사용 할 줄 알았는데.. 어디선가 자동으로 해주고 있는 거였나보다.

이렇게만 설정하면 아래와 같은 에러를 볼 수 있다.  

현재 우리의 zipkin은 rabbitmq 기반인데, 하필이면 이 프로젝트는 binder로 kafka를 사용하고 있었다.
당연하게 stream binder로 kafka, rabbit를 동시에 쓰고 있어서 발생한 문제였고, DefaultBinderFactory 를 확인하여 default binder 를 설정함으로서 해결 할 수 있었다.

Caused by: java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for 'org.springframework.integration.channel.DirectChannel' : kafka,rabbit, and no default binder has been set

cloud:
stream:
default-binder: "rabbit"



kafka stream의 fail-over & high-availability

Cloud 2018.01.30 19:55 posted by dev.bistro

18/01/30 idea


카프카 자체의 data recovery는 훌륭하다. 당연한게 파일로 저장하고 심지어 여러 셋트로 저장한다. 유실되는 경우는 저장 기간을 넘기거나, 1개의 IDC에만 설치한 후 IDC가 물에 잠기는 방법 뿐이다. 이건 다 아는 얘기고, 문제는 이 kafka 를 기반으로 한 streams 를 서비스 할 때이다.


1. Kafka는 기본적으로 1 개의 TopicPartion 에서 1 개의 Consumer Group의 노드가 접속 한다.

2. Kafka Topic은 KStream/KTsble/StateStore 등을 통해서 각각의 topic-partition을  local DB에 싱크를 맞춘후 materilized 한다  (비동기/latency 존재)


우선 1번의 문제부터....

1. 해당 TopicPartion의 정보를 단 1개의 VM(instance)만이 가지고 있을 수 있다 (Application Cluster 자체를 active-standby로 구성하지 않는이상)

 이 얘기는 해당 Node가 죽어버리면 '그 순간' 그 데이터들은 VM instnace안에서 존재 하지 않는 다는 것이다. 

 물론 Kafka Broker 내에는 존재하기 때문에 일정 시간의 기다린다면 새로운 Instacne가 해당 데이터를 살릴 수 있다. 하지만 그 '일정 시간의 텀'이 문제이다.

 위에서 얘기한 데로 Application 자체를 standby로 준비하지 않는 이상 힘들다. kafka 역시 standby 로 해결한다.

 링크 : https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#id10

 동일한 셋을 하나 더 구성하지만 그 녀석은 말그대로 standby이다. 실제 서비스에는 투입이 되지 않으며, resource 역시 많이 차지하지만, 장애 시간을 짧게 가져가는데 필수적인 요소이다.

 StreamThread 내에서 task를 active 와 standby로 가지고 있다. 


 이 부분은 ... 계속 파야할 것 같다.



2. Event가 topic으로 전송이 된다면 해당 topic-partition을 materialized view 로 가지고 있는 consuer group 의 instance는 consumer로 붙는다.

   이 때 topic과 state-store의 데이터 불일치는 필연적이다.

   이 때 state-store에서 get을 하는 '재고차감' 같은 Domain은 어떤 식으로 처리 할 것인가?

   A. 모든것을 sync로 한다 - 의미 없으니 pass

   B. instance의 state-store를 ReadyOnlyQueryType으로 사용하는게 아닌 WriteableQueryType으로 사용하여, put도 할 수 있게 한다. 그리고 이 changelog는 

      remote kafka cluster와 연동한다. 

      atmoic을 보장 할 수 있는 방법이고, 서버가 죽었을때도 문제가 되지 않는다. 내부 rocksdb에 데이터는 유지되므로 다시 살리기만 한다면 cluster와 동기화를 할 수 있을테니까..

      문제는 server가 살릴수 없을 때이다. 이 이슈는 바로 데이터 유실로 이어진다.



적당한 방법들은 보이지만, 정답인지 아닌지 확신이 안선다.

      



Kafka 기반의 event driven stateful microservices

Cloud 2018.01.28 12:58 posted by dev.bistro


18/01/26 idea


구현해야 할 도메인은 https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/

필요한 내용은

- 정확한 request/ responses
- scale out
- high availability
- ktable & state-store를 기반으로 한 local - storage의 사용

블로그 글과 예제에 대해서는 100퍼센트 공감이 되지만, 몇몇 부분에서는 '구현' 자체가 고민스러움.
나 뿐 아니라 다른 이의 구현을 살펴보아도 동일하다.  아래의 github 개발자도 위의 블로깅 내용을 보고 example project를 작성하였지만 같은 문제점을 그대로 노출하고 있다..  

- https://github.com/hpgrahsl/kafka-streams-emojitracker
- 사용 : confluent connect, stream, statestore, reactive(boot2.0)
- 특징 : 트랜잭션 사용안함(at_least_once) / 

- 데이터 서칭

더보기

이 분은 그리고 request/reply 모델이 아니다.  그래서 아래와 같은 부분에 대해서는 구현이 빠져있다.
https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/microservices/OrdersService.java#L232 

reply에 대한 고민은 
- 채번을 하고, 해당 Order Id의 reply를 consumer로 받을 수 있는 Node으로 요청을 보낸다.
  이슈) zuul, client load loadbalancer가 주키퍼를 기반으로 브로커 파티션/파티셔너 정보를 알고 있어야 한다.
- zuul 에서 기존 routing 정책에 의해 요청을 보낸후, 실제 Node에서 '응답을 받을 수 있는' 채번을 한다.
  이슈) Range(0,PartionNum).boxed().filter(local-state::matchingkey).findFirst();
     그럼 아마 ID가 1,7,11 이런식으로 생기겠지. (중간에 사용못하는 애들 생김)

- 또는... 정말 옛날로 돌아간다. (생각도 안해봤지만, 팀원분께서 차라리 이럴꺼면 2번 호출해라고)
  ex) 채번 자체는 Node 에서 하고, 이벤트를 발생한다. 그리고 그 노드가 '응답'을 받는 다는 보장이 없으므로 Mono로 FT로 바로 리턴, FT는 응답을 바로 받고 'OrderId'를 가지고 다시 한번 zuul 영역을 호출한다. -_-;

- 또는 제일 심플하게 각각의 Node를 Consumer Group으로 사용하지 않는다!
  이슈) 트래픽이 WAS 만큼 '배수'로 든다. 즉 기존에 WAS 50대로 10MB/SEC 트래픽이었다면 이렇게 하면 500MB/SEC -_-;;;



티스토리 툴바