Source code for edge_st_sdk.aws.aws_greengrass

################################################################################
# COPYRIGHT(c) 2018 STMicroelectronics                                         #
#                                                                              #
# Redistribution and use in source and binary forms, with or without           #
# modification, are permitted provided that the following conditions are met:  #
#   1. Redistributions of source code must retain the above copyright notice,  #
#      this list of conditions and the following disclaimer.                   #
#   2. Redistributions in binary form must reproduce the above copyright       #
#      notice, this list of conditions and the following disclaimer in the     #
#      documentation and/or other materials provided with the distribution.    #
#   3. Neither the name of STMicroelectronics nor the names of its             #
#      contributors may be used to endorse or promote products derived from    #
#      this software without specific prior written permission.                #
#                                                                              #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"  #
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE    #
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE   #
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE    #
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR          #
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF         #
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS     #
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN      #
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)      #
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE   #
# POSSIBILITY OF SUCH DAMAGE.                                                  #
################################################################################


"""aws_greengrass

The aws_greengrass module is responsible for managing the discovery process of
AWS devices and allocating the needed resources.

"""


# IMPORT

import os
import sys
import uuid
import logging
from abc import ABCMeta
from abc import abstractmethod
from enum import Enum
from concurrent.futures import ThreadPoolExecutor

from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore
from AWSIoTPythonSDK.exception.AWSIoTExceptions import DiscoveryInvalidRequestException

from edge_st_sdk.utils.python_utils import lock
import edge_st_sdk.aws.aws_client
from edge_st_sdk.utils.edge_st_exceptions import EdgeSTInvalidOperationException
from edge_st_sdk.utils.edge_st_exceptions import EdgeSTInvalidDataException


# CLASSES

