현재 업무에서 메지시의 상태는 하나의 필드로 정의하고 있다.  예를 들어 결제 요청이 들어오면 APPROVAL_REQUESTED:type 을 정의하여 payment topic에 적재하고 이 후 저 메시지를 컨슘하여 결제가 완료된다면 '동일한 토픽에' APPROVAL_COMPLETED:type 으로 하나의 메시지를 추가한다.

즉, APPROVAL_REQUESTED 상태는 APPROVAL_COMPLETED or APPROVAL_FAILED 상태로 변경 가능하다.

각각은 Kafka에 의해 Header에 event time이 기록되어 있지만, 별개로 actBy라는 timestamp 필드도 추가하였다.  REQUESTED 상태에서 COMPLETED 상태로 변경되기까지의 시간을 어떻게 측정할 것인가?


무난한 방법으로 하나의 Streams Application을 작성할 수도 있고, KSQL을 통해서 볼 수도 있다. 또는 COMPLTED 메시지가 기록될 때 프로그램적으로 기록 할수도 있고... 내가 선택한 방법은 기존에 모니터링을 위해 사용중인 logstash의 plugin 이다.


https://www.elastic.co/guide/en/logstash/current/plugins-filters-elapsed.html

하나의 필드를 기준으로 2개의 message 사이이 시간을 측정해서 소요시간을 측정하는 것이다.

2번째 메시지에 elapsed_time 필드를 add_field 줄수도 있고, 원본 이벤트는 그대로 나둔채 하나의 추가 이벤트를 발행해서 elasticsearch에 기록 할 수 있다. (이 경우에는 elapsed_match 라는 tag를 가진 document이다) 이 설정은 new_event_on_match 를 통해서 설정 가능하다.


개인적으로 셋팅한 내용은 아래와 같다.

   elapsed {

      start_tag => "APPROVAL_REQUESTED"

      end_tag => "APPROVAL_COMPLETED"

      unique_id_field => "id"

      timeout => 10

      new_event_on_match => false

    }


기준이 되는 필드는 'id' 이고  동일한 ID를 가진 이벤트의 APPROVAL_REQUESTED,  APPROVAL_COMPLETED 사이의 시간을  APPROVAL_COMPLETED tag를 가지는 메시지에 elapsed_time 필드로 기록 해준다. 만약 10초 이내에  동일한 id를 가지는 APPROVAL_COMPLETED 메시지를 수신하지 못한다면 elapsed_expired_error 태그를 가지는 메시지를 발행한다.


이를 통해서 손쉽게 


1카프카 주문 메시지 발행 - 2컨슘 - 3Remote API호출 - 4주문 요청 결과 메시지 발행  

1~4단계의 소요 시간을 측정 할 수 있다.



(물론 이것은 카프카 메시지 간의 소요시간이지, 그 외 측정구간은 또 다른 방법을 이용 했다)

npm -g 인스톨 폴더 변경

분류없음 2018.07.27 16:04 posted by dev.bistro


사내 시스템은 특정 계정으로만 관리된다. 이 계정의 home directory 에 npm package를 설치 하는 방법이다.


1. .bashrc 설정

NPM_PACKAGES="$HOME/.npm-packages"
PATH="$NPM_PACKAGES/bin:$PATH"
unset MANPATH
MANPATH="$NPM_PACKAGES/share/man:$(manpath)"
NODE_PATH="$NPM_PACKAGES/lib/node_modules:$NODE_PATH"

2. 커맨드라인에서 다음의 명령을 수행

$) mkdir -p "$NPM_PACKAGES"
$) echo "prefix = $NPM_PACKAGES" >> ~/.npmrc

참고1. https://stackoverflow.com/questions/10081293/install-npm-into-home-directory-with-distribution-nodejs-package-ubuntu

참고2. https://github.com/sindresorhus/guides/blob/master/npm-global-without-sudo.md

gradle 4.8 릴리즈 그리고 dependency lock

분류없음 2018.06.29 17:43 posted by dev.bistro

6월 gradle 4.8이 릴리즈 되었다. (현재 4.8.1) 가장 눈에 들어오는 점은  nebula의 gradle-dependency-lock-plugin 으로 사용하고 있던 `Dependency locking` 이거다. 

