"""Data representation and manipulation"""
import abc
import logging
from numbers import Number
import numpy as np
from .config import ZeroConfig
from .misc import mag_to_db, db_to_mag
LOGGER = logging.getLogger(__name__)
CONF = ZeroConfig()
[docs]def frequencies_match(vector_a, vector_b):
return np.all(vector_a == vector_b)
[docs]def vectors_match(vector_a, vector_b):
return np.allclose(vector_a, vector_b,
rtol=float(CONF["data"]["response_rel_tol"]),
atol=float(CONF["data"]["response_abs_tol"]))
[docs]def spectra_match(vector_a, vector_b):
return np.allclose(vector_a, vector_b,
rtol=float(CONF["data"]["noise_rel_tol"]),
atol=float(CONF["data"]["noise_abs_tol"]))
[docs]def argmax_difference(vector_a, vector_b):
"""Finds the maximum relative difference in percent between `vector_a` and `vector_b`
Returns index of maximum relative difference as well as its value.
"""
# relative difference in percent between `vector_a` and `vector_b`
difference = 100 * np.abs(vector_a - vector_b) / np.abs(vector_a)
# index of maximum difference
i = np.argmax(difference)
return i, difference[i]
[docs]class Series:
"""Data series"""
def __init__(self, x, y):
if x.shape != y.shape:
raise ValueError("specified x and y vectors do not have the same shape")
self.x = x
self.y = y
[docs] @classmethod
def from_mag_phase(cls, x, magnitude, phase=None, mag_scale=None, phase_scale=None):
"""Create :class:`Series` from magnitude and phase data.
Parameters
----------
x : :class:`np.array`
The x vector.
magnitude : :class:`np.array`
The magnitude. This magnitude's scaling is determined by `mag_scale`.
phase : :class:`np.array`, optional
The phase. If `None`, the magnitude is assumed to have zero phase.
mag_scale : :class:`str`, optional
The magnitude scale. Defaults to absolute.
phase_scale : :class:`str`, optional
The phase scale. Defaults to degrees.
Returns
-------
:class:`Series`
The series containing the data.
Raises
------
:class:`ValueError`
If the specified magnitude or phase scale is unrecognised.
"""
if phase is None:
# set phase to zero
phase = np.zeros_like(magnitude)
if mag_scale is None:
mag_scale = "abs"
if phase_scale is None:
phase_scale = "deg"
if mag_scale.lower() == "db":
magnitude = db_to_mag(magnitude)
elif mag_scale.lower() == "abs":
# don't need to scale
pass
else:
raise ValueError(f"cannot handle scale '{mag_scale}'")
if phase_scale.lower() == "deg":
phase = np.radians(phase)
elif phase_scale.lower() == "rad":
# don't need to scale
pass
else:
raise ValueError(f"cannot handle scale '{phase_scale}'")
# convert magnitude and phase to complex
complex_ = magnitude * (np.cos(phase) + 1j * np.sin(phase))
return cls(x=x, y=complex_)
[docs] @classmethod
def from_re_im(cls, x, re, im):
"""Create :class:`Series` from real and imaginary parts.
Parameters
----------
x : :class:`np.array`
The x vector.
magnitude : :class:`np.array`
The magnitude. This magnitude's scaling is determined by `mag_scale`.
phase : :class:`np.array`
The phase.
magnitude_scale : :class:`str`, optional
The magnitude scale. Defaults to absolute.
phase_scale : :class:`str`, optional
The phase scale. Defaults to radians.
Returns
-------
:class:`Series`
The series containing the data.
Raises
------
:class:`ValueError`
If either the real or imaginary part is complex.
"""
if np.any(np.iscomplex(re)) or np.any(np.iscomplex(im)):
raise ValueError("specified real and imaginary parts must not be complex")
# combine into complex
complex_ = re + 1j * im
return cls(x=x, y=complex_)
def __add__(self, other):
other = getattr(other, "y", other)
return self._new_series(self.y + other)
def __sub__(self, other):
return self + -other
def __neg__(self):
return self._new_series(-self.y)
def __mul__(self, other):
other = getattr(other, "y", other)
return self._new_series(self.y * other)
def __rmul__(self, other):
# Series multiplication is commutative.
return self * other
def __truediv__(self, other):
if hasattr(other, "y"):
return self * other.inverse()
return self._new_series(self.y * 1 / other)
def __rtruediv__(self, other):
return other * self.inverse()
def __pow__(self, other):
other = getattr(other, "y", other)
return self._new_series(self.y ** other)
[docs] def inverse(self):
return self._new_series(np.reciprocal(self.y))
def _new_series(self, new_y):
return self.__class__(self.x, new_y)
def __eq__(self, other):
"""Checks if the specified series is identical to this one, within tolerance"""
return np.allclose(self.x, other.x) and np.allclose(self.y, other.y)
[docs]class BaseFunction(metaclass=abc.ABCMeta):
"""Base function container.
A function represents data between one or many sources and sinks. These can be any type
descending from :class:`.BaseElement`, though concrete subclasses may implement additional type
constraints.
Functions are designed to allow mathematical operations, such as multiplication by scalars or
other functions. Concrete subclasses may implement additional constraints on allowed operations.
Parameters
----------
sources, sinks : list of :class:`.BaseElement`, optional
The function's sources and sinks. Defaults to empty lists.
series : :class:`.Series`, optional
The function's data.
plot_options : :class:`dict`, optional
Plot options, passed to :meth:`.matplotlib.pyplot.plot`.
"""
def __init__(self, sources=None, sinks=None, series=None, plot_options=None):
self._label = None
if sources is None:
sources = []
if sinks is None:
sinks = []
if plot_options is None:
plot_options = {}
self.sources = list(sources)
self.sinks = list(sinks)
self.series = series
self.plot_options = dict(plot_options)
self._set_fallback_plot_options()
def _set_fallback_plot_options(self):
"""Set plot options in cases where user-specified options are not provided."""
if "alpha" not in self.plot_options:
self._set_plot_option("alpha", CONF["plot"]["alpha"])
if "dash_capstyle" not in self.plot_options:
self._set_plot_option("dash_capstyle", CONF["plot"]["dash_capstyle"])
if "linestyle" not in self.plot_options and "ls" not in self.plot_options:
self._set_plot_option("linestyle", CONF["plot"]["linestyle"])
if "linewidth" not in self.plot_options and "lw" not in self.plot_options:
self._set_plot_option("linewidth", CONF["plot"]["linewidth"])
if "zorder" not in self.plot_options:
self._set_plot_option("zorder", CONF["plot"]["zorder"])
def _set_plot_option(self, key, value):
if value is None:
# Ignore empty values, to allow the user to ignore options in the configuration.
return
self.plot_options[key] = value
@property
def frequencies(self):
return self.series.x
[docs] @abc.abstractmethod
def draw(self, *axes):
raise NotImplementedError
@property
def label(self):
return self._format_label()
@label.setter
def label(self, label):
self._label = label
@abc.abstractmethod
def _format_label(self, tex=False, suffix=None, ignore_user_label=False):
raise NotImplementedError
def __str__(self):
return self.label
[docs] def equivalent(self, other):
"""Checks if the specified function has equivalent sources, sinks, labels and data."""
return self.meta_equivalent(other) and self.series_equivalent(other)
[docs] @abc.abstractmethod
def series_equivalent(self, other):
"""Checks if the specified function has an equivalent series to this one."""
raise NotImplementedError
def __eq__(self, other):
"""Checks if the specified data set is identical to this one, within tolerance."""
return self.equivalent(other)
def __hash__(self):
"""Hash of the data set's meta data."""
return hash(self.meta_data())
[docs]class SingleSourceFunction(BaseFunction, metaclass=abc.ABCMeta):
"""Data set containing data for a single source."""
def __init__(self, source=None, **kwargs):
if source is not None:
sources = [source]
else:
sources = []
super().__init__(sources=sources, **kwargs)
@property
def source(self):
return self.sources[0]
@source.setter
def source(self, source):
self.sources[0] = source
@property
def source_unit(self):
return self.source.element_unit
[docs]class SingleSinkFunction(BaseFunction, metaclass=abc.ABCMeta):
"""Data set containing data for a single sink."""
def __init__(self, sink=None, **kwargs):
if sink is not None:
sinks = [sink]
else:
sinks = []
super().__init__(sinks=sinks, **kwargs)
@property
def sink(self):
return self.sinks[0]
@sink.setter
def sink(self, sink):
self.sinks[0] = sink
@property
def sink_unit(self):
return self.sink.element_unit
[docs]class Response(SingleSourceFunction, SingleSinkFunction):
"""Data set representing a response at a sink from a source."""
@property
def complex_magnitude(self):
return self.series.y
@property
def magnitude(self):
"""Absolute magnitude."""
return np.abs(self.complex_magnitude)
@property
def db_magnitude(self):
r"""Magnitude scaled in units of decibel.
The response is power scaled such that the response is :math:`20 \log_{10} \left| x \right|`
where :math:`x` is the complex response provided by :attr:`.complex_magnitude`.
"""
return mag_to_db(self.magnitude)
@property
def phase(self):
"""Phase in degrees."""
return np.angle(self.complex_magnitude) * 180 / np.pi
[docs] def series_equivalent(self, other):
"""Checks if the specified function has an equivalent series to this one."""
return vectors_match(self.magnitude, other.magnitude)
def _draw_magnitude(self, axes, label_suffix=None, scale_db=True):
"""Add magnitude plot to axes"""
label = self._format_label(tex=True, suffix=label_suffix)
if scale_db:
# Decibel y-axis scaling.
axes.semilogx(self.frequencies, self.db_magnitude, label=label, **self.plot_options)
else:
# Linear y-axis scaling.
axes.loglog(self.frequencies, self.magnitude, label=label, **self.plot_options)
def _draw_phase(self, axes):
"""Add phase plot to axes"""
axes.semilogx(self.frequencies, self.phase, **self.plot_options)
[docs] def draw(self, *axes, **kwargs):
if len(axes) != 2:
raise ValueError("two axes (magnitude and phase) must be provided")
self._draw_magnitude(axes[0], **kwargs)
self._draw_phase(axes[1])
def _format_label(self, tex=False, suffix=None, ignore_user_label=False):
if not ignore_user_label and self._label is not None:
return self._label
if tex:
format_str = r"$\bf{%s}$ to $\bf{%s}$ (%s)%s"
else:
format_str = "%s to %s (%s)%s"
if suffix is not None:
suffix = " %s" % suffix
else:
suffix = ""
return format_str % (self.source.label, self.sink.label, self.unit_str, suffix)
@property
def unit_str(self):
if self.sink_unit is None and self.source_unit is None:
return "dimensionless"
if self.sink_unit is None:
# Only source has unit.
return f"1/{self.source_unit}"
if self.source_unit is None:
# Only sink has unit.
return self.sink_unit
# Both have units.
return f"{self.sink_unit}/{self.source_unit}"
def __mul__(self, other):
if isinstance(other, Response):
other_sink = other.sink
other_value = other.series
if self.sink_unit != other.source_unit:
raise ValueError(f"Cannot multiply this response by {other}: the sink unit of this "
f"response, {self.sink_unit}, is incompatible with the source "
f"unit of {other}, {other.source_unit}.")
elif isinstance(other, Number):
other_sink = self.sink
other_value = other
else:
# E.g. NoiseDensityBase.
raise TypeError(f"Cannot multiply {self.__class__.__name__} by "
f"{other.__class__.__name__}.")
return self._new_response(other_sink, self.series * other_value)
def __rmul__(self, other):
# Note: "other" should never be another Response or NoiseDensityBase, since these functions
# implement the corresponding __mul__ methods for Response.
if not isinstance(other, Number):
raise TypeError(f"Cannot multiply {other.__class__.__name__} by "
f"{self.__class__.__name__}.")
# Response-scalar multiplication is commutative.
return self * other
def __truediv__(self, other):
if not isinstance(other, Number):
raise TypeError(f"Cannot divide {self.__class__.__name__} by "
f"{other.__class__.__name__}. To invert, call inverse().")
return self._new_response(self.sink, self.series * 1 / other)
def __rtruediv__(self, other):
if not isinstance(other, Number):
raise TypeError(f"Cannot divide {other.__class__.__name__} by "
f"{self.__class__.__name__}. To invert, call inverse().")
return self._new_response(self.sink, other / self.series)
[docs] def inverse(self):
"""Inverse response."""
return self.__class__(source=self.sink, sink=self.source, series=self.series.inverse())
def _new_response(self, new_sink, new_series):
"""Create a copy of this response with the specified sink and series."""
new_response = self.__class__(source=self.source, sink=new_sink, series=new_series)
new_response.label = self._label
return new_response
[docs]class NoiseDensityBase(SingleSinkFunction, metaclass=abc.ABCMeta):
"""Function with a single noise spectral density."""
@property
def spectral_density(self):
return self.series.y
[docs] def series_equivalent(self, other):
"""Checks if the specified function has an equivalent series to this one."""
return spectra_match(self.spectral_density, other.spectral_density)
[docs] def draw(self, *axes, label_suffix=None):
if len(axes) != 1:
raise ValueError("only one axis supported")
axes = axes[0]
label = self._format_label(tex=True, suffix=label_suffix)
axes.loglog(self.frequencies, self.spectral_density, label=label, **self.plot_options)
def __mul__(self, other):
if isinstance(other, Response):
other_sink = other.sink
other_value = other.magnitude
if self.sink_unit != other.source_unit:
raise ValueError(f"Cannot multiply this noise by {other}: the sink unit of this "
f"noise, {self.sink_unit}, is incompatible with the source unit "
f"of {other}, {other.source_unit}.")
elif isinstance(other, Number):
other_sink = self.sink
other_value = other
else:
# E.g. NoiseDensityBase.
raise TypeError(f"Cannot multiply {self.__class__.__name__} by "
f"{other.__class__.__name__}.")
return self._new_noise_density(other_sink, self.series * other_value)
def __rmul__(self, other):
# Note: "other" should never be another Response or NoiseDensityBase, since these functions
# implement the corresponding __mul__ methods for NoiseDensityBase.
if not isinstance(other, Number):
raise TypeError(f"Cannot divide {other.__class__.__name__} by "
f"{self.__class__.__name__}.")
# Noise-scalar multiplication is commutative.
return self * other
@abc.abstractmethod
def _new_noise_density(self, new_sink, new_series):
raise NotImplementedError
[docs]class NoiseDensity(SingleSourceFunction, NoiseDensityBase):
"""Noise data series"""
@property
def noise_name(self):
return str(self.source)
@property
def element_type(self):
return self.source.element_type
@property
def noise_type(self):
return self.source.noise_type
def _format_label(self, tex=False, suffix=None, ignore_user_label=False):
if not ignore_user_label and self._label is not None:
return self._label
if tex:
format_str = r"$\bf{%s}$ to $\bf{%s}$%s"
else:
format_str = "%s to %s%s"
if suffix is not None:
suffix = " %s" % suffix
else:
suffix = ""
return format_str % (self.noise_name, self.sink.label, suffix)
def _new_noise_density(self, new_sink, new_series):
"""Create a copy of this noise density with the specified sink and series."""
new_noise = self.__class__(source=self.source, sink=new_sink, series=new_series)
new_noise.label = self._label
return new_noise
[docs]class MultiNoiseDensity(NoiseDensityBase):
"""Set of noise data series from multiple sources to a single sink"""
def __init__(self, sources=None, series=None, constituents=None, label=None, **kwargs):
if series is None and constituents is None:
raise ValueError("one of series or constituents must be specified")
elif series is not None and constituents is not None:
raise ValueError("only one of series and constituents can be specified")
if series is not None:
# only total noise is specified
self.constituent_noise = None
else:
# derive series from constituents
# sources are derived from constituents
if sources is not None:
raise ValueError("cannot specify constituents and sources together")
# Use first spectral density to get sink and frequencies.
frequencies = constituents[0].frequencies
# check frequency axes are identical
if not np.all([frequencies == spectral_density.series.x
for spectral_density in constituents]):
raise ValueError("specified spectra do not share common x-axis")
sources = [spectral_density.source for spectral_density in constituents]
# store constituent series
self.constituent_noise = [spectral_density.series for spectral_density in constituents]
# create series
noise_sum = np.sqrt(sum([data.y ** 2 for data in self.constituent_noise]))
series = Series(frequencies, noise_sum)
# call parent constructor
super().__init__(sources=sources, series=series, **kwargs)
if label is None:
label = "Incoherent sum"
self.label = label
if constituents is not None:
# check sink agrees with those set in constituents
if not all([self.sink == spectral_density.sink for spectral_density in constituents]):
raise Exception("cannot handle noise for functions with different sinks")
def _set_fallback_plot_options(self):
"""Set plot options in cases where user-specified options are not provided."""
if "alpha" not in self.plot_options:
self._set_plot_option("alpha", CONF["plot"]["sum_alpha"])
if "dash_capstyle" not in self.plot_options:
self._set_plot_option("dash_capstyle", CONF["plot"]["sum_dash_capstyle"])
if "linestyle" not in self.plot_options and "ls" not in self.plot_options:
self._set_plot_option("linestyle", CONF["plot"]["sum_linestyle"])
if "linewidth" not in self.plot_options and "lw" not in self.plot_options:
self._set_plot_option("linewidth", CONF["plot"]["sum_linewidth"])
if "zorder" not in self.plot_options:
self._set_plot_option("zorder", CONF["plot"]["sum_zorder"])
# Call parent after setting these options so they don't get overridden.
super()._set_fallback_plot_options()
@property
def noise_names(self):
if self.constituent_noise is None:
return "unknown"
return [spectral_density.noise_name for spectral_density in self.constituent_noise]
def _format_label(self, *args, suffix=None, ignore_user_label=False, **kwargs):
if not ignore_user_label and self._label is not None:
return self._label
if suffix is not None:
suffix = " %s" % suffix
else:
suffix = ""
return f"{self._label}{suffix}"
def _new_noise_density(self, new_sink, new_series):
"""Create a copy of this noise density with the specified sink and series."""
new_noise = self.__class__(sources=self.sources, sink=new_sink, series=new_series,
label=self._label)
return new_noise
[docs]class Reference(BaseFunction, metaclass=abc.ABCMeta):
def __init__(self, frequencies, data, label=None, unit=None, **kwargs):
self._sink_unit = unit
super().__init__(series=Series(frequencies, data), **kwargs)
if label is None:
label = "Reference"
self.label = label
def _format_label(self, *args, suffix=None, ignore_user_label=False, **kwargs):
if not ignore_user_label and self._label is not None:
return self._label
if suffix is not None:
suffix = " %s" % suffix
else:
suffix = ""
return f"{self._label}{suffix}"
@property
def sink_unit(self):
return self._sink_unit
[docs]class ReferenceResponse(Reference, Response):
pass
[docs]class ReferenceNoise(Reference, NoiseDensity):
pass