ActorBase
This class should not be initialized directly.
- class gridworks.actor_base.ActorBase(settings, api_type_maker_by_name={'base.g.node.gt': <class 'gridworks.types.base_g_node_gt.BaseGNodeGt_Maker'>, 'g.node.gt': <class 'gridworks.types.g_node_gt.GNodeGt_Maker'>, 'g.node.instance.gt': <class 'gridworks.types.g_node_instance_gt.GNodeInstanceGt_Maker'>, 'gw.cert.id': <class 'gridworks.types.gw_cert_id.GwCertId_Maker'>, 'heartbeat.a': <class 'gridworks.types.heartbeat_a.HeartbeatA_Maker'>, 'ready': <class 'gridworks.types.ready.Ready_Maker'>, 'sim.timestep': <class 'gridworks.types.sim_timestep.SimTimestep_Maker'>, 'super.starter': <class 'gridworks.types.super_starter.SuperStarter_Maker'>, 'supervisor.container.gt': <class 'gridworks.types.supervisor_container_gt.SupervisorContainerGt_Maker'>})
This is the base class for GNodes, used to communicate via RabbitMQ.
- Parameters:
settings (GNodeSettings) –
api_type_maker_by_name (Dict[str, HeartbeatA_Maker]) –
- acknowledge_message(delivery_tag)
Acknowledge the message delivery from RabbitMQ by sending a Basic.Ack RPC method for the delivery tag.
- Parameters:
delivery_tag (int) – The delivery tag from the Basic.Deliver frame
- Return type:
None
- add_on_cancel_consumer_callback()
Add a callback that will be invoked if RabbitMQ cancels the consumer for some reason. If RabbitMQ does cancel the consumer, on_consumer_cancelled will be invoked by pika.
- Return type:
None
- add_on_consume_channel_close_callback()
This method tells pika to call the on_consumer_channel_closed method if RabbitMQ unexpectedly closes the channel.
- Return type:
None
- add_on_publish_channel_close_callback()
This method tells pika to call the on_publish_channel_closed method if RabbitMQ unexpectedly closes the channel.
- Return type:
None
- close_consumer_channel()
Call to close the channel with RabbitMQ cleanly by issuing the Channel.Close RPC command.
- Return type:
None
- close_publish_channel()
Invoke this command to close the channel with RabbitMQ by sending the Channel.Close RPC command.
- Return type:
None
- close_publish_connection()
This method closes the publisher connection to RabbitMQ.
- Return type:
None
- connect_consumer()
This method connects to RabbitMQ, returning the connection handle. When the connection is established, the on_consumer_connection_open method will be invoked by pika.
- Return type:
SelectConnection
- connect_publisher()
This method connects to RabbitMQ, returning the connection handle. When the connection is established, the on_publish_connection_open method will be invoked by pika.
- Return type:
SelectConnection
- from_alias_from_routing_key(routing_key)
Returns the GNodeAlias that the message came from, in left-right-dot format. Raises a SchemaError if there is trouble getting this.
- Parameters:
routing_key (str) – The basic_deliver.routing_key string in a rabbit message
- Return type:
str
- from_role_from_routing_key(routing_key)
Returns the GNodeRole that the message came from.
- Parameters:
routing_key (str) – The basic_deliver.routing_key string in a rabbit message
- Raises:
SchemaError – Error in message construction
- Returns:
GNodeRole of the actor that sent the message
- Return type:
GNodeRole
- get_payload_type_name(basic_deliver)
The TypeName is a string that provides the strongly typed specification (API/ABI) for the incoming message. This is similar to knowing the protobuf name/method or the ABI name/method.
The TypeName will articulate, in particular, how to decode the payload.
- Parameters:
basic_deliver – the rabbit basic_deliver object
body – the rabbit body object (i.e. the payload as incoming type)
- Returns:
the TypeName. Raises a SchemaError if the TypeName is not accessible.
- Return type:
str
- local_rabbit_startup()
Override this in a derived class to add any additional rabbit bindings. DO NOT start queues here. It is called at the end of self.start_consuming().
- Return type:
None
- local_start()
Override this in a derived class to start additional threads. It cannot assume that the rabbit channels are established or that messages can be received or sent.
- Return type:
None
- local_stop()
Join any threads in the derived class.
- Return type:
None
- message_category_from_routing_key(routing_key)
Returns the MessageCategory of the message given the routing key.
Raises a SchemaError exception if there is a problem decoding the MessageCategory.
- Parameters:
routing_key (str) – The basic_deliver.routing_key string in a rabbit message
- Returns:
the MessageCategory, as enum
- Return type:
MessageCategory
- on_basic_qos_ok(_unused_frame)
Invoked by pika when the Basic.QoS method has completed. At this point we will start consuming messages by calling start_consuming, which will invoke the needed RPC commands to start the process.
- Parameters:
_unused_frame (pika.frame.Method) – The Basic.QosOk response frame
- Return type:
None
- on_cancelconsumer_ok(_unused_frame, userdata)
This method is invoked by pika when RabbitMQ acknowledges the cancellation of a consumer. At this point we will close the channel. This will invoke the on_consumer_channel_closed method once the channel has been closed, which will in turn close the connection.
- Parameters:
_unused_frame (pika.frame.Method) – The Basic.CancelOk frame
userdata (str | unicode) – Extra user data (consumer tag)
- Return type:
None
- on_consumer_cancelled(method_frame)
Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer receiving messages.
- Parameters:
method_frame (pika.frame.Method) – The Basic.Cancel frame
- Return type:
None
- on_consumer_channel_closed(channel, reason)
Invoked by pika when RabbitMQ unexpectedly closes the channel. Channels are usually closed if you attempt to do something that violates the protocol, such as re-declaring an exchange or queue with different parameters. In this case, we’ll close the connection to shut down the object.
- Parameters:
channel (pika.channel.Channel) – The closed channel
reason (Exception) – Why the channel was closed
- Return type:
None
- on_consumer_channel_open(channel)
This method is invoked by pika when the channel has been opened. The channel object is passed in so we can make use of it. Since the channel is now open, we’ll declare the exchange to use.
- Parameters:
channel (pika.channel.Channel) – The channel object
- Return type:
None
- on_consumer_connection_closed(_unused_connection, reason)
This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects.
- Parameters:
_unused_connection (SelectConnection) – The closed connection object
reason (Exception) – Exception representing the reason for loss of connection
- Return type:
None
- on_consumer_connection_open(_unused_connection)
This method is called by pika once the connection to RabbitMQ has been established. It passes the handle to the connection object in case we need it, but in this case, we’ll just mark it unused.
- Parameters:
_unused_connection (SelectConnection) – The connection
- Return type:
None
- on_consumer_connection_open_error(_unused_connection, err)
This method is called by pika if the connection to RabbitMQ can’t be established.
- Parameters:
_unused_connection (SelectConnection) – The connection
err (Exception) – The error
- Return type:
None
- on_direct_message_bindok(_unused_frame, binding)
Invoked by pika when the Queue.Bind method has completed for direct messages. At this point we will set the prefetch count for the channel.
- Parameters:
_unused_frame (pika.frame.Method) – The Queue.BindOk response frame
binding (str | unicode) – Extra user data (queue name)
- Return type:
None
- on_exchange_declareok(_unused_frame, userdata)
Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC command.
- Parameters:
_unused_frame (pika.frame.Method) – The Exchange.DeclareOk response frame
userdata (str | unicode) – Extra user data (exchange name)
- Return type:
None
- on_message(_unused_channel, basic_deliver, properties, body)
Invoked by pika when a message is delivered from RabbitMQ. If a message you expect does not arrive here, check the routing key of the outbound message and the rabbitmq bindings.
Parses the TypeName of the message payload and the GNodeAlias of the sender. If it recognizes the GNode and the TypeName then it sends the message on to the check_routing function, which will be defined in a child class (e.g., the GNodeFactoryActorBase if the actor is a GNodeFactory).
From RabbitMQ: The channel is passed for your convenience. The basic_deliver object that is passed in carries the exchange, routing key, delivery tag and a redelivered flag for the message. The properties passed in is an instance of BasicProperties with the message properties and the body is the message that was sent.
- Parameters:
_unused_channel (pika.channel.Channel) – The channel object
basic_deliver (pika.spec.Basic.Deliver) – The basic_deliver method
properties (pika.spec.BasicProperties) – The message properties
body (bytes) – The message body
- Return type:
None
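The decode-then-route flow described for on_message can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the ActorBase implementation: `Ready`, `ReadyMaker`, its `decode` method, the `TimeUnixS` field and `handle_delivery` are all hypothetical stand-ins, and the TypeName and sender alias are taken as already parsed.

```python
import json
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class Ready:
    """Hypothetical payload; the field name is illustrative only."""
    time_unix_s: int


class ReadyMaker:
    """Hypothetical maker that decodes a JSON body into a Ready payload."""

    @staticmethod
    def decode(body: bytes) -> Ready:
        data = json.loads(body)
        return Ready(time_unix_s=data["TimeUnixS"])


def handle_delivery(
    type_name: str,
    from_alias: str,
    body: bytes,
    makers: Dict[str, type],
    route: Callable[[str, object], None],
) -> bool:
    """Look up the maker for type_name, decode the body, and route the payload."""
    maker = makers.get(type_name)
    if maker is None:
        return False  # unrecognized TypeName: nothing is routed
    route(from_alias, maker.decode(body))
    return True


routed = []


def record(from_alias: str, payload: object) -> None:
    # Stand-in for the routing step defined in a child class.
    routed.append((from_alias, payload))


makers = {"ready": ReadyMaker}  # plays the role of api_type_maker_by_name
known = handle_delivery("ready", "d1.time", b'{"TimeUnixS": 1}', makers, record)
unknown = handle_delivery("no.such.type", "d1.time", b"{}", makers, record)
```

Only the message with a registered TypeName reaches the routing step; the other is dropped before any decoding is attempted.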
- on_publish_channel_closed(channel, reason)
Invoked by pika when RabbitMQ unexpectedly closes the channel. Channels are usually closed if you attempt to do something that violates the protocol, such as re-declaring an exchange or queue with different parameters. In this case, we’ll close the connection to shut down the object.
- Parameters:
channel (pika.channel.Channel) – The closed channel
reason (Exception) – Why the channel was closed
- Return type:
None
- on_publish_channel_open(channel)
This method is invoked by pika when the channel has been opened. The channel object is passed in so we can make use of it. Since the channel is now open, we’ll declare the exchange to use.
- Parameters:
channel (pika.channel.Channel) – The channel object
- Return type:
None
- on_publish_connection_closed(_unused_connection, reason)
This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects.
- Parameters:
_unused_connection (pika.connection.Connection) – The closed connection object
reason (Exception) – Exception representing the reason for loss of connection
- Return type:
None
- on_publish_connection_open(_unused_connection)
This method is called by pika once the publisher connection to RabbitMQ has been established. It passes the handle to the connection object in case we need it, but in this case, we’ll just mark it unused.
- Parameters:
_unused_connection (pika.SelectConnection) – The connection
- Return type:
None
- on_publish_connection_open_error(_unused_connection, err)
This method is called by pika if the connection to RabbitMQ can’t be established.
- Parameters:
_unused_connection (pika.SelectConnection) – The connection
err (Exception) – The error
- Return type:
None
- on_queue_declareok(_unused_frame)
Method invoked by pika when the Queue.Declare RPC call made in setup_queue has completed. In this method we will bind the queue and exchange together with the routing key by issuing the Queue.Bind RPC command. When this command is complete, the on_direct_message_bindok method will be invoked by pika.
- Parameters:
_unused_frame (pika.frame.Method) – The Queue.DeclareOk frame
- Return type:
None
- open_consume_channel()
Open a new channel with RabbitMQ by issuing the Channel.Open RPC command. When RabbitMQ responds that the channel is open, the on_consumer_channel_open callback will be invoked by pika.
- Return type:
None
- open_publish_channel()
This method will open a new channel with RabbitMQ by issuing the Channel.Open RPC command. When RabbitMQ confirms the channel is open by sending the Channel.OpenOk RPC reply, the on_publish_channel_open method will be invoked.
- Return type:
None
- abstract prepare_for_death()
Use actor_main_stopped to exit out of any threads in the derived class. Then use local_stop to join those threads.
If there are no threads in the derived class, copy this method into the derived class, remove the abstractmethod decorator, and delete the exception.
- Return type:
None
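The thread lifecycle described for local_start, prepare_for_death and local_stop might look roughly like this. It is a sketch under assumptions: `ThreadedActorSketch` is a hypothetical stand-in that does not derive from ActorBase, and `actor_main_stopped` is assumed here to be a plain boolean flag that the worker loop polls.

```python
import threading
import time


class ThreadedActorSketch:
    """Hypothetical stand-in; a real actor would derive from ActorBase."""

    def __init__(self) -> None:
        self.actor_main_stopped = False  # assumed to be a simple boolean flag
        self.ticks = 0
        self._worker = threading.Thread(target=self._work, daemon=True)

    def local_start(self) -> None:
        # Start the additional thread owned by the derived class.
        self._worker.start()

    def _work(self) -> None:
        # Poll the flag so the loop can exit soon after it is set.
        while not self.actor_main_stopped:
            self.ticks += 1
            time.sleep(0.01)

    def prepare_for_death(self) -> None:
        # Signal every worker loop to exit.
        self.actor_main_stopped = True

    def local_stop(self) -> None:
        # Join the threads once they have been told to stop.
        self._worker.join()


actor = ThreadedActorSketch()
actor.local_start()
time.sleep(0.05)
actor.prepare_for_death()
actor.local_stop()
```

Setting the flag before joining matters: joining a thread whose loop never checks a stop condition would block indefinitely.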
- reconnect_consumer()
Will be invoked if the connection can’t be opened or is closed. Indicates that a reconnect is necessary then stops the ioloop.
- Return type:
None
- route_message(from_alias, from_role, payload)
This router should be overridden in a derived class, based on the messages received by that GNodeRole.
- Parameters:
from_alias (str) –
from_role (GNodeRole) –
payload (SimTimestep) –
- Return type:
None
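One common way to override such a router is to dispatch on the payload's type. The sketch below assumes that pattern; it is not taken from the GridWorks source. `ActorBaseSketch`, `MyActorSketch`, the payload dataclasses and their fields are hypothetical stand-ins, and the role is passed as a plain string rather than a GNodeRole.

```python
from dataclasses import dataclass


@dataclass
class HeartbeatA:
    """Hypothetical stand-in payload; field names are illustrative."""
    my_hex: str


@dataclass
class SimTimestep:
    """Hypothetical stand-in payload; field names are illustrative."""
    time_unix_s: int


class ActorBaseSketch:
    """Hypothetical stand-in for the base class."""

    def route_message(self, from_alias, from_role, payload):
        raise NotImplementedError


class MyActorSketch(ActorBaseSketch):
    def __init__(self):
        self.heartbeats = []
        self.timesteps = []

    def route_message(self, from_alias, from_role, payload):
        # Dispatch on payload type; types this role does not handle are ignored.
        if isinstance(payload, HeartbeatA):
            self.heartbeats.append((from_alias, payload.my_hex))
        elif isinstance(payload, SimTimestep):
            self.timesteps.append(payload.time_unix_s)


actor = MyActorSketch()
actor.route_message("d1.super", "Supervisor", HeartbeatA(my_hex="a"))
actor.route_message("d1.time", "TimeCoordinator", SimTimestep(time_unix_s=1700000000))
actor.route_message("d1.other", "Other", object())
```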
- route_mqtt_message(from_alias, payload)
This router should be overridden in a derived class.
- Parameters:
from_alias (str) –
payload (HeartbeatA) –
- Return type:
None
- run_consumer()
Run the consumer by connecting to RabbitMQ and then starting the IOLoop to block and allow the SelectConnection to operate.
- Return type:
None
- run_publisher()
Run the publisher by connecting and then starting the IOLoop.
- Return type:
None
- send_message(payload, message_category=MessageCategory.RabbitJsonDirect, to_role=None, to_g_node_alias=None, radio_channel=None)
Publish a direct message to another GNode in the registry world. Direct messages in the registry world use only json (i.e. no more streamlined serial encoding), unlike in non-registry worlds.
- Parameters:
payload (HeartbeatA) – Any GridWorks type with a json content-type (that includes TypeName as a json key, and has as_type() as an encoding method).
routing_key_type – for creating routing key
to_role (Optional[GNodeRole]) – used if a direct message
to_g_node_alias (str) – used if a direct message
message_category (MessageCategory) –
radio_channel (str | None) –
- Returns:
MESSAGE_SENT with success, otherwise some description of why the message was not sent.
- Return type:
OnSendMessageDiagnostic
- set_qos()
This method sets the consumer prefetch so that only one message is delivered at a time. The consumer must acknowledge this message before RabbitMQ will deliver another one. You should experiment with different prefetch values to achieve desired performance.
- Return type:
None
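A prefetch of one can be requested through pika's `Channel.basic_qos`. The sketch below is an illustration rather than the ActorBase code: `FakeChannel` and `set_prefetch` are hypothetical names, with the fake channel used so the snippet runs without a broker.

```python
from typing import Callable, Optional


class FakeChannel:
    """Hypothetical stand-in that records calls instead of talking to RabbitMQ."""

    def __init__(self) -> None:
        self.prefetch_count: Optional[int] = None

    def basic_qos(self, prefetch_count: int = 0, callback: Optional[Callable] = None) -> None:
        # Uses the same keyword names as pika.channel.Channel.basic_qos.
        self.prefetch_count = prefetch_count
        if callback is not None:
            callback(None)  # pika would pass the Basic.QosOk frame here


def set_prefetch(channel, on_qos_ok: Callable, prefetch_count: int = 1) -> None:
    """Limit the number of unacknowledged messages delivered to this consumer."""
    channel.basic_qos(prefetch_count=prefetch_count, callback=on_qos_ok)


events = []
channel = FakeChannel()
set_prefetch(channel, on_qos_ok=lambda frame: events.append("qos_ok"))
```

A larger `prefetch_count` lets the broker send several messages ahead of acknowledgement, which can raise throughput at the cost of more in-flight work per consumer.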
- setup_exchange()
Set up the exchange on RabbitMQ by invoking the Exchange.Declare RPC command. When it is complete, the on_exchange_declareok method will be invoked by pika.
- Return type:
None
- setup_queue()
Set up the queue on RabbitMQ by invoking the Queue.Declare RPC command. When it is complete, the on_queue_declareok method will be invoked by pika.
- Return type:
None
- start_consuming()
This method sets up the consumer by first calling add_on_cancel_consumer_callback so that the object is notified if RabbitMQ cancels the consumer. It then issues the Basic.Consume RPC command which returns the consumer tag that is used to uniquely identify the consumer with RabbitMQ. We keep the value to use it when we want to cancel consuming. The on_message method is passed in as a callback pika will invoke when a message is fully received.
- Return type:
None
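The two steps described above (register a cancel callback, then issue Basic.Consume and keep the consumer tag) can be sketched as follows. `add_on_cancel_callback` and `basic_consume` are real pika channel methods, but `FakeChannel`, `ConsumerSketch` and the queue name are hypothetical stand-ins so the snippet runs without a broker.

```python
class FakeChannel:
    """Hypothetical stand-in for pika.channel.Channel; records registrations."""

    def __init__(self):
        self.cancel_callbacks = []
        self.consumers = {}

    def add_on_cancel_callback(self, callback):
        self.cancel_callbacks.append(callback)

    def basic_consume(self, queue, on_message_callback):
        # Like pika, return a consumer tag identifying this consumer.
        consumer_tag = f"ctag-{len(self.consumers) + 1}"
        self.consumers[consumer_tag] = (queue, on_message_callback)
        return consumer_tag


class ConsumerSketch:
    def __init__(self, channel, queue_name):
        self._channel = channel
        self._queue_name = queue_name
        self._consumer_tag = None
        self.received = []

    def on_consumer_cancelled(self, method_frame):
        self._consumer_tag = None

    def on_message(self, _unused_channel, basic_deliver, properties, body):
        self.received.append(body)

    def start_consuming(self):
        # 1. Be notified if the broker cancels the consumer.
        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
        # 2. Issue Basic.Consume and keep the tag for a later Basic.Cancel.
        self._consumer_tag = self._channel.basic_consume(
            self._queue_name, self.on_message
        )


channel = FakeChannel()
consumer = ConsumerSketch(channel, "my-queue")
consumer.start_consuming()
```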
- stop_consumer()
Cleanly shut down the connection to RabbitMQ by stopping the consumer with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelconsumer_ok will be invoked by pika, which will then close the channel and connection. If you want to use this with CTRL-C, figure out how to add back the commented out ioloop.start() below without error.
- Return type:
None
- stop_consuming()
Tell RabbitMQ that you would like to stop consuming by sending the Basic.Cancel RPC command.
- Return type:
None
- stop_publisher()
Stop the publisher by closing the channel and connection. We set a flag here so that we stop scheduling new messages to be published. The IOLoop is started because this method is invoked by the try/except below when KeyboardInterrupt is caught. Starting the IOLoop again will allow the publisher to cleanly disconnect from RabbitMQ.
- Return type:
None
- type_name_from_routing_key(routing_key)
Returns the TypeName of the message given the routing key. Raises a SchemaError exception if there is a problem decoding the TypeName, or if it does not have the appropriate left-right-dot format.
- Parameters:
routing_key (str) – The basic_deliver.routing_key string in a rabbit message
- Returns:
the TypeName of the payload, in Lrd format
- Return type:
str
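As an illustration of the format check mentioned above, the sketch below assumes that left-right-dot means lowercase alphanumeric words separated by periods, with the first word starting with a letter. That reading is an assumption, and `is_left_right_dot` is a hypothetical helper, not a GridWorks function.

```python
import re

# Assumed format: lowercase alphanumeric words separated by periods,
# with the leftmost word starting with a letter.
_LRD_PATTERN = re.compile(r"[a-z][a-z0-9]*(\.[a-z0-9]+)*")


def is_left_right_dot(candidate: str) -> bool:
    """Return True if candidate matches the assumed left-right-dot format."""
    return _LRD_PATTERN.fullmatch(candidate) is not None


checks = {
    name: is_left_right_dot(name)
    for name in ["heartbeat.a", "g.node.gt", "Heartbeat.A", "1bad.name", "trailing."]
}
```

Under this assumption the first two names pass and the last three fail, for an uppercase letter, a leading digit, and an empty final word respectively.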