더 작은 오프셋 커밋을 방지하기위한 Kafka 소비자 오프셋 커밋 확인

nael.fridhi

우리는 오프셋 10 커밋 요청을 보내는 소비자가 있다고 가정합니다. 통신 문제가 있고 브로커가 요청을받지 못했고 물론 응답하지 않은 경우. 그 후 다른 소비자가 다른 배치를 처리하고 오프셋 20을 성공적으로 커밋했습니다.

Q : 우리의 경우 오프셋 20을 커밋하기 전에 로그의 이전 오프셋이 커밋되었는지 여부를 확인할 수 있도록 처리 할 방법이나 속성이 있는지 알고 싶습니다.

마이크

설명하는 시나리오는 비동기 커밋을 사용할 때만 발생할 수 있습니다.

하나의 특정 TopicPartition은 동일한 ConsumerGroup 내의 단일 소비자 만 사용할 수 있습니다. 동일한 TopicPartition을 읽는 두 명의 소비자가있는 경우에만 가능합니다.

  1. 소비자 그룹이 다른 경우 또는
  2. 동일한 ConsumerGroup이 있고 재조정이 발생하는 경우. 그러나 여전히 한 소비자 만 해당 TopicPartition을 한 번에 읽을 수 있으며 둘 다 동시에 읽지 않습니다.

사례 # 1은 매우 명확합니다. 소비자 그룹이 서로 다른 경우 파티션을 병렬로 독립적으로 사용합니다. 또한 커밋 된 오프셋은 별도로 관리됩니다.

사례 # 2 : 소비자가 실패 / 사망하고 소비자를 복구하지 않아 첫 번째 소비자가 오프셋 10을 커밋하지 못하면 재조정이 발생하고 다른 활성 소비자가 해당 파티션을 선택합니다. 오프셋 10이 커밋되지 않았으므로 새 소비자는 다음 배치로 점프하기 전에 오프셋 10을 다시 읽기 시작하고 오프셋 20을 커밋 할 수 있습니다. 이로 인해 "최소 한 번"의미론이 발생하고 중복이 발생할 수 있습니다.

이제 더 높은 오프셋을 커밋 한 후 더 작은 오프셋을 커밋 할 수있는 유일한 시나리오입니다. 처음에 언급했듯이 오프셋 비동기 적으로 커밋하면 (을 사용하여 commitAsync) 실제로 발생할 수 있습니다 . 시간순으로 다음 시나리오를 상상해보십시오.

  • 소비자가 오프셋 0을 읽습니다 (백그라운드 스레드가 오프셋 0을 커밋하려고 함).
  • 오프셋 0 커밋 성공
  • 소비자가 오프셋 1을 읽음 (백그라운드 스레드가 오프셋 1을 커밋하려고 함)
  • 오프셋 1 커밋에 실패했습니다. 나중에 다시 시도하세요.
  • 소비자가 오프셋 2를 읽습니다 (백그라운드 스레드가 오프셋 2를 커밋하려고 함).
  • 오프셋 2 커밋 성공
  • 이제 무엇을해야 할까요 (오프셋 1 커밋을 다시 시도 합니까?)

재시도 메커니즘이 오프셋 1을 다시 커밋하도록두면 소비자가 오프셋 1까지만 커밋 한 것 같습니다. 이는 최신 오프셋 par TopicPartition의 각 소비자 그룹에 대한 정보가 내부 압축 Kafka 토픽 __consumer_offsets에 저장되기 때문입니다. 이는 소비자 그룹의 최신 값 (이 경우 : 오프셋 1) 만 저장하기위한 것입니다.

책 "Kafka-The Definitive Guide"에는이 문제를 완화하는 방법에 대한 힌트가 있습니다.

