Thursday, February 19, 2009

Twisted Sleep

Q: "How do I sleep() in the context of a Twisted application without blocking the reactor?"



Here's a simple way:




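from twisted.internet import reactor
from twisted.internet.defer import Deferred

def sleep(secs):
    # fire d with None after secs seconds, without blocking the reactor
    d = Deferred()
    reactor.callLater(secs, d.callback, None)
    return d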


And here's how you might use the sleep function:




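from twisted.internet.defer import inlineCallbacks

@inlineCallbacks
def doSomeStuff(stuff, throttle):
    for thing in stuff:
        yield someThing.send(thing)  # wait for the send to complete
        yield sleep(throttle)        # then pause without blocking the reactor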


But it's better to just let the reactor schedule things for you:





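from twisted.internet import defer
from twisted.internet.task import LoopingCall

def doSomeStuff(stuff, throttle):
    if not stuff:
        return defer.succeed(None)
    d = defer.Deferred()
    work = list(stuff)
    def sendNext():
        if not work:
            # nothing left to send: stop looping and signal completion
            loop.stop()
            d.callback(None)
            return
        # LoopingCall waits on this Deferred before scheduling the next call
        return someThing.send(work.pop(0))
    loop = LoopingCall(sendNext)
    # if a send fails the loop stops; pass the failure on to our caller
    loop.start(throttle).addErrback(d.errback)
    return d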

Monday, February 9, 2009

Notes on Upgrading a RabbitMQ Cluster

SURGEON GENERAL'S WARNING: The following is by no means a helpful guide to upgrading a RabbitMQ cluster ... yet. In fact, my description here might very well be the "wrong" way to do it. The purpose of this posting is to solicit comments from other RabbitMQ users and decide on the sanest way to upgrade a cluster, including methods for coping with live upgrades (zero downtime).

Also note that the following assumes an RPM-based Linux distribution, although the instructions should be simple to adapt to non-RPM-based distributions.

1. If the current node is a disc node, remove it from the cluster and reset its state. For example, on disc node hostA, which is part of the disc cluster (hostA, hostB, hostC), run the following commands as root:


$ rabbitmqctl stop_app
$ rabbitmqctl cluster rabbit@hostB rabbit@hostC
$ rabbitmqctl reset
$ rabbitmqctl start_app


2. Shut down RabbitMQ.


$ /etc/init.d/rabbitmq-server stop





3. Remove the Mnesia files. Hopefully there's a nicer way of doing this; for example, is there a safe way to preserve the message journal across versions?


$ rm -rf /var/lib/rabbitmq/mnesia/rabbit/*


4. Upgrade with the new RPM.


$ rpm -U --force rabbitmq-server-.rpm


5. Reconfigure the RabbitMQ node per your system's requirements (configure users, etc.). I believe this should only be done on the first node in the cluster; the other nodes should use it to seed their configuration upon joining the cluster.


6. If the current node is a disc node, add it to the cluster (after all nodes in the disc cluster have been upgraded). For example, if the current node is hostA and part of the cluster (hostA, hostB, hostC):


$ rabbitmqctl stop_app
$ rabbitmqctl force_reset
$ rabbitmqctl cluster rabbit@hostA rabbit@hostB rabbit@hostC
$ rabbitmqctl start_app


This last step is where I have the least confidence. For starters, without forcefully resetting the node, I got this error when trying to cluster the nodes: "Bad cookie in table definition amqqueue." Secondly, I had not realized this before, but upon joining the cluster the local node picks up all the settings from the synchronized disc nodes, including configured users and passwords.

Wednesday, November 5, 2008

finally

I always thought I understood `finally' in the context of exception handling in either Java or Python. Now, I'm not sure I really understand it. Riddle: What does the following print?




def f():
    try:
        return 1
    finally:
        return 2
print(f())


Ah ... never mind ... I understand it again now. It means: finally.
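For the record, the riddle prints 2: the finally clause runs on the way out of the function, and its return replaces the pending return value. The sketch below (g and h are hypothetical names) shows two related behaviors: a return in finally also silently discards an in-flight exception, while a finally clause without a return runs after the try block's return value has been computed and leaves it alone. Recent Python versions may emit a SyntaxWarning for return inside finally, precisely because it is this surprising.

```python
def f():
    try:
        return 1
    finally:
        return 2  # replaces the pending "return 1"


def g():
    try:
        raise ValueError('this exception is lost')
    finally:
        return 'swallowed'  # discards the in-flight ValueError


def h():
    events = []

    def inner():
        try:
            events.append('try')
            return 'from try'
        finally:
            events.append('finally')  # runs after the return value is computed

    result = inner()
    return result, events
```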

Tuesday, October 28, 2008

Class Variables

My dreaded multiple inheritance article was published in Python Magazine yesterday. It was tricky enough giving a presentation on the subject, so I look at the article as somewhat of an achievement. Someone contacted me shortly afterward about my use of a class variable in one of the examples:




class Publishable(object):
    published = False
    def __init__(self, start_date, end_date):
        self.start_date = start_date
        self.end_date = end_date
    def publish(self):
        self.published = True


The inquirer's question:


I've always been under the impression having class & instance
variables with the same name is just confusing and not a good way to
write Python. Why did you do this?



Anyhow, on the subject of "class variables," here are my thoughts:



[Begin email response]




I guess it depends on who you ask. The pattern of using a class-level
variable as a default value for instances is one I like and don't
find confusing. But these kinds of things are always subjective. It's
not my own pattern either, but one I've noticed in other projects.



http://twistedmatrix.com/trac/browser/tags/releases/twisted-8.1.0/twisted/internet/defer.py#L137
http://trac.pythonpaste.org/pythonpaste/browser/Paste/WebOb/trunk/webob/__init__.py#L474


I make a distinction, though, between a "class variable" and a
variable defined at the class level. In terms of the VM, they are of
course the same thing, but they are differentiated in human terms by
use cases. The former, a "class variable", is useful if you are
tracking state on a class, for example keeping a count of objects
created through a class method:




class Foo:
    _created = 0
    @classmethod
    def create(cls):
        foo = Foo()
        cls._created += 1
        return foo



The latter "class level variable" can be useful for defining default
values for attributes (especially ones which cannot be specified in
the constructor):




import time

class Task:
    completed = False
    def __init__(self):
        self.start_time = time.time()



(This is the same pattern I adopted in my article.) Of course, when
the "completed" attribute is looked up on an instance of Task, it has
to take an extra step: first looking in the instance's __dict__ and
then in the class. However, this also means a decent space
optimization if your ratio of incomplete tasks to complete tasks is
very high: incomplete tasks never store a "completed" entry of their
own, and once a task is marked as completed it is likely to get
discarded and hence garbage collected.




I think it's only confusing (again, this is very subjective) if you
try to mix the two use cases in one class:




class Confusing:
    completed = 0
    def __init__(self):
        self.completed = False
    @classmethod
    def complete(cls, confusingInstance):
        cls.completed += 1
        confusingInstance.completed = True


Thanks for the feedback.



[End email response]
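To see both uses side by side, here's a small sketch reusing the Task and Confusing classes from the email (with the time import they need). It checks that an untouched Task instance carries no "completed" entry of its own and falls back to the class default, while in Confusing the class-level counter and the per-instance flag share a name but hold unrelated values. The instance names t1, t2, a and b are only for illustration.

```python
import time


class Task(object):
    completed = False  # class-level default shared by every instance

    def __init__(self):
        self.start_time = time.time()


t1, t2 = Task(), Task()
t2.completed = True  # assignment creates an entry in t2's own __dict__ only


class Confusing(object):
    completed = 0  # class-level counter ...

    def __init__(self):
        self.completed = False  # ... shadowed by a per-instance flag

    @classmethod
    def complete(cls, confusingInstance):
        cls.completed += 1
        confusingInstance.completed = True


a, b = Confusing(), Confusing()
Confusing.complete(a)
```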

Thursday, October 9, 2008

Lexical Scope

So, briefly on the topic of lexical scope, what does the following print?


def f():
    funcs = []
    for i in range(5):
        def g():
            print(i)
        funcs.append(g)
    return funcs

funcs = f()
for func in funcs:
    func()
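As written, every call prints 4: g looks up i in f's scope when it is called, not when it is defined, and by then the loop has finished with i bound to 4. A common workaround, sketched below with return values instead of prints so the results are easy to compare, is to bind the current value as a default argument, which is evaluated once at definition time. f_bound is a name made up for this example.

```python
def f():
    funcs = []
    for i in range(5):
        def g():
            return i  # looks up i when g runs, after the loop has finished
        funcs.append(g)
    return funcs


def f_bound():
    funcs = []
    for i in range(5):
        def g(i=i):  # default argument captures the value of i right now
            return i
        funcs.append(g)
    return funcs


late = [func() for func in f()]
bound = [func() for func in f_bound()]
```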

Wednesday, October 8, 2008

Deferred Task Queue

Problem: You have a queue of tasks to manage, but only n should run at a time. An example of this is a CPU-intensive task where running more than a fixed number of tasks at once decreases your overall throughput due to context switching. Often that number is simply the number of CPUs you have to work with. (Insert actual numbers here ;)



A simple queuing mechanism is perhaps the most obvious solution to the above problem. I wrote one task queue implementation that was pretty terrible and required you to "pump" initial events, but only a certain number, to "start" the queue. I won't post the code for that here. However, I revisited the problem yesterday and came up with some simple code that seems to do the job nicely. (Note, this is for an application using Twisted, hence "Deferred Task Queue". In a producer/consumer thread-based model where consumers are threads in a fixed-size thread pool, the problem is already solved by just using Python's Queue and letting the consumers pull jobs off the queue, i.e. the size of the thread pool dictates how many jobs can run concurrently.)




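import multiprocessing
from twisted.internet import defer

class TaskQueue:

    # assumption: multiprocessing.cpu_count() as the default concurrency
    def __init__(self, concurrentMax=multiprocessing.cpu_count()):
        self.concurrentMax = concurrentMax
        self._running = 0
        self._queued = []

    def push(self, f, *args, **kwargs):
        if self._running < self.concurrentMax:
            # a slot is free: run f now and free the slot when it finishes
            # (maybeDeferred also turns a synchronous exception into a Failure)
            self._running += 1
            return defer.maybeDeferred(f, *args, **kwargs).addBoth(self._try_queued)
        # no free slot: hand back a Deferred that fires once f eventually runs
        d = defer.Deferred()
        self._queued.append((f, args, kwargs, d))
        return d

    def _try_queued(self, r):
        # called with the result (or Failure) of every finished task
        self._running -= 1
        if self._running < self.concurrentMax and self._queued:
            f, args, kwargs, d = self._queued.pop(0)
            self._running += 1
            actuald = defer.maybeDeferred(f, *args, **kwargs).addBoth(self._try_queued)
            # forward the queued task's result to the caller's Deferred
            actuald.chainDeferred(d)
        # pass the result or Failure through unchanged
        return r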



Note that the above implementation is missing a notion of "capacity", which might be important for a more general solution. My application actually handles capacity external to the queue, but there might be some benefit in internalizing the concept and raising exceptions on push() when capacity is exceeded. I'm still undecided on this.




The interface is pretty straightforward. You have a function f (and its arguments) that returns a Deferred and that you want to call (eventually). For example, doSomeStuff() below simply returns a Deferred object that will fire after 2 seconds have elapsed:


import time
from twisted.internet import defer, reactor

def doSomeStuff(a, b=None):
    print('doSomeStuff(%s, %s) called: %f' % (a, b, time.time()))
    def finishUp():
        print('doSomeStuff(%s, %s) finished: %f' % (a, b, time.time()))
        d.callback('done %s %s' % (a, b))
    d = defer.Deferred()
    # fire d two seconds from now, without blocking the reactor
    reactor.callLater(2.0, finishUp)
    return d


Let's queue up some calls to doSomeStuff():


taskq = TaskQueue(2)
taskq.push(doSomeStuff, 1, b=2)
taskq.push(doSomeStuff, 2, b=3)
taskq.push(doSomeStuff, 3, b=4)
taskq.push(doSomeStuff, 4, b=5)
taskq.push(doSomeStuff, 5, b=6)


The output of the above would be something like this:


doSomeStuff(1, 2) called: 1223472790.943929
doSomeStuff(2, 3) called: 1223472790.944112
doSomeStuff(1, 2) finished: 1223472792.947769
doSomeStuff(3, 4) called: 1223472792.947887
doSomeStuff(2, 3) finished: 1223472792.948004
doSomeStuff(4, 5) called: 1223472792.948162
doSomeStuff(3, 4) finished: 1223472794.951818
doSomeStuff(5, 6) called: 1223472794.951937
doSomeStuff(4, 5) finished: 1223472794.952080
doSomeStuff(5, 6) finished: 1223472796.955836



As you can see, as soon as one of the called functions completes its job, the next one queued is called. There is no need to tell the queue to start doing its work - just push jobs onto the queue.




The technique that makes this work is simply exploiting the elegance of Deferreds by "sneaking" in a check for queued jobs via Deferred's addBoth() method. For those of you unfamiliar with how a Deferred works in Twisted, you should read the Deferred section of Twisted's asynchronous programming guide and then this document on Deferreds ... and maybe this one too.

What makes this useful is that I can treat a call to push() as if it were simply a call to the function being queued: there's no other messy API to tell the queue which callbacks or errbacks need to be invoked when the function is finally called. The internal function _try_queued() acts as a transparent pass-through gateway, so the caller of push() doesn't need to worry about adding a funky callback to unwrap some wrapped value or otherwise; again, I'm eschewing unnecessary API details. So for example:




# this
taskq.push(doSomeStuff, 1, b=2).addCallback(cb).addErrback(eb)

# is the same as
doSomeStuff(1, b=2).addCallback(cb).addErrback(eb)

# ... just with queuing behavior under the hood


In closing, in the context of asynchronous programming or otherwise, I've begun to strongly believe "the best API is no API".

Friday, August 29, 2008

Python's vars()

The builtin function vars() in Python gives you the argument's __dict__, that is, the namespace of the object: the names belonging to the object itself, not those belonging to its class - an important distinction. In short, it is simply a syntactic nicety over referencing the __dict__ attribute directly. For example, contrast the following:


>>> namespace = vars(o)


With:


>>> names = o.__dict__
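A quick check of both claims, using a throwaway class (Base and its attributes are made up for illustration): vars(o) hands back the very same dictionary object as o.__dict__, and that dictionary holds only instance attributes, while dir() also reports names found on the class.

```python
class Base(object):
    kind = 'base'  # class attribute: stored in Base.__dict__

    def __init__(self):
        self.x = 1  # instance attribute: stored in the instance's __dict__


o = Base()
namespace = vars(o)
```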


But for introspecting objects to produce some custom serialized form (JSON comes to mind), this is likely not what you want to do. This post will attempt to explain why not.

Say we want to explicitly convert an object to some more primitive form (a dictionary or list) and we use vars() to introspect the attributes of the object. Maybe our serialization function will also account for things we don't want to serialize - like things we don't want to expose over the network for security or bandwidth reasons. Here's a simple example:


class Thing(object):
    def __init__(self, a, b, secret=None):
        self.a = a
        self.b = b
        self.secret = secret
        self._fd = open('file.txt')  # an open file handle we must not serialize

def dictifyThing(thing):
    blacklisted = ['secret', '_fd']
    data = dict(vars(thing))  # copy, so deleting keys leaves the instance intact
    for bl in blacklisted:
        del data[bl]
    return data


That will work fine for this example. And so we're happy:


>>> thing = Thing( 1, 2, "don't tell anyone" )
>>> dictifyThing(thing)
{'a': 1, 'b': 2}


A great (though not unique) feature of Python is properties, which allow us to create derived read-only attributes on an object, or attributes that carry side effects when set. For example, we might have an extension of Thing where a is read-only and derived from the value of b:


class AnotherThing(Thing):

    def __init__(self, b, secret=None):
        self.b = b
        self.secret = secret
        self._fd = open('data.txt')

    @property
    def a(self):
        # read-only, derived from b
        return self.b - 1


However, dictifyThing() will no longer work the way we want it to:


>>> thing = AnotherThing(2, "open sesame")
>>> dictifyThing(thing)
{ 'b': 2 }


One should expect this behavior, because a, being a derived property, isn't actually part of the namespace of an instance of AnotherThing. This conversely illustrates another advantage of properties: calculating the value of a and storing it as a normal attribute would be undesirable, both for space efficiency and because we would have no definite way of constraining the numerical relationship between a and b.
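The property itself lives in the class namespace, which is exactly why vars() on an instance misses it while dir() finds it. A minimal sketch with a file-free stand-in for AnotherThing (the class name Derived is hypothetical):

```python
class Derived(object):
    def __init__(self, b):
        self.b = b

    @property
    def a(self):
        # computed on every access; never stored on the instance
        return self.b - 1


d = Derived(2)
a_before = d.a
d.b = 5
a_after = d.a  # a tracks b automatically
```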

Does this mean we can't write a decent version of dictifyThing()? No. The probably more commonly known dir() gives us all the referenceable names of an object, which includes class attributes. We can use this to write something nicer:


def dictifyThing(thing):
    blacklisted = ['secret']
    data = {}
    # dir() also lists class-level names such as properties (and methods)
    for name in (n for n in dir(thing)
                 if not n.startswith('_') and n not in blacklisted):
        data[name] = getattr(thing, name)
    return data


Now our encoding scheme works nicely for both Thing and AnotherThing. A small improvement would be to use a higher-order function to make defining new encoders simpler:


def _encode(include=(), exclude=()):
    def f(o):
        data = {}
        if include:
            # an explicit whitelist wins: encode exactly these names
            for name in include:
                data[name] = getattr(o, name)
            return data
        # otherwise encode every public name that isn't excluded
        for name in (n for n in dir(o)
                     if not n.startswith('_') and n not in exclude):
            data[name] = getattr(o, name)
        return data
    return f

encodeThing = _encode(exclude=('secret',))  # trailing comma: a tuple, not a str
encodeFoo = _encode(include=('a', 'b', 'c'))
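One more pitfall when passing a single name to exclude: parentheses alone don't make a tuple, so ('secret') is just the string 'secret', and a membership test like n not in exclude then quietly becomes a substring check. A quick illustration (the variable names are only for the example):

```python
as_string = ('secret')   # parentheses only group: this is a str
as_tuple = ('secret',)   # the trailing comma makes a one-element tuple

# Membership against a str is a substring test, so a short attribute
# name such as 'c' would be wrongly excluded.
names = ['a', 'b', 'c', 'secret']
kept_with_string = [n for n in names if n not in as_string]
kept_with_tuple = [n for n in names if n not in as_tuple]
```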