[docs]class AWSGreengrass(object): MAX_DISCOVERY_ATTEMPTS = 3 """Maximum number of attempts when trying to discover the core.""" _GROUP_CA_PATH = './aws_group_ca/' """Group Certification Authority path.""" _TIMEOUT_s = 10 """Timeout for discovering information.""" _NUMBER_OF_THREADS = 5 """Number of threads to be used to notify the listeners.""" _discovery_completed = False """Discovery completed flag."""
[docs] def __init__(self, endpoint, root_ca_path): """Constructor. Initializing AWS Discovery. :param endpoint: AWS endpoint. :type endpoint: str :param root_ca_path: Path to the root Certification Authority file. :type root_ca_path: str """ self._status = AWSGreengrassStatus.INIT """Status.""" self._thread_pool = ThreadPoolExecutor(AWSGreengrass._NUMBER_OF_THREADS) """Pool of thread used to notify the listeners.""" self._listeners = [] """List of listeners to the feature changes. It is a thread safe list, so a listener can subscribe itself through a callback.""" self._endpoint = endpoint """AWS endpoint.""" self._root_ca_path = root_ca_path """Path to the root Certification Authority file.""" self._group_ca_path = None """Path to the group Certification Authority file.""" self._core_info = None """Core information.""" # Updating service. self._update_status(AWSGreengrassStatus.IDLE)
def _discover_core(self, client_id, device_certificate_path, device_private_key_path): """Performing the discovery of the core belonging to the same group of the given client name. :param client_id: Name of a client, as it is on the cloud, belonging to the same group of the core. :type client_id: str :param device_certificate_path: Relative path of a device's certificate stored on the core device, belonging to the same group of the core. :type device_certificate_path: str :param device_private_key_path: Relative path of a device's private key stored on the core device, belonging to the same group of the core. :type device_private_key_path: str :returns: The name of the core. :rtype: str :raises EdgeSTInvalidOperationException: is raised if the discovery of the core fails. :raises EdgeSTInvalidDataException: is raised a wrong configuration data is provided. """ # Checking configuration parameters. if not os.access(self._root_ca_path, os.R_OK): msg = '\nRoot Certification Authority certificate path "%s" is not ' \ 'accessible.\r\n' \ 'Please run the application with \"sudo\".' \ % (self._root_ca_path) raise EdgeSTInvalidDataException(msg) if not os.path.exists(device_certificate_path): msg = '\nInvalid device certificate path: "%s"' \ % (device_certificate_path) raise EdgeSTInvalidDataException(msg) if not os.path.exists(device_private_key_path): msg = '\nInvalid device private key path: "%s"' \ % (device_private_key_path) raise EdgeSTInvalidDataException(msg) # Updating service. self._update_status(AWSGreengrassStatus.DISCOVERING_CORE) # Progressive back off core. backOffCore = ProgressiveBackOffCore() # Discover GGCs. discoveryInfoProvider = DiscoveryInfoProvider() discoveryInfoProvider.configureEndpoint(self._endpoint) discoveryInfoProvider.configureCredentials( self._root_ca_path, device_certificate_path, device_private_key_path) discoveryInfoProvider.configureTimeout(self._TIMEOUT_s) attempts = AWSGreengrass.MAX_DISCOVERY_ATTEMPTS while attempts != 0: try: # Discovering information. discoveryInfo = discoveryInfoProvider.discover(client_id) caList = discoveryInfo.getAllCas() coreList = discoveryInfo.getAllCores() # Picking only the first ca and core info. group_id, ca = caList[0] self._core_info = coreList[0] # Persisting connectivity/identity information. self._group_ca_path = self._GROUP_CA_PATH + group_id + \ '_CA_' + str(uuid.uuid4()) + '.crt' if not os.path.exists(self._GROUP_CA_PATH): os.makedirs(self._GROUP_CA_PATH) group_ca_path_file = open(self._group_ca_path, 'w') group_ca_path_file.write(ca) group_ca_path_file.close() break except DiscoveryInvalidRequestException as e: raise EdgeSTInvalidOperationException( 'Invalid discovery request detected: %s' % (e.message)) except BaseException as e: attempts -= 1 backOffCore.backOff() if attempts == 0: raise EdgeSTInvalidOperationException( 'Discovery of the core related to the client "%s", with ' \ 'certificate "%s" and key "%s", failed after %d retries.' % \ (client_id, device_certificate_path, device_private_key_path, AWSGreengrass.MAX_DISCOVERY_ATTEMPTS)) self._configure_logging() AWSGreengrass._discovery_completed = True # Updating service. self._update_status(AWSGreengrassStatus.CORE_DISCOVERED) return self._core_info.coreThingArn def _configure_logging(self): """Configuring logging, required for using shadow devices.""" self._logger = logging.getLogger('AWSIoTPythonSDK.core') self._logger.setLevel(logging.ERROR) self._streamHandler = logging.StreamHandler() self._formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s') self._streamHandler.setFormatter(self._formatter) self._logger.addHandler(self._streamHandler)
[docs] @classmethod def discovery_completed(self): """Checking whether the discovery has completed. :returns: True if the discovery process has completed, False otherwise. :rtype: bool """ return AWSGreengrass._discovery_completed
[docs] def get_client(self, client_id, device_certificate_path, device_private_key_path): """Getting an Amazon AWS client. :param client_id: Name of the client, as it is on the cloud. :type client_id: str :param device_certificate_path: Relative path of a device's certificate stored on the core device. :type device_certificate_path: str :param device_private_key_path: Relative path of a device's private key stored on the core device. :type device_private_key_path: str :returns: Amazon AWS client. :rtype: :class:`edge_st_sdk.aws.aws_client.AWSClient` :raises EdgeSTInvalidOperationException: is raised if the discovery of the core fails. :raises EdgeSTInvalidDataException: is raised if a wrong configuration data is provided. """ # Performing the discovery of the core belonging to the same group of # the client. try: if not self.discovery_completed(): self._discover_core( client_id, device_certificate_path, device_private_key_path) # Creating the client. return edge_st_sdk.aws.aws_client.AWSClient( client_id, device_certificate_path, device_private_key_path, self._group_ca_path, self._core_info) except (EdgeSTInvalidDataException, EdgeSTInvalidOperationException) \ as e: raise e
[docs] def get_endpoint(self): """Getting the AWS endpoint.""" return self._endpoint
[docs] def add_listener(self, listener): """Add a listener. :param listener: Listener to be added. :type listener: :class:`edge_st_sdk.aws.aws_greengrass.AWSGreengrassListener` """ if listener is not None: with lock(self): if not listener in self._listeners: self._listeners.append(listener)
[docs] def remove_listener(self, listener): """Remove a listener. :param listener: Listener to be removed. :type listener: :class:`edge_st_sdk.aws.aws_greengrass.AWSGreengrassListener` """ if listener is not None: with lock(self): if listener in self._listeners: self._listeners.remove(listener)
def _update_status(self, new_status): """Update the status of the client. :param new_status: New status. :type new_status: :class:`edge_st_sdk.aws.aws_greengrass.AWSGreengrassStatus` """ old_status = self._status self._status = new_status for listener in self._listeners: # Calling user-defined callback. self._thread_pool.submit( listener.on_status_change( self, new_status.value, old_status.value))
[docs]class AWSGreengrassStatus(Enum): """Status of the AWS Greengrass service.""" INIT = 'INIT' """Dummy initial status.""" IDLE = 'IDLE' """Waiting for a connection and sending advertising data.""" DISCOVERING_CORE = 'DISCOVERING_CORE' """Discovering the Core.""" CORE_DISCOVERED = 'CORE_DISCOVERED' """Core discovered."""
# INTERFACES
[docs]class AWSGreengrassListener(object): """Interface used by the :class:`edge_st_sdk.aws.aws_greengrass.AWSGreengrass` class to notify changes of an AWS Greengrass service's status. """ __metaclass__ = ABCMeta
[docs] @abstractmethod def on_status_change(self, aws_greengrass, new_status, old_status): """To be called whenever the AWS Greengrass service changes its status. :param aws_greengrass: AWS Greengrass service that has changed its status. :type aws_greengrass: :class:`edge_st_sdk.aws.aws_greengrass.AWSGreengrass` :param new_status: New status. :type new_status: :class:`edge_st_sdk.aws.aws_greengrass.AWSGreengrassStatus` :param old_status: Old status. :type old_status: :class:`edge_st_sdk.aws.aws_greengrass.AWSGreengrassStatus` :raises NotImplementedError: if the method has not been implemented. """ raise NotImplementedError('You must implement "on_status_change()" to ' 'use the "AWSGreengrassListener" class.')