> ./gradlew build --write-locks


사용하기 위해서는 build.gradle 에서 해당 내용을 활성화 시켜야 한다.
(참고 : https://docs.gradle.org/4.8.1/release-notes.html#locking-of-dynamic-dependencies )

dependencyLocking {
    lockAllConfigurations()
}


처럼 실행하면 ./gradle/dependency-locks 에 현재 실행된 task 들의 각 단계에서 필요한 dependencies 정보가 lockfile로 생성된다. 예를 들어 build를 실행하니, 아래와 같이 생성되었다.

annotationProcessor.lockfile
checkstyle.lockfile
compileClasspath.lockfile
cpd.lockfile
jacocoAgent.lockfile
pmd.lockfile
runtimeClasspath.lockfile
testAnnotationProcessor.lockfile
testCompileClasspath.lockfile
testRuntimeClasspath.lockfile

만약 dependencies tasks를 실행할 때 --write-locks 옵션을 붙인다음 좀 더 많은 lockfile이 생긴다.

annotationProcessor.lockfile
archives.lockfile
bootArchives.lockfile
checkstyle.lockfile
compile.lockfile
compileClasspath.lockfile
compileOnly.lockfile
cpd.lockfile
default.lockfile
jacocoAgent.lockfile
jacocoAnt.lockfile
pmd.lockfile
runtime.lockfile
runtimeClasspath.lockfile
testAnnotationProcessor.lockfile
testCompile.lockfile
testCompileClasspath.lockfile
testCompileOnly.lockfile
testRuntime.lockfile
testRuntimeClasspath.lockfile

이 후 디펜던시 버전을 변경하여 다시 실행하고자 하면 아래와 같은 메시지를 확인 할 수 있을 것이다.


FAILURE: Build failed with an exception.


* What went wrong:
Could not determine the dependencies of task ':cpdCheck'.
> Could not resolve all dependencies for configuration ':cpd'.
   > Dependency lock state for configuration 'cpd' is out of date:
       - Did not resolve 'net.sourceforge.pmd:pmd-php:5.6.1' which is part of the lock state
       생략


springboot2 actuator micrometer

분류없음 2018.06.27 15:58 posted by dev.bistro

마이크로 미터는 2017년  4월 중순  Jon Schneider (슈나이더?) 에 의해 시작되었다. 원래 넷플릭스 직원이었고, 우리도 애용하고 있는 nebula의 몇몇 플러그인 개발자 였다. 하여튼 (동일한 시기에 Pivotal로 이직을 해서인지?, 어떤 이유에서인지)  슈나이더는 micrometer를 시작하였고 "Experimental ground for Spring metrics work" 라는 최초의 커밋을 남겼다.

https://github.com/micrometer-metrics/micrometer/commit/4df27760


그가 micrometer에 대해서 소개한 글이다. 대충 필요하다고 생각되는 내용만 옮겨 적음

https://spring.io/blog/2018/03/16/micrometer-spring-boot-2-s-new-application-metrics-collector



What is it?

마이크로미터(Micrometer)는 벤더중립(vender neutral) API를 이용해서 내 코드를 시간, 카운트, 계측 할 수 있는 차원 우선 계측 집합의 facade이다. (dimensional-first metrics collection facade) 클래스패스나 설정을 통해서 하나 또는 이상의 모니터링 시스템에 메트릭 데이터를 보낼수(export) 할 수 있다.  메트릭 계의 SLF4J 라고 생각하면 된다. 

마이크로미터는 스프링부트1에 존재했던 카운터, 게이지에 더 풍족한 기본 요소를 추가하였다. 예로 단일 마이크로미터 타이머(Timer)는  처리량(throughput), 총 시간(total time), 샘플의 최대 지연시간(maximum latency of recent samples),  퍼센테이지(& 히스토그램)등을 생성 할 수 있다.

차원 측정(dimensional metrics) 에 포커싱을 하고 있지만, 마이크로미터는 계층명(hierarchical names)에 매핑하여 Ganglia 나 JMX 같은 오래된 모니터링 솔루션을 계속 지원한다. 마이크로미터로 변경하게 된 이유는 좀 더 발전된 차원 모니터링 시스템(Prometheus, Datadog, Wavefront, SignalFx, Influx) 때문이다. 

스프링 프레임워크의 장점은 추상화를 이용한 선택을 할 수 있다는 것이다. 마이크로미터를 적용한 이후 하나 이상의 모니터링 시스템을 선택하고, 나중에 metrics 수집부분을 재작성 하지 않고도 모니터링 시스템을 교체할 수 있다.



What do I get out of the box?

스프링 부트 2는 몇가지 메트릭을 포함하고 있다.

* JVM, CPU, Spring Request, RestTemplate, Cache, DataSource, Logback, Tomcat 등등



Which monitoring systems does Micrometer support ?

마이크로미터는 벤더중립적인 메트릭 수집 API(io.micrometer.core.instrument.MeterRegistry) 를 제공하며, 여러 구현체도 가지고 있다.

* Netflix Atlas, Datadog, Influx, JMX, NewRelic, Prometheus 등등

마이크로미터 1.1.0 에서는 다음도 지원할 예정이다.

* AppOptics, Azure Application Insights, Dynatrace, Elasticsearch, StackDriver


스프링부트2는 여러개의 registry implementations 를 등록할 수 있는 MeterRegistry 를 복합(composite) 설정하여 여러개의 모니터링 시스템으로 전송 할 수 있다.  MeterRegistryCustomizer 를 이용한다면 한번에 레지스트리 셋트나 개별 구현을 커스텀마이즈 할 수 있다.

예를 들어 1) Prometheus , CloudWatch로 각각 메트릭을 보내거나 2) 호스트와 Application Tag같은 공통 태그 집합을 추가하거나 3) CloudWatch에는 일부 메트릭에 whitelist를 추가하거나 등등이다.



