'Broken Pipe' error when reusing the same pipe inside a loop

ViFI

I am new to interprocess communication and I am trying to understand how to use os.pipe and os.fork together in Python.

In the code below, if I uncomment the three commented-out lines in sigchld_handler, I get a "Broken pipe" error; otherwise it works fine.

The idea is to install a SIGCHLD handler that runs when the child process exits, and to increment separate counters when the child-only function (run_child) and the parent-only function (sigchld_handler) execute. Since a forked process has its own copy of memory and its changes are not reflected in the parent process, the child sends a message to the parent through a pipe and the parent updates the counter.

import os
import signal
import time

class A(object):
    def __init__(self):
        self.parent  = 0
        self.child = 0
        self._child_pid = None

        self.rd , self.wr = os.pipe()
        print self.rd , self.wr
        signal.signal(signal.SIGCHLD, self.sigchld_handler)

    def sigchld_handler(self, a, b):
        self.parent += 1
        print "Main run count : (parent) ", self.parent
        #rf = os.fdopen(self.rd, 'r')
        #self.child = int(rf.read())
        #rf.close()
        self._child_pid = None

    def run_child(self):
        self.child += 1
        print "Main run count (child) : ", self.child
        print "Running in child : " , os.getpid()
        wr = os.fdopen(self.wr,'w')
        text = "%s" % (self.child)
        print "C==>", text
        wr.write(text)
        wr.close()
        os._exit(os.EX_OK)

    def run(self):
        if self._child_pid:
            print "Child Process", self._child_pid, " already running."
        else:
            self._child_pid = os.fork()

            if not self._child_pid:
                self.run_child()

a = A()
i = 0
while True:
    a.run()
    time.sleep(4)
    i += 1
    if i > 5:
        break

Interestingly, the error appears only after the first few iterations. Can somebody please explain why the error occurs and what I should do to fix it?

EDIT 1: There are a couple of similar examples: ex1, ex2, ex3. I used them to learn, but in my case I am extending the examples to run in a loop so that they act more like a producer/consumer queue. I understand this might not be a good approach, since the multiprocessing/Queue modules are available in Python, but I want to understand the mistake I am making here.

EDIT 2 (solution):

Based on @S.Kozlov's answer, I modified the code to create a new pipe for every communication. Here is the modified code.

import os
import signal
import time

class A(object):
    def __init__(self):
        self.parent  = 0
        self.child = 0
        self._child_pid = None
        signal.signal(signal.SIGCHLD, self.sigchld_handler)

    def sigchld_handler(self, a, b):
        self.parent += 1
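        os.close(self.wr)  # close the parent's copy of the write end so read() can reach EOF
        print "Main run count : (parent) ", self.parent
        rd = os.fdopen(self.rd, 'r')
        self.child = int(rd.read())
        rd.close()  # release the read end so descriptors do not leak across iterations
        self._child_pid = None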

    def run_child(self):
        self.child += 1
        print "Main run count (child) : ", self.child
        print "Running in child : " , os.getpid()
        os.close(self.rd)
        wr = os.fdopen(self.wr, 'w')
        text = "%s" % (self.child)
        print "C==>", text
        wr.write(text)
        wr.close()
        os._exit(os.EX_OK)

    def run(self):
        if self._child_pid:
            print "Child Process", self._child_pid, " already running."
        else:
            self.rd , self.wr = os.pipe()
            self._child_pid = os.fork()

            if not self._child_pid:
                self.run_child()

a = A()
i = 0
while True:
    a.run()
    time.sleep(4)
    i += 1
    if i > 5:
        break

With this, the output should look something like this:

Main run count (child) :  1
Running in child :  15752
C==> 1
Main run count : (parent)  1
Main run count (child) :  2
Running in child :  15753
C==> 2
Main run count : (parent)  2
Main run count (child) :  3
Running in child :  15754
C==> 3
Main run count : (parent)  3
Main run count (child) :  4
Running in child :  15755
C==> 4
Main run count : (parent)  4
Main run count (child) :  5
Running in child :  15756
C==> 5
Main run count : (parent)  5
Main run count (child) :  6
Running in child :  15757
C==> 6
Main run count : (parent)  6

S.Kozlov

The problem with your code is that you are trying to reuse one pipe several times, which is not a valid use of a pipe in general. The exception is simply telling you: "Hey, you closed this pipe on the previous run. Once a pipe is closed, it's closed."
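To illustrate the error in isolation, here is a minimal sketch. It is written in Python 3 syntax (the code in the question is Python 2), and the function name write_without_reader is made up for the example. It only shows the general rule that writing to a pipe with no open read end fails with EPIPE; it is not a trace of the exact sequence in the question's code.

```python
import errno
import os


def write_without_reader():
    """Write to a pipe whose read end is already closed; return the errno."""
    rd, wr = os.pipe()
    os.close(rd)  # after this, nobody holds the read end of the pipe
    try:
        # CPython ignores SIGPIPE at startup, so the failed write
        # surfaces as an exception instead of killing the process.
        os.write(wr, b"1")
    except BrokenPipeError as exc:  # OSError subclass raised for EPIPE
        return exc.errno
    finally:
        os.close(wr)
    return None  # only reached if the write unexpectedly succeeded
```

Calling write_without_reader() and comparing the result with errno.EPIPE shows that the write was rejected because the read end was gone.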

So you can change your code to create a pipe for each child, keep one end (the read end) in the parent, and give the other end to the child. Then it should work.

Edit 1: I've updated your code to use one pipe per child. It's not how good code is supposed to look, but I hope it helps for educational purposes.

import os
import signal
import time


class A(object):
    def __init__(self):
        self.parent = 0
        self.child = 0
        self._child_pid = None
        signal.signal(signal.SIGCHLD, self.sigchld_handler)

    def sigchld_handler(self, a, b):
        self.parent += 1
        print "Main run count : (parent) ", self.parent
        os.close(self.wr)  # close the parent's copy of the write end so read() below can reach EOF
        rf = os.fdopen(self.rd, 'r')
        message = rf.read()
        rf.close()
        print "Code from child [", self._child_pid, "]: ", message
        self.rd = None
        self._child_pid = None

    def run_child(self):
        self.child += 1
        print "Main run count (child) : ", self.child
        print "Running in child : " , os.getpid()
        os.close(self.rd)
        wr = os.fdopen(self.wr, 'w')
        text = "Hello from %s" % (self.child)
        print "C==>", text
        wr.write(text)
        wr.close()
        os._exit(os.EX_OK)

    def run(self):
        if self._child_pid:
            print "Child Process", self._child_pid, " already running."
        else:
            rd, wr = os.pipe()
            self.rd = rd
            self.wr = wr
            self._child_pid = os.fork()

            if not self._child_pid:
                self.run_child()

a = A()
i = 0
while True:
    a.run()
    time.sleep(4)
    i += 1
    if i > 5:
        break
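The code above is meant to be educational rather than a model of good practice. As one possible tidier arrangement of the same "one pipe per child" idea, here is a sketch in Python 3 syntax that waits for the child with os.waitpid instead of a SIGCHLD handler. The function names run_child_once and run_loop are made up for illustration, and swapping the signal handler for a blocking wait is an assumption about what is acceptable here, not part of the original design.

```python
import os


def run_child_once(count):
    """Fork one child and let it report count + 1 through a fresh pipe."""
    rd, wr = os.pipe()  # a new pipe for every child
    pid = os.fork()
    if pid == 0:
        # Child: it only writes, so it drops the read end first.
        status = 1
        try:
            os.close(rd)
            with os.fdopen(wr, "w") as wf:
                wf.write(str(count + 1))
            status = 0
        finally:
            os._exit(status)  # never fall through into the parent's code
    # Parent: close its copy of the write end, otherwise read()
    # would never see EOF and would block forever.
    os.close(wr)
    with os.fdopen(rd) as rf:
        message = rf.read()
    os.waitpid(pid, 0)  # reap the child so it does not linger as a zombie
    return int(message)


def run_loop(iterations):
    """Run several children in sequence, carrying the counter along."""
    count = 0
    for _ in range(iterations):
        count = run_child_once(count)
    return count
```

Each iteration creates, uses, and fully closes its own pair of descriptors, so nothing from a previous run is reused. The counter lives only in the parent and is updated from what each child sends back.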
