본문 바로가기
데이터 엔지니어링

[번역] Kafka Throughput&Latency Best Practices 공식문서

by 내기록 2022. 7. 29.
반응형

출처는 아래에 있는 confluent 공식 문서입니다.

 

1. Optimizing for Throughput

처리량을 최적화하기 위해 producer와 consumer는 주어진 시간 내에 가능한 한 많은 데이터를 이동시켜야 한다.

높은 처리량을 얻으려면 데이터가 이동하는 속도를 최대화 해서 가장 빠른 속도로 만들어야 한다.

 

Number of Partitions

topic partition은 카프카의 병렬화 단위이다.

생산자는 병렬로 다른 파티션에 메시지를 전송하고, 병렬처리된 서로 다른 브로커를 지나, 병렬 처리된 서로 다른 컨슈머들에게 소비된다.

일반적으로 토픽 파티션이 많은수록 처리량이 증가한다.

처리량을 최대화하려면 병렬 처리된 브로커들에게 전해질 충분한 수의 파티션이 필요하다.

 

무작정 파티션의 수를 늘리기만 하는 것은 좋지 않다. 

자신이 가진 환경에서 producer throughput과 consumer throughput을 기반으로 파티션 수를 선택해야 한다.

또한 토픽 파티션들 사이에 데이터가 고르게 분산되도록 데이터 패턴과 key 값에 대한 설계가 필요하다.

(토픽 파티션들은 key값을 사용해서 나눠지기 때문에 고르게 분산되기 위한 설계가 필요함)

 

이렇게 하면 특정 항목 파티션의 오버로드를 방지할 수 있다.

 

Batching Message

kafka producer의 배치를 사용하면 동일한 파티션으로 보내지는 메시지를 일괄 처리할 수 있다.

즉, 여러 개의 메시지를 하나의 요청에 담아 보내는 것이다.

처리량을 최적화 하기 위해 producer 배치를 튜닝한다. 배치 사이즈를 늘리고 메시지가 모일 때까지 대기하는 시간을 늘린다.

배치 크기가 클수록 요청이 줄어들어 각 요청을 처리하기 위한 producer와 broker CPU 오버헤드가 줄어든다.

 

Java 클라이언트를 사용하여 batch.size 매개변수로 각 메시지 배치의 최대 크기(바이트)를 늘릴 수 있다.

배치의 최대 크기만큼 메시지가 모일 수 있게 linger.ms 값을 수정할 수 있다.

지연을 통해 producer는 메시지가 batch.size 만큼 모일 때까지 대기한다. 

메시지는 보낼 준비가 되는 즉시 전송하지 않으므로 대기 시간이 더 길어질 수 있다.

 

Compression

처리량을 최적화하기 위해 producer에서 압축을 사용할 수 있다. 즉, 많은 비트를 더 적은 비트로 전송할 수 있는데 compression.type 매개변수를 사용해서 압축을 실행한다.

lz4(성능을 위해 권장된다), snappy, zstd, gzip, none(기본값, 압축 없음)

lz4를 사용해서 압축함으로써 성능을 향상시킬 수 있다.

 

Producer acks

producer가 메시지를 전송하면, 메시지는 대상 파티션의 리더 브로커에게 전송된다.

그 다음 producer는 다음 메시지를 보내기 전에 메시지가 커밋되었음을 알기 위해 리더 브로커로부터 응답을 기다린다.

acks가 0일 경우 프로듀서는 브로커의 응답을 기다리지 않는다. 

acks가 1일 경우 리더 리플리카가 메시지를 받는 순간 브로커로부터 응답을 받는다.

acks가 all일 경우 동기화된 모든 리플리카가 메시지를 받으면 브로커로부터 응답을 받는다.

이는 consumer가 아직 커밋되지 않은 메시지를 읽을 수 없도록 하기 위한 장치이다.

 

acks값은 producer의 처리량에 영향을 미친다. producer가 빨리 응답을 받을수록 다음 메시지를 더 빨리 보낼 수 있으며, 이는 일반적으로 처리량을 더 높인다. 

acks=1이면 리더 브로커는 레코드를 로컬 로그에 기록한 다음 모든 팔루워의 확인을 기다리지 않고 요청을 승인한다.

단점은 acks=all에 의해 내구성이 낮다는 것이다. 모든 리플리카에 복제 되었다는 것이 보장되지 않기 때문이다.

 

Memory Allocation

kafka producer들은 자바 클라이언트가 보내지 않은 메시지를 저장하기 위해 자동으로 메모리를 할당한다.

메모리 제한에 도달하면 producer는 메모리가 해제될 때 까지 또는 max.block.ms 시간이 경과할 때까지 추가 전송을 차단한다.

매개변수 buffer.memory를 사용하여 할당된 메모리 양을 조정할 수 있다. 파티션이 많지 않다면 따로 수정할 필요가 없을 수 있지만,

