qpid_bow package

Submodules

qpid_bow.asyncio module

class qpid_bow.asyncio.AsyncioReactorHandler(loop=None, handler_base=None)

Bases: object

Qpid Proton Reactor Global Loop Handler for Python asyncio.

This implementation sets up Qpid Proton’s Selectables to use asyncio’s writable/readable event handling.

Based on Tornado implementation: https://qpid.apache.org/releases/qpid-proton-0.18.1/proton/python/examples/proton_tornado.py.html

Parameters:
  • loop – An asyncio event loop
  • handler_base – An IO Handler
on_reactor_init(event)
on_reactor_quiesced(event)
on_selectable_final(event)
on_selectable_init(event)
on_selectable_updated(event)
on_unhandled(name, event)
class qpid_bow.asyncio.Container(*handlers, **kwargs)

Bases: proton.reactor.Container

Asyncio event loop based Qpid Reactor container.

Parameters:

*handlers – One or more connectors

Keyword Arguments:
 
  • handler_base – An IO Handler.
  • impl – Reactor implementation, default is pn_reactor.
create_receiver(context: Union[proton._endpoints.Connection, proton._endpoints.Session, proton._url.Url, str], source=None, target=None, name=None, dynamic=False, handler=None, options: Union[proton.reactor.LinkOption, List[proton.reactor.LinkOption]] = None) → proton._endpoints.Receiver

Initiate a link to receive messages (subscription).

Parameters:
  • context – One of: created session, connection with or without established session, or url to create session.
  • source – Source address.
  • target – Target address.
  • name – Name of the link.
  • dynamic – Whether a dynamic AMQP queue should be generated.
  • handler – Custom handler to handle received message.
  • options – LinkOptions to further control the attachment.
Returns:

A Qpid Receiver link over which messages are received.

Return type:

Receiver

create_sender(context: Union[proton._endpoints.Connection, proton._endpoints.Session, proton._url.Url, str], target=None, source=None, name=None, handler=None, tags=None, options: Union[proton.reactor.LinkOption, List[proton.reactor.LinkOption]] = None) → proton._endpoints.Sender

Initiate a link to send messages.

Parameters:
  • context – One of: created session, connection with or without established session, or url to create session.
  • target – Target address.
  • source – Source address.
  • name – Name of the link.
  • handler – Custom handler to handle received message.
  • tags
  • options – LinkOptions to further control the attachment.
Returns:

A Qpid Sender link over which messages are sent.

Return type:

Sender

run()

Start Reactor container and begin processing.

touch()

Instruct the reactor container to do processing.

You might need to call this to start up new sessions. This is already handled for create_receiver and create_sender.

qpid_bow.config module

Configure qpid-bow.

qpid_bow.config.configure(new_config: Mapping)

Updates global config with provided mapping.

Parameters:new_config – Mapping with config data.
qpid_bow.config.get_urls(argument_urls: Optional[str] = None) → List[str]

Retrieves server URLs from one of the sources.

Sources are consulted in the following priority order: passed arguments, global config, AMQP_SERVERS environment variable.

Parameters:argument_urls – Comma-separated URLs.
Returns:List of URLs to connect to.
Return type:List[str]
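
The priority order described above can be sketched in plain Python. This is an illustration only, not the library's implementation: resolve_urls, the 'amqp_url' config key and the ValueError raised when nothing is configured are all assumptions made for the example.

```python
import os
from typing import List, Mapping, Optional


def resolve_urls(argument_urls: Optional[str],
                 config: Mapping[str, str],
                 environ: Mapping[str, str] = os.environ) -> List[str]:
    """Pick the first non-empty source and split it on commas."""
    # 'amqp_url' is a hypothetical config key used only for this sketch.
    raw = (argument_urls
           or config.get('amqp_url')
           or environ.get('AMQP_SERVERS'))
    if not raw:
        # Assumption: the real behaviour with no source is not documented here.
        raise ValueError('No AMQP server URLs configured')
    # Split the comma-separated string and drop surrounding whitespace.
    return [url.strip() for url in raw.split(',') if url.strip()]
```

Passing the config and environment explicitly keeps the sketch easy to test; the documented function reads the global config instead.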
qpid_bow.config.process_url(url: str) → str

Processes a URL for usage with Qpid Proton.

  • ActiveMQ amqp+ssl scheme is replaced with amqps.
  • Adds username and password from config.
Parameters:url – Input URL.
Returns:Processed URL.
Return type:str
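
The two processing steps listed above could look roughly like the following standard-library sketch. It does not reproduce the library's code: normalise_url is a hypothetical name, credentials are passed in explicitly rather than read from the global config, and leaving credentials already present in the URL untouched is an assumption.

```python
from typing import Optional
from urllib.parse import quote, urlsplit, urlunsplit


def normalise_url(url: str,
                  username: Optional[str] = None,
                  password: Optional[str] = None) -> str:
    """Rewrite the ActiveMQ scheme and add credentials to the URL."""
    parts = urlsplit(url)
    # Step 1: the ActiveMQ-style amqp+ssl scheme becomes amqps.
    scheme = 'amqps' if parts.scheme == 'amqp+ssl' else parts.scheme
    netloc = parts.netloc
    # Step 2: add credentials, unless the URL already has some (assumption).
    if username and '@' not in netloc:
        credentials = quote(username, safe='')
        if password:
            credentials += ':' + quote(password, safe='')
        netloc = credentials + '@' + netloc
    return urlunsplit((scheme, netloc, parts.path, parts.query, parts.fragment))
```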

qpid_bow.exc module

Exceptions.

exception qpid_bow.exc.MessageCorrupt

Bases: Exception

Message is corrupt.

exception qpid_bow.exc.ObjectNotFound(class_name, object_name)

Bases: Exception

No object found.

exception qpid_bow.exc.QMF2Exception(exception_message: str, exception_data: dict)

Bases: Exception

Generic QMF2 exception.

Parameters:
  • exception_message – Message to identify the reason of exception.
  • exception_data – Additional data with error code and error text.
static from_data(exception_data: dict)

Try to initialise a specific QMF2Exception based on error code.

Parameters:exception_data – Additional data with error code and error text.
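
One way to picture dispatching on the error code is the sketch below. All classes are local stand-ins rather than the qpid_bow.exc classes, and the 'error_code' and 'error_text' keys are assumed names for the fields in exception_data.

```python
class QMF2Error(Exception):
    """Stand-in for a generic QMF2 exception."""
    error_code = None

    def __init__(self, message: str, data: dict):
        super().__init__(message)
        self.data = data

    @staticmethod
    def from_data(data: dict) -> 'QMF2Error':
        # Build a lookup table from each subclass's error_code.
        registry = {cls.error_code: cls for cls in QMF2Error.__subclasses__()}
        # 'error_code' and 'error_text' are assumed key names.
        specific = registry.get(data.get('error_code'))
        if specific is None:
            # Unknown code: fall back to the generic exception.
            return QMF2Error(data.get('error_text', 'Unknown QMF2 error'), data)
        return specific(data)


class InvalidParameter(QMF2Error):
    error_code = 4

    def __init__(self, data: dict):
        super().__init__('Invalid parameter is specified', data)


class Forbidden(QMF2Error):
    error_code = 6

    def __init__(self, data: dict):
        super().__init__('Forbidden QMF2 call', data)
```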
exception qpid_bow.exc.QMF2Forbidden(exception_data)

Bases: qpid_bow.exc.QMF2Exception

Forbidden QMF2 call.

Parameters:exception_data – Additional data with error code and error text.
error_code = 6
exception qpid_bow.exc.QMF2InvalidParameter(exception_data)

Bases: qpid_bow.exc.QMF2Exception

Invalid parameter is specified.

Parameters:exception_data – Additional data with error code and error text.
error_code = 4
exception qpid_bow.exc.QMF2NotFound(exception_data)

Bases: qpid_bow.exc.QMF2Exception

QMF2 object is not found.

Parameters:exception_data – Additional data with error code and error text.
error_code = 7
exception qpid_bow.exc.QMF2ObjectExists(exception_data)

Bases: qpid_bow.exc.QMF2Exception

QMF2 object already exists.

Parameters:exception_data – Additional data with error code and error text.
error_code = 7
exception qpid_bow.exc.RetriableMessage

Bases: Exception

Release message back to the queue.

exception qpid_bow.exc.TimeoutReached

Bases: Exception

Timeout is reached.

exception qpid_bow.exc.UnroutableMessage

Bases: Exception

Origin message has no reply-to address.

qpid_bow.message module

Message utility methods.

qpid_bow.message.create_message(body: Union[str, bytes, dict, list], properties: Optional[dict] = None, priority: qpid_bow.Priority = <Priority.normal: 2>) → proton._message.Message

Utility method to create message with common attributes.

Parameters:
  • body – Message body.
  • properties – Message properties.
  • priority – Message priority.
Returns:

Created message.

Return type:

Message

qpid_bow.message.create_reply(origin_message: proton._message.Message, result_data: Union[str, bytes, dict, list]) → proton._message.Message

Create reply to origin message with result data.

Reply messages share the same correlation ID, properties and priority with the exception of being marked as reply.

The address is set to the reply_to address from the origin message for usage in an addressless Sender.

Parameters:
  • origin_message – Origin message we are replying to.
  • result_data – Message body of the reply.
Returns:

Created reply message.

Return type:

Message
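
The copying rules described for create_reply can be modelled with a small stand-in. Msg is a simplified dataclass, not proton's Message, and raising an exception when reply_to is missing is an assumption (suggested by the UnroutableMessage exception, but not confirmed here). How the reply is marked as a reply is not specified above, so the sketch leaves that out.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class Msg:
    """Minimal stand-in for proton's Message; not the real class."""
    body: Any = None
    address: Optional[str] = None
    reply_to: Optional[str] = None
    correlation_id: Optional[str] = None
    priority: int = 2
    properties: Dict[str, Any] = field(default_factory=dict)


class Unroutable(Exception):
    """Stand-in for qpid_bow.exc.UnroutableMessage."""


def make_reply(origin: Msg, result_data: Any) -> Msg:
    """Build a reply addressed to the origin's reply_to address."""
    if not origin.reply_to:
        # Assumption: a reply cannot be routed without a reply-to address.
        raise Unroutable('Origin message has no reply-to address')
    return Msg(
        body=result_data,
        address=origin.reply_to,               # for an addressless sender
        correlation_id=origin.correlation_id,  # shared with the origin
        priority=origin.priority,              # shared with the origin
        properties=dict(origin.properties),    # copied, not aliased
    )
```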

qpid_bow.message.decode_message(data: bytes) → proton._message.Message

Utility method to decode message from bytes.

Parameters:data – Raw AMQP data in bytes.
Returns:Decoded message.
Return type:Message

qpid_bow.receiver module

Receive messages from AMQP broker.

class qpid_bow.receiver.Receiver(callback: Union[Callable[proton._message.Message, bool], Callable[[proton._message.Message, proton._delivery.Delivery], bool], Callable[proton._message.Message, Awaitable[bool]], Callable[[proton._message.Message, proton._delivery.Delivery], Awaitable[bool]]], address: Optional[str] = None, server_url: Optional[str] = None, limit: Optional[int] = None, container_class: Type[Any] = <class 'proton.reactor.Container'>, reconnect_strategy: qpid_bow.ReconnectStrategy = <ReconnectStrategy.backoff: <proton.reactor.Backoff object>>)

Bases: qpid_bow.Connector

Callback based AMQP message receiver.

Parameters:
  • callback – Function to call when new message is received.
  • address – Name of queue or exchange from where to receive the messages.
  • server_url – Comma-separated list of urls to connect to. Multiple can be specified for connection fallback, the first should be the primary server.
  • limit – Maximum number of messages to receive.
  • container_class – Qpid Proton reactor container-class to use.
  • reconnect_strategy – Strategy to use on connection drop.
add_address(address: str)

Start receiving messages from the given additional address.

Parameters:address – Queue or exchange address to receive from.
coroutine handle_async_message(event)
handle_message(event)
on_connection_opened(event: proton._events.EventBase)
on_message(event)

Called when a message is received. The message itself can be obtained as a property on the event. For the purpose of referring to this message in further actions (e.g. if explicitly accepting it), the delivery should be used, which is also obtainable via a property on the event.

on_start(event)

Handle start event.

Parameters:event – Reactor init event object with container to connect to.
on_timer_task(event: proton._events.EventBase)

Handles the event when a timer is finished.

Parameters:event – Reactor timer task event object.
receive(timeout: Optional[datetime.timedelta] = None)

Run the receive loop until the timeout duration elapses or limit messages have been received.

Parameters:timeout – Timeout duration to wait for message.
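
A minimal usage sketch of the callback-based receive loop, using only the constructor and receive() signatures listed above. The function names, the limit of 10 and the 30-second timeout are illustrative. Treating a True return value as "message handled" is an assumption, since the signature only shows that the callback returns a bool. qpid_bow is imported inside consume() so the callback can be exercised without the library or a broker.

```python
from datetime import timedelta

received = []


def on_order(message) -> bool:
    """Callback invoked for each received message."""
    received.append(message.body)
    # Assumption: True signals that the message was handled successfully.
    return True


def consume(address: str, server_url: str) -> None:
    """Receive at most 10 messages, or stop after 30 seconds."""
    from qpid_bow.receiver import Receiver  # requires qpid-bow installed

    receiver = Receiver(on_order, address, server_url=server_url, limit=10)
    receiver.receive(timeout=timedelta(seconds=30))
```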
remove_address(address: str)

Stop receiving messages from the given address.

Parameters:address – Queue or exchange address to stop receiving from.
stop()

Stop connection to the AMQP server.

qpid_bow.remote_procedure module

Remote procedure call handling.

class qpid_bow.remote_procedure.RemoteProcedure(callback: Union[Callable[proton._message.Message, bool], Callable[[proton._message.Message, proton._delivery.Delivery], bool], Callable[proton._message.Message, Awaitable[bool]], Callable[[proton._message.Message, proton._delivery.Delivery], Awaitable[bool]]], address: str, server_url: Optional[str] = None, reconnect_strategy: qpid_bow.ReconnectStrategy = <ReconnectStrategy.failover: <qpid_bow.NonBackoff object>>)

Bases: qpid_bow.receiver.Receiver

This class can be used to handle a simple RPC pattern: sending a call message, waiting for a reply on a temporary queue, and handling the response through callbacks.

Parameters:
  • callback – Function to call when new message is received.
  • address – Address of queue or exchange to send the messages to.
  • server_url – Comma-separated list of urls to connect to. Multiple can be specified for connection fallback, the first should be the primary server.
  • reconnect_strategy – Strategy to use on connection drop.
call(message: proton._message.Message, timeout: Optional[datetime.timedelta] = None)

Send RPC message and wait for call reply.

Parameters:
  • message – Message to send to RPC-address.
  • timeout – Optional maximum timeout to wait for a reply.
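
The call pattern might be used as sketched below, relying only on the signatures documented in this section. The function names and the 10-second timeout are illustrative, and the meaning of the callback's bool return value is assumed. The qpid_bow imports sit inside call_remote() so the reply callback can be tried without a broker.

```python
from datetime import timedelta

replies = []


def on_reply(message) -> bool:
    """Callback invoked when the RPC reply arrives."""
    replies.append(message.body)
    # Assumption: True signals that the reply was handled successfully.
    return True


def call_remote(address: str, server_url: str, body: dict) -> None:
    """Send one RPC call and wait up to 10 seconds for the reply."""
    from qpid_bow.message import create_message  # requires qpid-bow
    from qpid_bow.remote_procedure import RemoteProcedure

    rpc = RemoteProcedure(on_reply, address, server_url=server_url)
    rpc.call(create_message(body), timeout=timedelta(seconds=10))
```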

on_message(event)

Called when a message is received. The message itself can be obtained as a property on the event. For the purpose of referring to this message in further actions (e.g. if explicitly accepting it), the delivery should be used, which is also obtainable via a property on the event.

on_sendable(event)

Called when the sender link has credit and messages can therefore be transferred.

on_start(event)

Handle start event.

Parameters:event – Reactor init event object with container to connect to.
reply_to

Reply to address of our temporary queue.

qpid_bow.sender module

Send messages to AMQP broker.

class qpid_bow.sender.Sender(address: Optional[str] = None, server_url: Optional[str] = None, reconnect_strategy: qpid_bow.ReconnectStrategy = <ReconnectStrategy.failover: <qpid_bow.NonBackoff object>>)

Bases: qpid_bow.Connector

Class to send messages in a batch to an AMQP address.

Parameters:
  • address – Address of queue or exchange to send the messages to.
  • server_url – Comma-separated list of urls to connect to. Multiple can be specified for connection fallback, the first should be the primary server.
  • reconnect_strategy – Strategy to use on connection drop.
on_sendable(event)

Handles sendable event, sends all the messages in the send_queue.

on_start(event)

Handle start event.

Parameters:event – Reactor init event object with container to connect to.
queue(messages: Iterable[proton._message.Message])

Enqueue messages that will be sent on calling send.

send()

Send queued messages.
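
The queue-then-send batching can be wrapped in a small helper. publish is a hypothetical function written for this example; it takes the sender and the message factory as arguments so it can be tried with fakes as well as with the real classes.

```python
from typing import Any, Callable, Iterable


def publish(sender: Any,
            make_message: Callable[[Any], Any],
            bodies: Iterable[Any]) -> int:
    """Queue one message per body, then send the whole batch."""
    messages = [make_message(body) for body in bodies]
    sender.queue(messages)  # enqueue only; nothing is sent yet
    sender.send()           # send everything that was queued
    return len(messages)
```

With qpid-bow installed, sender would be a qpid_bow.sender.Sender built with an address and server_url of your choosing, and make_message would be qpid_bow.message.create_message.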

Module contents

Qpid-bow client framework.

class qpid_bow.Connector(server_url: Optional[str] = None, container_class: Type[proton.reactor.Container] = <class 'proton.reactor.Container'>, reconnect_strategy: qpid_bow.ReconnectStrategy = <ReconnectStrategy.backoff: <proton.reactor.Backoff object>>)

Bases: proton.handlers.MessagingHandler

Initiate and keep connection to AMQP message broker.

Parameters:
  • server_url – Comma-separated list of urls to connect to. Multiple can be specified for connection fallback, the first should be the primary server.
  • container_class – Qpid Proton reactor container-class to use.
  • reconnect_strategy – Strategy to use on connection drop.
on_connection_closed(event: proton._events.EventBase)

Handle close connection event.

Parameters:event – Connection close event.
on_connection_opened(event: proton._events.EventBase)
on_start(event: proton._events.EventBase)

Handle start event.

Parameters:event – Reactor init event object with container to connect to.
on_transport_error(event: proton._events.EventBase)

Called when some error is encountered with the transport over which the AMQP connection is to be established. This includes authentication errors as well as socket errors.

run()

Start this Connector and setup connection to the AMQP server.

stop()

Stop connection to the AMQP server.

touch()

Instruct the reactor container to do processing.

When running with an alternative container, like the asyncio Container, you might need to call this to start up new sessions.

coroutine wait_closed()
class qpid_bow.NonBackoff

Bases: proton.reactor.Backoff

next()
class qpid_bow.Priority

Bases: enum.Enum

Convenience enum for message priorities.

Qpid supports a configurable number of priorities for a queue; be sure to have at least 5.

When used on a message and enabled on a queue, Qpid will reorder which messages get sent out to a receiver first based on their priority.

high = 3
internal_low = 0
low = 1
normal = 2
realtime = 4
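
To illustrate the reordering effect, the sketch below models a priority-enabled queue with heapq. It is a toy model of the broker-side behaviour, not how Qpid implements it. The Priority enum is a local mirror of the values listed above, and first-in-first-out ordering within one priority level is an assumption.

```python
import heapq
from enum import Enum
from itertools import count


class Priority(Enum):
    """Local mirror of the values listed above."""
    internal_low = 0
    low = 1
    normal = 2
    high = 3
    realtime = 4


def delivery_order(messages):
    """Return bodies highest priority first, FIFO within a priority."""
    sequence = count()
    # Negate the priority so the smallest heap entry is the most urgent;
    # the sequence number keeps arrival order for equal priorities.
    heap = [(-priority.value, next(sequence), body)
            for priority, body in messages]
    heapq.heapify(heap)
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]
```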
class qpid_bow.ReconnectStrategy

Bases: enum.Enum

Define possible reconnect strategies.

backoff = <proton.reactor.Backoff object>
disabled = False
failover = <qpid_bow.NonBackoff object>
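
The difference between the backoff and failover strategies can be pictured with two stand-in classes. These are not proton's Backoff or qpid_bow's NonBackoff: the delay values are illustrative, and the idea that the failover strategy retries with no delay is an assumption drawn from the NonBackoff name.

```python
class ExponentialBackoff:
    """Stand-in: the wait before each reconnect attempt grows."""

    def __init__(self, initial: float = 0.1, maximum: float = 10.0):
        self.initial = initial
        self.maximum = maximum
        self.delay = 0.0

    def reset(self) -> None:
        """Forget previous failures, e.g. after a successful connect."""
        self.delay = 0.0

    def next(self) -> float:
        """Return the delay for this attempt and grow the next one."""
        current = self.delay
        if current == 0:
            self.delay = self.initial
        else:
            self.delay = min(self.maximum, 2 * current)
        return current


class ImmediateRetry(ExponentialBackoff):
    """Stand-in: always retry at once (assumed failover behaviour)."""

    def next(self) -> float:
        return 0.0
```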
class qpid_bow.RunState

Bases: enum.Enum

Indicate current state of Connector.

connected = 5
failed = 6
reconnecting = 4
started = 3
stopped = 1
stopping = 2