일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- kafka streams
- Spring
- 카프카
- Elasticsearch
- aws
- Elk
- Logstash
- play framework
- scala
- 플레이 프레임워크
- RabbitMQ
- kafkastream
- springboot
- kafka interactive query
- spring-batch
- 한빛미디어
- coursera
- avo
- kafkastreams
- schema registry
- Slick
- scala 2.10
- enablekafkastreams
- Kafka
- spring-kafka
- spring-cloud-stream
- statestore
- reactive
- gradle
- confluent
- Today
- Total
b
Schema Registry 는 어떻게 Schema Version 을 확인 하는가? 본문
subject란?
- Schema Registry는 Schema를 Subject 단위로 관리되어지는 듯 하다. 디폴트로는 <topc>-key, <topic>-value (ex: data.order-value) 형식으로 사용되지만 confluent 4.1 부터 이 부분을 커스텀마이징 할 수 있는 SubjectNameStrategy가 추가되었다.
기본적으로 사용하는 토픽이 동일하다면 동일한 subject를 보지만, 커스텀마이징 한다면 Record 에 따라 subject로 변경 가능하다.
Get the subject name for the given topic and value type.
1. 최초에 Schema Registry에 등록하고 Schema ID 를 가져오는 동작
- SR은 Cache 기반으로 동작한다 어플리케이션이 시작하면 Application SchemaRegistry는 빈 데이터만을 들고 있다.
- 처음으로 Producer 를 통해서 메시지를 발송하면 Schema 정보를 instance.getSchema() 로 가져온다.
- Registry Server로 요청을 보낸고 응답은 Schema ID로 받는다.
- Map<String, Map<Schema, Integer>> schemaCache 캐시에 등록하고 사용 한다.
- schemaCache 에서 remove 되는 endpoint 는 없다. 즉, 한 번 캐싱이 된다면 다시 호출 되지는 않을 것이다. (어플리케이션에 실행중 instance.getSchema가 변경이 될리는 없기 때문이다)
2. Application이 자동으로 Schema ID를 올리는 것을 막는 방법
- SR 서버 자체의 호환 모드를 NONE 으로 해 되지만 클라이언트 설정에서 auto.register.schemas(default true)를 false로 해도 된다. 이럴 경우 SchemaRegistry RestClientException 이 발생되면서 정상적인 produce가 안된다.
결론
- (avsc에 의해 생성되는) org.apache.avro.specific.SpecificRecord 를 구현하는 인스턴스의 SCHEMA$ 를 가지고 Schema ID를 등록하거나, 내부적으로 캐싱해서 사용한다.
- 이 문자열이 캐싱된다면, 최초의 Produce 할 때만 SR 서버를 호출한다. (즉 이 문자열이 바뀔일이 없기 때문에 1개의 subject당 1번이 호출될 것이다)
- BACKWARD 인지 FORWARD 인지는 순수하게 SR 서버쪽의 담당이다. 클라이언트는 단지 auto.register.schemas: false로 설정함으로써 자동 등록을 해제 할 수 있을 뿐이다.