Is there any way of integrating Publisher and Subscriber in one process?
Is there any way of integrating Publisher and Subscriber in one process? To make something like a chat program, much like in: http://
I've asked a similar question in the txamqp-users list.
Someone (Antonin, I think) suggested that it is better not to integrate both in one process and to keep them separate instead. But I still have some doubts, especially since there are dependencies between the messages I send and the ones I receive. I would like to stay away from IPC (but if I must, I must; I was trying to have the consumer communicate with the producer through PerspectiveBroker).
This is what I've done so far:
http://
http://
The result of these experiments should be "a simple chat program using the console".
Thanks,
Ale.
Question information
- Language: English
- Status: Solved
- For: txAMQP
- Assignee: No assignee
- Solved by: aleperalta
#1
To paraphrase the question: what is the suggested way of putting a consumer (subscriber) and a publisher in one node?
* Two different processes and communicate through some IPC?
* Threading and sending through the same channel? http://
* No threads, polling + deferreds, send and receive through the same channel? http://
Any suggestion would be great!
Thanks again,
Ale.
#2
Sorry, I saw your e-mail in the mailing list, but unfortunately have been unable to spend more than 30 minutes online.
Anyway, I've uploaded a simple script that acts both as a consumer and as a publisher: http://
> * Two different processes and communicate through some IPC?
You don't need to split it into two separate programs, but if you really want to use IPC, you may want to have a look at txThrift :-)
> * Threading and sending through the same channel?
I'd try to avoid threads as much as possible, especially if using the threading module. Twisted has support for threads using deferToThread, callInThread, etc. In any case, IMHO you don't need threads for your project.
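To illustrate the point about threads, here is a minimal sketch. It is not Twisted or txAMQP code: it uses the standard library's asyncio, where `loop.run_in_executor` plays a role similar to Twisted's `deferToThread` (a blocking function is handed to a thread pool and the event loop gets something it can wait on). The names `blocking_lookup` and `ticker` are made up for the example.

```python
import asyncio
import time


def blocking_lookup(name):
    # Hypothetical blocking call; stands in for code that cannot be made async.
    time.sleep(0.05)
    return "hello, %s" % name


async def ticker(n, log):
    # Runs on the event loop while the blocking call sits in a worker thread.
    for i in range(n):
        log.append(i)
        await asyncio.sleep(0.01)


async def main():
    loop = asyncio.get_running_loop()
    log = []
    # Only the blocking call goes to the default thread pool;
    # everything else stays single-threaded on the loop.
    greeting, _ = await asyncio.gather(
        loop.run_in_executor(None, blocking_lookup, "Ale"),
        ticker(3, log),
    )
    return greeting, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The idea is that threads are an escape hatch for the occasional blocking call, not the way to structure sending and receiving.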
> * No threads, polling + deferreds, send and receive through the same channel?
> http://
Given the event-driven nature of Twisted, polling is not really necessary. Could you have a look at the script I posted and tell me if it fits what you need? BTW, writing a chat application with txAMQP sounds really cool :-)
Thanks for using txAMQP!
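Since the original paste is gone, here is a rough sketch of the "no polling" idea in one process. It is an assumption-laden stand-in, not the script mentioned above: it uses asyncio from the standard library instead of Twisted, and an in-memory `asyncio.Queue` instead of a real AMQP channel. The names `consume`, `publish` and `chat` are invented for the example; the "STOP" sentinel mirrors the one used in the scripts later in this thread.

```python
import asyncio

STOP = "STOP"  # sentinel that tells the consumer to finish


async def consume(channel, received):
    # The consumer simply waits for the next message; the event loop
    # wakes it up when one arrives, so there is no polling loop.
    while True:
        body = await channel.get()
        if body == STOP:
            break
        received.append(body)


async def publish(channel, lines):
    # The publisher writes to the same channel object the consumer reads.
    for line in lines:
        await channel.put(line)
    await channel.put(STOP)


async def chat(lines):
    channel = asyncio.Queue()  # in-memory stand-in for the AMQP channel/queue
    received = []
    # Both sides run concurrently in one process, on one event loop.
    await asyncio.gather(consume(channel, received), publish(channel, lines))
    return received


if __name__ == "__main__":
    print(asyncio.run(chat(["hi", "there"])))
```

With txAMQP the shape would be similar in spirit: one coroutine-style function yields on `queue.get()` while another publishes, both driven by the reactor.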
#3
Esteve,
Thank you very much for your answer, suggestions and comments. I'll use these in my project.
A couple of comments:
One of the main difficulties I had was how to send my own input. Using twisted.
I was reading earlier today that you can use the same channel to publish and consume! This example also showed me how to do this with txamqp, cool, thanks again.
Problem solved!
Cheers,
Ale
Hello,
Could you please attach the code samples to this thread, or upload them to a persistent store?
dpaste has already removed them, and this conversation is not quite useful for others without the code. :(
Thanks,
#5
2009/12/16 bra <email address hidden>:
> Your question #86511 on txAMQP changed:
> https:/
>
> bra posted a new comment:
> Hello,
>
> Could you please attach the code samples to this thread, or upload them
> to a persistent store?
>
> dpaste has already removed them, and this conversation is not quite
> useful for others without the code. :(
>
I don't have them right now, but I'll paste them tomorrow.
Cheers,
--
Ale.
#6
I think these are the files, sorry for the delay.
--
Ale.
#7
2009/12/22 Ale <email address hidden>:
> I think these are the files, sorry for the delay
Hmm... Launchpad doesn't allow attachments...
------ publisher and consumer with Repl
from twisted.
from twisted.
from twisted.internet import stdio, protocol, reactor, task
from twisted.protocols import basic
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
from txamqp.content import Content
import txamqp.spec

class Repl(basic.
    delimiter = '\n'
    prompt_string = 'send> '

    def __init__(self, publisher):
    def prompt(self):
    def connectionMade(
    def lineReceived(self, line):
    def issueCommand(self, command):
    def connectionLost(

class Publisher(object):
    def __init__(self, chan):
        self.chan = chan

    def publish(self, string):
        content = string
        msg = Content(content)
        print "Sending message: %s" % content

    def publish_stop(self):

class Consumer(object):
    def __init__(self, conn, chan):
        self.conn = conn
        self.chan = chan

    @inlineCall
    def get_messages(self):
        # sets up the queue to consume using callbacks; the messages
        # will be retrieved with queue.get()
        yield self.chan.
            consumer_
        queue = yield self.conn.
        while True:
            msg = yield queue.get()
            print 'Received: ' + msg.content.body + ' from channel #' + str(self.chan.id)
            if msg.content.body == "STOP":
                yield self.chan.

@inlineCallbacks
def setup_amqp_
    print "Connected to broker."
    yield conn.authentica
    print "Authenticated. Ready to receive messages"
    chan = yield conn.channel(1)
    yield chan.channel_open()
    yield chan.queue_
        exclusive=False, auto_delete=False)
    yield chan.exchange_
        durable=True, auto_delete=False)
    yield chan.queue_
        routing_
    returnValue

@inlineCallbacks
def start_consumer_
    chan = yield conn.channel(1)
    stdio.
    consumer = Consumer(conn, chan)
    yield consumer.
    returnValue

@inlineCallbacks
def finish_
    chan = yield conn.channel(1)
    yield chan.channel_
    chan0 = yield conn.channel(0)
    yield chan0.connectio
    reactor.stop()

if __name__ == "__main__":
    import sys
    if len(sys.argv) < 8:
        print "%s host port vhost username password path_to_spec content [count]" % sys.argv[0]
        print "e.g. %s localhost 5672 / guest guest ../../specs/
        sys.exit(1)
    host = sys.argv[1]
    port = int(sys.argv[2])
    vhost = sys.argv[3]
    username = sys.argv[4]
    password = sys.argv[5]
    spec = txamqp.
    content = sys.argv[7]
    # count is optional; fall back to 1 when it is missing or not a number
    try:
        count = int(sys.argv[8])
    except (IndexError, ValueError):
        count = 1
    delegate = TwistedDelegate()
    d = ClientCreator(
    d.addCallba
    reactor.run()
------ multiconsumer
from twisted.
from twisted.internet import reactor, task
from twisted.
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
from txamqp.content import Content
from txamqp.queue import Closed
import txamqp.spec
import pprint

@inlineCallbacks
def publisher_
    def send_messages():
        def message_iterator():
            for i in range(count):
                msg = Content(content)
        return task.coiterate(
    yield send_messages()
    stopToken = "STOP"
    msg = Content(stopToken)
    msg["delivery mode"] = 2
    chan.
    print "[FANOUT] Sending message: %s" % stopToken

def whichEverFiresF
    d = Deferred()
    def _callback(obj, d):
    def _errback(failure):
    def1.
    def1.
    def2.
    return d

class StopSignal(object):
    def __init__(self):
        self.d = None

    def get_signal(self):
        d = None
        if self.d:
            d = self.d
        self.d = Deferred()
        if not (d is None):
            del d
        return self.d

    def fire_signal(self):

@inlineCallbacks
def consumer_
    while True:
        try:
            msg = yield queue.get()
        except Closed:
            break
        # msg = yield whichEverFiresF
        # if isinstance(msg, str) and msg == "STOP":
        #     print "-> STOP SIGNAL <-"
        #     queue.close()
        #     break
        print '[FANOUT] Received: ' + msg.content.body + ' from channel #' + str(chan.id)
        yield chan.basic_

@inlineCallbacks
def gotConnection(conn, username, password, body, count=1):
    print "Connected to broker."
    yield conn.authentica
    print "Authenticated. Ready to receive messages"
    chan = yield conn.channel(1)
    yield chan.channel_open()
    yield chan.queue_
        exclusive=False, auto_delete=False)
    yield chan.exchange_
        type="fanout", durable=True, auto_delete=False)
    yield chan.queue_
        exchange=
    yield chan.basic_
        consumer_
    queue = yield conn.queue(
    sig = StopSignal()
    reactor.
    yield gatherResults([
        publisher_
    ])
    yield chan.channel_
    chan0 = yield conn.channel(0)
    yield chan0.connectio
    reactor.stop()

if __name__ == "__main__":
    import sys
    if len(sys.argv) < 8:
        print "%s host port vhost username password path_to_spec content [count]" % sys.argv[0]
        print "e.g. %s localhost 5672 / guest guest ../../specs/
        sys.exit(1)
    host = sys.argv[1]
    port = int(sys.argv[2])
    vhost = sys.argv[3]
    username = sys.argv[4]
    password = sys.argv[5]
    spec = txamqp.
    content = sys.argv[7]
    # count is optional; fall back to 1 when it is missing or not a number
    try:
        count = int(sys.argv[8])
    except (IndexError, ValueError):
        count = 1
    delegate = TwistedDelegate()
    d = ClientCreator(
    d.addCallba
    reactor.run()
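The commented-out lines in the consumer above hint at waiting for either the next message or a stop signal, whichever comes first. The helper that did this is cut off in the paste, so what follows is only a sketch of that general idea, written with the standard library's asyncio rather than Twisted Deferreds. The names `first_of`, `consume_until_stopped` and `demo` are hypothetical and are not taken from the original scripts.

```python
import asyncio


async def first_of(*awaitables):
    """Return the result of whichever awaitable finishes first; cancel the rest."""
    tasks = [asyncio.ensure_future(a) for a in awaitables]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    return done.pop().result()


async def consume_until_stopped(queue, stop_event, received):
    while True:
        # Race the next message against the stop signal.
        result = await first_of(queue.get(), stop_event.wait())
        if result is True:  # Event.wait() returns True once the event is set
            break
        received.append(result)


async def demo():
    queue, stop_event, received = asyncio.Queue(), asyncio.Event(), []
    consumer = asyncio.ensure_future(
        consume_until_stopped(queue, stop_event, received))
    await queue.put("one")
    await queue.put("two")
    await asyncio.sleep(0.05)  # give the consumer time to drain the queue
    stop_event.set()           # fire the stop signal
    await consumer
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This sketch assumes messages are strings, so a result of `True` can only come from the stop event. If a message and the stop signal complete in the same loop iteration, one of the two results is dropped, so a real consumer would need to handle that case.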
--
Ale.