kafka consumer 예제

이 예제는 실행 가능한 세 소비자를 실행기에게 제출합니다. 각 스레드에는 데이터를 수신하는 스레드를 볼 수 있도록 별도의 ID가 제공됩니다. 프로세스를 중지하면 종료 후크가 호출되어 절전 모드 해제를 사용하여 세 스레드가 중지되고 종료될 때까지 기다립니다. 이 것을 실행하면 모든 스레드에서 많은 데이터가 표시됩니다. 다음은 한 번의 실행에서 나온 샘플입니다: 또한 소비자를 그룹화할 수 있고 소비자 그룹의 소비자는 구독한 토픽의 파티션을 공유할 수 있습니다. 토픽에 N파티션이 있고 소비자 그룹의 N 소비자가 토픽을 구독한 경우 각 소비자는 토픽의 파티션에서 데이터를 읽습니다. 이것은 소비자가 그룹에있을 수있는 머리입니다. 우리는 밖으로 다음 튜토리얼에서 소비자 그룹의 세부 사항으로 이동합니다. 파티션이 그룹의 다른 소비자에게 다시 할당되면 초기 위치는 마지막으로 커밋된 오프셋으로 설정됩니다.

위의 예제에서 소비자가 갑자기 충돌하면 파티션을 인수하는 그룹 구성원이 오프셋 1에서 소비를 시작합니다. 이 경우 6의 충돌된 소비자 위치까지 메시지를 다시 처리해야 합니다. 오프셋 관리: 소비자가 코디네이터로부터 할당을 받은 후 할당된 각 파티션의 초기 위치를 결정해야 합니다. 그룹이 처음 만들어지면 메시지를 사용하기 전에 구성 가능한 오프셋 재설정 정책(auto.offset.reset)에 따라 위치가 설정됩니다. 일반적으로 소비는 가장 빠른 오프셋 또는 최신 오프셋에서 시작됩니다. 이러한 설정을 적절하게 조정하지 못하면 일반적으로 처리된 레코드에 대한 오프셋을 커밋하기 위해 호출에서 발생한 CommitFailedException이 발생합니다. 자동 커밋 정책을 사용하는 경우 소비자가 내부적으로 커밋 오류를 자동으로 무시하기 때문에 이러한 일이 발생할 때도 알 수 없습니다(지연 메트릭에 영향을 미칠 만큼 자주 발생하지 않는 경우). 이 예외를 catch하고 무시하거나 필요한 롤백 논리를 수행할 수 있습니다. 재조정 동작에 영향을 미치는 다른 설정은 heartbeat.interval.ms. 이렇게 하면 소비자가 코디네이터에게 하트비트를 보내는 빈도가 제어됩니다.

또한 재균형이 필요할 때 소비자가 감지하는 방식이므로 하트비트 간격이 낮을수록 일반적으로 재균형이 빨라집니다. 기본 설정은 3초입니다. 큰 그룹의 경우 이 설정을 늘리는 것이 현명할 수 있습니다. Librdkafka는 카프카 소비에 대한 다중 스레드 접근 방식을 사용합니다. 사용자의 관점에서 API와의 상호 작용은 루프에서 rd_kafka_consumer_poll를 호출하는 사용자와 Java 클라이언트에서 사용하는 예제와 너무 다르지 않지만 이 API는 한 번에 하나의 메시지 또는 이벤트만 반환합니다: BOOTSTRAP_SERVERS_CONFIG : 카프카 브로커의 주소입니다. Kafka가 클러스터에서 실행되는 경우 쉼표(,) 분리된 주소를 제공할 수 있습니다. 예를 들어:localhost:9091,localhost:9092 소비자 내부를 복잡하게 하여 이 문제를 제정신으로 처리하고 처리하는 대신 API는 커밋이 성공하거나 실패할 때 호출되는 콜백을 제공합니다. 원하는 경우 이 콜백을 사용하여 커밋을 다시 시도할 수 있지만 동일한 순서 지정 문제를 처리해야 합니다. 지금까지의 예제는 동기 커밋 API에 중점을 두었지만 소비자는 비동기 API인 commitAsync도 노출합니다. 비동기 커밋을 사용하면 응용 프로그램이 커밋이 반환되기 전에 다음 메시지 일괄 처리를 시작할 수 있으므로 일반적으로 처리량이 높아집니다.

장단점은 커밋이 실패한 후에야 찾을 수 있다는 것입니다. 아래 예제에서는 기본 사용법: 예제로 시작하기 전에 먼저 Kafka에서 사용되는 일반적인 용어와 일부 명령에 대해 살펴보겠습니다. 소비자 그룹이 활성 상태인 경우 Kafka 배포의 빈 디렉토리에 있는 consumer-groups.sh 스크립트를 사용하여 명령줄에서 파티션 할당 및 소비 진행률을 검사할 수 있습니다.

About the author: mcadmin