Source code for pyoma2.algorithms.data.run_params

"""
This module provides classes for storing run parameters for various modal
analysis algorithms included in the pyOMA2 package. Each class defines
parameters for a specific algorithm or step in a clustering workflow.
"""

from __future__ import annotations

from typing import Literal

import numpy as np
import numpy.typing as npt
from pydantic import BaseModel, ConfigDict, model_validator
from typing_extensions import TypedDict


[docs]class HCDictType(TypedDict): """ Hard validation criteria (HC) dictionary. Attributes ---------- xi_max : float, optional Maximum allowable damping ratio. Defaults to None (no limit). mpc_lim : float, optional Modal Phase Collinearity (MPC) limit. Defaults to None. mpd_lim : float, optional Mean Phase Deviation (MPD) limit. Defaults to None. CoV_max : float, optional Maximum coefficient of variation (CoV) of the frequencies. Only used when `calc_unc=True`(i.e., when uncertainty bounds of modal parameters are calculated). Defaults to None. """ xi_max: float | None mpc_lim: float | None mpd_lim: float | None CoV_max: float | None
class SCDictType(TypedDict):
    """
    Dictionary schema for the soft validation criteria (SC).

    The TypedDict itself carries no default values; the run-parameter
    classes that use this schema provide their own defaults.

    Attributes
    ----------
    err_fn : float
        Maximum threshold for the relative natural frequency difference.
    err_xi : float
        Maximum threshold for the relative damping difference.
    err_phi : float
        Maximum threshold for the Modal Assurance Criterion (MAC) difference.
    """

    err_fn: float
    err_xi: float
    err_phi: float
class BaseRunParams(BaseModel):
    """
    Common base for the run-parameter containers of the modal analysis
    algorithms.

    It only carries the shared Pydantic configuration:

    - ``from_attributes=True``: instances may be populated from object
      attributes.
    - ``arbitrary_types_allowed=True``: field types unknown to Pydantic
      (e.g. NumPy arrays) are accepted.
    - ``extra="forbid"``: any field not declared on the subclass is rejected.
    """

    model_config = ConfigDict(
        extra="forbid",
        arbitrary_types_allowed=True,
        from_attributes=True,
    )
class FDDRunParams(BaseRunParams):
    """
    Settings for a Frequency Domain Decomposition (FDD) run.

    Attributes
    ----------
    nxseg : int
        Number of points per segment used in the spectral density
        estimation. Default: 1024.
    method_SD : {'per', 'cor'}
        Spectral density estimation method:

        - 'per': Periodogram
        - 'cor': Correlation-based

        Default: 'per'.
    pov : float
        Overlap between segments, expressed as a fraction, used when
        `method_SD='per'`. Expected to lie between 0.0 and 1.0; the range is
        not enforced by this class. Default: 0.5.
    """

    nxseg: int = 1024
    method_SD: Literal["per", "cor"] = "per"
    pov: float = 0.5
class EFDDRunParams(BaseRunParams):
    """
    Settings for an Enhanced Frequency Domain Decomposition (EFDD) run.

    Attributes
    ----------
    nxseg : int
        Number of points per segment used in the spectral density
        estimation. Default: 1024.
    method_SD : {'per', 'cor'}
        Spectral density estimation method:

        - 'per': Periodogram
        - 'cor': Correlation-based

        Default: 'per'.
    pov : float
        Overlap between segments, expressed as a fraction, used when
        `method_SD='per'`. Expected to lie between 0.0 and 1.0; the range is
        not enforced by this class. Default: 0.5.
    """

    nxseg: int = 1024
    method_SD: Literal["per", "cor"] = "per"
    pov: float = 0.5
