https://spring.io/blog/2018/03/07/testing-auto-configurations-with-spring-boot-2-0 지지난주 spring.io 에  올라온 글이다. 

스프링부트 2.0에  ApplicationContextRunner 가 추가 되었고 이에 대한 사용법을 설명하는 내용인데, 실제로 활용해보니 기존의 static class 로 Spring 설정을 slicing 하는 것보다 가독성도 좋고, 사용도 편하다.


1. SolMailAutoConfiguration 빈을 생성할때 JSR380에 의해 validate한지 확인하는 테스트

new ApplicationContextRunner()
.withConfiguration(of(SolMailAutoConfiguration.class))
.run((context) -> {
assertThat(context.getStartupFailure())
.isNotNull();
assertThat(context.getStartupFailure())
.isInstanceOf(UnsatisfiedDependencyException.class);
});


2. SolMailAutoConfiguration 빈을 생성 할 때 제대로 값이 주입되는지 확인하는 테스트

new ApplicationContextRunner()
.withConfiguration(of(SolMailAutoConfiguration.class))
.withPropertyValues(
"app.sol.mail.connection-timeout:10000",
"app.sol.mail.api-url:http://localhost/api/v1/sol/"
).run((context) -> {
SolMailProperties properties = context.getBean(SolMailProperties.class);
assertThat(properties.getApiUrl()).isEqualTo("http://localhost/api/v1/sol/");
assertThat(properties.getConnectionTimeout()).isEqualTo(10000);
});


3. 커맨드라인이나, 프로퍼티로 값을 받을때 원하는 Bean이 로드 되는지 확인하는 테스트

new ApplicationContextRunner()
.withInitializer(new ConfigFileApplicationContextInitializer())
.withConfiguration(of(FakeMailClient.class, SolMailAutoConfiguration.class))
.withPropertyValues("mail.type=fake")
.run((context) ->
assertThat(context).hasSingleBean(FakeMailClient.class));

legacy 에서는  initializers = ConfigFileApplicationContextInitializer 라고 initializers 를 정의해서 썻지만 이렇게 코드 레벨에서 사용할 수 있다. 


ApplicationContextRunner 를 before로 올려서 DRY를 준수할 수도 있지만, 그렇게 올려서 얻는  중복 코드의 제거보다, 지금 처럼 각각 초기화 하는게 더 나은 가독성과 사용성을 주는 것 같아서, 위처럼 작성함 이게 대한 좋은 글로

https://stackoverflow.com/questions/6453235/what-does-damp-not-dry-mean-when-talking-about-unit-tests

Working Effectively with Unit Tests

분류없음 2018.03.23 14:41 posted by dev.bistro

Working Effectively with Unit Tests #1


https://leanpub.com/wewut 를 읽고 정리한 내용 첫번째 단락


Unit Testing, a First Example


* Replace Loop with individual Test

- https://github.com/bistros/wewut-code#replace-loop-with-individual-tests

- 읽기 쉬운 테스트를 위한 첫 번째 단계는 반복 테스트를 각각의 테스트로 변경하는 것이다.

ex) for-each를 돌면서 여러값을 테스트 하는 것을 제거한다


* Expect Literals

- https://github.com/bistros/wewut-code#expect-literals

그 다음 가독성을 높이는 단계로는 literal 값을 예상하는 것이다. 여전히 테스트는 실패 하겠지만, value를 직접적으로 확인 할 수 있게 된다.


* Inline Setup

- https://github.com/bistros/wewut-code#inline-setup

- 실패한 테스트의 원인을 쉽게 찾을 수 있어야한다. Template Method Pattern을 이용하면, 중복을 약간 줄일 수있는 있지만 다른 장점은 없다. 그리고 오버헤드가 증가할 수도 있다. 이러한 @Before setup에서 일어나는 이러한 부분을 없앤다면, 실패 했을 경우 setup을 안봐도 된다. 즉 code search가 거의 필요없게 되는 것이다. 

코드 중복에 대해서는 밑의 Final Thoughts on our Tests 에서 책 이외의 내용을 좀 더 추가하였다.


* Replace ObjectMother with DataBuilder

- https://github.com/bistros/wewut-code#replace-objectmother-with-databuilder

- 테스트를 위해 mock instance를 만드는 ObjectMother가 제한적일때는 유리하지만, 다른 시나리오(케이스)가 필요할때는 난감하다. 이 시점에서는 ObjectMother style 보다는 new Domain Model Instance 스타일로 변경되는 것이 실용적일 것이다. 대신에 Builder 가 변경이 필요하다면, 사용되는 모든 곳의 코드를 업데이트 쳐야하는 문제가 있는데 그건 다른 방법으로 해결 할 수 있다 (주: 챕터6 TestDataBuilder를 참고하면 된다) 물론 기존의 ObjectMother 보다는 코딩해야 할 내용이 많다. 하지만 좀 더 일반적이고 독립적으로 객체를 만들어 낼 수 있다. 


* Comparing the Results

- https://github.com/bistros/wewut-code#comparing-the-results

- 쉽게 assertion 하게 하라 (좋은 테스트는 나뿐 아니라 미래의 팀원들을 위해서이기도 하다)


* Final Thoughts on our Tests

- https://github.com/bistros/wewut-code#final-thoughts-on-our-tests

- 참고1) DAMP  https://stackoverflow.com/questions/6453235/what-does-damp-not-dry-mean-when-talking-about-unit-tests

프로덕션 레벨과는 달리 유닛 테스트에서는 파일/단일 테스트 내에서만 코드가 적용된다. 이 때문에 중복 코드에 대한 minimal and obvious 다.  또한 이러한 중복을 제거해 버린다면 테스트의 가독성이 떨어진다. 그래서 DRY를 프로덕션에서 선호하고 테스트에는 DAMP를 우선하여 균형을 이뤄야 한다

참고2) http://blog.jayfields.com/2006/05/dry-code-damp-dsls.html

DRY를 DAMP로 바꾸더라도 크게 라인수가 늘어나지는 않음을 보여줬고, 대신 가독성 측면 등에서 가치를 높일 수 있었다.

Should You Put Several Event Types in the Same Kafka Topic?

링크 : https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 에서 필요한 부분만 정리

카프카를 쓸 때 가장 중요한 것 중 하나는 토픽을 어떻게 쓸 것인가이다. 만약 여러 이벤트 묶음이 있다면 한 토픽에 넣을 것인가, 여러 토픽에 넣을 것인가? 극단적으로 하나의 토픽에 넣는것은 좋은 생각이 아니다. 컨슈머가 관심있는 이벤트를 선택해서 소비할 수 있는 방법이 없기 때문이다. 반대로 너무 많은 것도 좋은 건 아니다.

사실 성능관점에서 중요한것은 파티션의 갯수이다. 경험에 비추어 보면 레이턴시가 중요하다면 브로커 노드당 수백의 토픽-파티션을 가질 것이고, 아마 많을 수록 레이턴시 관점에서는 대기 시간이 줄어들 것이다.


Topic = collection of events of the same type?

일반적으로는 같은 타입의 이벤트는 같은 토픽을 사용하고, 다른 타입은 다른 토픽을 사용 하는 것이다.

Confluent 스키마 레지스트리는 이 패턴을 사용한다. topic의 모든 메시지에 대해서 동일한 Avro 스키마를 사용하도록 구너장하기 떄문이다. (옵션을 추가해서) 호환성을 유지할 수는 있지만, 궁극적으로 모든 메시지는 레코드 타입을 준수해야 한다. 일반적으로 이러면 되는데, 이벤트 소싱이나 MSA에서 데이터 교환같은 목적으로 사용 하는 사람들도 있다.

이때에는 topic이 같은 스키마를 가지는것이 덜 중요해질수는 있다 그것보다는 topic-partition 내에서 순서를 보장하는게 더 중요하다.

예를 들어 고객이 주소변경/새신용카드추가/조회/지불/계정파기... 이런것들은 순서가 중요하다. 이 순서는 '같은 파티션'을 사용함으로서 보장받을 수 있고, 고객 ID를 파티션 키로 사용한다면 동일한 topic-partiton에 들어갈 것이다.



Ordering problems

customerCreated, customerAddressChanged, and customerInvoicePaid 를 서로 다른 토픽으로 사용한다면 컨슈머는 순서의 의미없는 이벤트라 볼 것이다. 예를 들어 컨슈머는 존재하지 않는 고객에 대해서 주소 변경 이벤트를 받을 수 있을 것이다. 만약 이렇게 다른 토픽을 사용한다면 이벤트를 시간 동기화 처리를 해서 사용할 수 있겠지만 악몽이다. 


