Kafka : 소비자 API : kafka-consumer-api를 사용하여 오프셋에서 수동으로 읽고 확인하지 못함

패티

내 사용 사례는 kafka consumer-api를 사용하여 kafka-topic에서 마지막으로 성공적으로 처리 된 데이터의 오프셋에서 수동으로 읽은 다음 Kafka에 성공적으로 처리 된 데이터를 수동으로 확인할 수 있습니다. (이것은 데이터 손실을 줄이기위한 것입니다). 그러나 현재 구현에서는 'ack.acknowledge ()'를 주석 처리 하더라도 프로그램이 앞으로 이동하여 다음 오프셋에서 읽습니다 . 나는 Kafka를 처음 접했고 아래 방식으로 소비자를 구현했습니다 (우리는 스프링 부트를 사용하고 있습니다)

문제는 : ack.acknowledge ()를 주석 처리하더라도 오프셋은 여전히 ​​업데이트되고 소비자는 예상치 못한 다음 오프셋에서 읽습니다 (지금까지 내 이해에)

소비자 구성 [ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG를 false로 설정하고 factory.getContainerProperties (). setAckMode (ContainerProperties.AckMode.MANUAL_IMMEDIATE)를 설정했습니다.] :

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private AdapterStreamProperties appProperties;

    @Value("${spring.kafka.streams.properties.receive.buffer.bytes}")
    private String receiveBufferBytes;

    @Bean
    public ConsumerFactory<PreferredMediaMsgKey, SendEmailCmd> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appProperties.getApplicationId());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.getBootstrapServers());
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                "adapter");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

