Source code for blue_st_sdk.debug_console

################################################################################
# COPYRIGHT(c) 2018 STMicroelectronics                                         #
#                                                                              #
# Redistribution and use in source and binary forms, with or without           #
# modification, are permitted provided that the following conditions are met:  #
#   1. Redistributions of source code must retain the above copyright notice,  #
#      this list of conditions and the following disclaimer.                   #
#   2. Redistributions in binary form must reproduce the above copyright       #
#      notice, this list of conditions and the following disclaimer in the     #
#      documentation and/or other materials provided with the distribution.    #
#   3. Neither the name of STMicroelectronics nor the names of its             #
#      contributors may be used to endorse or promote products derived from    #
#      this software without specific prior written permission.                #
#                                                                              #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"  #
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE    #
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE   #
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE    #
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR          #
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF         #
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS     #
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN      #
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)      #
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE   #
# POSSIBILITY OF SUCH DAMAGE.                                                  #
################################################################################


"""debug

The debug module is responsible for managing the debugging capabilities offered
by the BlueSTSDK.
"""


# IMPORT

from abc import ABCMeta
from abc import abstractmethod
from concurrent.futures import ThreadPoolExecutor

from bluepy.btle import BTLEException
from blue_st_sdk.utils.ble_node_definitions import Debug
from blue_st_sdk.utils.python_utils import lock


# CLASSES

class DebugConsole(object):
    """Class used to read/write debug messages.

    Messages are exchanged with a node through two BLE characteristics: one
    used for stdin/stdout and one used for stderr.  Registered listeners are
    notified of sent/received messages through a pool of worker threads.
    """

    _MAXIMUM_MESSAGE_SIZE_BYTES = 20
    """Maximum size of the messages to send."""

    _NUMBER_OF_THREADS = 5
    """Number of threads to be used to notify the listeners."""

    def __init__(self, node, stdinout_characteristic, stderr_characteristic):
        """Constructor.

        Args:
            node (:class:`blue_st_sdk.node.Node`): Node that will send the
                data.
            stdinout_characteristic (Characteristic): The BLE characteristic
                used to read/write data from/to stdin/stdout. Refer to
                `Characteristic
                <https://ianharvey.github.io/bluepy-doc/characteristic.html>`_
                for more information.
            stderr_characteristic (Characteristic): The BLE characteristic
                used to read data from stderr. Refer to
                `Characteristic
                <https://ianharvey.github.io/bluepy-doc/characteristic.html>`_
                for more information.
        """
        self._node = node
        """Node that sends data to this class."""

        self._stdinout_characteristic = stdinout_characteristic
        """Characteristic used to read/write data from/to stdin/stdout."""

        self._stderr_characteristic = stderr_characteristic
        """Characteristic used to read data from stderr."""

        self._thread_pool = ThreadPoolExecutor(
            DebugConsole._NUMBER_OF_THREADS)
        """Pool of thread used to notify the listeners."""

        self._listeners = []
        """List of listeners to the events of new data received.
        Additions and removals are performed while holding ``lock(self)``."""

    def _decode_data(self, data):
        """Convert raw data to a string.

        Args:
            data (bytearray): Data to be decoded.

        Returns:
            str: The given data decoded with the ISO-8859-1 charset.
        """
        return data.decode('ISO-8859-1')

    def _notify_listeners(self, callback_name, *args):
        """Schedule a callback of every registered listener on the pool.

        Args:
            callback_name (str): Name of the listener's method to call.
            *args: Arguments passed to the callback after this console.
        """
        # Iterate over a snapshot: callbacks run on other threads and may
        # add or remove listeners while the scheduling loop is in progress.
        for listener in list(self._listeners):
            # Pass the callable and its arguments separately, so that the
            # callback is executed by a worker thread and not by the caller.
            # Exceptions raised by a callback are held by the returned
            # Future and are not propagated to the caller.
            self._thread_pool.submit(
                getattr(listener, callback_name), self, *args)

    def write(self, data):
        """Write an array of bytes to the stdin.

        The message might be sent through more iterations on the Bluetooth
        channel, each one carrying at most ``_MAXIMUM_MESSAGE_SIZE_BYTES``
        bytes.  The on-write callback is invoked once per chunk written.

        Args:
            data (bytearray): Data to be sent.

        Returns:
            int: The number of bytes sent to the stdin/stdout standard
            characteristic.  If the connection drops while writing, the
            node is notified of the unexpected disconnection and the number
            of bytes sent up to that point is returned.
        """
        bytes_sent = 0
        try:
            char_handle = self._stdinout_characteristic.getHandle()
            total_bytes = len(data)
            while bytes_sent < total_bytes:
                # Slicing clamps to the end of the data, so the last chunk
                # may be shorter than the maximum message size.
                data_to_send = data[
                    bytes_sent:bytes_sent + self._MAXIMUM_MESSAGE_SIZE_BYTES]

                # Writing data.
                self._node.writeCharacteristic(
                    char_handle, data_to_send, True)
                bytes_sent += len(data_to_send)

                # Calling on-write callback for a debug characteristic.
                self.on_write_characteristic(
                    self._stdinout_characteristic, data_to_send, True)
        except BTLEException:
            self._node._unexpected_disconnect()
        return bytes_sent

    def add_listener(self, listener):
        """Add a listener.

        Notifications on both the stdin/stdout and the stderr
        characteristics are enabled whenever at least one listener is
        registered.

        Args:
            listener (:class:`blue_st_sdk.debug_console.DebugConsoleListener`):
                Listener to be added.  ``None`` is ignored.
        """
        if listener is None:
            return
        with lock(self):
            if listener not in self._listeners:
                self._listeners.append(listener)
            if self._listeners:
                self._node.set_notification_status(
                    self._stdinout_characteristic, True)
                self._node.set_notification_status(
                    self._stderr_characteristic, True)

    def remove_listener(self, listener):
        """Remove a listener.

        Notifications on both the stdin/stdout and the stderr
        characteristics are disabled when no listener is left.

        Args:
            listener (:class:`blue_st_sdk.debug_console.DebugConsoleListener`):
                Listener to be removed.  ``None`` is ignored.
        """
        if listener is None:
            return
        with lock(self):
            if listener in self._listeners:
                self._listeners.remove(listener)
            if not self._listeners:
                self._node.set_notification_status(
                    self._stdinout_characteristic, False)
                self._node.set_notification_status(
                    self._stderr_characteristic, False)

    def on_update_characteristic(self, characteristic, data):
        """The characteristic has been updated.

        If it is a debug characteristic, data are sent to the registered
        listeners.

        Args:
            characteristic (Characteristic): The BLE characteristic that has
                been updated. Refer to
                `Characteristic
                <https://ianharvey.github.io/bluepy-doc/characteristic.html>`_
                for more information.
            data (bytearray): The data notified from the given
                characteristic; it is decoded with the ISO-8859-1 charset
                before being handed to the listeners.
        """
        try:
            if not self._listeners:
                return
            data_str = self._decode_data(data)
            if characteristic.uuid == \
                    Debug.DEBUG_STDINOUT_BLUESTSDK_SERVICE_UUID:
                self._notify_listeners('on_stdout_receive', data_str)
            elif characteristic.uuid == \
                    Debug.DEBUG_STDERR_BLUESTSDK_SERVICE_UUID:
                self._notify_listeners('on_stderr_receive', data_str)
        except BTLEException:
            self._node._unexpected_disconnect()

    def on_write_characteristic(self, characteristic, data, status):
        """The characteristic has been written.

        Args:
            characteristic (Characteristic): The BLE characteristic that has
                been written.
            data (bytearray): Data written to the characteristic.
            status (bool): True if the writing operation was successful,
                False otherwise.
        """
        try:
            if not self._listeners:
                return
            data_str = self._decode_data(data)
            if characteristic.uuid == \
                    Debug.DEBUG_STDINOUT_BLUESTSDK_SERVICE_UUID:
                # Listeners never receive more than one message worth of
                # characters.
                self._notify_listeners(
                    'on_stdin_send',
                    data_str[0:self._MAXIMUM_MESSAGE_SIZE_BYTES],
                    status)
        except BTLEException:
            self._node._unexpected_disconnect()

    def get_node(self):
        """Get the node that listens to / writes to this debug console.

        Returns:
            :class:`blue_st_sdk.node.Node`: The node given at construction
            time.
        """
        return self._node
