Quick start¶
Some useful examples.
Simple consumer¶
import asyncio
import aio_pika
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
queue_name = "test_queue"
async with connection:
# Creating channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue(
queue_name, auto_delete=True
)
async for message in queue:
with message.process():
print(message.body)
if queue.name in message.body.decode():
break
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Simple publisher¶
import asyncio
import aio_pika
async def main(loop):
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop)
async with connection:
routing_key = "test_queue"
channel = await connection.channel()
await channel.default_exchange.publish(
aio_pika.Message(
body='Hello {}'.format(routing_key).encode()
),
routing_key=routing_key
)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Get single message example¶
import asyncio
from aio_pika import connect_robust, Message
async def main(loop):
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
queue_name = "test_queue"
routing_key = "test_queue"
# Creating channel
channel = await connection.channel()
# Declaring exchange
exchange = await channel.declare_exchange(
'direct', auto_delete=True
)
# Declaring queue
queue = await channel.declare_queue(
queue_name, auto_delete=True
)
# Binding queue
await queue.bind(exchange, routing_key)
await exchange.publish(
Message(
bytes('Hello', 'utf-8'),
content_type='text/plain',
headers={'foo': 'bar'}
),
routing_key
)
# Receiving message
incoming_message = await queue.get(timeout=5)
# Confirm message
incoming_message.ack()
await queue.unbind(exchange, routing_key)
await queue.delete()
await connection.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
Tornado example¶
import asyncio
import tornado.ioloop
import tornado.web
from aio_pika import connect_robust, Message
tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)
QUEUE = asyncio.Queue()
class SubscriberHandler(tornado.web.RequestHandler):
async def get(self):
message = await QUEUE.get()
self.finish(message.body)
class PublisherHandler(tornado.web.RequestHandler):
async def post(self):
connection = self.application.settings['amqp_connection']
channel = await connection.channel()
try:
await channel.default_exchange.publish(
Message(body=self.request.body),
routing_key="test",
)
finally:
await channel.close()
self.finish("OK")
async def make_app():
amqp_connection = await connect_robust()
channel = await amqp_connection.channel()
queue = await channel.declare_queue('test', auto_delete=True)
await queue.consume(QUEUE.put, no_ack=True)
return tornado.web.Application(
[
(r"/publish", PublisherHandler),
(r"/subscribe", SubscriberHandler),
],
amqp_connection=amqp_connection
)
if __name__ == "__main__":
app = io_loop.asyncio_loop.run_until_complete(make_app())
app.listen(8888)
tornado.ioloop.IOLoop.current().start()