When to split topics, when to combine

몇가지 제안 할 것이다.

1. 가장 중요한것은 '고정된 순서로 머물러야 하는 모든 이벤트는' 동일한 토픽을 사용해야 한다.

2. 한 엔티티가 다른 엔티티에 의존하거나, 함께 자주 변경이 된다면 같은 토픽을 사용할수도 있다. (예를 들어 주소가 고객에게 속하는 것처럼) 한편으로 다른 팀과 관련이 없다거나, 다른 팀에서 관리하는 경우에는 토픽을 분리하라 또 하나의 엔티티 타입이 과도하게 비율이 높다면 별도로 분리하는 것을 고민하라 (다른 이벤트 타입들을 위해서이다)

3. 이벤트에 여러 엔티티가 관련된 경우 (예를 들어 구매-제품-고객) 처음에는 이벤트를 원자 단위로 기록하고, 여러 토픽으로 분할 하지 마라. 가능한 받은 그대로 이벤트를 기록하라 스트림을 이용해서 분리할 수 있지만 어려워질 것이다.

4. 컨슈머를 살펴보고 여러 토픽을 구독한다면 그 토픽들은 합쳐야 될 수도 있다. 그럴 경우 원치 않은 이벤트를 소비할 수도 있지만 그것은 큰 문제가 아니다. 이벤트를 소비하는 것은 매우 싸기 때문에 이벤트의 절반을 무시해도 큰 문제는 아니다. 대부분의 이벤트를 무시할 경우만 (99프로) 분리를 고민해라.

5. 카프카스트림이 사용하는 changelog topic은 다른 토픽에서 분리되어야 한다.


Schema management

정적 스키마가 없는 json으로 인코딩을 한다면, 쉽게 다양한 이벤트 타입을 하나의 토픽에 넣을 수 있다. 하지만 avro 를 사용할 경우 여러 이벤트 타입을 처리하는데 좀 더 많은 작업을 해야한다. 위에 언급한 것처럼 현재의 컨플런트 스키마 레지스트리는 각 토픽마다 하나의 스키마가 있다고 가정한다. 새 버전의 스키마를 등록할 수 있고, 호환되는지 체크한다. 좋은 점은 서로 다른 스키마 버전을 동시에 사용하는 프로듀서/컨슈머를 가질 수 있고, 서호 호환 가능하다는 점이다. 

컨플런트 Avro 시리얼라이저에 key.subject.name.strategy 를 추가하였으니 이걸 좀 더 참고하면 된다.


Event Sourcing Using Apache Kafka

분류없음 2018.03.22 09:37 posted by dev.bistro

https://www.confluent.io/blog/event-sourcing-using-apache-kafka/ 를 읽고 대충 필요한 것만 적음.

추가적으로 알게된 내용보다는 다시 한번 지식을 되새겨볼 수 있던 기회.


Storing events in Kafka


첫 번째 문제는 이벤트를 어떻게 저장하는가이다. 3개의 방법으로 얘기할 수 있다.

1. 모든 타입의 모든 이벤트를 하나의 토픽에 저장하는 방법 (물론 멀티 파티션)

2. Topic-per-entity-type : entity별로 관련된 이벤트들을 분리된 토픽에 저장하는 방법 (예를 들어 user관련 토픽, product관련 토픽)

3. topic-per-entity : 각각의 user, product 처럼 각 entity별로 별도의 토픽을 할당해서 저장하는 방법

low-cardinality entity를 제외하고는 3번째 전략은 실현 가능한 방법은 아니고 1,2번중 선택해야 한다.

1번은 모든 이벤트를 볼 수 있는 장점이 있고 2번은 entity type별로 분리하고 scale 할 수 있다.  아래으  내용은 대부분 1번을 기반으로 얘기를 이어나간다. 2번에 대해 더 알고 싶다면 https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 를 참고하면 된다.


Basic event-sourcing storage operations

데이터 저장소에서 기대하는 가장 기본적인 명령은 '특정 엔티티의 현재 상태를 얻는 것'이다. 일반적으로 각 엔티티는 ID가 있고, ID가 주어지면 저장 시스템은 현재 상태를 반환해 줘야한다.

