방법) KafkaStreams#close 호출로서 정상적인 종료가 가능하다고 한다. (참고 : https://kafka.apache.org/10/documentation/streams/developer-guide/write-streams )

그럼 이 close를 어떻게 호출 해야하는지를 찾아보자.


1) spring-cloud-stream 를 사용할 경우

- (나같은 경우에는 order-eda-notification prj)

- general 한 방법으로 사용을 한다면 StreamsBuilderFactoryManager bean이 자동등록 되고 이 매니저 빈은 SmartLifecycle 를 상속받기 때문에 spring application이 종료 시그널을 받으면 stop을 시작하는 시작점이 된다. 

- 이 매니저빈이 실제로 Set<StreamsBuilderFactoryBean>를 차례로 종료 메소드를 호출하고, 그 안에 존재하는 KafkaStream 맴버 인스턴스도 종료 명령을 받게 된다.


2) 메뉴얼하게 KafkaStream 를 생성하는 경우

- (나같은 경우에는 order-eda-gateway prj)

- StreamsBuilderFactoryBean가 포함되어 있고 이 클래스는 명시적으로 KafkaStreams#close()를 호출하지만, 난 이 Factory자체를 쓰지 않는다.

- 하지만 gateway application 내에서 여러 KafkaStreams를 이용하여 독립적인 토폴로지를 구성하고 싶었기 때문에 AbstractOperationStream 클래스를 사용하고 있었다. 이 클래스는  몇몇 인터페이스와, StreamsBuilderFactoryBean의 running과 같은 기능만을 구현한 추상 클래스이다. 여기에 DisposableBean 를 추가하였다.

//중간부분
@Override
public Topology getTopology() {
return topology;
}

@Override
public boolean canQueryable() {
return ready;
}

@Override
public void destroy() throws Exception {
this.kafkaStreams.close();
}



추가) KafkaStreams#close는 어떤 작업을 할까?

- 상태를 PENDING_SHUTDOWN 로 바꾼다.

- TaskId의 디렉토리를 삭제하는 스케줄링 메소드 'stateDirCleaner'의 데몬 서비스를 중단한다.

- List<StreamThread>를 순차적으로 중단하라고 명령한다.

- GlobalStreamThread  스레드를 중단하라고 명령한다.

- close를 위한 shutdown 스레드를 만들어서 위의 stream-thread, global-stream-thread가 전부 종료되어 PENDING_SHUTDOWN, DEAD 상태가 될 때까지 동기적으로 기다린다.

- 전부가 정상적으로 멈춘다면 KafkaStream 상태를 NOT_RUNNING 으로 바꾼다. 

- 기존적으로 10초를 기다린다. 

- 만약 정상적인 종료가 되었다는 것을 확인 하고 싶으면  close(final long timeout, final TimeUnit timeUnit) 메소드를 이용하여 boolean type의 return을 확인해야 한다.


대화식 쿼리를 사용하려면 hostname:port 형식의 StreamsConfig.APPLICATION_SERVER_CONFIG 를 설정하면 된다. 카프카 스트림 인스턴스가 키를 기반으로 한 쿼리를 받으면 현재 인스턴스의 로컬 저장소에 포함되는지를 찾는다. 더 중요한 것은 로컬 저장소에 없을 경우 키가 포함된 Store를 찾고 싶은 것이다. 카프카 스트림은 동일한 application ID와 APPLICATION_SERVER_CONFIG 가 정의된 인스턴스에 정보를 검색할 수 있는 몇가지 메소드가 제공된다.

아래의 메소드들은 KafkaStreams instance의 method이다.

- allMetadata : 모든 StreamsMetadata 목록을 반환
- allMetadataForStore : StoreName 기반으로 StreamsMetadata 목록을 반환
- metadataForKey : 해당 Key 기반으로 StreamsMetadata 를 반환

위 메소들에서 allMetaData를 이용해서 대화형 쿼리에 적합한 모든 인스턴스 정보를 얻을 수 있다.  



각 카프카 스트림 인스턴스는 경량화된 서버가 내부적으로 떠 있게 된다.
여기서 중요한 것은 '카프카 스트림 인스턴스 중에서 하나만 쿼리를 하면 되고 어디에 해야하는지는 중요하지 않다'

RPC 메커니즘과 메타 데이터를 기반으로 인스턴스에 원하는 데이터가 없다면, 카프카 스트림 인스턴스는 위치를 찾아내서 쿼리후 반환한다. 예를 들어 A 인스턴스에는 key1가 포함되어 있지는 않지만 B 인스턴스테 포함됨을 인지하고, B를 호출하여 데이터를 검색하고 반환한다.

(중요!: RPC 메커니즘은 기본적으로 제공되지 않으므로 직접 구현해야한다. ex: https://github.com/davidandcode/hackathon_kafka/blob/master/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/MusicPlaysRestService.java )


ps. 결국엔 ben stopord 의 예제처럼 HostInfo를 이용해서 라우팅을 하거나, 처음부터, Key기반의 요청/응답을 특정 인스턴스로 제한 하는 수 밖에 없다. ... (원래 하던데로다.ㅋ )



티스토리 툴바