본문 바로가기
데이터 엔지니어링

Kafka exactly-once delivery 지원

by 내기록 2022. 9. 3.
반응형

메시지 시스템들의 메시지 전송 방식에는 적어도 한 번 전송(at-least-once), 최대 한 번 전송(at-most-once), 정확히 한 번 전송(exactly-once)이 있습니다.

 

1) 적어도 한 번 전송(at-least-once)

 

https://mirakl.tech/sending-kafka-message-in-a-transactional-way-34d6d19bb7b2

 

Kafka(브로커)는 첫 번째 메시지를 기록하고 잘 받았다는 ACK를 프로듀서에게 전송하려고 합니다. 하지만 네트워크 오류 또는 브로커 장애가 발생하여 결국 프로듀서는 메시지에 대한 ACK를 받지 못합니다.

메시지를 전송한 후 브로커로부터 ACK을 받지 못한 프로듀서는 브로커가 메시지를 받지 못했다고 판단해 메시지를 재전송합니다.

 

프로듀서는 메시지를 보내고 그에 대한 ACK를 받지 못했으므로 브로커가 메시지를 받지 못했다고 판단하지만 브로커는 메시지를 기록했으므로 장애에서 복구된 브로커는 메시지를 갖고 있습니다. 따라서 프로듀서 입장에서는 브로커가 메시지를 저장하고 ACK만 전송하지 못한 것인지, 메시지를 저장하지 못해서 ACK를 전송하지 않은 것인지는 정확히 알 수 없습니다.

 

하지만 메시지에 대한 ACK를 받지 못한 프로듀서는 적어도 한 번 전송 방식에 따라 메시지를 다시 한번 전송합니다. 따라서 동일한 메시지가 브로커에 중복 저장 될 수 있습니다.

 

위와 같이 네트워크의 회선 장애나 기타 장애 상황에 따라 일부 메시지 중복이 발생할 수는 있지만, 최소한 하나의 메시지는 반드시 보장한다는 것이 적어도 한 번 전송 방식이며, 카프카의 기본 동작입니다.

 

 

2) 최대 한 번 전송(at-most-once)

https://mirakl.tech/sending-kafka-message-in-a-transactional-way-34d6d19bb7b2

최대 한 번 전송은 ACK를 받지 못해도 재전송을 하지 않습니다. 즉, 일부 메시지의 손실을 감안하더라도 중복 전송은 하지 않는 경우입니다. 일부 메시지가 손실되더라도 높은 처리량을 필요로 하는 대량의 로그 수집이나 IoT같은 환경에서 사용할 수 있습니다.

 

 

3) 정확히 한 번 전송(exactly-once)

https://mirakl.tech/sending-kafka-message-in-a-transactional-way-34d6d19bb7b2

 

 

https://hevodata.com/blog/kafka-exactly-once-semantics/

동작 과정

(1) 프로듀서가 브로커의 토픽으로 메시지를 전송합니다. 이때 PID(Producer ID)와 seq(메시지 번호)를 헤더에 포함하여 전송합니다.

(2) 브로커는 메시지를 저장하고 PID와 seq를 메모리에 기록합니다. 그리고 메시지를 잘 받았다는 ACK를 프로듀서에게 응답합니다.

(3) 프로듀서는 다음 메시지를 전송합니다. PID는 동일하지만 seq는 1 증가합니다.

(4) 브로커는 다음 메시지를 저장하고 PID와 seq를 메모리에 기록합니다. 그리고 잘 받았다는 ACK를 프로듀서에게 전송하려 하지만 네트워크 오류 또는 브로커 장애가 발생하여 프로듀서는 ACK를 받지 못합니다. (아래 그림에서 3번)

(5) 브로커로부터 ACK를 받지 못한 프로듀서는 브로커가 다음 메시지를 받지 못했다고 판단해 메시지를 재전송합니다. (아래 그림에서 4번)

(6) 프로듀서는 재전송한 메시지의 헤더에서 PID와 seq를 비교해서 메시지가 이미 브로커에 저장되어 있는 것을 확인하면 메시지를 중복 저장하지 않고 ACK만 전송합니다. (아래 그림에서 5번)

 

프로듀서가 중복 없는 전송을 시작하면, 프로듀서는 고유한 PID를 할당받게 되고, PID와 seq를 메시지 헤더에 포함해 메시지를 전송합니다. 브로커는 각 메시지마다 PID, seq값을 메모리에 유지하기 때문에 브로커에 기록된 메시지들의 중복 여부를 알 수 있습니다.

PID는 사용자가 별도로 생성하는 것이 아닌 프로듀서에 의해 자동 생성됩니다.

프로듀서에서 시퀀스 번호를 메시지마다 순차적으로 증가시키는 방법과 동일하게, 브로커에서도 기록되는 메시지들에 대해 시퀀스 번호를 증가시킵니다. 따라서 프로듀서가 보낸 메시지의 시퀀스 번호가 브로커가 갖고 있는 시퀀스 번호보다 정확하게 하나 큰 경우가 아니라면 브로커는 프로듀서의 메시지를 저장하지 않습니다.

 

PID와 seq은 브로커의 메모리에 유지되고, 리플리케이션 로그에도 저장됩니다. 따라서 예기치 못한 브로커의 장애 등으로 리더가 변경되는 일이 발생하더라도 새로운 리더가 PID와 시퀀스 번호(seq)을 정확히 알 수 있으므로 중복 없는 메시지 전송이 가능합니다.

 

 

 

