我有一个简单的JSON对象,如下所示
d = { 'tag ': 'blah',
'name' : 'sam',
'score':
{'row1': 100,
'row2': 200
}
}
以下是我的python代码,它向Kafka发送消息
from kafka import SimpleProducer, KafkaClient
import json
# To send messages synchronously
kafka = KafkaClient('10.20.30.12:9092')
producer = SimpleProducer(kafka)
jd = json.dumps(d)
producer.send_messages(b'message1',jd)
我在风暴日志中看到已接收到该消息,但对于元组{此处的json结构},它抛出的Transformation为null不确定是否需要解决此问题?
以下是我给kafka的生产者的代码。我做的唯一不同的事情是yaml.safe_load
用来加载json内容。它以字符串而不是unicode的形式返回内容。以下是代码段
with open('smaller_test_prod.txt') as f:
for line in f:
d = yaml.safe_load(line)
jd = json.dumps(d)
producer.send_messages(b'zeus_metrics',jd)
在这里,每一行都是存储在文件中的json数据。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句