ActorBase

This class should not be initialized directly.

class gridworks.actor_base.ActorBase(settings, api_type_maker_by_name={'base.g.node.gt': <class 'gridworks.types.base_g_node_gt.BaseGNodeGt_Maker'>, 'g.node.gt': <class 'gridworks.types.g_node_gt.GNodeGt_Maker'>, 'g.node.instance.gt': <class 'gridworks.types.g_node_instance_gt.GNodeInstanceGt_Maker'>, 'gw.cert.id': <class 'gridworks.types.gw_cert_id.GwCertId_Maker'>, 'heartbeat.a': <class 'gridworks.types.heartbeat_a.HeartbeatA_Maker'>, 'ready': <class 'gridworks.types.ready.Ready_Maker'>, 'sim.timestep': <class 'gridworks.types.sim_timestep.SimTimestep_Maker'>, 'super.starter': <class 'gridworks.types.super_starter.SuperStarter_Maker'>, 'supervisor.container.gt': <class 'gridworks.types.supervisor_container_gt.SupervisorContainerGt_Maker'>})

This is the base class for GNodes, used to communicate via RabbitMQ.

acknowledge_message(delivery_tag)

Acknowledge the message delivery from RabbitMQ by sending a Basic.Ack RPC method for the delivery tag.

Parameters:

delivery_tag (int) – The delivery tag from the Basic.Deliver frame

Return type:

None

add_on_cancel_consumer_callback()

Add a callback that will be invoked if RabbitMQ cancels the consumer for some reason. If RabbitMQ does cancel the consumer, on_consumer_cancelled will be invoked by pika.

Return type:

None

add_on_consume_channel_close_callback()

This method tells pika to call the on_consumer_channel_closed method if RabbitMQ unexpectedly closes the channel.

Return type:

None

add_on_publish_channel_close_callback()

This method tells pika to call the on_publish_channel_closed method if RabbitMQ unexpectedly closes the channel.

Return type:

None

close_consumer_channel()

Call to close the channel with RabbitMQ cleanly by issuing the Channel.Close RPC command.

Return type:

None

close_publish_channel()

Invoke this command to close the channel with RabbitMQ by sending the Channel.Close RPC command.

Return type:

None

close_publish_connection()

This method closes the publish connection to RabbitMQ.

Return type:

None

connect_consumer()

This method connects to RabbitMQ, returning the connection handle. When the connection is established, the on_consumer_connection_open method will be invoked by pika.

Return type:

SelectConnection

connect_publisher()

This method connects to RabbitMQ, returning the connection handle. When the connection is established, the on_publish_connection_open method will be invoked by pika.

Return type:

SelectConnection

from_alias_from_routing_key(routing_key)

Returns the GNodeAlias in left-right-dot format. Raises a SchemaError if there is trouble getting this.

Parameters:

routing_key (str) – The basic_deliver.routing_key string in a rabbit message

Return type:

str

from_role_from_routing_key(routing_key)

Returns the GNodeRole that the message came from. Raises a SchemaError if there is trouble getting this.

Parameters:

routing_key (str) – The basic_deliver.routing_key string in a rabbit message

Raises:

SchemaError – Error in message construction

Returns:

GNodeRole of the actor that sent the message

Return type:

GNodeRole

get_payload_type_name(basic_deliver)

The TypeName is a string that provides the strongly typed specification (API/ABI) for the incoming message. This is similar to knowing the protobuf name/method or the ABI name/method.

The TypeName will articulate, in particular, how to decode the payload.

Parameters:
  • basic_deliver – the rabbit basic_deliver object

  • body – the rabbit body object (i.e. the payload as incoming type)

Returns:

the TypeName

Raises:

SchemaError – if the TypeName is not accessible

Return type:

str
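The idea that the TypeName selects how a payload is decoded can be sketched with a small registry that plays the role of api_type_maker_by_name. This is a minimal illustration under stated assumptions: decode_ready, DECODER_BY_TYPE_NAME and decode_body are hypothetical names, a plain function stands in for a Maker class, and ValueError stands in for SchemaError.

```python
import json
from typing import Any, Callable, Dict


def decode_ready(data: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical decoder standing in for a Maker class."""
    return {"kind": "ready", "data": data}


# Plays the role of api_type_maker_by_name: TypeName -> decoder.
DECODER_BY_TYPE_NAME: Dict[str, Callable[[Dict[str, Any]], Any]] = {
    "ready": decode_ready,
}


def decode_body(type_name: str, body: bytes) -> Any:
    """Look up the decoder for type_name and apply it to a JSON body."""
    try:
        decoder = DECODER_BY_TYPE_NAME[type_name]
    except KeyError:
        # ValueError is a stand-in here; the real class raises SchemaError.
        raise ValueError(f"Unrecognized TypeName: {type_name}") from None
    return decoder(json.loads(body.decode("utf-8")))
```

A body whose TypeName is not in the registry is rejected before any decoding is attempted, which mirrors the "recognizes the TypeName" check described for on_message.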

heartbeat_from_super(from_alias, ping)

Subordinate GNode responds to its supervisor’s heartbeat with a “pong” message.

Both the received heartbeat (ping) and the response (pong) have the type HeartbeatA (see: https://gridworks.readthedocs.io/en/latest/apis/types.html#heartbeata).

The subordinate GNode generates its own unique identifier (hex) and includes it in the pong message along with the heartbeat it received from the supervisor.

Note that the subordinate GNode does not have the responsibility of verifying the authenticity of the last heartbeat received from the supervisor - although typically, the supervisor does send the last heartbeat from this GNode (except during the initial heartbeat exchange).

Parameters:
  • from_alias (str) – the alias of the GNode that sent the ping.

  • ping (HeartbeatA) – the heartbeat sent.

Return type:

None

Raises:

ValueError – If from_alias is not this GNode’s Supervisor alias.
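The ping/pong exchange described above can be sketched as follows. This is an illustration only: Heartbeat is a hypothetical stand-in for HeartbeatA, its field names and the length of the generated hex identifier are assumptions, and make_pong is not part of the gridworks API.

```python
import secrets
from dataclasses import dataclass


@dataclass(frozen=True)
class Heartbeat:
    """Hypothetical stand-in for HeartbeatA; field names are assumptions."""

    my_hex: str  # identifier generated by the sender of this heartbeat
    your_last_hex: str  # identifier most recently received from the partner


def make_pong(from_alias: str, ping: Heartbeat, supervisor_alias: str) -> Heartbeat:
    """Build the pong a subordinate GNode would send back to its supervisor."""
    if from_alias != supervisor_alias:
        raise ValueError(f"{from_alias} is not my supervisor ({supervisor_alias})")
    # Generate a fresh hex identifier and echo the one received in the ping.
    # The 8-character length is an arbitrary choice for this sketch.
    return Heartbeat(my_hex=secrets.token_hex(4), your_last_hex=ping.my_hex)
```

Note that the sketch does not check ping.your_last_hex, matching the statement that the subordinate is not responsible for verifying the last heartbeat it receives.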

local_rabbit_startup()

This should be overridden in a derived class to add any additional rabbit bindings. DO NOT start queues here. It is called at the end of self.start_consuming().

Return type:

None

local_start()

This should be overridden in a derived class to start additional threads. It cannot assume that the rabbit channels are established or that messages can be received or sent.

Return type:

None

local_stop()

Join any threads in the derived class.

Return type:

None

message_category_from_routing_key(routing_key)

Returns the MessageCategory of the message given the routing key.

Raises a SchemaError exception if there is a problem decoding the MessageCategory

Parameters:

routing_key (str) – The basic_deliver.routing_key string in a rabbit message

Returns:

the MessageCategory, as enum

Return type:

MessageCategory

on_basic_qos_ok(_unused_frame)

Invoked by pika when the Basic.QoS method has completed. At this point we will start consuming messages by calling start_consuming, which will invoke the needed RPC commands to start the process.

Parameters:

_unused_frame (pika.frame.Method) – The Basic.QosOk response frame

Return type:

None

on_cancelconsumer_ok(_unused_frame, userdata)

This method is invoked by pika when RabbitMQ acknowledges the cancellation of a consumer. At this point we will close the channel. This will invoke the on_consumer_channel_closed method once the channel has been closed, which will in turn close the connection.

Parameters:
  • _unused_frame (pika.frame.Method) – The Basic.CancelOk frame

  • userdata (str|unicode) – Extra user data (consumer tag)

Return type:

None

on_consumer_cancelled(method_frame)

Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer receiving messages.

Parameters:

method_frame (pika.frame.Method) – The Basic.Cancel frame

Return type:

None

on_consumer_channel_closed(channel, reason)

Invoked by pika when the RabbitMQ channel is unexpectedly closed.

This callback is triggered when a channel is closed, usually due to violating the protocol by attempting to re-declare an exchange or queue with different parameters. In this case, the connection is closed to gracefully shutdown the object.

Parameters:
  • channel (PikaChannel) – The closed channel object.

  • reason (Exception) – The reason why the channel was closed.

Return type:

None

on_consumer_channel_open(channel)

Invoked by pika when the channel has been successfully opened.

This callback is triggered when the channel is opened, and it provides the channel object that can be used for further operations. In this case, we’ll proceed to declare the exchange to be used.

Parameters:

channel (PikaChannel) – The opened channel object.

Return type:

None

on_consumer_connection_closed(_unused_connection, reason)

This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects.

Parameters:
  • _unused_connection (SelectConnection) – The closed connection object

  • reason (Exception) – Exception representing the reason for loss of connection

Return type:

None

on_consumer_connection_open(_unused_connection)

This method is called by pika once the connection to RabbitMQ has been established. It passes the handle to the connection object in case we need it, but in this case, we’ll just mark it unused.

Parameters:

_unused_connection (SelectConnection) – The connection

Return type:

None

on_consumer_connection_open_error(_unused_connection, err)

This method is called by pika if the connection to RabbitMQ can’t be established.

Parameters:
  • _unused_connection (SelectConnection) – The connection

  • err (Exception) – The error

Return type:

None

on_direct_message_bindok(_unused_frame, binding)

Invoked by pika when the Queue.Bind method has completed for direct messages. At this point we will set the prefetch count for the channel.

Parameters:
  • _unused_frame (pika.frame.Method) – The Queue.BindOk response frame

  • binding (str|unicode) – Extra user data (queue name)

Return type:

None

on_exchange_declareok(_unused_frame, userdata)

Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC command.

Parameters:
  • _unused_frame (pika.frame.Method) – Exchange.DeclareOk response frame

  • userdata (str|unicode) – Extra user data (exchange name)

Return type:

None

on_message(_unused_channel, basic_deliver, properties, body)

Invoked by pika when a message is delivered from RabbitMQ. If a message does not get here that you expect should get here, check the routing key of the outbound message and the rabbitmq bindings.

Parses the TypeName of the message payload and the GNodeAlias of the sender. If it recognizes the GNode and the TypeName, then it sends the message on to the check_routing function, which will be defined in a child class (e.g., the GNodeFactoryActorBase if the actor is a GNodeFactory).

From RabbitMQ: The channel is passed for your convenience. The basic_deliver object that is passed in carries the exchange, routing key, delivery tag, and a redelivered flag for the message. The properties passed in is an instance of BasicProperties with the message properties. The body is the message that was sent.

Parameters:
  • _unused_channel (pika.channel.Channel) – The channel object

  • basic_deliver (pika.spec.Basic.Deliver) – The basic.deliver method, including the routing key

  • properties (pika.spec.BasicProperties) – The message properties

  • body (bytes) – The message body

Return type:

None

on_publish_channel_closed(channel, reason)

Invoked by pika when the RabbitMQ channel is unexpectedly closed.

This callback is triggered when a channel is closed, usually due to violating the protocol by attempting to re-declare an exchange or queue with different parameters. In this case, the connection is closed to gracefully shutdown the object.

Parameters:
  • channel (PikaChannel) – The closed channel object.

  • reason (Exception) – The reason why the channel was closed.

Return type:

None

on_publish_channel_open(channel)

Invoked by pika when the channel has been successfully opened.

This callback is triggered when the channel is opened, and it provides the channel object that can be used for further operations. In this case, we’ll proceed to declare the exchange to be used.

Parameters:

channel (PikaChannel) – The opened channel object.

Return type:

None

on_publish_connection_closed(_unused_connection, reason)

This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects.

Parameters:
  • _unused_connection (pika.connection.Connection) – The closed connection object

  • reason (Exception) – Exception representing the reason for loss of connection

Return type:

None

on_publish_connection_open(_unused_connection)

This method is called by pika once the publisher connection to RabbitMQ has been established. It passes the handle to the connection object in case we need it, but in this case, we’ll just mark it unused.

Parameters:

_unused_connection (pika.SelectConnection) – The connection

Return type:

None

on_publish_connection_open_error(_unused_connection, err)

This method is called by pika if the connection to RabbitMQ can’t be established.

Parameters:
  • _unused_connection (pika.SelectConnection) – The connection

  • err (Exception) – The error

Return type:

None

on_queue_declareok(_unused_frame)

Invoked by pika when the Queue.Declare RPC call made in setup_queue has completed. In this method we will bind the queue and exchange together with the routing key by issuing the Queue.Bind RPC command. When this command is complete, the on_direct_message_bindok method will be invoked by pika.

Parameters:

_unused_frame (pika.frame.Method) – The Queue.DeclareOk frame

Return type:

None

open_consume_channel()

Open a new channel with RabbitMQ by issuing the Channel.Open RPC command. When RabbitMQ responds that the channel is open, the on_consumer_channel_open callback will be invoked by pika.

Return type:

None

open_publish_channel()

This method will open a new channel with RabbitMQ by issuing the Channel.Open RPC command. When RabbitMQ confirms the channel is open by sending the Channel.OpenOk RPC reply, the on_publish_channel_open method will be invoked.

Return type:

None

abstract prepare_for_death()

Use actor_main_stopped to exit out of any threads in the derived class. Then use local_stop to join those threads.

If there are no threads in the derived class, copy this method into the derived class, remove the abstractmethod decorator, and delete the exception.

Return type:

None
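The thread lifecycle implied by local_start, prepare_for_death and local_stop can be sketched with a stand-in class. This is not the real base class: WorkerActorSketch does not inherit from ActorBase, and treating actor_main_stopped as a plain boolean attribute is an assumption made for this illustration.

```python
import threading
import time


class WorkerActorSketch:
    """Stand-in for a class derived from ActorBase; not the real base class."""

    def __init__(self) -> None:
        # Assumed to be a plain boolean flag, as the name suggests.
        self.actor_main_stopped = True
        self.ticks = 0
        self._thread = threading.Thread(target=self._main, daemon=True)

    def local_start(self) -> None:
        # Start extra threads; rabbit channels may not be open yet.
        self.actor_main_stopped = False
        self._thread.start()

    def _main(self) -> None:
        # Work loop that exits once the stop flag is raised.
        while not self.actor_main_stopped:
            self.ticks += 1
            time.sleep(0.01)

    def prepare_for_death(self) -> None:
        # Signal every loop owned by this class to exit.
        self.actor_main_stopped = True

    def local_stop(self) -> None:
        # Wait for the signalled thread to finish.
        self._thread.join()
```

The order matters: prepare_for_death raises the flag first, so the join in local_stop returns once the loop has noticed it rather than blocking indefinitely.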

reconnect_consumer()

Invoked if the connection can’t be opened or is closed. Indicates that a reconnect is necessary, then stops the ioloop.

Return type:

None

route_message(from_alias, from_role, payload)

Base-class message routing for GNode Actors, handling interactions with the Supervisor and TimeCoordinator.

Derived classes are expected to implement their own route_message method. It is recommended to call super().route_message(from_alias, from_role, payload) at the end of the method if the message has not been routed yet.

Return type:

None
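The recommended override pattern, handling local types first and deferring to the base class last, can be sketched as follows. Everything here is a stand-in: BaseRouterSketch replaces ActorBase, payloads are plain dicts rather than GridWorks types, roles are strings rather than GNodeRole values, and "my.local.type" is a hypothetical TypeName.

```python
from typing import Any, List


class BaseRouterSketch:
    """Stand-in for ActorBase; records what reaches the base-class router."""

    def __init__(self) -> None:
        self.base_handled: List[Any] = []

    def route_message(self, from_alias: str, from_role: str, payload: Any) -> None:
        self.base_handled.append(payload)


class MyActorSketch(BaseRouterSketch):
    def __init__(self) -> None:
        super().__init__()
        self.locally_handled: List[Any] = []

    def route_message(self, from_alias: str, from_role: str, payload: Any) -> None:
        # Handle the payload types this actor knows about first.
        if isinstance(payload, dict) and payload.get("TypeName") == "my.local.type":
            self.locally_handled.append(payload)
            return
        # Not routed yet: fall through to the base class.
        super().route_message(from_alias, from_role, payload)
```

Calling super() only when nothing matched keeps the base-class handling (for example, supervisor heartbeats) intact without processing a message twice.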

route_mqtt_message(from_alias, payload)

Base-class routing for messages from SCADA actors, which use MessageCategory.MqttJsonBroadcast.

This is only intended to be used for AtomicTNodes and Ears.

Return type:

None

run_consumer()

Run the consumer by connecting to RabbitMQ and then starting the IOLoop to block and allow the SelectConnection to operate.

Return type:

None

run_publisher()

Run the publisher by connecting to RabbitMQ and then starting the IOLoop.

Return type:

None

send_message(payload, message_category=MessageCategory.RabbitJsonDirect, to_role=None, to_g_node_alias=None, radio_channel=None)

Publish a direct message to another GNode in the registry world. Direct messages in the registry world always use json (i.e. there is no more streamlined serial encoding), unlike in non-registry worlds.

Parameters:
  • payload (HeartbeatA) – Any GridWorks type with a json content-type that includes TypeName as a json key and has as_type() as an encoding method.

  • routing_key_type – for creating routing key

  • to_role (Optional[GNodeRole]) – used if a direct message

  • to_g_node_alias (str) – used if a direct message

  • message_category (MessageCategory) –

  • radio_channel (str | None) –

Returns:

MESSAGE_SENT with success, otherwise some description of why the message was not sent.

Return type:

OnSendMessageDiagnostic

set_qos()

This method sets up the consumer prefetch to only be delivered one message at a time. The consumer must acknowledge this message before RabbitMQ will deliver another one. You should experiment with different prefetch values to achieve desired performance.

Return type:

None

setup_exchange()

Set up the exchange on RabbitMQ by invoking the Exchange.Declare RPC command. When it is complete, the on_exchange_declareok method will be invoked by pika.

Return type:

None

setup_queue()

Set up the queue on RabbitMQ by invoking the Queue.Declare RPC command. When it is complete, the on_queue_declareok method will be invoked by pika.

Return type:

None

start_consuming()

This method sets up the consumer by first calling add_on_cancel_consumer_callback so that the object is notified if RabbitMQ cancels the consumer. It then issues the Basic.Consume RPC command which returns the consumer tag that is used to uniquely identify the consumer with RabbitMQ. We keep the value to use it when we want to cancel consuming. The on_message method is passed in as a callback pika will invoke when a message is fully received.

Return type:

None

stop_consumer()

Cleanly shut down the connection to RabbitMQ by stopping the consumer with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelconsumer_ok will be invoked by pika, which will then close the channel and connection. If you want to use this with CTRL-C, figure out how to add back the commented-out ioloop.start() in the source without error.

Return type:

None

stop_consuming()

Tell RabbitMQ that you would like to stop consuming by sending the Basic.Cancel RPC command.

Return type:

None

stop_publisher()

Stop the publisher by closing the channel and connection. A flag is set here so that no new messages are scheduled for publishing. The IOLoop is started again because this method is invoked when a KeyboardInterrupt is caught; restarting the IOLoop allows the publisher to disconnect cleanly from RabbitMQ.

Return type:

None

type_name_from_routing_key(routing_key)

Returns the TypeName of the message given the routing key. Raises a SchemaError exception if there is a problem decoding the TypeName, or if it does not have the appropriate left-right-dot format.

Parameters:

routing_key (str) – The basic_deliver.routing_key string in a rabbit message

Returns:

the TypeName of the payload, in Lrd format

Return type:

str
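A format check of the kind this method performs might look like the sketch below. The definition of left-right-dot (Lrd) used here is an assumption, not taken from this page: lowercase alphanumeric words separated by periods, with the first word starting with a letter. LRD_PATTERN and is_left_right_dot are hypothetical names.

```python
import re

# Assumed definition of left-right-dot (Lrd): lowercase alphanumeric words
# separated by periods, with the first word starting with a letter.
LRD_PATTERN = re.compile(r"[a-z][a-z0-9]*(\.[a-z0-9]+)*")


def is_left_right_dot(candidate: str) -> bool:
    """Return True if candidate matches the assumed Lrd format."""
    return LRD_PATTERN.fullmatch(candidate) is not None
```

Under this assumption, the type names listed in the class signature, such as heartbeat.a and g.node.instance.gt, pass the check, while strings with uppercase letters or empty words do not.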