정확히 한 번 전송(exactly-once)?

 

파이프라인에서 exactly-once 처리는 매우 중요합니다. 카프카 브로커가 0.11.0.0 버전 이후라면 사용이 가능합니다.

 

exactly-once delivery는 프로듀서부터 컨슈머까지 연결되는 파이프라인의 처리를 뜻합니다. 이것이 보장되지 않는 파이프라인의 경우 트랜잭션 처리를 하지 않으면 카프카 클러스터에 두 번 이상 데이터가 저장될 수 있습니다. 데이터가 클러스터에 저장되었으나 ack가 유실되어 프로듀서가 재처리하는 경우가 대표적인 예시입니다. 결과적으로 카프카 브로커에서 트랜잭션 처리를 하더라도 컨슈머가 중복해서 데이터 처리를 하는 것에 대해 보장하지 않으므로, 컨슈머의 중복처리는 따로 로직을 작성해야 합니다.

 

kafka는 기본적으로 at-least-once를 보장하며 옵션을 통해 exactly-once를 적용할 수 있습니다.

 

 

카프카 트랜잭션 적용

프로듀서 

configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); # 트랜잭션 처리 설정

 

프로듀서가 해당 설정을 가지고 데이터를 보내면 아래와 같이 데이터가 브로커에 적재되는 것을 확인할 수 있습니다.

__transaction_state는 프로듀서가 보내는 데이터의 트랜잭션을 기록하는데 사용하며 이 데이터는 트랜잭션이 활성화된 컨슈머가 데이터를 가져갈 때 사용됩니다.

$ ./kafka-console-consumer.sh --consumer.config consumer.config --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" --bootstrap-server localhost:9092 --topic __transaction_state --from-beginning
..b941a::TransactionMetadata(transactionalId=..b941a, producerId=1000, producerEpoch=28, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(test-1), txnStartTimestamp=1594967312702, txnLastUpdateTimestamp=1594967312702)
..b941a::TransactionMetadata(transactionalId=..b941a, producerId=1000, producerEpoch=28, txnTimeoutMs=60000, state=PrepareCommit, pendingState=None, topicPartitions=Set(test-1), txnStartTimestamp=1594967312702, txnLastUpdateTimestamp=1594967312712)
..b941a::TransactionMetadata(transactionalId=..4b941a, producerId=1000, producerEpoch=28, txnTimeoutMs=60000, state=CompleteCommit, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1594967312702, txnLastUpdateTimestamp=1594967312713)

위 데이터를 보면 Ongoing->PrepareCommin->CompleteCommit 순서대로 트랜잭션이 진행되는 것을 확인할 수 있으며 이렇게 트랜잭션이 완료된 데이터만 컨슈머가 가져갈 수 있습니다.

 

 

트랜잭션을 활성화하면 토픽에 트랜잭션 관련된 데이터가 추가적으로 적재됩니다.

카프카 브로커에 적재된 세그먼트 파일을 확인하면 메시지를 하나 전송했을 때 두 개의 오프셋이 작성된 것을 알 수 있습니다.

16번 오프셋 - isTransactional: true, isControl: false
17번 오프셋 - isTransactional: true, isControl: true,  endTxnMarker: COMMIT

프로듀서는 트랜잭션을 끝내면 해당 토픽에 commit 메시지를 명시적으로 전달합니다. 이로 인해 추가 오프셋이 소모되는 것입니다.

위 데이터에서 16번 오프셋은 실질적인 데이터이고 17번 오프셋은 commit을 명시하는 데이터이기 때문에 컨슈머는 실제 데이터가 있는 16번 오프셋의 데이터만 가져가며 17번 오프셋은 무시하게 됩니다.

 

컨슈머

configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); # 명시적 오프셋 커밋
configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, TRANSACTION_CONFIG); # 트랜잭션 설정

위 설정을 통해 프로듀서가 브로커로 보낸 데이터 중 트랜잭션이 완벽하게 완료된 데이터만 읽을 수 있습니다.

 

[main] INFO com.example.ConsumerWithSyncCommit - ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 16, CreateTime = 1594970489649, serialized key size = -1, serialized value size = 54, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value =data)

컨슈머를 실행하면 16번 오프셋의 데이터만 읽는 것을 확인할 수 있습니다.

 

 

Conclusion

카프카 트랜잭션은 프로듀서에서 컨슈머까지 데이터가 전달될 때 exactly-once를 지원하기 위한 옵션입니다. 하지만 이 옵션을 사용하더라도 최종적으로 컨슈머에서 target 저장소로 데이터가 들어갈 때 중복이 발생할 수 있습니다. 트랜잭션 옵션은 프로듀서에서 컨슈머까지 가는 파이프라인에 대해서 exactly-once 전달을 보장합니다. 내부적으로 트랜잭션 관련한 브로커 처리를 수행하기 때문에 옵션을 사용하지 않는 것보다 성능이 다소 떨어질 수 있습니다.

컨슈머가 적재하는 저장소가 유니크키를 지원하는 저장소라면 굳이 트랜잭션 옵션을 적용할 필요는 없을 것 같습니다.

하지만 데이터 처리에서 프로듀서가 중복해서 데이터를 보내는 것을 방지해야 하는 파이프라인을 구성할 때는 이 옵션을 사용하는 것이 좋습니다.

 

 

 

 

 

 

References

https://blog.voidmainvoid.net/354

https://www.confluent.io/blog/transactions-apache-kafka/

 

 

 

반응형

댓글