Callbacks¶
Henson operates on messages through a series of asyncio.coroutine()
callback functions. Each callback type serves a unique purpose.
callback
¶
This is the only one of the callback settings that is required. Its purpose is to process the incoming message. If desired, it should return the result(s) of processing the message as an iterable.
async def callback(application, message):
return ['spam']
Application('name', callback=callback)
Note
There can only be one function registered as callback
.
error
¶
These callbacks are called when an exception is raised while processing a message.
app = Application('name')
@app.error
async def log_error(application, message, exception):
logger.error('spam')
Note
Exceptions raised while postprocessing a result will not be processed through these callbacks.
message_acknowledgement
¶
These callbacks are intended to acknowledge that a message has been received and should not be made available to other consumers. They run after a message and its result(s) have been fully processed.
app = Application('name')
@app.message_acknowledgement
async def acknowledge_message(application, original_message):
await original_message.acknowledge()
message_preprocessor
¶
These callbacks are called as each message is first received. Any modifications
they make to the message will be reflected in what is passed to callback
for processing.
app = Application('name')
@app.message_preprocessor
async def add_process_id(application, message):
message['pid'] = os.getpid()
return message
result_postprocessor
¶
These callbacks will operate on the result(s) of callback
. Each callback is
applied to each result.
app = Application('name')
@app.result_postprocessor
async def store_result(application, result):
with open('/tmp/result', 'w') as f:
f.write(result)
startup
¶
These callbacks will run as an application is starting.
app = Application('name')
@app.startup
async def connect_to_database(application):
await db.connect(application.settings['DB_HOST'])
teardown
¶
These callbacks will run as an application is shutting down.
app = Application('name')
@app.teardown
async def disconnect_from_database(application):
await db.close()