
Read Subprocess Stdout And Stderr Concurrently

I'm trying to run a lengthy command within Python that outputs to both stdout and stderr. I'd like to poll the subprocess and write the output to separate files. I tried the following…

Solution 1:

I did that once; here is some old code I wrote:



import threading
import time
import Queue  # this module is named "queue" in Python 3


class Process_Communicator():
    '''Collects and aggregates the output from a subprocess so that it can
    be read incrementally, without blocking on subprocess.communicate().'''

    def __init__(self, subp):
        self.p = subp
        self.unblocked_out = ""
        self.unblocked_err = ""
        self.running = True

        # One daemon thread per pipe, plus one that aggregates the queues.
        self.qo = Queue.Queue()
        self.to = threading.Thread(name="out_read",
                                   target=self.enqueue_output)
        self.to.daemon = True  # thread dies with the program
        self.to.start()

        self.qe = Queue.Queue()
        self.te = threading.Thread(name="err_read",
                                   target=self.enqueue_err)
        self.te.daemon = True
        self.te.start()

        self.stdin_queue = Queue.Queue()
        self.ti = threading.Thread(name="in_write",
                                   target=self.enqueue_in)
        self.ti.daemon = True
        self.ti.start()

        self.aggregator = threading.Thread(name="aggregate",
                                           target=self.aggregate)
        self.aggregator.daemon = True
        self.aggregator.start()

    def join(self):
        # The reader threads exit once their pipes hit EOF, so joining them
        # guarantees that every last line has been enqueued and aggregated.
        self.te.join()
        self.to.join()
        self.running = False
        self.aggregator.join()
        self.ti.join()

    def enqueue_in(self):
        # Feed queued input lines to the subprocess' stdin.
        while self.running and self.p.stdin is not None:
            while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s) + '\n')
                self.p.stdin.flush()
            time.sleep(0.05)  # avoid spinning while the queue is empty

    def enqueue_output(self):
        if not self.p.stdout or self.p.stdout.closed:
            return
        out = self.p.stdout
        for line in iter(out.readline, b''):
            self.qo.put(line)

    def enqueue_err(self):
        if not self.p.stderr or self.p.stderr.closed:
            return
        err = self.p.stderr
        for line in iter(err.readline, b''):
            self.qe.put(line)

    def aggregate(self):
        while self.running:
            self.update()
        self.update()  # one final sweep after the readers have finished

    def update(self):
        # Drain both queues into the string buffers; get_nowait() raises
        # Queue.Empty as soon as a queue is exhausted.
        try:
            while True:
                self.unblocked_err += self.qe.get_nowait()
        except Queue.Empty:
            pass

        try:
            while True:
                self.unblocked_out += self.qo.get_nowait()
        except Queue.Empty:
            pass

    def get_stdout(self, clear=True):
        ret = self.unblocked_out
        if clear:
            self.unblocked_out = ""
        return ret

    def has_stdout(self):
        return self.get_stdout(False) or None

    def get_stderr(self, clear=True):
        ret = self.unblocked_err
        if clear:
            self.unblocked_err = ""
        return ret

    def has_stderr(self):
        return self.get_stderr(False) or None

You may not need the whole example, but feel free to copy and paste whatever parts you need. It also shows how I handled the threading.
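
If it helps, here is a minimal usage sketch for the class above; the command name is a placeholder, and the loop does what the question asks for, namely polling the subprocess and writing stdout and stderr to separate files:

import subprocess
import time

# Placeholder command; substitute the long-running command you actually run.
p = subprocess.Popen(['some_long_command'],
                     stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE)
pc = Process_Communicator(p)

with open('out.log', 'w') as out_log, open('err.log', 'w') as err_log:
    while p.poll() is None:                 # poll until the subprocess exits
        if pc.has_stdout():
            out_log.write(pc.get_stdout())  # get_stdout() clears the buffer
        if pc.has_stderr():
            err_log.write(pc.get_stderr())
        time.sleep(0.1)
    pc.join()                               # let the worker threads drain any pending output
    out_log.write(pc.get_stdout())
    err_log.write(pc.get_stderr())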


Solution 2:

The code looks more complicated than the task requires. I don't see why you need to call process.poll() or queue.get_nowait() here. To deliver a subprocess' stdout/stderr to several sinks, you could start with a teed_call() that accepts arbitrary file-like objects: you could pass logfiles as well as special file-like objects that accumulate errors in their .write() methods.
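
For illustration, here is a rough sketch of what such a teed_call()-style helper could look like. The signature and the stdout_sinks/stderr_sinks parameters are made up for this example rather than taken from an existing implementation:

from subprocess import Popen, PIPE
from threading import Thread

def _tee(pipe, sinks):
    # Copy every line from `pipe` into each file-like object in `sinks`.
    for line in iter(pipe.readline, b''):
        for sink in sinks:
            sink.write(line)
    pipe.close()

def teed_call(cmd_args, stdout_sinks=(), stderr_sinks=(), **kwargs):
    p = Popen(cmd_args,
              stdout=PIPE if stdout_sinks else None,
              stderr=PIPE if stderr_sinks else None,
              **kwargs)
    threads = []
    if stdout_sinks:
        threads.append(Thread(target=_tee, args=(p.stdout, stdout_sinks)))
    if stderr_sinks:
        threads.append(Thread(target=_tee, args=(p.stderr, stderr_sinks)))
    for t in threads:
        t.daemon = True
        t.start()
    for t in threads:
        t.join()   # all output has been written once both readers finish
    return p.wait()

# Example: send stdout and stderr to separate logfiles (opened in binary mode
# because the pipes yield bytes). The command name is a placeholder.
with open('out.log', 'wb') as out_f, open('err.log', 'wb') as err_f:
    rc = teed_call(['some_long_command'],
                   stdout_sinks=[out_f], stderr_sinks=[err_f])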

To fix your code with minimal changes, call .join() on the reader threads: even if process.poll() is not None (i.e., the subprocess has exited), there can still be pending output, and joining the reader threads ensures that all of it is read.
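
As a rough sketch of that minimal-change approach (the helper name, command, and file names are placeholders): start one reader thread per pipe, wait for the process, and only then join the readers:

from subprocess import Popen, PIPE
from threading import Thread

def drain(pipe, outfile):
    # Runs in a worker thread: copy the pipe into a file until EOF.
    for line in iter(pipe.readline, b''):
        outfile.write(line)
    pipe.close()

p = Popen(['some_long_command'], stdout=PIPE, stderr=PIPE)  # placeholder command
with open('out.log', 'wb') as out_f, open('err.log', 'wb') as err_f:
    readers = [Thread(target=drain, args=(p.stdout, out_f)),
               Thread(target=drain, args=(p.stderr, err_f))]
    for t in readers:
        t.start()
    rc = p.wait()      # the process can exit while output is still in flight,
    for t in readers:
        t.join()       # so join the readers to be sure everything was written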

