일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Slick
- kafka interactive query
- schema registry
- kafkastream
- kafkastreams
- statestore
- kafka streams
- enablekafkastreams
- scala 2.10
- aws
- spring-kafka
- 플레이 프레임워크
- RabbitMQ
- 한빛미디어
- scala
- reactive
- coursera
- Elk
- Elasticsearch
- play framework
- spring-cloud-stream
- Spring
- avo
- 카프카
- Kafka
- springboot
- spring-batch
- confluent
- gradle
- Logstash
- Today
- Total
목록Kafka (15)
b
Kafka 는 기본적으로 1번 이상의 메시지 전송을 보장 하는 At Least Once Delivery 정책이었다. 메시지가 전송되었다는 ack를 확실히 받지 못할 경우 retry send를 하는 방법으로 최소 1회의 전송을 보장 하였던 것이다. 하지만 0.11 버전부터 exactly once delivery 를 위해서 idempotent producer가 지원되기 시작하였다 (함께 transactional producer도 포함된다) 메시지가 at least once delivery (즉 duplicate send)는 링크의 글과 이미지를 참고하면 쉽게 이해할 수 있다. https://dzone.com/articles/exactly-once-semantics-with-apache-kafka-1 Exac..
https://deview.kr/2019/schedule/305 에서 소개한 11번가의 주문/결제 시스템에 관련된 이야기이다. 실제로 '결제'단계에서 장애가 났을때 11번가의 주문/결제는 어떻게 진행이 되었을까? 아래의 예는 지난 11월 11일 오후 1시경의 11번가 스토리이다. 단순화해서 "주문 -> 결제 처리 -> 11번가 데이터베이스에 저장" 흐름대로 주문/결제가 처리된다고 생각하자. 13:00 정각 기프티콘을 절반 가격에 한정 판매 하면서 주문은 미친듯이 들어온다. 12시 59분에 비해 약 4배의 속도로 주문/결제가 들어왔다. 당연히 주문 처리량 보다 높아졌다. 13:01분 부터 주문유입량 >> 주문처리량이 되었다. 원래대로라면 다른 주문들은 다 Rejection 하거나, 고객에게 1분 이상의 대..
#topic 생성 kafka-topics --create --topic freblogg --partitions 3 --replication-factor 1 --zookeeper localhost:2181 - 파일명은 base offset에 영향을 받는다 (해당 세그먼트의 첫 번째 offset) 그러므로 새로 생성된 토픽-파티션의 첫번째 세그먼트는 항상 0000000000000.log 파일명이다. 간단하게 2개의 메시지를 넣고 다시 한번 0번 파티션을 확인해보면 0000000000000.log 의 파일 크기가 변경되고 그 안을 보면 해당 데이터가 들어간것을 확인 할 수 있다 이를 좀 더 자세히 보려면 아래의 명령어처럼 세그먼트 내부를 확인 할 수 있다. ( DumpLogSegments ) ~/conflue..
카프카 스트림은으로 StateStore를 구성하여 서비스에 투입하는 것은 튜토리얼과 상용은 하늘과 땅차이다. 특히 데이터가 어느 정도 적재되는 상황에서는 그 문제는 더 커진다. 일반적으로 Application 이 standby가 되면 트래픽을 받을 수 있는 것과는 달리, Kafka Streams로 Query를 받아 내기 위해서는 StateStore가 초기화 되어야 한다. 그래서 아래와 같은 코드를 준비하여야 한다. (시대가 어느시대인데 무한 루프냐 ㅠ) public class StoreHelper { public static T waitUntilStoreIsQueryable(final String storeName, final QueryableStoreType queryableStoreType, fina..
2017.07에 테스트 한것을 가져옴 kafka version 1.1 1. min.insync.replicas=2, replication-factor=1 생성) kafka-topics --create --zookeeper localhost:2181 --config min.insync.replicas=2 --topic top2 --partitions 1 --replication-factor 1 > 생성은 잘 된다. > 의견 : 생성 자체가 안되었으면 좋았을것 같다 (alter에서 수정하면 정상적으로 쓸 수 있기는 하다) - 생각해보면 min.insync.replicas가 relicas보다 많더라도, kafka-client에서 acks를 all로 하지 않는 다면 의미 없는 설정이다. 그렇기 때문에 '만들어 ..
logstash 에는 이미 kafka input 모듈이 있기 때문에 쉽게 카프카 메시지를 엘라스틱에 저장할 수 있다. ( https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html )몇가지 주의할 점 1) 주문번호등을 Partition Key로 이용할 경우에, 데이터가 어느 파티션에 들어가 있는지, Offset은 얼마인지도 중요한 디버깅 요소가 된다. 이러한 데이터도 함께 elasticsearch 에 넣으려면 add_field를 이용한다. 우선 input 쪽에 아래의 설정을 추가하여 메타 정보를 사용할 수 있게 한다input {kafka {decoreate_events => true}} 그리고 filter 영역에 아래처럼 추가하여..
필요한 수준까지 이해한듯 해서, 메모로 남김 원래의 카프카는 데이터 주입에 대한 순서가 중요하지 않았다. (정확히는 어느 정도의 버퍼를 카프카 브로커가 가지면서 Event Time에 대한 조정 작업을 알아서 해준다)하지만 producer의 명등성 옵션과 순서 보장에 대한 이슈가 중요해졌고 max.in.flight.request.per.connection = 1 로 셋팅하면서 순서 꼬임을 좀 방어하고자 했는데...1로 설정하고 idempotence 옵션이 켜져있고 retry가 가능한 상태일때 전송하다가 OutOfOrderSequence 예외가 발생한다면 클라이언트 영역에서 이 Sequence Number를 잘 처리해야한다. (재전송 하거나, 다음 idempotence 전송에서 써야한다) 이게 어렵다... ..
subject란?- Schema Registry는 Schema를 Subject 단위로 관리되어지는 듯 하다. 디폴트로는 -key, -value (ex: data.order-value) 형식으로 사용되지만 confluent 4.1 부터 이 부분을 커스텀마이징 할 수 있는 SubjectNameStrategy가 추가되었다.기본적으로 사용하는 토픽이 동일하다면 동일한 subject를 보지만, 커스텀마이징 한다면 Record 에 따라 subject로 변경 가능하다.Get the subject name for the given topic and value type. 1. 최초에 Schema Registry에 등록하고 Schema ID 를 가져오는 동작- SR은 Cache 기반으로 동작한다 어플리케이션이 시작하면 A..