条件变量不跨两个线程等待

TT

难倒我遇到的这个线程同步。基本上,我正在写入输出缓冲区,并等待条件变量,直到读取缓冲区填充来自套接字的响应。这是一个非常简单的线程同步。

def write_wait_response(self, buffer, timeout=30):
        '''
            Write and wait for response
            Params:
                Buffer BYTE encoded data
                Timeout timeout to wait for response
            Returns: 
                response str if successful
        '''
        self.buffer = buffer

        if self.waitLock(timeout):
            # condition var was signaled, we can return a response
            readbuf = bytes(self.readbuffer)
            self.readbuffer = b''
            return readbuf
        else:
            print("AsyncClientSocket: No response recieved from {} in {} seconds, dumping buffer".format(
                    self.sa, timeout))
            self.buffer = ''
            raise TimeoutError("AsyncClientSocket Timed Out")

def handle_read(self):
        self.readbuffer, address = self.recvfrom(2048)
        print(self.readbuffer)
        print("notifying")
        self.cond.notifyAll() 

看起来够简单了吧?有 1 个线程在等待条件变量,还有 1 个线程(asyncore 异步回调循环)将填充 self.readbuffer 并通知条件变量。更奇怪的是:如果我执行 time.sleep() 而不是使用条件变量,我会在 write_wait_response() 的调用线程上得到一个完美填充的 self.readbuffer。显然,这不是我可以接受的解决方案。

这是我期望发生的事情:

  1. 调用 write_wait_response(buffer),这会写入缓冲区并等待 self.cond
  2. 异步回调循环调用 handle_write,将字节写入套接字。
  3. 服务器接收字节,写入响应。
  4. 异步回调循环看到套接字上的字节,读入 self.readbuffer,通知 cv
  5. ?????????write_wait_response 应该解除阻塞吗?

控制台输出:

waiting <- thread 1 waiting on CV
AsyncClientSocket: writing 5 bytes <- thread 2: handle_write
b'200,2' <- thread 2: that's the server response
notifying <- thread 2: that's handle_read attempting to notify the held CV

error: uncaptured python exception, closing channel <my_socket_stuff.AsyncClientSocket connected 127.0.0.1:50000 at 0x1051bf438> (<class 'RuntimeError'>:cannot notify on un-acquired lock

注意:在此日志的末尾,线程 1 仍在等待 self.cond。这是怎么回事?

全班:

class AsyncClientSocket(asyncore.dispatcher):
    def __init__(self, socketargs):
        asyncore.dispatcher.__init__(self)
        family, type, proto, canonname, sa = socketargs
        self.sa = sa
        self.create_socket(family, type)

        if type == socket.SOCK_STREAM:
            self.connect( sa )
        elif type == socket.SOCK_DGRAM:
            pass

        self.buffer = b''
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.readbuffer = b''

    def write_wait_response(self, buffer, timeout=30):
        '''
            Write and wait for response
            Params:
                Buffer BYTE encoded data
                Timeout timeout to wait for response
            Returns: 
                response str if successful
        '''
        self.buffer = buffer

        if self.waitLock(timeout):
            # condition var was signaled, we can return a response
            readbuf = bytes(self.readbuffer)
            self.readbuffer = b''
            return readbuf
        else:
            print("AsyncClientSocket: No response recieved from {} in {} seconds, dumping buffer".format(
                    self.sa, timeout))
            self.buffer = ''
            raise TimeoutError("AsyncClientSocket Timed Out")

    def waitLock(self, timeout):
        '''
            Wait for timeout seconds on CV
        '''
        try:
            self.cond.acquire()
            print("waiting")
            return self.cond.wait(timeout)
        finally:
            self.cond.release()


    def handle_connect(self):
        pass

    def handle_close(self):
        self.close()

    def handle_read(self):
        self.readbuffer, address = self.recvfrom(2048)
        print(self.readbuffer)
        print("notifying")
        self.cond.notifyAll() 

    def writable(self):
        return (len(self.buffer) > 0)

    def handle_write(self):
        print("AsyncClientSocket: writing {} bytes".format(len(self.buffer)))
        self.readbuffer = b''
        sent = self.sendto(self.buffer, self.sa)
        self.buffer = self.buffer[sent:]
TT

弄清楚了。这与异步无关。我只是错误地向条件变量发出信号。python3 threading api doc说notify()的调用线程必须获取底层锁,这是有道理的,不希望两个生产者通知同一个条件变量。希望一个阻塞在临界区,而另一个执行其任务。

def handle_read(self):
    try:
        self.cond.acquire()
        self.readbuffer, address = self.recvfrom(2048)
        print(self.readbuffer)
        self.cond.notify() 
    finally:
        self.cond.release()

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何获得两个pthread线程来响应彼此的等待和信号条件?

来自分类Dev

如何使两个线程互相等待结果?

来自分类Dev

如何使两个线程等待并互相通知

来自分类Dev

基于两个变量的条件匹配

来自分类Dev

检查两个变量signum与条件

来自分类Dev

在多线程环境中用原子保护两个变量

来自分类Dev

从C中的两个线程访问变量

来自分类Dev

从两个不同的线程访问相同的变量

来自分类Dev

通过两个不同的线程设置和访问变量

来自分类Dev

在两个线程之间共享时间变量

来自分类Dev

条件不工作的两个内部连接

来自分类Dev

两个变量在PHP上不匹配

来自分类Dev

在两个页面之间交换变量而不更改页面

来自分类Dev

使用matplotlib连接两个不连续的x变量

来自分类Dev

在两个页面之间交换变量而不更改页面

来自分类Dev

根据特定条件组合两个变量R

来自分类Dev

Python Pandas沿两个条件变量添加序列

来自分类Dev

根据特定条件组合两个变量R

来自分类Dev

如何根据两个变量的条件打印特定输出

来自分类Dev

满足两个条件的sql set变量

来自分类Dev

jQuery if / else语句,如果if条件包含或(||)两个变量

来自分类Dev

IF语句具有两个条件的相同变量

来自分类Dev

C#如何使线程等待两个手动重置事件之一?

来自分类Dev

C#如何使线程等待两个手动重置事件之一?

来自分类Dev

Java等待通过两个线程作为同一类进行通知

来自分类Dev

C ++从两个线程访问Int变量。线程安全吗?

来自分类Dev

如何触发两个承诺并有条件地等待一个?

来自分类Dev

跨两个部分的交集

来自分类Dev

异步等待是否是使程序中的两个独立任务并行运行的必要条件?

Related 相关文章

热门标签

归档