Timeout On Publisher Call

Asked by Dan Di Spaltro

I have a Thrift service that makes a call to another service. I have been experimenting with inlineCallbacks and yield to get a timeout that behaves the way I want when that other service stops listening for some reason while the AMQP broker keeps working.

In summary, when a publisher service never gets a result from a call, I want it to generate an error of some kind so that I don't wait on yield indefinitely. I looked at the timeout code you have and added a timeout in the txamqp contrib thrift source, but then it fails to keep executing: the timeout generates an "Empty" error, which will most likely kill the "queue.get(timeout=10).addCallback(parseclient.... " code path. I am not sure how I should go about addressing this.

What I have right now: I got rid of inlineCallbacks and issue an addCallback and an addErrback instead, so that execution never gets stuck. I also experimented with callLater(15, issueErrbackOnDeferred), but I am not sure what should happen when the timeout fires, or whether I am just missing something. Thanks again!

Question information

Language: English
Status: Solved
For: txAMQP
Assignee: No assignee
Solved by: Dan Di Spaltro

Esteve Fernandez (esteve) said :
#1

Hi Dan. I'm not sure if I get what you want (it's a bit late here and I'm a bit sleepy), but if you use:

yield queue.get(timeout=10)

a Timeout exception should be raised. If you prefer the traditional idiom, you can use:

queue.get(timeout=10).addCallbacks(myCallback, myErrback)

Could you check if it works the way you want?

Dan Di Spaltro (dan-dispaltro) said :
#2

Okay, looking at createThriftClient and parseServerMessage in the contrib.thrift package, the problem is that queue.get(timeout=10).addCallback(parseServer....) times out and fires an errback that gets lost, I guess because nothing catches it.

Moreover, that deferred is not the one I am waiting for. It is the "pluck off the queue" deferred, whereas I am waiting for the "matched response for service request" deferred. So I was curious whether you have anything on your radar to address the latter case. I could try to submit a patch, but there is still a lot about txamqp I don't fully understand.

Hopefully I am being reasonably clear; my previous post was a little scatterbrained.

Esteve Fernandez (esteve) said :
#3

And would something like this work for you?

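from twisted.internet import defer
from twisted.internet.defer import TimeoutError

@defer.inlineCallbacks
def myFunc(client):
    d = client.doSomethingRemote()
    d.setTimeout(10)  # fail the Deferred if no result arrives within 10 seconds
    try:
        yield d
    except TimeoutError:
        print("No response received within 10 seconds")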

More info about setTimeout: http://twistedmatrix.com/documents/current/api/twisted.internet.defer.Deferred.html#setTimeout

Dan Di Spaltro (dan-dispaltro) said :
#4

Yeah, that works. Is there any way to clean up the list of "waiting" deferreds, or does that happen in the background? I am not sure whether that is an issue or not; I am just trying to track down some memory leaks in our production app. Thanks again!

Esteve Fernandez (esteve) said :
#5

That's an interesting question. If a request expires, its deferred should be removed from the outstanding requests dictionary. However, if a response is received even after the timeout exception is raised, the request deferred will raise an AlreadyCalledError exception. I'll try to fix this behavior.
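
The behavior described above (forget a request once it expires, and tolerate a response that shows up afterwards) can be sketched without any framework. This is a minimal illustration and not txAMQP's implementation: PendingRequests, its methods, and the "req-1" id are all hypothetical names, and plain callables stand in for the callback and errback of a Deferred.

```python
class PendingRequests(object):
    """Outstanding requests keyed by a correlation id (hypothetical helper)."""

    def __init__(self):
        # request_id -> (deadline, on_result, on_timeout)
        self._pending = {}

    def register(self, request_id, deadline, on_result, on_timeout):
        self._pending[request_id] = (deadline, on_result, on_timeout)

    def resolve(self, request_id, result):
        """Deliver a response; return False if the request is unknown or expired."""
        entry = self._pending.pop(request_id, None)
        if entry is None:
            return False  # late or duplicate response: drop it instead of raising
        entry[1](result)
        return True

    def expire(self, now):
        """Fail and forget every request whose deadline has passed."""
        expired = [rid for rid, entry in self._pending.items() if entry[0] <= now]
        for rid in expired:
            on_timeout = self._pending.pop(rid)[2]
            on_timeout(rid)
        return expired

    def __len__(self):
        return len(self._pending)


events = []
requests = PendingRequests()
requests.register("req-1", 10.0, events.append,
                  lambda rid: events.append("timeout:" + rid))
requests.expire(now=10.0)                        # the deadline passes first
late = requests.resolve("req-1", "late reply")   # the response arrives too late
```

Popping the entry on both paths means an expired request holds no memory, and a late response finds nothing to fire a second time.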

Dan Di Spaltro (dan-dispaltro) said :
#6

Thanks for the response, that would be excellent. I was looking through the code, and it looks like you started to address that in client.py with the alreadyCalled flag.

Oh, by the way, I have gotten that error at random when disconnecting from the AMQP server. I could post this as a separate bug if you want.

2009-02-18 11:30:33-0800 [AMQPublisher,client] Unhandled error in Deferred:
2009-02-18 11:30:33-0800 [AMQPublisher,client] Unhandled Error
        Traceback (most recent call last):
          File "/usr/lib/python2.5/site-packages/twisted/internet/defer.py", line 269, in errback
            self._startRunCallbacks(fail)
          File "/usr/lib/python2.5/site-packages/twisted/internet/defer.py", line 312, in _startRunCallbacks
            self._runCallbacks()
          File "/usr/lib/python2.5/site-packages/twisted/internet/defer.py", line 328, in _runCallbacks
            self.result = callback(self.result, *args, **kw)
          File "/usr/lib/python2.5/site-packages/twisted/internet/defer.py", line 757, in gotResult
            _inlineCallbacks(r, g, deferred)
        --- <exception caught here> ---
          File "/usr/lib/python2.5/site-packages/twisted/internet/defer.py", line 735, in _inlineCallbacks
            result = result.throwExceptionIntoGenerator(g)
          File "/usr/lib/python2.5/site-packages/twisted/python/failure.py", line 331, in throwExceptionIntoGenerator
            return g.throw(self.type, self.value, self.tb)
          File "/usr/lib64/python2.5/site-packages/txamqp/protocol.py", line 274, in worker
            self.close(e)
          File "/usr/lib64/python2.5/site-packages/txamqp/protocol.py", line 261, in close
            self.delegate.close(reason)
          File "/usr/lib64/python2.5/site-packages/txamqp/client.py", line 46, in close
            self.client.started.reset()
          File "/usr/lib64/python2.5/site-packages/txamqp/client.py", line 21, in reset
            deferred.callback(True)
          File "/usr/lib/python2.5/site-packages/twisted/internet/defer.py", line 243, in callback
            self._startRunCallbacks(result)
          File "/usr/lib/python2.5/site-packages/twisted/internet/defer.py", line 298, in _startRunCallbacks
            raise AlreadyCalledError
        twisted.internet.defer.AlreadyCalledError: