from __future__ import annotations
from dataclasses import dataclass
from functools import cached_property
from logging import getLogger
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar, List, Optional
from itertools import product
import h5py
import matplotlib.pyplot as plt
import plotly
from plotly.subplots import make_subplots
import plotly.figure_factory as ff
import numpy as np
import pandas as pd
import schema
import seaborn as sns
from matplotlib.colors import LogNorm, SymLogNorm
from schema import Schema, SchemaError
from toast.tod import hex_layout, hex_pol_angles_qu, plot_focalplane
import numba_quaternion
from .io_helper import H5_CREATE_KW, dumper, loader
from .util import geometric_matrix, unique_matrix, joshian_matrix, total_crosstalk_matrix, total_crosstalk_matrix_exact, omega_i_resonance_exact, C_n_resonance_exact
if TYPE_CHECKING:
from typing import Any, Dict, Tuple, Union
logger = getLogger('coscon')
# schema types
Int = schema.Or(int, np.int64)
Float = schema.Or(float, np.float64)
FloatArray = schema.And(np.ndarray, lambda quat: quat.dtype == np.float64)
Char = schema.And(str, lambda s: len(s) == 1)
OptionalChar = schema.Or(None, Char)
Number = schema.Or(float, int, np.float64, np.int64)
OptionalNumber = schema.Or(None, float, int, np.float64, np.int64)
ListOfString = schema.And(list, lambda list_: all(type(i) is str for i in list_))
[docs]def na_to_none(value):
"""Convert pandas null value to None"""
return None if pd.isna(value) else value
[docs]class DictValueEqualsKey:
"""A fake dictionary where its values always equal their keys"""
def __getitem__(self, key):
return key
# from https://github.com/ickc/toast-workshop ##################################
[docs]def fake_focalplane(
samplerate: float = 20.,
epsilon: float = 0.,
net: float = 1.,
fmin: float = 0.,
alpha: float = 1.,
fknee: float = 0.05,
fwhm: float = 30.,
npix: int = 7,
fov: float = 3.,
) -> Dict[str, Union[float, np.ndarray]]:
"""Create a set of fake detectors.
This generates 7 pixels (14 dets) in a hexagon layout at the boresight
and with a made up polarization orientation.
This function is copied from TOAST workshop with the following license:
BSD 2-Clause License
Copyright (c) 2019, hpc4cmb
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
pol_A = hex_pol_angles_qu(npix)
pol_B = hex_pol_angles_qu(npix, offset=90.0)
dets_A = hex_layout(npix, fov, "", "", pol_A)
dets_B = hex_layout(npix, fov, "", "", pol_B)
dets = {}
for p in range(npix):
pstr = "{:01d}".format(p)
for d, layout in zip(["A", "B"], [dets_A, dets_B]):
props = dict()
props["quat"] = layout[pstr]["quat"]
props["epsilon"] = epsilon
props["rate"] = samplerate
props["alpha"] = alpha
props["NET"] = net
props["fmin"] = fmin
props["fknee"] = fknee
props["fwhm_arcmin"] = fwhm
dname = "{}{}".format(pstr, d)
dets[dname] = props
return dets
# FocalPlane ###################################################################
[docs]@dataclass
class GenericDictStructure:
schema: ClassVar[dict] = {}
data: Dict[str, Any]
validate_schema: bool = True
def __post_init__(self):
if self.validate_schema:
self.validate()
[docs] def validate(self):
try:
Schema(self.schema).validate(self.data)
except SchemaError as e:
logger.warn(f'{type(self).__name__} scheme error detected:')
raise e
@cached_property
def dataframe(self) -> pd.DataFrame:
"""DataFrame representation of the focal plane
This has opposite key-ordering from the dict
i.e. self.data[some_key]['quat'] == self.dataframe['quat'][some_key] == self.dataframe.loc[some_key, 'quat']
"""
return pd.DataFrame.from_dict(self.data, orient='index')
[docs] def dump(
self,
path: Union[Path, str],
overwrite: bool = False,
compress: bool = None
):
"""Write hardware config to a TOML/YAML/JSON file.
"""
dumper(self.data, path, overwrite=overwrite, compress=compress)
[docs] @classmethod
def load(cls, path: Union[Path, str]):
"""Read data from a TOML/YAML/JSON file.
"""
data = loader(path)
return cls(data)
[docs]@dataclass
class GenericFocalPlane(GenericDictStructure):
def __post_init__(self):
# quat is nparray[float64]
# but writing to and reading from toml
# resulted in list of str
# so we have to cast it back to the right types
for value in self.data.values():
quat = value['quat']
if type(quat) is not np.ndarray:
value['quat'] = np.array(quat)
super().__post_init__()
@cached_property
def azimuthal_equidistant_projection_with_orientation(self) -> np.ndarray[np.float_]:
qs = numba_quaternion.Quaternion.from_lastcol_array(
np.array(self.dataframe.quat.values.tolist())
)
return qs.azimuthal_equidistant_projection_with_orientation
@cached_property
def dataframe_with_azimuthal_equidistant_projection_with_orientation(self) -> pd.DataFrame:
azimuthal_equidistant_projection_with_orientation = self.azimuthal_equidistant_projection_with_orientation
df = self.dataframe.copy()
temp = azimuthal_equidistant_projection_with_orientation[:, :2] * (180. / np.pi)
df['x'] = temp[:, 0]
df['y'] = temp[:, 1]
df['orient_angle'] = azimuthal_equidistant_projection_with_orientation[:, 2]
return df
[docs] def iplot(
self,
scale: float = 1.,
height: int = 1000,
width: int = 1000,
) -> plotly.graph_objs._figure.Figure:
pointing = self.azimuthal_equidistant_projection_with_orientation
temp = pointing[:, :2] * (180. / np.pi)
x = temp[:, 0]
y = temp[:, 1]
u = np.cos(pointing[:, 2])
v = np.sin(pointing[:, 2])
fig = ff.create_quiver(x, y, u, v, scale=scale, scaleratio=1.)
fig.update_layout(height=height, width=width)
return fig
[docs]@dataclass
class FakeFocalPlane(GenericFocalPlane):
"""a class describing the output of fake_focalplane"""
schema: ClassVar[dict] = {
str: {
'quat': FloatArray,
'epsilon': Float,
'rate': Float,
'alpha': Float,
'NET': Float,
'fmin': Float,
'fknee': Float,
'fwhm_arcmin': Float,
}
}
data: Dict[str, Union[float, np.ndarray]]
[docs] def plot(
self,
width: float = 4.,
height: float = 4.,
outfile: Optional[Path] = None,
facecolor: Optional[Dict[str, Union[str, Tuple[float, float, float]]]] = None,
polcolor: Optional[Dict[str, Union[str, Tuple[float, float, float]]]] = None,
):
df = self.dataframe
if polcolor is None:
data = self.data
kinds = set(name[-1] for name in data)
n_kinds = len(kinds)
if n_kinds == 2:
logger.info(f'Found detector names ending in 2 characters, treat them as unqiue polarizations: {kinds}')
colors = dict(zip(kinds, sns.color_palette("husl", 2)))
polcolor = {name: colors[name[-1]] for name in data}
else:
logger.warn('Cannot guess the polarization of detectors, drawing with no polarization color. Define polcolor explicitly to fix this.')
return plot_focalplane(
df.quat,
width,
height,
outfile,
fwhm=df.fwhm_arcmin,
facecolor=facecolor,
polcolor=polcolor,
labels=DictValueEqualsKey(),
)
[docs]@dataclass
class AvesDetectors(GenericFocalPlane):
"""a class describing the detectors of Aves.
similar to FakeFocalPlane
"""
schema: ClassVar[dict] = {
str: {
'wafer': str,
'pixel': str,
'pixtype': str,
'band': str,
'fwhm': Float,
'pol': Char,
schema.Optional('handed'): OptionalChar,
'orient': Char,
'quat': FloatArray,
'UID': Int,
}
}
data: Dict[str, Optional[Union[str, float, int, np.ndarray]]]
[docs] def plot(
self,
width: float = 20.,
height: float = 20.,
fontname: str = 'TeX Gyre Schola',
wire: bool = False,
wire_TB: bool = False,
wire_connectionstyle: Optional[str] = None,
):
"""Plot the detectors in Azimuthal equidistant projection with orientation.
For `wire_connectionstyle`, use 'arc3' to draw straight-line.
See more in https://matplotlib.org/stable/gallery/userdemo/connectionstyle_demo.html#sphx-glr-gallery-userdemo-connectionstyle-demo-py
"""
df = self.dataframe_with_azimuthal_equidistant_projection_with_orientation
x_min = df.x.min()
x_max = df.x.max()
y_min = df.y.min()
y_max = df.y.max()
df['orient_angle_x'] = np.cos(df.orient_angle.values)
df['orient_angle_y'] = np.sin(df.orient_angle.values)
colors = sns.color_palette("Paired", 8)
color_dict = dict(zip(product(
('A', 'B'),
('Q', 'U'),
('T', 'B'),
), colors))
fig, ax = plt.subplots(figsize=(width, height))
ax.set_aspect(1.)
ax.set_xlabel("Degrees", fontsize="large", fontname=fontname)
ax.set_ylabel("Degrees", fontsize="large", fontname=fontname)
bleed = df.fwhm.max() / 40.
ax.set_xlim([x_min - bleed, x_max + bleed])
ax.set_ylim([y_min - bleed, y_max + bleed])
if wire:
if wire_TB:
prev_head_x = None
prev_head_y = None
prev_orient_angle = None
r2d = 180. / np.pi
else:
prev_x = None
prev_y = None
prev_pixel = None
for _, row in df.iterrows():
x = row.x
y = row.y
radius = row.fwhm / 120.
pol = row.pol
orient = row.orient
handed = row.handed
pixel = row.pixel
orient_angle_x = row.orient_angle_x
orient_angle_y = row.orient_angle_y
color = color_dict[(
handed,
orient,
pol,
)]
# put circle once per TB
if pol == 'T':
circle = plt.Circle((x, y), radius=radius, color=color)
ax.add_artist(circle)
else:
ax.text(x, y, pixel, horizontalalignment='center', verticalalignment='center', color=color, fontname=fontname)
dx = 2. * radius * orient_angle_x
dy = 2. * radius * orient_angle_y
current_tail_x = x - dx
current_tail_y = y - dy
ax.arrow(
current_tail_x,
current_tail_y,
2. * dx,
2. * dy,
width=0.1 * radius,
head_width=0.3 * radius,
head_length=0.3 * radius,
fc=color,
ec=color,
length_includes_head=True,
)
if wire:
if wire_TB:
orient_angle = row.orient_angle * r2d
if prev_orient_angle is not None:
connectionstyle = f"angle3,angleA={prev_orient_angle + 90.},angleB={orient_angle + 90.}" if wire_connectionstyle is None else wire_connectionstyle
ax.annotate(
"",
xy=(current_tail_x, current_tail_y),
xytext=(prev_head_x, prev_head_y),
xycoords='data',
textcoords='data',
arrowprops=dict(
arrowstyle="->",
color="0.5",
connectionstyle=connectionstyle,
),
)
# for next loop
prev_head_x = x + dx
prev_head_y = y + dy
prev_orient_angle = orient_angle
else:
pixel = row.pixel
if prev_pixel is not None and pixel != prev_pixel:
connectionstyle = 'arc3' if wire_connectionstyle is None else wire_connectionstyle
# shorten arrow
delta_x = x - prev_x
delta_y = y - prev_y
angle = np.arctan2(delta_y, delta_x)
dx = 2. * radius * np.cos(angle)
dy = 2. * radius * np.sin(angle)
x_start = prev_x + dx
x_end = x - dx
y_start = prev_y + dy
y_end = y - dy
ax.annotate(
"",
xy=(x_end, y_end),
xytext=(x_start, y_start),
xycoords='data',
textcoords='data',
arrowprops=dict(
arrowstyle="->",
color="0.5",
connectionstyle=connectionstyle,
),
)
# for next loop
prev_x = x
prev_y = y
prev_pixel = pixel
plt.close()
return fig
[docs]@dataclass
class AvesBands(GenericDictStructure):
"""a class describing the bands of Aves.
"""
schema: ClassVar[dict] = {
str: {
'center': Float,
'bandwidth': Number,
'bandpass': str,
'NET': Float,
'fwhm': Float,
'fknee': Float,
'fmin': Float,
'alpha': Number,
}
}
data: Dict[str, Union[float, int, str]]
[docs]@dataclass
class AvesPixels(GenericDictStructure):
"""a class describing the pixels of Aves.
"""
schema: ClassVar[dict] = {
str: {
'sizemm': float,
'bands': ListOfString,
}
}
data: Dict[str, Union[float, List[str]]]
[docs]@dataclass
class AvesWafers(GenericDictStructure):
"""a class describing the wafers of Aves.
"""
schema: ClassVar[dict] = {
str: {
'type': str,
'npixel': Int,
'pixels': ListOfString,
schema.Optional('handed'): OptionalNumber,
'position': Int,
'telescope': str,
}
}
data: Dict[str, Optional[Union[float, int, str, List[str]]]]
[docs]@dataclass
class AvesTelescopes(GenericDictStructure):
"""a class describing the telescopes of Aves.
"""
schema: ClassVar[dict] = {
str: {
'wafers': ListOfString,
'platescale': Float,
'waferspace': Float,
}
}
data: Dict[str, Union[float, List[str]]]
[docs]@dataclass
class AvesHardware(GenericDictStructure):
schema: ClassVar[dict] = {
str: {
'telescopes': dict,
'wafers': dict,
'pixels': dict,
'bands': dict,
'detectors': dict,
}
}
data: Dict[str, dict]
def __post_init__(self):
data =self.data
self.telescopes = AvesTelescopes(data['telescopes'])
self.wafers = AvesWafers(data['wafers'])
self.pixels = AvesPixels(data['pixels'])
self.bands = AvesBands(data['bands'])
self.detectors = AvesDetectors(data['detectors'])
self.plot = self.detectors.plot
self.iplot = self.detectors.iplot
@cached_property
def dataframe(self) -> pd.DataFrame:
"""DataFrame representation of the hardware
"""
return (
self.detectors.dataframe
.merge(
self.telescopes.dataframe.merge(
self.wafers.dataframe,
left_index=True,
right_on='telescope',
how='outer',
).drop('wafers', axis=1),
left_on='wafer',
right_index=True,
how='outer',
suffixes=('', '_wafer')
).drop('pixels', axis=1)
.merge(
self.pixels.dataframe,
left_on='pixtype',
right_index=True,
how='outer',
)
.merge(
self.bands.dataframe,
left_on='band',
right_index=True,
how='outer',
suffixes=('', '_band')
).drop(['bands', 'fwhm_band'], axis=1)
)
@cached_property
def dataframe_with_azimuthal_equidistant_projection_with_orientation(self) -> pd.DataFrame:
"""DataFrame representation of the hardware
"""
return (
self.detectors.dataframe_with_azimuthal_equidistant_projection_with_orientation
.merge(
self.telescopes.dataframe.merge(
self.wafers.dataframe,
left_index=True,
right_on='telescope',
how='outer',
).drop('wafers', axis=1),
left_on='wafer',
right_index=True,
how='outer',
suffixes=('', '_wafer')
).drop('pixels', axis=1)
.merge(
self.pixels.dataframe,
left_on='pixtype',
right_index=True,
how='outer',
)
.merge(
self.bands.dataframe,
left_on='band',
right_index=True,
how='outer',
suffixes=('', '_band')
).drop(['bands', 'fwhm_band'], axis=1)
)
[docs] @classmethod
def from_dataframe(cls, df: pd.DataFrame, validate_schema=True):
"""Create Hardware from a dataframe representation
"""
final: Dict[str, Dict[str, Dict[str, Any]]] = {}
case: List[str]
unique_cols: List[List[str]]
non_unique_cols: List[List[str]]
for case, unique_cols, non_unique_cols in [
[
['telescope', 'telescopes'],
[
['platescale', 'platescale'],
['waferspace', 'waferspace'],
],
[
['wafer', 'wafers'],
],
], [
['wafer', 'wafers'],
[
['type', 'type'],
['npixel', 'npixel'],
['handed_wafer', 'handed'],
['position', 'position'],
['telescope', 'telescope'],
],
[
['pixtype', 'pixels'],
],
], [
['pixtype', 'pixels'],
[
['sizemm', 'sizemm'],
],
[
['band', 'bands'],
],
], [
['band', 'bands'],
[
['center', 'center'],
['bandwidth', 'bandwidth'],
['bandpass', 'bandpass'],
['NET', 'NET'],
['fwhm', 'fwhm'],
['fknee', 'fknee'],
['fmin', 'fmin'],
['alpha', 'alpha'],
],
[],
],
]:
final[case[1]] = temps = {}
for name, group in df.groupby(case[0]):
temps[name] = temp = {}
for col, col_new in unique_cols:
values = group[col].unique()
assert values.size == 1
temp[col_new] = na_to_none(values[0])
for col, col_new in non_unique_cols:
values = group[col].unique()
temp[col_new] = values.tolist()
# detectors
df_detectors = df[['wafer', 'pixel', 'pixtype', 'band', 'fwhm', 'pol', 'handed', 'orient', 'quat', 'UID']]
final['detectors'] = df_detectors.to_dict(orient='index')
return cls(final, validate_schema=validate_schema)
[docs] def reorder_to(
self,
pixels: List[int],
TB_anti_alignment: bool = False,
TB_alternate_anti_alignment: bool = False,
validate_schema: bool = False,
) -> AvesHardware:
"""Reorder pixels according its integer assignment
:param bool TB_anti_alignment: if True, assume TB pixels ordering BTTBBTTBBT..
else BTBTBTBT...
"""
df = self.dataframe
num_to_name = {}
# assume always end in T, B pair
for name, num in df.pixel.items():
if num not in num_to_name:
num_to_name[num] = name[:-1]
else:
assert num_to_name[num] == name[:-1]
names = [num_to_name[str(pixel).zfill(3)] for pixel in pixels]
names_full = []
if TB_anti_alignment:
forward = True
for name in names:
if forward:
names_full.append(f'{name}B')
names_full.append(f'{name}T')
forward = False
else:
names_full.append(f'{name}T')
names_full.append(f'{name}B')
forward = True
elif TB_alternate_anti_alignment:
for i, name in enumerate(names):
mod = i % 4
if mod == 0 or mod == 3:
names_full.append(f'{name}B')
names_full.append(f'{name}T')
else:
names_full.append(f'{name}T')
names_full.append(f'{name}B')
else:
for name in names:
names_full.append(f'{name}B')
names_full.append(f'{name}T')
return AvesHardware.from_dataframe(df.loc[names_full], validate_schema=validate_schema)
[docs] def reorder_to_from_csv(
self,
path: Path,
TB_anti_alignment: bool = False,
TB_alternate_anti_alignment: bool = False,
validate_schema: bool = False,
) -> AvesHardware:
"""Reorder pixels according its integer assignment
:param Path path: a csv or txt file that has the pixel numbers per line
:param bool TB_anti_alignment: if True, assume TB pixels ordering BTTBBTTBBT..
else BTBTBTBT...
"""
df_name = pd.read_csv(path, header=None)
pixels = df_name.T.values[0]
return self.reorder_to(
pixels,
TB_anti_alignment=TB_anti_alignment,
TB_alternate_anti_alignment=TB_alternate_anti_alignment,
validate_schema=validate_schema,
)
# Crosstalk ####################################################################
[docs]@dataclass
class GenericMatrix:
"""Generic matrix that has row names in ASCII.
"""
names: np.ndarray['S']
data: np.ndarray[np.float64]
name: Optional[str] = None
def __post_init__(self):
self.names = np.asarray(self.names, dtype='S')
assert self.names.size == self.data.shape[0]
def __eq__(self, other: GenericMatrix) -> bool:
if type(self) is not type(other):
return False
return np.array_equal(self.names, other.names) and np.array_equal(self.data, other.data)
@property
def size(self) -> int:
return self.data.shape[0]
@property
def shape(self) -> int:
return self.data.shape
@cached_property
def names_str(self) -> List[str]:
"""names in list of str"""
return [name.decode() for name in self.names]
[docs] @classmethod
def load(cls, path: Path):
with h5py.File(path, 'r') as f:
names = f["names"][:]
data = f["data"][:]
return cls(names, data)
[docs] def dump(self, path: Path, compress_level: int = 9):
with h5py.File(path, 'w', libver='latest') as f:
f.create_dataset(
'names',
data=self.names,
compression_opts=compress_level,
**H5_CREATE_KW
)
f.create_dataset(
'data',
data=self.data,
compression_opts=compress_level,
**H5_CREATE_KW
)
[docs]@dataclass(eq=False)
class CrosstalkMatrix(GenericMatrix):
"""Crosstalk square matrix where column and row share the same names
"""
freq: Optional[np.ndarray[np.float64]] = None
@cached_property
def dataframe(self) -> pd.DataFrame:
return pd.DataFrame(self.data, index=self.names_str, columns=self.names_str)
@cached_property
def dataframe_nearest_neighbor_only(self) -> pd.DataFrame:
freq = self.freq
data = self.data
n = data.shape[0]
left = np.empty(n, np.float64)
left[0] = np.nan
left[1:] = np.diag(data, k=-1)
right = np.empty(n, np.float64)
right[:-1] = np.diag(data, k=1)
right[-1] = np.nan
index = None if freq is None else pd.Index(freq, name='freq')
df = pd.DataFrame({'left': left, 'right': right}, index=index)
if self.name is not None:
df.columns = [f'{self.name}-{col}' for col in df.columns]
return df
@cached_property
def dataframe_diag_only(self) -> pd.DataFrame:
return pd.Series(np.diag(self.data), index=self.freq).to_frame(self.name)
[docs] def plot(
self,
annot: bool = True,
figsize: Optional[Tuple[float, float]] = None,
log: bool = True,
):
if figsize is None:
n = self.size
figsize = (n, n)
fig, ax = plt.subplots(figsize=figsize)
if log:
data = self.data
if data.min() < 0.:
linthresh = np.abs(data[~(data == 0.)]).min()
norm = SymLogNorm(linthresh)
else:
norm = LogNorm()
else:
norm = None
sns.heatmap(self.dataframe, annot=annot, ax=ax, norm=norm)
plt.close()
return fig
[docs] def compare_plot_nearest_neighbor(
self,
others: List[CrosstalkMatrix],
):
figs = [self.dataframe_nearest_neighbor_only.plot(backend='plotly')]
for other in others:
figs.append(other.dataframe_nearest_neighbor_only.plot(backend='plotly'))
fig_all = make_subplots()
colors = sns.color_palette("husl", len(others) * 2 + 2)
i = 0
for fig in figs:
for trace in fig.data:
trace['line']['color'] = 'rgb({}%)'.format('%,'.join(map(lambda x: str(x * 100.), colors[i])))
fig_all.add_trace(trace)
i += 1
fig_all.layout.xaxis.title = 'f (Hz)'
fig_all.layout.title = 'Nearest neighbor crosstalk'
return fig_all
[docs] def compare_plot_diag(
self,
others: List[CrosstalkMatrix],
):
figs = [self.dataframe_diag_only.plot(backend='plotly')]
for other in others:
figs.append(other.dataframe_diag_only.plot(backend='plotly'))
fig_all = make_subplots()
colors = sns.color_palette("husl", len(others) + 1)
i = 0
for fig in figs:
for trace in fig.data:
trace['line']['color'] = 'rgb({}%)'.format('%,'.join(map(lambda x: str(x * 100.), colors[i])))
fig_all.add_trace(trace)
i += 1
fig_all.layout.xaxis.title = 'f (Hz)'
fig_all.layout.title = 'Diagonal "crosstalk"'
return fig_all
[docs] @classmethod
def from_geometric(
cls,
names: Union[List[str], np.ndarray],
r: float = 0.01,
) -> CrosstalkMatrix:
"""Generate a crosstalk matrix with coefficients in geometric series.
"""
n = len(names)
m = geometric_matrix(n, r)
return cls(names, m)
[docs] @classmethod
def from_unique_matrix(
cls,
names: Union[List[str], np.ndarray],
) -> CrosstalkMatrix:
"""generate a crosstalk matrix with each entry to be unique.
Mainly for debug use to confirm the matrix multiplication is correct.
"""
n = len(names)
m = unique_matrix(n)
return cls(names, m)
[docs] @classmethod
def from_joshian_matrix(
cls,
names: Union[List[str], np.ndarray],
nearest: float,
next_nearest: float,
floor: float,
) -> CrosstalkMatrix:
"""Generate a crosstalk matrix with structure based on Josh's simple assumption
"""
n = len(names)
m = joshian_matrix(n, nearest, next_nearest, floor)
return cls(names, m)
[docs] def to_random_normal(
self,
) -> CrosstalkMatrix:
"""Generate a random normal matrix based on original matrix except for diagonal
with mean and std given by the original matrix.
"""
data = self.data
res = np.random.normal(data, np.abs(data))
n = self.size
idxs = np.diag_indices(n)
res[idxs] = data[idxs]
return CrosstalkMatrix(self.names, res)
[docs]@dataclass(eq=False)
class NaiveTod(GenericMatrix):
"""Naive TOD matrix that has all data in contiguous array
"""
[docs] def apply_crosstalk(self, crosstalk_matrix: CrosstalkMatrix) -> NaiveTod:
names = self.names
np.testing.assert_array_equal(names, crosstalk_matrix.names)
data = crosstalk_matrix.data @ self.data
return NaiveTod(names, data)
@cached_property
def dataframe(self) -> pd.DataFrame:
return pd.DataFrame(self.data.T, columns=self.names_str)
[docs]@dataclass
class SQUID:
"""A model of the SQUID readout.
See fig. 4 in Montgomery2020.
Montgomery2020: Montgomery et al., “Performance and Characterization of the SPT-3G Digital Frequency Multiplexed Readout System Using an Improved Noise and Crosstalk Model.”
"""
R_TES: Union[float, np.ndarray[np.float64]]
r_s: Union[float, np.ndarray[np.float64]]
L: Union[float, np.ndarray[np.float64]]
L_com: float
C: Optional[np.ndarray[np.float64]] = None
omega: Optional[np.ndarray[np.float64]] = None
def __post_init__(self):
if (self.C is None) is (self.omega is None):
raise ValueError(f'You need to specify either C or omega.')
elif self.omega is None:
self.omega = omega_i_resonance_exact(self.R_TES, self.r_s, self.L, self.C, self.L_com)
elif self.C is None:
self.C = C_n_resonance_exact(self.R_TES, self.r_s, self.L, self.L_com, self.omega)
@cached_property
def freq(self):
return self.omega / (2. * np.pi)
[docs] def to_dict(
self,
include_omega: bool = False,
include_freq: bool = False,
) -> dict:
res = {
'R_TES': self.R_TES,
'r_s': self.r_s,
'L': self.L,
'C': self.C,
'L_com': self.L_com,
}
if include_omega:
res['omega'] = self.mega
if include_freq:
res['freq'] = self.freq
return res
[docs] def dataframe(
self,
include_omega: bool = False,
include_freq: bool = False,
) -> pd.DataFrame:
return pd.DataFrame(self.to_dict(include_omega=include_omega, include_freq=include_freq))
[docs] def to_crosstalk_matrix(
self,
names: Optional[Union[List[str], np.ndarray]] = None,
) -> CrosstalkMatrix:
"""Compute the crosstalk matrix using Montgomery2020 approximations"""
data = total_crosstalk_matrix(self.R_TES, self.r_s, self.L, self.C, self.L_com, self.omega)
if names is None:
# use the numerical value of freq in MHz as names
names = (self.freq * 1.e-6).astype(np.int).astype('S')
return CrosstalkMatrix(names, data)
[docs] def to_crosstalk_matrix_exact(
self,
names: Optional[Union[List[str], np.ndarray]] = None,
) -> CrosstalkMatrix:
"""Compute the crosstalk matrix using Montgomery2020 formulism without the approximations"""
data = total_crosstalk_matrix_exact(self.R_TES, self.r_s, self.L, self.C, self.L_com, self.omega)
if names is None:
# use the numerical value of freq in MHz as names
names = (self.freq * 1.e-6).astype(np.int).astype('S')
return CrosstalkMatrix(names, data)