controlling queue.pending size

Asked by omgpants

I've been noticing the memory consumption of my application increasing as it runs. With the help of guppy/heapy I've narrowed it down to a large number of txamqp.message.Message and txamqp.content.Content objects: 2.3 million of them (4.1 GB of memory).
Looking closer at my queues while my application is paused (debugging with twisted manhole), I noticed that for one queue reference, a txamqp.queue.TimeoutDeferredQueue, len(ref.queue.pending) == 1936252. (Other running queues are filling up as well.) I'm calling each queue's get method in a generator such as:
def do_stuff(queue):
    while True:
        d = queue.get()  # Deferred that fires with the next message
        d.addCallback(handle_event).addErrback(log_error)
        yield d  # let the cooperator wait on this Deferred

As I said, I have a lot of these queues running in a coiterate'd fashion, so I'm assuming I'm reading in events faster than I can fully handle them (interrupted by other work), which causes them to pile up in the pending state. What I'm wondering is: is there any way in txamqp to limit the size of the pending attribute? I'd rather all the messages sat on RabbitMQ's side until I'm ready to fully handle them. Or do I need to add scheduling logic to my iterators so that pending never builds up in the first place, e.g. prioritize my queues and make sure higher-priority queues do more work before messages pile up?
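For what it's worth, the idea of capping a local "pending" buffer can be sketched with plain Python (the class and names below are illustrative, not txamqp API): when the cap is hit, put() signals backpressure so the caller can stop pulling from the broker instead of buffering without limit.

```python
from collections import deque

class BoundedPending:
    """Illustrative buffer that caps locally buffered ("pending") messages.

    When the cap is reached, put() reports backpressure so the caller can
    leave further messages on the broker instead of buffering them locally.
    """
    def __init__(self, max_pending):
        self.max_pending = max_pending
        self.pending = deque()

    def put(self, message):
        if len(self.pending) >= self.max_pending:
            return False  # backpressure: caller should stop consuming for now
        self.pending.append(message)
        return True

    def get(self):
        return self.pending.popleft()

buf = BoundedPending(max_pending=2)
assert buf.put("a") and buf.put("b")
assert not buf.put("c")  # third message is refused, not buffered
assert buf.get() == "a"
assert buf.put("c")      # room again after one message is handled
```

txamqp's TimeoutDeferredQueue does not expose such a cap out of the box, which is why the broker-side approach below ended up being the answer.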
I realize this isn't much information to go on; if you need more I will supply as much as I can.
Thanks

Question information

Language:
English
Status:
Solved
For:
txAMQP
Assignee:
No assignee
Solved by:
omgpants
omgpants (omgwherearemypants) said :
#1

Never mind; it looks like I'm going to have to drop the fanout exchange and use routing keys plus basic_qos's prefetch ability.
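For anyone landing here later: the reason basic_qos solves this is that with a prefetch count set, the broker delivers at most that many unacknowledged messages to a consumer, so the backlog stays on RabbitMQ's side rather than in the client's pending list. The semantics can be illustrated with a toy broker model (this is a simulation of the behavior, not txamqp code):

```python
class ToyBroker:
    """Simulates AMQP prefetch: at most prefetch_count unacked deliveries."""
    def __init__(self, messages, prefetch_count):
        self.ready = list(messages)  # messages still held by the broker
        self.unacked = []            # delivered but not yet acknowledged
        self.prefetch_count = prefetch_count

    def deliver(self):
        """Deliver as many messages as the prefetch window allows."""
        delivered = []
        while self.ready and len(self.unacked) < self.prefetch_count:
            msg = self.ready.pop(0)
            self.unacked.append(msg)
            delivered.append(msg)
        return delivered

    def ack(self, msg):
        self.unacked.remove(msg)  # acking frees a slot in the window

broker = ToyBroker(range(10), prefetch_count=3)
assert broker.deliver() == [0, 1, 2]  # only 3 in flight; 7 stay on the broker
assert broker.deliver() == []         # window full: nothing more is delivered
broker.ack(0)
assert broker.deliver() == [3]        # acking one message admits one more
```

Note that prefetch applies per consumer (or per channel, depending on the global flag), which is why fanout had to go: each consumer needs its own bounded window, fed via routing keys.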