"""
Frequency Domain Decomposition (FDD) Utility Functions module.
Part of the pyOMA2 package.
Author:
Dag Pasca
"""
from __future__ import annotations
import logging
import typing
import numpy as np
from scipy import signal
from scipy.optimize import curve_fit
from scipy.signal import find_peaks
from tqdm import tqdm, trange
from .gen import MAC
logger = logging.getLogger(__name__)
# =============================================================================
# FUNZIONI FDD
# =============================================================================
[docs]def SD_PreGER(
Y: typing.List[typing.Dict[str, np.ndarray]],
fs: float,
nxseg: int = 1024,
pov: float = 0.5,
method: typing.Literal["per", "cor"] = "per",
):
"""
Estimate the PSD matrix for a multi-setup experiment using either the correlogram
method or the periodogram method.
Parameters
----------
Y : list of dicts
A list where each element corresponds to a different setup. Each element is a
dictionary with keys 'ref' and 'mov' for reference and moving sensor data,
respectively. Each should be a numpy array with dimensions [N x M], where N is the
number of sensors and M is the number of data points.
fs : float
Sampling frequency of the data.
nxseg : int, optional
Number of data points in each segment for spectral analysis. Default is 1024.
pov : float, optional
Proportion of overlap between segments in spectral analysis. Default is 0.5.
method : str, optional
Method for spectral density estimation. 'per' for periodogram and 'cor' for
correlogram method. Default is 'per'.
Returns
-------
tuple
freq : ndarray
Array of frequency values at which the spectral densities are evaluated.
Sy : ndarray
The scaled spectral density matrices. The shape of the array is [N x N x K],
where N is the total number of sensors (reference + moving) and K is the number
of frequency points.
Note
-----
The function uses an internal function 'SD_Est' to estimate the spectral densities.
The logger is used for debugging purposes to track the progress of analysis.
"""
dt = 1 / fs
n_setup = len(Y) # number of setup
n_ref = Y[0]["ref"].shape[0] # number of reference sensor
# n_mov = [Y[i]["mov"].shape[0] for i in range(n_setup)] # number of moving sensor
# n_DOF = n_ref+np.sum(n_mov) # total number of sensors
Gyy = []
for ii in trange(n_setup):
logger.debug("Analyising setup nr.: %s...", ii)
Y_ref = Y[ii]["ref"]
Y_mov = Y[ii]["mov"]
# Ndat = Y[ii]["ref"].shape[1] # number of data points
Y_all = np.vstack((Y[ii]["ref"], Y[ii]["mov"]))
# r = Y_all.shape[0] # total sensor for the ii setup
if method == "per":
# noverlap = nxseg*pov
freq, Sy_allref = SD_est(Y_all, Y_ref, dt, nxseg, method)
_, Sy_allmov = SD_est(Y_all, Y_mov, dt, nxseg, method)
Gyy.append(np.hstack((Sy_allref, Sy_allmov)))
elif method == "cor":
freq, Sy_allref = SD_est(Y_all, Y_ref, dt, nxseg, method)
_, Sy_allmov = SD_est(Y_all, Y_mov, dt, nxseg, method)
Gyy.append(np.hstack((Sy_allref, Sy_allmov)))
logger.debug("... Done with setup nr.: %s!", ii)
Gy_refref = (
1 / n_setup * np.sum([Gyy[ii][:n_ref, :n_ref] for ii in range(n_setup)], axis=0)
)
Gg = []
# Scale spectrum to reference spectrum
for ff in range(len(freq)):
G1 = [
np.dot(
np.dot(
Gyy[ii][n_ref:, :n_ref][:, :, ff],
np.linalg.inv(Gyy[ii][:n_ref, :n_ref][:, :, ff]),
),
Gy_refref[:, :, ff],
)
for ii in range(n_setup)
]
G2 = np.vstack(G1)
G3 = np.vstack([Gy_refref[:, :, ff], G2])
Gg.append(G3)
Sy = np.array(Gg)
Sy = np.moveaxis(Sy, 0, 2)
return freq, Sy
# -----------------------------------------------------------------------------
[docs]def SD_est(
Yall,
Yref,
dt,
nxseg=1024,
method="cor",
pov=0.5,
):
"""
Estimate the Cross-Spectral Density (CSD) using either the correlogram method or the
periodogram method.
Parameters
----------
Yall : ndarray
Input signal data.
Yref : ndarray
Reference signal data.
dt : float
Sampling interval.
nxseg : int, optional
Length of each segment for CSD estimation. Default is 1024.
method : str, optional
Method for CSD estimation, either "cor" for correlogram method or "per" for
periodogram. Default is "cor".
pov : float, optional
Proportion of overlap for the periodogram method. Default is 0.5.
Returns
-------
tuple
freq : ndarray
Array of frequencies.
Sy : ndarray
Cross-Spectral Density (CSD) estimation.
"""
if method == "cor":
Ndat = Yref.shape[1] # number of data points
n_ref = Yref.shape[0]
n_all = Yall.shape[0]
# Calculating Auto e Cross-Spectral Density (Y_all, Y_ref)
_, Pxy = signal.csd(
Yall.reshape(n_all, 1, Ndat),
Yref.reshape(1, n_ref, Ndat),
nperseg=nxseg // 2,
nfft=nxseg,
noverlap=0,
window="boxcar",
)
Rxy = np.fft.irfft(Pxy)
tau = -Rxy.shape[2] / np.log(0.01)
win = signal.windows.exponential(Rxy.shape[2], center=0, tau=tau, sym=False)
Rxy *= win
Sy = np.fft.rfft(Rxy)
freq = np.arange(0, Sy.shape[2]) * (1 / dt / (nxseg)) # Frequency vector
elif method == "per":
noverlap = nxseg * pov
Ndat = Yref.shape[1]
n_ref = Yref.shape[0]
n_all = Yall.shape[0]
# Calculating Auto e Cross-Spectral Density (Y_all, Y_ref)
freq, Sy = signal.csd(
Yall.reshape(n_all, 1, Ndat),
Yref.reshape(1, n_ref, Ndat),
fs=1 / dt,
nperseg=nxseg,
noverlap=noverlap,
window="hann",
)
return freq, Sy
# -----------------------------------------------------------------------------
[docs]def SD_svalsvec(SD):
"""
Compute the singular values and singular vectors for a given set of Cross-Spectral
Density (CSD) matrices.
Parameters
----------
SD : ndarray
Array of Cross-Spectral Density (CSD) matrices, with shape
(number_of_rows, number_of_columns, number_of_frequencies).
Returns
-------
tuple
S_val : ndarray
Singular values.
S_vec : ndarray
Singular vectors.
"""
nr, nc, nf = SD.shape
Sval = np.zeros((nf, nc))
S_val = np.empty((nf, nc, nc))
S_vec = np.empty((nf, nr, nr), dtype=complex)
for k in range(nf):
U1, S, _ = np.linalg.svd(SD[:, :, k])
U1_1 = U1.conj().T
Sval[k, :] = np.sqrt(S)
S_val[k, :, :] = np.diag(np.sqrt(S))
S_vec[k, :, :] = U1_1
S_val = np.moveaxis(S_val, 0, 2)
S_vec = np.moveaxis(S_vec, 0, 2)
return S_val, S_vec
# -----------------------------------------------------------------------------
[docs]def FDD_mpe(
Sval,
Svec,
freq,
sel_freq,
DF=0.1,
):
"""
Extracts modal parameters using the Frequency Domain Decomposition (FDD) method.
Parameters
----------
Sval : ndarray
A 3D array of singular values. Dimensions are [Nch, Nref, Nf], where Nch is the
number of channels, Nref is the number of reference channels, and Nf is the
number of frequency points.
Svec : ndarray
A 3D array of singular vectors corresponding to Sval. Dimensions are the same as Sval.
freq : ndarray
1D array of frequency values corresponding to the singular values and vectors.
sel_freq : list or ndarray
Selected frequencies around which modal parameters are to be extracted.
DF : float, optional
Frequency bandwidth around each selected frequency within which the function
searches for a peak. Default is 0.1.
Returns
-------
tuple
Fn : ndarray
Extracted modal frequencies.
Phi : ndarray
Corresponding normalized mode shapes (each column corresponds to a mode shape).
Note
-----
The function assumes that the first singular value and vector correspond to the dominant
mode at each frequency point.
"""
# Sval, Svec = SD_svalsvec(Sy)
Nch, Nref, Nf = Sval.shape
Freq = []
Fi = []
index = []
maxSy_diff = []
logger.info("Extracting FDD modal parameters")
for sel_fn in tqdm(sel_freq):
# Frequency bandwidth where the peak is searched
lim = (sel_fn - DF, sel_fn + DF)
idxlim = (
np.argmin(np.abs(freq - lim[0])),
np.argmin(np.abs(freq - lim[1])),
) # Indices of the limits
# Ratios between the first and second singular value
diffS1S2 = Sval[0, 0, idxlim[0] : idxlim[1]] / Sval[1, 1, idxlim[0] : idxlim[1]]
maxDiffS1S2 = np.max(diffS1S2) # Looking for the maximum difference
idx1 = np.argmin(np.abs(diffS1S2 - maxDiffS1S2)) # Index of the max diff
idxfin = idxlim[0] + idx1 # Final index
# Modal properties
fn_FDD = freq[idxfin] # Frequency
phi_FDD = Svec[0, :, idxfin] # Mode shape
# Normalized (unity displacement)
phi_FDDn = phi_FDD / phi_FDD[np.argmax(np.abs(phi_FDD))]
Freq.append(fn_FDD)
Fi.append(phi_FDDn)
index.append(idxfin)
maxSy_diff.append(maxDiffS1S2)
logger.debug("Done!")
Fn = np.array(Freq)
Phi = np.array(Fi).T
index = np.array(index)
return Fn, Phi
# -----------------------------------------------------------------------------
# COMMENT
# Utility function (Hidden for users?)
[docs]def SDOF_bellandMS(Sy, dt, sel_fn, phi_FDD, method="FSDD", cm=1, MAClim=0.85, DF=1.0):
"""
Computes the SDOF bell and mode shapes for a specified frequency range using FSDD or
EFDD methods.
Parameters
----------
Sy : ndarray
Spectral matrix of the system. Expected dimensions are [Nch, Nch, Nf], where Nch is
the number of channels and Nf is the number of frequency points.
dt : float
Time interval of the data sampling.
sel_fn : float
Selected modal frequency around which the SDOF analysis is to be performed.
phi_FDD : ndarray
Mode shape corresponding to the selected modal frequency.
method : str, optional
Method for SDOF analysis. Supports 'FSDD' for Frequency Spatial Domain Decomposition
and 'EFDD' for Enhanced Frequency Domain Decomposition. Default is 'FSDD'.
cm : int, optional
Number of close modes to consider in the analysis. Default is 1.
MAClim : float, optional
Threshold for the Modal Assurance Criterion (MAC) to filter modes. Default is 0.85.
DF : float, optional
Frequency bandwidth around the selected frequency for analysis. Default is 1.0.
Returns
-------
tuple
SDOFbell1 : ndarray
The SDOF bell (power spectral density) of the selected mode.
SDOFms1 : ndarray
The mode shapes corresponding to the SDOF bell.
"""
Sval, Svec = SD_svalsvec(Sy)
Nch = phi_FDD.shape[0]
nxseg = Sval.shape[2]
freq = np.arange(0, nxseg) * (1 / dt / (2 * nxseg))
# Frequency bandwidth where the peak is searched
lim = (sel_fn - DF, sel_fn + DF)
idxlim = (
np.argmin(np.abs(freq - lim[0])),
np.argmin(np.abs(freq - lim[1])),
) # Indices of the limits
# Initialise SDOF bell and Mode Shape
SDOFbell = np.zeros(len(np.arange(idxlim[0], idxlim[1])), dtype=complex)
SDOFms = np.zeros((len(np.arange(idxlim[0], idxlim[1])), Nch), dtype=complex)
for csm in range(cm): # Loop through close mode (if any, default 1)
# Frequency Spatial Domain Decomposition variation (defaulf)
if method == "FSDD":
# Save values that satisfy MAC > MAClim condition
SDOFbell += np.array(
[
np.dot(
np.dot(phi_FDD.conj().T, Sy[:, :, el]), phi_FDD
) # Enhanced PSD matrix (frequency filtered)
if MAC(phi_FDD, Svec[csm, :, el]) > MAClim
else 0
for el in range(int(idxlim[0]), int(idxlim[1]))
]
)
# Do the same for mode shapes
SDOFms += np.array(
[
Svec[csm, :, el]
if MAC(phi_FDD, Svec[csm, :, el]) > MAClim
else np.zeros(Nch)
for el in range(int(idxlim[0]), int(idxlim[1]))
]
)
elif method == "EFDD":
SDOFbell += np.array(
[
Sval[csm, csm, l_] if MAC(phi_FDD, Svec[csm, :, l_]) > MAClim else 0
for l_ in range(int(idxlim[0]), int(idxlim[1]))
]
)
SDOFms += np.array(
[
Svec[csm, :, l_]
if MAC(phi_FDD, Svec[csm, :, l_]) > MAClim
else np.zeros(Nch)
for l_ in range(int(idxlim[0]), int(idxlim[1]))
]
)
SDOFbell1 = np.zeros((nxseg), dtype=complex)
SDOFms1 = np.zeros((nxseg, Nch), dtype=complex)
SDOFbell1[idxlim[0] : idxlim[1]] = SDOFbell
SDOFms1[idxlim[0] : idxlim[1], :] = SDOFms
return SDOFbell1, SDOFms1
# -----------------------------------------------------------------------------
[docs]def EFDD_mpe(
Sy: np.ndarray,
freq: np.ndarray,
dt: float,
sel_freq: typing.Sequence[float],
methodSy: str,
method: str = "FSDD",
DF1: float = 0.1,
DF2: float = 1.0,
cm: int = 1,
MAClim: float = 0.85,
sppk: int = 3,
npmax: int = 20,
) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, typing.List[typing.Any]]:
"""
Extracts modal parameters using the Enhanced Frequency Domain Decomposition (EFDD)
and Frequency Spatial Domain Decomposition (FSDD) algorithms.
Parameters
----------
Sy : ndarray
Spectral matrix with dimensions [Nch, Nch, Nf] where Nch is the number of channels
and Nf is the number of frequency points.
freq : ndarray
Array of frequency values corresponding to the spectral matrix.
dt : float
Sampling interval of the data.
sel_freq : sequence of float
Selected modal frequencies around which parameters are to be estimated.
methodSy : str
Method used for spectral density estimation:
- 'cor' for correlation
- 'per' for periodogram
method : str, optional
Specifies the SDOF analysis method ('FSDD' or 'EFDD'). Default is 'FSDD'.
DF1 : float, optional
Frequency bandwidth for initial FDD modal parameter extraction. Default is 0.1.
DF2 : float, optional
Frequency bandwidth for SDOF analysis. Default is 1.0.
cm : int, optional
Number of close modes to consider. Default is 1.
MAClim : float, optional
Threshold for the Modal Assurance Criterion (MAC) to filter modes. Default is 0.85.
sppk : int, optional
Number of initial peaks to skip in autocorrelation analysis. Default is 3.
npmax : int, optional
Maximum number of peaks to consider in the curve fitting for damping ratio
estimation. Default is 20.
Returns
-------
Fn : ndarray
Estimated natural frequencies [Hz].
Xi : ndarray
Estimated damping ratios.
Phi : ndarray
Mode shapes array of shape [Nch, Nm], where Nm = len(sel_freq).
PerPlot : list
List of per-mode metadata for plotting and analysis. Each element is a list:
[freq, time, SDOF bell, Sval, idSV, normalized autocorr,
peak indices, fitted lambda, log-decrement values].
"""
logger = logging.getLogger(__name__)
# 1) Spectral decomposition
Sval, Svec = SD_svalsvec(Sy)
Nch, Nref, nxseg = Sval.shape
# 2) Prepare zero-padding and time axis
pad_factor = 5
n_fft = nxseg * pad_factor
df = 1.0 / (dt * nxseg)
t_record = 1.0 / df
# 3) Initial FDD extraction
Freq_FDD, Phi_FDD = FDD_mpe(Sval, Svec, freq, sel_freq, DF=DF1)
# 4) Containers for results
PerPlot: typing.List[typing.Any] = []
Fn_list, Xi_list, Phi_list = [], [], []
# logger.info("Starting EFDD modal extraction for %d modes", len(sel_freq))
for idx, sel_fn in enumerate(sel_freq, start=1):
# logger.info("Mode %d/%d: sel_fn=%.4f Hz", idx, len(sel_freq), sel_fn)
phi_ref = Phi_FDD[:, idx - 1]
# Single-DOF bell and mode shape
SDOFbell, _ = SDOF_bellandMS(
Sy, dt, sel_fn, phi_ref, method=method, cm=cm, MAClim=MAClim, DF=DF2
)
idSV = np.array(np.where(SDOFbell)).T
# 5) Autocorrelation
sdof_corr = np.fft.ifft(SDOFbell, n=n_fft, norm="ortho").real
half = sdof_corr.size // 2
time = np.linspace(0, t_record, half)
norm_corr = sdof_corr[:half] / sdof_corr[0]
# 6) Peak detection
peaks_pos, _ = find_peaks(norm_corr)
peaks_neg, _ = find_peaks(-norm_corr)
all_peaks = np.sort(np.concatenate([peaks_pos, peaks_neg]))
# 7) Bounds check
required = sppk + npmax
if all_peaks.size < required:
raise ValueError(
f"Mode {idx}/{len(sel_freq)}: sel_fn={sel_fn:.4f} Hz \n"
f"Only {all_peaks.size} peaks found; Reduce npmax:{npmax}."
)
# 8) Select peaks and values
peak_idx = all_peaks[sppk:required]
peak_vals = norm_corr[peak_idx]
# 9) Damped frequency estimation
Td = 2.0 * np.diff(time[peak_idx])
fd = 1.0 / np.mean(Td)
# 10) Log-decrement & curve fit
delta = np.log(np.abs(peak_vals[0]) / np.abs(peak_vals))
lam, _ = curve_fit(lambda x, m: m * x, np.arange(delta.size), delta)
# 11) Correct lambda based on spectral method
if methodSy == "cor":
tau = -(nxseg - 1) / np.log(0.01)
lam = 2 * lam - 1 / tau
else:
lam = 2 * lam
# 12) Compute damping ratio and natural frequency
xi = lam / np.sqrt(4 * np.pi**2 + lam**2)
fn = fd / np.sqrt(1 - xi**2)
# 13) Store results
Fn_list.append(fn)
Xi_list.append(xi)
Phi_list.append(phi_ref)
PerPlot.append(
[freq, time, SDOFbell, Sval, idSV, norm_corr, peak_idx, lam, delta]
)
# 14) Final assembly
Fn = np.array(Fn_list)
Xi = np.array(Xi_list)
Phi = np.array(Phi_list).T
logger.info("EFDD extraction complete.")
return Fn, Xi, Phi, PerPlot