我有一台UDP服务器,侦听线程中的传入流量。消息来自JSON格式的外部设备,例如{“ _id”:“ 0x00”,“ status”:“ on”}。此信息需要由UDP处理程序进行解析,并存储在对象的字典中(如果存在_id,则更新)。目前,我可以接收和解析JSON,但不能完全确定如何存储此数据或正确处理它。我想在UDP处理程序中使用一个队列,并在一个单独的Msg处理程序中使用该队列来处理该队列,但不确定这样做是否正确。
请注意:我重写了代码,省略了一些语法等。
# I create a node server thread in the main of my program with IP/PORT
class NodeServer(threading.Thread):
def __init__():
self.server = UDPServer((address, port), UDPHandler)
def run():
self.server.serve_forever()
class UDPServer(socketserver.ThreadingUDPServer):
allow_reuse_address = True
class UDPHandler(socketserver.BaseRequestHandler):
data_q = queue.Queue()
handler = NodeHandler(data_q)
handler.start()
def handle():
try:
# receive the message from client
data = self.request[0].decode("UTF-8")
# check if it's in json format
# HANDLE THE MESSAGE
self.data_q.put(data)
# stop the thread
self.handler.join()
# send an "ACK" msg back to the client
except: Exception as e:
#handle
class NodeHandler(threading.Thread):
table = NodeTable()
def __init(data_q):
self.data_q = data_q
def handle():
# get the string message from the queue (filled by the UDPHandler)
msg = self.data_q.get()
# check if the "_id" field exists in the current node table
# if it exists in the table, find it, update the fields from json
# otherwise, if it's a new "_id": create a new node
json_msg = json.parse(msg)
node = Node(json_msg["_id"])
# set other node parameters from the json object, status etc
# update the node table with the new information from the message
table.put(nd)
#
def join():
# join thread
class NodeTable():
# the table is a dictionary with ID and a Node object, ie {0x01: Node}
table = {}
def put(_id, node):
self.table[_id] = node
def get(_id):
return self.table[_id]
class Node():
id = 0
def __init__(id):
self.id = id
# new node
# other node functions
谢谢!
1)您可以实现UDP服务器,该服务器使用以下代码行更简单地处理无限循环中的传入消息:
import socket
def udp_server(udp_ip, udp_port, ...):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((upd_ip, upd_port))
while True:
data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
...process data...
有关更多详细信息和讨论,请参见https://wiki.python.org/moin/UdpCommunication。
参数udp_server
是udp IP和端口,以及服务器需要与之交互的任何其他数据结构。
将其启动到自己的线程中很容易完成:
import threading
t = threading.Thread(target = udp_server, args = (...))
t.start()
2)NodeTable类只是python字典的包装,但是似乎您想让多个线程同时访问它。在这种情况下,您应该阅读以下SO答案:(link)。
根据服务器线程以外的其他线程可以对节点字典执行的操作,您可能需要锁定,也可能不需要锁定。
总而言之,我在哪里编写代码:
def main():
nodes = {} # use a simple dict for storing the nodes
lock = RLock() # if you need this
# pass nodes and lock to server thread and start it
t = threading.Thread(target = udp_server, args = (udp_ip, udp_port, nodes, lock))
t.start()
...
此时,udp服务器正在运行,主线程可以通过变量访问节点表nodes
。
将新节点添加到节点表后,是否需要通知主线程?然后,也许队列就是您想要的。您将1)在其中创建它,main()
然后2)将其传递给udp_server
:
def main()
nodes = {} # use a simple dict for storing the nodes
lock = RLock() # if you need this
q = Queue() # create a Queue and pass it to the udp server
# pass nodes and lock to server thread and start it
t = threading.Thread(target = udp_server, args = (udp_ip, udp_port, nodes, lock, q))
t.start()
# process entries from the Queue
while True:
item = q.get()
... process item...
并在udp服务器函数...process data...
中将某些内容放入队列:
while True:
data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
...json decode, etc. ...
q.put(...)
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句