Tutorial 1: Filtered data plotter


Corresponding files for this tutorial: TMSiPlotterHelpers/filtered_signal_plotter_helper.py and examples_SAGA/example_filter_and_plot.py

The first example of plotter customization is the filtered data plotter. Filtering is best done at a low level of the code, which in this case is the ConsumerThread. This thread is initialized and controlled by the SignalPlotterHelper. Therefore, new versions of both the ConsumerThread and the SignalPlotterHelper have to be created.

First, create a new file in which the classes FilteredSignalPlotterHelper and FilteredConsumerThread will be defined. Import the original classes so that the new classes can inherit from them. Inheritance lets a new class reuse all methods of the original class while overriding only the ones that need to change, see below. If you are not familiar with inheritance in programming, see 'Python OOP Tutorial 4: Inheritance - Creating Subclasses' (https://www.youtube.com/watch?v=RSl87lqOXDE) for more information.

from TMSiBackend.data_consumer.consumer_thread import ConsumerThread
from .signal_plotter_helper import SignalPlotterHelper

class FilteredSignalPlotterHelper(SignalPlotterHelper):
    ...  # methods are added in the steps below
class FilteredConsumerThread(ConsumerThread):
    ...  # methods are added in the steps below

The next step is to make the FilteredSignalPlotterHelper use the FilteredConsumerThread. This is done by initializing the FilteredConsumerThread in the FilteredSignalPlotterHelper instead of the original ConsumerThread. To do so, we override the __init__() method of the SignalPlotterHelper. The original __init__() of the SignalPlotterHelper calls the __init__() of its parent class (PlotterHelper) with the ConsumerThread as argument. We override this method and call the __init__() of PlotterHelper directly, with the FilteredConsumerThread as argument. This is done with the built-in super() function: super(SignalPlotterHelper, self) skips SignalPlotterHelper itself and resolves to its parent class, as in the code snippet below.

class FilteredSignalPlotterHelper(SignalPlotterHelper):
    def __init__(self, device, grid_type = None, hpf = 0, lpf = 0, order = 1):
        # call super of SignalPlotterHelper
        super(SignalPlotterHelper, self).__init__(device = device,
                                                  monitor_class = Monitor,
                                                  consumer_thread_class = FilteredConsumerThread)
        self.main_plotter = RealTimeSignalPlotter()

The file can be saved and the new class can be imported in the example file. Here, the PlotterHelper argument of the Gui can be changed to the newly created FilteredSignalPlotterHelper.

from TMSiGui.gui import Gui
from TMSiPlotterHelpers.filtered_signal_plotter_helper import FilteredSignalPlotterHelper
        # Remove grid_type argument or set it to 'None' to use default channel order
        plotter_helper = FilteredSignalPlotterHelper(device=dev, grid_type='4-8-L', hpf=5, lpf=100, order=1)
        gui = Gui(plotter_helper = plotter_helper)

After doing this, the application uses the FilteredSignalPlotterHelper (which in turn uses the FilteredConsumerThread). Next, the functional code of these two new classes can be modified and tailored to our needs.

In the FilteredConsumerThread, an additional buffer is initialized to store the filtered data. The filter itself must also be initialized; for this, the method initialize_filter() is created. The filter is then applied to the incoming data in the process() method, which is overridden for this purpose. The implementation of these methods can be found in the code. To read out the filtered buffer instead of the original buffer, the monitor function of the FilteredSignalPlotterHelper is changed to return the filtered buffer.

    def monitor_function(self):
        reading = {}
        reading["status"] = 200
        reading["buffer"] = self.consumer_thread.filtered_buffer.copy()
        if self.device.get_device_type() == "APEX":
            reading["live_impedances"] = self.consumer_thread.cycling_impedance
        return reading
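
The additional buffer, initialize_filter() and the overridden process() described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the implementation shipped with the interface. ConsumerThreadStandIn and FirstOrderFilter are hypothetical names. The sketch handles a single channel as a plain list of samples and uses a simple first-order RC-style filter as a stand-in for the real filter design. It also assumes that a cut-off of 0 disables that filter stage. The point it illustrates is that the filter keeps its state between calls, so data arriving in chunks is filtered as one continuous signal.

```python
import math


class FirstOrderFilter:
    """Hypothetical stand-in filter: first-order high-pass, then low-pass."""

    def __init__(self, sample_rate, hpf=0, lpf=0):
        dt = 1.0 / sample_rate
        # Assumption: a cut-off frequency of 0 disables that stage.
        self.hp_alpha = None
        self.lp_alpha = None
        if hpf > 0:
            rc = 1.0 / (2 * math.pi * hpf)
            self.hp_alpha = rc / (rc + dt)
        if lpf > 0:
            rc = 1.0 / (2 * math.pi * lpf)
            self.lp_alpha = dt / (rc + dt)
        # Filter state, kept between calls so chunks join seamlessly.
        self.hp_prev_in = 0.0
        self.hp_prev_out = 0.0
        self.lp_prev_out = 0.0

    def apply(self, samples):
        out = []
        for x in samples:
            y = x
            if self.hp_alpha is not None:
                y = self.hp_alpha * (self.hp_prev_out + x - self.hp_prev_in)
                self.hp_prev_in, self.hp_prev_out = x, y
            if self.lp_alpha is not None:
                self.lp_prev_out += self.lp_alpha * (y - self.lp_prev_out)
                y = self.lp_prev_out
            out.append(y)
        return out


class ConsumerThreadStandIn:
    """Hypothetical stand-in for the original ConsumerThread."""

    def __init__(self, sample_rate):
        self.sample_rate = sample_rate
        self.original_buffer = []

    def process(self, chunk):
        self.original_buffer.extend(chunk)


class FilteredConsumerThreadSketch(ConsumerThreadStandIn):
    def __init__(self, sample_rate):
        super().__init__(sample_rate)
        self.filtered_buffer = []  # additional buffer for filtered data
        self.filter = None

    def initialize_filter(self, hpf=0, lpf=0, order=1):
        if order != 1:
            raise NotImplementedError("this sketch only covers order 1")
        self.filter = FirstOrderFilter(self.sample_rate, hpf=hpf, lpf=lpf)

    def process(self, chunk):
        # Keep the original behaviour, then store the filtered samples too.
        super().process(chunk)
        self.filtered_buffer.extend(self.filter.apply(chunk))


# 10 Hz sine with a DC offset of 1.0, sampled at 1000 Hz, fed in chunks.
signal = [math.sin(2 * math.pi * 10 * n / 1000) + 1.0 for n in range(2000)]
thread = FilteredConsumerThreadSketch(sample_rate=1000)
thread.initialize_filter(hpf=5, lpf=100, order=1)
for start in range(0, len(signal), 100):
    thread.process(signal[start:start + 100])
```

After the loop, thread.filtered_buffer holds the signal with the DC offset removed by the high-pass stage, while thread.original_buffer still holds the unfiltered samples. A monitor function such as the one above then only has to return a copy of the filtered buffer.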

In order to control the filter, the desired filter settings need to be passed to the FilteredSignalPlotterHelper as arguments during initialization and stored for usage in the FilteredConsumerThread.

class FilteredSignalPlotterHelper(SignalPlotterHelper):
    def __init__(self, device, grid_type = None, hpf = 0, lpf = 0, order = 1):
        # call super of SignalPlotterHelper
        super(SignalPlotterHelper, self).__init__(device = device,
                                                  monitor_class = Monitor,
                                                  consumer_thread_class = FilteredConsumerThread)
        self.main_plotter = RealTimeSignalPlotter()
        self._current_window_size = self.main_plotter.window_size

        self.grid_type = grid_type
        self.hpf = hpf
        self.lpf = lpf
        self.order = order

The start() method of the FilteredSignalPlotterHelper must be overridden to initialize the filter. Because the initialization has to happen between two steps of the parent's start() method (after the consumer thread is created and before the consumer is opened), the parent's start() method cannot be reused. The method is therefore reimplemented in full, with the filter initialization inserted between those steps.

    def start(self):
        self.consumer = Consumer()
        self.consumer_thread = self.consumer_thread_class(
            consumer_reading_queue=self.consumer.reading_queue,
            sample_rate=self.device.get_device_sampling_frequency(), 
            n_unfiltered = self.n_unfiltered_channels
        )
        # Initialize filter
        self.consumer_thread.initialize_filter(hpf = self.hpf, lpf = self.lpf, order = self.order)
        self.consumer.open(
            server = self.device,
            reading_queue_id = self.device.get_id(),
            consumer_thread=self.consumer_thread)
        # Start measurement
        self.device.start_measurement(self.measurement_type)
        self.monitor = self.monitor_class(monitor_function = self.monitor_function, callback=self.callback, on_error=self.on_error)
        self.monitor.start()

Finally, the desired parameters (such as the filter’s cut-off frequencies) are passed when calling the FilteredSignalPlotterHelper from the main script.