The distinction between metrics and tracing

메트릭(metrics) 이란 시스템의 성능을 추록할 수 있는 정보 클래스를 뜻한다.  중요한 점은 single request가 여러 구성 요소를 지날때의 대기 시간(total latency) 는 제외(excldue)된다. 이것은 Spring Cloud Sleuth 와 같은 분산 트레이싱 수집기의 역할입니다.

분산 트레이싱 시스템은 subsystem latency 에 대한 자세한 정보를 제공하지만 일반적으로 확장을 위해서 샘플링을 한다. (예를 들어 Sleuthsms 디폴트로 10%만 샘플링한다) 메트릭 데이터는 일반적으로 사전 집계(pre aggregated) 하지만 샘플링을 하지 않는다.


그래서 B와 상호작용을 하는 A의 100,000건의 요청에 대해서

- 메트릭 관점에서 보면 A서비스의 스루풋은 100K, B 처리량은 60K,  A 전체 평균 지연 시간은 100ms, B는 50ms 등을 알 수 있다. 또한 최대 지연 시간 및 다른 통계등도 확인 할 수 있다

- 트레이싱 시스템은 특정한 요청(particular request / 샘플링이기 때문에 전체가 아니다)이 A에서 50ms B에서 90ms가 걸렸을 알려줄 수 있다.



The importance of dimensionality

스프링 부트1 의 메트릭은 계층적 ( hierarchical ) 이었다. 이 말은 수집된 메트릭은 이름으로 인해서 식별이 가능했다. 그래서 jvm.memory.used 라는 메트릭이 있을 수 있다. 단일 어플리케이션 인스턴스에서 메트릭을 확인 하는 경우 적합하지만, 10개의 인스턴스일 경우는 어떻할 것인가? 메모리 소비량이 예상외로 급증하면 어떻게 구별할 것인가? 일반적으로는 이름에 접두/접미사를 추가한다 따라서 이름을 {HOST}.jvm.memory.used로 변경 할 수 있다. 그리고 일반적인 계층 모니터링 시스템(hierarchical monitoring system) 에서는 이름을 와일드카드로 지정(*.jvm.memory.used)해서 메모리의 합계를 확인 할 수 있다. 

이런식으로 region 까지 확장해 나간다면 *.*.jvm.memory.used 라고 사용 할 수는 있다. 하지만 이렇게 새로운 메트릭을 설정해서 전부 배포되기 전까지 제대로 메트릭 정보를 얻을 수 없다. 이 예시는 기존의 계층 모니터링 시스템의 제한 중 하나의 예에 불과하다.

