Threaded data collection with Python, including examples

by Rob Haswell
Bronco - Digital Marketing Agency

On today’s Internet 2.0 there are all sorts of data feeds available for consumption. From APIs to RSS feeds, it seems like nearly every site has a machine-readable output. There are many reasons why you’d want to collect this information, which I won’t go into, so in this post I’m going to walk you through an application which consumes RSS feeds. I’ll be using the Python scripting language, and I’ll show you how the approach evolves over four implementations, from a simple single-threaded loop to a pool of fetching threads feeding a single processing thread.

Application introduction

Our application is going to work like this:

  • A database contains the list of RSS feeds. This is long – 1000+ records
  • Our application reads this list of feeds and processes them
  • The items from the feeds are stored in the database

Database manipulation and RSS feed parsing are outside the scope of this tutorial, so we’ll start off by defining some empty functions that handle all this:

def get_feed_list():
    """ Returns a list of tuples: (id, feed_url) """
    pass

def get_feed_contents(feed_url):
    """ Gets feed over HTTP, returns RSS XML """
    pass

def parse_feed(feed_rss):
    """ Parses the feed and returns a list of items """
    pass

def store_feed_items(feed_id, items):
    """ Takes a feed_id and a list of items and stores them in the DB """
    pass

We’re going to keep all of these in a module called “functions”, which can simply be a file called functions.py in the same directory as the scripts below.
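The examples that follow import a module called functions. If you’d like to run them without a real database or network, a throwaway functions.py along these lines should do. Everything in it is hypothetical: the feed URLs are made up, time.sleep() stands in for network latency, a dict stands in for the database, and a toy RSS string is parsed with the standard library’s xml.etree.ElementTree. It is written for Python 3.

```python
# functions.py: a hypothetical stand-in for experimenting, not a real implementation
import time
import xml.etree.ElementTree as ET

# Made-up feed list; a real get_feed_list() would query your database
FEEDS = [(n, f"http://example.com/feed/{n}.rss") for n in range(10)]
STORED = {}  # feed_id -> items, standing in for the database table


def get_feed_list():
    """ Returns a list of tuples: (id, feed_url) """
    return list(FEEDS)


def get_feed_contents(feed_url):
    """ Pretends to fetch the feed; the sleep simulates network latency """
    time.sleep(0.05)
    return f"<rss><channel><item><title>{feed_url}</title></item></channel></rss>"


def parse_feed(feed_rss):
    """ Parses the feed and returns a list of item titles """
    root = ET.fromstring(feed_rss)
    return [item.findtext("title") for item in root.iter("item")]


def store_feed_items(feed_id, items):
    """ Stores the items in an in-memory dict instead of a database """
    STORED[feed_id] = items
```

Because every function keeps the same name and arguments as the empty versions, the scripts below should run against it unchanged.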

Implementation 1: Single-threaded

This is the way most people would do it at first. So simple, I’ll just post the sample code:

import functions

for feed_id, feed_url in functions.get_feed_list():
    rss = functions.get_feed_contents(feed_url)
    items = functions.parse_feed(rss)
    functions.store_feed_items(feed_id, items)

Pretty simple, huh? But there are fundamental problems. Fetching a feed over HTTP is usually slow, so your program will spend most of its time idle, waiting for each feed to arrive before it can parse it. Your program also can’t be fetching the next feed while it’s busy parsing the current one. Consequently this program will be as slow as molasses. It’s like eating a bowl of peas one at a time – you’d rather just shovel them in, wouldn’t you? Enter: threading.
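To see why threads help with this kind of waiting, here is a small timing sketch. The fetch is hypothetical: time.sleep() stands in for a slow HTTP request, and, like a blocking socket read, it lets other threads run while it waits. Ten simulated 0.1-second fetches take about a second one after another, but only a fraction of that when they all wait at the same time.

```python
import threading
import time


def fake_fetch(url, results):
    """ Simulates a slow HTTP request: the thread just waits, like real network I/O """
    time.sleep(0.1)
    results.append(url)  # list.append is atomic in CPython


def run_serial(urls):
    results = []
    start = time.perf_counter()
    for url in urls:
        fake_fetch(url, results)
    return time.perf_counter() - start, results


def run_threaded(urls):
    results = []
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_fetch, args=(url, results)) for url in urls]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every simulated fetch to finish
    return time.perf_counter() - start, results


urls = [f"http://example.com/{n}" for n in range(10)]
serial_time, serial_results = run_serial(urls)
threaded_time, threaded_results = run_threaded(urls)
print(f"serial: {serial_time:.2f}s, threaded: {threaded_time:.2f}s")
```

The exact timings will vary from machine to machine; the point is that the waits overlap instead of adding up.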

Implementation 2: One thread per feed

So we reckon: “Will using threads make this faster?” Answer: yes. However, there are quite a few ways of doing this. We’ll start off with this:

import threading
import time
import functions

def thread(feed_id, feed_url):
    rss = functions.get_feed_contents(feed_url)
    items = functions.parse_feed(rss)
    functions.store_feed_items(feed_id, items)

for feed_id, feed_url in functions.get_feed_list():
    t = threading.Thread(target=thread, kwargs={"feed_id": feed_id, "feed_url": feed_url})
    t.start()

while threading.active_count() > 1:  # wait for all worker threads to finish
    time.sleep(1)

Problem: this creates one thread per feed, all at once, and then waits for them all to finish. This has the following issues:

  • If you try to request 1000+ pages at once, many of them will time out. If a good share of them are on the same server, you’ll effectively DoS it.
  • With 1000 threads, your app will likely either run out of memory or get so bogged down in context switching that it’ll take forever.
  • Try this on any consumer-grade router and it’ll probably crash

So what do we do? Well, let’s set a limit on the number of concurrent threads:

import threading
import time
import functions

THREAD_LIMIT = 20

def thread(feed_id, feed_url):
    rss = functions.get_feed_contents(feed_url)
    items = functions.parse_feed(rss)
    functions.store_feed_items(feed_id, items)

for feed_id, feed_url in functions.get_feed_list():
    # active_count() includes the main thread, so this waits while
    # THREAD_LIMIT workers are already running
    while threading.active_count() > THREAD_LIMIT:
        time.sleep(1)
    t = threading.Thread(target=thread, kwargs={"feed_id": feed_id, "feed_url": feed_url})
    t.start()

while threading.active_count() > 1:  # wait for the last workers to finish
    time.sleep(1)

Spot the difference? We have another while loop right inside the for loop. Before starting each new thread, the main thread waits there for as long as THREAD_LIMIT worker threads are already running, so no more than THREAD_LIMIT feeds are being fetched at once.
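Polling the thread count works, but it can wait up to a second longer than necessary, and it counts every thread in the process, not just your workers. A common alternative from the standard library is threading.BoundedSemaphore with one slot per allowed worker: the main loop acquires a slot before starting a thread, and the thread releases it when it’s done. In this sketch the real fetch/parse/store is replaced by a short sleep, and the running and peak counters exist only to check that the cap holds.

```python
import threading
import time

THREAD_LIMIT = 3
slots = threading.BoundedSemaphore(THREAD_LIMIT)  # one slot per allowed worker
lock = threading.Lock()  # protects the two counters below
running = 0  # workers currently inside worker()
peak = 0     # highest value running ever reached


def worker(feed_id):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    try:
        time.sleep(0.05)  # stand-in for fetching, parsing and storing feed_id
    finally:
        with lock:
            running -= 1
        slots.release()  # hand the slot back so the main loop can start another worker


threads = []
for feed_id in range(10):
    slots.acquire()  # blocks while THREAD_LIMIT workers hold a slot
    t = threading.Thread(target=worker, args=(feed_id,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
print("peak concurrency:", peak)
```

acquire() wakes up as soon as any worker releases its slot, so there is no one-second polling delay.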

There’s another problem, though, and that’s with the model. In this model we’re continually creating new threads that live for a short time and then exit. This isn’t efficient: it would be much better to create a pool of threads which we can then reuse. Let’s kick this up a notch.
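For comparison, Python 3.2 and later ship a ready-made pool in the standard library: concurrent.futures.ThreadPoolExecutor keeps at most max_workers threads alive and hands each one task after task. In this minimal sketch, fetch() is a hypothetical stand-in for get_feed_contents() that sleeps instead of doing HTTP. It also records the names of the threads that ran tasks, to show that no more than THREAD_LIMIT were ever used for all 20 feeds.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

THREAD_LIMIT = 5
names = set()  # names of the pool threads that actually ran a task


def fetch(feed):
    """ Hypothetical stand-in for get_feed_contents(): sleeps instead of doing HTTP """
    feed_id, feed_url = feed
    names.add(threading.current_thread().name)
    time.sleep(0.02)
    return feed_id, f"<rss>{feed_url}</rss>"


feeds = [(n, f"http://example.com/{n}.rss") for n in range(20)]

# Leaving the with-block waits for every task to finish, then shuts the pool down
with ThreadPoolExecutor(max_workers=THREAD_LIMIT) as pool:
    results = list(pool.map(fetch, feeds))  # results come back in input order

print(len(results), "feeds fetched by", len(names), "threads")
```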

Implementation 3: Thread pool and a Queue

So in this version we’re going to do a few new things:

  1. Use a Queue object and populate it with the list of feed URLs.
  2. Spawn a number of threads that will read items off this Queue.
  3. The threads will process the data and store it.

I’ll start off with the sample code then walk you through it:

import queue
import threading
import time
import functions

THREAD_LIMIT = 50
jobs = queue.Queue(0)  # maxsize 0 means "no item limit"

def thread():
    while True:  # forever
        try:
            feed_id, feed_url = jobs.get(False)  # False means "don't wait for items to appear"
        except queue.Empty:
            # Nothing left to do, time to die
            return
        rss = functions.get_feed_contents(feed_url)
        items = functions.parse_feed(rss)
        functions.store_feed_items(feed_id, items)

for info in functions.get_feed_list():
    jobs.put(info)

for n in range(THREAD_LIMIT):
    t = threading.Thread(target=thread)
    t.start()

while threading.active_count() > 1:  # wait for the workers to finish
    time.sleep(1)

Lines to note:

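  • The line that creates jobs: this is our job list. We use the Queue class from the standard library because it’s thread-safe. In fact, this is exactly what it was designed for.
  • The try/except around jobs.get(False) in thread(): this reads from the queue. The False argument means that once the queue is empty, we’re not interested any more: get() raises the queue module’s Empty exception immediately instead of waiting, and the thread terminates.
  • The first for loop in the main script: this puts every (id, feed_url) tuple from get_feed_list() into the queue before any threads are started.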
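The get(False) trick works only because the queue is completely filled before any thread starts; a worker that finds it momentarily empty simply quits. If you’d rather start the workers first and feed them as you go, a common alternative is a sentinel: put one special “stop” value per worker at the end of the queue, and have each worker block on get() until it receives one. In this sketch, STOP is a hypothetical name and a plain list stands in for the database.

```python
import queue
import threading

NUM_WORKERS = 4
STOP = None  # sentinel: a value that can never be a real job
jobs = queue.Queue()
done = []  # stand-in for the database
done_lock = threading.Lock()


def worker():
    while True:
        job = jobs.get()  # blocks until something arrives
        if job is STOP:
            return  # one sentinel per worker, so each one exits exactly once
        feed_id, feed_url = job  # a real worker would fetch feed_url here
        with done_lock:
            done.append(feed_id)


threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()  # workers can start before the queue is filled

for n in range(10):
    jobs.put((n, f"http://example.com/{n}.rss"))
for _ in threads:
    jobs.put(STOP)  # FIFO order puts these behind every real job, so no work is dropped

for t in threads:
    t.join()
print(sorted(done))
```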

Our threads run in a loop, fetching, parsing and storing one feed per pass, until there is no more work; then they exit. This model will work just fine for the majority of people. However, there are (still) problems. They are:

  1. Opening 50 threads that write to a database will usually mean 50 database connections, or lots of locking. Either way, this is bad. Your data-getting threads don’t want to be sitting around waiting on a DB lock when they could be fetching more feeds. Worse, you don’t want to exceed your database’s connection limit.
  2. In this model, you have one master thread doing nothing and 50 threads doing I/O and work. This isn’t a great idea in this application. Ideally you should only use threading (in Python) when you need to either:
    1. Wait for I/O
    2. Truly perform more than one concurrent task
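To make problem #1 concrete with one real database library: Python’s sqlite3 module, by default (check_same_thread=True), refuses to let a connection be used from any thread other than the one that created it, so every writer thread would need a connection of its own. sqlite3 is only an example here; other database drivers have their own thread-safety rules and connection limits.

```python
import sqlite3
import threading

conn = sqlite3.connect(":memory:")  # created in the main thread
conn.execute("CREATE TABLE items (feed_id INTEGER, item TEXT)")
errors = []


def store_from_worker():
    try:
        conn.execute("INSERT INTO items VALUES (1, 'hello')")
    except sqlite3.ProgrammingError as exc:
        errors.append(exc)  # the connection belongs to the main thread


t = threading.Thread(target=store_from_worker)
t.start()
t.join()
print(len(errors))  # 1
conn.close()
```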

Problem #2 isn’t so serious, but it would be better to have more control over the heavy lifting. Problem #1, however, definitely needs addressing. The solution is to leave only the fetching in the threads and shift all the processing (parsing and storing) inline into the master thread, so only one thread ever talks to the database.

Implementation 4: 1 worker, many runners

Here’s the code:

import queue
import threading
import functions

THREAD_LIMIT = 50
jobs = queue.Queue(0)  # maxsize 0 means "no item limit"
rss_to_process = queue.Queue(THREAD_LIMIT)  # We set a limit on this, I'll explain later

def thread():
    while True:  # forever
        try:
            feed_id, feed_url = jobs.get(False)  # False means "don't wait for items to appear"
        except queue.Empty:
            # Nothing left to do, time to die
            return
        rss = functions.get_feed_contents(feed_url)
        # This will block if our processing queue is full
        rss_to_process.put((feed_id, rss), True)

for info in functions.get_feed_list():  # Load them up
    jobs.put(info)

for n in range(THREAD_LIMIT):  # Unleash the hounds
    t = threading.Thread(target=thread)
    t.start()

# Keep looping while there are threads running OR there's stuff to process
while threading.active_count() > 1 or not rss_to_process.empty():
    try:
        feed_id, rss = rss_to_process.get(True, 1)  # Wait for up to a second for a result
    except queue.Empty:
        continue
    items = functions.parse_feed(rss)
    functions.store_feed_items(feed_id, items)

Notes:

  • The rss_to_process queue: a new Queue that returns the RSS XML to the master thread. We set a limit on its size in case the threads return data faster than we can process it, which would otherwise fill the queue with XML and burn memory. This sets a rudimentary rate limit on the threads, as they will block when the queue is full.
  • The rss_to_process.put(...) call inside thread(): this is the line that blocks when the processing queue is full.
  • The final while loop: this is where all the heavy lifting (parsing and storing) takes place, all inline in the master thread.
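The backpressure from the bounded rss_to_process queue is easy to see in isolation. In this sketch a timeout is added to put() purely so the demonstration doesn’t hang: with the queue full, the put gives up with queue.Full, which is exactly where a fetcher thread calling put(..., True) without a timeout would sit and wait. Once the master thread takes an item off, there is room again.

```python
import queue

rss_to_process = queue.Queue(maxsize=2)  # a small limit, like Queue(THREAD_LIMIT)
rss_to_process.put((1, "<rss/>"))
rss_to_process.put((2, "<rss/>"))
was_full = rss_to_process.full()

blocked = False
try:
    # A fetcher thread would wait here indefinitely; the timeout just ends the demo
    rss_to_process.put((3, "<rss/>"), timeout=0.1)
except queue.Full:
    blocked = True

rss_to_process.get()  # the master thread consumes one result...
rss_to_process.put((3, "<rss/>"), timeout=0.1)  # ...so this put succeeds
print(was_full, blocked, rss_to_process.qsize())  # True True 2
```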

…and there you have it, a fully fledged multithreaded data collector. Not bad for a few hours’ work. It’s not finished though, as there are plenty of things you’d want to add to it. For example:

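  • More exception handling, especially for KeyboardInterrupt and other signals. Python’s default SIGINT handler already turns Ctrl+C into a KeyboardInterrupt in the main thread, so I would advise wrapping our final loop in a big try/except block that catches KeyboardInterrupt, and importing the signal module to install handlers for other signals such as SIGTERM. The except branch would need to empty the job queue, so the threads stop picking up new feeds, and keep emptying the processing queue until they have finished, so none of them stays blocked on a full queue. Your threads will then exit gracefully, and your program will exit too.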
  • Better support for multi-core machines. Python’s global interpreter lock (GIL) prevents more than one thread from executing Python code at a time. Your threads may well be scheduled on different cores, but CPU-bound work such as parsing still won’t run in parallel; threads mainly help while they’re waiting on I/O, because blocking I/O releases the GIL. What you might want to consider is throwing a few os.fork() calls in, forcing your program to multiprocess, which could then take full advantage of multiple cores. You can do this either by dividing up your work queue at the start, or by moving the heavy lifting out of the main thread and into separate processes. Your main thread could then communicate with these “worker” processes via shared memory or sockets (my preference), and they would pass back the results. Make sure your functions.parse_feed returns picklable objects.
  • You could use GUI programming or ncurses to provide a progress bar, by inspecting the value of jobs.qsize()
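A minimal sketch of the shutdown path from the first suggestion above, written against Implementation 4’s two queues. It assumes you also keep the started Thread objects in a list (a hypothetical threads variable; the code above doesn’t keep one) so the helper can tell when they have all exited. drain() and shutdown() are hypothetical helper names, not library functions.

```python
import queue
import time


def drain(q):
    """ Discards everything currently waiting in q; returns how many items were dropped """
    dropped = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return dropped
        dropped += 1


def shutdown(jobs, rss_to_process, threads):
    """ Hypothetical helper for the KeyboardInterrupt branch of the master loop """
    drain(jobs)  # each worker's next jobs.get(False) now raises Empty, so it returns
    while any(t.is_alive() for t in threads):
        drain(rss_to_process)  # unblock workers waiting in put() on a full queue
        time.sleep(0.05)
    drain(rss_to_process)  # discard anything put just before the last worker exited
```

In the master thread you would wrap the final while loop in try: and call shutdown(jobs, rss_to_process, threads) from an except KeyboardInterrupt: branch. Workers that are mid-fetch finish that one feed before they notice the empty job queue, so shutdown can take as long as one slow request.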

Comments and suggestions welcome, and you can feel free to use the contact form.

Disclaimer

The code contained in this tutorial is not guaranteed to work, or even compile. It has never been executed and is not tested for syntax errors or other bugs. However it is semantically accurate and is provided “as-is”. I am not responsible for any loss of business, crashes, errors, marital crisis or nuclear wars caused as a result of using this code. Copy and paste at your own risk.

Get in Touch

Things are better when they’re made simpler. That’s why the David Naylor blog is now just that; a blog. No sales pages, no contact form - just interesting* info about SEO.

If you’d like to find out more about the Digital Marketing services we do provide then head over to Bronco (our main company website) to get in touch.

* Interestingness not guaranteed
Part of the Bronco family