class pLSCFRunParams(BaseRunParams):
    """
    Settings for a poly-reference Least Square Complex Frequency (pLSCF) run.

    Attributes
    ----------
    ordmax : int
        Maximum model order for the analysis. Required (no default).
    ordmin : int
        Minimum model order for the analysis. Default: 0.
    nxseg : int
        Number of points per segment for the Power Spectral Density (PSD)
        estimation. Default: 1024.
    method_SD : {'per', 'cor'}
        PSD estimation method:

        - 'per': Periodogram
        - 'cor': Correlation-based

        Default: 'per'.
    pov : float
        Overlap between segments, expressed as a fraction, used when
        `method_SD='per'`. Expected to lie between 0.0 and 1.0; the range is
        not enforced by this class. Default: 0.5.
    sc : SCDictType
        Soft validation criteria.
        Default: `{'err_fn': 0.05, 'err_xi': 0.05, 'err_phi': 0.05}`.
    hc : HCDictType
        Hard validation criteria.
        Default: `{'xi_max': 0.1, 'mpc_lim': 0.7, 'mpd_lim': 0.3}`
        (the default does not include a 'CoV_max' entry).
    """

    ordmax: int
    ordmin: int = 0
    nxseg: int = 1024
    method_SD: Literal["per", "cor"] = "per"
    pov: float = 0.5
    sc: SCDictType = {"err_fn": 0.05, "err_xi": 0.05, "err_phi": 0.05}
    hc: HCDictType = {"xi_max": 0.1, "mpc_lim": 0.7, "mpd_lim": 0.3}
class SSIRunParams(BaseRunParams):
    """
    Settings for a Stochastic Subspace Identification (SSI) run.

    Attributes
    ----------
    br : int
        Number of block rows in the Hankel matrix. Default: 20.
    method : {'cov', 'cov_R', 'dat', 'IOcov'}, optional
        SSI variant:

        - 'cov': Covariance-driven
        - 'cov_R': Covariance-driven with autocorrelation
        - 'dat': Data-driven
        - 'IOcov': Input-Output covariance-driven

        Default: 'cov'.
    ref_ind : list[int], optional
        Reference sensor indices for subspace identification.
        Default: None.
    ordmin : int
        Minimum model order to evaluate. Default: 0.
    ordmax : int, optional
        Maximum model order to evaluate. Default: None (no upper limit).
    step : int
        Step size used when iterating through model orders. Default: 1.
    sc : SCDictType
        Soft validation criteria.
        Default: `{'err_fn': 0.05, 'err_xi': 0.1, 'err_phi': 0.05}`.
    hc : HCDictType
        Hard validation criteria.
        Default: `{'xi_max': 0.2, 'mpc_lim': 0.5, 'mpd_lim': 0.5,
        'CoV_max': 0.2}`.
    calc_unc : bool
        Whether to calculate uncertainty bounds for the modal parameters.
        Default: False.
    nb : int
        Number of bootstrap samples for the uncertainty calculation.
        Default: 50.
    U : npt.NDArray[np.float64], optional
        Array of input time series (for input-output SSI). Default: None.
    spetrum : bool
        Whether to compute the frequency spectrum. The field name is spelled
        `spetrum` and is part of the public interface. Default: False.
    fdd_run_params : FDDRunParams, optional
        FDD parameters to pass along to SSI. Default: None.
    """

    br: int = 20
    method: Literal["cov", "cov_R", "dat", "IOcov"] | None = "cov"
    ref_ind: list[int] | None = None
    ordmin: int = 0
    ordmax: int | None = None
    step: int = 1
    sc: SCDictType = {"err_fn": 0.05, "err_xi": 0.1, "err_phi": 0.05}
    hc: HCDictType = {
        "xi_max": 0.2,
        "mpc_lim": 0.5,
        "mpd_lim": 0.5,
        "CoV_max": 0.2,
    }
    calc_unc: bool = False
    nb: int = 50
    U: npt.NDArray[np.float64] | None = None
    spetrum: bool = False
    fdd_run_params: FDDRunParams | None = None
