Any way to catch a NOT_FOUND error when publishing to a non-existent exchange?

Asked by reverri

My application hangs after trying to publish to an exchange that does not exist:
msg = Content(body)
msg["delivery mode"] = 2
chan.basic_publish(exchange="does_not_exist", content=msg, routing_key="")

Is there any way to catch this?

Question information

Language: English
Status: Solved
For: txAMQP
Assignee: No assignee
Solved by: Esteve Fernandez
Esteve Fernandez (esteve) said :
#1

This is part of the AMQP standard: when you try to publish to an exchange that does not exist, the broker sends a channel_close message. You may want to experiment with the "immediate" flag, so that you receive a basic_return message instead, if your broker implements this behavior.

Could you attach a detailed log of your broker?

reverri (reverri) said :
#2

I am using RabbitMQ 1.5.3.

My test is based on the example txamqp_publisher.py script.

I modified the script to publish to a non-existent exchange:
@inlineCallbacks
def gotConnection(conn, authentication, body):
    yield conn.start(authentication)

    chan = yield conn.channel(1)
    yield chan.channel_open()

    yield chan.queue_declare(queue="q", durable=False, exclusive=False, auto_delete=False)
    yield chan.exchange_declare(exchange="exists", type="direct", durable=False, auto_delete=False)
    yield chan.queue_bind(queue="q", exchange="exists", routing_key="")

    msg = Content(body)
    msg["delivery mode"] = 2
    chan.basic_publish(exchange="does_not_exist", content=msg, routing_key="")

    chan.basic_publish(exchange="exists", content=msg, routing_key="")

    yield chan.channel_close()

    chan0 = yield conn.channel(0)
    yield chan0.connection_close()

    reactor.stop()

The rabbit log file has the following:
=INFO REPORT==== 1-Mar-2009::13:22:46 ===
accepted TCP connection on 0.0.0.0:5672 from 127.0.0.1:50069

=INFO REPORT==== 1-Mar-2009::13:22:46 ===
starting TCP connection <0.339.0> from 127.0.0.1:50069

=ERROR REPORT==== 1-Mar-2009::13:22:46 ===
connection <0.339.0> (running), channel 1 - error:
{amqp,not_found,"no exchange 'does_not_exist' in vhost '/'",'basic.publish'}

When I run the script, it hangs until I kill it with CTRL+C. After killing the process I get:
Unhandled error in Deferred:
Traceback (most recent call last):
  File "/Library/Python/2.5/site-packages/Twisted-8.2.0-py2.5-macosx-10.5-i386.egg/twisted/internet/defer.py", line 269, in errback
    self._startRunCallbacks(fail)
  File "/Library/Python/2.5/site-packages/Twisted-8.2.0-py2.5-macosx-10.5-i386.egg/twisted/internet/defer.py", line 312, in _startRunCallbacks
    self._runCallbacks()
  File "/Library/Python/2.5/site-packages/Twisted-8.2.0-py2.5-macosx-10.5-i386.egg/twisted/internet/defer.py", line 328, in _runCallbacks
    self.result = callback(self.result, *args, **kw)
  File "/Library/Python/2.5/site-packages/Twisted-8.2.0-py2.5-macosx-10.5-i386.egg/twisted/internet/defer.py", line 771, in gotResult
    _inlineCallbacks(r, g, deferred)
--- <exception caught here> ---
  File "/Library/Python/2.5/site-packages/Twisted-8.2.0-py2.5-macosx-10.5-i386.egg/twisted/internet/defer.py", line 749, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/Library/Python/2.5/site-packages/Twisted-8.2.0-py2.5-macosx-10.5-i386.egg/twisted/python/failure.py", line 338, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "txamqp_publisher.py", line 26, in gotConnection
    yield chan.channel_close()
  File "/Library/Python/2.5/site-packages/Twisted-8.2.0-py2.5-macosx-10.5-i386.egg/twisted/internet/defer.py", line 749, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "/Library/Python/2.5/site-packages/Twisted-8.2.0-py2.5-macosx-10.5-i386.egg/twisted/python/failure.py", line 338, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "/Library/Python/2.5/site-packages/txamqp/protocol.py", line 85, in invoke
    raise Closed(self.reason)
txamqp.client.Closed: Method(name=close, id=40) (404, "NOT_FOUND - no exchange 'does_not_exist' in vhost '/'", 60, 40) content = None

Sorry for the long post. I will experiment with the 'immediate' flag and see what I can figure out. I was hoping to detect the channel close, automatically re-open the channel, and continue with the rest of the application.

Thanks,
Dan
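
For readers following the traceback above, the four values in the Closed message (404, the text, 60, 40) line up with the fields of an AMQP channel close: reply code, reply text, class id and method id. Class 60 is "basic" and method 40 within it is "publish", which matches the 'basic.publish' in the RabbitMQ log. The snippet below is only a sketch for reading those values; CloseReason and describe are hypothetical names made up for illustration and are not part of txAMQP.

```python
from collections import namedtuple

# Hypothetical container for the four fields of an AMQP channel close.
# This is an illustration only, not a txAMQP type.
CloseReason = namedtuple(
    "CloseReason", "reply_code reply_text class_id method_id"
)

# Only the (class id, method id) pair that appears in this thread.
METHOD_NAMES = {(60, 40): "basic.publish"}


def describe(reason):
    """Return a one-line summary of why the broker closed the channel."""
    method = METHOD_NAMES.get((reason.class_id, reason.method_id), "unknown")
    return "%d from %s: %s" % (reason.reply_code, method, reason.reply_text)


# Values copied from the traceback above.
reason = CloseReason(
    404, "NOT_FOUND - no exchange 'does_not_exist' in vhost '/'", 60, 40
)
summary = describe(reason)
print(summary)
```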

Esteve Fernandez (esteve) said :
#3

The key is in the first line of your traceback:

"Unhandled error in Deferred"

If you want to catch these errors, you must handle both callbacks and errbacks. Depending on how your script is implemented, you can do either of the following:

(using inlineCallbacks)

try:
    yield chan.channel_close()
except Closed:  # txamqp.client.Closed, as seen in the traceback
    # Handle this exception, for example:
    print("Channel already closed!")
    reactor.stop()  # This will stop your program

(using traditional callbacks):

chan.channel_close().addCallbacks(yourCallbackFunction, yourErrbackFunction)

Let me know if it works for you.
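
To see why a try/except around the yield is enough in the inlineCallbacks case: the traceback in #2 shows the failure being delivered with g.throw(...), so the exception is raised inside the generator at the yield it was waiting on. The following is a minimal sketch of that mechanism using a plain Python generator and no Twisted. The Closed class here is a stand-in for txamqp.client.Closed, and close_channel and run_with_failure are hypothetical names used only for this illustration.

```python
class Closed(Exception):
    """Stand-in for txamqp.client.Closed (illustration only)."""


def close_channel(log):
    # Same shape as `yield chan.channel_close()` in an inlineCallbacks
    # function; the yielded string is a placeholder for the Deferred.
    try:
        yield "channel_close"
        log.append("closed cleanly")
    except Closed as exc:
        log.append("caught: %s" % exc)


def run_with_failure(gen_func):
    """Drive the generator and deliver a failure at its first yield."""
    log = []
    gen = gen_func(log)
    next(gen)  # run up to the first yield
    try:
        # Model of what the traceback shows on failure: g.throw(...)
        gen.throw(Closed("404 NOT_FOUND"))
    except StopIteration:
        pass  # the generator handled the exception and then finished
    return log


result = run_with_failure(close_channel)
print(result)
```

Without the try/except inside close_channel, the Closed exception would propagate out of gen.throw, which corresponds to the unhandled error reported in the traceback.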

reverri (reverri) said :
#4

OK, I can catch the exception for channel_close(). I'm going to see if I can catch the channel close for other actions such as basic_publish(). If I am able to catch it for other actions, is there any way of re-opening the channel without tearing down the connection and stopping the reactor?

Esteve Fernandez (esteve) said : (best answer)
#5

Hi. You don't need to close the connection or stop the reactor at all. If you get an exception for a closed channel, you'll have to reopen it:

channel1 = yield client.channel(1)
yield channel1.channel_open()

Keep in mind that all the consumers you previously created on the closed channel will be lost. Also, if any of your queues and exchanges were declared with the auto_delete flag set and no consumers remain bound to them, they will be lost as well.

reverri (reverri) said :
#6

Thanks Esteve Fernandez, that solved my question.

dyneol (boo-lixum) said :
#7

Hello!

I have a similar problem, but Esteve's solution does not work for me. Every channel_open() call raises the Closed exception, and I cannot do anything with the channel without raising this exception.

Any ideas?

reverri (reverri) said :
#8

I ran into the same problem. I didn't have a chance to look into it further, so I just reset the entire connection.

I think some changes to how channels are created may need to be considered.

For now, you may be able to del the channel's entry in the channels dict on AMQClient. The channel() method checks whether this key exists before creating a new channel. Closing a channel does not remove its key from the dict, so channel(1) will always return the closed channel. You can try:
del client.channels[1]
channel1 = yield client.channel(1)
yield channel1.channel_open()

I have not tested this so I don't know if it will work.
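
To illustrate the caching behaviour described above, here is a toy model in plain Python. It is not txAMQP's code: ToyChannel and ToyClient are hypothetical classes that only mimic the described lookup (return the cached entry if the key exists), and unlike the real channel() they are synchronous and return no Deferred.

```python
class ToyChannel(object):
    """Hypothetical channel object; only tracks whether it was closed."""

    def __init__(self, channel_id):
        self.id = channel_id
        self.closed = False


class ToyClient(object):
    """Toy model of the described lookup, not txAMQP's AMQClient."""

    def __init__(self):
        self.channels = {}

    def channel(self, channel_id):
        # Create a channel only if the key is missing, as described above.
        if channel_id not in self.channels:
            self.channels[channel_id] = ToyChannel(channel_id)
        return self.channels[channel_id]


client = ToyClient()
first = client.channel(1)
first.closed = True          # the broker closed it after the NOT_FOUND

same = client.channel(1)     # the stale, closed object comes back
del client.channels[1]       # the suggested workaround
fresh = client.channel(1)    # now a new object is created

print(same is first, fresh is first, fresh.closed)
```

In this model the second lookup returns the closed object, and only after the del does channel(1) produce a usable one. Whether the real client behaves the same way after the del is, as noted above, untested.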

dyneol (boo-lixum) said :
#9

Nice hint, thanks. Should be solvable now.