max.in.flight.request.per.connection

분류없음 2018.06.18 14:51 posted by dev.bistro

필요한 수준까지 이해한듯 해서, 메모로 남김

원래의 카프카는 데이터 주입에 대한 순서가 중요하지 않았다.  (정확히는 어느 정도의 버퍼를 카프카 브로커가 가지면서 Event Time에 대한 조정 작업을 알아서 해준다)

하지만 producer의 명등성 옵션과 순서 보장에 대한 이슈가 중요해졌고 max.in.flight.request.per.connection = 1 로 셋팅하면서 순서 꼬임을 좀 방어하고자 했는데...

1로 설정하고 idempotence 옵션이 켜져있고 retry가 가능한 상태일때 
전송하다가 OutOfOrderSequence 예외가 발생한다면 클라이언트 영역에서 이 Sequence Number를 잘 처리해야한다. (재전송 하거나, 다음 idempotence 전송에서 써야한다)


이게 어렵다... 그래서 그냥 하드 코딩으로 각 Topic Partition의 5개의 메타데이터를 들고 있게한것이다. 

if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {
@Override
public int compare(ProducerBatch o1, ProducerBatch o2) {
return o1.baseSequence() - o2.baseSequence();
}
}));

그래서 kafka 문서의 "enable.idempotence" 항목을 보면 Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 라고 나온다.


idempotence 켜져있고 나부의 매직 넘버인 5보다 크게 셋팅한다면 sequence를 보장하지 못할 수 있기 때문이다.

실제로도 

        if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {

            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +

                    " to use the idempotent producer.");

        }

코드에 의해서 실행 자체를 막아놨다.



요약
1. 그냥 디폴트로 쓰자.
2. 프로듀서에서 동기식으로 사용한다면 (.get() 호출) 더더욱 상관하지 않아도 된다.


Schema Registry , Schema Evolution

분류없음 2018.06.11 21:45 posted by dev.bistro

몇 달전에 메모장에 적어놨던 내용인데, 지우면 또 까먹을 듯 해서 기록으로 남김


* 기존 Java Class에서 Avro Schema 획득하기

ReflectData.get().getSchema 를 이용하여, 쉽게 json형식의 AvroSchema를 획득할 수 있다. (검증은 꼭 할것)
아쉽지만 record 타입을 자동으로 찾아서 해준다거나, vargs 형식으로 getSchema를 사용할 수는 없다.

Schema schema = ReflectData.get().getSchema(Order.class);
System.out.println(schema.toString(true));


* Schema Evolution

1. backward : 새스키마로 이전 데이터를 읽을 수 있다.
 ex) 새 field를 추가할때 default value를 넣는다. 필드가 없다면 default value를 사용한다.  즉 데이터는 old type인데 읽는 쪽이 new type

2. forward : 스키마가 변하지 않는데, 새로운 스키마의 데이터가 들어오면 ignore 한다.
 'forward' : we want forward compatible when we want to make a data stream evolve without chaning our downstream consumers . 즉, 데이터는 new type인데 읽는 쪽이 old type

3. full : 1,2번 두개 합친거
- 필드를 추가할 때에는 default를 무조건 넣어야 한다(backward position)
- default가 있는 필드만 remove 할 수 있다.


* avro schema 를 사용하는 추천 가이드

1. PK를 사용해라
2. remove될수 있는 필드라면 default를 제공하라
3. enum은 주의해서 사용하라. - 시간이 흘러도 evolve 할 수 없다?
4. rename 하지 말것, 필요하다면 다른 필드를 넣어라
5. 항상 default value를 쓰고, required field는 삭제하지 마라

subject란?

- Schema Registry는 Schema를 Subject 단위로 관리되어지는 듯 하다. 디폴트로는 <topc>-key, <topic>-value (ex: data.order-value) 형식으로 사용되지만 confluent 4.1 부터 이 부분을 커스텀마이징 할 수 있는 SubjectNameStrategy가 추가되었다.

기본적으로 사용하는 토픽이 동일하다면 동일한 subject를 보지만, 커스텀마이징 한다면 Record 에 따라 subject로 변경 가능하다.

Get the subject name for the given topic and value type.




1. 최초에 Schema Registry에 등록하고 Schema ID 를 가져오는 동작

- SR은 Cache 기반으로 동작한다 어플리케이션이 시작하면 Application SchemaRegistry는 빈 데이터만을 들고 있다.

- 처음으로 Producer 를 통해서 메시지를 발송하면 Schema 정보를 instance.getSchema() 로 가져온다.

- Registry Server로 요청을 보낸고 응답은 Schema ID로 받는다.

- Map<String, Map<Schema, Integer>> schemaCache 캐시에 등록하고 사용 한다.

- schemaCache 에서 remove 되는 endpoint 는 없다. 즉, 한 번 캐싱이 된다면 다시 호출 되지는 않을 것이다. (어플리케이션에 실행중 instance.getSchema가 변경이 될리는 없기 때문이다)


2. Application이 자동으로 Schema ID를 올리는 것을 막는 방법

- SR 서버 자체의 호환 모드를 NONE 으로 해 되지만 클라이언트 설정에서 auto.register.schemas(default true)를 false로 해도 된다. 이럴 경우 SchemaRegistry RestClientException 이 발생되면서 정상적인 produce가 안된다.




결론

- (avsc에 의해 생성되는) org.apache.avro.specific.SpecificRecord 를 구현하는 인스턴스의 SCHEMA$ 를 가지고 Schema ID를 등록하거나, 내부적으로 캐싱해서 사용한다.

- 이 문자열이 캐싱된다면, 최초의 Produce 할 때만 SR 서버를 호출한다. (즉 이 문자열이 바뀔일이 없기 때문에 1개의 subject당 1번이 호출될 것이다)

- BACKWARD 인지 FORWARD 인지는 순수하게 SR 서버쪽의 담당이다. 클라이언트는 단지 auto.register.schemas: false로 설정함으로써 자동 등록을 해제 할 수 있을 뿐이다.





티스토리 툴바