# ============================================================================= # CLUSTERING # =============================================================================
class Step1(BaseRunParams):
    """
    Settings for the first clustering step (pole validation and optional
    pre-clustering).

    Attributes
    ----------
    hc : bool or {'after'}
        Whether to apply the hard validation criteria (HC) to the poles.
        With 'after', HC is applied after pre-clustering. Default: True.
    hc_dict : HCDictType
        Hard validation criteria, used when HC is enabled.
        Default: `{'xi_max': 0.15, 'mpc_lim': 0.8, 'mpd_lim': 0.3,
        'CoV_max': 0.15}`.
    sc : bool or {'after'}
        Whether to apply the soft validation criteria (SC) to the poles.
        With 'after', SC is applied after pre-clustering. Default: False.
    sc_dict : SCDictType
        Soft validation criteria, used when SC is enabled.
        Default: `{'err_fn': 0.03, 'err_xi': 0.05, 'err_phi': 0.05}`.
    pre_cluster : bool
        Whether to run a pre-clustering step before validation.
        Default: False.
    pre_clus_typ : {'GMM', 'kmeans', 'FCMeans'}
        Pre-clustering algorithm. Default: 'GMM'.
    pre_clus_dist : list of {'dfn', 'dxi', 'dlambda', 'dMAC', 'dMPC', 'dMPD', 'MPC', 'MPD'}
        Distance metrics used for pre-clustering.
        Default: ['dlambda', 'dMAC'].
    transform : {'box-cox'}, optional
        Data transformation applied before clustering. Default: None.
    """

    hc: bool | Literal["after"] = True
    hc_dict: HCDictType = {
        "xi_max": 0.15,
        "mpc_lim": 0.8,
        "mpd_lim": 0.3,
        "CoV_max": 0.15,
    }
    sc: bool | Literal["after"] = False
    sc_dict: SCDictType = {
        "err_fn": 0.03,
        "err_xi": 0.05,
        "err_phi": 0.05,
    }
    pre_cluster: bool = False
    pre_clus_typ: Literal["GMM", "kmeans", "FCMeans"] = "GMM"
    pre_clus_dist: list[
        Literal["dfn", "dxi", "dlambda", "dMAC", "dMPC", "dMPD", "MPC", "MPD"]
    ] = ["dlambda", "dMAC"]
    transform: Literal["box-cox"] | None = None
class Step2(BaseRunParams):
    """
    Settings for the second clustering step (the clustering itself).

    Attributes
    ----------
    distance : list of {'dfn', 'dxi', 'dlambda', 'dMAC', 'dMPC', 'dMPD'}
        Distance metrics used for clustering. Default: ['dlambda', 'dMAC'].
    weights : {'tot_one'} or list[float], optional
        Weighting scheme for the distance metrics. With 'tot_one', all
        distances sum to one. A list is expected to have the same length as
        `distance`; this is not checked by this class. Default: None.
    sqrtsqr : bool
        Whether to apply the square root to the sum of squares of the
        distances. Default: False.
    algo : {'hdbscan', 'hierarc', 'optics', 'spectral', 'affinity'}
        Clustering algorithm. Default: 'hierarc'.
    dc : float or {'auto', 'mu+2sig', '95weib'}, optional
        Distance threshold for hierarchical clustering (only for
        `algo='hierarc'`):

        - 'auto': Automatic threshold
        - 'mu+2sig': Mean plus two standard deviations
        - '95weib': 95th percentile of a Weibull fit

        When None, `n_clusters` must be specified. Default: 'auto'.
    linkage : {'average', 'complete', 'single'}
        Linkage criterion for hierarchical clustering (only for
        `algo='hierarc'`). Default: 'average'.
    min_size : int or {'auto'}, optional
        Minimum cluster size for 'hdbscan' or 'optics'. Default: 'auto'.
    n_clusters : int or {'auto'}, optional
        Number of clusters for 'spectral', or for 'hierarc' when `dc=None`.
        Default: None.
    """

    distance: list[Literal["dfn", "dxi", "dlambda", "dMAC", "dMPC", "dMPD"]] = [
        "dlambda",
        "dMAC",
    ]
    weights: Literal["tot_one"] | list[float] | None = None
    sqrtsqr: bool = False
    algo: Literal["hdbscan", "hierarc", "optics", "spectral", "affinity"] = "hierarc"
    dc: float | Literal["auto", "mu+2sig", "95weib"] | None = "auto"
    linkage: Literal["average", "complete", "single"] = "average"
    min_size: int | Literal["auto"] | None = "auto"
    n_clusters: int | Literal["auto"] | None = None