class DebugConsoleListener(object):
    """Interface used by the
    :class:`blue_st_sdk.debug_console.DebugConsole` class to notify changes
    on a debug console.

    Data received/sent from/to a node are encoded with ISO-8859-1 charset.
    """
    # NOTE(review): this is the Python 2 way of declaring a metaclass; under
    # Python 3 the attribute has no effect, so abstract methods are not
    # enforced at instantiation time there. Confirm whether enforcement is
    # wanted before switching to a Python 3 metaclass declaration, as that
    # would reject listeners implementing only some of the callbacks.
    __metaclass__ = ABCMeta

    @abstractmethod
    def on_stdout_receive(self, debug_console, message):
        """Called whenever a new message is received on the standard output.

        Args:
            debug_console (object): Console that sends the message.
            message (str): The message received on the stdout console.

        Raises:
            :exc:`NotImplementedError` if the method has not been
                implemented.
        """
        raise NotImplementedError(
            'You must implement "on_stdout_receive()" to use the '
            '"DebugConsoleListener" class.')

    @abstractmethod
    def on_stderr_receive(self, debug_console, message):
        """Called whenever a new message is received on the standard error.

        Args:
            debug_console (object): Console that sends the message.
            message (str): The message received on the stderr console.

        Raises:
            :exc:`NotImplementedError` if the method has not been
                implemented.
        """
        raise NotImplementedError(
            'You must implement "on_stderr_receive()" to use the '
            '"DebugConsoleListener" class.')

    @abstractmethod
    def on_stdin_send(self, debug_console, message, status):
        """Called whenever a new message is sent to the standard input.

        Args:
            debug_console (object): Console that receives the message.
            message (str): The message sent to the stdin console.
            status (bool): True if the message is sent correctly, False
                otherwise.

        Raises:
            :exc:`NotImplementedError` if the method has not been
                implemented.
        """
        raise NotImplementedError(
            'You must implement "on_stdin_send()" to use the '
            '"DebugConsoleListener" class.')