我正在使用cwiid库,该库是用C编写的,但在python中使用。该库使我可以使用Wiimote来控制机器人上的某些电机。该代码在没有监视器,键盘或鼠标的嵌入式设备上作为守护程序运行。
当我尝试初始化对象时:
import cwiid
while True:
try:
wm = cwiid.Wiimote()
except RuntimeError:
# RuntimeError exception thrown if no Wiimote is trying to connect
# Wait a second
time.sleep(1)
# Try again
continue
99%的时间,一切正常,但是有时,库进入某种奇怪的状态,对cwiid.Wiimote()
结果的调用导致库向stderr写入“套接字连接错误(控制通道)”,并且python引发异常。发生这种情况时,每次后续调用都会cwiid.Wiimote()
导致将相同的内容写入stderr,并引发相同的异常,直到我重新启动设备为止。
我想做的就是检测到此问题,并让python自动重启设备。
如果cwiid库处于怪异状态,则抛出的异常类型也为RuntimeError
,这与连接超时异常(这是很常见的)没有什么不同,因此我似乎无法以这种方式对其进行区分。我想做的是在运行后立即阅读stderr,cwiid.Wiimote()
以查看是否出现消息“套接字连接错误(控制通道)”,如果出现,则重新启动。
到目前为止,我可以使用一些os.dup()
和os.dup2()
方法重定向stderr以防止显示该消息,但这似乎并不能帮助我阅读stderr。
如果您正在使用子进程运行某些内容,则大多数在线示例都处理读取stderr的问题,在这种情况下不适用。
我该如何阅读stderr来检测正在写入的消息?
我认为我正在寻找的是这样的:
while True:
try:
r, w = os.pipe()
os.dup2(sys.stderr.fileno(), r)
wm = cwiid.Wiimote()
except RuntimeError:
# RuntimeError exception thrown if no Wiimote is trying to connect
if ('Socket connect error (control channel)' in os.read(r, 100)):
# Reboot
# Wait a second
time.sleep(1)
# Try again
continue
不过,这似乎并没有按照我认为的方式工作。
在subprocess
幕后,除了使用dups外,还使用匿名管道来重定向子流程输出。要使进程读取其自己的stderr,您需要手动执行此操作。它涉及获取匿名管道,将标准错误重定向到管道的输入,运行有问题的stderr写入操作,从管道另一端读取输出以及清理所有备份。一切都很轻松,但是我认为我在下面的代码中做到了。
您的cwiid.Wiimote
调用的以下包装器将返回一个元组,该元组由函数调用返回的结果(None
如果为RuntimeError
)和生成的stderr输出(如果有)组成。有关tests
在各种条件下应如何工作的示例,请参见该函数。我在适应您的示例循环时遇到了麻烦,但是不太了解cwiid.Wiimote
调用成功后会发生什么。在示例代码中,您只是立即重新循环。
编辑:糟糕!修复了在example_loop()
哪里Wiimote
调用而不是作为参数传递的错误。
import time
import os
import fcntl
def capture_runtime_stderr(action):
"""Handle runtime errors and capture stderr"""
(r,w) = os.pipe()
fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
saved_stderr = os.dup(2)
os.dup2(w, 2)
try:
result = action()
except RuntimeError:
result = None
finally:
os.close(w)
os.dup2(saved_stderr, 2)
with os.fdopen(r) as o:
output = o.read()
return (result, output)
## some tests
def return_value():
return 5
def return_value_with_stderr():
os.system("echo >&2 some output")
return 10
def runtime_error():
os.system("echo >&2 runtime error occurred")
raise RuntimeError()
def tests():
print(capture_runtime_stderr(return_value))
print(capture_runtime_stderr(return_value_with_stderr))
print(capture_runtime_stderr(runtime_error))
os.system("echo >&2 never fear, stderr is back to normal")
## possible code for your loop
def example_loop():
while True:
(wm, output) = capture_runtime_stderr(cmiid.Wiimote)
if wm == None:
if "Socket connect error" in output:
raise RuntimeError("library borked, time to reboot")
time.sleep(1)
continue
## do something with wm??
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句