My goal is to set up a connector between a RabbitMQ exchange/queue and a Kafka topic.
I followed this guide to set up the connector: https://camel.apache.org/camel-kafka-connector/latest/try-it-out-locally.html. I downloaded the connector source from https://github.com/apache/camel-kafka-connector, built it, and unpacked the camel-rabbitmq-kafka-connector archive. I also pointed plugin.path in connect-standalone.properties to the folder where I unpacked the camel-rabbitmq-kafka-connector jars.
My properties for the CamelRabbitmqSourceConnector are as follows:
name=CamelRabbitmqSourceConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector
tasks.max=1
# use the kafka converters that better suit your needs, these are just defaults:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# comma separated topics to send messages into
topics=mytopic
# mandatory properties (for a complete properties list see the connector documentation):
# The exchange name determines the exchange to which the produced messages will be sent to. In the case of consumers, the exchange name determines the exchange the queue will be bound to.
camel.source.path.exchangeName=myexchange
camel.source.endpoint.hostname=myhostname
camel.source.endpoint.addresses=localhost:5672
camel.source.endpoint.queue=myqueue
My docker run command for RabbitMQ looks like this: docker run --rm -it --hostname myhostname -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:3-management. For Kafka, I used the standard "getting started" guide.
I send messages using the Python Pika library:
import pika

# connect to the local RabbitMQ broker
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# declare the queue the connector should consume from
channel.queue_declare(queue='myqueue', durable=True, auto_delete=True)
# publish via the default ('') exchange, routed directly to the queue
channel.basic_publish(exchange='', routing_key='myqueue', body='some body...')
connection.close()
As you can see, I do not specify the exchange parameter in the channel.basic_publish call when sending messages. If I set it to the exchange name from camel.source.path.exchangeName, my messages seem to get lost somewhere in between, so I am probably missing something here.
I was able to solve the problem by switching the client from Python to Java, following https://www.rabbitmq.com/tutorials/tutorial-one-java.html.
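For comparison, the same publishing path can also be sketched in Python with Pika: declare the exchange named in camel.source.path.exchangeName, bind the queue to it, and publish to that exchange rather than via the default '' exchange. This is only a sketch of that idea; the 'direct' exchange type and the routing key reused as the queue name are assumptions, not details from the original setup.

```python
def publish_via_exchange(channel, exchange, queue, routing_key, body):
    """Declare exchange and queue, bind them, and publish to the exchange."""
    # declare the exchange the connector's queue will be bound to
    # (the 'direct' type here is an assumption)
    channel.exchange_declare(exchange=exchange, exchange_type='direct',
                             durable=True)
    # declare the queue and bind it to the exchange under the routing key
    channel.queue_declare(queue=queue, durable=True, auto_delete=True)
    channel.queue_bind(queue=queue, exchange=exchange,
                       routing_key=routing_key)
    # publish to the named exchange instead of the default '' exchange,
    # so the binding (and the connector consuming from the queue) sees it
    channel.basic_publish(exchange=exchange, routing_key=routing_key,
                          body=body)

if __name__ == '__main__':
    import pika  # requires a RabbitMQ broker running on localhost
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    publish_via_exchange(connection.channel(), 'myexchange', 'myqueue',
                         'myqueue', 'some body...')
    connection.close()
```

The function only talks to the channel object, so it works with any pika channel; the __main__ guard shows the intended usage against a local broker.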