차원 모니터링 시스템(Dimensional monitoring systems) 은 자연스럽게 jvm.memory.used를 하나이상의 태그를 drill 하기전까지 모든 태그에 걸쳐 자연스럽게 표시한다. 우선 이름(jvm.memory.used)를 선택하고  이후 태그에 의한 필터링을 한다 문제가 되는 이전 시나리오처럼 기존 차트가 생성된 다음 나중에 region tag를 추가한 경우에는 기존 메트릭도 중단없이 계속 동작 한다. 




Meter filters

Meter 필터를 이용하면 미터가 등록되는 방법,시기,통계의 종류 등을 제어 할 수 있다. 미터는 3가지 기본 기능을 제공한다.

. 거부 : 미터 등록을 거부/승인 한다.
. 변환 : 이름, 태그의 추가 삭제, 설명이나 기본단위 변경 등을 바꿀 수 있다.
. 설정 : 백분위 수, 타이머 SLA등의 통계 구성을 설정 할 수 있다.


스프링 부트2는 속성을 통해 미터 필터에 바인딩 할 수 있다.

management.metrics.enable.jvm=false
management.metrics.distribution.percentiles-histogram.http.server.requests=true
management.metrics.distribution.sla.http.server.requests=1ms,5ms

위의 설정은 jvm metrics을 해제하고, SpringBoot에 의 http-request 백분위 메트릭을 설정하고, sla 주기를 1ms, 5ms 이하로 셋팅한 것이다.



Why the /actuator/metrics endpoint changed in Spring Boot 2

스프링 부트1 에서는 단지 카운터/게이지만 있었기 때문에 하나의 REST 포인트에 계층적으로 제공하는 것이 간단했습니다. 하지만 차원 모니터링 시스템으로 변경되면서 모든 정보를 하나의 페이로드에 출력할 수 있는 방법이 없다는 사실을 알았다. 

커스텀 된 UI를 원한다면 UI에 필요한 데이터만 표시하는 컴포넌트를 구성하는 것이 좋다. MeterRegistry 를 컴포넌트에 삽입하여 find, get 메소드를 이용해서 필요한 메트릭을 expose 한 뒤 용도에 맞게 직렬화해서 사용하라.



max.in.flight.request.per.connection

분류없음 2018.06.18 14:51 posted by dev.bistro

필요한 수준까지 이해한듯 해서, 메모로 남김

원래의 카프카는 데이터 주입에 대한 순서가 중요하지 않았다.  (정확히는 어느 정도의 버퍼를 카프카 브로커가 가지면서 Event Time에 대한 조정 작업을 알아서 해준다)

하지만 producer의 명등성 옵션과 순서 보장에 대한 이슈가 중요해졌고 max.in.flight.request.per.connection = 1 로 셋팅하면서 순서 꼬임을 좀 방어하고자 했는데...

1로 설정하고 idempotence 옵션이 켜져있고 retry가 가능한 상태일때 
전송하다가 OutOfOrderSequence 예외가 발생한다면 클라이언트 영역에서 이 Sequence Number를 잘 처리해야한다. (재전송 하거나, 다음 idempotence 전송에서 써야한다)


이게 어렵다... 그래서 그냥 하드 코딩으로 각 Topic Partition의 5개의 메타데이터를 들고 있게한것이다. 

if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {
@Override
public int compare(ProducerBatch o1, ProducerBatch o2) {
return o1.baseSequence() - o2.baseSequence();
}
}));

그래서 kafka 문서의 "enable.idempotence" 항목을 보면 Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 라고 나온다.


idempotence 켜져있고 나부의 매직 넘버인 5보다 크게 셋팅한다면 sequence를 보장하지 못할 수 있기 때문이다.

실제로도 

        if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {

            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +

                    " to use the idempotent producer.");

        }

코드에 의해서 실행 자체를 막아놨다.



요약
1. 그냥 디폴트로 쓰자.
2. 프로듀서에서 동기식으로 사용한다면 (.get() 호출) 더더욱 상관하지 않아도 된다.


