내 사용 사례는 kafka consumer-api를 사용하여 kafka-topic에서 마지막으로 성공적으로 처리 된 데이터의 오프셋에서 수동으로 읽은 다음 Kafka에 성공적으로 처리 된 데이터를 수동으로 확인할 수 있습니다. (이것은 데이터 손실을 줄이기위한 것입니다). 그러나 현재 구현에서는 'ack.acknowledge ()'를 주석 처리 하더라도 프로그램이 앞으로 이동하여 다음 오프셋에서 읽습니다 . 나는 Kafka를 처음 접했고 아래 방식으로 소비자를 구현했습니다 (우리는 스프링 부트를 사용하고 있습니다)
문제는 : ack.acknowledge ()를 주석 처리하더라도 오프셋은 여전히 업데이트되고 소비자는 예상치 못한 다음 오프셋에서 읽습니다 (지금까지 내 이해에)
소비자 구성 [ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG를 false로 설정하고 factory.getContainerProperties (). setAckMode (ContainerProperties.AckMode.MANUAL_IMMEDIATE)를 설정했습니다.] :
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Autowired
private AdapterStreamProperties appProperties;
@Value("${spring.kafka.streams.properties.receive.buffer.bytes}")
private String receiveBufferBytes;
@Bean
public ConsumerFactory<PreferredMediaMsgKey, SendEmailCmd> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appProperties.getApplicationId());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.getBootstrapServers());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"adapter");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
그런 다음 내 소비자는 이러한 방식으로 소비하고 있습니다 [ 'ack.acknowledge ()'주석을 달더라도 다음 번에 다음 오프셋에서 계속 읽습니다.
@KafkaListener(topics = Constants.INPUT_TOPIC, groupId = "adapter")
public void listen(ConsumerRecord<PreferredMediaMsgKey, SendEmailCmd> record,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
@Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {
System.out.println("----------------------------------------------------------------------------");
System.out.println("Reading from Offset: " + offset + ", and Partition: " + partition);
System.out.println("Record for this pertition: Key : "+ record.key() + ", Value : " + record.value());
System.out.println("----------------------------------------------------------------------------");
NotificationProcessedFinal result = processor.processEmail(record.key(),record.value());
if( StringUtils.isNotEmpty(result.getErrorCmd().getErrorMsg())) {
kafkaErrorProducerTemplate.send(adapterMsgProperties.getErrorTopic(), record.key(), result.getErrorCmd());
}
else {
kafkaOutputProducerTemplate.send(adapterMsgProperties.getOutputTopic(), record.key(), result.getNotifyEmailCmd());
}
ack.acknowledge();
}
내 gradle.build의 kafka api 버전 :
//Kafka Dependencie
implementation 'org.apache.kafka:kafka-streams:2.0.1'
implementation 'org.apache.kafka:kafka-clients:2.0.1'
모든 통찰력이 도움이 될 것입니다.
미리 감사드립니다
ack.acknowledge()
커밋하고 소비자가 메시지를 성공적으로 처리했음을 의미합니다.
이렇게하면 소비자 그룹 오프셋이 업데이트됩니다. 애플리케이션이 ack없이 중지되면 현재 오프셋이 kafka에 커밋되지 않습니다 (사용자가 처리 한 마지막 메시지 중 확인되지 않은 메시지).
동일한 소비자 그룹 (및 동일한 할당 된 파티션)을 가진 새 소비자는 앱이 소비자 그룹 오프셋에서 시작점을 읽으므로 동일한 메시지를 다시 사용합니다.
확인하거나 확인하지 않으면 현재 리스너에 영향을 미치지 않으면 간단하게 새 메시지를 계속 처리합니다 (오류가 발생하지 않고 리 밸런싱이 오프셋 재로드의 원인이 될 수도 있다고 생각합니다 (확실하지 않음)).
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다