class Step3(BaseRunParams):
    """
    Settings for the third clustering step (post-processing of the clusters).

    Attributes
    ----------
    post_proc : list of str
        Post-processing operations applied to the clustering results.
        Allowed entries:

        - 'merge_similar': Merge similar clusters.
        - 'damp_IQR': Interquartile Range (IQR) criterion on damping.
        - 'fn_IQR': Filter clusters based on the IQR of natural frequencies.
        - 'fn_med': Filter clusters based on median natural frequencies.
        - '1xorder': Filter clusters allowing 1 pole per order.
        - 'min_size': Filter clusters based on minimum size (from Step2).
        - 'min_size_pctg': Filter clusters based on minimum size as a
          percentage of the largest cluster.
        - 'min_size_kmeans': Filter clusters based on minimum size using
          k-means.
        - 'min_size_gmm': Filter clusters based on minimum size using
          Gaussian Mixture Models.
        - 'MTT': Filter clusters based on the Modified Thompson Tau
          technique.
        - 'ABP': Filter clusters based on the Adjusted boxplot technique.

        Default: ['merge_similar', 'damp_IQR', 'fn_IQR', '1xorder',
        'min_size_pctg', 'MTT'].
    merge_dist : float or {'auto', 'deder'}
        Threshold for merging similar clusters. Default: 'auto'.
    min_pctg : float
        Minimum cluster size expressed as a fraction of the largest cluster.
        Default: 0.3.
    select : {'avg', 'fn_mean_close', 'xi_med_close', 'medoid'}
        How the final representative is selected:

        - 'avg': Average of each cluster.
        - 'fn_mean_close': Mean natural frequency closest to the overall
          mean.
        - 'xi_med_close': Median damping ratio closest to the overall
          median.
        - 'medoid': Actual medoid of the cluster.

        Default: 'medoid'.
    freqlim : tuple[float, float], optional
        Frequency range limits (min, max) used to filter clusters.
        Default: None.

    Warnings
    --------
    The `post_proc` list is applied sequentially, so the order of the
    entries affects the results: earlier entries run before later ones. The
    same entry may appear several times and is applied each time it is
    encountered. Make sure the order and repetition suit the intended
    analysis.
    """

    post_proc: list[
        Literal[
            "merge_similar",
            "damp_IQR",
            "fn_IQR",
            "fn_med",
            "1xorder",
            "min_size",
            "min_size_pctg",
            "min_size_kmeans",
            "min_size_gmm",
            "MTT",
            "ABP",
        ]
    ] = [
        "merge_similar",
        "damp_IQR",
        "fn_IQR",
        "1xorder",
        "min_size_pctg",
        "MTT",
    ]
    merge_dist: Literal["auto", "deder"] | float = "auto"
    min_pctg: float = 0.3
    select: Literal["avg", "fn_mean_close", "xi_med_close", "medoid"] = "medoid"
    freqlim: tuple[float, float] | None = None