Schema Registry , Schema Evolution

분류없음 2018.06.11 21:45 posted by dev.bistro

몇 달전에 메모장에 적어놨던 내용인데, 지우면 또 까먹을 듯 해서 기록으로 남김


* 기존 Java Class에서 Avro Schema 획득하기

ReflectData.get().getSchema 를 이용하여, 쉽게 json형식의 AvroSchema를 획득할 수 있다. (검증은 꼭 할것)
아쉽지만 record 타입을 자동으로 찾아서 해준다거나, vargs 형식으로 getSchema를 사용할 수는 없다.

Schema schema = ReflectData.get().getSchema(Order.class);
System.out.println(schema.toString(true));


* Schema Evolution

1. backward : 새스키마로 이전 데이터를 읽을 수 있다.
 ex) 새 field를 추가할때 default value를 넣는다. 필드가 없다면 default value를 사용한다.  즉 데이터는 old type인데 읽는 쪽이 new type

2. forward : 스키마가 변하지 않는데, 새로운 스키마의 데이터가 들어오면 ignore 한다.
 'forward' : we want forward compatible when we want to make a data stream evolve without chaning our downstream consumers . 즉, 데이터는 new type인데 읽는 쪽이 old type

3. full : 1,2번 두개 합친거
- 필드를 추가할 때에는 default를 무조건 넣어야 한다(backward position)
- default가 있는 필드만 remove 할 수 있다.


* avro schema 를 사용하는 추천 가이드

1. PK를 사용해라
2. remove될수 있는 필드라면 default를 제공하라
3. enum은 주의해서 사용하라. - 시간이 흘러도 evolve 할 수 없다?
4. rename 하지 말것, 필요하다면 다른 필드를 넣어라
5. 항상 default value를 쓰고, required field는 삭제하지 마라

subject란?

- Schema Registry는 Schema를 Subject 단위로 관리되어지는 듯 하다. 디폴트로는 <topc>-key, <topic>-value (ex: data.order-value) 형식으로 사용되지만 confluent 4.1 부터 이 부분을 커스텀마이징 할 수 있는 SubjectNameStrategy가 추가되었다.

기본적으로 사용하는 토픽이 동일하다면 동일한 subject를 보지만, 커스텀마이징 한다면 Record 에 따라 subject로 변경 가능하다.

Get the subject name for the given topic and value type.




1. 최초에 Schema Registry에 등록하고 Schema ID 를 가져오는 동작

- SR은 Cache 기반으로 동작한다 어플리케이션이 시작하면 Application SchemaRegistry는 빈 데이터만을 들고 있다.

- 처음으로 Producer 를 통해서 메시지를 발송하면 Schema 정보를 instance.getSchema() 로 가져온다.

- Registry Server로 요청을 보낸고 응답은 Schema ID로 받는다.

- Map<String, Map<Schema, Integer>> schemaCache 캐시에 등록하고 사용 한다.

- schemaCache 에서 remove 되는 endpoint 는 없다. 즉, 한 번 캐싱이 된다면 다시 호출 되지는 않을 것이다. (어플리케이션에 실행중 instance.getSchema가 변경이 될리는 없기 때문이다)


2. Application이 자동으로 Schema ID를 올리는 것을 막는 방법

- SR 서버 자체의 호환 모드를 NONE 으로 해 되지만 클라이언트 설정에서 auto.register.schemas(default true)를 false로 해도 된다. 이럴 경우 SchemaRegistry RestClientException 이 발생되면서 정상적인 produce가 안된다.




결론

- (avsc에 의해 생성되는) org.apache.avro.specific.SpecificRecord 를 구현하는 인스턴스의 SCHEMA$ 를 가지고 Schema ID를 등록하거나, 내부적으로 캐싱해서 사용한다.

- 이 문자열이 캐싱된다면, 최초의 Produce 할 때만 SR 서버를 호출한다. (즉 이 문자열이 바뀔일이 없기 때문에 1개의 subject당 1번이 호출될 것이다)

- BACKWARD 인지 FORWARD 인지는 순수하게 SR 서버쪽의 담당이다. 클라이언트는 단지 auto.register.schemas: false로 설정함으로써 자동 등록을 해제 할 수 있을 뿐이다.



