我被敦促将Kafka与python一起使用。此外,我需要开发一个非常简单的生产者-消费者应用程序,该应用程序可以实时从设备读取指标,然后将其发布到Kafka中的主题“指标”。然后,使用者必须订阅“度量”主题并将这些数据存储到PostgreSQL数据库中。
我试图在这里绘制架构:
+-----------+ Fetch metrics every 1 second +--------------+
|Biometric | {heartrate, oxygen level, temprature} | |
|generation ------------------------------------------------ producer.py |
|device | | |
+-----------+ +-------|------+
|
|
|
|Publish metrics in "metrics" topic, every 1 second
|{heartrate, oxygen level, tempature}
| JSON format
|
|
+-------|------+
| |
| KAFKA |
| |
+-------|------+
|
|
|
|
| Subscribe to "metrics" topic and fetch
- | the JSON every 1 second
|
+-------------+ +------|------+
| | Send data to postgreSQL | |
| postgreSQL ------------------------------------------------ consumer.py |
| | | |
+-------------+ +-------------+
现在,这就是我(零卡夫卡经验)对这个应用程序的想象。我设法将所有东西都交给了消费者。
现在,对于我来说,连接到postgreSQL数据库并将这些数据发送到它非常容易。但是我很困惑。我到处都读到,与此类数据库的连接必须通过Kafka Connector(?)进行。仅将我在消费者中收到的数据手动发送到postgres是错误的吗?为什么在这里使用“ Kafka连接器”?最后,我不知道有任何python kafka连接器,这使我变得更加复杂。
有人可以帮我清理一下吗?
如果您想以JSON格式将数据推送到kafka,我最近在这里写了一个简单的示例。
您还可以找到kafka python文档
对于Kafka-> PostgreSQL连接,您可能要使用Kafka Connect JDBC接收器。Kafka Connect是一系列预先建立的连接器,通过它您只需编写配置文件即可从Kafka推入或拉出(以kafka connect术语表示源或接收器)数据,而无需一遍又一遍地编码或重新发明轮子。Kafka connect不依赖于语言,因为您所需要做的就是将其部署在您的Kafka环境中并正确设置配置文件。
请注意,如果您打算使用Kafka connect将数据推送到PostgreSQL,则可能需要
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句