StdTrap: a magical Pythonic mechanism for intercepting console output

As a programmer I believe less is more. Good code is small, simple and elegant, and often preferable to larger, noisier code that does the same thing. It's not just about aesthetics either. Making code small and beautiful makes it easier to read and easier to understand, which is guaranteed to make it work better. Trust me on this.

Every so often, I come across a pattern and suddenly realize it would be so much easier to handle if only I had a magical primitive that did what I want. By "magical" I mean a programming primitive with a very simple interface that hides tricky internal machinery.

The impetus for some of my best work has come from these sudden insights. I've been interested in making things work "by magic" from my very first days as a programmer. I suspect I owe much of my technical skills to my willingness to embrace these challenges rather than shrinking away to the path of least resistance. I think most programmers miss out on these opportunities for growth because The Right Way is usually not the Easiest Way or the Quickest Way. You have to really care about the quality of your craft to go out of your way to make things beautiful on the inside.

The stdtrap.py module in turnkey-pylib demonstrates this principle well. I wrote it a few years ago to simplify large swaths of Python glue code. The interface is extremely simple:

import os, sys
from stdtrap import StdTrap

trap = StdTrap(transparent=False)
print "printing to stdout..."
print >> sys.stderr, "printing to stderr..."
os.system("echo echo stdout")
os.system("echo echo stderr 1>&2")
trap.close()

print 'trapped stdout: """%s"""' % trap.stdout.read()
print >> sys.stderr, 'trapped stderr: """%s"""' % trap.stderr.read()

As the example above shows, StdTrap makes it trivial to intercept standard console output (i.e., stdout and stderr), regardless of how that output is written: from local Python code, a third-party library, or even sub-programs executing outside your Python program.
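To see why working at the file descriptor level matters, here is a minimal Python 3 sketch. It is not StdTrap's code, and the helper name `capture_fd` is hypothetical. Swapping out `sys.stdout` only catches writes that go through Python's `sys.stdout` object. Redirecting descriptor 1 itself with `os.dup2` also catches output from child processes, because they inherit the hijacked descriptor. For simplicity the sketch redirects into a temporary file rather than a pipe, which sidesteps the buffering problem StdTrap has to solve.

```python
import io
import os
import sys
import tempfile


def capture_fd(fd, func):
    """Run func() with file descriptor `fd` redirected into a temporary
    file and return everything written to that descriptor, as bytes."""
    sys.stdout.flush()                  # don't capture output buffered earlier
    sys.stderr.flush()
    saved = os.dup(fd)                  # keep the original destination safe
    with tempfile.TemporaryFile() as tmp:
        os.dup2(tmp.fileno(), fd)       # hijack fd: writes now land in tmp
        try:
            func()
            sys.stdout.flush()          # push any Python-buffered text into tmp
            sys.stderr.flush()
        finally:
            os.dup2(saved, fd)          # restore the original destination
            os.close(saved)
        tmp.seek(0)
        return tmp.read()


# Naive approach: swapping sys.stdout only sees Python-level writes.
fake = io.StringIO()
real, sys.stdout = sys.stdout, fake
print("from Python")
os.system("echo from a subprocess")     # goes straight to fd 1, bypassing fake
sys.stdout = real
naive = fake.getvalue()

# File descriptor level: the child process inherits the hijacked fd 1.
captured = capture_fd(1, lambda: os.system("echo from a subprocess"))
```

Here `naive` holds only the text printed from Python, while `captured` holds the subprocess's output as well.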

If we set the transparent argument to True, we can even trap output transparently in real time. This is really useful for implementing logic that depends on the output of a subroutine or sub-program without getting in the way and blocking its output to the user.
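Transparent mode is essentially a tee. Here is a minimal Python 3 sketch of the idea, built around a hypothetical `FdTee` class that is not StdTrap's API. The target descriptor is pointed at a pipe. A background thread drains the pipe, records each chunk and, when `transparent` is set, writes it straight through to a duplicate of the original descriptor. StdTrap's `_splice` forks a helper process for this job instead of using a thread.

```python
import os
import sys
import threading


def write_all(fd, data):
    """os.write may write only part of `data`; loop until it's all out."""
    while data:
        data = data[os.write(fd, data):]


class FdTee:
    """Hypothetical minimal tee: redirect `fd` into a pipe and let a
    background thread record everything that arrives, optionally
    forwarding it to the original destination in real time."""

    def __init__(self, fd, transparent=True):
        sys.stdout.flush()                # don't trap text buffered earlier
        sys.stderr.flush()
        self.fd = fd
        self.transparent = transparent
        self.chunks = []
        self.orig = os.dup(fd)            # where output used to go
        r, w = os.pipe()
        os.dup2(w, fd)                    # fd now feeds the pipe's write end
        os.close(w)                       # ...and is the only write end left
        self.r = r
        self.thread = threading.Thread(target=self._pump, daemon=True)
        self.thread.start()

    def _pump(self):
        # Drain the pipe continuously so writers never block on a full buffer.
        while True:
            data = os.read(self.r, 65536)
            if not data:                  # EOF: all write ends are closed
                break
            self.chunks.append(data)
            if self.transparent:
                write_all(self.orig, data)
        os.close(self.r)

    def close(self):
        sys.stdout.flush()                # push Python-buffered text into the pipe
        sys.stderr.flush()
        os.dup2(self.orig, self.fd)       # restore fd, closing our write end
        self.thread.join()                # wait for the pump to hit EOF
        os.close(self.orig)               # safe now: the pump is done with it
        return b"".join(self.chunks)


tee = FdTee(1, transparent=True)          # output still reaches the terminal
os.system("echo hello from a child process")
captured = tee.close()
```

The ordering in `close()` matters. The descriptor is restored first so the pump sees EOF, and the duplicate of the original descriptor is closed only after the thread has finished writing to it.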

The kicker is that, deceptively simple as this looks, it's actually very tricky to implement reliably.

It's difficult enough that, to the best of my knowledge, nobody has done it before. As much as I'd like to chalk that up to my programming genius, I think there's a humbler explanation: nobody with the particular skills needed to even realize a more elegant solution was possible cared enough about elegant programming interfaces. Everyone else just followed the path of least resistance around the problem rather than sweeping it away, probably never realizing A Better Way was even possible.

Of all the code I've developed, StdTrap is my all-time favorite abstraction. If I didn't tell you there was some special system voodoo under the hood involving hijacking low-level file descriptors, you'd probably never suspect it. To support transparency, StdTrap even forks a little invisible splicer process that funnels data in real time to the right places as it comes in through the hijacked file descriptor.
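The helper process is needed because a pipe's kernel buffer is finite. Once the buffer fills up and nobody reads from it, the next write blocks. This small Python 3 sketch measures that limit by making the write end non-blocking and writing until the kernel refuses. The `pipe_capacity` helper is hypothetical. On a typical Linux system it reports 65536 bytes (64 KiB), the "65K" limit the splicer is designed to work around. Other systems may report a different figure.

```python
import os


def pipe_capacity(chunk=1024):
    """Count how many bytes a fresh pipe accepts before a write would block."""
    r, w = os.pipe()
    os.set_blocking(w, False)   # a full pipe now raises instead of hanging
    total = 0
    try:
        while True:
            # writes of at most PIPE_BUF bytes are atomic: all or nothing
            total += os.write(w, b"x" * chunk)
    except BlockingIOError:
        pass                    # full: a blocking writer would be stuck here
    finally:
        os.close(r)
        os.close(w)
    return total


print(pipe_capacity())
```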

To give you a better idea of what I'm talking about, here's the code for the splicing mechanism at the heart of StdTrap:

"""Inside the _splice function, stdout is intercepted at
the file descriptor level by redirecting it to a pipe. Now
whenever someone writes to stdout, we can read it out the
other end of the pipe.

The problem is that if we don't drain this pipe, then once
enough data has been written to it, the kernel will block
the process writing to stdout. That means we could capture
at most about 65K of output, after which everything would
hang. To solve that, we fork a splicer subprocess to get
around the OS's 65K pipe buffering limit. The splicer's job
is to suck the pipe into a local buffer and feed it back to
the parent process through a second pipe created for this
purpose."""

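import os
import select
import errno

# helpers such as Pipe, SignalEvent and set_blocking are
# defined elsewhere in stdtrap.py and are not shown here
def _splice(spliced_fd, usepty, transparent):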
    """splice into spliced_fd -> (splicer_pid, splicer_reader, orig_fd_dup)"""

    # duplicate the fd we want to trap for safe keeping
    orig_fd_dup = os.dup(spliced_fd)

    # create a pipe (or pty) pair:
    # data written to w can be read from r
    if usepty:
        r, w = os.openpty()
    else:
        r, w = os.pipe()

    # splice into spliced_fd by overwriting it
    # with the newly created `w` which we can read from with `r`
    os.dup2(w, spliced_fd)
    os.close(w)

    spliced_fd_reader = os.fdopen(r, "r", 0)
    splicer_pipe = Pipe()

    # the child process uses this to signal the parent to continue
    # the parent uses this to signal the child to close
    signal_event = SignalEvent()

    splicer_pid = os.fork()
    if splicer_pid:
        signal_continue = signal_event

        splicer_pipe.w.close()
        spliced_fd_reader.close()

        # busy-wait until the child splicer signals that it's ready
        while not signal_continue.isSet():
            pass

        return splicer_pid, splicer_pipe.r, orig_fd_dup
    else:
        signal_closed = signal_event

        # child splicer
        splicer_pipe.r.close()

        # we don't need this copy of spliced_fd; keeping it open would
        # hold the pipe's write end open, so we'd never see a hangup
        os.close(spliced_fd)

        set_blocking(spliced_fd_reader.fileno(), False)
        set_blocking(splicer_pipe.w.fileno(), False)

        def os_write_all(fd, data):
            # os.write() may write only part of data (and raises
            # OSError on failure), so keep writing until it's all out
            while data:
                written = os.write(fd, data)
                data = data[written:]


        poll = select.poll()
        poll.register(spliced_fd_reader, select.POLLIN | select.POLLHUP)

        buf = ""

        closed = False
        SignalEvent.send(os.getppid())

        while True:
            if not closed:
                closed = signal_closed.isSet()

            if closed and not buf:
                break

            try:
                events = poll.poll()
            except select.error:
                events = ()

            for fd, mask in events:
                if fd == spliced_fd_reader.fileno():
                    if mask & select.POLLIN:

                        data = spliced_fd_reader.read()

                        buf += data
                        poll.register(splicer_pipe.w)

                        if transparent:
                            # write through to the original destination; if our
                            # orig_fd_dup descriptor has been closed, fall back
                            # to the originally trapped fd
                            try:
                                os_write_all(orig_fd_dup, data)
                            except OSError, e:
                                if e[0] == errno.EBADF:
                                    os_write_all(spliced_fd, data)
                                else:
                                    raise

                    if mask & select.POLLHUP:
                        closed = True
                        poll.unregister(fd)

                elif fd == splicer_pipe.w.fileno():
                    if mask & select.POLLOUT:
                        written = os.write(splicer_pipe.w.fileno(), buf)
                        buf = buf[written:]
                        if not buf:
                            poll.unregister(splicer_pipe.w)

        os._exit(0)
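In `_splice` the parent spins in a busy loop until the child reports that it is ready, using StdTrap's own `SignalEvent` helper (not shown here). A common alternative for this kind of fork-and-wait handshake is a pipe: the child writes a byte when its setup is done, and the parent blocks in `os.read()` without burning CPU. The Python 3 sketch below illustrates that alternative and is not a description of how stdtrap.py works. The `fork_when_ready` name and its arguments are hypothetical.

```python
import os


def fork_when_ready(child_setup, child_main):
    """Fork a child and block until it reports that setup is finished.

    A pipe stands in for StdTrap's SignalEvent here (an assumption for
    illustration): the parent sleeps in os.read() instead of spinning.
    """
    ready_r, ready_w = os.pipe()
    pid = os.fork()
    if pid == 0:                       # child
        os.close(ready_r)
        status = 1
        try:
            child_setup()
            os.write(ready_w, b"!")    # tell the parent we're ready
            os.close(ready_w)
            child_main()
            status = 0
        finally:
            os._exit(status)           # never fall back into the parent's code
    os.close(ready_w)                  # parent keeps only the read end
    os.read(ready_r, 1)                # sleeps until the byte (or EOF) arrives
    os.close(ready_r)
    return pid


pid = fork_when_ready(lambda: None, lambda: None)
_, status = os.waitpid(pid, 0)
```

A side benefit is that if the child dies before signaling, its end of the pipe is closed on exit. The parent's read then returns EOF instead of waiting forever.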

You can find the full GPL3-licensed source code for stdtrap on GitHub at turnkey-pylib/pylib/stdtrap.py. Hope you find it useful!