nebula 를 이용한 integration test 분리

분류없음 2018.05.18 07:57 posted by dev.bistro

테스트를 진행함에서 있어서 unit test와 integration test를 분리하고 싶은 니즈가 많다. junit5를 이용한다면,  Tag로 쉽게 처리 가능하지만 ( https://junit.org/junit5/docs/5.0.2/api/org/junit/jupiter/api/Tag.html )  레거시 프로젝트들 대다수가 junit5를 사용하지 못한다.

그래서 보통은 https://selimober.com/gradle_unit_integration/ 이러한 방법으로 처리하는데  nebula plugin을 이용하면 쉽게 처리 할 수 있다.


nebula facet : https://github.com/nebula-plugins/nebula-project-plugin


1. src/test/java 에는 unit-test만 남겨놓고 나머지는 src/integTest/java 에는 인테그레이션 테스트를 위치시킨다.
(디렉토리 변경 가능할줄 알았으나 고정되어 있다)

2. buildScript 부분에 아래와 같이 nebula.facet 의존성 추가와 plugin 설정을 한다.

buildscript {
...
dependencies {
...
classpath "com.netflix.nebula:nebula-project-plugin:3.4.1"
}
} apply plugin: 'nebula.facet'


3. 다음과 같은 가장 기본적인 설정을 추가한다. 

facets {
integTest {
}
}


4. 프로젝트를 reload 하면 'integTest' 라는 gradle-task가 생긴것을 확인 할 수 있다.



이 때의 문제점은 test-report 가 2개가 생긴다는 것이다. (test와 integTest) 이 2개의 리포트를 합치고자 하는 이슈가 있을때에는 아래처럼 TestReport 관련 task를 하나 추가하면 된다.

task testReport(type: TestReport) {
destinationDir = file("$buildDir/reports/combined")
reportOn test
reportOn integTest
}



Avro GenericRecord,SpecificRecord

분류없음 2018.04.16 09:47 posted by dev.bistro

GenericRecord

File/String 기반의 Schema 에서 Avro Object를 생성하는 것을 말한다.  이 방법은 runtime 에서 실패할 수 있기 때문에 사용에서는 추천되는 방법은 아니지만, 쉽게 사용 할 수 있는 장점이 있다.

Schema

{
"type": "record",
"namespace": "com.example",
"name": "user",
"fields": [
{
"name": "name",
"type": "string"
}
]
}다시피 


GenericRecordBuilder builder = new GenericRecordBuilder(schemaString);
builder.set("name", "bistros");
GenericData.Record bistros = builder.build();
System.out.println(bistros);
System.out.println(bistros.getSchema());
// {"name": "bistros"}
// {"type":"record","name":"user","namespace":"com.example", "fields":[{"name":"name","type":"string"}]}

보시다시피  GenericRecordBuilder set(String fieldName, Object value) 를 통해서 값을 셋팅하기 때문에 불안하다.


SpecificRecord

그에 반해서 SpecificRecord 역시 Avro Object 이지만  Avro Schema에 의해 성성되는 코드가 좀 더 포함되어 있다.
GenericRecordBuilder builder()는 Record type을 반환하는 데  그 내부를 보면, Map<String, Object>와 다름이 없다는 것을 확인 할 수 있다.
- GenericRecord 인터페이스를 구현 ( get/put만 있다)
- 내부적으로 Object[] 만을 가지고 있다.
링크 : https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java#L178

하지만 SpecificRecord는 좀 다르다.  avro schema를 기반으로 maven/gradle 플러기인으로 resources를 생성해보면  extends org.apache.avro.specific.SpecificRecordBase 를 통해서 똑같이 GenericData를 상속하고 있지만  6개 필드를 가진 Avro Schema 파일에서 약 550라인의 Class파일이생성된다. getter/setter/builder 등이 구현되어 있다.


요약)

Gradle Plugin을 이용해서 Class 파일을 만들어야 하지만 SpecificRecord를 사용하는 게 당연하다.


사족)

Confluent의 Kafka-Avro-Serializer는 default로 generic type을 읽고 쓰게 되어 있다.