class Clustering(BaseModel):
    """
    Container describing a complete three-step clustering workflow.

    Attributes
    ----------
    name : str
        Name of the clustering instance.
    steps : tuple[Step1, Step2, Step3], optional
        The three step configurations making up the workflow. When None,
        the steps are assembled from `quick` (if given).
    quick : str, optional
        Name of a predefined configuration. Supported values:
        ['Magalhaes', 'Reynders', 'Neu', 'Kvaale', 'Dederichs', 'hdbscan',
        'affinity', 'spectral', 'optics', 'hier_avg', 'hier_sing',
        'hier_sing_nodc']. Only used when `steps` is None.
    freqlim : tuple[float, float], optional
        Global frequency range limits (min, max). Forwarded to Step3 when
        the steps are built from a `quick` configuration.
    """

    model_config = ConfigDict(
        extra="forbid",
        arbitrary_types_allowed=True,
        from_attributes=True,
    )

    name: str
    steps: tuple[Step1, Step2, Step3] | None = None
    quick: str | None = None
    freqlim: tuple[float, float] | None = None

    @model_validator(mode="after")
    def assemble_steps(self, freqlim):
        """
        Fill in `steps` from the `quick` preset after model validation.

        Nothing happens when `steps` was supplied explicitly or when `quick`
        is None; in both cases the instance is returned unchanged.

        Parameters
        ----------
        freqlim
            Second positional argument of the validator. It is not used:
            the frequency limits are read from `self.freqlim`.
            NOTE(review): despite its name this is presumably the validation
            info object passed by Pydantic, not a frequency range - confirm.

        Returns
        -------
        Clustering
            The instance itself, with `steps` populated when a preset was
            applied.

        Raises
        ------
        AttributeError
            If `quick` is set to a name that is not a known preset.
        """
        # Explicit steps win over presets; no preset means nothing to build.
        if self.steps is not None or self.quick is None:
            return self

        preset = self.quick
        band = self.freqlim

        # Hard criteria shared by the 'Reynders', 'Neu' and 'Dederichs' presets.
        loose_hc = {
            "xi_max": 0.2,
            "mpc_lim": 0.0,
            "mpd_lim": 1.0,
            "CoV_max": np.inf,
        }

        # Presets named after an algorithm share Step1 and the core of the
        # post-processing chain; they differ in the Step2 arguments and in
        # whether 'merge_similar' is prepended to the chain.
        core_chain = ["damp_IQR", "fn_IQR", "1xorder", "min_size_pctg", "MTT"]
        generic_presets = {
            "hdbscan": ({"algo": "hdbscan"}, False),
            "affinity": ({"algo": "affinity"}, False),
            "spectral": ({"algo": "spectral"}, False),
            "optics": ({"algo": "optics"}, True),
            "hier_avg": ({"algo": "hierarc", "linkage": "average"}, False),
            "hier_sing": (
                {"algo": "hierarc", "linkage": "single", "dc": "auto"},
                False,
            ),
            "hier_sing_nodc": (
                {
                    "algo": "hierarc",
                    "linkage": "single",
                    "dc": None,
                    "n_clusters": "auto",
                },
                True,
            ),
        }

        if preset in generic_presets:
            step2_kwargs, with_merge = generic_presets[preset]
            if with_merge:
                chain = ["merge_similar", *core_chain]
            else:
                chain = list(core_chain)
            first = Step1(sc=False, pre_cluster=True, pre_clus_typ="GMM")
            second = Step2(**step2_kwargs)
            third = Step3(post_proc=chain, freqlim=band)
        elif preset == "Magalhaes":
            first = Step1(hc=False, sc=False)
            second = Step2(
                distance=["dfn", "dMAC"],
                algo="hierarc",
                dc=0.02,
                linkage="single",
            )
            third = Step3(post_proc=["damp_IQR"], select="avg", freqlim=band)
        elif preset == "Reynders":
            first = Step1(
                hc="after",
                hc_dict=loose_hc,
                sc=False,
                pre_cluster=True,
                pre_clus_typ="kmeans",
                pre_clus_dist=["dfn", "dxi", "dlambda", "dMAC", "MPC", "MPD"],
            )
            second = Step2(algo="hierarc", dc="mu+2sig", linkage="average")
            third = Step3(
                post_proc=["min_size_kmeans"],
                select="xi_med_close",
                freqlim=band,
            )
        elif preset == "Neu":
            first = Step1(
                hc=True,
                hc_dict=loose_hc,
                sc=False,
                pre_cluster=True,
                pre_clus_typ="kmeans",
                pre_clus_dist=["dfn", "dxi", "dlambda", "dMAC", "dMPD"],
                transform="box-cox",
            )
            second = Step2(algo="hierarc", dc="95weib", linkage="average")
            third = Step3(
                post_proc=["min_size_pctg", "MTT"],
                min_pctg=0.5,
                select="avg",
                freqlim=band,
            )
        elif preset == "Kvaale":
            first = Step1(
                hc=False,
                sc=True,
                sc_dict={"err_fn": 0.04, "err_xi": 0.2, "err_phi": 0.1},
            )
            second = Step2(algo="hdbscan", min_size=20)
            third = Step3(post_proc=["1xorder"], select="avg", freqlim=band)
        elif preset == "Dederichs":
            first = Step1(
                hc=True,
                hc_dict=loose_hc,
                sc=False,
                pre_cluster=True,
                pre_clus_typ="GMM",
                pre_clus_dist=["dfn", "dMAC", "dMPC"],
            )
            second = Step2(algo="hierarc", dc=None, n_clusters="auto")
            third = Step3(
                post_proc=["merge_similar", "1xorder", "min_size_gmm"],
                merge_dist="deder",
                select="avg",
                freqlim=band,
            )
        else:
            raise AttributeError(
                f"Unknown quick option: {self.quick}\n"
                "Possible values are:\n"
                "['Magalhaes', 'Reynders', 'Neu', 'Kvaale', 'Dederichs',\n"
                "'hdbscan', 'affinity', 'spectral', 'optics',\n"
                "'hier_avg', 'hier_sing', 'hier_sing_nodc']"
            )

        self.steps = (first, second, third)
        return self