이벤트 로그는 primary source of truth 이고, '현재 상태'는 엔티티의 이벤트 스트림에서 파생 될 수 있다. 이를 위해서 스토리지 엔진은 Event => State => State를 반환하는 순수 함수가 필요하다.

카프카에서 'read current state'를 native하게 구현해본다면 카프카는 토픽의 모든 이벤트를 필터링 하고, 주어진 function으로 부터 상태를 구해야한다. 만약 많은 이벤트가 있다면 느리고 자원을 많이 소모할 것이다. 그리고 장애나, 캐시 클리어 등으로 인해서 다시 해야할 수도 있다.

그래서 더 좋은 방법이 필요한다. KafkaStreams와 StateStore를 이용하는 것이다. 

Kafka Streams는 고수준의 오퍼레이션을 제공하는데 그 중 하나는 스트림을 localstate로 folding하는 기능이다.

각 local store는 노드에서 사용하는 파티션의 데이터만 포함된다. 기본적으로 RocksDB(디폴트), inMemoryStore 2개의 구현체가 있다. 둘 다 fault-tolerant 이고, in-memory를 사용하더라도 데이터가 손실되지 않는다(주: local store topic을 현실화 하는 것이기 때문이다)

이렇게 이벤트 소싱에서 이벤트 스트림을 state store로 변경해서 각 엔티티의 '현재 상태'를 구할 수 있다. 


Looking up the current state

같은 노드에 있는 데이터를 쿼리하는 건 쉽다. 하지만 다른 노드에 있는 데이터를 쿼리하려면 kafka의 메타 데이터에 대해 먼저 질의한 다음 특정 ID에 대해 처리를 담당하는 노드를 찾을 수 있다. ( streamsMetadataForStoreAndKey ) 그런 다음 rest, akka-remoting 등의 방법으로 해당 노드로 요청을 하면 된다. 


Fail-over

State Store가 좋지만, 노드가 장애라면? 새로 만든다는 것은 비용이 많이 드는 작업이다.  KafkaStreams는 리밸런싱(노드의 추가/삭제) 때문에 레이턴시가 증가하거나, 요청 자체가 실패할 수 있다.  

이 이유 때문에 state-store가  changelog-topic에 로깅된다. 이 topic은 모든 이벤트가 필요없고 최신분만 필요하므로 compacted(압축) 되어 있다. 덕분에 다른 노드에서 다시 store를 구축하는게 빠르다. 하지만 여전히 리밸런싱에 대한 latency는 증가될 수 있다.  이에 대한 해결방법으로 standby 를 제공한다. primary 가 실패한다면 바로 standby 로 장애를 처리한다.


Consistency

카프카는 기본은 at-least-once 정책이다. exactly-once 정책도 지원하지만 topic을 읽고/쓰기 할 때에만 보장해준다. processing.guarantee 옵션을 켜서 exactly-once를 사용 할 수 있으며, 성능은 저하되지만 batch message의 집합이 한번의 커밋에 되므로 큰 비용은 아니다.


Listening for events

지금 까지 'current state'를 업데이트 하고 질의하는 걸 알아봤고, 이제 사이드 이펙트에 대해서 알아 볼 차례이다( 예를 들어 rest 등의 외부 호출이네 이메일 보내기)

이러한 작업들은 blocking이 일어나며 I/O를 발생하므로 state-update의 로직에 포함되는건 종은 생각이 아니다. 그리고 main 스레드에서 처리를 하는것은 성능상의 병목을 일으킨다. 그리고 state-update 자체가 여러번 실행 될 수 있다.

다행히 카프카 토픽을 처리할 많은 유연성을 가질 수 있다. state를 업데이트 전후에 메시지를 소비할 수 있고, 이 메시지 소비 전후에 원하는 방식으로 사용 할 수 있다.




Kafka Header

분류없음 2018.03.19 15:51 posted by dev.bistro

카프카의 Header 지원

- 0.11 버전이후 user(custom) header를 지원하기 시작하였고 아래 위키와 이슈를 참고

- https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

- https://issues.apache.org/jira/browse/KAFKA-4208


실제로 저장되는 형식과 구현체는 아래를 참고

- https://kafka.apache.org/documentation/#record

- https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L175



