import argparse
import numpy as np
from qsonic.mathtools import _zero_function, _one_function, get_smooth_ivar
[docs]
def add_wave_region_parser(parser=None):
""" Adds wavelength analysis related arguments to parser. These
arguments are grouped under 'Wavelength analysis region'. All of them
come with defaults, none are required.
Arguments
---------
parser: argparse.ArgumentParser, default: None
Returns
---------
parser: argparse.ArgumentParser
"""
if parser is None:
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
wave_group = parser.add_argument_group('Wavelength analysis region')
wave_group.add_argument(
"--wave1", type=float, default=3600.,
help="First observed wavelength edge.")
wave_group.add_argument(
"--wave2", type=float, default=6000.,
help="Last observed wavelength edge.")
wave_group.add_argument(
"--forest-w1", type=float, default=1050.,
help="First forest wavelength edge.")
wave_group.add_argument(
"--forest-w2", type=float, default=1180.,
help="Last forest wavelength edge.")
return parser
[docs]
def generate_spectra_list_from_data(cat_by_survey, data):
return [
Spectrum.from_dictionary(catrow, data, idx)
for idx, catrow in enumerate(cat_by_survey)
]
[docs]
def valid_spectra(spectra_list):
"""Generator for continuum valid spectra."""
return (spec for spec in spectra_list if spec.cont_params['valid'])
[docs]
class Spectrum():
"""An object to represent one spectrum.
Parameters
----------
catrow: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Catalog row.
wave: dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`)
Dictionary of arrays specifying the wavelength grid. Static variable!
flux: dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`)
Dictionary of arrays specifying the flux.
ivar: dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`)
Dictionary of arrays specifying the inverse variance.
mask: dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`)
Dictionary of arrays specifying the bitmask. Not stored
reso: dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`)
Dictionary of 2D arrays specifying the resolution matrix.
idx: int
Index to access in flux, ivar, mask and reso that corresponds to the
quasar in `catrow`.
Attributes
----------
rsnr: float
Average SNR above Lya. Calculated in :meth:`set_forest_region`.
mean_snr: dict(float)
Mean signal-to-noise ratio in the forest.
_f1, _f2: dict(int)
Forest indices. Set up using :meth:`set_forest_region` method. Then use
property functions to access forest wave, flux, ivar instead.
cont_params: dict
Continuum parameters. Initial estimates are constructed.
"""
WAVE_LYA_A = 1215.67
"""float: Lya wavelength in A."""
_wave = None
"""dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`): Common
wavelength grid for **all** Spectra."""
_coadd_wave = None
"""dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`): Common
**coadded** wavelength grid for **all** Spectra."""
_dwave = None
"""float: Wavelength spacing."""
_blinding = None
"""str or None: Blinding. Must be set for certain data."""
_fits_colnames = ['LAMBDA', 'DELTA', 'IVAR', 'WEIGHT', 'CONT']
"""list(str): Column names to save in delta files."""
[docs]
@staticmethod
def _set_wave(wave, check_consistency=False):
"""Set the common wavelength grid.
Arguments
---------
check_consistency: bool
Asserts each time key and values are the same if True.
"""
if not Spectrum._wave:
Spectrum._wave = wave.copy()
arm = list(wave.keys())[0]
wave_arm = wave[arm]
Spectrum._dwave = wave_arm[1] - wave_arm[0]
elif check_consistency:
for arm, wave_arm in Spectrum._wave.items():
assert (arm in wave.keys())
assert (np.allclose(Spectrum._wave[arm], wave_arm))
[docs]
@staticmethod
def _set_coadd_wave():
if Spectrum._coadd_wave:
return
min_wave = np.min([wave[0] for wave in Spectrum._wave.values()])
max_wave = np.max([wave[-1] for wave in Spectrum._wave.values()])
# Z arm in mocks are shifted by 0.4 A, so we need these extra steps
# to make sure dwave is preseved.
nwaves = round((max_wave - min_wave) / Spectrum._dwave + 0.1)
coadd_wave = np.linspace(
min_wave, min_wave + nwaves * Spectrum._dwave, nwaves + 1)
Spectrum._coadd_wave = {'brz': coadd_wave}
[docs]
@staticmethod
def set_blinding(maxlastnight, args):
"""Set the blinding strategy.
'LASTNIGHT' column, args.mock_analysis, args.forest_w1 decide the
blinding strategy. Mock, side band and SV analyses are not blinded.
Arguments
---------
maxlastnight: int
Maximum LASTNIGHT column of the entire quasar catalog.
args: argparse.Namespace
Should have ``mock_analysis (bool)`` and ``forest_w1 (floar)``.
"""
# do not blind mocks or metal forests
if args.mock_analysis or args.forest_w1 > Spectrum.WAVE_LYA_A:
Spectrum._blinding = "none"
# sv data, no blinding
elif maxlastnight < 20210514:
Spectrum._blinding = "none"
elif maxlastnight < 20210801:
Spectrum._blinding = "desi_m2"
elif maxlastnight < 20220801:
Spectrum._blinding = "desi_y1"
else:
Spectrum._blinding = "desi_y3"
if Spectrum._blinding != "none":
Spectrum._fits_colnames[1] = 'DELTA_BLIND'
if not args.skip_resomat:
Spectrum._fits_colnames.append('RESOMAT')
[docs]
@staticmethod
def blinding_not_set():
"""bool: ``True`` if blinding is not set."""
return Spectrum._blinding is None
[docs]
@classmethod
def from_dictionary(cls, catrow, data, idx):
"""Create a Spectrum from dictionary. See :class:`Spectrum` for
argument details.
If ``cont`` key is present in ``data``, :attr:`cont_params` dictionary
gains the following::
cont_params['true_data_w1'] (float): First wavelength
cont_params['true_data_dwave'] (float): Wavelength spacing
cont_params['true_data'] (ndarray): True continuum
Returns
-------
Spectrum
"""
spec = cls(catrow, data['wave'], data['flux'], data['ivar'],
data['mask'], data['reso'], idx)
if "cont" in data.keys():
spec.cont_params['true_data_w1'] = data['cont']['w1']
spec.cont_params['true_data_dwave'] = data['cont']['dwave']
spec.cont_params['true_data'] = data['cont']['data'][idx]
return spec
def __init__(self, catrow, wave, flux, ivar, mask, reso, idx):
self.catrow = catrow
Spectrum._set_wave(wave)
self._current_wave = Spectrum._wave
self.flux = {}
self.ivar = {}
self.reso = {}
self.rsnr = None
self.mean_snr = None
self._f1 = {}
self._f2 = {}
self._forestwave = {}
self._forestflux = {}
self._forestivar = {}
self._forestivar_sm = {}
self._forestreso = {}
self._forestweight = {}
self._smoothing_scale = 0
for arm, wave_arm in self.wave.items():
self.flux[arm] = flux[arm][idx].copy()
self.ivar[arm] = ivar[arm][idx].copy()
w = (mask[arm][idx] != 0) | np.isnan(self.flux[arm]) \
| np.isnan(self.ivar[arm]) | (self.ivar[arm] < 0)
self.flux[arm][w] = 0
self.ivar[arm][w] = 0
if not reso:
continue
elif reso[arm].ndim == 2:
self.reso[arm] = reso[arm].copy()
else:
self.reso[arm] = reso[arm][idx].copy()
self.cont_params = {}
self.cont_params['method'] = ''
self.cont_params['valid'] = False
self.cont_params['x'] = np.array([1., 0.])
self.cont_params['xcov'] = np.eye(2)
self.cont_params['chi2'] = -1.
self.cont_params['dof'] = 0
self.cont_params['cont'] = {}
self._set_rsnr()
[docs]
def _set_rsnr(self):
"""Calculates and sets SNR above Lya."""
self.rsnr = 0
rsnr_weight = 1e-6
for arm, wave_arm in self.wave.items():
# Calculate SNR above Lya
ii1 = np.searchsorted(
wave_arm, (1 + self.z_qso) * Spectrum.WAVE_LYA_A)
weight = np.sqrt(self.ivar[arm][ii1:])
self.rsnr += np.dot(self.flux[arm][ii1:], weight)
rsnr_weight += np.sum(weight > 0)
self.rsnr /= rsnr_weight
[docs]
def slice(self, arm, i1, i2):
if i1 == 0 and i2 == self._forestwave[arm].size:
return
self._forestwave[arm] = self._forestwave[arm][i1:i2]
self._forestflux[arm] = self._forestflux[arm][i1:i2]
self._forestivar[arm] = self._forestivar[arm][i1:i2]
self._forestivar_sm[arm] = self._forestivar_sm[arm][i1:i2]
self._forestweight[arm] = self._forestweight[arm][i1:i2]
if arm in self._forestreso:
self._forestreso[arm] = self._forestreso[arm][:, i1:i2]
if arm in self.cont_params['cont']:
self.cont_params['cont'][arm] = \
self.cont_params['cont'][arm][i1:i2]
[docs]
def set_forest_region(self, w1, w2, lya1, lya2):
""" Sets slices for the forest region. Also calculates
the mean SNR in the forest and an initial guess for the continuum
amplitude. This is currently turned off, but it can mask outliers in
each arm separately based on moving median statistics
(see :func:`qsonic.mathtools.get_median_outlier_mask`).
Arguments
---------
w1, w2: float
Observed wavelength range
lya1, lya2: float
Rest-frame wavelength for the forest
"""
l1 = max(w1, (1 + self.z_qso) * lya1)
l2 = min(w2, (1 + self.z_qso) * lya2)
for arm, wave_arm in self.wave.items():
# Slice to forest limits
ii1, ii2 = np.searchsorted(wave_arm, [l1, l2])
real_size_arm = np.sum(self.ivar[arm][ii1:ii2] > 0)
if real_size_arm == 0:
continue
self._f1[arm], self._f2[arm] = ii1, ii2
# See https://numpy.org/doc/stable/user/basics.copies.html
# Slicing creates views, not copies. Bases of views are not removed
# from memory!
self._forestwave[arm] = wave_arm[ii1:ii2].copy()
self._forestflux[arm] = self.flux[arm][ii1:ii2].copy()
self._forestivar[arm] = self.ivar[arm][ii1:ii2].copy()
if self.reso:
self._forestreso[arm] = self.reso[arm][:, ii1:ii2].copy()
self._forestivar_sm = self._forestivar
self._forestweight = self._forestivar
self._set_forest_related_parameters()
[docs]
def drop_arm(self, arm):
self._forestwave.pop(arm, None)
self._forestflux.pop(arm, None)
self._forestivar.pop(arm, None)
self._forestreso.pop(arm, None)
self._forestivar_sm.pop(arm, None)
self._forestweight.pop(arm, None)
self.cont_params['cont'].pop(arm, None)
[docs]
def drop_short_arms(self, lya1=0, lya2=0, skip_ratio=0):
"""Arms that have less than ``skip_ratio`` pixels are removed from
forest dictionary.
Arguments
---------
lya1, lya2: float
Rest-frame wavelength for the forest
skip_ratio: float
Remove arms if they have less than this ratio of pixels
"""
npixels_expected = (1 + self.z_qso) * (lya2 - lya1) / self.dwave
npixels_expected = int(skip_ratio * npixels_expected) + 1
short_arms = [arm for arm, ivar in self.forestivar.items()
if np.sum(ivar > 0) < npixels_expected]
for arm in short_arms:
self.drop_arm(arm)
[docs]
def remove_nonforest_pixels(self):
""" Remove non-forest pixels from storage.
This sets :attr:`flux`, :attr:`ivar` and :attr:`reso` to empty
dictionary, but :attr:`wave` is not modified,
since it is a static variable. Good practive is to loop using, e.g.,
``for arm, wave_arm in self.forestwave.items():``.
"""
self._current_wave = {}
self.flux = {}
self.ivar = {}
self.reso = {}
[docs]
def get_real_size(self):
"""
Returns
-------
int: Sum of number of pixels with ``forestivar > 0`` for all arms.
"""
size = 0
for ivar_arm in self.forestivar.values():
size += np.sum(ivar_arm > 0)
return size
[docs]
def get_effective_meansnr(self):
""" Calculate a weighted average of :attr:`mean_snr` over arms. Only
call if :meth:`set_forest_region` has been called.
.. math::
\\langle\\mathrm{SNR}\\rangle = \\sum_{i} \\mathrm{SNR}_i^3 \\bigg/
{\\sum_{i} \\mathrm{SNR}_i^2}
Returns
-------
float: Effective mean SNR.
"""
msnr_eff = 0
norm = 1e-8
for val in self.mean_snr.values():
msnr_eff += val**3
norm += val**2
return msnr_eff / norm
[docs]
def is_long(self, dforest_wave, skip_ratio):
"""Determine if spectrum is long enough to be accepted.
The condition is :meth:`get_real_size` > ``skip_ratio * npixels``,
where ``npixels`` :math:`=(1 + z_\\mathrm{qso}) \\times`
``dforest_wave`` :math:`/ \\mathrm{d}\\lambda` and
:math:`\\mathrm{d}\\lambda` is wavelength spacing in the observed frame
in A.
Arguments
---------
dforest_wave: float
Length of the forest in the rest-frame in A.
skip_ratio: float
Minimum ratio that needs to be present and unmasked to keep the
spectrum.
Returns
-------
bool
"""
npixels = (1 + self.z_qso) * dforest_wave / self.dwave
return self.get_real_size() > skip_ratio * npixels
[docs]
def set_smooth_forestivar(self, smoothing_size=16.):
""" Set :attr:`forestivar_sm` to smoothed inverse variance. Before this
call :attr:`forestivar_sm` points to :attr:`forestivar`. If
``smoothing_size <= 0``, smoothing is undone such that ivar_sm
points to ivar. Also, :attr:`forestweight` points to this.
``smoothing_size`` is saved to a private :attr:`_smoothing_scale`
variable for future use.
Arguments
---------
smoothing_size: float, default: 16
Gaussian smoothing spread in A.
"""
if smoothing_size <= 0:
self._smoothing_scale = 0
self._forestivar_sm = self._forestivar
else:
self._smoothing_scale = smoothing_size
sigma_pix = smoothing_size / self.dwave
self._forestivar_sm = {
arm: get_smooth_ivar(ivar_arm, sigma_pix)
for arm, ivar_arm in self.forestivar.items()
}
self._forestweight = self._forestivar_sm
[docs]
def set_forest_weight(
self, varlss_interp=_zero_function, eta_interp=_one_function
):
""" Sets :attr:`forestweight` for a given var_lss and eta correction.
Always uses :attr:`forestivar_sm`, which is not actually smoothed if
:meth:`set_smooth_forestivar` is not called.
.. math::
w = i / (\\eta + i \\sigma^2_\\mathrm{LSS} C^2),
where i is IVAR and C is the continuum.
Arguments
---------
varlss_interp: Callable[[ndarray], ndarray], default: 0
LSS variance interpolator.
eta_interp: Callable[[ndarray], ndarray], default: 1
eta interpolator.
"""
if not self.cont_params['valid'] or not self.cont_params['cont']:
self._forestweight = self._forestivar_sm
return
self._forestweight = {}
for arm, wave_arm in self.forestwave.items():
cont_est = self.cont_params['cont'][arm]
var_lss = varlss_interp(wave_arm) * cont_est**2
eta = eta_interp(wave_arm)
ivar_arm = self.forestivar_sm[arm]
self._forestweight[arm] = ivar_arm / (eta + ivar_arm * var_lss)
[docs]
def calc_continuum_chi2(self):
""" Calculate the chi2 of the continuum fitting. This is just a sum
of weight * (flux - cont)^2.
Returns
-------
chi2: float
"""
if not self.cont_params['valid'] or not self.cont_params['cont']:
self.cont_params['chi2'] = -1
return -1
chi2 = 0
for arm, wave_arm in self.forestwave.items():
cont_est = self.cont_params['cont'][arm]
weight = self.forestweight[arm]
flux = self.forestflux[arm]
chi2 += np.dot(weight, (flux - cont_est)**2)
self.cont_params['chi2'] = chi2
return chi2
[docs]
def simple_coadd(self):
"""Coadding without continuum and var_lss terms on the full spectrum.
Weights, forests etc. will not be set. Replaces :attr:`wave`,
:attr:`flux`, :attr:`ivar` and :attr:`reso` attributes with
dictionaries that has a single arm ``brz`` as key to access the coadded
data.
We first set the static coadded wavelength grid if it is not set.
Then we add each arm using :attr:`ivar` (not smoothed). If :attr:`reso`
is set, we coadd the resolution matrix using the same inverse variance
weights. If these weights are zero for both arms, resolution matrix
coadding reverts to equal weights. Final private assignments are done
at the end to keep using arm by arm values of :attr:`ivar`.
"""
Spectrum._set_coadd_wave()
min_wave = Spectrum._coadd_wave['brz'][0]
nwaves = Spectrum._coadd_wave['brz'].size
coadd_flux = np.zeros(nwaves)
coadd_ivar = np.zeros(nwaves)
coadd_norm = np.zeros(nwaves)
idxes = {}
for arm, wave_arm in self.wave.items():
idx = ((wave_arm - min_wave) / self.dwave + 0.1).astype(int)
idxes[arm] = idx
weight = self.ivar[arm]
var = np.zeros_like(weight)
w = self.ivar[arm] > 0
var[w] = 1 / self.ivar[arm][w]
coadd_flux[idx] += weight * self.flux[arm]
coadd_ivar[idx] += weight**2 * var
coadd_norm[idx] += weight
w = coadd_norm > 0
coadd_flux[w] /= coadd_norm[w]
coadd_ivar[w] = coadd_norm[w]**2 / coadd_ivar[w]
if self.reso:
max_ndia = np.max([reso.shape[0] for reso in self.reso.values()])
coadd_reso = np.zeros((max_ndia, nwaves))
coadd_norm *= 0
for arm, reso_arm in self.reso.items():
weight = self.ivar[arm].copy()
weight[weight == 0] = 1e-8
ddia = max_ndia - reso_arm.shape[0]
# Assumption ddia cannot be odd
ddia = ddia // 2
if ddia > 0:
reso_arm = np.pad(reso_arm, ((ddia, ddia), (0, 0)))
coadd_reso[:, idxes[arm]] += weight * reso_arm
coadd_norm[idxes[arm]] += weight
coadd_reso /= coadd_norm
self.reso = {'brz': coadd_reso}
self._current_wave = Spectrum._coadd_wave
self.flux = {'brz': coadd_flux}
self.ivar = {'brz': coadd_ivar}
[docs]
def coadd_arms_forest(
self, varlss_interp=_zero_function, eta_interp=_one_function
):
""" Coadds different arms using :attr:`forestweight`. Interpolators are
needed to reset :attr:`forestweight`.
Replaces ``forest`` variables and ``cont_params['cont']`` with a
dictionary that has a single arm ``brz`` as key to access coadded data.
Arguments
---------
varlss_interp: Callable[[ndarray], ndarray], default: 0
LSS variance interpolator or function.
eta_interp: Callable[[ndarray], ndarray], default: 1
eta interpolator or function.
"""
min_wave = np.min([wave[0] for wave in self.forestwave.values()])
max_wave = np.max([wave[-1] for wave in self.forestwave.values()])
# nwaves = round((max_wave - min_wave) / self.dwave) + 1
# coadd_wave = np.linspace(
# min_wave, min_wave + (nwaves - 1) * self.dwave, nwaves)
nwaves = int((max_wave - min_wave) / self.dwave + 0.1) + 1
coadd_wave = np.linspace(min_wave, max_wave, nwaves)
coadd_flux = np.zeros(nwaves)
coadd_ivar = np.zeros(nwaves)
coadd_norm = np.zeros(nwaves)
idxes = {}
for arm, wave_arm in self.forestwave.items():
idx = ((wave_arm - min_wave) / self.dwave + 0.1).astype(int)
idxes[arm] = idx
weight = self.forestweight[arm]
var = np.zeros_like(weight)
w = self.forestivar[arm] > 0
var[w] = 1 / self.forestivar[arm][w]
coadd_flux[idx] += weight * self.forestflux[arm]
coadd_ivar[idx] += weight**2 * var
coadd_norm[idx] += weight
w = coadd_norm > 0
coadd_flux[w] /= coadd_norm[w]
coadd_ivar[w] = coadd_norm[w]**2 / coadd_ivar[w]
self._forestwave = {'brz': coadd_wave}
self._forestflux = {'brz': coadd_flux}
self._forestivar = {'brz': coadd_ivar}
if self.cont_params['cont']:
coadd_cont = np.empty(nwaves)
for arm, idx in idxes.items():
# continuum needs not weighting
coadd_cont[idx] = self.cont_params['cont'][arm]
self.cont_params['cont'] = {'brz': coadd_cont}
if self.forestreso:
max_ndia = np.max(
[reso.shape[0] for reso in self.forestreso.values()])
coadd_reso = np.zeros((max_ndia, nwaves))
coadd_norm.fill(0)
for arm, reso_arm in self.forestreso.items():
weight = self.forestweight[arm].copy()
weight[weight == 0] = 1e-8
ddia = max_ndia - reso_arm.shape[0]
# Assumption ddia cannot be odd
ddia = ddia // 2
if ddia > 0:
reso_arm = np.pad(reso_arm, ((ddia, ddia), (0, 0)))
coadd_reso[:, idxes[arm]] += weight * reso_arm
coadd_norm[idxes[arm]] += weight
coadd_reso /= coadd_norm
self._forestreso = {'brz': coadd_reso}
self.set_smooth_forestivar(self._smoothing_scale)
self.set_forest_weight(varlss_interp, eta_interp)
mean_snr = np.dot(
np.sqrt(coadd_ivar), coadd_flux) / np.sum(coadd_ivar > 0)
self.mean_snr = {'brz': mean_snr}
[docs]
def mean_resolution(self, arm, weight=None):
""" Returns the weighted mean Gaussian sigma of the spectrograph
resolution in the forest.
Arguments
---------
arm: str
Arm.
weight: None or ndarray, default: None
Weights. If ``None``, :attr:`forestweight` is used.
Returns
-------
mean_reso: float or None
Gaussian sigma. None if forestreso is not set.
"""
if not self.forestreso:
return None
if weight is None:
weight = self.forestweight[arm]
total_weight = np.sum(weight)
reso = np.dot(self.forestreso[arm], weight) / total_weight
lambda_eff = np.dot(self.forestwave[arm], weight) / total_weight
central_idx = reso.argmax()
off_idx = np.array([-2, -1, 1, 2], dtype=int)
ratios = reso[central_idx] / reso[central_idx + off_idx]
ratios = np.log(ratios)
w2 = ratios > 0
norm = np.sum(w2)
new_ratios = np.zeros_like(ratios)
new_ratios[w2] = 1. / np.sqrt(ratios[w2])
rms_in_pixel = np.abs(off_idx).dot(new_ratios) / np.sqrt(2.) / norm
return rms_in_pixel * 3e5 * self.dwave / lambda_eff
[docs]
def write(self, fts_file):
"""Writes each arm to FITS file separately.
Writes 'LAMBDA', 'DELTA', 'IVAR', 'WEIGHT', 'CONT' columns and
'RESOMAT' column if resolution matrix is present to extension name
``targetid-arm``. FITS file must be initialized before.
Each arm has its own `MEANSNR`.
Arguments
---------
fts_file: FITS file
The file handler, not filename.
"""
hdr_dict = {
'LOS_ID': self.targetid,
'TARGETID': self.targetid,
'RA': np.radians(self.ra),
'DEC': np.radians(self.dec),
'Z': self.z_qso,
'BLINDING': Spectrum._blinding,
'WAVE_SOLUTION': "lin",
'MEANSNR': 0.,
'RSNR': self.rsnr,
'DELTA_LAMBDA': self.dwave,
'SMSCALE': self._smoothing_scale
}
for key in set(
['TILEID', 'FIBER', 'PETAL_LOC', 'EXPID', 'NIGHT']
).intersection(self.catrow.dtype.names):
hdr_dict[key] = self.catrow[key]
if 'EXPID' in self.catrow.dtype.names:
expid = f"-{self.catrow['EXPID']}"
else:
expid = ""
for arm, wave_arm in self.forestwave.items():
if self.mean_snr[arm] == 0:
continue
hdr_dict['MEANSNR'] = self.mean_snr[arm]
cont_est = self.cont_params['cont'][arm]
delta = self.forestflux[arm] / cont_est - 1
ivar = self.forestivar[arm] * cont_est**2
weight = self.forestweight[arm] * cont_est**2
delta[ivar == 0] = 0
cols = [wave_arm, delta, ivar, weight, cont_est]
if self.forestreso:
hdr_dict['MEANRESO'] = self.mean_resolution(arm)
cols.append(self.forestreso[arm].T.astype('f8'))
fts_file.write(
cols, names=Spectrum._fits_colnames, header=hdr_dict,
extname=f"{self.targetid}-{arm}{expid}")
@property
def z_qso(self):
"""float: Quasar redshift."""
return self.catrow['Z']
@property
def targetid(self):
"""int: Unique TARGETID identifier."""
return self.catrow['TARGETID']
@property
def hpix(self):
"""int: Healpix."""
return self.catrow['HPXPIXEL']
@property
def ra(self):
"""float: Right ascension."""
return self.catrow['RA']
@property
def dec(self):
"""float: Declination"""
return self.catrow['DEC']
@property
def wave(self):
"""dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`): Original
wavelength grid in A."""
return self._current_wave
@property
def dwave(self):
"""float: Wavelength step size in A."""
return Spectrum._dwave
@property
def forestwave(self):
"""dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`): Forest
wavelength field in A."""
return self._forestwave
@property
def forestflux(self):
"""dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`): Forest
flux field."""
return self._forestflux
@property
def forestivar(self):
"""dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`): Forest
inverse variance field."""
return self._forestivar
@property
def forestivar_sm(self):
"""dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`): Forest
smoothed inverse variance field.
Initially equal to :attr:`.forestivar`. Smoothed if
:meth:`.set_smooth_forestivar` is called."""
return self._forestivar_sm
@property
def forestweight(self):
"""dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`): Forest
weight field. Initially equal to :attr:`.forestivar`."""
return self._forestweight
@property
def forestreso(self):
"""dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`): Resolution
matrix in the forest."""
return self._forestreso
[docs]
class Delta():
"""An object to read one delta from HDU.
Parameters
----------
hdu: fitsio.TableHDU
Table containing delta data.
Raises
---------
RuntimeError
If ``hdu`` doesn't have neither "LAMBDA" nor "LOGLAM" columns.
RuntimeError
If ``hdu`` doesn't have neither "DELTA" nor "DELTA_BLIND" columns.
RuntimeError
If ``hdu`` doesn't have none of "MOCKID", "TARGETID" and "THING_ID"
header keys.
Attributes
----------
wave: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Wavelength array in A.
delta: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Deltas.
ivar: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Inverse variance.
weight: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Weights, which includes var_lss.
cont: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Continuum.
reso: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Resolution matrix. ``None`` if not present.
header: FITS header
Header.
targetid: int
TARGETID, MOCKID or THING_ID from header.
mean_snr: float
MEANSNR from header.
"""
_accepted_wave_columns = set(["LAMBDA", "LOGLAM"])
"""set: Supported column names for wavelength."""
_accepted_delta_columns = set(['DELTA', 'DELTA_BLIND'])
"""set: Supported column names for delta."""
_accepted_targetid_keys = set(['MOCKID', 'TARGETID', 'THING_ID'])
"""set: Supported header keys for unique ID."""
_accepted_colums_map = {
"wave": _accepted_wave_columns,
"delta": _accepted_delta_columns
}
[docs]
@staticmethod
def _check_hdu(colnames, attr):
req_map = Delta._accepted_colums_map[attr]
key = req_map.intersection(colnames)
if not key:
raise RuntimeError(
"One of these must be present in delta files: "
f"{', '.join(req_map)} for {attr}!")
return key.pop()
def __init__(self, hdu):
self.header = hdu.read_header()
key = Delta._accepted_targetid_keys.intersection(self.header.keys())
if not key:
raise RuntimeError(
"One of these must be present in delta file header: "
f"{', '.join(Delta._accepted_targetid_keys)} for TARGETID!")
key = key.pop()
self.targetid = self.header[key]
self.mean_snr = self.header['MEANSNR']
colnames = hdu.get_colnames()
data = hdu.read()
key = Delta._check_hdu(colnames, "wave")
if key == "LOGLAM":
self.wave = 10**data['LOGLAM']
else:
self.wave = data[key].astype("f8")
key = Delta._check_hdu(colnames, "delta")
self._is_blinded = key == "DELTA_BLIND"
self.delta = data[key].astype("f8")
self.ivar = data['IVAR'].astype("f8")
self.weight = data['WEIGHT'].astype("f8")
self.cont = data['CONT'].astype("f8")
self.delta[self.ivar == 0] = 0
if 'RESOMAT' in colnames:
self.reso = data['RESOMAT'].T.astype("f8")
else:
self.reso = None
[docs]
def write(self, fts_file):
"""Writes to FITS file. This function is aimed at saving coadded
deltas.
Writes 'LAMBDA', 'DELTA', 'IVAR', 'WEIGHT', 'CONT' columns and
'RESOMAT' column if resolution matrix is present to extension name
``targetid``. FITS file must be initialized before. Note that ``arm``
is lost in the extension name.
Arguments
---------
fts_file: FITS file
The file handler, not filename.
"""
hdr_dict = self.header
cols = [self.wave, self.delta, self.ivar, self.weight, self.cont]
names = ['LAMBDA', 'DELTA', 'IVAR', 'WEIGHT', 'CONT']
if self._is_blinded:
names[1] = 'DELTA_BLIND'
if self.reso is not None:
cols.append(self.reso.T)
names.append('RESOMAT')
fts_file.write(
cols, names=names, header=hdr_dict,
extname=f"{self.targetid}")
[docs]
def _coadd_reso(self, other, nwaves, idxes):
max_ndia = max(self.reso.shape[0], other.reso.shape[0])
coadd_reso = np.zeros((max_ndia, nwaves))
coadd_norm = np.zeros(nwaves)
for j, obj in enumerate([self, other]):
weight = obj.weight.copy()
weight[weight == 0] = 1e-8
ddia = max_ndia - obj.reso.shape[0]
# Assumption ddia cannot be odd
ddia = ddia // 2
if ddia > 0:
obj.reso = np.pad(obj.reso, ((ddia, ddia), (0, 0)))
coadd_reso[:, idxes[j]] += weight * obj.reso
coadd_norm[idxes[j]] += weight
coadd_reso /= coadd_norm
self.reso = coadd_reso
[docs]
def coadd(self, other, dwave=0.8):
min_wave = min(self.wave[0], other.wave[0])
max_wave = max(self.wave[-1], other.wave[-1])
nwaves = int((max_wave - min_wave) / dwave + 0.1) + 1
coadd_wave = np.linspace(min_wave, max_wave, nwaves)
coadd_delta = np.zeros(nwaves)
coadd_ivar = np.zeros(nwaves)
coadd_lss = np.zeros(nwaves)
coadd_cont = np.zeros(nwaves)
coadd_norm = np.zeros(nwaves)
idxes = [None, None]
for j, obj in enumerate([self, other]):
i0 = ((obj.wave[0] - min_wave) / dwave + 0.1).astype(int)
idx = np.s_[i0:i0 + obj.wave.size]
idxes[j] = idx
var = np.zeros_like(obj.weight)
w = (obj.weight > 0) & (obj.ivar > 0)
var[w] = 1 / obj.ivar[w]
coadd_delta[idx] += obj.weight * obj.delta
coadd_cont[idx] += obj.weight * obj.cont
coadd_ivar[idx] += obj.weight**2 * var
coadd_lss[idx] += 1 - obj.weight * var
coadd_norm[idx] += obj.weight
w = coadd_norm > 0
coadd_delta[w] /= coadd_norm[w]
coadd_cont[w] /= coadd_norm[w]
coadd_lss[w] /= coadd_norm[w]
coadd_ivar[w] = coadd_norm[w]**2 / coadd_ivar[w]
if self.reso is not None:
self._coadd_reso(other, nwaves, idxes)
self.wave = coadd_wave
self.delta = coadd_delta
self.ivar = coadd_ivar
self.weight = self.ivar / (1 + self.ivar * coadd_lss)
self.cont = coadd_cont
self.mean_snr = np.dot(
np.sqrt(coadd_ivar), coadd_delta + 1) / np.sum(coadd_ivar > 0)
self.header['MEANSNR'] = self.mean_snr
@property
def ra(self):
"""float: Right ascension in degrees."""
return np.degrees(self.header['RA'])
@property
def dec(self):
"""float: Declination in degrees"""
return np.degrees(self.header['DEC'])