Welcome to aio-pika’s documentation!
aio_pika is a wrapper around Pika for asyncio and humans.
Features
- Completely asynchronous API.
- Object-oriented API.
- Automatic reconnection with complete state recovery via connect_robust (e.g. declared queues and exchanges, consuming state, and bindings).
- Python 3.4+ compatible (including 3.6).
Installation
Installation with pip:
pip install aio-pika
Installation from git:
# via pip
pip install https://github.com/mosquito/aio-pika/archive/master.zip
# manually
git clone https://github.com/mosquito/aio-pika.git
cd aio-pika
python setup.py install
Usage example
Simple consumer:
import asyncio
import aio_pika


async def main(loop):
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    queue_name = "test_queue"

    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        async for message in queue:
            with message.process():
                print(message.body)

                if queue.name in message.body.decode():
                    break


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()
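The consumer loop stops once a message body contains the queue name, which is what the simple publisher example sends ("Hello test_queue"). The following is a minimal sketch of that control flow with no broker involved. `fake_queue` is a hypothetical async generator standing in for the real queue, the message bodies are made up for illustration, and `asyncio.run` assumes Python 3.7 or newer.

```python
import asyncio


async def fake_queue(bodies):
    """Hypothetical stand-in for a queue: yields raw message bodies one by one."""
    for body in bodies:
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield body


async def consume_until_match(queue_name, bodies):
    """Collect bodies until one contains the queue name, mirroring the break above."""
    seen = []
    async for body in fake_queue(bodies):
        seen.append(body)
        if queue_name in body.decode():
            break
    return seen


if __name__ == "__main__":
    bodies = [b"first", b"Hello test_queue", b"never consumed"]
    print(asyncio.run(consume_until_match("test_queue", bodies)))
```

Messages that arrive after the matching one are left unconsumed, which in the real consumer means they stay in the queue.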
Simple publisher:
import asyncio
import aio_pika


async def main(loop):
    connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/", loop=loop)

    async with connection:
        routing_key = "test_queue"

        channel = await connection.channel()  # type: aio_pika.Channel

        await channel.default_exchange.publish(
            aio_pika.Message(
                body='Hello {}'.format(routing_key).encode()
            ),
            routing_key=routing_key
        )


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()
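The publisher sends a plain string encoded to bytes. For structured data, one common approach is to serialise to JSON before handing the bytes over as the message body. The sketch below uses only the standard library. `encode_payload` and `decode_payload` are hypothetical helper names, not part of aio_pika, and the payload fields are made up.

```python
import json


def encode_payload(payload):
    """Serialise a dict to UTF-8 JSON bytes suitable for a message body."""
    return json.dumps(payload, sort_keys=True).encode("utf-8")


def decode_payload(body):
    """Inverse of encode_payload: parse a message body back into a dict."""
    return json.loads(body.decode("utf-8"))


if __name__ == "__main__":
    body = encode_payload({"greeting": "Hello", "queue": "test_queue"})
    print(body)
    print(decode_payload(body))
```

The resulting bytes could be passed as body to aio_pika.Message, with content_type set to 'application/json' in the same way the single-message example sets 'text/plain'.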
Get single message example:
import asyncio
from aio_pika import connect_robust, Message


async def main(loop):
    connection = await connect_robust("amqp://guest:guest@127.0.0.1/", loop=loop)

    queue_name = "test_queue"
    routing_key = "test_queue"

    # Creating channel
    channel = await connection.channel()

    # Declaring exchange
    exchange = await channel.declare_exchange('direct', auto_delete=True)

    # Declaring queue
    queue = await channel.declare_queue(queue_name, auto_delete=True)

    # Binding queue
    await queue.bind(exchange, routing_key)

    await exchange.publish(
        Message(
            bytes('Hello', 'utf-8'),
            content_type='text/plain',
            headers={'foo': 'bar'}
        ),
        routing_key
    )

    # Receiving message
    incoming_message = await queue.get(timeout=5)

    # Confirm message
    incoming_message.ack()

    await queue.unbind(exchange, routing_key)
    await queue.delete()
    await connection.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
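All three examples hard-code the connection URL amqp://guest:guest@127.0.0.1/. When credentials come from configuration, it may help to assemble the URL in one place and percent-encode the parts that can contain reserved characters. This is a sketch under assumptions: `build_amqp_url` is a hypothetical helper, not an aio_pika function, and how a given client interprets an empty path (no virtual host given) should be checked against its own documentation.

```python
from urllib.parse import quote


def build_amqp_url(user="guest", password="guest", host="127.0.0.1", port=None, vhost=None):
    """Assemble an AMQP URL, percent-encoding user, password and virtual host."""
    netloc = "{}:{}@{}".format(quote(user, safe=""), quote(password, safe=""), host)
    if port is not None:
        netloc += ":{}".format(port)
    # Leave the path empty when no virtual host is given, as in the examples above.
    path = "" if vhost is None else quote(vhost, safe="")
    return "amqp://{}/{}".format(netloc, path)


if __name__ == "__main__":
    print(build_amqp_url())
    print(build_amqp_url(password="p@ss/word", port=5672, vhost="/"))
```

With the defaults, the helper reproduces the URL used throughout this page, so the result can be passed to connect_robust unchanged.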
Development
Clone the project:
git clone https://github.com/mosquito/aio-pika.git
cd aio-pika
Create a new virtualenv for aio_pika:
virtualenv -p python3.5 env
Install all requirements for aio_pika:
env/bin/pip install -e '.[develop]'
Tutorial
Thanks for contributing
- @mosquito (author)
- @hellysmile (bug fixes and ideas)
- @alternativehood (bug fixes)
- @akhoronko
- @zyp
- @decaz
- @kajetanj
- @iselind
Versioning
This software follows Semantic Versioning.
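Under Semantic Versioning a version has the form MAJOR.MINOR.PATCH, and from 1.0.0 onward only a MAJOR bump is allowed to break the public API. A rough sketch of checking for that follows. `parse_version` and `is_breaking_upgrade` are hypothetical helpers, the version strings are made up and are not actual aio-pika releases, and pre-release or build suffixes are not handled.

```python
def parse_version(version):
    """Split 'MAJOR.MINOR.PATCH' into a tuple of three ints."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def is_breaking_upgrade(current, candidate):
    """True when the candidate has a higher MAJOR number than the current version."""
    return parse_version(candidate)[0] > parse_version(current)[0]


if __name__ == "__main__":
    print(is_breaking_upgrade("1.9.0", "2.0.0"))
    print(is_breaking_upgrade("1.2.3", "1.3.0"))
```

Comparing the parsed tuples rather than the raw strings also orders versions correctly, since "1.10.0" sorts after "1.9.5" as a tuple but not as text.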