I am using Kafka 0.8.0 and trying to implement the scenario below:
JCA API (acts as producer and sends data) ---> Consumer ---> HBase
As soon as I fetch data with the JCA client, I send each message to the consumer. For example, as soon as the producer sends message no. 1, I want to fetch that same message from the consumer and "put" it into HBase. But my consumer only starts fetching messages after some random number of messages n has already been sent. I want to bring the producer and consumer in sync so that both of them start working together.
I am using:
1 broker
1 topic
1 producer and a high-level consumer
Can anyone suggest what I need to do to achieve this?
EDIT:
Adding some relevant code snippets.
Consumer.java
public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    PrintWriter pw = null;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
        topicCountMap.put(topic, new Integer(1));
        consumerMap = consumer.createMessageStreams(topicCountMap, kd,
                new Serializer(new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.size", "1024");
        return new ConsumerConfig(props);
    }

    public synchronized void run() {
        while (it.hasNext()) {
            t = (it.next().message()).getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}
Producer.java
public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic) {
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Use random partitioner. Don't need the key type. Just set it to Integer.
        // The message is of type user-defined Object.
        producer = new kafka.javaapi.producer.Producer<String, Signal>(new ProducerConfig(props));
        this.topic = topic;
    }
}
KafkaProperties.java
public interface KafkaProperties {
    final static String zkConnect = "127.0.0.1:2181";
    final static String groupId = "group1";
    final static String topic = "test00";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}
This is how the consumer behaves for the first ~10 messages: it never logs them as received. From then on it starts working correctly:
producer sending msg1
producer sending msg2
producer sending msg3
producer sending msg4
producer sending msg5
producer sending msg6
producer sending msg7
producer sending msg8
producer sending msg9
producer sending msg10
producer sending msg11
producer sending msg12
In Consumer received msg12
producer sending msg13
In Consumer received msg13
producer sending msg14
In Consumer received msg14
producer sending msg15
In Consumer received msg15
producer sending msg16
In Consumer received msg16
producer sending msg17
In Consumer received msg17
producer sending msg18
In Consumer received msg18
producer sending msg19
In Consumer received msg19
producer sending msg20
In Consumer received msg20
producer sending msg21
In Consumer received msg21
EDIT: adding the listener function where the producer sends messages to the consumer. I am using the default producer config and have not overridden it.
public synchronized void onValueChanged(final MonitorEvent event_) {
    // Get the value from the DBR
    try {
        final DBR dbr = event_.getDBR();
        final String[] val = (String[]) dbr.getValue();
        producer1.producer.send(new KeyedMessage<String, Signal>(
                KafkaProperties.topic, new Signal(messageNo)));
        System.out.println("producer sending msg" + messageNo);
        messageNo++;
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
Try adding props.put("request.required.acks", "1")
to the producer config. By default the producer does not wait for acknowledgements, and message delivery is not guaranteed. So if you start the broker just before your test, the producer may begin sending messages before the broker is fully initialized, and the first few messages can be lost.
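A minimal sketch of the producer Properties with this setting added. The broker address and serializer classes mirror the question's Producer; only request.required.acks is new, and this sketch builds just the plain java.util.Properties object (the surrounding Kafka 0.8 classes are unchanged):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Wait for the leader broker to acknowledge each message before
        // send() returns. Without this (the 0.8 default is "0"), the
        // producer fires and forgets, so messages sent before the broker
        // finishes initializing are silently dropped.
        props.put("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("request.required.acks"));
    }
}
```

The resulting Properties object is passed to ProducerConfig exactly as in the question's constructor.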
Try adding props.put("auto.offset.reset", "smallest")
to the consumer config. It is equivalent to the --from-beginning
option of kafka-console-consumer.sh. If your consumer starts later than the producer and there is no offset data saved in ZooKeeper, by default it will consume only new messages (see the consumer configs in the documentation).
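A minimal sketch of the consumer Properties with this setting added, reusing the ZooKeeper address and group id from the question's createConsumerConfig(); only auto.offset.reset is new, and again only the plain Properties object is built here:

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("group.id", "group1");
        // When this group has no committed offset in ZooKeeper yet,
        // start from the earliest available message. The 0.8 default
        // is "largest", i.e. only messages produced after the consumer
        // connects are delivered.
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("auto.offset.reset"));
    }
}
```

Note this only affects the first run of a consumer group; once offsets are committed, they take precedence over auto.offset.reset.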