Producer 코드를 따라가며 Key, Header가 어떻게 저장이 되는거 확인 

- Producer는 2개의 구현체가 있지만 MockProducer는 테스트 용이기 때문에 KafkaProducer만 확인하면 된다.

- 실제로 KafkaProducer에서 doSend를 보면 인자로 받는 ProducerRecord 안에 headers 정보가 있다. 

- 이 정보를 이용하여 byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); '시리얼라이징 된 객체 데이터'를 생성한다. 하지만 기본 제공 하는 Key Serializer인 ExtendedSerializer은 Header를  무시한채 Key Data만을 가지고 시리얼라이징 한다. 


방법) KafkaStreams#close 호출로서 정상적인 종료가 가능하다고 한다. (참고 : https://kafka.apache.org/10/documentation/streams/developer-guide/write-streams )

그럼 이 close를 어떻게 호출 해야하는지를 찾아보자.


1) spring-cloud-stream 를 사용할 경우

- (나같은 경우에는 order-eda-notification prj)

- general 한 방법으로 사용을 한다면 StreamsBuilderFactoryManager bean이 자동등록 되고 이 매니저 빈은 SmartLifecycle 를 상속받기 때문에 spring application이 종료 시그널을 받으면 stop을 시작하는 시작점이 된다. 

- 이 매니저빈이 실제로 Set<StreamsBuilderFactoryBean>를 차례로 종료 메소드를 호출하고, 그 안에 존재하는 KafkaStream 맴버 인스턴스도 종료 명령을 받게 된다.


2) 메뉴얼하게 KafkaStream 를 생성하는 경우

- (나같은 경우에는 order-eda-gateway prj)

- StreamsBuilderFactoryBean가 포함되어 있고 이 클래스는 명시적으로 KafkaStreams#close()를 호출하지만, 난 이 Factory자체를 쓰지 않는다.

- 하지만 gateway application 내에서 여러 KafkaStreams를 이용하여 독립적인 토폴로지를 구성하고 싶었기 때문에 AbstractOperationStream 클래스를 사용하고 있었다. 이 클래스는  몇몇 인터페이스와, StreamsBuilderFactoryBean의 running과 같은 기능만을 구현한 추상 클래스이다. 여기에 DisposableBean 를 추가하였다.

//중간부분
@Override
public Topology getTopology() {
return topology;
}

@Override
public boolean canQueryable() {
return ready;
}

@Override
public void destroy() throws Exception {
this.kafkaStreams.close();
}



추가) KafkaStreams#close는 어떤 작업을 할까?

- 상태를 PENDING_SHUTDOWN 로 바꾼다.

- TaskId의 디렉토리를 삭제하는 스케줄링 메소드 'stateDirCleaner'의 데몬 서비스를 중단한다.

- List<StreamThread>를 순차적으로 중단하라고 명령한다.

- GlobalStreamThread  스레드를 중단하라고 명령한다.

- close를 위한 shutdown 스레드를 만들어서 위의 stream-thread, global-stream-thread가 전부 종료되어 PENDING_SHUTDOWN, DEAD 상태가 될 때까지 동기적으로 기다린다.

- 전부가 정상적으로 멈춘다면 KafkaStream 상태를 NOT_RUNNING 으로 바꾼다. 

- 기존적으로 10초를 기다린다. 

- 만약 정상적인 종료가 되었다는 것을 확인 하고 싶으면  close(final long timeout, final TimeUnit timeUnit) 메소드를 이용하여 boolean type의 return을 확인해야 한다.


spring-kafka의 KafkaListener

분류없음 2018.03.11 22:28 posted by dev.bistro

KafkaListener annotation

토픽의 메시지를 소비할 메소드에 붙이는 어노테이션이다. 이 어노테이션을 붙인다면, KafkaListenerContainerFactory에 의해서 MessageListenerContainer가 기동된다. 현재로서는 ConcurrentKafkaListenerContainerFactory 클래스만 존재 하니까, 기본적으로 KafkaListener을 붙인다는 것은 ConcurrentMessageListenerContainer 리스너 컨테이너를 사용한다는 의미이다.

