# -*- coding: utf-8 -*-
"""
General Utility Functions module.
Part of the pyOMA2 package.
Author:
Dag Pasca
"""
import logging
import pickle
import typing
import numpy as np
import pandas as pd
from scipy import linalg, signal
logger = logging.getLogger(__name__)
# =============================================================================
# FUNZIONI GENERALI
# =============================================================================
[docs]def applymask(list_arr, mask, len_phi) -> typing.List[np.ndarray]:
"""
Apply a mask to a list of arrays, filtering their values based on the mask.
Parameters
----------
list_arr : list of np.ndarray
List of arrays to be filtered. Arrays can be 2D or 3D.
mask : np.ndarray
2D boolean array indicating which values to keep (True) or set to NaN (False).
len_phi : int
The length of the mode shape dimension for expanding the mask to 3D.
Returns
-------
list of np.ndarray
List of filtered arrays with the same shapes as the input arrays.
Notes
-----
- If an array in `list_arr` is 3D, the mask is expanded to 3D and applied.
- If an array in `list_arr` is 2D, the original mask is applied directly.
- Values not matching the mask are set to NaN.
"""
# Expand the mask to 3D by adding a new axis (for mode shape)
expandedmask1 = np.expand_dims(mask, axis=-1)
# Repeat the mask along the new dimension
expandedmask1 = np.repeat(expandedmask1, len_phi, axis=-1)
list_filt_arr = []
for arr in list_arr:
if arr is None:
list_filt_arr.append(None)
elif arr.ndim == 3:
list_filt_arr.append(np.where(expandedmask1, arr, np.nan))
elif arr.ndim == 2:
list_filt_arr.append(np.where(mask, arr, np.nan))
return list_filt_arr
# -----------------------------------------------------------------------------
[docs]def HC_conj(lambd) -> typing.Tuple[np.ndarray, np.ndarray]:
"""
Apply Hard validation Criteria (HC), retaining only those elements which have their conjugates also present in the array.
Parameters
----------
lambd : np.ndarray
Array of complex numbers.
Returns
-------
filt_lambd : np.ndarray
Array of the same shape as `lambd` with only elements that have their conjugates also present.
Other elements are set to NaN.
mask : np.ndarray
Boolean array of the same shape as `lambd`, where True indicates that the element and its conjugate are both present.
"""
# Create a set to store elements and their conjugates
element_set = set(lambd.flatten())
# Create a mask to identify elements to keep
mask = np.zeros(lambd.shape, dtype=bool)
for i in range(lambd.shape[0]):
for j in range(lambd.shape[1]):
element = lambd[i, j]
conjugate = np.conj(element)
# Check if both element and its conjugate are in the set
if element in element_set and conjugate in element_set:
mask[i, j] = True
# Create an output array filled with NaNs
filt_lambd = np.full(lambd.shape, np.nan, dtype=lambd.dtype)
# Copy elements that satisfy the condition to the output array
filt_lambd[mask] = lambd[mask]
return filt_lambd, mask
# -----------------------------------------------------------------------------
[docs]def HC_damp(damp, max_damp) -> typing.Tuple[np.ndarray, np.ndarray]:
"""
Apply Hard validation Criteria (HC), retaining only those elements which are positive and less than a specified maximum (damping).
Parameters
----------
damp : np.ndarray
Array of damping ratios.
max_damp : float
Maximum allowed damping ratio.
Returns
-------
filt_damp : np.ndarray
Array of the same shape as `damp` with elements that do not satisfy the condition set to NaN.
mask : np.ndarray
Boolean array of the same shape as `damp`, where True indicates that the element is positive and less than `max_damp`.
"""
mask = np.logical_and(damp < max_damp, damp > 0).astype(int)
filt_damp = damp * mask
filt_damp[filt_damp == 0] = np.nan
# should be the same as
# filt_damp = np.where(damp, np.logical_and(damp < max_damp, damp > 0), damp, np.nan)
return filt_damp, mask
# -----------------------------------------------------------------------------
[docs]def HC_MPC(phi, mpc_lim) -> np.ndarray:
"""
Apply Hard validation Criteria (HC), based on modal phase collinearity (MPC) limit.
Parameters
----------
phi : np.ndarray
Array of mode shapes with shape (number of modes, number of channels, mode shape length).
mpc_lim : float
Minimum allowed value for modal phase collinearity.
Returns
-------
mask_mpc : np.ndarray
Boolean array indicating elements that satisfy the MPC condition.
"""
mask2 = []
for o in range(phi.shape[0]):
for i in range(phi.shape[1]):
try:
mask2.append((MPC(phi[o, i, :]) >= mpc_lim).astype(int))
except Exception:
mask2.append(0)
mask2 = np.array(mask2).reshape((phi.shape[0], phi.shape[1]))
mask3 = np.expand_dims(mask2, axis=-1)
mask3 = np.repeat(mask3, phi.shape[2], axis=-1)
return mask3[:, :, 0]
# -----------------------------------------------------------------------------
[docs]def HC_MPD(phi, mpd_lim) -> np.ndarray:
"""
Apply Hard validation Criteria (HC), based on modal phase deviation (MPD) limit.
Parameters
----------
phi : np.ndarray
Array of mode shapes with shape (number of modes, number of channels, mode shape length).
mpd_lim : float
Maximum allowed value for modal phase deviation.
Returns
-------
mask_mpd : np.ndarray
Boolean array indicating elements that satisfy the MPD condition.
"""
mask = []
for o in range(phi.shape[0]):
for i in range(phi.shape[1]):
try:
mask.append((MPD(phi[o, i, :]) <= mpd_lim).astype(int))
except Exception:
mask.append(0)
mask = np.array(mask).reshape((phi.shape[0], phi.shape[1]))
mask1 = np.expand_dims(mask, axis=-1)
mask1 = np.repeat(mask1, phi.shape[2], axis=-1)
return mask1[:, :, 0]
# -----------------------------------------------------------------------------
[docs]def HC_CoV(Fn, Fn_std, CoV_max) -> typing.Tuple[np.ndarray, np.ndarray]:
"""
Apply Hard validation Criteria (HC), retaining only those elements which have a
Coefficient of Variation (CoV) less than a specified maximum.
Parameters
----------
Fn : np.ndarray
Array of frequencies.
Fn_std : np.ndarray
Array of frequency covariances (standard deviation).
CoV_max : float
Maximum allowed Coefficient of Variation (standard deviation/mean value).
Returns
-------
filt_cov : np.ndarray
Array of the same shape as `Fn_std` with elements that do not satisfy the condition set to NaN.
mask : np.ndarray
Boolean array of the same shape as `Fn_std`, where True indicates that the element is less than `max_cov`.
"""
mask = (Fn_std < CoV_max * Fn).astype(int)
filt_std = Fn_std * mask
filt_std[filt_std == 0] = np.nan
# should be the same as
# filt_damp = np.where(damp, np.logical_and(damp < max_damp, damp > 0), damp, np.nan)
return filt_std, mask
# -----------------------------------------------------------------------------
[docs]def HC_freqlim(Fn, freq_lim) -> typing.Tuple[np.ndarray, np.ndarray]:
"""
Applies a frequency mask to limit an array of frequencies within specified bounds.
Parameters
----------
Fn : np.ndarray
Array of frequency values to be filtered.
freq_lim : tuple of float
Lower and upper bounds of the frequency range (min_freq, max_freq).
Returns
-------
filt_fn : np.ndarray
Array of frequencies where values outside the specified range are replaced with NaN.
mask : np.ndarray
Boolean array indicating which elements of `Fn` are within the specified frequency range (1 if within, 0 otherwise).
"""
mask = np.logical_and(Fn < freq_lim[1], Fn > freq_lim[0]).astype(int)
filt_fn = Fn * mask
filt_fn[filt_fn == 0] = np.nan
return filt_fn, mask
# -----------------------------------------------------------------------------
[docs]def SC_apply(Fn, Xi, Phi, ordmin, ordmax, step, err_fn, err_xi, err_phi) -> np.ndarray:
"""
Apply Soft validation Criteria (SC) to determine the stability of modal parameters between consecutive orders.
Parameters
----------
Fn : np.ndarray
Array of natural frequencies.
Xi : np.ndarray
Array of damping ratios.
Phi : np.ndarray
Array of mode shapes.
ordmin : int
Minimum model order.
ordmax : int
Maximum model order.
step : int
Step size for increasing model order.
err_fn : float
Tolerance for the natural frequency error.
err_xi : float
Tolerance for the damping ratio error.
err_phi : float
Tolerance for the mode shape error.
Returns
-------
Lab : np.ndarray
Array of labels indicating stability (1 for stable, 0 for unstable).
"""
# inirialise labels
Lab = np.zeros(Fn.shape, dtype="int")
# SOFT CONDITIONS
# STABILITY BETWEEN CONSECUTIVE ORDERS
for oo in range(ordmin, ordmax + 1, step):
o = int(oo / step - 1)
f_n = Fn[:, o].reshape(-1, 1)
xi_n = Xi[:, o].reshape(-1, 1)
phi_n = Phi[:, o, :]
f_n1 = Fn[:, o - 1].reshape(-1, 1)
xi_n1 = Xi[:, o - 1].reshape(-1, 1)
phi_n1 = Phi[:, o - 1, :]
# Skip the first order as it has no previous order to compare with
if o == 0:
continue
for i in range(len(f_n)):
try:
idx = np.nanargmin(np.abs(f_n1 - f_n[i]))
cond1 = np.abs(f_n[i] - f_n1[idx]) / f_n[i]
cond2 = np.abs(xi_n[i] - xi_n1[idx]) / xi_n[i]
cond3 = 1 - MAC(phi_n[i, :], phi_n1[idx, :])
if cond1 < err_fn and cond2 < err_xi and cond3 < err_phi:
Lab[i, o] = 1 # Stable
else:
Lab[i, o] = 0 # Nuovo polo o polo instabile
except Exception as e:
# If f_n[i] is nan, do nothin, n.b. the lab stays 0
logger.debug(e)
return Lab
# -----------------------------------------------------------------------------
[docs]def dfphi_map_func(phi, sens_names, sens_map, cstrn=None) -> pd.DataFrame:
"""
Maps mode shapes to sensor locations and constraints, creating a dataframe.
Parameters
----------
phi : np.ndarray
Array of mode shapes.
sens_names : list
List of sensor names corresponding to the mode shapes.
sens_map : pd.DataFrame
DataFrame containing the sensor mappings.
cstrn : pd.DataFrame, optional
DataFrame containing constraints, by default None.
Returns
-------
pd.DataFrame
DataFrame with mode shapes mapped to sensor points.
"""
# create mode shape dataframe
df_phi = pd.DataFrame(
{"sName": sens_names, "Phi": phi},
)
# APPLY POINTS TO SENSOR MAPPING
# check for costraints
if cstrn is not None:
cstr = cstrn.to_numpy(na_value=0)[:, :]
val = cstr @ phi
ctn_df = pd.DataFrame(
{"cName": cstrn.index, "val": val},
)
# apply sensor mapping
mapping_sens = dict(zip(df_phi["sName"], df_phi["Phi"]))
# apply costraints mapping
mapping_cstrn = dict(zip(ctn_df["cName"], ctn_df["val"]))
mapping = dict(mapping_sens, **mapping_cstrn)
# else apply only sensor mapping
else:
mapping = dict(zip(df_phi["sName"], df_phi["Phi"]))
# mode shape mapped to points
df_phi_map = sens_map.replace(mapping).astype(float)
return df_phi_map
# -----------------------------------------------------------------------------
[docs]def check_on_geo1(
file_dict, ref_ind=None
) -> typing.Tuple[
typing.List[str],
pd.DataFrame,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
]:
"""
Validates and processes sensor and background geometry data from a dictionary of dataframes.
Parameters
----------
file_dict : dict
Dictionary containing dataframes of sensor and background geometry data.
ref_ind : list, optional
List of reference indices for sensor names, by default None.
Returns
-------
tuple
A tuple containing:
- sens_names (list): List of sensor names.
- sens_coord (pd.DataFrame): DataFrame of sensor coordinates.
- sens_dir (np.ndarray): Array of sensor directions.
- sens_lines (np.ndarray or None): Array of sensor lines.
- BG_nodes (np.ndarray or None): Array of background nodes.
- BG_lines (np.ndarray or None): Array of background lines.
- BG_surf (np.ndarray or None): Array of background surfaces.
Raises
------
ValueError
If required sheets are missing or invalid.
If shapes of 'sensors coordinates' and 'sensors directions' do not match.
If 'sensors coordinates' or 'BG nodes' does not have 3 columns.
If 'BG lines' does not have 2 columns.
If 'BG surfaces' does not have 3 columns.
If sensor names are not present in the index of 'sensors coordinates'.
"""
# Remove INFO sheet from dict of dataframes
if "INFO" in file_dict:
del file_dict["INFO"]
# -----------------------------------------------------------------------------
required_sheets = ["sensors names", "sensors coordinates", "sensors directions"]
all_sheets = required_sheets + [
"sensors lines",
"BG nodes",
"BG lines",
"BG surfaces",
]
# Ensure required sheets exist
if not all(sheet in file_dict for sheet in required_sheets):
raise ValueError(f"At least the sheets {required_sheets} must be defined!")
# Ensure all sheets are valid
for sheet in file_dict:
if sheet not in all_sheets:
raise ValueError(
f"'{sheet}' is not a valid name. Valid sheet names are: \n"
f"{all_sheets}"
)
# -----------------------------------------------------------------------------
# Check 'sensors coordinates' shape
if file_dict["sensors coordinates"].values.shape[1] != 3:
raise ValueError(
"'sensors coordinates' should have 3 columns for the x,y and z coordinates."
f"'sensors coordinates' have {file_dict['sensors coordinates'].values.shape[1]} columns"
)
# Check on same shape 'sensors coordinates' and 'sensors directions'
if (
file_dict["sensors coordinates"].values.shape
!= file_dict["sensors directions"].values.shape
):
raise ValueError(
"'sensors coordinates' and 'sensors directions' must have the same shape.\n"
f"'sensors coordinates' shape is {file_dict['sensors coordinates'].values.shape} while 'sensors directions' shape is {file_dict['sensors directions'].values.shape}"
)
# Check 'BG nodes' shape
if (
file_dict.get("BG nodes") is not None
and not file_dict["BG nodes"].empty
and file_dict["BG nodes"].values.shape[1] != 3
):
raise ValueError(
"'BG nodes' should have 3 columns for the x,y and z coordinates."
f"'BG nodess' have {file_dict['BG nodes'].values.shape[1]} columns"
)
# Check 'BG lines' shape
if (
file_dict.get("BG lines") is not None
and not file_dict["BG lines"].empty
and file_dict["BG lines"].values.shape[1] != 2
):
raise ValueError(
"'BG lines' should have 2 columns for the starting and ending node of the line."
f"'BG lines' have {file_dict['BG lines'].values.shape[1]} columns"
)
# Check 'BG surfaces' shape
if (
file_dict.get("BG surfaces") is not None
and not file_dict["BG surfaces"].empty
and file_dict["BG surfaces"].values.shape[1] != 3
):
raise ValueError(
"'BG surfaces' should have 3 columns for the i,j and k node of the triangle."
f"'BG surfaces' have {file_dict['BG surfaces'].values.shape[1]} columns"
)
# Check on same index 'sensors coordinates' and 'sensors directions'
if (
file_dict["sensors coordinates"].index.to_list()
!= file_dict["sensors directions"].index.to_list()
):
raise ValueError(
"'sensors coordinates' and 'sensors directions' must have the same index.\n"
f"'sensors coordinates' index is {file_dict['sensors coordinates'].index} while 'sensors directions' index is {file_dict['sensors directions'].index}"
)
# Extract the relevant dataframes
sens_names = file_dict["sensors names"]
sens_names = flatten_sns_names(sens_names, ref_ind)
# Check for the presence of each string in the list
if not all(
item in file_dict["sensors coordinates"].index.to_list() for item in sens_names
):
raise ValueError(
"All sensors names must be present as index of the sensors coordinates dataframe!"
)
# -----------------------------------------------------------------------------
# Find the indices that rearrange sens_coord to sens_names
# newIDX = find_map(sens_names, file_dict['sensors coordinates'].index.to_numpy())
# reorder if necessary
sens_coord = file_dict["sensors coordinates"].reindex(index=sens_names)
sens_dir = file_dict["sensors directions"].reindex(index=sens_names).values
# -----------------------------------------------------------------------------
# Adjust to 0 indexing
for key in ["sensors lines", "BG lines", "BG surfaces"]:
if key in file_dict and not file_dict[key].empty:
file_dict[key] = file_dict[key].sub(1)
# -----------------------------------------------------------------------------
# if there is no entry create an empty one
for key in all_sheets:
if key not in file_dict:
file_dict[key] = pd.DataFrame()
# Transform to None empty dataframes
for sheet, df in file_dict.items():
if df.empty:
file_dict[sheet] = None
# Transform to array relevant dataframes
if (
sheet in ["sensors lines", "BG nodes", "BG lines", "BG surfaces"]
and file_dict[sheet] is not None
):
file_dict[sheet] = file_dict[sheet].to_numpy()
sens_lines = file_dict["sensors lines"]
BG_nodes = file_dict["BG nodes"]
BG_lines = file_dict["BG lines"]
BG_surf = file_dict["BG surfaces"]
return (sens_names, sens_coord, sens_dir, sens_lines, BG_nodes, BG_lines, BG_surf)
# -----------------------------------------------------------------------------
[docs]def check_on_geo2(
file_dict, ref_ind=None, fill_na="zero"
) -> typing.Tuple[
typing.List[str],
pd.DataFrame,
np.ndarray,
pd.DataFrame,
pd.DataFrame,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
]:
"""
Validates and processes sensor and background geometry data from a dictionary of dataframes.
Parameters
----------
file_dict : dict
Dictionary containing dataframes of sensor and background geometry data.
ref_ind : list, optional
List of reference indices for sensor names, by default None.
fill_na : str, optional
Method to fill missing values in the mapping dataframe, by default "zero".
Returns
-------
tuple
A tuple containing:
- sens_names (list): List of sensor names.
- pts_coord (pd.DataFrame): DataFrame of points coordinates.
- sens_map (pd.DataFrame): DataFrame of sensor mappings.
- cstr (pd.DataFrame or None): DataFrame of constraints.
- sens_sign (pd.DataFrame): DataFrame of sensor signs.
- sens_lines (np.ndarray or None): Array of sensor lines.
- sens_surf (np.ndarray or None): Array of sensor surfaces.
- BG_nodes (np.ndarray or None): Array of background nodes.
- BG_lines (np.ndarray or None): Array of background lines.
- BG_surf (np.ndarray or None): Array of background surfaces.
Raises
------
ValueError
If required sheets are missing or invalid.
If shapes of 'points coordinates' and 'mapping' do not match.
If 'points coordinates' or 'BG nodes' does not have 3 columns.
If 'BG lines' does not have 2 columns.
If 'BG surfaces' does not have 3 columns.
If sensor names are not present in the mapping dataframe.
If constraints columns do not correspond to sensor names.
If constraints names are not the same as those used in the mapping.
"""
# Remove INFO sheet from dict of dataframes
if "INFO" in file_dict:
del file_dict["INFO"]
# -----------------------------------------------------------------------------
required_sheets = ["sensors names", "points coordinates", "mapping"]
all_sheets = required_sheets + [
"constraints",
"sensors sign",
"sensors lines",
"sensors surfaces",
"BG nodes",
"BG lines",
"BG surfaces",
]
# Ensure required sheets exist
if not all(sheet in file_dict for sheet in required_sheets):
raise ValueError(f"At least the sheets {required_sheets} must be defined!")
# Ensure all sheets are valid
for sheet in file_dict:
if sheet not in all_sheets:
raise ValueError(
f"'{sheet}' is not a valid name. Valid sheet names are: \n"
f"{all_sheets}"
)
# -----------------------------------------------------------------------------
# Check 'points coordinates' shape
if file_dict["points coordinates"].values.shape[1] != 3:
raise ValueError(
"'points coordinates' should have 3 columns for the x,y and z coordinates."
f"'points coordinates' have {file_dict['points coordinates'].values.shape[1]} columns"
)
# Check on same shape 'points coordinates' and 'mapping'
if file_dict["points coordinates"].values.shape != file_dict["mapping"].values.shape:
raise ValueError(
"'points coordinates' and 'mapping' must have the same shape.\n"
f"'points coordinates' shape is {file_dict['points coordinates'].values.shape} while 'mapping' shape is {file_dict['mapping'].values.shape}"
)
# Check on shape for 'sensors sign'
if (
file_dict.get("sensors sign") is not None
and not file_dict["sensors sign"].empty
and file_dict["points coordinates"].values.shape
!= file_dict["sensors sign"].values.shape
):
raise ValueError(
"'points coordinates' and 'sensors sign' must have the same shape.\n"
f"'points coordinates' shape is {file_dict['points coordinates'].values.shape} while 'sensors sign' shape is {file_dict['sensors sign'].values.shape}"
)
# Check 'BG nodes' shape
if (
file_dict.get("BG nodes") is not None
and not file_dict["BG nodes"].empty
and file_dict["BG nodes"].values.shape[1] != 3
):
raise ValueError(
"'BG nodes' should have 3 columns for the x,y and z coordinates."
f"'BG nodess' have {file_dict['BG nodes'].values.shape[1]} columns"
)
# Check 'BG lines' shape
if (
file_dict.get("BG lines") is not None
and not file_dict["BG lines"].empty
and file_dict["BG lines"].values.shape[1] != 2
):
raise ValueError(
"'BG lines' should have 2 columns for the starting and ending node of the line."
f"'BG lines' have {file_dict['BG lines'].values.shape[1]} columns"
)
# Check 'BG surfaces' shape
if (
file_dict.get("BG surfaces") is not None
and not file_dict["BG surfaces"].empty
and file_dict["BG surfaces"].values.shape[1] != 3
):
raise ValueError(
"'BG surfaces' should have 3 columns for the i,j and k node of the triangle."
f"'BG surfaces' have {file_dict['BG surfaces'].values.shape[1]} columns"
)
# if there is no 'sensors sign' create one
if file_dict.get("sensors sign") is None or file_dict["sensors sign"].empty:
sens_sign = pd.DataFrame(
np.ones(file_dict["points coordinates"].values.shape),
columns=file_dict["points coordinates"].columns,
)
file_dict["sensors sign"] = sens_sign
# -----------------------------------------------------------------------------
# Check that mapping contains all sensors name
# Extract the relevant dataframes
sens_names = file_dict["sensors names"]
sens_names = flatten_sns_names(sens_names, ref_ind)
df_map = file_dict["mapping"]
constraints = file_dict["constraints"].fillna(0)
if fill_na == "zero":
df_map = df_map.fillna(0.0)
# elif fill_na == "interp":
# df_map = df_map.fillna("interp")
file_dict["mapping"] = df_map
# Step 1: Flatten the DataFrame to a single list of values
map_fl = df_map.values.flatten()
# Step 2: Convert all values to strings
map_str_fl = [str(value) for value in map_fl]
# Step 3: Check for the presence of each string in the list
if not all(item in map_str_fl for item in sens_names):
raise ValueError("All sensors names must be present in the mapping dataframe!")
# -----------------------------------------------------------------------------
# Check that the constraints columns correspond to sensors names
columns = constraints.columns.to_list()
indices = constraints.index.to_list()
if not all(item in sens_names for item in columns):
raise ValueError(
"The constraints columns names must correspond to sensors names.\n"
f"constraints columns names: {columns}, \n"
f"sensors names: {sens_names}"
)
# -----------------------------------------------------------------------------
# Check that the constraints names are the same as those used in mapping
list_of_possible_constraints = ["0", "0.0", "interp"]
# remove values equal to sensors names and other possible values (should be left with only contraints)
map_str_cstr = [
value
for value in map_str_fl
if value not in sens_names and value not in list_of_possible_constraints
]
if not all(item in map_str_cstr for item in indices):
raise ValueError(
"The constraints names (index column) must be the same as those used in mapping.\n"
f"constraints index column: {indices}, \n"
f"mapping : {map_str_cstr}"
)
# -----------------------------------------------------------------------------
# Add missing sensor names with all zeros to the constraints DataFrame
missing_sensors = [name for name in sens_names if name not in columns]
for name in missing_sensors:
constraints[name] = 0
# Reorder columns to match the order of sens_names if necessary
file_dict["constraints"] = constraints[sens_names]
# -----------------------------------------------------------------------------
# Adjust to 0 indexing
for key in ["sensors lines", "sensors surfaces", "BG lines", "BG surfaces"]:
if key in file_dict and not file_dict[key].empty:
file_dict[key] = file_dict[key].sub(1)
# -----------------------------------------------------------------------------
# if there is no entry create an empty one
for key in all_sheets:
if key not in file_dict:
file_dict[key] = pd.DataFrame()
# Transform to None empty dataframes
for sheet, df in file_dict.items():
if df.empty:
file_dict[sheet] = None
# Transform to array relevant dataframes
if (
sheet
in [
"sensors lines",
"sensors surfaces",
"BG nodes",
"BG lines",
"BG surfaces",
]
and file_dict[sheet] is not None
):
file_dict[sheet] = file_dict[sheet].to_numpy()
# sens_names = file_dict["sensors names"]
pts_coord = file_dict["points coordinates"]
sens_map = file_dict["mapping"]
cstr = file_dict["constraints"]
sens_sign = file_dict["sensors sign"]
sens_lines = file_dict["sensors lines"]
sens_surf = file_dict["sensors surfaces"]
BG_nodes = file_dict["BG nodes"]
BG_lines = file_dict["BG lines"]
BG_surf = file_dict["BG surfaces"]
return (
sens_names,
pts_coord,
sens_map,
cstr,
sens_sign,
sens_lines,
sens_surf,
BG_nodes,
BG_lines,
BG_surf,
)
# -----------------------------------------------------------------------------
[docs]def flatten_sns_names(sens_names, ref_ind=None) -> typing.List[str]:
"""
Ensures that sensors names is in the correct form (1D list of strings) for both
single-setup or multi-setup geometries.
Parameters
----------
sens_names : list, pd.DataFrame, or np.ndarray
Sensor names which can be a list of strings, list of lists of strings, DataFrame,
or 1D numpy array of strings.
ref_ind : list, optional
List of reference indices for multi-setup geometries, by default None.
Returns
-------
list
Flattened list of sensor names.
Raises
------
AttributeError
If `ref_ind` is not provided for multi-setup geometries.
ValueError
If `sens_names` is not of the expected types.
"""
# check if sens_names is a dataframe with one row and transform it to a list
# FOR SINGLE-SETUP GEOMETRIES
if isinstance(sens_names, pd.DataFrame) and sens_names.values.shape[0] == 1:
sns_names_fl = sens_names.values.tolist()[0]
# Check if sens_names is a DataFrame with more than one row or a list of lists
# FOR MULTI-SETUP GEOMETRIES
elif (isinstance(sens_names, pd.DataFrame) and sens_names.values.shape[0] > 1) or (
isinstance(sens_names, list)
and all(isinstance(elem, list) for elem in sens_names)
):
# if sens_names is a dataframe, transform it to a list
if isinstance(sens_names, pd.DataFrame):
sens_names = [
[item for item in row if not pd.isna(item)]
for row in sens_names.values.tolist()
]
n = len(sens_names)
if ref_ind is None:
raise AttributeError(
"You need to specify the reference indices for a Multi-setup test"
)
k = len(ref_ind[0]) # number of reference sensor (from the first setup)
sns_names_fl = []
# Create the reference strings
for i in range(k):
sns_names_fl.append(f"REF{i+1}")
# Flatten the list of strings and exclude the reference indices
for i in range(n):
for j in range(len(sens_names[i])):
if j not in ref_ind[i]:
sns_names_fl.append(sens_names[i][j])
elif isinstance(sens_names, list) and all(
isinstance(elem, str) for elem in sens_names
):
sns_names_fl = sens_names
elif isinstance(sens_names, np.ndarray) and sens_names.ndim == 1:
sns_names_fl = sens_names.tolist()
else:
raise ValueError(
"The input must of type: [list(str), list(list(str)), pd.DataFrame, NDArray(str)]"
)
return sns_names_fl
# -----------------------------------------------------------------------------
[docs]def example_data() -> tuple:
"""
Simulate a 5‐DOF shear‐type building under white‐noise excitation,
with options for base‐ vs. DOF‐level inputs and choice of measurement type.
Returns
-------
Y_noisy : ndarray, shape (N, dof)
Simulated output time histories (for each DOF) of the requested measurement.
U : ndarray, shape (N, nu)
The actual input(s) (white noise + process noise) used.
(fn, xi, phi) : tuple
fn : array of natural frequencies [Hz], length = dof
xi : damping ratio (scalar)
phi: mass‐normalized mode‐shape matrix (dof × dof)
"""
# -------------------------
# Variable definition
dof = 5 # degrees of freedom
m_val = 25.91 # [kg]
k_val = 10000.0 # [N/m]
base_excitation: bool = False # wheter to apply the input as base excitation or not
meas: str = "a" # "d" = displacement, "v" = velocity, "a" = acceleration
N: int = 30001 # Number of samples
dt: float = 1 / 100 # Time increment [sec]
noise: float = 0.1 # Noise level (used for both process‐ and measurement noise)
xi: float = 0.02 # Damping ratio (all modes)
# -------------------------
# Assembling mass matrix M and stiffness matrix K
M = np.eye(dof) * m_val
K = np.zeros((dof, dof))
for i in range(dof):
if i < dof - 1:
K[i, i] += k_val
K[i, i + 1] += -k_val
K[i + 1, i] += -k_val
K[i + 1, i + 1] += k_val
else:
K[i, i] += k_val
# Eigen‐analysis: K φ = M φ Λ
lam, phi = linalg.eigh(K, M)
idx = np.argsort(lam)
lam = lam[idx]
phi = phi[:, idx]
# Natural frequencies [Hz] and mass‐normalized mode shapes
w = np.sqrt(lam) # [rad/s]
fn = w / (2 * np.pi) # [Hz]
M_mod = phi.T @ M @ phi
norm_factors = 1.0 / np.sqrt(np.diag(M_mod))
phi = phi * norm_factors[np.newaxis, :] # φ.T M φ = I
# Modal damping (diagonal of 2*ξ*ω_i)
zeta = 2 * w * xi
C_modal = np.diag(zeta)
C = M @ phi @ C_modal @ phi.T @ M # physical damping matrix
# Continuous‐time A matrix (2dof × 2dof)
zero = np.zeros((dof, dof))
Idn = np.eye(dof)
A_cont = np.block([[zero, Idn], [-linalg.solve(M, K), -linalg.solve(M, C)]])
# Build C_full and D_full based on measurement type
if meas == "d":
C_full = np.hstack([Idn, zero]) # output = displacement
D_full = np.zeros((dof, dof))
elif meas == "v":
C_full = np.hstack([zero, Idn]) # output = velocity
D_full = np.zeros((dof, dof))
elif meas == "a":
# output = acceleration = -M^{-1}K x - M^{-1}C x_dot + M^{-1} F
C_full = np.hstack([-linalg.solve(M, K), -linalg.solve(M, C)])
D_full = linalg.solve(M, np.eye(dof))
else:
raise ValueError("meas must be one of 'd', 'v', or 'a'")
# -------------------------
# Build B, C, D depending on base vs. DOF excitation
if base_excitation:
# Base excitation enters through first DOF only (r_vec = [1, 0, ..., 0]^T)
r_vec = np.zeros((dof, 1))
r_vec[0, 0] = 1.0
if meas == "d":
B = np.vstack([np.zeros((dof, 1)), -linalg.solve(M, K @ r_vec)])
D = np.zeros((dof, 1))
elif meas == "v":
B = np.vstack([np.zeros((dof, 1)), -linalg.solve(M, C @ r_vec)])
D = np.zeros((dof, 1))
else: # "a"
B = np.vstack([np.zeros((dof, 1)), -r_vec])
D = -r_vec
C = C_full.copy()
else:
# DOF‐level excitation: one independent force per DOF
B_full = np.vstack([np.zeros((dof, dof)), linalg.solve(M, np.eye(dof))])
B = B_full.copy()
C = C_full.copy()
D = D_full.copy()
# Continuous‐time StateSpace → discrete
sysc = signal.StateSpace(A_cont, B, C, D)
sysd = sysc.to_discrete(dt)
# Time vector
t = np.arange(N) * dt
# -------------------------
# Construct input U
rng = np.random.RandomState(12345)
if base_excitation:
# Single‐channel white noise + process noise
U_clean = rng.randn(N, 1)
U_noise = noise * rng.randn(N, 1)
U = U_clean + U_noise
else:
# One white‐noise input per DOF + process noise
U_clean = rng.randn(N, dof)
U_noise = noise * rng.randn(N, dof)
U = U_clean + U_noise
# -------------------------
# Simulate discrete‐time response
tout, Y, X = signal.dlsim(sysd, U, t)
# -------------------------
# Add measurement noise to outputs (same 0.1 factor)
meas_noise_std = noise * np.std(Y, axis=0)
Y_noisy = Y + (rng.randn(*Y.shape) * meas_noise_std)
# Return noisy outputs, inputs, and modal info
return Y_noisy, U, (fn, xi, phi)
# -----------------------------------------------------------------------------
[docs]def merge_mode_shapes(
MSarr_list: typing.List[np.ndarray], reflist: typing.List[typing.List[int]]
) -> np.ndarray:
"""
Merges multiple mode shape arrays from different setups into a single mode shape array.
Parameters
----------
MSarr_list : List[np.ndarray]
A list of mode shape arrays. Each array in the list corresponds
to a different experimental setup. Each array should have dimensions [N x M], where N is the number
of sensors (including both reference and roving sensors) and M is the number of modes.
reflist : List[List[int]]
A list of lists containing the indices of reference sensors. Each sublist
corresponds to the indices of the reference sensors used in the corresponding setup in `MSarr_list`.
Each sublist should contain the same number of elements.
Returns
-------
np.ndarray
A merged mode shape array. The number of rows in the array equals the sum of the number
of unique sensors across all setups minus the number of reference sensors in each setup
(except the first one). The number of columns equals the number of modes.
Raises
------
ValueError
If the mode shape arrays in `MSarr_list` do not have the same number of modes.
"""
Nsetup = len(MSarr_list) # number of setup
Nmodes = MSarr_list[0].shape[1] # number of modes
Nref = len(reflist[0]) # number of reference sensors
M = Nref + np.sum(
[MSarr_list[i].shape[0] - Nref for i in range(Nsetup)]
) # total number of nodes in a mode shape
# Check if the input arrays have consistent dimensions
for i in range(1, Nsetup):
if MSarr_list[i].shape[1] != Nmodes:
raise ValueError("All mode shape arrays must have the same number of modes.")
# Initialize merged mode shape array
merged_mode_shapes = np.zeros((M, Nmodes)).astype(complex)
# Loop through each mode
for k in range(Nmodes):
phi_1_k = MSarr_list[0][:, k] # Save the mode shape from first setup
phi_ref_1_k = phi_1_k[reflist[0]] # Save the reference sensors
merged_mode_k = np.concatenate(
(phi_ref_1_k, np.delete(phi_1_k, reflist[0]))
) # initialise the merged mode shape
# Loop through each setup
for i in range(1, Nsetup):
ref_ind = reflist[i] # reference sensors indices for the specific setup
phi_i_k = MSarr_list[i][:, k] # mode shape of setup i
phi_ref_i_k = MSarr_list[i][ref_ind, k] # save data from reference sensors
phi_rov_i_k = np.delete(
phi_i_k, ref_ind, axis=0
) # saave data from roving sensors
# Find scaling factor
alpha_i_k = MSF(phi_ref_1_k, phi_ref_i_k)
# Merge mode
merged_mode_k = np.hstack((merged_mode_k, alpha_i_k * phi_rov_i_k))
merged_mode_shapes[:, k] = merged_mode_k
return merged_mode_shapes
# -----------------------------------------------------------------------------
[docs]def MPC(phi: np.ndarray) -> float:
"""
Calculate the Modal Phase Collinearity (MPC) of a complex mode shape.
The MPC is a measure of the collinearity between the real and imaginary parts
of a mode shape. A value of 1 indicates perfect collinearity, while lower values
indicate a more complex (non-collinear) mode.
Parameters
----------
phi : ndarray
Complex mode shape vector, shape: (n_locations, ).
Returns
-------
float
MPC value, ranging between 0 and 1, where 1 indicates perfect collinearity.
"""
try:
S = np.cov(phi.real, phi.imag)
lambd = np.linalg.eigvals(S)
MPC = (lambd[0] - lambd[1]) ** 2 / (lambd[0] + lambd[1]) ** 2
except Exception:
MPC = np.nan
return MPC
# -----------------------------------------------------------------------------
[docs]def MPD(phi: np.ndarray) -> float:
"""
Calculate the Mean Phase Deviation (MPD) of a complex mode shape.
The MPD measures the deviation of the mode shape phases from a purely
real mode. It quantifies the phase variation along the mode shape.
Parameters
----------
phi : ndarray
Complex mode shape vector, shape: (n_locations, ).
Returns
-------
float
MPD value, representing the average deviation of the phase from a
purely real mode.
"""
try:
U, s, VT = np.linalg.svd(np.c_[phi.real, phi.imag])
V = VT.T
w = np.abs(phi)
num = phi.real * V[1, 1] - phi.imag * V[0, 1]
den = np.sqrt(V[0, 1] ** 2 + V[1, 1] ** 2) * np.abs(phi)
MPD = np.sum(w * np.arccos(np.abs(num / den))) / np.sum(w)
except Exception:
MPD = np.nan
return MPD
# -----------------------------------------------------------------------------
[docs]def MSF(phi_1: np.ndarray, phi_2: np.ndarray) -> np.ndarray:
"""
Calculates the Modal Scale Factor (MSF) between two sets of mode shapes.
Parameters
----------
phi_1 : ndarray
Mode shape matrix X, shape: (n_locations, n_modes) or n_locations.
phi_2 : ndarray
Mode shape matrix A, shape: (n_locations, n_modes) or n_locations.
Returns
-------
ndarray
The MSF values, real numbers that scale `phi_1` to `phi_2`.
Raises
------
Exception
If `phi_1` and `phi_2` do not have the same shape.
"""
if phi_1.ndim == 1:
phi_1 = phi_1[:, None]
if phi_2.ndim == 1:
phi_2 = phi_2[:, None]
if phi_1.shape[0] != phi_2.shape[0] or phi_1.shape[1] != phi_2.shape[1]:
raise Exception(
f"`phi_1` and `phi_2` must have the same shape: {phi_1.shape} "
f"and {phi_2.shape}"
)
n_modes = phi_1.shape[1]
msf = []
for i in range(n_modes):
_msf = np.dot(phi_2[:, i].T, phi_1[:, i]) / np.dot(phi_1[:, i].T, phi_1[:, i])
msf.append(_msf)
return np.array(msf).real
# -----------------------------------------------------------------------------
[docs]def MCF(phi: np.ndarray) -> np.ndarray:
"""
Calculates the Modal Complexity Factor (MCF) for mode shapes.
Parameters
----------
phi : ndarray
Complex mode shape matrix, shape: (n_locations, n_modes) or n_locations.
Returns
-------
ndarray
MCF values, ranging from 0 (for real modes) to 1 (for complex modes).
"""
if phi.ndim == 1:
phi = phi[:, None]
n_modes = phi.shape[1]
mcf = []
for i in range(n_modes):
S_xx = np.dot(phi[:, i].real, phi[:, i].real)
S_yy = np.dot(phi[:, i].imag, phi[:, i].imag)
S_xy = np.dot(phi[:, i].real, phi[:, i].imag)
_mcf = 1 - ((S_xx - S_yy) ** 2 + 4 * S_xy**2) / (S_xx + S_yy) ** 2
mcf.append(_mcf)
return np.array(mcf)
# -----------------------------------------------------------------------------
[docs]def MAC(phi_X: np.ndarray, phi_A: np.ndarray) -> np.ndarray:
"""
Calculates the Modal Assurance Criterion (MAC) between two sets of mode shapes.
Parameters
----------
phi_X : ndarray
Mode shape matrix X, shape: (n_locations, n_modes) or n_locations.
phi_A : ndarray
Mode shape matrix A, shape: (n_locations, n_modes) or n_locations.
Returns
-------
ndarray
MAC matrix. Returns a single MAC value if both `phi_X` and `phi_A` are
one-dimensional arrays.
Raises
------
Exception
If mode shape matrices have more than 2 dimensions or if their first dimensions do not match.
"""
if phi_X.ndim == 1:
phi_X = phi_X[:, np.newaxis]
if phi_A.ndim == 1:
phi_A = phi_A[:, np.newaxis]
if phi_X.ndim > 2 or phi_A.ndim > 2:
raise Exception(
f"Mode shape matrices must have 1 or 2 dimensions (phi_X: {phi_X.ndim}, phi_A: {phi_A.ndim})"
)
if phi_X.shape[0] != phi_A.shape[0]:
raise Exception(
f"Mode shapes must have the same first dimension (phi_X: {phi_X.shape[0]}, "
f"phi_A: {phi_A.shape[0]})"
)
# mine
# MAC = np.abs(np.dot(phi_X.conj().T, phi_A)) ** 2 / (
# (np.dot(phi_X.conj().T, phi_X)) * (np.dot(phi_A.conj().T, phi_A))
# )
# original
MAC = np.abs(np.conj(phi_X).T @ phi_A) ** 2
MAC = MAC.astype(complex)
for i in range(phi_X.shape[1]):
for j in range(phi_A.shape[1]):
MAC[i, j] = MAC[i, j] / (
np.conj(phi_X[:, i]) @ phi_X[:, i] * np.conj(phi_A[:, j]) @ phi_A[:, j]
)
if MAC.shape == (1, 1):
MAC = MAC[0, 0]
return MAC.real
# -----------------------------------------------------------------------------
[docs]def pre_multisetup(
dataList: typing.List[np.ndarray], reflist: typing.List[typing.List[int]]
) -> typing.List[typing.Dict[str, np.ndarray]]:
"""
Preprocesses data from multiple setups by separating reference and moving sensor data.
Parameters
----------
DataList : list of numpy arrays
List of input data arrays for each setup, where each array represents sensor data.
reflist : list of lists
List of lists containing indices of sensors used as references for each setup.
Returns
-------
list of dicts
A list of dictionaries, each containing the data for a setup.
Each dictionary has keys 'ref' and 'mov' corresponding to reference and moving sensor data.
"""
n_setup = len(dataList) # number of setup
Y = []
for i in range(n_setup):
y = dataList[i]
n_ref = len(reflist[i])
n_sens = y.shape[1]
ref_id = reflist[i]
mov_id = list(range(n_sens))
for ii in range(n_ref):
mov_id.remove(ref_id[ii])
ref = y[:, ref_id]
mov = y[:, mov_id]
# TO DO: check that len(n_ref) is the same in all setup
# N.B. ONLY FOR TEST
# Y.append({"ref": np.array(ref).reshape(n_ref,-1)})
Y.append(
{
"ref": np.array(ref).T.reshape(n_ref, -1),
"mov": np.array(mov).T.reshape(
(n_sens - n_ref),
-1,
),
}
)
return Y
# -----------------------------------------------------------------------------
[docs]def invperm(p: np.ndarray) -> np.ndarray:
"""
Compute the inverse permutation of a given array.
Parameters
----------
p : array-like
A permutation of integers from 0 to n-1, where n is the length of the array.
Returns
-------
ndarray
An array representing the inverse permutation of `p`.
"""
q = np.empty_like(p)
q[p] = np.arange(len(p))
return q
# -----------------------------------------------------------------------------
[docs]def find_map(arr1: np.ndarray, arr2: np.ndarray) -> np.ndarray:
"""
Maps the elements of one array to another based on sorting order.
Parameters
----------
arr1 : array-like
The first input array.
arr2 : array-like
The second input array, which should have the same length as `arr1`.
Returns
-------
ndarray
An array of indices that maps the sorted version of `arr1` to the sorted version of `arr2`.
"""
o1 = np.argsort(arr1)
o2 = np.argsort(arr2)
return o2[invperm(o1)]
# -----------------------------------------------------------------------------
[docs]def filter_data(
data: np.ndarray,
fs: float,
Wn: float,
order: int = 4,
btype: str = "lowpass",
) -> np.ndarray:
"""
Apply a Butterworth filter to the input data.
This function designs and applies a digital Butterworth filter to the input data array. The filter
is applied in a forward-backward manner using the second-order sections representation to minimize
phase distortion.
Parameters
----------
data : array_like
The input signal to filter. If `data` is a multi-dimensional array, the filter is applied along
the first axis.
fs : float
The sampling frequency of the input data.
Wn : array_like
The critical frequency or frequencies. For lowpass and highpass filters, Wn is a scalar; for
bandpass and bandstop filters, Wn is a length-2 sequence.
order : int, optional
The order of the filter. Higher order means a sharper frequency cutoff, but the filter will
also be less stable. The default is 4.
btype : str, optional
The type of filter to apply. Can be 'lowpass', 'highpass', 'bandpass', or 'bandstop'. The default
is 'lowpass'.
Returns
-------
filt_data : ndarray
The filtered signal.
Note
----
This function uses `scipy.signal.butter` to design the filter and `scipy.signal.sosfiltfilt` for
filtering to apply the filter in a zero-phase manner, which does not introduce phase delay to the
filtered signal. For more information, see the scipy documentation for `signal.butter`
(https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.butter.html) and `signal.sosfiltfilt`
(https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.sosfiltfilt.html).
"""
sos = signal.butter(order, Wn, btype=btype, output="sos", fs=fs)
filt_data = signal.sosfiltfilt(sos, data, axis=0)
return filt_data
# -----------------------------------------------------------------------------
[docs]def save_to_file(setup: object, file_name: str) -> None:
"""
Save the specified setup instance to a file.
This method serializes the current instance and saves it to a file using the pickle module.
Parameters
----------
setup : obj
The Setup class that is to be saved.
file_name : str
The name (path) of the file where the setup instance will be saved.
"""
with open(file_name, "wb") as f:
pickle.dump(setup, f)
[docs]def load_from_file(file_name: str) -> object:
"""
Load a setup instance from a file.
This method deserializes a saved setup instance from the specified file.
Parameters
----------
file_name : str
The name (path) of the file from which the setup instance will be loaded.
Returns
-------
Setup
An instance of the setup loaded from the file.
"""
with open(file_name, "rb") as f:
instance = pickle.load(f) # noqa S301
return instance
[docs]def read_excel_file(
path: str,
sheet_name: typing.Optional[str] = None,
engine: str = "openpyxl",
index_col: int = 0,
**kwargs: typing.Any,
) -> dict:
"""
Read an Excel file and return its contents as a dictionary.
Parameters
----------
path : str
The path to the Excel file.
sheet_name : str, optional
The name of the sheet to read. If None, all sheets are read. Default is None.
engine : str, optional
The engine to use for reading the Excel file. Default is 'openpyxl'.
index_col : int, optional
The column to use as the index. Default is 0
**kwargs : dict, optional
Additional keyword arguments to pass to pd.read_excel.
Returns
-------
dict
A dictionary containing the contents of the Excel file, with sheet names as keys.
Raises
------
ImportError
If the specified engine is not available.
RuntimeError
If an error occurs while reading the Excel file.
"""
try:
file_dict = pd.read_excel(
path, sheet_name=sheet_name, engine=engine, index_col=index_col, **kwargs
)
return file_dict
except ImportError as e:
raise ImportError(
"Optional package 'openpyxl' is not installed. "
"Install 'openpyxl' with 'pip install openpyxl' or 'pip install pyoma_2[pyvista]'"
) from e
except Exception as e:
logger.error("An error occurred while reading the Excel file: %s", e)
raise RuntimeError(
f"An error occurred while reading the Excel file: {e.__class__}: {e}"
) from e