Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- kafkastreams
- coursera
- 한빛미디어
- spring-batch
- Elasticsearch
- play framework
- reactive
- statestore
- Kafka
- schema registry
- scala
- spring-kafka
- kafkastream
- Spring
- scala 2.10
- Slick
- aws
- 플레이 프레임워크
- RabbitMQ
- enablekafkastreams
- springboot
- kafka interactive query
- Logstash
- 카프카
- kafka streams
- avo
- confluent
- gradle
- spring-cloud-stream
- Elk
Archives
- Today
- Total
b
rabbitmq delivery-tag ? 본문
데이터의 준비
- 분간하기 쉬운 데이터 10개를 큐에 준비한다 ex) data1 ~ data10
@Bean public ApplicationRunner runner(AmqpTemplate template) { return args -> IntStream.rangeClosed(1, 10).forEach(c -> template.convertAndSend("myqueue", "data => " + c)); }
delivery-tag 의 출력
- 어떤식으로 delivery-tag 값이 할당되는지 확인한다.
@RabbitListener(queues = "myqueue") public void listen(Message in, Channel channel) throws InterruptedException, IOException { long tag = in.getMessageProperties().getDeliveryTag(); System.out.printf ("tag: %d - %s %n", tag, new String(in.getBody()) ); } 출력 tag: 1 - data => 1 tag: 2 - data => 2 tag: 3 - data => 3 tag: 4 - data => 4 tag: 5 - data => 5 tag: 6 - data => 6 tag: 7 - data => 7 tag: 8 - data => 8 tag: 9 - data => 9 tag: 10 - data => 10
basicAck 의 2번째 multiple을 false로 셋팅해본다
if(tag == 2) {
channel.basicAck(tag, false);
}
출력
tag: 1 - data => 1
tag: 2 - data => 3
tag: 3 - data => 4
tag: 4 - data => 5
tag: 5 - data => 6
tag: 6 - data => 7
tag: 7 - data => 8
tag: 8 - data => 9
tag: 9 - data => 10
결과
10개의 메시지 중에서 2번째 메시지 1개만 ack 처리됨, 그래서 9개의 데이터가 남아 있음
basicAck 의 2번째 multiple 을 true로 셋팅해본다
if(tag == 3) {
channel.basicAck(tag, true);
}
예상
tag 3 즉, 처음부터 3개 데이터인 data1, data3, data4 가 ack 처리되어
출력은 data5 ~ data10 총 6개의 데이터만 출력될 것이다
출력
tag: 1 - data => 5
tag: 2 - data => 6
tag: 3 - data => 7
tag: 4 - data => 8
tag: 5 - data => 9
tag: 6 - data => 10
#### rabbitmq 와 amqp 0.9.1 문서에서의 delivery -tag
https://www.rabbitmq.com/confirms.html
delivery-tag는 클라이언트 라이브러리에 의해 증가하는 양수이다. 클라이언트는 이 tag 를 인자로 이용해서 acknowledge 메소드를 사용한다. 이 값은 '채널'마다 고유하고, 반드시 해당 채널에서 사용되어야 한다.
만약 다른 채널의 값을 사용하려고 하면 `unknown delivery tag` 라는 에러가 발생하고 채널이 닫힌다
https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.delivery-tag
하나 또는 여러개의 메시지를 Acknowledge 하는 ack method의 첫번째 파라미터로 사용된다.
The server-assigned and channel-specific delivery tag
The delivery tag is valid only within the channel from which the message was received. I.e. a client MUST NOT receive a message on one channel and then acknowledge it on another.
The server MUST NOT use a zero value for delivery tags. Zero is reserved for client use, meaning "all messages so far received".
무중단 상태라면 delivery-tag 가 overflow 되지는 않을까?
- delivery-tag의 kafka offset 과 동일하게 type은 'long' 이다. 하루에 몇억건씩 하더라도 overflow 일어나기에는 너무 거대한 숫자므로 쓸데 없는 고민이다.
Comments