How to create an empty topic in the rabbitmq queue list for nova components

Asked by Sateesh Bodla

Hi,

I want to create an empty topic queue, as well as its exchange, in RabbitMQ using kombu. I have seen the code in impl_kombu.py in nova's rpc module, but it only implements the consumers. If a service (nova scheduler, compute, network) has not started yet and its queue does not appear in the rabbitmq list_queues output, any message sent to it is dropped. To avoid dropping messages, I want to create an empty queue for the service before any rpc.call / cast is made to that component.

Regards,
Sateesh B.

John Garbutt (johngarbutt) said :
#1

Currently the queues are created (if they do not already exist) when each service starts. The code for that used to be in the manager base class.

However, the queue is created before the service is registered in the database (as far as I can tell).

What problem are you seeing / trying to solve?

Russell Bryant (russellb) said :
#2

Take a look at the code for the NotifyPublisher in impl_kombu.py. This code does exactly what you're talking about. It is making sure a queue exists before it sends a message.

https://github.com/openstack/nova/blob/master/nova/rpc/impl_kombu.py#L308
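
The declare-before-publish idea described above can be illustrated with a toy in-memory model. This is only a sketch: ToyTopicExchange and publish_with_declare are hypothetical names, routing keys are matched exactly (no topic wildcards), and none of this is the kombu, RabbitMQ, or nova API. It shows that a message published with no bound queue is discarded, while declaring the queue first lets the message wait until a consumer appears.

```python
from collections import defaultdict, deque


class ToyTopicExchange:
    """Toy stand-in for a broker exchange; not the kombu or RabbitMQ API."""

    def __init__(self):
        self.queues = {}                   # queue name -> pending messages
        self.bindings = defaultdict(set)   # routing key -> bound queue names
        self.dropped = []                  # messages that matched no binding

    def declare_queue(self, name, routing_key):
        # Declaring is idempotent: an existing queue keeps its messages.
        self.queues.setdefault(name, deque())
        self.bindings[routing_key].add(name)

    def publish(self, routing_key, message):
        targets = self.bindings.get(routing_key, ())
        if not targets:
            # No queue is bound to this key, so the message is discarded.
            self.dropped.append(message)
            return False
        for name in targets:
            self.queues[name].append(message)
        return True


def publish_with_declare(exchange, topic, message):
    """Ensure the topic queue exists, then publish (hypothetical helper)."""
    exchange.declare_queue(name=topic, routing_key=topic)
    return exchange.publish(topic, message)
```

In this model, publishing to "network" before any declaration lands the message in dropped, whereas publish_with_declare leaves it in the "network" queue for a later consumer.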

Sateesh Bodla (sateesh-bodla) said :
#3

Hi,

I am trying to resolve bug 919835. To resolve it, we need to create an empty queue for the component in RabbitMQ to hold the message, because no queue exists before the service is started. Please let me know how to call that reconnect method from the call / cast methods in impl_kombu.py.

I have written the following code for that: a new method named create_empty_queue in the Connection class of impl_kombu.py.

    def create_empty_queue(self, topic):
        options = {'durable': FLAGS.rabbit_durable_queues,
                'auto_delete': False,
                'exclusive': False}
        topic_exchange = kombu.entity.Exchange(
                name=FLAGS.control_exchange,
                type='topic',
                durable=options['durable'],
                auto_delete=options['auto_delete'])
        LOG.info(_("from create_empty_queue"))
        self.channel = self.connection.channel()

        queue = kombu.entity.Queue(channel=self.channel,
                exchange=topic_exchange,
                name=topic,
                routing_key=topic)
        queue.declare()

I call this method from "def call" and "def cast" as follows:

    conn = rpc.create_connection(new=True)
    conn.create_empty_queue(topic)
    node_topic = '%s.%s' % (self.topic, self.host)
    conn.create_empty_queue(node_topic)

But I am getting some exceptions:

2012-04-02 16:44:24 ERROR nova.rpc.common [-] Exception during message handling
(nova.rpc.common): TRACE: Traceback (most recent call last):
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/rpc/amqp.py", line 250, in _process_data
(nova.rpc.common): TRACE: rval = node_func(context=ctxt, **node_args)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/exception.py", line 112, in wrapped
(nova.rpc.common): TRACE: return f(*args, **kw)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/compute/manager.py", line 156, in decorated_function
(nova.rpc.common): TRACE: function(self, context, instance_uuid, *args, **kwargs)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/compute/manager.py", line 180, in decorated_function
(nova.rpc.common): TRACE: sys.exc_info())
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/contextlib.py", line 24, in __exit__
(nova.rpc.common): TRACE: self.gen.next()
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/compute/manager.py", line 174, in decorated_function
(nova.rpc.common): TRACE: return function(self, context, instance_uuid, *args, **kwargs)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/compute/manager.py", line 736, in terminate_instance
(nova.rpc.common): TRACE: self._delete_instance(context, instance)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/compute/manager.py", line 716, in _delete_instance
(nova.rpc.common): TRACE: self._shutdown_instance(context, instance, 'Terminating')
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/compute/manager.py", line 671, in _shutdown_instance
(nova.rpc.common): TRACE: network_info = self._get_instance_nw_info(context, instance)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/compute/manager.py", line 312, in _get_instance_nw_info
(nova.rpc.common): TRACE: instance)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/network/api.py", line 216, in get_instance_nw_info
(nova.rpc.common): TRACE: 'args': args})
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/rpc/__init__.py", line 69, in call
(nova.rpc.common): TRACE: return _get_impl().call(context, topic, msg, timeout)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/rpc/impl_kombu.py", line 642, in call
(nova.rpc.common): TRACE: conn.create_empty_queue(topic)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/nova/rpc/impl_kombu.py", line 624, in create_empty_queue
(nova.rpc.common): TRACE: queue.declare()
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/kombu/entity.py", line 360, in declare
(nova.rpc.common): TRACE: self.name and self.queue_declare(nowait, passive=False),
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/kombu/entity.py", line 378, in queue_declare
(nova.rpc.common): TRACE: nowait=nowait)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/kombu/syn.py", line 14, in blocking
(nova.rpc.common): TRACE: return __sync_current(fun, *args, **kwargs)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/kombu/syn.py", line 40, in __eblocking__
(nova.rpc.common): TRACE: return spawn(fun, *args, **kwargs).wait()
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/eventlet/greenthread.py", line 166, in wait
(nova.rpc.common): TRACE: return self._exit_event.wait()
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/eventlet/event.py", line 116, in wait
(nova.rpc.common): TRACE: return hubs.get_hub().switch()
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/eventlet/hubs/hub.py", line 177, in switch
(nova.rpc.common): TRACE: return self.greenlet.switch()
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/eventlet/greenthread.py", line 192, in main
(nova.rpc.common): TRACE: result = function(*args, **kwargs)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/amqplib/client_0_8/channel.py", line 1381, in queue_declare
(nova.rpc.common): TRACE: (50, 11), # Channel.queue_declare_ok
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/amqplib/client_0_8/abstract_channel.py", line 97, in wait
(nova.rpc.common): TRACE: return self.dispatch_method(method_sig, args, content)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/amqplib/client_0_8/abstract_channel.py", line 115, in dispatch_method
(nova.rpc.common): TRACE: return amqp_method(self, args)
(nova.rpc.common): TRACE: File "/usr/lib/python2.7/dist-packages/amqplib/client_0_8/channel.py", line 273, in _close
(nova.rpc.common): TRACE: (class_id, method_id))
(nova.rpc.common): TRACE: AMQPChannelException: (406, u"PRECONDITION_FAILED - parameters for queue 'network' in vhost '/' not equivalent", (50, 10), 'Channel.queue_declare')
(nova.rpc.common): TRACE:

Kindly let me know how to use the reconnect method of the NotifyPublisher class from the call / cast methods, so that it creates an empty queue for the respective component if one doesn't exist.

Thanks in advance,
Sateesh B.
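
A likely reading of the 406 PRECONDITION_FAILED error in the traceback: the queue 'network' already exists, and the running service declared it with parameters that differ from this new declaration. In create_empty_queue the options dict is applied to the exchange but not passed to kombu.entity.Queue, so the queue is declared with kombu's defaults (durable=True, as far as its documentation indicates). That would not match a queue the service declared with durable=FLAGS.rabbit_durable_queues if that flag is False. Passing durable, auto_delete and exclusive from options to Queue may resolve it. The toy model below sketches the broker-side rule only; ToyBroker and PreconditionFailed are hypothetical names, not an AMQP client.

```python
class PreconditionFailed(Exception):
    """Toy equivalent of an AMQP 406 channel error."""


class ToyBroker:
    """Minimal model of queue redeclaration; not a real AMQP client."""

    def __init__(self):
        self._queues = {}  # queue name -> parameters used at first declare

    def queue_declare(self, name, durable, auto_delete, exclusive=False):
        params = {'durable': durable,
                  'auto_delete': auto_delete,
                  'exclusive': exclusive}
        existing = self._queues.get(name)
        if existing is None:
            # First declaration creates the queue with these parameters.
            self._queues[name] = params
        elif existing != params:
            # Redeclaring with different parameters is rejected.
            raise PreconditionFailed(
                "parameters for queue %r not equivalent" % name)
        return params


def demo():
    broker = ToyBroker()
    # Assumed here: the service declared its queue as non-durable.
    broker.queue_declare('network', durable=False, auto_delete=False)
    # Same parameters again: accepted, nothing changes.
    broker.queue_declare('network', durable=False, auto_delete=False)
    try:
        # Different durability: rejected, as in the traceback.
        broker.queue_declare('network', durable=True, auto_delete=False)
    except PreconditionFailed as exc:
        return str(exc)
    return None
```

The point of the model is that redeclaring an existing queue is harmless only when every parameter matches the original declaration.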

Sateesh Bodla (sateesh-bodla) said :
#4

Hi John,

An empty queue is created when a service starts, but what if the service has not started yet and no queue has been created for it in RabbitMQ? When an rpc call/cast is made to that service, there is no queue to receive the message, so the message is dropped.

So I need to create an empty queue in rpc call/cast for a service whenever no queue for that service appears in rabbitmq list_queues.

Please give me any pointers on how to create empty queues in the call/cast methods of impl_kombu.py.

Thanks in advance,
Sateesh B.

Russell Bryant (russellb) said :
#5

This sounds to me like a bug in the deployment, not in the code. It seems reasonable to expect that the services will get started before another service tries to use them.

Is this something you actually had a problem with? If so, shouldn't this just be resolved by properly updating whatever you are using to start all the services to start them in the proper order?

John Garbutt (johngarbutt) said :
#6

OK, I see you are trying to work on this bug:
https://bugs.launchpad.net/nova/+bug/919835

Bit of an edge case, but an annoying one. As Russell points out, you shouldn't normally see this issue if you start the services in the correct order.

A better fix might be something like adding a timeout to rpc.call so you don't get the "wait forever" problem. I don't know if there is another bug for that already, and there may be a reason there is no timeout.

I assume you have seen the developer docs on this stuff:
http://nova.openstack.org/devref/rabbit.html

I don't know the code that well, but you should find the common entry points here:
nova/rpc/__init__.py

And the default implementation here:
nova/rpc/impl_kombu.py (take a look at multicall)

Hope that helps.

Russell Bryant (russellb) said :
#7

I added a timeout to rpc.call a while back for bug 843200.
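
The effect of such a timeout can be sketched with the standard library alone. This is not nova's implementation: RpcTimeout, call_with_timeout, and the two stand-in services are hypothetical names, and `send` stands in for publishing a request to the broker. The caller waits a bounded time for a reply and gets an exception instead of blocking forever when nothing consumes the request.

```python
import queue
import threading


class RpcTimeout(Exception):
    """Raised when no reply arrives in time (hypothetical name)."""


def call_with_timeout(send, timeout):
    """Send a request and wait at most `timeout` seconds for one reply.

    `send` is any callable that accepts a reply callback; it stands in
    for publishing the request to the broker.
    """
    replies = queue.Queue()
    send(replies.put)
    try:
        return replies.get(timeout=timeout)
    except queue.Empty:
        raise RpcTimeout("no reply within %s seconds" % timeout)


def responsive_service(reply):
    # A running service answers from another thread.
    threading.Thread(target=reply, args=("pong",)).start()


def absent_service(reply):
    # No consumer: the request is lost and no reply is ever produced.
    pass
```

With responsive_service the call returns the reply; with absent_service it raises RpcTimeout once the timeout expires, which bounds the wait but does not recover the lost message.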

Sateesh Bodla (sateesh-bodla) said :
#8

Hi Russell,

I have gone through bug 843200; it relates to a service being restarted while it is processing a request message. Bug 919835, however, is that the message is never queued for the service, because there is no queue for that service in rabbitmq list_queues.

To avoid dropping that message, we need to create an empty queue for that service when the call/cast request is made.

Please correct me if I am wrong and let me know your comments.

Thanks in advance,
Sateesh B.

Russell Bryant (russellb) said :
#9

I think both John and I understand what you're trying to do, just not why you're trying to do it. I get that you're trying to get the code to create the queue ... but what we're saying is that this should never be a problem, unless the services are starting in the wrong order. In that case, starting the services in the wrong order is the problem, not this code.