비동기 커밋 재시도 : 비동기 재 시도에 적합한 커밋 순서를 얻는 간단한 패턴은 단조롭게 증가하는 시퀀스 번호를 사용하는 것입니다. 커밋 할 때마다 시퀀스 번호를 늘리고 커밋 할 때 시퀀스 번호를 commitAsync 콜백에 추가합니다. 재 시도를 보낼 준비가되면 콜백이받은 커밋 시퀀스 번호가 인스턴스 변수와 같은지 확인하십시오. 그렇다면 새로운 커밋이 없었으며 재 시도해도 안전합니다. 인스턴스 시퀀스 번호가 더 높으면 새로운 커밋이 이미 전송되었으므로 다시 시도하지 마십시오.

예를 들어, 아래 Scala에서이 아이디어의 구현을 볼 수 있습니다.

import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]
  

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }


  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

오프셋을 로컬로 저장하기 위해 Spring Kafka에서 오프셋 커밋을 비활성화하는 방법은 무엇입니까?

분류에서Dev

Kafka 소비자가 오프셋 커밋 후 모든 메시지를 읽을 수 없음 (오류 = OFFSET_OUT_OF_RANGE

분류에서Dev

enable.auto.commit = false 인 Kafka 소비자가 여전히 오프셋을 커밋합니다.

분류에서Dev

kafka는 소비 된 오프셋과 커밋 된 오프셋을 구별합니까?

분류에서Dev

Kafka 스트림은 오프셋 의미를 커밋합니다.

분류에서Dev

Kafka Streams : 처리가 완료된 후 오프셋이 커밋되었는지 확인하는 방법

분류에서Dev

Kakfa 소비자 커밋 내 자신의 오프셋 ID가 작동하지 않음-commitSync (final Map <TopicPartition, OffsetAndMetadata> 오프셋)

분류에서Dev

--max-messages를 사용할 때 Kafka 콘솔 소비자가 잘못된 오프셋을 커밋합니다.

분류에서Dev

카프카의 소비자 예외 오프셋 커밋

분류에서Dev

Rdkafka는 메시지 처리 후 오프셋을 자동 커밋합니까?

분류에서Dev

Kafka Connect-오프셋 및 플러시를 커밋하지 못했습니다.

분류에서Dev

kafka + 시작 오프셋이 이전 마지막 오프셋보다 훨씬 더 높을 수있는 방법

분류에서Dev

Kafka Connect SinkTask를 확장하고 주어진 오프셋에서 소비 시작

분류에서Dev

git log를 사용하여 프로젝트에 더 많은 커밋을 수행 한 작성자와 커미터를 찾는 방법은 무엇입니까?

분류에서Dev

작성자를 변경하지 않고 Git 사인 오프 커밋

분류에서Dev

KStreams-org.apache.kafka.common.errors.TimeoutException : 오프셋을 성공적으로 커밋하기 전에 60000ms의 시간 초과가 만료되었습니다.

분류에서Dev

누가 Apache Kafka에서 소비자의 마지막 읽은 메시지 오프셋을 추적합니까?

분류에서Dev

Spring Integration Kafka 소비자에 대한 되감기 오프셋

분류에서Dev

어떻게 카프카 소비자 자동 작업을 커밋합니까?

분류에서Dev

오프셋 관리 기능이있는 Python Kafka 소비자

분류에서Dev

Kafka에서 두 오프셋 사이에서 kafka 소비자를 재생하는 방법은 무엇입니까?

분류에서Dev

더 큰 오프셋에 포함 된 오프셋을 제거하는 비단 법적인 방법이 있습니까?

분류에서Dev

팬더 필터 날짜 시간 : TypeError : 오프셋 순진한 날짜 시간과 오프셋 인식 날짜 시간을 비교할 수 없습니다.

분류에서Dev

Kafka 소비자 : 소비자와 poducer 사이의 오프셋 지연

분류에서Dev

Javascript로 문장에서 선택한 문자열 오프셋 시작 및 오프셋 끝을 얻는 방법

분류에서Dev

Kafka : 소비자 그룹이 읽은 오프셋 번호 메시지를 누가 관리합니까?

분류에서Dev

명확한 오프셋은 kafka에서 구조화 된 스트리밍을 촉발합니다.

분류에서Dev

Kafka : 소비자 API : kafka-consumer-api를 사용하여 오프셋에서 수동으로 읽고 확인하지 못함

분류에서Dev

GitLab CI : 마스터가 아닌 다른 분기에 대해 커밋 / 자동 파이프 라인을 비활성화하는 방법은 무엇입니까?

Related 관련 기사

  1. 1

    오프셋을 로컬로 저장하기 위해 Spring Kafka에서 오프셋 커밋을 비활성화하는 방법은 무엇입니까?

  2. 2

    Kafka 소비자가 오프셋 커밋 후 모든 메시지를 읽을 수 없음 (오류 = OFFSET_OUT_OF_RANGE

  3. 3

    enable.auto.commit = false 인 Kafka 소비자가 여전히 오프셋을 커밋합니다.

  4. 4

    kafka는 소비 된 오프셋과 커밋 된 오프셋을 구별합니까?

  5. 5

    Kafka 스트림은 오프셋 의미를 커밋합니다.

  6. 6

    Kafka Streams : 처리가 완료된 후 오프셋이 커밋되었는지 확인하는 방법

  7. 7

    Kakfa 소비자 커밋 내 자신의 오프셋 ID가 작동하지 않음-commitSync (final Map <TopicPartition, OffsetAndMetadata> 오프셋)

  8. 8

    --max-messages를 사용할 때 Kafka 콘솔 소비자가 잘못된 오프셋을 커밋합니다.

  9. 9

    카프카의 소비자 예외 오프셋 커밋

  10. 10

    Rdkafka는 메시지 처리 후 오프셋을 자동 커밋합니까?

  11. 11

    Kafka Connect-오프셋 및 플러시를 커밋하지 못했습니다.

  12. 12

    kafka + 시작 오프셋이 이전 마지막 오프셋보다 훨씬 더 높을 수있는 방법

  13. 13

    Kafka Connect SinkTask를 확장하고 주어진 오프셋에서 소비 시작

  14. 14

    git log를 사용하여 프로젝트에 더 많은 커밋을 수행 한 작성자와 커미터를 찾는 방법은 무엇입니까?

  15. 15

    작성자를 변경하지 않고 Git 사인 오프 커밋

  16. 16

    KStreams-org.apache.kafka.common.errors.TimeoutException : 오프셋을 성공적으로 커밋하기 전에 60000ms의 시간 초과가 만료되었습니다.

  17. 17

    누가 Apache Kafka에서 소비자의 마지막 읽은 메시지 오프셋을 추적합니까?

  18. 18

    Spring Integration Kafka 소비자에 대한 되감기 오프셋

  19. 19

    어떻게 카프카 소비자 자동 작업을 커밋합니까?

  20. 20

    오프셋 관리 기능이있는 Python Kafka 소비자

  21. 21

    Kafka에서 두 오프셋 사이에서 kafka 소비자를 재생하는 방법은 무엇입니까?

  22. 22

    더 큰 오프셋에 포함 된 오프셋을 제거하는 비단 법적인 방법이 있습니까?

  23. 23

    팬더 필터 날짜 시간 : TypeError : 오프셋 순진한 날짜 시간과 오프셋 인식 날짜 시간을 비교할 수 없습니다.

  24. 24

    Kafka 소비자 : 소비자와 poducer 사이의 오프셋 지연

  25. 25

    Javascript로 문장에서 선택한 문자열 오프셋 시작 및 오프셋 끝을 얻는 방법

  26. 26

    Kafka : 소비자 그룹이 읽은 오프셋 번호 메시지를 누가 관리합니까?

  27. 27

    명확한 오프셋은 kafka에서 구조화 된 스트리밍을 촉발합니다.

  28. 28

    Kafka : 소비자 API : kafka-consumer-api를 사용하여 오프셋에서 수동으로 읽고 확인하지 못함

  29. 29

    GitLab CI : 마스터가 아닌 다른 분기에 대해 커밋 / 자동 파이프 라인을 비활성화하는 방법은 무엇입니까?

뜨겁다태그

보관