HTTP streaming of command output in Python Flask

I needed an endpoint that streamed the output of an external program to the remote client. In this article I describe how I did it and discuss a few issues I encountered. If you just want to stream events back to the browser, I cover that too; an external command is what I needed, and it is the more difficult case.

A simple stream

The program below is a simple Flask server. To run it you need to pip install flask shelljob. Save it to a file server.py and then run python server.py.

import flask
from shelljob import proc

app = flask.Flask(__name__)

@app.route( '/stream' )
def stream():
    g = proc.Group()
    p = g.run( [ "bash", "-c", "for ((i=0;i<100;i=i+1)); do echo $i; sleep 1; done" ] )

    def read_process():
        while g.is_pending():
            lines = g.readlines()
            for proc, line in lines:
                yield line

    return flask.Response( read_process(), mimetype= 'text/plain' )

if __name__ == "__main__":
    app.run(debug=True)

Once it's running you can issue a curl request to see that it's streaming: curl http://127.0.0.1:5000/stream. This assumes it started on port 5000; check the server output to confirm.

The streamed data in this example is a simple bash loop that generates a sequence of numbers, pausing between each. You'll probably want to put some useful command in its place.

The Group class is part of the shelljob package I wrote before. It takes care of the ugly streaming of data from a subprocess in Python. (I've not yet tested/ported it to Python 3. Let me know if you need that.) Here I'm just reading the data from the process and yielding it to the stream. If you're already familiar with yield that bit should be clear. If not, I suggest you read up on yield, as it's too much to cover here.
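If you'd rather not depend on shelljob, a rough stdlib-only sketch of the same idea, using subprocess directly, looks like this (stream_command is my own helper name, not part of the article's code):

```python
import subprocess

# A rough stdlib-only sketch of what Group does for us above: run a
# command and yield its output line by line as it arrives. Note that
# iterating over stdout blocks until each newline, so there is no
# built-in timeout like Group.readlines() provides.
def stream_command(args):
    p = subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # fold stderr into the same stream
        text=True,
        bufsize=1,                 # line-buffered text mode
    )
    for line in p.stdout:
        yield line
    p.wait()
```

You could hand this generator straight to flask.Response just like read_process above; the trade-off is losing the timeout behaviour discussed next.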

Timeout?

Since you're a responsible server programmer you probably have a question about timeouts. At the moment the above loop simply hopes the subprocess eventually exits. To change that we need a timeout. I have taken care that readlines itself has a default timeout of two seconds, after which it simply returns an empty list. This gives you a chance to put in your own premature termination conditions.

Here is a modified read_process function that prints an interval message every 5 seconds (very roughly).

    def read_process():
        # requires `import time` at the top of the server
        trigger_time = time.time() + 5
        while g.is_pending():
            lines = g.readlines()
            for proc, line in lines:
                yield line

            now = time.time()
            if now > trigger_time:
                yield "*** Interval\n"
                trigger_time = now + 5

Does that block my server?

Running the server directly will cause the loop to block the server. If you do another curl request in a second console you won’t get any response. Kill the first request and then suddenly you’ll start getting data. This isn’t very satisfying.

Chances are you already have a solution for your server, like gunicorn or uwsgi. In case you don't I'll go over how to do it with gunicorn. First pip install gunicorn eventlet. Now don't run the server directly; instead use the command gunicorn -k eventlet server:app. Note this launches the server on a different port (usually 8000).

Unfortunately, if you do a curl request now, it won’t work. It’s something I don’t yet understand. It only applies to external subprocess calls; if you stream internally generated data everything is fine. The fix is to add a call to monkey_patch at the top of the server code, after the other imports.

import flask
from shelljob import proc

import eventlet
eventlet.monkey_patch()

app = flask.Flask(__name__)

Good. Now you can issue a bunch of curl requests in multiple consoles and all of them will stream the results.

I would be appreciative if somebody could explain why I need monkey_patch here. It feels like a defect somewhere, possibly a compatibility issue between gunicorn and shelljob. I'd like to find the proper fix.

Now my Python process doesn’t work

It seems to work until you call a Python subprocess. Then it appears to block for a long time, possibly until the end of the process, and then give all the data at once, or time out.

This happens because Python is trying to be clever. If the output of the command is not a console, it switches into a buffered output mode. So instead of flushing on every line of output, it accumulates a lot more text before anything is actually written.

To get around this, the python process can be launched with a -u parameter, which turns off the buffering.

I made a simple external script to test this, called slow.py.

import time

for i in range(0,100):
    print(i)
    time.sleep(1)

In the server I changed the process I run to p = g.run( [ "python", "-u", "slow.py" ] ). A curl request now reports the lines as we hoped. To see the broken behaviour just remove the -u part.
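If you control the child script, an alternative to -u is to flush explicitly. Here is a sketch of a slow.py variant (the emit wrapper is my own addition, just to make the flushing easy to see and test):

```python
import sys
import time

# Variant of slow.py that flushes after every line, so the parent
# process sees each number immediately even without the -u flag.
def emit(count, delay=1.0, out=sys.stdout):
    for i in range(count):
        print(i, file=out)
        out.flush()          # defeat the block buffering by hand
        time.sleep(delay)

if __name__ == "__main__":
    emit(100)
```

In Python 3 you can also write print(i, flush=True) for the same effect.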

Stream events to a browser

The above can be combined with an EventSource in a browser (at least those that support it). First create the following page.html file.

<!DOCTYPE html>
<html>
<head>
    <script>
    var source = new EventSource("/stream");
    source.onmessage = function(event) {
        document.getElementById("output").innerHTML += event.data + "<br/>"
    }
    </script>
</head>
<body>
    <h1>Output</h1>
    <div id="output"></div>
</body>
</html>

A few lines in the server must be modified. The yield line needs to format a message response: this is merely a data: prefix and two line-feeds. The response must be marked as text/event-stream to tell the browser it's a stream. Finally we need an endpoint to serve the HTML page; don't load the file directly in the browser, as cross-domain restrictions will break it. Below is the modified server.

import flask
from shelljob import proc

import eventlet
eventlet.monkey_patch()

app = flask.Flask(__name__)

@app.route( '/stream' )
def stream():
    g = proc.Group()
    p = g.run( [ "bash", "-c", "for ((i=0;i<100;i=i+1)); do echo $i; sleep 1; done" ] )

    def read_process():
        while g.is_pending():
            lines = g.readlines()
            for proc, line in lines:
                yield "data:" + line + "\n\n"

    return flask.Response( read_process(), mimetype= 'text/event-stream' )

@app.route('/page')
def get_page():
    return flask.send_file('page.html')

if __name__ == "__main__":
    app.run(debug=True)
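
One caveat with the yield line above: the SSE format requires every line of a multi-line payload to carry its own data: prefix. Our bash loop only emits single lines, so the simple concatenation works, but a small helper (sse_format is my own name for it) handles the general case:

```python
# Format an arbitrary (possibly multi-line) payload as one SSE message:
# each line gets its own "data: " prefix, and a blank line ends the message.
def sse_format(payload):
    lines = payload.splitlines() or [""]
    return "".join("data: %s\n" % l for l in lines) + "\n"
```

You could then yield sse_format(line) inside read_process instead of building the string by hand.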

Navigate to http://127.0.0.1:8000/page in your browser and the stream should start appearing. If you have gunicorn set up correctly you should be able to open multiple tabs; each load of the page gets its own stream.

Also interesting: EventSource restarts the connection when it is done. Once you reach the final number it will just start counting again. Try shortening the bash loop to see this effect. I don't know much else about event sources, so you're on your own from here.
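If the restart is unwanted, the SSE format also defines a retry: field giving the reconnection delay in milliseconds. A sketch of a generator wrapper (sse_stream is my own name) that emits it before the data:

```python
# Prepend a "retry:" field so the browser waits retry_ms milliseconds
# before reconnecting once the stream ends.
def sse_stream(lines, retry_ms=10000):
    yield "retry: %d\n\n" % retry_ms
    for line in lines:
        yield "data: " + line.rstrip("\n") + "\n\n"
```

Note that retry only delays the reconnect; to stop for good the page would have to call source.close(), e.g. after some sentinel message of your own choosing.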

Categories: Programming, Use Case

11 replies »

  1. Per your question:

    “I would be appreciative if somebody could explain why I need monkey_patch here. It feels like a defect somewhere, possibly a compatibility issue between gunicorn and shelljob. I'd like to find the proper fix.”

    monkey_patch redirects library calls to cooperative versions; for example, with gevent, time.sleep() would be changed to gevent.sleep().
    I think underneath monkey_patch deals with blocking versus non-blocking library calls.
    time.sleep() is a simple blocking wait, while gevent.sleep() yields control back to the event loop.

    • I thought the combination of Gunicorn with Flask would automatically do the monkey_patch, however. I assume this since the HTTP layer is patched automatically.

  2. In my limited experience, use of monkey_patch is arbitrary. Not sure why.

    I tried substituting -k gevent for -k eventlet but curl only responded in one terminal. When I then monkey_patched gevent, curl did not respond in any terminal.

    • I'm not entirely happy with the Gunicorn stack. From experience I know a lot of combinations of workers and configuration just don't work correctly.

  3. Dear mororay,

    Firstly, thanks for your script

    My issue is: I want to pass a variable to your stream() function, but when I tried to do that, I always received the error “stream() takes exactly 1 argument (0 given)”.

    I think I should edit your page.html or pass it via the URL, but I don't know how to do that.

    Could you point me in the right direction? Thanks for your support.

    suker200

  4. About this:

    > It feels like a defect somewhere, possibility compatiblity between gunicorn and shelljob. I’d like to find the proper fix.

    I just accomplished the exact same thing as you did here, using flask (default config mainly), intermediate files, and subprocess. The exact same problem seems to happen: the first curl gets streaming output, while the second curl hangs until the first finishes.

  5. Hi!
    I would like to have the stream output inline, preferably inside some HTML tag, with blazing fast updates. Do you think something like this can be made with flask?

    • You'll want to look at the section on “Stream events to a browser”. This is an effective way to get the browser to update as data comes back.

      You could also create your own HTTP reader in JS and continually add it to the text/html of an element. I don’t know for sure how the buffering affects this though.

    • Thanks. I implemented this yesterday evening using socketIO. It all works smoothly so now I have to figure out how you can stop and resume stream where you left off but that is another story.

  6. Thanks for your great work! I followed your instructions and implemented it, but I have a problem: when I navigate to localhost:5000/page, the web page keeps updating all the time; it keeps printing 0 to 100 as if the for loop is running forever. However, when I navigate to localhost:5000/stream, the loop seems to execute only once. Where am I wrong?

    • The browser sees this as a permanent stream and expects a long-term connection. If it loses the connection it will simply connect again. In our simple case our server will just print the numbers again.
