Is there any way of integrating Publisher and Subscriber in one process?

Asked by aleperalta

Is there any way of integrating Publisher and Subscriber in one process? To make something like a chat program, much like in: http://www.randomhacks.net/articles/2009/05/08/chat-client-ruby-amqp-eventmachine-shoes

I've asked a similar question in the txamqp-users list.

Someone (Antonin, I think) suggested that it is better not to integrate both in one process and to keep them separate. But I still have some doubts, especially since there are some dependencies between the messages I send and the ones I receive. I would like to stay away from IPC (but if I must, I must; I was trying to have the consumer communicate with the producer through Perspective Broker).

This is what I've done so far:

http://dpaste.com/hold/110226/ <-- For keyboard input, maybe I could use twisted.internet.stdio

http://dpaste.com/hold/109292/

The result of these experiments should be "a simple chat program using the console".

Thanks,
Ale.

Question information

Language: English
Status: Solved
For: txAMQP
Assignee: No assignee
Solved by: aleperalta

aleperalta (peralta-alejandro) said :
#1

To rephrase the question: what is the suggested way of putting a consumer (subscriber) and a publisher in one node?

 * Two different processes and communicate through some IPC?
 * Threading and sending through the same channel? http://dpaste.com/hold/109292/
 * No threads, polling + deferreds, send and receive through the same channel? http://dpaste.com/hold/110226/

Any suggestion would be great!
Thanks again,
Ale.

Esteve Fernandez (esteve) said :
#2

Sorry, I saw your e-mail on the mailing list, but unfortunately I have been unable to spend more than 30 minutes online.

Anyway, I've uploaded a simple script that acts both as a consumer and as a publisher: http://dpaste.com/hold/110852/

> * Two different processes and communicate through some IPC?

You don't need to split it into two separate programs, but if you really want to use IPC, you may want to have a look at txThrift :-)

> * Threading and sending through the same channel?

I'd try to avoid threads as much as possible, especially if using the threading module. Twisted has support for threads using deferToThread, callInThread, etc. In any case, IMHO you don't need threads for your project.

> * No threads, polling + deferreds, send and receive through the same channel?
> http://dpaste.com/hold/110226/

Given the event-driven nature of Twisted, polling is not really necessary. Could you have a look at the script I posted and tell me if it fits what you need? BTW, writing a chat application with txAMQP sounds really cool :-)
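As a rough illustration of the single-process, no-polling idea (this is not the script from the dpaste link, which is no longer available), here is a minimal sketch. It assumes Python 3 and uses the standard-library asyncio in place of Twisted, with an in-memory asyncio.Queue as a hypothetical stand-in for one AMQP channel. The names publisher, consumer, chat_session and run_demo are made up for the example.

```python
import asyncio

STOP_TOKEN = "STOP"  # sentinel that tells the consumer to finish


async def publisher(channel, lines):
    """Publish each line, then the stop token, on the shared channel."""
    for line in lines:
        await channel.put(line)
    await channel.put(STOP_TOKEN)


async def consumer(channel, received):
    """Wait for messages; the event loop wakes us up, so nothing polls."""
    while True:
        body = await channel.get()
        if body == STOP_TOKEN:
            break
        received.append(body)


async def chat_session(lines):
    # Hypothetical stand-in for a single AMQP channel used in both directions.
    channel = asyncio.Queue()
    received = []
    # Consumer and publisher share one process, one thread and one channel.
    await asyncio.gather(consumer(channel, received), publisher(channel, lines))
    return received


def run_demo(lines):
    """Run one publish/consume round trip and return what was received."""
    return asyncio.run(chat_session(lines))
```

Calling run_demo(["hello", "world"]) runs both sides to completion in one event loop. With txAMQP the same shape would presumably use Deferreds and the channel's basic_publish and basic_consume instead of the in-memory queue.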

Thanks for using txAMQP!

aleperalta (peralta-alejandro) said :
#3

Esteve,

Thank you very much for your answer, suggestions and comments. I'll use these in my project.

A couple of comments:

One of the main difficulties I had was how to send my own input. Using twisted.internet.stdio I was able to send my own keyboard input: http://dpaste.com/hold/110936/ . (Also with some help from this guy: http://www.oluyede.org/blog/2008/08/31/twisted-interactive-console/) Bear in mind that I am new to Twisted. The hardest part was understanding how to hook up the events needed to read from the keyboard. The next step would be to add a decent GUI; I'll probably use GTK or wx. But that's another story.

I was reading earlier today that you can use the same channel to publish and consume! Your example also showed me how to do this with txAMQP. Cool, thanks again.
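For anyone else new to this, the keyboard side comes down to buffering raw input and forwarding each complete line to the publisher. Twisted's LineReceiver does that buffering for you; the sketch below only illustrates the idea in plain Python 3, and the LineForwarder class and its publish argument are hypothetical names, not part of Twisted or txAMQP.

```python
class LineForwarder:
    """Buffer raw input and hand each complete line to a publish callable."""

    def __init__(self, publish, delimiter="\n"):
        self.publish = publish        # any callable taking one string
        self.delimiter = delimiter
        self._buffer = ""

    def data_received(self, data):
        """Accumulate data and forward every line that is now complete."""
        self._buffer += data
        while self.delimiter in self._buffer:
            line, self._buffer = self._buffer.split(self.delimiter, 1)
            if line:                  # ignore empty lines
                self.publish(line)
```

In a real program the publish callable would be something like the publish method of an object that wraps the AMQP channel, so each typed line becomes one message.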

Problem solved!

Cheers,
Ale

bra (bra) said :
#4

Hello,

Could you please attach the code samples to this thread, or upload them to a persistent store?

dpaste has already removed them, and this conversation is not quite useful for others without the code. :(

Thanks,

aleperalta (peralta-alejandro) said :
#5

2009/12/16 bra <email address hidden>:
> Your question #86511 on txAMQP changed:
> https://answers.launchpad.net/txamqp/+question/86511
>

I don't have them right now, but I'll paste them tomorrow.

Cheers,

--
Ale.

aleperalta (peralta-alejandro) said :
#6

I think these are the files, sorry for the delay

--
Ale.

aleperalta (peralta-alejandro) said :
#7

2009/12/22 Ale <email address hidden>:
> I think these are the files, sorry for the delay

Hmm... Launchpad doesn't allow attachments...

------ publisher and consumer with Repl
from twisted.internet.defer import inlineCallbacks, gatherResults, returnValue
from twisted.internet.protocol import ClientCreator
from twisted.internet import stdio, protocol, reactor, task
from twisted.protocols import basic
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
from txamqp.content import Content
import txamqp.spec

class Repl(basic.LineReceiver):
    delimiter = '\n'
    prompt_string = 'send> '

    def __init__(self, publisher):
        self.publisher = publisher

    def prompt(self):
        self.transport.write(self.prompt_string)

    def connectionMade(self):
        self.sendLine('Welcome to Console')
        self.prompt()

    def lineReceived(self, line):
        self.prompt()
        self.issueCommand(line)

    def issueCommand(self, command):
        self.publisher.publish(command)

    def connectionLost(self, reason):
        self.publisher.publish_stop()
        reactor.callLater(5, reactor.stop)

class Publisher(object):
    def __init__(self, chan):
        self.chan = chan
        self.stopToken = "STOP"

    def publish(self, string):
        content = string
        msg = Content(content)
        msg["delivery mode"] = 2
        self.chan.basic_publish(exchange="chatservice",
                           content=msg,
                           routing_key="txamqp_chatroom")
        print "Sending message: %s" % content

    def publish_stop(self):
        self.publish(self.stopToken)

class Consumer(object):
    def __init__(self, conn, chan):
        self.conn = conn
        self.chan = chan

    @inlineCallbacks
    def get_messages(self):
        # Set up the queue to consume using callbacks; the messages will be
        # retrieved with queue.get().
        yield self.chan.basic_consume(queue='chatrooms', no_ack=True,
                                      consumer_tag="testtag")

        queue = yield self.conn.queue("testtag")

        while True:
            msg = yield queue.get()
            print 'Received: ' + msg.content.body + ' from channel #' + str(self.chan.id)
            if msg.content.body == "STOP":
                break

        yield self.chan.basic_cancel("testtag")

@inlineCallbacks
def setup_amqp_conn(conn, username, password):
    print "Connected to broker."
    yield conn.authenticate(username, password)

    print "Authenticated. Ready to receive messages"
    chan = yield conn.channel(1)
    yield chan.channel_open()

    yield chan.queue_declare(queue="chatrooms", durable=True,
                             exclusive=False, auto_delete=False)
    yield chan.exchange_declare(exchange="chatservice", type="direct",
                                durable=True, auto_delete=False)

    yield chan.queue_bind(queue="chatrooms", exchange="chatservice",
                          routing_key="txamqp_chatroom")
    returnValue(conn)

@inlineCallbacks
def start_consumer_and_repl(conn):
    chan = yield conn.channel(1)
    stdio.StandardIO(Repl(Publisher(chan)))
    consumer = Consumer(conn, chan)
    yield consumer.get_messages()
    returnValue(conn)

@inlineCallbacks
def finish_and_clean_up(conn):
    chan = yield conn.channel(1)
    yield chan.channel_close()
    chan0 = yield conn.channel(0)
    yield chan0.connection_close()
    reactor.stop()

if __name__ == "__main__":
    import sys
    if len(sys.argv) < 8:
        print "%s host port vhost username password path_to_spec content [count]" % sys.argv[0]
        print "e.g. %s localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml hello 1000" % sys.argv[0]
        sys.exit(1)

    host = sys.argv[1]
    port = int(sys.argv[2])
    vhost = sys.argv[3]
    username = sys.argv[4]
    password = sys.argv[5]

    spec = txamqp.spec.load(sys.argv[6])

    content = sys.argv[7]
    try:
        count = int(sys.argv[8])
    except (IndexError, ValueError):
        # count is optional; fall back to 1 if it is missing or not a number
        count = 1

    delegate = TwistedDelegate()

    d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost,
        spec=spec).connectTCP(host, port)

    d.addCallback(setup_amqp_conn, username, password).\
        addCallback(start_consumer_and_repl).\
        addCallback(finish_and_clean_up)
    reactor.run()

------ multiconsumer

from twisted.internet.defer import inlineCallbacks, gatherResults, Deferred
from twisted.internet import reactor, task
from twisted.internet.protocol import ClientCreator
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
from txamqp.content import Content
from txamqp.queue import Closed
import txamqp.spec
import pprint

@inlineCallbacks
def publisher_fanout(chan, body, count):
    def send_messages():
        def message_iterator():
            for i in range(count):
                content = body + "-%d" % i
                msg = Content(content)
                msg["delivery mode"] = 2
                chan.basic_publish(exchange="chatservice_fanout", content=msg)
                print "[FANOUT] Sending message: %s" % content
                yield None
        return task.coiterate(message_iterator())

    yield send_messages()

    stopToken = "STOP"
    msg = Content(stopToken)
    msg["delivery mode"] = 2
    chan.basic_publish(exchange="chatservice_fanout", content=msg)
    print "[FANOUT] Sending message: %s" % stopToken

def whichEverFiresFirst(def1, def2):
    d = Deferred()

    def _callback(obj, d):
        d.callback(obj)

    def _errback(failure):
        failure.trap(Closed)

    def1.addCallback(_callback, d)
    def1.addErrback(_errback)
    def2.addCallback(_callback, d)
    return d

class StopSignal(object):

    def __init__(self):
        self.d = None

    def get_signal(self):
        d = None

        if self.d:
            d = self.d

        self.d = Deferred()

        if not (d is None):
            del d

        return self.d

    def fire_signal(self):
        self.d.callback("STOP")

@inlineCallbacks
def consumer_fanout(conn, chan, queue, sig):

    while True:
        try:
            msg = yield queue.get()
        except Closed:
            break

        # msg = yield whichEverFiresFirst(queue.get(), sig.get_signal())

        # if isinstance(msg, str) and msg == "STOP":
        # print "-> STOP SIGNAL <-"
        # queue.close()
        # break

        print '[FANOUT] Received: ' + msg.content.body + ' from channel #' + str(chan.id)

    yield chan.basic_cancel("testtag_fanout")

@inlineCallbacks
def gotConnection(conn, username, password, body, count=1):
    print "Connected to broker."
    yield conn.authenticate(username, password)

    print "Authenticated. Ready to receive messages"
    chan = yield conn.channel(1)
    yield chan.channel_open()

    yield chan.queue_declare(queue="chatrooms_fanout", durable=True,
                             exclusive=False, auto_delete=False)
    yield chan.exchange_declare(exchange="chatservice_fanout",
                                type="fanout", durable=True, auto_delete=False)
    yield chan.queue_bind(queue="chatrooms_fanout",
                          exchange="chatservice_fanout")

    yield chan.basic_consume(queue='chatrooms_fanout', no_ack=True,
                             consumer_tag="testtag_fanout")
    queue = yield conn.queue("testtag_fanout")

    sig = StopSignal()

    reactor.callLater(2, queue.close)

    yield gatherResults([
        consumer_fanout(conn, chan, queue, sig),
        publisher_fanout(chan, body, count),
    ])

    yield chan.channel_close()

    chan0 = yield conn.channel(0)

    yield chan0.connection_close()

    reactor.stop()

if __name__ == "__main__":
    import sys
    if len(sys.argv) < 8:
        print "%s host port vhost username password path_to_spec content [count]" % sys.argv[0]
        print "e.g. %s localhost 5672 / guest guest ../../specs/standard/amqp0-8.xml hello 1000" % sys.argv[0]
        sys.exit(1)

    host = sys.argv[1]
    port = int(sys.argv[2])
    vhost = sys.argv[3]
    username = sys.argv[4]
    password = sys.argv[5]

    spec = txamqp.spec.load(sys.argv[6])

    content = sys.argv[7]
    try:
        count = int(sys.argv[8])
    except (IndexError, ValueError):
        # count is optional; fall back to 1 if it is missing or not a number
        count = 1

    delegate = TwistedDelegate()

    d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost,
        spec=spec).connectTCP(host, port)

    d.addCallback(gotConnection, username, password, content, count)

    reactor.run()
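
The whichEverFiresFirst helper and the commented-out StopSignal code above are after a "wait for either the next message or a stop signal" pattern. As a hedged sketch of that pattern only, here it is in Python 3 with the standard-library asyncio rather than Twisted Deferreds; first_to_finish and demo are hypothetical names, and the queue is an in-memory stand-in for the txAMQP queue.

```python
import asyncio


async def first_to_finish(*awaitables):
    """Return the result of whichever awaitable completes first."""
    tasks = [asyncio.ensure_future(a) for a in awaitables]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()                 # the loser is no longer needed
    return done.pop().result()


async def demo(message=None):
    """Race a queue read against a stop signal and return the winner."""
    queue = asyncio.Queue()           # stand-in for the consumer queue
    stop = asyncio.Event()

    async def wait_for_stop():
        await stop.wait()
        return "STOP"

    if message is not None:
        queue.put_nowait(message)     # a message is already waiting
    else:
        # No message will arrive, so fire the stop signal shortly.
        asyncio.get_running_loop().call_later(0.01, stop.set)

    return await first_to_finish(queue.get(), wait_for_stop())
```

Cancelling the losing task matters here: otherwise a pending queue read could swallow the next message after the loop has already moved on.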

--
Ale.