Source code for qsonic.calibration

""" Calibration module for noise and flux. """
import argparse

import fitsio
import numpy as np

from qsonic.mathtools import (
    FastCubic1DInterp, FastLinear1DInterp, _one_function)
from qsonic.mpi_utils import mpi_fnc_bcast


[docs] def add_calibration_parser(parser=None): """ Adds calibration related arguments to parser. These arguments are grouped under 'Noise and flux calibation options'. Arguments --------- parser: argparse.ArgumentParser, default: None Returns --------- parser: argparse.ArgumentParser """ if parser is None: parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) calib_group = parser.add_argument_group( 'Noise and flux calibation options') calib_group.add_argument( "--noise-calibration", help="Noise calibration file.") calib_group.add_argument( "--varlss-as-additive-noise", action="store_true", help="var_lss as additive noise term after continuum fitting.") calib_group.add_argument( "--flux-calibration", help="Flux calibration file.") return parser
[docs] class NoiseCalibrator(): """ Noise calibration object. .. math:: i \\rightarrow i / \\eta, where i is IVAR. FITS file must have 'VAR_FUNC' extension. This extension must have columns for 'lambda' and 'eta'. Wavelength array must be linearly and equally spaced. Uses cubic spline. Parameters ---------- fname: str Filename to read by ``fitsio``. comm: None or MPI.COMM_WORLD, default: None mpi_rank: int, default: 0 add_varlss: bool, default: False Use var_lss as an additive correction to noise in observed frame. Requires continuum. no_eta: bool, default: False Turns off eta scaling by using eta=1. Attributes ---------- eta_interp: FastCubic1DInterp Eta interpolator. """
[docs] def _read(self, fname): with fitsio.FITS(fname) as fts: data = fts['VAR_FUNC'].read() waves = data['lambda'] waves_0 = waves[0] dwave = waves[1] - waves[0] if not np.allclose(np.diff(waves), dwave): raise Exception( "Failed to construct noise calibration from " f"{fname}::wave is not equally spaced.") var_lss = np.array(data['var_lss'], dtype='d') varlss_interp = FastCubic1DInterp(waves_0, dwave, var_lss) eta = np.array(data['eta'], dtype='d') eta[eta == 0] = 1 eta_interp = FastCubic1DInterp(waves_0, dwave, eta) return eta_interp, varlss_interp
def __init__( self, fname, comm=None, mpi_rank=0, add_varlss=False, no_eta=False ): self.eta_interp, self.varlss_interp = mpi_fnc_bcast( self._read, comm, mpi_rank, f"Error loading NoiseCalibrator from file {fname}.", fname) if not add_varlss: self.varlss_interp = None if no_eta: self.eta_interp = _one_function
[docs] def apply(self, spectra_list): """ Apply the noise calibration by **only** scaling :attr:`forestivar <qsonic.spectrum.Spectrum.forestivar>`. Smooth component must be set after this. Arguments ---------- spectra_list: list(Spectrum) Spectrum objects to noise calibrate. """ if self.varlss_interp: if not all(spec.cont_params['cont'] for spec in spectra_list): raise Exception( "NoiseCalibrator needs continuum for additive correction.") for spec in spectra_list: for arm, wave_arm in spec.forestwave.items(): eta = self.eta_interp(wave_arm) vlss = self.varlss_interp(wave_arm) vlss *= spec.cont_params['cont'][arm]**2 spec.forestivar[arm] /= (eta + spec.forestivar[arm] * vlss) spec.forestivar[arm][spec.forestivar[arm] < 0] = 0 else: for spec in spectra_list: for arm, wave_arm in spec.forestwave.items(): eta = self.eta_interp(wave_arm) spec.forestivar[arm] /= eta
[docs] class FluxCalibrator(): """ Flux calibration object. .. math:: f &\\rightarrow f / s i &\\rightarrow i \\times s^2, where i is IVAR and s is the stacked flux. FITS file must have 'STACKED_FLUX' extension. This extension must have columns for 'lambda' and 'stacked_flux'. Wavelength array must be linearly and equally spaced. Uses linear interpolation. Parameters ---------- fname: str Filename to read by ``fitsio``. comm: None or MPI.COMM_WORLD, default: None mpi_rank: int, default: 0 Attributes ---------- flux_interp: FastLinear1DInterp Flux interpolator. """
[docs] def _read(self, fname): with fitsio.FITS(fname) as fts: data = fts['STACKED_FLUX'].read() waves = data['lambda'] waves_0 = waves[0] dwave = waves[1] - waves[0] if not np.allclose(np.diff(waves), dwave): raise Exception( "Failed to construct flux calibration from " f"{fname}::wave is not equally spaced.") stacked_flux = np.array(data['stacked_flux'], dtype='d') stacked_flux[stacked_flux == 0] = 1 return FastLinear1DInterp(waves_0, dwave, stacked_flux)
def __init__(self, fname, comm=None, mpi_rank=0): self.stacked_flux_interp = mpi_fnc_bcast( self._read, comm, mpi_rank, f"Error loading FluxCalibrator from file {fname}.", fname)
[docs] def apply(self, spectra_list): """ Apply the flux calibration by **only** scaling :attr:`forestflux <qsonic.spectrum.Spectrum.forestflux>` and :attr:`forestivar <qsonic.spectrum.Spectrum.forestivar>`. Smooth component must be set after this. Arguments ---------- spectra_list: list(Spectrum) Spectrum objects to flux calibrate. """ for spec in spectra_list: for arm, wave_arm in spec.forestwave.items(): stacked_flux = self.stacked_flux_interp(wave_arm) spec.forestflux[arm] /= stacked_flux spec.forestivar[arm] *= stacked_flux**2