분류없음2018.03.18 12:08

방법) KafkaStreams#close 호출로서 정상적인 종료가 가능하다고 한다. (참고 : https://kafka.apache.org/10/documentation/streams/developer-guide/write-streams )

그럼 이 close를 어떻게 호출 해야하는지를 찾아보자.


1) spring-cloud-stream 를 사용할 경우

- (나같은 경우에는 order-eda-notification prj)

- general 한 방법으로 사용을 한다면 StreamsBuilderFactoryManager bean이 자동등록 되고 이 매니저 빈은 SmartLifecycle 를 상속받기 때문에 spring application이 종료 시그널을 받으면 stop을 시작하는 시작점이 된다. 

- 이 매니저빈이 실제로 Set<StreamsBuilderFactoryBean>를 차례로 종료 메소드를 호출하고, 그 안에 존재하는 KafkaStream 맴버 인스턴스도 종료 명령을 받게 된다.


2) 메뉴얼하게 KafkaStream 를 생성하는 경우

- (나같은 경우에는 order-eda-gateway prj)

- StreamsBuilderFactoryBean가 포함되어 있고 이 클래스는 명시적으로 KafkaStreams#close()를 호출하지만, 난 이 Factory자체를 쓰지 않는다.

- 하지만 gateway application 내에서 여러 KafkaStreams를 이용하여 독립적인 토폴로지를 구성하고 싶었기 때문에 AbstractOperationStream 클래스를 사용하고 있었다. 이 클래스는  몇몇 인터페이스와, StreamsBuilderFactoryBean의 running과 같은 기능만을 구현한 추상 클래스이다. 여기에 DisposableBean 를 추가하였다.

//중간부분
@Override
public Topology getTopology() {
return topology;
}

@Override
public boolean canQueryable() {
return ready;
}

@Override
public void destroy() throws Exception {
this.kafkaStreams.close();
}



추가) KafkaStreams#close는 어떤 작업을 할까?

- 상태를 PENDING_SHUTDOWN 로 바꾼다.

- TaskId의 디렉토리를 삭제하는 스케줄링 메소드 'stateDirCleaner'의 데몬 서비스를 중단한다.

- List<StreamThread>를 순차적으로 중단하라고 명령한다.

- GlobalStreamThread  스레드를 중단하라고 명령한다.

- close를 위한 shutdown 스레드를 만들어서 위의 stream-thread, global-stream-thread가 전부 종료되어 PENDING_SHUTDOWN, DEAD 상태가 될 때까지 동기적으로 기다린다.

- 전부가 정상적으로 멈춘다면 KafkaStream 상태를 NOT_RUNNING 으로 바꾼다. 

- 기존적으로 10초를 기다린다. 

- 만약 정상적인 종료가 되었다는 것을 확인 하고 싶으면  close(final long timeout, final TimeUnit timeUnit) 메소드를 이용하여 boolean type의 return을 확인해야 한다.


Posted by dev.bistro