I needed an endpoint that streams the output of an external program to a remote client. In this article I describe how I did it and discuss a few issues I ran into. Streaming the output of an external command is what I needed, and it is the more difficult case; if you just want to stream events back to the browser, I cover that too.

A simple stream

The program below is a simple Flask server. To run it you need to pip install flask shelljob. Save it to a file server.py and then run python server.py.

import flask
from shelljob import proc

app = flask.Flask(__name__)

@app.route( '/stream' )
def stream():
    g = proc.Group()
    p = g.run( [ "bash", "-c", "for ((i=0;i<100;i=i+1)); do echo $i; sleep 1; done" ] )

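    def read_process():
        # Loop while the group still reports pending work.
        while g.is_pending():
            lines = g.readlines()
            # Each entry pairs the originating process with one line of output.
            # The name _proc avoids shadowing the imported proc module.
            for _proc, line in lines:
                yield line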

    return flask.Response( read_process(), mimetype= 'text/plain' )

if __name__ == "__main__":
    app.run(debug=True)

Once it’s running, you can issue a curl request to see that it’s streaming: curl http://127.0.0.1:5000/stream. This assumes the server started on port 5000; check the server output for the actual port.

The streamed data in this example comes from a simple bash loop that generates a sequence of numbers, pausing between each. You’ll probably want to put some useful command in its place.

The Group class is part of the shelljob package I wrote earlier. It takes care of the ugly streaming of data from a subprocess in Python. (I’ve not yet tested or ported it to Python 3. Let me know if you need that.) Here I’m just reading the data from the process and yielding it to the stream. If you’re already familiar with yield, that bit should be clear. If not, I suggest you read up on yield, as it’s too much to cover here.
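For readers who want to see the generator idea without shelljob, here is a minimal sketch that uses only the standard library's subprocess module. It is not how Group works internally, and the name stream_lines is made up for this illustration. Unlike Group.readlines, the blocking read here has no timeout, so it only shows the yield-as-you-read pattern.

```python
import subprocess
import sys


def stream_lines(argv):
    """Yield each line of the child's output as soon as it is available."""
    child = subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into the same stream
        text=True,
        bufsize=1,  # line-buffered on our side of the pipe
    )
    try:
        for line in child.stdout:
            yield line
    except GeneratorExit:
        # The consumer stopped early (for example, the client went away).
        child.terminate()
        raise
    finally:
        child.stdout.close()
        child.wait()


if __name__ == "__main__":
    # Hypothetical command, standing in for whatever you want to stream.
    command = [sys.executable, "-u", "-c", "for i in range(3): print(i)"]
    for line in stream_lines(command):
        sys.stdout.write(line)
```

A generator like this could be handed to flask.Response in the same way as read_process.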

Timeout?

Since you’re a responsible server programmer, you probably have a question about timeouts. At the moment the loop above simply hopes that the subprocess eventually exits. To change that we need a timeout. I have made sure that readlines itself has a default timeout of two seconds, after which it simply returns an empty list. This gives you a chance to put in your own early termination conditions.

Here is a modified read_process function that prints an interval message every 5 seconds (very roughly).

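    # Needs "import time" alongside the other imports at the top of server.py.
    def read_process():
        trigger_time = time.time() + 5
        while g.is_pending():
            # readlines returns an empty list once its timeout expires,
            # so the interval check below runs even when the process is quiet.
            lines = g.readlines()
            for _proc, line in lines:
                yield line

            now = time.time()
            if now > trigger_time:
                # End the message with a newline so it does not run into the next line.
                yield "*** Interval\n"
                trigger_time = now + 5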
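The same polling loop can also enforce a hard deadline, which is the early termination condition mentioned earlier. The sketch below is a standalone illustration: read_until_deadline is a made-up name, and its is_pending and readlines parameters are stand-ins for g.is_pending and g.readlines, with readlines assumed to return a possibly empty list of (process, line) pairs. Stopping the subprocess itself is left out, since that depends on how the process was started.

```python
import time


def read_until_deadline(is_pending, readlines, max_seconds, clock=time.monotonic):
    """Yield lines until the source is finished or max_seconds have passed."""
    deadline = clock() + max_seconds
    while is_pending():
        for _process, line in readlines():
            yield line
        # readlines is assumed to return periodically even with no output,
        # so this check is reached even if the process goes quiet.
        if clock() > deadline:
            yield "*** Timed out\n"
            break
```

The clock parameter exists only so the loop can be exercised with a fake clock; in the server you would leave it at its default.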

Does that block my server?

Running the server directly will cause the loop to block the server. If you make another curl request from a second console, you won’t get any response. Kill the first request and you’ll suddenly start getting data. This isn’t very satisfying. (Flask 1.0 and later run the development server with threads by default, so you may not see this blocking there.)

Chances are you already have a solution for your server, like gunicorn or uwsgi. In case you don’t, I’ll go over how to do it with gunicorn. First, pip install gunicorn eventlet. Now don’t run the server directly; instead use the command gunicorn -k eventlet server:app (the module name matches the server.py file saved earlier). Note that this launches the server on a different port (usually 8000).

Unfortunately, if you do a curl request now, it won’t work. It’s something I don’t yet understand. It only applies to external subprocess calls; if you stream internally generated data everything is fine. The fix is to add a call to monkey_patch at the top of the server code, after the other imports.

import flask
from shelljob import proc

import eventlet
eventlet.monkey_patch()

app = flask.Flask(__name__)

Good. Now you can issue a bunch of curl requests in multiple consoles and all of them will stream the results.

I would appreciate it if somebody could explain why I need monkey_patch here. It feels like a defect somewhere, possibly a compatibility problem between gunicorn and shelljob. I’d like to find the proper fix.

Now my Python process doesn’t work

It seems to work until you call a Python subprocess. Now it appears to block for a long time, possibly until the end of the process, and then give all the data at once, or time out.

This happens because Python is trying to be clever. If the output of the command is not a console it goes into a buffered output mode. So instead of flushing on every line of output it accumulates a lot more text before anything is actually written.

To get around this, the Python process can be launched with the -u option, which turns off the buffering.

I made a simple external script to test this, called slow.py.

import time

for i in range(0,100):
    print(i)
    time.sleep(1)

In the server I changed the process I run to p = g.run( [ "python", "-u", "slow.py" ] ). A curl request now reports the lines as we hoped. To see the broken behaviour just remove the -u part.
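If changing the command line is awkward, Python offers two other ways to get the same effect: setting the PYTHONUNBUFFERED environment variable to a non-empty value for the child, or calling print(i, flush=True) inside the child script (Python 3.3 and later). The sketch below shows the environment variable route using the standard library's subprocess module rather than shelljob; run_unbuffered is a made-up name for this illustration.

```python
import os
import subprocess
import sys


def run_unbuffered(script_args):
    """Start a Python child whose stdout is not block-buffered when piped."""
    # PYTHONUNBUFFERED set to a non-empty string has the same effect as -u.
    env = dict(os.environ, PYTHONUNBUFFERED="1")
    return subprocess.Popen(
        [sys.executable] + list(script_args),
        stdout=subprocess.PIPE,
        env=env,
        text=True,
    )
```

For example, run_unbuffered(["slow.py"]) would start the test script above without needing -u in the argument list.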

Stream events to a browser

The above can be combined with an EventSource in a browser (at least those that support it). First create the following page.html file.

<!DOCTYPE html>
<html>
<head>
    <script>
    var source = new EventSource("/stream");
    source.onmessage = function(event) {
        document.getElementById("output").innerHTML += event.data + "<br/>"
    }
    </script>
</head>
<body>
    <h1>Output</h1>
    <div id="output"></div>
</body>
</html>

A few lines in the server must be modified. The yield line needs to format a message response: the line prefixed with data: and followed by two line feeds. The response must be marked as text/event-stream to tell the browser it’s an event stream. Finally, we need an endpoint to serve the HTML page; don’t load the file directly in the browser, as cross-domain restrictions will break it. The modified server is below.

import flask
from shelljob import proc

import eventlet
eventlet.monkey_patch()

app = flask.Flask(__name__)

@app.route( '/stream' )
def stream():
    g = proc.Group()
    p = g.run( [ "bash", "-c", "for ((i=0;i<100;i=i+1)); do echo $i; sleep 1; done" ] )

    def read_process():
        while g.is_pending():
            lines = g.readlines()
            for _proc, line in lines:
                # Frame each line as a server-sent event message.
                yield "data:" + line + "\n\n"

    return flask.Response( read_process(), mimetype= 'text/event-stream' )

@app.route('/page')
def get_page():
    return flask.send_file('page.html')

if __name__ == "__main__":
    app.run(debug=True)
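The message framing in the yield line can be pulled out into a small helper. This is a sketch, not part of Flask or shelljob: format_sse is a made-up name, and it assumes the line is already a text string (decode it first if your process output arrives as bytes). It also handles payloads that contain several lines, since each line of an event needs its own data: field.

```python
def format_sse(data, event=None):
    """Frame text as a single server-sent event message."""
    fields = []
    if event is not None:
        fields.append("event: " + event)
    # splitlines drops the trailing newline that process output usually has;
    # an empty payload still needs one data: field to form a message.
    for part in data.splitlines() or [""]:
        fields.append("data: " + part)
    # A blank line terminates the message.
    return "\n".join(fields) + "\n\n"
```

With this in place the yield line would read yield format_sse(line). The optional event argument produces a named event, which the browser delivers to listeners registered with addEventListener rather than to onmessage.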

Navigate to http://127.0.0.1:8000/page in your browser (port 8000 if you are running under gunicorn as described above, or 5000 if you ran python server.py directly) and the stream should start appearing. If you have the gunicorn setup working correctly, you should be able to open multiple tabs. Each load of this page will have its own stream.

Also interesting: EventSource reconnects when the server closes the connection. Once you reach the final number, the browser requests /stream again and the count starts over. Try shortening the bash loop to see this effect. I don’t know much else about event sources, so you’re on your own from here.