우리는 오프셋 10 커밋 요청을 보내는 소비자가 있다고 가정합니다. 통신 문제가 있고 브로커가 요청을받지 못했고 물론 응답하지 않은 경우. 그 후 다른 소비자가 다른 배치를 처리하고 오프셋 20을 성공적으로 커밋했습니다.
Q : 우리의 경우 오프셋 20을 커밋하기 전에 로그의 이전 오프셋이 커밋되었는지 여부를 확인할 수 있도록 처리 할 방법이나 속성이 있는지 알고 싶습니다.
설명하는 시나리오는 비동기 커밋을 사용할 때만 발생할 수 있습니다.
하나의 특정 TopicPartition은 동일한 ConsumerGroup 내의 단일 소비자 만 사용할 수 있습니다. 동일한 TopicPartition을 읽는 두 명의 소비자가있는 경우에만 가능합니다.
사례 # 1은 매우 명확합니다. 소비자 그룹이 서로 다른 경우 파티션을 병렬로 독립적으로 사용합니다. 또한 커밋 된 오프셋은 별도로 관리됩니다.
사례 # 2 : 소비자가 실패 / 사망하고 소비자를 복구하지 않아 첫 번째 소비자가 오프셋 10을 커밋하지 못하면 재조정이 발생하고 다른 활성 소비자가 해당 파티션을 선택합니다. 오프셋 10이 커밋되지 않았으므로 새 소비자는 다음 배치로 점프하기 전에 오프셋 10을 다시 읽기 시작하고 오프셋 20을 커밋 할 수 있습니다. 이로 인해 "최소 한 번"의미론이 발생하고 중복이 발생할 수 있습니다.
이제 더 높은 오프셋을 커밋 한 후 더 작은 오프셋을 커밋 할 수있는 유일한 시나리오입니다. 처음에 언급했듯이 오프셋 을 비동기 적으로 커밋하면 (을 사용하여 commitAsync
) 실제로 발생할 수 있습니다 . 시간순으로 다음 시나리오를 상상해보십시오.
재시도 메커니즘이 오프셋 1을 다시 커밋하도록두면 소비자가 오프셋 1까지만 커밋 한 것 같습니다. 이는 최신 오프셋 par TopicPartition의 각 소비자 그룹에 대한 정보가 내부 압축 Kafka 토픽 __consumer_offsets에 저장되기 때문입니다. 이는 소비자 그룹의 최신 값 (이 경우 : 오프셋 1) 만 저장하기위한 것입니다.
책 "Kafka-The Definitive Guide"에는이 문제를 완화하는 방법에 대한 힌트가 있습니다.
비동기 커밋 재시도 : 비동기 재 시도에 적합한 커밋 순서를 얻는 간단한 패턴은 단조롭게 증가하는 시퀀스 번호를 사용하는 것입니다. 커밋 할 때마다 시퀀스 번호를 늘리고 커밋 할 때 시퀀스 번호를 commitAsync 콜백에 추가합니다. 재 시도를 보낼 준비가되면 콜백이받은 커밋 시퀀스 번호가 인스턴스 변수와 같은지 확인하십시오. 그렇다면 새로운 커밋이 없었으며 재 시도해도 안전합니다. 인스턴스 시퀀스 번호가 더 높으면 새로운 커밋이 이미 전송되었으므로 다시 시도하지 마십시오.
예를 들어, 아래 Scala에서이 아이디어의 구현을 볼 수 있습니다.
import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._
object AsyncCommitWithCallback extends App {
// define topic
val topic = "myOutputTopic"
// set properties
val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// [set more properties...]
// create KafkaConsumer and subscribe
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic).asJavaCollection)
// initialize global counter
val atomicLong = new AtomicLong(0)
// consume message
try {
while(true) {
val records = consumer.poll(Duration.ofMillis(1)).asScala
if(records.nonEmpty) {
for (data <- records) {
// do something with the records
}
consumer.commitAsync(new KeepOrderAsyncCommit)
}
}
} catch {
case ex: KafkaException => ex.printStackTrace()
} finally {
consumer.commitSync()
consumer.close()
}
class KeepOrderAsyncCommit extends OffsetCommitCallback {
// keeping position of this callback instance
val position = atomicLong.incrementAndGet()
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
// retrying only if no other commit incremented the global counter
if(exception != null){
if(position == atomicLong.get) {
consumer.commitAsync(this)
}
}
}
}
}
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다