# -*- coding: utf-8 -*-
"""
Operational Modal Analysis Module for Single and Multi-Setup Configurations.
Part of the pyOMA2 package.
Authors:
Dag Pasca
Diego Margoni
"""
from __future__ import annotations
import logging
import typing
import numpy as np
from scipy.signal import decimate, detrend
from pyoma2.functions.gen import (
filter_data,
)
if typing.TYPE_CHECKING:
from pyoma2.algorithms import BaseAlgorithm
logger = logging.getLogger(__name__)
[docs]class BaseSetup:
"""
Base class for operational modal analysis (OMA) setups.
This class provides foundational methods and attributes used by both
SingleSetup and MultiSetup classes. It serves as a superclass for specific
setup types, offering common functionalities for handling data, running algorithms, and
extracting modal properties.
Attributes
----------
algorithms : dict[str, BaseAlgorithm]
Dictionary storing algorithms added to the setup, keyed by their names.
data : np.ndarray, optional
Time series data array, typically representing the system's output.
fs : float, optional
Sampling frequency of the data.
Warning
-------
The BaseSetup class is not intended for direct instantiation by users.
It acts as a common interface for handling different types of setup configurations.
Specific functionalities are provided through its subclasses.
"""
algorithms: typing.Dict[str, BaseAlgorithm]
data: np.ndarray # TODO use generic typing
fs: float # sampling frequency
[docs] def rollback(self) -> None:
"""
Rollback the data to the initial state.
This method must be implemented by subclasses to provide a rollback mechanism for the data.
Raises a `NotImplementedError` in the base class.
"""
raise NotImplementedError("Rollback method must be implemented by subclasses.")
# add algorithm (method) to the set.
[docs] def add_algorithms(self, *algorithms: BaseAlgorithm) -> None:
"""
Adds algorithms to the setup and configures them with data and sampling frequency.
Parameters
----------
algorithms : variable number of BaseAlgorithm
One or more algorithm instances to be added to the setup.
Notes
-----
The algorithms must be instantiated before adding them to the setup,
and their names must be unique.
"""
self.algorithms = {
**getattr(self, "algorithms", {}),
**{alg.name: alg._set_data(data=self.data, fs=self.fs) for alg in algorithms},
}
# run the whole set of algorithms (methods). METODO 1 di tutti
[docs] def run_all(self) -> None:
"""
Runs all the algorithms added to the setup.
Iterates through each algorithm stored in the setup and executes it. The results are saved within
each algorithm instance.
Notes
-----
This method assumes that all algorithms are properly initialized and can be executed without
additional parameters.
"""
for alg_name in self.algorithms:
self.run_by_name(name=alg_name)
logger.info("all done")
# run algorithm (method) by name. QUESTO รจ IL METODO 1 di un singolo
[docs] def run_by_name(self, name: str) -> None:
"""
Runs a specific algorithm by its name.
Parameters
----------
name : str
The name of the algorithm to be executed.
Raises
------
KeyError
If the specified algorithm name does not exist in the setup.
Notes
-----
The result of the algorithm execution is saved within the algorithm instance.
"""
logger.info("Running %s...", name)
logger.debug("...with parameters: %s", self[name].run_params)
self[name]._pre_run()
result = self[name].run()
logger.debug("...saving %s result", name)
self[name]._set_result(result)
# get the modal properties (all results).
[docs] def mpe(self, name: str, *args, **kwargs) -> None:
"""
Extracts modal parameters from selected poles/peaks of a specified algorithm.
Parameters
----------
name : str
Name of the algorithm from which to extract modal parameters.
args : tuple
Positional arguments to be passed to the algorithm's mpe method.
kwargs : dict
Keyword arguments to be passed to the algorithm's mpe method.
Raises
------
KeyError
If the specified algorithm name does not exist in the setup.
"""
logger.info("Getting mpe modal parameters from %s", name)
self[name].mpe(*args, **kwargs)
# get the modal properties (all results) from the plots.
[docs] def mpe_from_plot(self, name: str, *args, **kwargs) -> None:
"""
Extracts modal parameters directly from plot selections of a specified algorithm.
Parameters
----------
name : str
Name of the algorithm from which to extract modal parameters.
args : tuple
Positional arguments to be passed to the algorithm's mpe method.
kwargs : dict
Keyword arguments to be passed to the algorithm's mpe method.
Raises
------
KeyError
If the specified algorithm name does not exist in the setup.
"""
logger.info("Getting mpe modal parameters from plot... %s", name)
self[name].mpe_from_plot(*args, **kwargs)
def __getitem__(self, name: str) -> BaseAlgorithm:
"""
Retrieves an algorithm from the setup by its name.
Parameters
----------
name : str
The name of the algorithm to retrieve.
Returns
-------
BaseAlgorithm
The algorithm instance with the specified name.
Raises
------
KeyError
If no algorithm with the given name exists in the setup.
"""
if name in self.algorithms:
return self.algorithms[name]
else:
raise KeyError(f"No algorithm named '{name}' exists.")
[docs] def get(
self, name: str, default: typing.Optional[BaseAlgorithm] = None
) -> typing.Optional[BaseAlgorithm]:
"""
Retrieves an algorithm from the setup by its name, returning a default value if not found.
Parameters
----------
name : str
The name of the algorithm to retrieve.
default : BaseAlgorithm, optional
The default value to return if the specified algorithm is not found.
Returns
-------
BaseAlgorithm or None
The algorithm instance with the specified name or the default value if not found.
"""
return self.algorithms.get(name, default)
# method to decimate data
@staticmethod
def _decimate_data(data: np.ndarray, fs: float, q: int, **kwargs) -> tuple:
"""
Applies decimation to the data using the scipy.signal.decimate function.
This method reduces the sampling rate of the data by a factor of 'q'.
The decimation process includes low-pass filtering to reduce aliasing.
The method updates the instance's data and sampling frequency attributes.
Parameters
----------
data : np.ndarray
The input data to be decimated.
q : int
The decimation factor. Must be greater than 1.
axis : int, optional
The axis along which to decimate the data. Default is 0.
**kwargs : dict, optional, will be passed to scipy.signal.decimate
Additional keyword arguments for the scipy.signal.decimate function:
n : int, optional
The order of the filter (if 'ftype' is 'fir') or the number of times
to apply the filter (if 'ftype' is 'iir'). If None, a default value is used.
ftype : {'iir', 'fir'}, optional
The type of filter to use for decimation: 'iir' for an IIR filter
or 'fir' for an FIR filter. Default is 'iir'.
zero_phase : bool, optional
If True, applies a zero-phase filter, which has no phase distortion.
If False, uses a causal filter with some phase distortion. Default is True.
Raises
------
ValueError
If the decimation factor 'q' is not greater than 1.
Returns
-------
tuple
A tuple containing the decimated data, updated sampling frequency, sampling interval,
Notes
-----
For further information, see `scipy.signal.decimate
<https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.decimate.html>`_.
"""
newdata = decimate(data, q, **kwargs)
fs = fs / q
dt = 1 / fs
Ndat = newdata.shape[0]
T = 1 / fs / q * Ndat
return newdata, fs, dt, Ndat, T
# method to detrend data
@staticmethod
def _detrend_data(data: np.ndarray, **kwargs) -> np.ndarray:
"""
Applies detrending to the data using the scipy.signal.detrend function.
This method removes a linear or constant trend from the data, commonly used to remove drifts
or offsets in time series data. It's a preprocessing step, often necessary for methods that
assume stationary data. The method updates the instance's data attribute.
Parameters
----------
data : np.ndarray
The input data to be detrended.
axis : int, optional
The axis along which to detrend the data. Default is 0.
**kwargs : dict, optional, will be passed to scipy.signal.detrend
Additional keyword arguments for the scipy.signal.detrend function:
type : {'linear', 'constant'}, optional
The type of detrending: 'linear' for linear detrend, or 'constant' for just
subtracting the mean. Default is 'linear'.
bp : int or numpy.ndarray of int, optional
Breakpoints where the data is split for piecewise detrending. Default is 0.
Raises
------
ValueError
If invalid parameters are provided.
Returns
-------
np.ndarray
The detrended data.
Notes
-----
For further information, see `scipy.signal.detrend
<https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.detrend.html>`_.
"""
axis = kwargs.pop("axis", 0)
return detrend(data, axis=axis, **kwargs)
# method to detrend data
@staticmethod
def _filter_data(
data: np.ndarray,
fs: float,
Wn: typing.Union[float, typing.Tuple[float, float]],
order: int = 8,
btype: str = "lowpass",
) -> np.ndarray:
"""
Apply a Butterworth filter to the input data and return the filtered signal.
This function designs and applies a Butterworth filter with the specified parameters to the input
data. It can be used to apply lowpass, highpass, bandpass, or bandstop filters.
Parameters
----------
data : ndarray
The input signal data to be filtered. The filter is applied along the first axis
(i.e., each column is filtered independently).
fs : float
The sampling frequency of the input data.
Wn : float or tuple of float
The critical frequency or frequencies. For lowpass and highpass filters, Wn is a scalar;
for bandpass and bandstop filters, Wn is a length-2 sequence.
order : int, optional
The order of the filter. A higher order leads to a sharper frequency cutoff but can also
lead to instability and significant phase delay. Default is 8.
btype : str, optional
The type of filter to apply. Options are "lowpass", "highpass", "bandpass", or "bandstop".
Default is "lowpass".
Returns
-------
filt_data : ndarray
The filtered signal, with the same shape as the input data.
Notes
-----
For more information, see the scipy documentation for `signal.butter`
(https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.butter.html)
and `signal.sosfiltfilt`
(https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.sosfiltfilt.html).
"""
return filter_data(
data=data,
fs=fs,
Wn=Wn,
order=order,
btype=btype,
)