그리고 @ConditionalOnClass(EnableKafka.class) 에 의해서 KafkaAnnotationDrivenConfiguration 설정 클래스가 로드 되는대, 여기서 "kafkaListenerContainerFactory" 이라는 bean을 가지는ConcurrentKafkaListenerContainerFactory 가 생성이 된다. (즉 spring-kafka만 디펜던시에 추가된다면, 이 클래스가 발생된다)

 @KafkaListener 에 'containerFactory'를 지정 할 수 없지만, 기본적으로 위에서 언급한 'kafkaListenerContainerFactory' 이름을 가지는 컨테이너 팩토리를 사용한다.  이 메소드의 인자로는 @Payload, @Header, @Headers, MessageHeaders등등을 사용하여 메시지를 수신할 수 있다.  


 메소드에 정의된다면 각 메소드별로 '리스너컨테이너'가 만들어진다. - (컨테이너 팩토리가 아니다!)  만약 1개의 컨테이너 팩토리에서 2개의 @KafkaListener을 사용한다면  각각의 beanName은 org.springframework.kafka.KafkaListenerEndpointContainer#1, #2 이다.  만약 동일한 팩토리와 topic을 쓴다면 setConcurrency=2 의 효과만 가질 것이다.

클래스에 정의된다면 하나의 메시지 컨테이너가! @KafkaHandler가 있는 모든 메소드에 적용한다. 모두 실행 한다는 의미가 아닌, payload타입에 따라 적절하게 메소드를 실행해준다는 뜻이다.  @KafkaHandler가 붙은 메소드는 InvocableHandlerMethod 클래스에 감싸지고, DelegatingInvocableHandler 에 List 형으로 들어가서  DelegatingInvocableHandler.findHandlerForPayload 의해서 실행되어 진다.    payload 매칭이 2개 이상이 된다면 'Ambiguous methods...' throw를 발생시킨다.


MessageListenerContainer 인터페이스

* spring-kakfa 에서 사용되는 최상단 '메시지 리스너 컨테이너'의 인터페이스 형태이다. 구현체는 Single-Thread 형태의 KafkaMessageListenerContainer 와 KafkaMessageListenerContainer 를 여러개 붙인 형태인 ConcurrentMessageListenerContainer 이렇게 2개가 기본 제공된다.

* Message Listener 이므로 '메시지의 수신' 관점에서 볼 때 필요하다. Kafka Cluster 의 gateway를 담당하는 application 을 만들 때는 'producer' 쪽이 필요하다.

* pause(), resume()을 인터페이스 레벨에서 가지고 있다.


ㅁ AbstractMessageListenerContainer<K,V> 추상 클래스

** 아래에서 설명할 2개의 실제 구현체 클래스의 상위 추상 클래스이다.(스프링의 다른 패키지에도 많으니 kafka 패키지인지 확인 해야한다)

** doStart, doStop만 위임한채, 나머지는 공통화 시키는 목적이다.

** offset commit을 어떻게 할 것인가에 대한 ACK_MODE 가 여기에서 정의되어 있고 종류는 7개가 있다 (BATCH, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE, RECORD, TIME)


ㅁ KafkaMessageListenerContainer<K,V> 클래스

** Java Consumer를 이용하여, 자동/수동 파티션을 할당 받아, Single-Threaded 형태로 동작하는 메시지 리스너 컨테이너

** 생성자 중에서 TopicPartitionInitialOffset 를 인자로 줄 수 있는데, 이를 이용해서 초기 Offset을 지정 할 수 있다. (absolute offset은 물론, relativeToCurrent 인자를 이용해서 relative offset도 지정가능하다)

** 스프링의 Lifecycle 인터페이스를 상속받아 구현된 start()가 시작 포인트이다.

** doStart 안에서 listenerConsumer 라는 컨슈머를 내부에서 만들어 데이터를 수신하게 된다.

** ListenerConsumer 를 Runnable을 상속받는 데몬 스레드 역할을 한다. while(true) 를 하면서 consumer.poll을 하는 것이다. 


ㅁ ConcurrentMessageListenerContainer<K,V> 클래스

** concurrency 와 실제 topic-partition의 갯수에 따라서 위에서 설명한 KafkaMessageListenerContainer 를 N개 만든다. 그리고 스스로 start를 호출한다. 즉, 이 클래스는 KafkaMessageListenerContainer 를 가지는 또 하나의 컨테이너이다.

** stop/resume/start 등등 전부 순차적으로 명령어를 수행한다. (forEach)