그런 다음 내 소비자는 이러한 방식으로 소비하고 있습니다 [ 'ack.acknowledge ()'주석을 달더라도 다음 번에 다음 오프셋에서 계속 읽습니다.

  @KafkaListener(topics = Constants.INPUT_TOPIC, groupId = "adapter")
  public void listen(ConsumerRecord<PreferredMediaMsgKey, SendEmailCmd> record,
                     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                     @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {

    System.out.println("----------------------------------------------------------------------------");
    System.out.println("Reading from Offset: " +  offset + ", and Partition: " + partition);
    System.out.println("Record for this pertition: Key : "+ record.key() + ", Value : " +   record.value());
    System.out.println("----------------------------------------------------------------------------");
    NotificationProcessedFinal result = processor.processEmail(record.key(),record.value());

    if( StringUtils.isNotEmpty(result.getErrorCmd().getErrorMsg())) {
      kafkaErrorProducerTemplate.send(adapterMsgProperties.getErrorTopic(), record.key(), result.getErrorCmd());
    }
    else {
      kafkaOutputProducerTemplate.send(adapterMsgProperties.getOutputTopic(), record.key(), result.getNotifyEmailCmd());
    }
    ack.acknowledge();
  }

내 gradle.build의 kafka api 버전 :

//Kafka Dependencie
implementation      'org.apache.kafka:kafka-streams:2.0.1'
implementation      'org.apache.kafka:kafka-clients:2.0.1'

모든 통찰력이 도움이 될 것입니다.

미리 감사드립니다

스냅
ack.acknowledge()

커밋하고 소비자가 메시지를 성공적으로 처리했음을 의미합니다.

이렇게하면 소비자 그룹 오프셋이 업데이트됩니다. 애플리케이션이 ack없이 중지되면 현재 오프셋이 kafka에 커밋되지 않습니다 (사용자가 처리 한 마지막 메시지 중 확인되지 않은 메시지).

동일한 소비자 그룹 (및 동일한 할당 된 파티션)을 가진 새 소비자는 앱이 소비자 그룹 오프셋에서 시작점을 읽으므로 동일한 메시지를 다시 사용합니다.

확인하거나 확인하지 않으면 현재 리스너에 영향을 미치지 않으면 간단하게 새 메시지를 계속 처리합니다 (오류가 발생하지 않고 리 밸런싱이 오프셋 재로드의 원인이 될 수도 있다고 생각합니다 (확실하지 않음)).

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

Kafka Connect SinkTask를 확장하고 주어진 오프셋에서 소비 시작

분류에서Dev

Kafka에서 두 오프셋 사이에서 kafka 소비자를 재생하는 방법은 무엇입니까?

분류에서Dev

폴링 전에 네이티브 Apache Kafka 소비자 API를 사용하여 메시지 필터링

분류에서Dev

zookeeper CLI를 사용하여 kafka 소비자 그룹에 대한 오프셋 정보를 가져올 수 없습니다.

분류에서Dev

kafka-node-인수를 비동기 함수 consumer.on ();

분류에서Dev

Kafka Consumer Group에서 오프셋을 임의의 값으로 재설정하는 방법은 무엇입니까?

분류에서Dev

flume을 사용하여 kafka 채널에 데이터를 읽고 쓰기위한 Kafka 소스 대 Avro 소스

분류에서Dev

Kafka 소비자가 오프셋 커밋 후 모든 메시지를 읽을 수 없음 (오류 = OFFSET_OUT_OF_RANGE

분류에서Dev

파티션을 수동으로 할당하지 않고 Exactly-Once Kafka Consumer를 구현하는 방법

분류에서Dev

더 작은 오프셋 커밋을 방지하기위한 Kafka 소비자 오프셋 커밋 확인

분류에서Dev

루프에서 메시지를 사용하는 동안 Kafka 소비자 누락 메시지

분류에서Dev

Spring Boot Kafka 소비자가 지연되고 잘못 읽음

분류에서Dev

kafka kafka-consumer-groups.sh --describe는 소비자 그룹에 대한 출력을 반환하지 않습니다.

분류에서Dev

kafka 0.10.1에 사용할 소비자 API는 무엇입니까?

분류에서Dev

Kafka 생산자의 외부 API에서 데이터를 읽고 Scala의 Kafka 소비자에게 보내는 방법

분류에서Dev

누가 Apache Kafka에서 소비자의 마지막 읽은 메시지 오프셋을 추적합니까?

분류에서Dev

Spring kafka-kafka 소비자를 구성하지 못했습니다.

분류에서Dev

--max-messages를 사용할 때 Kafka 콘솔 소비자가 잘못된 오프셋을 커밋합니다.

분류에서Dev

checkpointLocation 오프셋을 사용하여 Kafka 주제에서 스트림을 읽는 올바른 방법

분류에서Dev

Kafka 소비자 : 소비자와 poducer 사이의 오프셋 지연

분류에서Dev

Kafka Connect : 독립 실행 형 모드에서 "Connect.offsets"를 읽고 마지막으로 처리 된 오프셋을 가져옵니다.

분류에서Dev

fs2-kafka를 사용하여 Embedded-kafka에서 읽는 방법

분류에서Dev

모든 소비자가 Kafka에서 모든 데이터를 읽을 수 있도록하는 방법

분류에서Dev

kafka-python을 사용하여 동적으로 Kafka에 존재하지 않는 주제를 만드는 방법

분류에서Dev

Kafka : 소비자 그룹이 읽은 오프셋 번호 메시지를 누가 관리합니까?

분류에서Dev

kafka-console-consumer가 Docker에서 메시지를 수신하지 않습니다.

분류에서Dev

kafka-avro-console-consumer를 사용하여 avro 메시지를 읽을 수 없습니다. SerializationException : 알 수없는 매직 바이트

분류에서Dev

특정 오프셋으로 kafka 주제에서 메시지를 얻는 방법

분류에서Dev

AWS Lambda를 사용하여 Kafka (MSK) 이벤트 소스 읽기

Related 관련 기사

  1. 1

    Kafka Connect SinkTask를 확장하고 주어진 오프셋에서 소비 시작

  2. 2

    Kafka에서 두 오프셋 사이에서 kafka 소비자를 재생하는 방법은 무엇입니까?

  3. 3

    폴링 전에 네이티브 Apache Kafka 소비자 API를 사용하여 메시지 필터링

  4. 4

    zookeeper CLI를 사용하여 kafka 소비자 그룹에 대한 오프셋 정보를 가져올 수 없습니다.

  5. 5

    kafka-node-인수를 비동기 함수 consumer.on ();

  6. 6

    Kafka Consumer Group에서 오프셋을 임의의 값으로 재설정하는 방법은 무엇입니까?

  7. 7

    flume을 사용하여 kafka 채널에 데이터를 읽고 쓰기위한 Kafka 소스 대 Avro 소스

  8. 8

    Kafka 소비자가 오프셋 커밋 후 모든 메시지를 읽을 수 없음 (오류 = OFFSET_OUT_OF_RANGE

  9. 9

    파티션을 수동으로 할당하지 않고 Exactly-Once Kafka Consumer를 구현하는 방법

  10. 10

    더 작은 오프셋 커밋을 방지하기위한 Kafka 소비자 오프셋 커밋 확인

  11. 11

    루프에서 메시지를 사용하는 동안 Kafka 소비자 누락 메시지

  12. 12

    Spring Boot Kafka 소비자가 지연되고 잘못 읽음

  13. 13

    kafka kafka-consumer-groups.sh --describe는 소비자 그룹에 대한 출력을 반환하지 않습니다.

  14. 14

    kafka 0.10.1에 사용할 소비자 API는 무엇입니까?

  15. 15

    Kafka 생산자의 외부 API에서 데이터를 읽고 Scala의 Kafka 소비자에게 보내는 방법

  16. 16

    누가 Apache Kafka에서 소비자의 마지막 읽은 메시지 오프셋을 추적합니까?

  17. 17

    Spring kafka-kafka 소비자를 구성하지 못했습니다.

  18. 18

    --max-messages를 사용할 때 Kafka 콘솔 소비자가 잘못된 오프셋을 커밋합니다.

  19. 19

    checkpointLocation 오프셋을 사용하여 Kafka 주제에서 스트림을 읽는 올바른 방법

  20. 20

    Kafka 소비자 : 소비자와 poducer 사이의 오프셋 지연

  21. 21

    Kafka Connect : 독립 실행 형 모드에서 "Connect.offsets"를 읽고 마지막으로 처리 된 오프셋을 가져옵니다.

  22. 22

    fs2-kafka를 사용하여 Embedded-kafka에서 읽는 방법

  23. 23

    모든 소비자가 Kafka에서 모든 데이터를 읽을 수 있도록하는 방법

  24. 24

    kafka-python을 사용하여 동적으로 Kafka에 존재하지 않는 주제를 만드는 방법

  25. 25

    Kafka : 소비자 그룹이 읽은 오프셋 번호 메시지를 누가 관리합니까?

  26. 26

    kafka-console-consumer가 Docker에서 메시지를 수신하지 않습니다.

  27. 27

    kafka-avro-console-consumer를 사용하여 avro 메시지를 읽을 수 없습니다. SerializationException : 알 수없는 매직 바이트

  28. 28

    특정 오프셋으로 kafka 주제에서 메시지를 얻는 방법

  29. 29

    AWS Lambda를 사용하여 Kafka (MSK) 이벤트 소스 읽기

뜨겁다태그

보관