Source code for wire_st_sdk.iolink.iolink_sensor

################################################################################
# COPYRIGHT(c) 2018 STMicroelectronics                                         #
#                                                                              #
# Redistribution and use in source and binary forms, with or without           #
# modification, are permitted provided that the following conditions are met:  #
#   1. Redistributions of source code must retain the above copyright notice,  #
#      this list of conditions and the following disclaimer.                   #
#   2. Redistributions in binary form must reproduce the above copyright       #
#      notice, this list of conditions and the following disclaimer in the     #
#      documentation and/or other materials provided with the distribution.    #
#   3. Neither the name of STMicroelectronics nor the names of its             #
#      contributors may be used to endorse or promote products derived from    #
#      this software without specific prior written permission.                #
#                                                                              #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"  #
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE    #
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE   #
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE    #
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR          #
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF         #
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS     #
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN      #
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)      #
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE   #
# POSSIBILITY OF SUCH DAMAGE.                                                  #
################################################################################


"""iolink_sensor

The iolink_sensor module represents a device capable of connecting to an IOLink
masterboard and sending/receiving data to it.
"""


# IMPORT

import sys
import struct

from serial import SerialException
from serial import SerialTimeoutException

from wire_st_sdk.iolink.iolink_protocol import IOLinkProtocol
from wire_st_sdk.iolink.iolink_device import IOLinkDevice
from wire_st_sdk.utils.wire_st_exceptions import WireSTInvalidOperationException
from wire_st_sdk.utils.python_utils import lock_for_object


# CLASSES

