Problem: You have a queue of tasks to manage, but only n of them should run at a time. One example is CPU-intensive work, where running more than a fixed number of tasks at once decreases your overall throughput because of context switching. Often the limit is simply set to the number of CPUs you have to work with. (Insert actual numbers here ;)
A simple queuing mechanism is perhaps the most obvious solution to the above problem. I wrote one Task Queue implementation which was pretty terrible and required you to "pump" initial events (but only a certain number of them) to "start" the queue. I won't post the code for that here. However, I revisited the problem yesterday and came up with some simple code which seems to do the job nicely. (Note: this is for an application using Twisted, hence "Deferred Task Queue". In a producer/consumer thread-based model where the consumers are threads in a fixed-size thread pool, the problem is already solved by just using Python's Queue and letting the consumers pull jobs off the queue, i.e. the size of the thread pool dictates how many jobs can run concurrently.)
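For comparison, here is a minimal sketch of that thread-based model. It is not from my application: the function name run_jobs and its parameters are made up for illustration, and it uses the Python 3 spelling of the module (queue rather than Queue). The number of worker threads is the only thing limiting concurrency.

```python
import queue
import threading


def run_jobs(jobs, num_workers):
    """Run zero-argument callables on a fixed pool of num_workers threads."""
    q = queue.Queue()
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            job = q.get()
            if job is None:            # sentinel: no more work for this thread
                q.task_done()
                return
            try:
                outcome = job()
            except Exception as exc:   # keep the worker alive if a job fails
                outcome = exc
            with lock:
                results.append(outcome)
            q.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for job in jobs:
        q.put(job)
    for _ in threads:
        q.put(None)                    # one sentinel per worker
    q.join()                           # wait until every item is processed
    for t in threads:
        t.join()
    return results
```

Results come back in completion order, not submission order. The Twisted version needs no threads at all, and that is what the rest of this post covers.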
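# Stand-in for the cpuCount() helper, which is not shown in this post.
from multiprocessing import cpu_count as cpuCount

from twisted.internet import defer


class TaskQueue:
    def __init__(self, concurrentMax=cpuCount()):
        self.concurrentMax = concurrentMax
        self._running = 0   # jobs currently in flight
        self._queued = []   # waiting jobs, stored as (f, args, kwargs, d)

    def push(self, f, *args, **kwargs):
        if self._running < self.concurrentMax:
            self._running += 1
            # maybeDeferred turns an exception raised by f itself into a
            # failed Deferred, so _running is always decremented again.
            return defer.maybeDeferred(f, *args, **kwargs).addBoth(self._try_queued)
        # At the limit: hand back a placeholder Deferred that fires once
        # the job has eventually been run.
        d = defer.Deferred()
        self._queued.append((f, args, kwargs, d))
        return d

    def _try_queued(self, r):
        self._running -= 1
        if self._running < self.concurrentMax and self._queued:
            f, args, kwargs, d = self._queued.pop(0)
            self._running += 1
            actuald = defer.maybeDeferred(f, *args, **kwargs).addBoth(self._try_queued)
            # Forward the real result (or failure) to the placeholder.
            actuald.chainDeferred(d)
        # Pass the result or Failure through unchanged to the caller's chain.
        return r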
Note that the above implementation is missing a notion of "capacity", which might be important for a more general solution. My application actually handles capacity external to the queue, but there might be some benefit in internalizing the concept and raising exceptions on push() when capacity is exceeded. I'm still undecided on this.
The interface is pretty straightforward. You have a function f (and its arguments) that returns a Deferred and that you want to call (eventually). For example, doSomeStuff() below simply returns a Deferred object that will fire after 2 seconds have elapsed:
import time

from twisted.internet import defer, reactor


def doSomeStuff(a, b=None):
    print('doSomeStuff(%s, %s) called: %f' % (a, b, time.time()))

    def finishUp():
        print('doSomeStuff(%s, %s) finished: %f' % (a, b, time.time()))
        d.callback('done %s %s' % (a, b))

    d = defer.Deferred()
    reactor.callLater(2.0, finishUp)  # fire d two seconds from now
    return d
Let's queue up some calls to doSomeStuff(), allowing at most two to run at a time:
taskq = TaskQueue(2)
taskq.push(doSomeStuff, 1, b=2)
taskq.push(doSomeStuff, 2, b=3)
taskq.push(doSomeStuff, 3, b=4)
taskq.push(doSomeStuff, 4, b=5)
taskq.push(doSomeStuff, 5, b=6)
The output of the above would be something like this:
doSomeStuff(1, 2) called: 1223472790.943929
doSomeStuff(2, 3) called: 1223472790.944112
doSomeStuff(1, 2) finished: 1223472792.947769
doSomeStuff(3, 4) called: 1223472792.947887
doSomeStuff(2, 3) finished: 1223472792.948004
doSomeStuff(4, 5) called: 1223472792.948162
doSomeStuff(3, 4) finished: 1223472794.951818
doSomeStuff(5, 6) called: 1223472794.951937
doSomeStuff(4, 5) finished: 1223472794.952080
doSomeStuff(5, 6) finished: 1223472796.955836
As you can see, as soon as one of the called functions completes its job, the next one queued is called. There is no need to tell the queue to start doing its work - just push jobs onto the queue.
The technique that makes this work simply exploits the elegance of Deferreds: Deferred's addBoth() method is used to "sneak" in a check for queued jobs every time a running job finishes, whether it succeeded or failed. For those of you unfamiliar with how a Deferred works in Twisted, you should read the Deferred section of Twisted's asynchronous programming guide and then this document on Deferreds ... and maybe this one too.
What makes this useful is that I can treat a call to push() as if it were simply a call to the function being queued - no other messy API to tell the queue which callbacks or errbacks need to be invoked when the function is finally called. The internal function _try_queued() acts as a transparent pass-through gateway so the caller to push() doesn't need to worry about adding a funky callback to translate some wrapped value or otherwise - again I'm eschewing unnecessary API details. So for example:
# this
taskq.push(doSomeStuff, 1, b=2).addCallback(cb).addErrback(eb)
# is the same as
doSomeStuff(1, b=2).addCallback(cb).addErrback(eb)
# ... just with queuing behavior under the hood
In closing, in the context of asynchronous programming or otherwise, I've begun to strongly believe "the best API is no API".

