Tutorial 1: Filtered data plotter#
Corresponding files for this tutorial: TMSiHelpers/filtered_signal_plotter_helper.py and examples_SAGA/example_filter_and_plot.py
The first example on plotter customization is the filtered data plotter. Filtering data is best done on lower levels of the code, which happens in the
ConsumerThread
. This Thread is initialized and controlled by the SignalPlotterHelper
. Therefore, new versions of both the ConsumerThread
and the
PlotterHelper
have to be created.
First, a new file should be created, where the classes FilteredSignalPlotterHelper
and FilteredConsumerThread
will be defined.
The original classes should be imported and will be used for inheritance purposes. Inheritance allows new classes to reuse all methods
from the original classes and get the possibility to make the desired changes, see below. If you are not familiar with inheritance in programming, see
‘https://www.youtube.com/watch?v=RSl87lqOXDE, Python OOP Tutorial 4: Inheritance - Creating Subclasses’ for some more information.
from TMSiBackend.data_consumer.consumer_thread import ConsumerThread
from .signal_plotter_helper import SignalPlotterHelper
class FilteredSignalPlotterHelper(SignalPlotterHelper):
class FilteredConsumerThread(ConsumerThread):
The next step is to use the FilteredConsumerThread
when using the FilteredSignalPlotterHelper
. This is done by initializing the FilteredConsumerThread
in
the FilteredSignalPlotter
instead of the original ConsumerThread
. To do so we will overwrite the __init__()
method of the SignalPlotterHelper
.
The original __init__()
of the SignalPlotterHelper
calls the __init__()
of its parent class (PlotterHelper
) with the ConsumerThread
as argument.
We want to overwrite this method and call the __init__()
of the parent class of the SignalPlotterHelper
with the FilteredConsumerThread
as argument.
This is done using the super()
method, which is highlighted in the code snippet below.
class FilteredSignalPlotterHelper(SignalPlotterHelper):
def __init__(self, device, grid_type = None, hpf = 0, lpf = 0, order = 1):
# call super of SignalPlotterHelper
super(SignalPlotterHelper, self).__init__(device = device,
monitor_class = Monitor,
consumer_thread_class = FilteredConsumerThread)
self.main_plotter = RealTimeSignalPlotter()
The file can be saved and the new class can be imported in the example file. Here, the PlotterHelper
argument of the Gui
can be changed
to the newly created FilteredSignalPlotterHelper
.
from TMSiGui.gui import Gui
from TMSiPlotterHelpers.filtered_signal_plotter_helper import FilteredSignalPlotterHelper
# Remove grid_type argument or set it to 'None' to use default channel order
plotter_helper = FilteredSignalPlotterHelper(device=dev, grid_type='4-8-L', hpf=5, lpf=100, order=1)
gui = Gui(plotter_helper = plotter_helper)
After doing this, the application uses the FilteredPlotterHelper
(which uses the FilteredConsumerThread
). Next, the functional code of these two new classes can be modified and
tailored to our wishes.
In the FilteredConsumerThread
an additional buffer to store the filtered data is initialized. Furthermore, the filter itself should be initialized.
To do so, the method initialize_filter()
is created, which should be applied to the data in the process()
method, which needs to be overwritten for this
purpose. The implementation of these methods can be found in the code.
To read out the filtered buffer instead of the original buffer, the monitor function is changed to return the filtered buffer.
def monitor_function(self):
reading = {}
reading["status"] = 200
reading["buffer"] = self.consumer_thread.filtered_buffer.copy()
if self.device.get_device_type() == "APEX":
reading["live_impedances"] = self.consumer_thread.cycling_impedance
return reading
In order to control the filter, the desired filter settings need to be passed to the FilteredSignalPlotterHelper
as arguments during initialization and stored for
usage in the FilteredConsumerThread
.
class FilteredSignalPlotterHelper(SignalPlotterHelper):
def __init__(self, device, grid_type = None, hpf = 0, lpf = 0, order = 1):
# call super of SignalPlotterHelper
super(SignalPlotterHelper, self).__init__(device = device,
monitor_class = Monitor,
consumer_thread_class = FilteredConsumerThread)
self.main_plotter = RealTimeSignalPlotter()
self._current_window_size = self.main_plotter.window_size
self.grid_type = grid_type
self.hpf = hpf
self.lpf = lpf
self.order = order
The start method of the FilteredSignalPlotterHelper
should be overwritten to initialize the filter. As this should be done between the different steps of
the start function of the parent, the parent’s start method can’t be used. Therefore, the start method is overwritten completely and the initialization is done between
the different steps.
def start(self):
self.consumer = Consumer()
self.consumer_thread = self.consumer_thread_class(
consumer_reading_queue=self.consumer.reading_queue,
sample_rate=self.device.get_device_sampling_frequency(),
n_unfiltered = self.n_unfiltered_channels
)
# Initialize filter
self.consumer_thread.initialize_filter(hpf = self.hpf, lpf = self.lpf, order = self.order)
self.consumer.open(
server = self.device,
reading_queue_id = self.device.get_id(),
consumer_thread=self.consumer_thread)
# Start measurement
self.device.start_measurement(self.measurement_type)
self.monitor = self.monitor_class(monitor_function = self.monitor_function, callback=self.callback, on_error=self.on_error)
self.monitor.start()
Finally, the desired parameters (such as the filter’s cut-off frequencies) are passed when calling the FilteredSignalPlotterHelper
from the main script.