Source code for edge_st_sdk.aws.aws_client

################################################################################
# COPYRIGHT(c) 2018 STMicroelectronics                                         #
#                                                                              #
# Redistribution and use in source and binary forms, with or without           #
# modification, are permitted provided that the following conditions are met:  #
#   1. Redistributions of source code must retain the above copyright notice,  #
#      this list of conditions and the following disclaimer.                   #
#   2. Redistributions in binary form must reproduce the above copyright       #
#      notice, this list of conditions and the following disclaimer in the     #
#      documentation and/or other materials provided with the distribution.    #
#   3. Neither the name of STMicroelectronics nor the names of its             #
#      contributors may be used to endorse or promote products derived from    #
#      this software without specific prior written permission.                #
#                                                                              #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"  #
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE    #
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE   #
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE    #
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR          #
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF         #
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS     #
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN      #
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)      #
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE   #
# POSSIBILITY OF SUCH DAMAGE.                                                  #
################################################################################


"""aws_client

The aws_client module represents a client capable of connecting to the Amazon
AWS IoT cloud and performing edge operations through the Greengrass SDK.
"""


# IMPORT

import sys
from concurrent.futures import ThreadPoolExecutor

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient

from edge_st_sdk.utils.python_utils import lock
from edge_st_sdk.edge_client import EdgeClient
from edge_st_sdk.edge_client import EdgeClientStatus
import edge_st_sdk.aws.aws_greengrass
from edge_st_sdk.utils.edge_st_exceptions import EdgeSTInvalidOperationException


# CLASSES

