b

카프카 메시지간의 event time 측정하기 logstash elapsed plugin 본문

카테고리 없음

카프카 메시지간의 event time 측정하기 logstash elapsed plugin

dev.bistro 2018. 7. 30. 19:33

현재 업무에서 메지시의 상태는 하나의 필드로 정의하고 있다.  예를 들어 결제 요청이 들어오면 APPROVAL_REQUESTED:type 을 정의하여 payment topic에 적재하고 이 후 저 메시지를 컨슘하여 결제가 완료된다면 '동일한 토픽에' APPROVAL_COMPLETED:type 으로 하나의 메시지를 추가한다.

즉, APPROVAL_REQUESTED 상태는 APPROVAL_COMPLETED or APPROVAL_FAILED 상태로 변경 가능하다.

각각은 Kafka에 의해 Header에 event time이 기록되어 있지만, 별개로 actBy라는 timestamp 필드도 추가하였다.  REQUESTED 상태에서 COMPLETED 상태로 변경되기까지의 시간을 어떻게 측정할 것인가?


무난한 방법으로 하나의 Streams Application을 작성할 수도 있고, KSQL을 통해서 볼 수도 있다. 또는 COMPLTED 메시지가 기록될 때 프로그램적으로 기록 할수도 있고... 내가 선택한 방법은 기존에 모니터링을 위해 사용중인 logstash의 plugin 이다.


https://www.elastic.co/guide/en/logstash/current/plugins-filters-elapsed.html

하나의 필드를 기준으로 2개의 message 사이이 시간을 측정해서 소요시간을 측정하는 것이다.

2번째 메시지에 elapsed_time 필드를 add_field 줄수도 있고, 원본 이벤트는 그대로 나둔채 하나의 추가 이벤트를 발행해서 elasticsearch에 기록 할 수 있다. (이 경우에는 elapsed_match 라는 tag를 가진 document이다) 이 설정은 new_event_on_match 를 통해서 설정 가능하다.


개인적으로 셋팅한 내용은 아래와 같다.

   elapsed {

      start_tag => "APPROVAL_REQUESTED"

      end_tag => "APPROVAL_COMPLETED"

      unique_id_field => "id"

      timeout => 10

      new_event_on_match => false

    }


기준이 되는 필드는 'id' 이고  동일한 ID를 가진 이벤트의 APPROVAL_REQUESTED,  APPROVAL_COMPLETED 사이의 시간을  APPROVAL_COMPLETED tag를 가지는 메시지에 elapsed_time 필드로 기록 해준다. 만약 10초 이내에  동일한 id를 가지는 APPROVAL_COMPLETED 메시지를 수신하지 못한다면 elapsed_expired_error 태그를 가지는 메시지를 발행한다.


이를 통해서 손쉽게 


1카프카 주문 메시지 발행 - 2컨슘 - 3Remote API호출 - 4주문 요청 결과 메시지 발행  

1~4단계의 소요 시간을 측정 할 수 있다.



(물론 이것은 카프카 메시지 간의 소요시간이지, 그 외 측정구간은 또 다른 방법을 이용 했다)

Comments