Source code for coscon.cmb

from __future__ import annotations

import io
from collections import defaultdict
from dataclasses import dataclass
from functools import cached_property
from logging import getLogger
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Union

import defopt
import healpy as hp
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly
import plotly.express as px
import pysm3
import pysm3.units as u
from astropy.utils.data import download_file

if TYPE_CHECKING:
    from typing import Dict, Tuple

import coscon.fits_helper

from .util import from_Cl_to_Dl, from_Dl_to_Cl

logger = getLogger('coscon')

CANONICAL_NAME = ('TEMPERATURE', 'Q_POLARISATION', 'U_POLARISATION')


@dataclass
class Maps:
    """A class for multiple healpix maps that wraps some healpy functions.

    In ring ordering, uK ($\\mathrm{\\mu K}$) unit.
    """
    names: Union[List[str], Tuple[str, ...]]
    maps: np.ndarray
    name: str = ''

    def __post_init__(self):
        maps = self.maps
        assert maps.ndim == 2
        assert maps.shape[0] == len(self.names)

    @property
    def n_maps(self) -> int:
        return self.maps.shape[0]

    @cached_property
    def f_sky(self) -> float:
        try:
            mask = self.maps.mask
            return 1. - mask.sum() / mask.size
        except (NameError, AttributeError):
            return 1.

    @classmethod
    def from_fits(
        cls,
        path: Path,
        memmap: bool = True,
    ) -> Maps:
        fits_helper = coscon.fits_helper.CMBFitsHelper(
            path,
            memmap=memmap,
        )
        return fits_helper.to_maps

    @classmethod
    def from_pysm(
        cls,
        freq: float,
        nside: int,
        preset_strings: List[str] = ["c1"],
    ) -> Maps:
        sky = pysm3.Sky(nside=nside, preset_strings=preset_strings)
        freq_u = freq * u.GHz
        m = sky.get_emission(freq_u)
        return cls(
            CANONICAL_NAME,
            m.to(u.uK_CMB, equivalencies=u.cmb_equivalencies(freq_u)),
            name=f"PySM 3 {freq} GHz map with preset {', '.join(preset_strings)}",
        )

    @classmethod
    def from_planck_2018(cls, nside: int) -> Maps:
        """Use the Planck 2018 best-fit ΛCDM model to simulate a map

        Use the official release if nside is small enough.
        """
        if nside > 833:
            logger.info('Using extended Planck 2018 spectra computed using CLASS.')
            spectra = PowerSpectra.from_planck_2018_extended()
        else:
            spectra = PowerSpectra.from_planck_2018()
        return spectra.to_maps(nside)

    @cached_property
    def maps_dict(self) -> Dict[str, np.ndarray]:
        return dict(zip(map(lambda x: x[0], self.names), self.maps))

    @cached_property
    def maps_dict_fullname(self) -> Dict[str, np.ndarray]:
        return dict(zip(self.names, self.maps))

    @cached_property
    def spectra(self) -> np.ndarray:
        """Use anafast to calculate the spectra

        results are always a 2D-array
        """
        res = (1. / self.f_sky) * hp.sphtfunc.anafast(self.maps)
        # force 2D array:
        # healpy tries to be smart and returns a 1D array when there's only 1 map
        return res.reshape(1, -1) if self.n_maps == 1 else res

    @property
    def to_spectra(self) -> PowerSpectra:
        n_maps = self.n_maps
        if n_maps == 1:
            names = ['TT']
        elif n_maps == 3:
            names = ['TT', 'EE', 'BB', 'TE', 'EB', 'TB']
        else:
            raise ValueError(f'There are {n_maps} maps, but only 1 or 3 maps are supported.')
        return PowerSpectra(names, self.spectra, scale='Cl', name=self.name)

    def write(
        self,
        path: Path,
        nest: bool = True,
        dtype: Optional[Union[np.dtype, List[np.dtype]]] = None,
        fits_IDL: bool = True,
        coord: str = 'G',
        partial: bool = False,
        column_names: Optional[Union[str, List[str]]] = None,
        column_units: Union[str, List[str]] = 'uK',
        extra_header: list = [],
        overwrite: bool = False,
    ):
        maps = self.maps
        if nest:
            maps = hp.pixelfunc.reorder(maps, r2n=True)
        hp.write_map(
            path,
            maps,
            nest=nest,
            dtype=dtype,
            fits_IDL=fits_IDL,
            coord=coord,
            partial=partial,
            column_names=column_names,
            column_units=column_units,
            extra_header=extra_header,
            overwrite=overwrite,
        )

    @staticmethod
    def _mollview(
        name: str,
        m: np.ndarray,
        n_std: Optional[int] = None,
        fwhm: Optional[float] = None,
        graticule: bool = True,
        **kwargs,
    ):
        if fwhm is not None:
            # convert fwhm from arcmin to radian
            m = hp.sphtfunc.smoothing(m, fwhm=fwhm * np.pi / 10800.)
        if n_std is None:
            min_ = None
            max_ = None
        else:
            mean = m.mean()
            std = m.std()
            min_ = mean - n_std * std
            max_ = mean + n_std * std
        hp.mollview(m, title=name, min=min_, max=max_, **kwargs)
        if graticule:
            hp.graticule()

    def mollview(
        self,
        n_std: Optional[int] = None,
        fwhm: Optional[float] = None,
        graticule: bool = True,
        **kwargs,
    ):
        """Object wrapper of healpy.mollview

        :param n_std: if specified, set the range to be mean ± n_std * std
        :param fwhm: if specified, smooth the map with a beam of this FWHM in arcmin
        """
        maps_dict = self.maps_dict
        for name, m in maps_dict.items():
            full_name = f'{self.name} {name}' if self.name else name
            self._mollview(full_name, m, n_std=n_std, fwhm=fwhm, graticule=graticule, **kwargs)
        if 'Q' in maps_dict and 'U' in maps_dict:
            name = 'P'
            full_name = f'{self.name} {name}' if self.name else name
            m = np.sqrt(np.square(maps_dict['Q']) + np.square(maps_dict['U']))
            self._mollview(full_name, m, n_std=n_std, fwhm=fwhm, graticule=graticule, **kwargs)
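
# Example usage (a sketch added for illustration, not part of the original module):
# simulate a CMB map from the Planck 2018 best-fit spectra and inspect it.
# The nside value below is an arbitrary choice.
#
#     sim = Maps.from_planck_2018(nside=128)
#     sim.mollview(n_std=3)
#     recovered = sim.to_spectra  # PowerSpectra estimated with anafast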


@dataclass
class PowerSpectra:
    """A class for power-spectra.

    In [microK]^2 unit.
    """
    names: List[str]
    spectra: np.ndarray
    l_min: int = 0
    l: Optional[np.ndarray] = None
    scale: str = 'Dl'
    name: str = ''

    def __post_init__(self):
        # make sure l_min and l are self-consistent
        l = self.l
        if l is not None:
            self.l_min = l[0]
        spectra = self.spectra
        assert spectra.ndim == 2
        assert spectra.shape[0] == len(self.names)

        if self.scale == 'Cl':
            logger.info('Converting from Cl scale to Dl scale automatically.')
            self.spectra = from_Cl_to_Dl(self.spectra, self.l_array)
            self.scale = 'Dl'
        assert self.scale == 'Dl'

    def _repr_html_(self) -> str:
        return self.dataframe._repr_html_()

    @property
    def n_spectra(self) -> int:
        return self.spectra.shape[0]

    @cached_property
    def l_max(self) -> int:
        """exclusive l_max"""
        l = self.l
        return self.l_min + self.spectra.shape[1] if l is None else l[-1] + 1

    @cached_property
    def l_array(self) -> np.ndarray:
        l = self.l
        return np.arange(self.l_min, self.l_max) if l is None else l

    @cached_property
    def dataframe(self) -> pd.DataFrame:
        l = self.l
        index = pd.RangeIndex(self.l_min, self.l_max, name='l') if l is None else pd.Index(l, name='l')
        return pd.DataFrame(self.spectra.T, index=index, columns=pd.Index(self.names, name=self.name))

    @classmethod
    def from_dataframe(
        cls,
        df: pd.DataFrame,
        scale: str = 'Dl',
    ) -> PowerSpectra:
        names = df.columns.tolist()
        spectra = df.values.T
        l = df.index.values
        name = df.columns.name
        return cls(names, spectra, l=l, scale=scale, name=name)

    @classmethod
    def from_class(
        cls,
        path: Optional[Path] = None,
        url: Optional[str] = None,
        camb: bool = True,
        name: str = '',
    ) -> PowerSpectra:
        '''read from CLASS' .dat output

        :param bool camb: if True, assume ``format = camb`` was used when generating the .dat from CLASS

        To self: migrated from abx's convert_theory.py
        '''
        if path is None and url is None:
            raise ValueError('Either path or url has to be specified.')
        if url is not None:
            if path is not None:
                logger.warning('Ignoring path as url is specified.')
            logger.info(f'Downloading and caching using astropy: {url}')
            path = download_file(url, cache=True)
            if not name:
                name = url.split('/')[-1].split('.')[0]
        else:
            path = Path(path)
            if not name:
                name = path.stem
        # read once
        with open(path, 'r') as f:
            text = f.read()
        # get the last comment line, which holds the column headers
        comment = None
        for line in text.split('\n'):
            if line.startswith('#'):
                comment = line
        if comment is None:
            raise ValueError('Cannot find a header row in the CLASS output; make sure you run CLASS with headers = yes.')
        # remove the leading '#'
        comment = comment[1:]
        # parse the header line: keep the name after ':' in each column label
        names = [name.split(':')[1] for name in comment.strip().split()]
        with io.StringIO(text) as f:
            df = pd.read_csv(f, delim_whitespace=True, index_col=0, comment='#', header=None, names=names)
        # to [microK]^2
        if not camb:
            df *= 1.e12
        df.columns.name = name
        return cls.from_dataframe(df)

    @classmethod
    def from_planck_2018(cls) -> PowerSpectra:
        """Use the Planck 2018 best-fit ΛCDM model

        Officially released by Planck up to l = 2508.
        See http://pla.esac.esa.int/pla/#cosmology and its description.
        """
        file = download_file(
            'http://pla.esac.esa.int/pla/aio/product-action?COSMOLOGY.FILE_ID=COM_PowerSpect_CMB-base-plikHM-TTTEEE-lowl-lowE-lensing-minimum-theory_R3.01.txt',
            cache=True,
        )
        calPlanck = 0.1000442E+01
        df = pd.read_csv(file, sep=r'\s+', names=['l', 'TT', 'TE', 'EE', 'BB', 'PP'], comment='#', index_col=0)
        # Planck's 2018 released spectra above l = 2500 have identically zero BB, so they shouldn't be trusted
        df = df.loc[pd.RangeIndex(2, 2501)]
        df *= 1. / (calPlanck * calPlanck)
        df.columns.name = 'Planck 2018 best-fit ΛCDM model'
        return cls.from_dataframe(df)

    @classmethod
    def from_planck_2018_extended(cls) -> PowerSpectra:
        """Use the Planck 2018 best-fit ΛCDM model with an extended l-range up to 4902

        This is reproduced using CLASS with an extended l-range.
        """
        return cls.from_class(
            url='https://gist.github.com/ickc/cd6bb1753a44ee09f2d09c49b190d65f/raw/398497217109000c0d670bb57cfcc2cb9a617d63/base_2018_plikHM_TTTEEE_lowl_lowE_lensing_cl_lensed.dat',
            name='Planck 2018 best-fit ΛCDM model extended using CLASS',
        )

    def to_maps(self, nside: int, pixwin=False) -> Maps:
        """Use synfast to generate random maps"""
        # reorder the spectra as expected by synfast with new=True
        cols = ['TT', 'EE', 'BB', 'TE']
        spectra = self.dataframe[cols].values.T
        l = self.l_array
        cl = from_Dl_to_Cl(spectra, l)
        ms = hp.sphtfunc.synfast(cl, nside, pixwin=pixwin, new=True)
        return Maps(CANONICAL_NAME, ms, name=self.name)

    def intersect(self, *others: PowerSpectra) -> List[pd.DataFrame]:
        cols = self.names
        l = self.l_array
        for other in others:
            cols = np.intersect1d(cols, other.names)
            l = np.intersect1d(l, other.l_array)
        res = [self.dataframe.loc[l, cols]]
        for other in others:
            res.append(other.dataframe.loc[l, cols])
        return res

    def compare(
        self,
        *others: PowerSpectra,
        relative: bool = False,
    ) -> pd.DataFrame:
        """Return a tidy DataFrame comparing self and others."""
        dfs = self.intersect(*others)
        try:
            if relative:
                dfs = [df / dfs[0] - 1. for df in dfs[1:]]
            df_all = pd.concat(
                dfs,
                keys=(df.columns.name for df in dfs),
                names=['name', 'l'],
            )
        except pd.errors.InvalidIndexError:
            counter = defaultdict(int)
            for df in dfs:
                counter[df.columns.name] += 1
            raise ValueError(f"These names are repeated: {', '.join(k for k, v in counter.items() if v > 1)}")
        df_all.columns.name = 'spectra'
        return df_all.stack().to_frame(self.scale).reset_index(level=(0, 1, 2))

    def compare_plot(
        self,
        *others: PowerSpectra,
        show: bool = True,
        relative: bool = False,
    ) -> Dict[str, plotly.graph_objs._figure.Figure]:
        df_tidy = self.compare(*others, relative=relative)
        figs = {}
        for name, group in df_tidy.groupby('spectra'):
            title = f'{name}, relative change compared to {self.name}' if relative else name
            figs[name] = px.line(group, x='l', y='Dl', color='name', title=title)
        if show:
            for fig in figs.values():
                fig.show()
        return figs
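
# Example usage (a sketch added for illustration, not part of the original module):
# compare the official Planck 2018 spectra against the extended spectra computed
# with CLASS over their common columns and l-range.
#
#     theory = PowerSpectra.from_planck_2018()
#     extended = PowerSpectra.from_planck_2018_extended()
#     theory.compare_plot(extended, relative=True)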


def _simmap_planck2018(
    path: Path,
    nside: int,
    *,
    seed: Optional[int] = None,
    nest: bool = True,
    fits_IDL: bool = True,
    coord: str = 'G',
    partial: bool = False,
    column_names: Optional[List[str]] = None,
    column_units: str = 'uK',
    extra_header: List[str] = [],
    overwrite: bool = False,
):
    if seed is not None:
        np.random.seed(seed)
    m = Maps.from_planck_2018(nside)
    m.write(
        path=path,
        nest=nest,
        fits_IDL=fits_IDL,
        coord=coord,
        partial=partial,
        column_names=column_names,
        column_units=column_units,
        extra_header=[tuple(header.split(':')) for header in extra_header],
        overwrite=overwrite,
    )


def simmap_planck2018_cli():
    defopt.run(_simmap_planck2018)
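
# Example CLI usage (a sketch added for illustration, not part of the original
# module). defopt exposes _simmap_planck2018's signature as command-line arguments;
# assuming the console script installed for simmap_planck2018_cli is named
# `simmap-planck2018` (a hypothetical name), an invocation could look like:
#
#     simmap-planck2018 out.fits 128 --seed 42 --overwrite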