transient queues leak memory

Asked by Adam Goodman

We've written code in which a long-running daemon frequently creates and deletes temporary queues, but txAMQP does not seem to offer any way to properly clean up a queue once it is no longer needed. Specifically, I'm referring to the TimeoutDeferredQueue instances in the AMQClient.queues dictionary. I've included some sample code below to demonstrate this.

I think that I could just work around this problem by doing "del client.queues[key]" after calling queue_delete and closing the queue, but this seems a bit crude. Is there a mechanism to do this cleanly? (Should there be one?)
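
For illustration, that workaround could be wrapped in a small helper. This is only a sketch: forget_queue is a hypothetical name, not part of txAMQP, and it assumes client.queues is a plain dict keyed by consumer tag, as the sample code below suggests.

```python
def forget_queue(client, consumer_tag):
    """Close and drop the client's local queue for consumer_tag, if any.

    Hypothetical helper, not a txAMQP API. Assumes client.queues is a
    plain dict keyed by consumer tag and that the queue has close().
    """
    # pop() removes the dictionary entry, so the queue object can be
    # garbage-collected once nothing else refers to it.
    queue = client.queues.pop(consumer_tag, None)
    if queue is not None:
        queue.close()
    return queue
```

In temp_queue from the sample code, forget_queue(client, sub.consumer_tag) would take the place of the bare queue.close() call, before the existing queue_delete. Note that the dictionary key is the consumer tag, not the AMQP queue name.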

Also, an aside: is there really any need for the DeferredLock (queueLock) in this code in AMQClient in protocol.py?

    @defer.inlineCallbacks
    def queue(self, key):
        yield self.queueLock.acquire()
        try:
            try:
                q = self.queues[key]
            except KeyError:
                q = TimeoutDeferredQueue()
                self.queues[key] = q
        finally:
            self.queueLock.release()
        defer.returnValue(q)

There are no yield statements between the acquire() and release() calls, so it doesn't seem like anything else could run concurrently with this function...

Thanks,
- Adam

-----

from twisted.internet import defer, reactor
from twisted.internet.protocol import ClientCreator
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
import txamqp.spec

class LeakTest(object):
    def __init__(self, host, port, user, password, vhost, exchange, specfile):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.vhost = vhost
        self.exchange = exchange
        self.specfile = specfile

    @defer.inlineCallbacks
    def temp_queue(self, client, channel, key):
        yield channel.queue_declare(queue=key, durable=True,
                                    exclusive=False, auto_delete=True)
        yield channel.queue_bind(queue=key, exchange=self.exchange,
                                 routing_key=key)

        sub = yield channel.basic_consume(queue=key, no_ack=True)
        queue = yield client.queue(sub.consumer_tag)

        # XXX normally would do 'yield queue.get()' here or something

        queue.close()
        yield channel.queue_delete(queue=key)

    @defer.inlineCallbacks
    def run(self):
        spec = txamqp.spec.load(self.specfile)
        delegate = TwistedDelegate()
        cc = ClientCreator(reactor, AMQClient, delegate=delegate,
                           vhost=self.vhost, spec=spec)

        # connect and auth
        client = yield cc.connectTCP(self.host, self.port)
        yield client.authenticate(self.user, self.password)

        # create a channel
        channel = yield client.channel(1)
        yield channel.channel_open()

        # create exchange
        yield channel.exchange_declare(exchange=self.exchange, type='topic',
                                       durable=True, auto_delete=False)

        # create and delete 100 queues
        for i in range(100):
            yield self.temp_queue(client, channel, 'leaktest.queue.%d' % i)

        # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
        # here lies the problem:
        # there are now 100 queues in this dictionary, even though
        # we no longer need any of them!
        # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
        print(client.queues)

        reactor.stop()

if __name__ == '__main__':
    l = LeakTest(...)

    reactor.callWhenRunning(l.run)
    reactor.run()

Question information

Language: English
Status: Open
For: txAMQP
Assignee: No assignee

-----

Esteve Fernandez (esteve) said (#1):

Thanks for testing this. Indeed, it seems txAMQP leaks memory, and your workaround is perfectly valid; we just need to integrate it so that the cleanup is transparent.

Regarding the use of DeferredLock, you're right. I keep meaning to remove it and simplify how queues and channels are allocated, but I can't seem to find time for it. I'll try to do it in the next few days.
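
As a rough idea of what the simplified lookup might look like, here is a lock-free sketch. It is not the actual txAMQP code: QueueTable and queue_factory are hypothetical names used so the example stands alone without Twisted, and in txAMQP the factory would presumably be TimeoutDeferredQueue.

```python
class QueueTable(object):
    """Hypothetical stand-in for the AMQClient.queues bookkeeping."""

    def __init__(self, queue_factory):
        # Assumption: queue_factory builds a new, empty queue object.
        self.queue_factory = queue_factory
        self.queues = {}

    def queue(self, key):
        # There is no yield between the lookup and the insert, so in a
        # single-threaded reactor nothing else can run in between and
        # no lock is needed.
        try:
            return self.queues[key]
        except KeyError:
            q = self.queues[key] = self.queue_factory()
            return q
```

Existing callers do "yield client.queue(...)", so a real replacement would probably still return a Deferred, for example by wrapping the result in defer.succeed(q).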