파티션이 많은 경우에는 메시지 크기, 지연 시간 및 파티션 수를 고려하면서 해당 값을 조정하면 더 많은 파티션에서 파이프라인을 유지할 수 있다.

 

Consumer Fetching

처리량을 최적화하는 또 다른 방법은 consumer가 leader broker로부터 가져오는 데이터의 양을 조정하는 것이다.

fetch.min.bytes 값을 늘리면 fetch 요청에 대해 소비자가 leader broker로부터 얻는 데이터 양을 늘릴 수 있다.

이 매개변수는 consumer의 fetch 응답에 대해 예상되는 최소 바이트 수를 설정한다.

이 값을 늘리면 fetch 요청의 수가 줄어들어 fetch 요청을 처리하기 위한 broker CPU 오버헤드가 감소하므로 처리량이 향상된다.

 

위에서 producer batch를 증가시킨 결과와 유사하게 consumer에 대한 fetch 사이즈를 늘렸을 때 높은 latency로 trade-off가 발생할 수 있다.

이는 fetch요청에 fetch.min.bytes를 충족하는 충분한 메시지가 있거나 대기시간(fetch.max.wait.ms)가 만료될 때까지 브로커가 소비자에게 새 메시지를 보내지 않기 때문이다.

 

소비자를 여러명 포함하는 consumer group을 사용하여 병렬화할 수 있다. 소비를 병렬화하면-parallelizing consumption- 여러 consumer가 동시에 여러 파티션을 처리하면서 로드 밸런싱을 수행할 수 있기 때문에 처리량이 향상될 수 있다.

이 병렬화의 상한선은 위에서 정의된 토픽 파티션 수이다.

 

Summary of Configurations for Optimizing Throughput

Producer

  • batch.size: increase to 100000–200000 (default 16384)
  • linger.ms: increase to 10–100 (default 0)
  • compression.type=lz4 (default none, for example, no compression)
  • acks=1 (default: all - default prior to Kafka 3.0: 1)
  • buffer.memory: increase if there are a lot of partitions (default 33554432)

Consumer

  • fetch.min.bytes: increase to ~100000 (default 1)
  • fetch.max.wait.ms=500 (default 500)

 

 

2. Optimizing for Latency

kafka는 대용량 데이터 전송에 매우 낮은 end-to-end latency를 제공한다.

이는 kafka에 적재된 레코드를 소비자가 가져오는 데 걸리는 시간이 짧다는 것을 의미한다.

 

전용 클러스터를 사용하는 경우 CKU를 추가하면 대기 시간을 줄일 수 있다. <<?

end-to-end latency에 영향을 미치는 다른 관련 변수에는 클라이언트 어플리케이션 구현, 파티션 분할 빛 키 지정 전략, produce and consumer pattern, 네트워크 지연시간 및 QoS등이 있다.

 

Number of Partitions

아래 링크에는 파티션 수를 선택하는 방법에 대한 내용이 있다.

 

How to Choose the Number of Topics/Partitions in a Kafka Cluster? | Confluent

Confluent is building the foundational platform for data in motion so any organization can innovate and win in a digital-first world.

www.confluent.io

 

파티션은 카프카에서 병렬화의 단위이기 때문에 파티션의 수가 증가하면 throughput이 증가할 수 있다.

 

다만, 파티션의 수가 증가하면 대기 시간이 증가한다. 각각의 broker간에 공유되는 여러 파티션을 복제하는 데 시간이 오래 걸릴 수 있으며, 결과적으로 메시지가 커밋된 것으로 간주하는데 시간이 더 오래 걸릴 수 있다. (위의 acks참고)

-> 이 내용은 kafka partition이 구성되는 방법과 replica에 대한 지식이 있어야 한다.

커밋될 때까지 메시지를 사용할 수 없으므로, 궁극적으로 end-to-end latency는 증가할 수 있다.

 

Batching Messages

producer들은 자동으로 메시지를 일괄 처리한다. 배치들이 채워질 때까지 기다리는 시간이 짧으면 데이터를 생성하는 대기 시간이 더 짧아진다. (batch size가 작으면 메시지를 전송하는 대기 시간이 짧아진다)

기본적으로 producer는 low latency로 튜닝되고 매개변수 linger.ms는 default 0으로 설정되며, producer는 전송할 데이터가 있는 즉시 데이터를 전송한다. 

이 경우 메시지는 즉시 전송되므로 batch 처리가 비활성화되어 있는 것은 아니다. -메시지는 상항 배치로 보낸다- 배치에 하나의 메시지만 담아서 보내는 경우이다. (메시지가 producer가 broker에 전송하는 것보다 더 빠르게 producer로 들어오지 않는 한!)

 

Compression

압축을 사용하는지에 대한 여부를 고려해야 한다. 압축을 사용하면 CPU cycle이 증가하지만 네트워크 대역폭 사용량이 줄어든다.

