Merge lp:~txamqpteam/txamqp/sync-with-thrift into lp:txamqp

Proposed by Dan Di Spaltro
Status: Merged
Approved by: Esteve Fernandez
Approved revision: 14
Merged at revision: not available
Proposed branch: lp:~txamqpteam/txamqp/sync-with-thrift
Merge into: lp:txamqp
Diff against target: None lines
To merge this branch: bzr merge lp:~txamqpteam/txamqp/sync-with-thrift
Reviewer Review Type Date Requested Status
Esteve Fernandez Approve
Review via email: mp+5753@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Dan Di Spaltro (dan-dispaltro) wrote :

This should be merged in to the main branch since it follows thrift development much more closely. It seems fairly stable, we use it in over 8 production apps.

Revision history for this message
Esteve Fernandez (esteve) :
review: Approve
Revision history for this message
Esteve Fernandez (esteve) wrote :

> This should be merged in to the main branch since it follows thrift
> development much more closely. It seems fairly stable, we use it in over 8
> production apps.

Thanks for taking the time to review it, I just pushed it to trunk.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/examples/README'
2--- src/examples/README 2008-11-20 17:04:06 +0000
3+++ src/examples/README 2009-03-06 11:28:34 +0000
4@@ -26,8 +26,8 @@
5 - General
6
7 1. Download Thrift source code [1]
8-2. Check if it contains support for Twisted, download and apply
9-patch [2] if not.
10+2. Check if it contains support for Twisted, you'll need revision 749795 or
11+greater.
12 3. Compile Thrift with suppport for Python.
13 4. Install Thrift.
14
15@@ -56,9 +56,8 @@
16 is 'guest').
17 - path_to_spec: The path to the AMQP spec that you want to use. Keep in mind
18 that depending on the broker you use, you will need a different spec:
19- - RabbitMQ 1.4.0: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml
20- - OpenAMQ 1.2d6: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml
21+ - RabbitMQ 1.5.3: $TXAMQP_PATH/src/specs/standard/amqp0-8.xml
22+ - OpenAMQ 1.3c5: $TXAMQP_PATH/src/specs/standard/amqp0-9.xml
23 - Qpid M3 (Java): $TXAMQP_PATH/src/specs/qpid/amqp.0-8.xml
24
25 1 - http://incubator.apache.org/thrift/
26-2 - http://issues.apache.org/jira/browse/THRIFT-148
27
28=== modified file 'src/examples/client.py'
29--- src/examples/client.py 2008-12-08 21:48:31 +0000
30+++ src/examples/client.py 2009-02-11 22:28:45 +0000
31@@ -13,9 +13,9 @@
32 from twisted.internet.protocol import ClientCreator
33
34 import txamqp.spec
35-from txamqp.protocol import AMQClient
36 from txamqp.client import TwistedDelegate
37-from txamqp.contrib.thrift import TwistedAMQPTransport
38+from txamqp.contrib.thrift.transport import TwistedAMQPTransport
39+from txamqp.contrib.thrift.protocol import ThriftAMQClient
40
41 servicesExchange = "services"
42 responsesExchange = "responses"
43@@ -37,21 +37,9 @@
44 print "Got an error"
45 print error.value.why
46
47-def parseClientMessage(msg, channel, queue, pfactory, thriftClient):
48- deliveryTag = msg.delivery_tag
49- tr = TTransport.TMemoryBuffer(msg.content.body)
50- iprot = pfactory.getProtocol(tr)
51- (fname, mtype, rseqid) = iprot.readMessageBegin()
52-
53- m = getattr(thriftClient, 'recv_' + fname)
54- m(iprot, mtype, rseqid)
55-
56- channel.basic_ack(deliveryTag, True)
57- queue.get().addCallback(parseClientMessage, channel, queue, pfactory, thriftClient)
58-
59 @defer.inlineCallbacks
60-def prepareClient(client, authentication):
61- yield client.start(authentication)
62+def prepareClient(client, username, password):
63+ yield client.authenticate(username, password)
64
65 channel = yield client.channel(1)
66
67@@ -59,22 +47,11 @@
68 yield channel.exchange_declare(exchange=servicesExchange, type="direct")
69 yield channel.exchange_declare(exchange=responsesExchange, type="direct")
70
71- reply = yield channel.queue_declare(exclusive=True, auto_delete=True)
72-
73- responseQueue = reply.queue
74-
75- yield channel.queue_bind(queue=responseQueue, exchange=responsesExchange,
76- routing_key=responseQueue)
77-
78- amqpTransport = TwistedAMQPTransport(channel, servicesExchange, calculatorKey,
79- replyTo=responseQueue, replyToField=replyToField)
80 pfactory = TBinaryProtocol.TBinaryProtocolFactory()
81- tm = TTwisted.TwistedMemoryBuffer(amqpTransport)
82- thriftClient = tutorial.Calculator.Client(tm, pfactory)
83-
84- reply = yield channel.basic_consume(queue=responseQueue)
85- queue = yield client.queue(reply.consumer_tag)
86- queue.get().addCallback(parseClientMessage, channel, queue, pfactory, thriftClient)
87+ thriftClient = yield client.createThriftClient(responsesExchange,
88+ servicesExchange, calculatorKey, tutorial.Calculator.Client,
89+ iprot_factory=pfactory, oprot_factory=pfactory)
90+
91 defer.returnValue(thriftClient)
92
93 def gotClient(client):
94@@ -82,26 +59,31 @@
95
96 d2 = client.add(1, 2).addCallback(gotAddResults)
97
98- w = Work({'num1': 2, 'num2': 3, 'op': Operation.ADD})
99-
100- d3 = client.calculate(1, w).addCallbacks(gotCalculateResults, gotCalculateErrors)
101-
102- w = Work({'num1': 2, 'num2': 3, 'op': Operation.SUBTRACT})
103-
104- d4 = client.calculate(2, w).addCallbacks(gotCalculateResults, gotCalculateErrors)
105-
106- w = Work({'num1': 2, 'num2': 3, 'op': Operation.MULTIPLY})
107-
108- d5 = client.calculate(3, w).addCallbacks(gotCalculateResults, gotCalculateErrors)
109-
110- w = Work({'num1': 2, 'num2': 3, 'op': Operation.DIVIDE})
111-
112- d6 = client.calculate(4, w).addCallbacks(gotCalculateResults, gotCalculateErrors)
113+ w = Work(num1=2, num2=3, op=Operation.ADD)
114+
115+ d3 = client.calculate(1, w).addCallbacks(gotCalculateResults,
116+ gotCalculateErrors)
117+
118+ w = Work(num1=2, num2=3, op=Operation.SUBTRACT)
119+
120+ d4 = client.calculate(2, w).addCallbacks(gotCalculateResults,
121+ gotCalculateErrors)
122+
123+ w = Work(num1=2, num2=3, op=Operation.MULTIPLY)
124+
125+ d5 = client.calculate(3, w).addCallbacks(gotCalculateResults,
126+ gotCalculateErrors)
127+
128+ w = Work(num1=2, num2=3, op=Operation.DIVIDE)
129+
130+ d6 = client.calculate(4, w).addCallbacks(gotCalculateResults,
131+ gotCalculateErrors)
132
133 # This will fire an errback
134- w = Work({'num1': 2, 'num2': 0, 'op': Operation.DIVIDE})
135+ w = Work(num1=2, num2=0, op=Operation.DIVIDE)
136
137- d7 = client.calculate(5, w).addCallbacks(gotCalculateResults, gotCalculateErrors)
138+ d7 = client.calculate(5, w).addCallbacks(gotCalculateResults,
139+ gotCalculateErrors)
140
141 d8 = client.zip()
142
143@@ -126,15 +108,7 @@
144
145 delegate = TwistedDelegate()
146
147- d = ClientCreator(reactor, AMQClient, delegate, vhost,
148+ d = ClientCreator(reactor, ThriftAMQClient, delegate, vhost,
149 spec).connectTCP(host, port)
150-
151- if (spec.major, spec.minor) == (8, 0):
152- authentication = {"LOGIN": username, "PASSWORD": password}
153- replyToField = "reply to"
154- else:
155- authentication = "\0" + username + "\0" + password
156- replyToField = "reply-to"
157-
158- d.addCallback(prepareClient, authentication).addCallback(gotClient)
159+ d.addCallback(prepareClient, username, password).addCallback(gotClient)
160 reactor.run()
161
162=== modified file 'src/examples/server.py'
163--- src/examples/server.py 2008-10-29 18:31:04 +0000
164+++ src/examples/server.py 2009-02-11 22:28:45 +0000
165@@ -13,9 +13,11 @@
166 from twisted.internet.protocol import ClientCreator
167
168 import txamqp.spec
169-from txamqp.protocol import AMQClient
170 from txamqp.client import TwistedDelegate
171-from txamqp.contrib.thrift import TwistedAMQPTransport
172+from txamqp.contrib.thrift.transport import TwistedAMQPTransport
173+from txamqp.contrib.thrift.protocol import ThriftAMQClient
174+
175+from zope.interface import implements
176
177 servicesExchange = "services"
178 responsesExchange = "responses"
179@@ -23,6 +25,7 @@
180 calculatorKey = "calculator"
181
182 class CalculatorHandler(object):
183+ implements(tutorial.Calculator.Iface)
184
185 operations = {
186 Operation.ADD: int.__add__,
187@@ -35,7 +38,7 @@
188 # Just assume that it may take a long time
189 results = self.operations[w.op](w.num1, w.num2)
190 d = defer.Deferred()
191- reactor.callLater(3, d.callback, results)
192+ reactor.callLater(0, d.callback, results)
193 return d
194
195 def ping(self):
196@@ -50,30 +53,14 @@
197 try:
198 return self._dispatchWork(w)
199 except Exception, e:
200- return defer.fail(InvalidOperation({'logid': logid, 'why': e.message}))
201+ return defer.fail(InvalidOperation(what=logid, why=e.message))
202
203 def zip(self):
204 print "zip() called from client"
205
206-def parseServerMessage(msg, channel, queue, processor, pfactory):
207- deliveryTag = msg.delivery_tag
208- try:
209- replyTo = msg.content[replyToField]
210- except KeyError:
211- replyTo = None
212- tr = TwistedAMQPTransport(channel, responsesExchange, routingKey=replyTo)
213- tmi = TTransport.TMemoryBuffer(msg.content.body)
214- tmo = TTwisted.TwistedMemoryBuffer(tr)
215- iprot = pfactory.getProtocol(tmi)
216- oprot = pfactory.getProtocol(tmo)
217- processor.process(iprot, oprot)
218- channel.basic_ack(deliveryTag, True)
219- queue.get().addCallback(parseServerMessage, channel, queue, processor,
220- pfactory)
221-
222 @defer.inlineCallbacks
223-def prepareClient(client, authentication):
224- yield client.start(authentication)
225+def prepareClient(client, username, password):
226+ yield client.authenticate(username, password)
227
228 channel = yield client.channel(1)
229
230@@ -90,8 +77,8 @@
231
232 reply = yield channel.basic_consume(queue=calculatorQueue)
233 queue = yield client.queue(reply.consumer_tag)
234- queue.get().addCallback(parseServerMessage, channel, queue, processor,
235- pfactory)
236+ queue.get().addCallback(client.parseServerMessage, channel, responsesExchange,
237+ queue, processor, pfactory, pfactory)
238
239 if __name__ == '__main__':
240 import sys
241@@ -112,15 +99,7 @@
242
243 print 'Starting the server...'
244
245- d = ClientCreator(reactor, AMQClient, delegate, vhost,
246+ d = ClientCreator(reactor, ThriftAMQClient, delegate, vhost,
247 spec).connectTCP(host, port)
248-
249- if (spec.major, spec.minor) == (8, 0):
250- authentication = {"LOGIN": username, "PASSWORD": password}
251- replyToField = "reply to"
252- else:
253- authentication = "\0" + username + "\0" + password
254- replyToField = "reply-to"
255-
256- d.addCallback(prepareClient, authentication)
257+ d.addCallback(prepareClient, username, password)
258 reactor.run()
259
260=== modified file 'src/txamqp/client.py'
261--- src/txamqp/client.py 2008-10-29 18:31:04 +0000
262+++ src/txamqp/client.py 2009-02-11 22:28:45 +0000
263@@ -11,12 +11,15 @@
264 self.alreadyCalled = False
265
266 def set(self):
267- deferred, self.deferred = self.deferred, defer.Deferred()
268- deferred.callback(None)
269+ self.deferred.callback(True)
270
271 def wait(self):
272 return self.deferred
273
274+ def reset(self):
275+ deferred, self.deferred = self.deferred, defer.Deferred()
276+ deferred.callback(True)
277+
278 class TwistedDelegate(Delegate):
279
280 def connection_start(self, ch, msg):
281@@ -40,4 +43,4 @@
282
283 def close(self, reason):
284 self.client.closed = True
285- self.client.started.set()
286+ self.client.started.reset()
287
288=== added directory 'src/txamqp/contrib/thrift'
289=== added file 'src/txamqp/contrib/thrift/__init__.py'
290=== added file 'src/txamqp/contrib/thrift/protocol.py'
291--- src/txamqp/contrib/thrift/protocol.py 1970-01-01 00:00:00 +0000
292+++ src/txamqp/contrib/thrift/protocol.py 2009-02-11 22:28:45 +0000
293@@ -0,0 +1,108 @@
294+from zope.interface import Interface, Attribute
295+from txamqp.protocol import AMQClient
296+from txamqp.contrib.thrift.transport import TwistedAMQPTransport
297+from txamqp.content import Content
298+
299+from twisted.internet import defer
300+from twisted.python import log
301+
302+from thrift.protocol import TBinaryProtocol
303+from thrift.transport import TTwisted, TTransport
304+
305+class ThriftAMQClient(AMQClient):
306+
307+ def __init__(self, *args, **kwargs):
308+ AMQClient.__init__(self, *args, **kwargs)
309+
310+ if self.check_0_8():
311+ self.replyToField = "reply to"
312+ else:
313+ self.replyToField = "reply-to"
314+
315+ @defer.inlineCallbacks
316+ def createThriftClient(self, responsesExchange, serviceExchange,
317+ routingKey, clientClass, responseQueue=None, iprot_factory=None,
318+ oprot_factory=None):
319+
320+ channel = yield self.channel(1)
321+
322+ if responseQueue is None:
323+ reply = yield channel.queue_declare(exclusive=True,
324+ auto_delete=True)
325+
326+ responseQueue = reply.queue
327+
328+ yield channel.queue_bind(queue=responseQueue,
329+ exchange=responsesExchange, routing_key=responseQueue)
330+
331+ reply = yield channel.basic_consume(queue=responseQueue)
332+
333+ log.msg("Consuming messages on queue: %s" % responseQueue)
334+
335+ amqpTransport = TwistedAMQPTransport(channel, serviceExchange,
336+ routingKey, replyTo=responseQueue, replyToField=self.replyToField)
337+
338+ if iprot_factory is None:
339+ iprot_factory = self.factory.iprot_factory
340+
341+ if oprot_factory is None:
342+ oprot_factory = self.factory.oprot_factory
343+
344+ thriftClient = clientClass(amqpTransport, oprot_factory)
345+
346+ queue = yield self.queue(reply.consumer_tag)
347+ queue.get().addCallback(self.parseClientMessage, channel, queue,
348+ thriftClient, iprot_factory=iprot_factory)
349+
350+ defer.returnValue(thriftClient)
351+
352+ def parseClientMessage(self, msg, channel, queue, thriftClient,
353+ iprot_factory=None):
354+ deliveryTag = msg.delivery_tag
355+ tr = TTransport.TMemoryBuffer(msg.content.body)
356+ if iprot_factory is None:
357+ iprot = self.factory.iprot_factory.getProtocol(tr)
358+ else:
359+ iprot = iprot_factory.getProtocol(tr)
360+ (fname, mtype, rseqid) = iprot.readMessageBegin()
361+
362+ method = getattr(thriftClient, 'recv_' + fname)
363+ method(iprot, mtype, rseqid)
364+
365+ channel.basic_ack(deliveryTag, True)
366+ queue.get().addCallback(self.parseClientMessage, channel, queue,
367+ thriftClient, iprot_factory=iprot_factory)
368+
369+ def parseServerMessage(self, msg, channel, exchange, queue, processor,
370+ iprot_factory=None, oprot_factory=None):
371+ deliveryTag = msg.delivery_tag
372+ try:
373+ replyTo = msg.content[self.replyToField]
374+ except KeyError:
375+ replyTo = None
376+
377+ tmi = TTransport.TMemoryBuffer(msg.content.body)
378+ tr = TwistedAMQPTransport(channel, exchange, replyTo)
379+
380+ if iprot_factory is None:
381+ iprot = self.factory.iprot_factory.getProtocol(tmi)
382+ else:
383+ iprot = iprot_factory.getProtocol(tmi)
384+
385+ if oprot_factory is None:
386+ oprot = self.factory.oprot_factory.getProtocol(tr)
387+ else:
388+ oprot = oprot_factory.getProtocol(tr)
389+
390+ d = processor.process(iprot, oprot)
391+ channel.basic_ack(deliveryTag, True)
392+
393+ return queue.get().addCallback(self.parseServerMessage, channel,
394+ exchange, queue, processor, iprot_factory, oprot_factory)
395+
396+
397+class IThriftAMQClientFactory(Interface):
398+
399+ iprot_factory = Attribute("Input protocol factory")
400+ oprot_factory = Attribute("Input protocol factory")
401+ processor = Attribute("Thrift processor")
402
403=== added file 'src/txamqp/contrib/thrift/service.py'
404--- src/txamqp/contrib/thrift/service.py 1970-01-01 00:00:00 +0000
405+++ src/txamqp/contrib/thrift/service.py 2009-02-11 22:28:45 +0000
406@@ -0,0 +1,7 @@
407+from zope.interface import Interface, Attribute
408+
409+class IThriftAMQClientFactory(Interface):
410+
411+ iprot_factory = Attribute("Input protocol factory")
412+ oprot_factory = Attribute("Input protocol factory")
413+ processor = Attribute("Thrift processor")
414
415=== renamed file 'src/txamqp/contrib/thrift.py' => 'src/txamqp/contrib/thrift/transport.py'
416--- src/txamqp/contrib/thrift.py 2008-10-29 18:31:04 +0000
417+++ src/txamqp/contrib/thrift/transport.py 2009-02-11 22:28:45 +0000
418@@ -1,16 +1,17 @@
419 from txamqp.content import Content
420+from thrift.transport import TTwisted
421
422-class TwistedAMQPTransport(object):
423- def __init__(self, channel, exchange, routingKey, replyTo=None,
424- replyToField="reply to"):
425+class TwistedAMQPTransport(TTwisted.TMessageSenderTransport):
426+ def __init__(self, channel, exchange, routingKey, replyTo=None, replyToField=None):
427+ TTwisted.TMessageSenderTransport.__init__(self)
428 self.channel = channel
429 self.exchange = exchange
430 self.routingKey = routingKey
431 self.replyTo = replyTo
432 self.replyToField = replyToField
433
434- def write(self, data):
435- content = Content(body=data)
436+ def sendMessage(self, message):
437+ content = Content(body=message)
438 if self.replyTo:
439 content[self.replyToField] = self.replyTo
440
441
442=== modified file 'src/txamqp/protocol.py'
443--- src/txamqp/protocol.py 2009-02-05 19:22:56 +0000
444+++ src/txamqp/protocol.py 2009-02-11 22:28:45 +0000
445@@ -226,6 +226,9 @@
446 self.outgoing.get().addCallback(self.writer)
447 self.work.get().addCallback(self.worker)
448
449+ def check_0_8(self):
450+ return (self.spec.minor, self.spec.major) == (0, 8)
451+
452 @defer.inlineCallbacks
453 def channel(self, id):
454 yield self.channelLock.acquire()
455@@ -297,6 +300,15 @@
456 ch.dispatch(frame, self.work)
457
458 @defer.inlineCallbacks
459+ def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'):
460+ if self.check_0_8():
461+ response = {"LOGIN": username, "PASSWORD": password}
462+ else:
463+ response = "\0" + username + "\0" + password
464+
465+ yield self.start(response, mechanism, locale)
466+
467+ @defer.inlineCallbacks
468 def start(self, response, mechanism='AMQPLAIN', locale='en_US'):
469 self.response = response
470 self.mechanism = mechanism
471
472=== modified file 'src/txamqp/testlib.py'
473--- src/txamqp/testlib.py 2008-10-29 18:31:04 +0000
474+++ src/txamqp/testlib.py 2009-02-11 22:28:45 +0000
475@@ -39,6 +39,9 @@
476 self.user = 'guest'
477 self.password = 'guest'
478 self.vhost = 'localhost'
479+ self.queues = []
480+ self.exchanges = []
481+ self.connectors = []
482
483 @inlineCallbacks
484 def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None):
485@@ -56,16 +59,11 @@
486 self.connectors.append(c)
487 client = yield onConn
488
489- yield client.start({"LOGIN": user, "PASSWORD": password})
490+ yield client.authenticate(user, password)
491 returnValue(client)
492
493 @inlineCallbacks
494 def setUp(self):
495- self.queues = []
496- self.exchanges = []
497-
498- self.connectors = []
499-
500 self.client = yield self.connect()
501
502 self.channel = yield self.client.channel(1)

Subscribers

People subscribed via source and target branches

to status/vote changes: