Source code for qsonic.continuum_models.input_continuum_model

import logging
import warnings

import fitsio
import numpy as np

from healpy import ang2pix

from qsonic.mathtools import FastCubic1DInterp
from qsonic.continuum_models.base_continuum_model import BaseContinuumModel


def _find_append_continuum(
        specs, wave_rf_1, dwave_rf, common_targetids, continua
):
    for spec in specs:
        idx = np.nonzero(common_targetids == spec.targetid)[0]
        if idx.size == 0:
            continue

        idx = idx[0]
        spec.cont_params['valid'] = True
        spec.cont_params['input_w1'] = wave_rf_1
        spec.cont_params['input_dwave'] = dwave_rf
        spec.cont_params['input_data'] = continua[idx]


[docs] class InputContinuumModel(BaseContinuumModel): """Input continuum model class. Input continua should be in the rest frame with equal wavelength spacing. These continua are then interpolated using a cubic spline. Uses fiducials for mean flux and varlss interpolation. The files are assumed to be organized by healpix **nside=8** with **nested** ordering. For example, ``input-continuum-{nside}-{pixnum}.fits`` , where ``pixnum = healpy.ang2pix(nside, ra_deg, dec_deg, lonlat=True, nest=True)`` . Each file should have at least two extensions: **FIBERMAP** and **CONTINUA**. - **FIBERMAP** extension should have the typical DESI fibermap columns: ``TARGETID, Z, RA, DEC, PROGRAM, SURVEY``. Each ``TARGETID`` should occur once. - **CONTINUA** extention should be an ImageHDU and have the quasar continua in the same order of ``TARGETID`` s in **FIBERMAP**. Its header should declare the initial rest-frame wavelength by ``WAVE1`` key and the rest-frame wavelength step by ``DWAVE`` key. The data should be in ``(nqso, nwave)`` shape. Parameters ---------- input_dir: str Input directory for all input-continuum FITS files. meanflux_interp: FastLinear1DInterp Interpolator for mean flux. If fiducial is not set, this equals to 1. varlss_interp: FastLinear1DInterp or FastCubic1DInterp Cubic spline for var_lss if fitting. Linear if from file. eta_interp: FastCubic1DInterp Interpolator for eta. nside: int, default: 8 Healpy nside. """ def __init__( self, input_dir, meanflux_interp, varlss_interp, eta_interp, nside=8 ): self.input_dir = input_dir self.meanflux_interp = meanflux_interp self.varlss_interp = varlss_interp self.eta_interp = eta_interp self.nside = nside
[docs] def _read_continua_one_file(self, pixnum, targetids): """Reads one input continuum file. Arguments --------- pixnum: int Healpix number targetids: :external+numpy:py:class:`ndarray <numpy.ndarray>` TARGETIDs in this healpix Returns ------- wave_rf_1: float First wavelength in the rest frame. dwave_rf: float Rest-frame wavelength step common_targetids: :external+numpy:py:class:`ndarray <numpy.ndarray>` TARGETIDs that are found in the file. continua: :external+numpy:py:class:`ndarray <numpy.ndarray>` 2D array of shape ``(common_targetids.size, nwave)`` Raises ------ Exception If the number of quasars in **FIBERMAP** and **CONTINUA** extension do not match. RuntimeWarning If the file cannot be read. In this case, all objects are assumed to have invalid continuum. """ fname = f"{self.input_dir}/input-continuum-{self.nside}-{pixnum}.fits" try: with fitsio.FITS(fname) as fitsfile: fbrmap = fitsfile['FIBERMAP'].read(columns='TARGETID') hdr = fitsfile['CONTINUA'].read_header() continua = fitsfile['CONTINUA'].read().astype(np.float64) except Exception as e: warnings.warn( f"InputContinuumModel cannot read file {fname}. " f"Reason {e}.") return 0, 0, None, None if fbrmap.size != continua.shape[0]: raise Exception( "InputContinuumModel file error. The number of quasars do not " f"match between FIBERMAP and CONTINUA in {fname}.") common_targetids, idx_fbr, _ = np.intersect1d( fbrmap, targetids, return_indices=True) return hdr['WAVE1'], hdr['DWAVE'], common_targetids, continua[idx_fbr]
[docs] def _read_continua(self, spectra_list): """Reads and sets input continuum for all elements in ``spectra_list``. If a TARGETID is not found in its expected file, its continuum will be set to invalid. If found, :attr:`qsonic.spectrum.Spectrum.cont_params` dictionary gains the following:: cont_params['valid'] = True cont_params['input_w1'] (float): First rest-frame wavelength cont_params['input_dwave'] (float): Rest-frame wavelength spacing cont_params['input_data'] (ndarray): Input continuum Arguments --------- spectra_list: list(Spectrum) Spectrum objects. """ local_catalog = np.hstack([_.catrow for _ in spectra_list]).copy() local_catalog['HPXPIXEL'] = ang2pix( self.nside, local_catalog['RA'], local_catalog['DEC'], lonlat=True, nest=True) # local_catalog is not sorted idx_sort = local_catalog.argsort(order=['HPXPIXEL', 'TARGETID']) spectra_list = np.array(spectra_list)[idx_sort] local_catalog = local_catalog[idx_sort] unique_pix, s = np.unique(local_catalog['HPXPIXEL'], return_index=True) hpx_split_catalog = np.split(local_catalog, s[1:]) hpx_split_spectra_list = np.split(spectra_list, s[1:]) for cat, specs in zip(hpx_split_catalog, hpx_split_spectra_list): pixnum = cat['HPXPIXEL'][0] targetids = cat['TARGETID'] wave_rf_1, dwave_rf, common_targetids, continua = \ self._read_continua_one_file(pixnum, targetids) if wave_rf_1 == 0: continue _find_append_continuum( specs, wave_rf_1, dwave_rf, common_targetids, continua)
[docs] def fit_continuum(self, spec): """Input continuum reduction. Uses fiducials for mean flux and varlss interpolation. Continuum is interpolated using a cubic spline. Arguments --------- spec: Spectrum Spectrum object to add true continuum. """ if not spec.cont_params['valid']: spec.cont_params['cont'] = None spec.cont_params['chi2'] = -1 return input_cont_interp = FastCubic1DInterp( spec.cont_params['input_w1'], spec.cont_params['input_dwave'], spec.cont_params['input_data']) for arm, wave_arm in spec.forestwave.items(): cont_est = input_cont_interp(wave_arm / (1 + spec.z_qso)) if any(cont_est <= 0) or any(np.isnan(cont_est)): spec.cont_params['valid'] = False break cont_est *= self.meanflux_interp(wave_arm) spec.cont_params['cont'][arm] = cont_est spec.set_forest_weight(self.varlss_interp, self.eta_interp) spec.calc_continuum_chi2() if not spec.cont_params['valid']: spec.cont_params['cont'] = None spec.cont_params['chi2'] = -1
[docs] def init_spectra(self, spectra_list): """ Reads input continuum. Initializes :attr:`cont_params <qsonic.spectrum.Spectrum.cont_params>` for a list of Spectrum objects. Arguments --------- spectra_list: list(Spectrum) Spectrum objects. """ logging.info("Initializing input continuum.") for spec in spectra_list: spec.cont_params['method'] = 'input' spec.cont_params['valid'] = False spec.cont_params['x'] = np.zeros(1) spec.cont_params['xcov'] = np.eye(1) spec.cont_params['dof'] = spec.get_real_size() spec.cont_params['cont'] = {} self._read_continua(spectra_list)
[docs] def stacks_residual_flux(self): """:meth:`stack_spectra` stacks actual continuum values. Returns ------- False: bool """ return False