일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Slick
- avo
- Spring
- spring-kafka
- schema registry
- Elk
- coursera
- kafka streams
- spring-cloud-stream
- 한빛미디어
- scala
- springboot
- enablekafkastreams
- kafkastreams
- play framework
- RabbitMQ
- confluent
- gradle
- Kafka
- 플레이 프레임워크
- Logstash
- statestore
- reactive
- Elasticsearch
- kafka interactive query
- scala 2.10
- aws
- 카프카
- kafkastream
- spring-batch
- Today
- Total
b
KafkaStreams Application을 어떻게 종료할 것인가? 본문
방법) KafkaStreams#close 호출로서 정상적인 종료가 가능하다고 한다. (참고 : https://kafka.apache.org/10/documentation/streams/developer-guide/write-streams )
그럼 이 close를 어떻게 호출 해야하는지를 찾아보자.
1) spring-cloud-stream 를 사용할 경우
- (나같은 경우에는 order-eda-notification prj)
- general 한 방법으로 사용을 한다면 StreamsBuilderFactoryManager bean이 자동등록 되고 이 매니저 빈은 SmartLifecycle 를 상속받기 때문에 spring application이 종료 시그널을 받으면 stop을 시작하는 시작점이 된다.
- 이 매니저빈이 실제로 Set<StreamsBuilderFactoryBean>를 차례로 종료 메소드를 호출하고, 그 안에 존재하는 KafkaStream 맴버 인스턴스도 종료 명령을 받게 된다.
2) 메뉴얼하게 KafkaStream 를 생성하는 경우
- (나같은 경우에는 order-eda-gateway prj)
- StreamsBuilderFactoryBean가 포함되어 있고 이 클래스는 명시적으로 KafkaStreams#close()를 호출하지만, 난 이 Factory자체를 쓰지 않는다.
- 하지만 gateway application 내에서 여러 KafkaStreams를 이용하여 독립적인 토폴로지를 구성하고 싶었기 때문에 AbstractOperationStream 클래스를 사용하고 있었다. 이 클래스는 몇몇 인터페이스와, StreamsBuilderFactoryBean의 running과 같은 기능만을 구현한 추상 클래스이다. 여기에 DisposableBean 를 추가하였다.
//중간부분
@Override
public Topology getTopology() {
return topology;
}
@Override
public boolean canQueryable() {
return ready;
}
@Override
public void destroy() throws Exception {
this.kafkaStreams.close();
}
추가) KafkaStreams#close는 어떤 작업을 할까?
- 상태를 PENDING_SHUTDOWN 로 바꾼다.
- TaskId의 디렉토리를 삭제하는 스케줄링 메소드 'stateDirCleaner'의 데몬 서비스를 중단한다.
- List<StreamThread>를 순차적으로 중단하라고 명령한다.
- GlobalStreamThread 스레드를 중단하라고 명령한다.
- close를 위한 shutdown 스레드를 만들어서 위의 stream-thread, global-stream-thread가 전부 종료되어 PENDING_SHUTDOWN, DEAD 상태가 될 때까지 동기적으로 기다린다.
- 전부가 정상적으로 멈춘다면 KafkaStream 상태를 NOT_RUNNING 으로 바꾼다.
- 기존적으로 10초를 기다린다.
- 만약 정상적인 종료가 되었다는 것을 확인 하고 싶으면 close(final long timeout, final TimeUnit timeUnit) 메소드를 이용하여 boolean type의 return을 확인해야 한다.