我对流编程还很陌生。我们有使用 Avro 的 Kafka 流。
我想将 Kafka Stream 连接到 Spark Stream。我使用了波纹管代码。
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
我有波纹管错误。
return s.decode('utf-8') File "/usr/lib64/python2.7/encodings/utf_8.py", line 16, in decode return codecs.utf_8_decode(input, errors, True) UnicodeDecodeError: 'utf8'编解码器无法解码位置 57-58 中的字节:无效的连续字节
我是否需要指定 Kafka 使用 Avro,是否存在上述错误?如果是我怎么指定呢?
是的,问题在于流的反序列化。您可以使用confluent-kafka-python库并在以下位置指定valueDecoder:
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient`
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, valueDecoder=MessageSerializer.decode_message)`
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句