[docs]class AWSClient(EdgeClient): """Class responsible for handling an Amazon AWS client used for plain MQTT communication with AWS IoT.""" _TIMEOUT_s = 10 """Timeout for discovering information.""" _NUMBER_OF_THREADS = 5 """Number of threads to be used to notify the listeners."""
[docs] def __init__(self, client_name, device_certificate_path, \ device_private_key_path, group_ca_path, core_info): """Constructor. AWSClient has to be instantiated through a call to the :meth:`edge_st_sdk.aws.aws_greengrass.AWSGreengrass.get_client` method. :param client_name: Name of the client, as it is on the cloud. :type client_name: str :param device_certificate_path: Relative path of the device's certificate stored on the core device. :type device_certificate_path: str :param device_private_key_path: Relative path of the device's private key stored on the core device. :type device_private_key_path: str :param group_ca_path: Relative path of the certification authority's certificate stored on the core device. :type group_ca_path: str :param core_info: Information related to the core of the group to which the client belongs. :type core_info: list :raises EdgeSTInvalidOperationException: is raised if the discovery of the core has not been completed yet, i.e. if the AWSClient has not been instantiated through a call to the :meth:`edge_st_sdk.aws.aws_greengrass.AWSGreengrass.get_client` method. """ self._status = EdgeClientStatus.INIT """Status.""" self._thread_pool = ThreadPoolExecutor(AWSClient._NUMBER_OF_THREADS) """Pool of thread used to notify the listeners.""" self._listeners = [] """List of listeners to the feature changes. It is a thread safe list, so a listener can subscribe itself through a callback.""" # Check the client is created with the right pattern (Builder). if not edge_st_sdk.aws.aws_greengrass.AWSGreengrass.discovery_completed(): raise EdgeSTInvalidOperationException('Amazon AWS clients must be ' \ 'obtained through a call to the \'get_client()\' method of an ' \ '\'AWSGreengrass\' object.') # Saving informations. self._connected = False self._client_name = client_name self._core_info = core_info # Creating a shadow client. self._shadow_client = AWSIoTMQTTShadowClient(client_name) self._shadow_client.configureCredentials( group_ca_path, device_private_key_path, device_certificate_path) # Getting the underneath client and configurint it. self._client = self._shadow_client.getMQTTConnection() self._client.configureOfflinePublishQueueing(-1) # Infinite queueing. self._client.configureDrainingFrequency(2) # Draining: 2 Hz. # Creating a shadow handler with persistent subscription. self._shadow_handler = self._shadow_client.createShadowHandlerWithName( self._client_name, True) # Updating client. self._update_status(EdgeClientStatus.IDLE)
[docs] def get_name(self): """Get the client name. :returns: The client name, i.e. the name of the client. :rtype: str """ return self._client_name
[docs] def connect(self): """Connect to the core. :returns: True if the connection was successful, False otherwise. :rtype: bool """ # Updating client. self._update_status(EdgeClientStatus.CONNECTING) # Connecting. if not self._connected: # Iterate through the connection options for the core and use the # first successful one. for connectivity_info in self._core_info.connectivityInfoList: self._current_host = connectivity_info.host self._current_port = connectivity_info.port self._shadow_client.configureEndpoint( self._current_host, self._current_port) self._shadow_client.configureAutoReconnectBackoffTime(1, 32, 20) self._shadow_client.configureConnectDisconnectTimeout( self._TIMEOUT_s) self._shadow_client.configureMQTTOperationTimeout( self._TIMEOUT_s / 2.0) try: self._shadow_client.connect() self._connected = True break except BaseException as e: self._connected = False if self._connected: self._update_status(EdgeClientStatus.CONNECTED) else: self._update_status(EdgeClientStatus.UNREACHABLE) return self._connected
[docs] def disconnect(self): """Disconnect from the core.""" # Updating client. self._update_status(EdgeClientStatus.DISCONNECTING) # Disconnecting. if self._connected: self._shadow_client.disconnect() self._connected = False # Updating client. self._update_status(EdgeClientStatus.DISCONNECTED)
[docs] def publish(self, topic, payload, qos): """Publish a new message to the desired topic with the given quality of service. :param topic: Topic name to publish to. :type topic: str :param payload: Payload to publish (JSON formatted string). :type payload: str :param qos: Quality of Service. Could be "0" or "1". :type qos: int """ if self._connected: self._client.publish(topic, payload, qos)
[docs] def subscribe(self, topic, qos, callback): """Subscribe to the desired topic with the given quality of service and register a callback to handle the published messages. :param topic: Topic name to publish to. :type topic: str :param qos: Quality of Service. Could be "0" or "1". :type qos: int :param callback: Function to be called when a new message for the subscribed topic comes in. """ if self._connected: self._client.subscribe(topic, qos, callback)
[docs] def unsubscribe(self, topic): """Unsubscribe to the desired topic. :param topic: Topic name to unsubscribe to. :type topic: str """ if self._connected: self._client.unsubscribe(topic)
[docs] def get_shadow_state(self, callback, timeout_s): """Get the state of the shadow client. Retrieve the device shadow JSON document from the cloud by publishing an empty JSON document to the corresponding shadow topics. :param callback: Function to be called when the response for a shadow request comes back. :param timeout_s: Timeout in seconds to perform the request. :type timeout_s: int """ if self._connected: self._shadow_handler.shadowGet(callback, timeout_s)
[docs] def update_shadow_state(self, payload, callback, timeout_s): """Update the state of the shadow client. Update the device shadow JSON document string on the cloud by publishing the provided JSON document to the corresponding shadow topics. :param payload: JSON document string used to update the shadow JSON document on the cloud. :type payload: json :param callback: Function to be called when the response for a shadow request comes back. :param timeout_s: Timeout in seconds to perform the request. :type timeout_s: int """ if self._connected: self._shadow_handler.shadowUpdate(payload, callback, timeout_s)
[docs] def delete_shadow_state(self, callback, timeout_s): """Delete the state of the shadow client. Delete the device shadow from the cloud by publishing an empty JSON document to the corresponding shadow topics. :param callback: Function to be called when the response for a shadow request comes back. :param timeout_s: Timeout in seconds to perform the request. :type timeout_s: int """ if self._connected: self._shadow_handler.shadowDelete(callback, timeout_s)
[docs] def add_listener(self, listener): """Add a listener. :param listener: Listener to be added. :type listener: :class:`edge_st_sdk.edge_client.EdgeClientListener` """ if listener is not None: with lock(self): if not listener in self._listeners: self._listeners.append(listener)
[docs] def remove_listener(self, listener): """Remove a listener. :param listener: Listener to be removed. :type listener: :class:`edge_st_sdk.edge_client.EdgeClientListener` """ if listener is not None: with lock(self): if listener in self._listeners: self._listeners.remove(listener)
def _update_status(self, new_status): """Update the status of the client. :param new_status: New status. :type new_status: :class:`edge_st_sdk.edge_client.EdgeClientStatus` """ old_status = self._status self._status = new_status for listener in self._listeners: # Calling user-defined callback. self._thread_pool.submit( listener.on_status_change( self, new_status.value, old_status.value))