** 그 외 추가 구현 내용없는 단순한 형태이다.


그럼 KafkaConsumer 를 그냥 쓰는 것보다 무엇이 더 편할까?

- pause/resume 의 쉬운 사용
- KafkaConsumer는 thread-safe을 보장 하지 않아, 잘 구현해서 만들어야 하지만, 위의 컨테이너를 쓰면 쉽다.
- 좀 더 풍부한 ack모드 지원
- 메뉴얼 토픽파티션 어사인
- 오프셋 점프  등등...

대화식 쿼리를 사용하려면 hostname:port 형식의 StreamsConfig.APPLICATION_SERVER_CONFIG 를 설정하면 된다. 카프카 스트림 인스턴스가 키를 기반으로 한 쿼리를 받으면 현재 인스턴스의 로컬 저장소에 포함되는지를 찾는다. 더 중요한 것은 로컬 저장소에 없을 경우 키가 포함된 Store를 찾고 싶은 것이다. 카프카 스트림은 동일한 application ID와 APPLICATION_SERVER_CONFIG 가 정의된 인스턴스에 정보를 검색할 수 있는 몇가지 메소드가 제공된다.

아래의 메소드들은 KafkaStreams instance의 method이다.

- allMetadata : 모든 StreamsMetadata 목록을 반환
- allMetadataForStore : StoreName 기반으로 StreamsMetadata 목록을 반환
- metadataForKey : 해당 Key 기반으로 StreamsMetadata 를 반환

위 메소들에서 allMetaData를 이용해서 대화형 쿼리에 적합한 모든 인스턴스 정보를 얻을 수 있다.  



각 카프카 스트림 인스턴스는 경량화된 서버가 내부적으로 떠 있게 된다.
여기서 중요한 것은 '카프카 스트림 인스턴스 중에서 하나만 쿼리를 하면 되고 어디에 해야하는지는 중요하지 않다'

RPC 메커니즘과 메타 데이터를 기반으로 인스턴스에 원하는 데이터가 없다면, 카프카 스트림 인스턴스는 위치를 찾아내서 쿼리후 반환한다. 예를 들어 A 인스턴스에는 key1가 포함되어 있지는 않지만 B 인스턴스테 포함됨을 인지하고, B를 호출하여 데이터를 검색하고 반환한다.

(중요!: RPC 메커니즘은 기본적으로 제공되지 않으므로 직접 구현해야한다. ex: https://github.com/davidandcode/hackathon_kafka/blob/master/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/MusicPlaysRestService.java )


ps. 결국엔 ben stopord 의 예제처럼 HostInfo를 이용해서 라우팅을 하거나, 처음부터, Key기반의 요청/응답을 특정 인스턴스로 제한 하는 수 밖에 없다. ... (원래 하던데로다.ㅋ )


처음에는 이 메소드의 효용성? 사용범위? 에 대해서 이해하지 못했다. 그냥 최초에 Streams Application이 기동될때, State 가 migration 되는 동안  Application 의 health 를 DOWN 정도으로 바꾸는 정도?  (즉 Created -> Running) 

최근에 하나 더 든생각은 isRunning 상태에서 다른 상태로 빠질 경우이다. (isRunning은 RUNNING or REBALCING 이다)  이 때에는 API로 제공되는 KeyStore를 query하면 에러가 발생 할 것이다. 그러므로

this.kafkaStreams.setStateListener((newState, oldState) -> {
if(newState.isRunning() ){
this.ready = true;
}else {
this.ready = false;// State is not RUNNING,
}
});
this.kafkaStreams.cleanUp();
this.kafkaStreams.start();

이와 같은 리스너를 등록하여,  queryable 상태를 체크하는 부분이 있어야 할 것 같았고, 그렇게 구현을 했다.

그럼  언제 isRunning 에서 다른 상태로 변경이 될까? 

가장 쉽게 찾을 수 있는 경우는 StreamThread가 shutdown이 될 때 PENDING_SHUTDOWN 으로 상태가 바뀐다. cleanup()을 호출하거나, 여러경우/다양한 곳에서 shutdown/close를 호출하는 경우이다.

그 외에는... DEAD 나 PENDING_SHUTDOWN 으로 변경하는 케이스를 찾지 못함 ㅠ ( 더 있을 줄 알았다!!)