在pika使用者中运行多处理/多线程并将数据定向到特定数据框

帕特里克·帕特里克

我是Python多线程/处理和RabbitMQ的新手。基本上,我有一个RabbitMQ消费者,可以为我提供实时的医院数据。每个消息包含每个患者的生命体征。为了运行我的逻辑并用于触发警报,我需要为每个患者至少存储5条此类消息。另外,由于患者数量未知,我想使用多线程或多处理程序来使我的警报几乎实时并按比例放大。我的方法是为每个患者创建一个全局数据框,然后将与该患者有关的消息附加到该数据框中。但是现在,我在创建多线程/进程并将数据发送到各个患者数据框时遇到了问题。这是我的代码


bed_list=[]
thread_list=[]
bed_df={}
alarms=0

def spo2(body,bed):
    body_data= body.decode()
    print(body_data)
    packet= json.loads(body_data)
    bed_id= packet['beds'][0]['bedId']
    if bed_id=bed:
        primary_attributes= json_normalize(packet)
         '''some logic'''
        global bed_df
        bed_df[bed_id]= bed_df[bed_id].append(packet) # creating the global dataframe to store five messages
        print(bed_df[bed_id])

        ''' some other calcuation'''

            phy_channel.basic_publish(body=json.dumps(truejson),exchange='nicu')# throwing out the alarm with another queue
            bed_df[bed_id]= bed_df[bed_id].tail(4)  # resets the size of the dataframe 


def callback(ch, method, properties, body):
    body_data= body.decode()
    packet= json.loads(body_data)
    bed_id= packet['beds'][0]['bedId']
    print(bed_id)
    global bed_list
    if bed_id not in bed_list:
      bed_list.append(bed_id)


#pseudo code
 for bed in bed_list:
     proc = Process(target=spo2, args=(bed,))
     procs.append(proc)
     proc.start()

我无法找到一种方法,可以在该方法中为每个患者(bed_id)创建线程/进程,以便每当我收到该患者(bed_id)的消息时,都可以将其定向到该线程。我已经检查了Queues,但是文档对于实现这种情况不是很清楚。

罗兰·史密斯

在沿着这条路线走之前,您应该评估这是否是必要的。一个重要的限制是Rabbitmq带宽

构建一个单线程应用程序,并开始向其提供合成Rabbitmq消息。增加msg / s速率,直到无法维持为止。

如果该比率比实际发生的比率高得多,那么您就完成了。:-)

如果不是,则开始应用程序进行性能分析,以查找花费最多时间的部分。这些是您的瓶颈。仅当您知道瓶颈所在时,您才能查看相关代码并考虑如何改善它们。

请注意,multiprocessingthreading做不同的事情并且有不同的应用程序。如果您的应用程序受到计算量的限制,那么multiprocessing可以通过将计算量分布在多个CPU内核上来提供帮助。请注意,这仅在计算彼此独立时才有效如果您的应用程序花费大量时间等待I / O,则threading可以帮助您在一个线程中进行计算而另一个线程在等待I / O。

但是就复杂性而言,它们都不是免费的。例如,threading您必须使用锁来保护数据帧的读取和写入,以使一次只能有一个线程读取或修改该数据帧。随着multiprocessing你必须发送从工作进程将数据备份到父进程。

在这种情况下,我认为这multiprocessing将是最有用的。您可以设置许多过程,每个过程负责部分床位/患者。如果rabbitmq可以有多个侦听器,则可以让每个工作进程仅处理来自其负责的患者的消息。否则,您必须将消息分发到适当的过程。现在,每个工作进程都为许多患者处理消息(并保留数据框)。当基于对数据的计算触发警报时,工作人员只需向父进程发送一条详细说明患者标识符和警报性质的消息。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

多线程队列使用者和任务处理

来自分类Dev

多线程队列使用者和任务处理

来自分类Dev

ConcurrentQueue .Net:多线程使用者

来自分类Dev

使用固定数量的线程处理大量多线程数据并允许例外

来自分类Dev

使用多线程将数据同时插入到列表中

来自分类Dev

BASH中的多处理/多线程

来自分类Dev

C 中的多线程与多处理

来自分类Dev

处理HDFS数据的Kafka使用者应在哪里运行?

来自分类Dev

如何使用多线程/多处理并行高效地运行python脚本?

来自分类Dev

使用多线程模块将API数据检索到数据帧中

来自分类Dev

使用多线程Java进行数据处理

来自分类Dev

如何“选择特定数据,仅复制某些列并将其插入到不同的表中?”

来自分类Dev

使用特定数据运行nunit测试

来自分类Dev

python中的多处理/多线程下载文件

来自分类Dev

python中的多处理/多线程下载文件

来自分类Dev

使用多处理或多线程来提高Python的抓取速度

来自分类Dev

使用此脚本实现多线程(或多处理?)吗?

来自分类Dev

多处理写入熊猫数据框

来自分类Dev

以参数作为数据框的多处理

来自分类Dev

Linq to Sql:如何使用存储过程获取特定行并将列数据显示到文本框中

来自分类Dev

如何从Excel工作表中获取特定数据到文本框中

来自分类Dev

统计数据框中的特定数据并显示

来自分类Dev

从熊猫数据框中获取特定数据

来自分类Dev

在管道中运行使用者

来自分类Dev

线程特定数据与线程本地存储

来自分类Dev

特定数据框的函数

来自分类Dev

使用者如何绑定到指定线程

来自分类Dev

生产者使用者多线程为什么需要Thread.sleep?

来自分类Dev

Python多处理与多线程结合

Related 相关文章

热门标签

归档