properties.setProperty("specific.avro.reader", "true");

을 통해서 specific 모드를 활성화 해야 하고,  이것을 활성화 하면 내부적으로 사용하는 DatumReader를  SpecificDatumReader 으로 셋팅하다.

추가적으로 Streams 모드를 위해서는 아예  GenericAvroSerde, SpecificAvroSerde 2개의 Serde를 따로 제공해주고 있다.


어제의 이슈 StreamsBuilder.table은 과연 changelog topic을 만드는가 ?

https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String- 문서상에 의하면 '쿼리 불가능한 내부 store-name을 만들고, internal changelog topic은 만들어 지지 않는다고 한다.

The resulting KTable will be materialized in a local KeyValueStore with an internal store name. 
Note that store name may not be queriable through Interactive Queries. No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).


우선 table 메소드를 비교해보자.


- <K,V> KTable<K,V> table​(java.lang.String topic)

- <K,V> KTable<K,V> table​(java.lang.String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

둘다 내부에서는 internalStreamsBuilder 을 이용해서 MaterializedInternal 를 생성하고 있다. 만약 내가 명시적으로 Materialized.as 를 해서 인자를 넘긴다면 MaterializedInternal 역시 그 인자를 활용한다.
이 인자는 단순히 MaterializedInternal 를 만들때 사용자의 의도를 반영 할 수 있게 한 것이다.


protected Materialized(final Materialized<K, V, S> materialized) {
this.storeSupplier = materialized.storeSupplier;
this.storeName = materialized.storeName;
this.keySerde = materialized.keySerde;
this.valueSerde = materialized.valueSerde;
this.loggingEnabled = materialized.loggingEnabled;
this.cachingEnabled = materialized.cachingEnabled;
this.topicConfig = materialized.topicConfig;
}

내가 명시적으로 Materialized 를 넘기면 이 생성자를 통해서 몇가지 셋팅 되고, KTable 자체도 쿼리가능한 상태라고기록해 놓는다 ( queryable = true )

만약 위의 인자가 없다면

@Override
public String newStoreName(final String prefix) {
return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
}

이러한 메소드를 통해   data.order-STATE-STORE-0000000000 라는 state-store가 생성되었다.


그런데 internal-topic을 안만든다는 javadoc의 문서와 달리 appName-order-STATE-STORE-0000000000-changelog 이라는 토픽이 하나 생겼다. (Configs:cleanup.policy=compact)

이 내용은 지난번에 읽은 https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 의 fail-over 부분에서 약간 언급이 되는데

That’s why by default persistent state stores are logged: that is, all changes to the store are additionally written to a changelog-topic. This topic is compacted (we only need the latest entry for each ID, without the history of changes, as the history is kept in the events) and hence is as small as possible. Thanks to that, re-creating the store on another node can be much faster.

즉, store는 changelog-topic (compact mode)를 쓴다는 얘기다. 

그럼 StreamsBuilder.table을 쓰면 내부적으로 항상 state-store를 사용하고 그 말은 항상 changelog-topic이 생성된다는 뜻 아닌가? 


kafka javadoc이 말한 changelog-topic이 생기지 않는다라는건  KTable이 compact된 changelog-topic을 안 만든다는 것이지 MaterializedInternal (statestore)의 changelog-topic을 뜻한건 아니지 않았을까?


추가) statestore가 내부적으로 changelog-topic을 쓰는건 확실하다.  대표적인 StateStore인 KeyValue타입의 RocksDBStore를 보면 openDB를 할 때 아래의 코드를 통해서 changelog-topic 이름을 결정해서 StateRestorer , ProcessorStateManager 를 topic을 업데이트 한다.

this.serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
public static String storeChangelogTopic(final String applicationId, 
                                         final String storeName) {
return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}



추가2) 팀장님이 아침에 많이 봐주셔서 이유를 찾아냈다.  버그였다-_-; 실제로 0.11 까지는 logConfig를 false로 넘기면서 ChangeLoggingKeyValueBytesStore 를 사용하지 않았었는데 1.0에서 대규모 리팩토링을 거치면서 이슈가 발생하였다.

 https://issues.apache.org/jira/browse/KAFKA-6729 



티스토리 툴바