반면 압축을 사용하지 않으면 CPU cycle은 절약되지만 네트워크 대역폭 사용량은 증가한다.

압축 성능에 따라 compression.type=disabled를 사용하여 압축을 사용하지 않는 것이 CPU주기를 절약하기에 좋다.

단, 좋은 압축 코덱을 사용하면 latency도 줄일 수 있다.

 

Producer acks

producer가 요청이 완료된 것으로 간주하기 전에 통합 클러스터의 리더 브로커가 수신해야 하는 승인 수를 조정할 수 있다.

leader broker의 응답이 빠를수록 producer는 다음 메시지 배치를 계속해서 보낼 수 있어 producer latency가 줄어든다.

producer acks 매개변수를 사용하여 승인 수를 설정할 수 있는데, 기본은 acks=1으로 leader broker는 모든 복제본이 메시지를 수신하기 전에 생산자에게 응답한다. 

응용 프로그램 요구사항에 따라 producer가 broker의 응답을 기다리지 않도록 acks=0으로 설정이 가능하지만, 메시지가 producer 모르게 손실 될 위험이 있다.

 

Consumer Fetching

producer의 batch 개념과 유사하게 consumer는 broker로부터 가져오는 각 데이터 양을 조정하여 대기 시간을 줄일 수 있다.

consumer 매개변수인 fetch.min.bytes의 기본 값은 1이다. 즉, 단일 바이트의 데이터를 사용할 수 있는 즉시 fetch 요청이 응답되거나 데이터가 도착할 때까지 기다리는 동안 fetch 요청의 시간(fetch.max.wait.ms)이 초과된다.

 

두 매개변수로 fetch 요청의 크기 또는 요청 대기 시간을 파악할 수 있다.

 

Using Local Tables

kafka streams application을 사용하거나 ksqlDB를 사용하는 경우 응용 프로그램 내에서 몇 가지 성능 향상을 수행할 수 있다. 대규모 테이블을 low processing latency로 검색을 수행해야 하는 경우 로컬 스트림 처리를 사용할 수 있다.

일반적인 방법은 kafka connect를 사용하여 원격 데이터베이스를 kafka에 로컬로 사용할 수 있도록 하는 것이다.

(kafka source connector를 사용해서 원격 데이터베이스(예:oracle) -> kafka로 데이터를 전송한다.)

그런 다음 kafka streams API 또는 ksqlDB를 활용하여 테이블과 스크림의 빠르고 효율적인 query join을 할 수 있다.

(예를 들어 ksql을 사용하면 table/stream을 생성해서 kafka message들을 테이블 형태로 조회하거나, 실시간으로 들어오는 데이터를 stream으로 생성해서 쿼리할 수 있다.)

 

local state store에서 각 테이블의 최신 정보를 조회할 수 있으므로 작업을 수행할 때 latency와 원격 데이터베이스의 load가 줄어든다.

(local state store : kafka는 메시지가 디스크에 저장되니까 이렇게 표현한 것 같다.)

 

 

Kafka Streams Overview | Confluent Documentation

 

docs.confluent.io

 

 

Topology Optimization (Streams관점)

(이 부분은 잘 이해가 안돼서 나중에 다시 수정할 예정)

kafka streams application은 병렬 처리를 위해 분할된 데이터에서 작동할 수 있는 stream processor nodes의 그래프인 processor topologies 기반이다.(?)

application에 따라 repartitioned topics 기반의 불필요한 데이터 셔플이 발생할 수 있으며, 이로 인해 정확성 문제는 발생하지 않지만 성능 불이익이 발생할 수 있다.

 

성능 저하를 방지하려면 매개변수 topology.optimization을 설정하여 event streaming application에 대한 토폴로지 최적화를 할 수 있다. 최적화를 활성화하면 repartition topics를 통해 저장되고 내보내지는 재구성된 스트림의 양을 줄일 수 있다.

 

* 위 내용 이해를 위해서는 카프카 스트림즈의 구조를 알아야 한다.

topology는 Node와 각 노드를 연결하는 선으로 이루어져 있으며, 노드는 processor를 의미하고 선은 stream을 의미한다.

https://daniel.arneam.com/blog/distributedarchitecture/2020-10-25-Kafka-Streams-Concepts/

 

Summary of Configurations for Optimizing Latency

Producer

  • linger.ms=0 (default 0)
  • compression.type=none (default none, meaning no compression)
  • acks=1 (default: all - prior to Kafka 3.0: 1)

Consumer

  • fetch.min.bytes=1 (default 1)

Streams

  • StreamsConfig.TOPOLOGY_OPTIMIZATION: StreamsConfig.OPTIMIZE(default StreamsConfig.NO_OPTIMIZATION)
  • Streams applications have embedded producers and consumers, so also check those configuration recommendations

 

 

3. Throughput vs Latency

 

 

References

 

 

 

반응형

댓글