[docs]class IOLinkSensor(IOLinkDevice): """IO-Link Sensor class. This class manages the commands of a standard IO-Link sensor. """ _SIZE_OF_ENV = 3 """Number of elements in an environmental domain data.""" _SIZE_OF_TDM = 6 """Number of elements in a time domain data.""" _SIZE_OF_FDM = 4 """Number of elements per frequency in a frequency domain data.""" _SIZE_OF_FDM_LINES = 1024 """Number of elements per frequency in a frequency domain data.""" _SIZE_OF_FLOAT_bytes = 4 """Size of floating point numbers in bytes on ARM-32 platforms.""" _FLOAT_PRECISION = 3 """Number of digits after the decimal point for sensors' data.""" def _get_measure(self, measure): """Execute a 'get' measure command. :param measure: The measure command to execute. :type measure: str :returns: A list with the result of the executed measure command. :rtype: list :raises SerialException, SerialTimeoutException: are raised if something with the serial communication does not work. :raises WireSTInvalidOperationException: is raised if the command has not been executed successfully. """ try: with lock_for_object(self._master): while True: self._master._execute(IOLinkProtocol.COMMAND_ICD, IOLinkProtocol.REQUEST_SLAVE) self._master._execute(str(self._position - 1) \ + IOLinkProtocol.TERMINATOR_SEQ, IOLinkProtocol.REQUEST_SENSOR_COMMAND) self._master._execute(IOLinkProtocol.COMMAND_MEAS1, IOLinkProtocol.REQUEST_MEASURE_TYPE) if not self._master._execute(measure, IOLinkProtocol.MESSAGE_TRANSMISSION_COMPLETED): self._master._execute(IOLinkProtocol.COMMAND_END, IOLinkProtocol.REQUEST_MOD) else: #print('TRANSMISSION COMPLETED') break if not IOLinkProtocol.BYTES_TRANSMISSION: info = self._master._get_answer()[ len(IOLinkProtocol.TERMINATOR_SEQ): \ - 2 * len(IOLinkProtocol.TERMINATOR_SEQ) - len(IOLinkProtocol.MESSAGE_TRANSMISSION_COMPLETED)] else: info = self._master._get_answer()[: \ - len(IOLinkProtocol.TERMINATOR_SEQ) - len(IOLinkProtocol.MESSAGE_TRANSMISSION_COMPLETED)] #print('SENDING COMMAND END') self._master._execute(IOLinkProtocol.COMMAND_END, IOLinkProtocol.REQUEST_MOD) #print('COMMAND END SENT') return info except (SerialException, SerialTimeoutException, WireSTInvalidOperationException) as e: raise e def _bytes_to_floats(self, data, precision=0): """Converts an array of bytes to floating point numbers in Little Endian order (four bytes per number). :param data: Input array of bytes that contains the values to convert. :type data: str :param precision: Number of digits after the decimal point. :type precision: int :returns: A list of floating point numbers. :rtype: list """ # Python 2 # return [round( # struct.unpack('<f', struct.pack('cccc', *data[i * 4:i * 4 + 4]))[0], # precision) \ # for i in range(0, int(len(data) / 4))] # Python 3 return [round( struct.unpack('<f', data[i * 4:i * 4 + 4])[0], precision) \ for i in range(0, int(len(data) / 4))]
[docs] def get_env(self): """Get environmental data. :returns: A list with Pressure [mbar], Humidity [%], and Temperature [C] values. :rtype: list :raises SerialException, SerialTimeoutException: are raised if something with the serial communication does not work. :raises WireSTInvalidOperationException: is raised if the command has not been executed successfully. """ try: with lock_for_object(self._master): while True: info = self._get_measure(IOLinkProtocol.COMMAND_MEAS1_3) if not IOLinkProtocol.BYTES_TRANSMISSION: info = info.split( IOLinkProtocol.TERMINATOR_SEQ.encode('utf-8'))[2:] info = [i for i in info if i != ''] if len(info) == IOLinkSensor._SIZE_OF_ENV: break else: if len(info) == IOLinkSensor._SIZE_OF_ENV * \ IOLinkSensor._SIZE_OF_FLOAT_bytes: break #print('Missing data. Sending again....') if not IOLinkProtocol.BYTES_TRANSMISSION: info = list(map(lambda s: float(s), info)) else: info = self._bytes_to_floats(info, self._FLOAT_PRECISION) return info except (SerialException, SerialTimeoutException, WireSTInvalidOperationException) as e: raise e
[docs] def get_tdm(self): """Get time domain data. :returns: A two-elements list, with a list of RMS Speed values on X,Y,Z axes [mm/s] as the first element, and a list of Peak Acceleration values on X,Y,Z axes [m/s2] as the second element. :rtype: list :raises SerialException, SerialTimeoutException: are raised if something with the serial communication does not work. :raises WireSTInvalidOperationException: is raised if the command has not been executed successfully. """ try: with lock_for_object(self._master): while True: info = self._get_measure(IOLinkProtocol.COMMAND_MEAS1_2) if not IOLinkProtocol.BYTES_TRANSMISSION: info = info.split( IOLinkProtocol.TERMINATOR_SEQ.encode('utf-8'))[2:] if len(info) == IOLinkSensor._SIZE_OF_TDM: break else: if len(info) == IOLinkSensor._SIZE_OF_TDM * \ IOLinkSensor._SIZE_OF_FLOAT_bytes: break #print('Missing data. Sending again....') if not IOLinkProtocol.BYTES_TRANSMISSION: info = [list(map(lambda s: float(s), info[0:3])), \ list(map(lambda s: float(s), info[3:6]))] else: info = self._bytes_to_floats(info, self._FLOAT_PRECISION) info = [info[0:3], info[3:6]] return info except (SerialException, SerialTimeoutException, WireSTInvalidOperationException) as e: raise e
[docs] def get_fft(self): """Get Fast Fourier Transform of vibration data. :returns: A n-elements list, with each element being a list of four values: the first is a frequency [Hz] and the other three are the corresponding vibration values on the three axis [m/s2]. :rtype: list :raises SerialException, SerialTimeoutException: are raised if something with the serial communication does not work. :raises WireSTInvalidOperationException: is raised if the command has not been executed successfully. """ try: with lock_for_object(self._master): l = IOLinkSensor._SIZE_OF_FDM_LINES n = IOLinkSensor._SIZE_OF_FDM * \ IOLinkSensor._SIZE_OF_FLOAT_bytes while True: info = self._get_measure(IOLinkProtocol.COMMAND_MEAS1_4) if not IOLinkProtocol.BYTES_TRANSMISSION: info = info.split( IOLinkProtocol.TERMINATOR_SEQ.encode('utf-8'))[2:] if len(info) == l: break else: if len(info) == l * n: break #print('Missing data. Sending again...') if not IOLinkProtocol.BYTES_TRANSMISSION: for i in range(0, len(info)): info[i] = list( map(lambda s: float(s), info[i].split('\t')[0:-1])) else: info = [info[i * n:i * n + n] for i in range(0, l)] info = list(map(lambda s: self._bytes_to_floats(s, \ self._FLOAT_PRECISION), info)) return info except (SerialException, SerialTimeoutException, WireSTInvalidOperationException) as e: raise e
def _set_parameter(self, parameter, value): """Execute a 'set' parameter command. :param parameter: The parameter command to execute. :type parameter: str :param value: The parameter value to set. :type value: str :returns: True if the parameter has been set correctly, False otherwise. :rtype: bool :raises SerialException, SerialTimeoutException: are raised if something with the serial communication does not work. :raises WireSTInvalidOperationException: is raised if the command has not been executed successfully. """ try: with lock_for_object(self._master): self._master._execute(IOLinkProtocol.COMMAND_ICD, IOLinkProtocol.REQUEST_SLAVE) self._master._execute(str(self._position - 1) \ + IOLinkProtocol.TERMINATOR_SEQ, IOLinkProtocol.REQUEST_SENSOR_COMMAND) self._master._execute(IOLinkProtocol.COMMAND_SET, IOLinkProtocol.REQUEST_PARAMETER_NAME) self._master._execute(parameter, IOLinkProtocol.REQUEST_PARAMETER_VALUE) self._master._execute(value \ + IOLinkProtocol.TERMINATOR_SEQ, IOLinkProtocol.TERMINATOR_SEQ) self._master._execute(None, IOLinkProtocol.TERMINATOR_SEQ) info = self._master._get_answer() self._master._execute(IOLinkProtocol.COMMAND_END, IOLinkProtocol.REQUEST_MOD) return True if IOLinkProtocol.MESSAGE_PARAMETER_UPDATED \ in info else False except (SerialException, SerialTimeoutException, WireSTInvalidOperationException) as e: raise e
[docs] def set_odr(self, odr): """Set accelerometer's output data rate. :param odr: Accelerometer's output data rate. :type odr: :class:`wire_st_sdk.iolink.iolink_protocol.ODR` :returns: True if the parameter has been set correctly, False otherwise. :rtype: bool :raises SerialException, SerialTimeoutException: are raised if something with the serial communication does not work. :raises WireSTInvalidOperationException: is raised if the command has not been executed successfully. """ try: with lock_for_object(self._master): return self._set_parameter( IOLinkProtocol.COMMAND_ODR, '{:04d}'.format(odr.value)) except (SerialException, SerialTimeoutException, WireSTInvalidOperationException) as e: raise e
[docs] def set_fls(self, fls): """Set accelerometer's full scale. :param fls: Accelerometer's full scale. :type fls: :class:`wire_st_sdk.iolink.iolink_protocol.FLS` :returns: True if the parameter has been set correctly, False otherwise. :rtype: bool :raises SerialException, SerialTimeoutException: are raised if something with the serial communication does not work. :raises WireSTInvalidOperationException: is raised if the command has not been executed successfully. """ try: with lock_for_object(self._master): return self._set_parameter( IOLinkProtocol.COMMAND_FLS, '{:02d}'.format(fls.value)) except (SerialException, SerialTimeoutException, WireSTInvalidOperationException) as e: raise e
[docs] def set_sze(self, sze): """Set accelerometer's input array size for FFT. :param sze: Accelerometer's input array size for FFT. :type sze: :class:`wire_st_sdk.iolink.iolink_protocol.SZE` :returns: True if the parameter has been set correctly, False otherwise. :rtype: bool :raises SerialException, SerialTimeoutException: are raised if something with the serial communication does not work. :raises WireSTInvalidOperationException: is raised if the command has not been executed successfully. """ try: with lock_for_object(self._master): return self._set_parameter( IOLinkProtocol.COMMAND_SZE, '{:04d}'.format(sze.value)) except (SerialException, SerialTimeoutException, WireSTInvalidOperationException) as e: raise e
[docs] def set_sub(self, sub): """Set accelerometer's number of subranges. :param sub: Number of subranges. :type sub: :class:`wire_st_sdk.iolink.iolink_protocol.SUB` :returns: True if the parameter has been set correctly, False otherwise. :rtype: bool :raises SerialException, SerialTimeoutException: are raised if something with the serial communication does not work. :raises WireSTInvalidOperationException: is raised if the command has not been executed successfully. """ try: with lock_for_object(self._master): return self._set_parameter( IOLinkProtocol.COMMAND_SUB, '{:02d}'.format(sub.value)) except (SerialException, SerialTimeoutException, WireSTInvalidOperationException) as e: raise e
[docs] def set_acq(self, acq): """Set accelerometer's total acquisition time, which is valid for all types of analysis. :param acq: Accelerometer's total acquisition time (must be in the range [ACQ_MIN..ACQ_MAX]). :type acq: int :returns: True if the parameter has been set correctly, False otherwise. :rtype: bool :raises SerialException, SerialTimeoutException: are raised if something with the serial communication does not work. :raises WireSTInvalidOperationException: is raised if the command has not been executed successfully. """ try: with lock_for_object(self._master): self._acq = acq return self._set_parameter( IOLinkProtocol.COMMAND_ACQ, '{:05d}'.format(acq)) except (SerialException, SerialTimeoutException, WireSTInvalidOperationException) as e: raise e
[docs] def set_ovl(self, ovl): """Set accelerometer's overlapping percentage between two consecutive FFT analysis. :param ovl: Accelerometer's overlapping percentage between two consecutive FFT analysis (must be in the range [OVL_MIN..OVL_MAX]). :type ovl: int :returns: True if the parameter has been set correctly, False otherwise. :rtype: bool :raises SerialException, SerialTimeoutException: are raised if something with the serial communication does not work. :raises WireSTInvalidOperationException: is raised if the command has not been executed successfully. """ try: with lock_for_object(self._master): return self._set_parameter( IOLinkProtocol.COMMAND_OVL, '{:02d}'.format(ovl)) except (SerialException, SerialTimeoutException, WireSTInvalidOperationException) as e: raise e