Source code for qsonic.continuum_models.picca_continuum_model

import logging
import numpy as np
from iminuit import Minuit
from scipy.optimize import minimize

from qsonic import QsonicException
from qsonic.mathtools import mypoly1d
from qsonic.continuum_models.base_continuum_model import BaseContinuumModel


[docs] class PiccaContinuumModel(BaseContinuumModel): """Picca continuum model class. Parameters ---------- meancont_interp: FastCubic1DInterp Fast cubic spline object for the mean continuum. meanflux_interp: FastLinear1DInterp Interpolator for mean flux. If fiducial is not set, this equals to 1. varlss_interp: FastLinear1DInterp or FastCubic1DInterp Cubic spline for var_lss if fitting. Linear if from file. eta_interp: FastCubic1DInterp Interpolator for eta. Returns one if fiducial var_lss is set. cont_order: int Order of continuum polynomial from ``args.cont_order``. rfwave0: float First rest-frame wavelength center for the mean continuum. denom: float Denominator for the slope term in the continuum model. minimizer: str ``iminuit`` or ``l_bfgs_b`` to select the minimizer function. Attributes ---------- minimizer: function Function that points to one of the minimizer options. """ def __init__( self, meancont_interp, meanflux_interp, varlss_interp, eta_interp, cont_order, rfwave0, denom, minimizer ): self.meancont_interp = meancont_interp self.meanflux_interp = meanflux_interp self.varlss_interp = varlss_interp self.eta_interp = eta_interp self.cont_order = cont_order self.rfwave0 = rfwave0 self.denom = denom if minimizer == "iminuit": self.minimizer = self._iminuit_minimizer elif minimizer == "l_bfgs_b": self.minimizer = self._scipy_l_bfgs_b_minimizer else: raise QsonicException( "Undefined minimizer. Developer forgot to implement.")
[docs] def _continuum_costfn(self, x, wave, flux, ivar_sm, z_qso): """Cost function to minimize for each quasar. This is a modified chi2 where amplitude is also part of minimization. Cost of each arm is simply added to the total cost. Arguments --------- x: :external+numpy:py:class:`ndarray <numpy.ndarray>` Polynomial coefficients for quasar diversity. wave: dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`) Observed-frame wavelengths. flux: dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`) Flux. ivar_sm: dict(:external+numpy:py:class:`ndarray <numpy.ndarray>`) Smooth inverse variance. z_qso: float Quasar redshift. Returns --------- cost: float Cost (modified chi2) for a given ``x``. """ cost = 0 for arm, wave_arm in wave.items(): cont_est = self.get_continuum_model(x, wave_arm / (1 + z_qso)) # no_neg = np.sum(cont_est<0) # penalty = wave_arm.size * no_neg**2 cont_est *= self.meanflux_interp(wave_arm) var_lss = self.varlss_interp(wave_arm) * cont_est**2 eta = self.eta_interp(wave_arm) weight = ivar_sm[arm] / (eta + ivar_sm[arm] * var_lss) w = weight > 0 cost += np.dot( weight, (flux[arm] - cont_est)**2 ) - np.log(weight[w]).sum() # + penalty return cost
[docs] def get_continuum_model(self, x, wave_rf_arm): """Returns interpolated continuum model. Arguments --------- x: :external+numpy:py:class:`ndarray <numpy.ndarray>` Polynomial coefficients for quasar diversity. wave_rf_arm: :external+numpy:py:class:`ndarray <numpy.ndarray>` Rest-frame wavelength per arm. Returns --------- cont: :external+numpy:py:class:`ndarray <numpy.ndarray>` Continuum at `wave_rf_arm` values given `x`. """ slope = np.log(wave_rf_arm / self.rfwave0) / self.denom cont = self.meancont_interp(wave_rf_arm) * mypoly1d(x, 2 * slope - 1) # Multiply with resolution # Edges are difficult though return cont
[docs] def _iminuit_minimizer(self, spec, a0): def _cost(x): return self._continuum_costfn( x, spec.forestwave, spec.forestflux, spec.forestivar_sm, spec.z_qso) x0 = np.zeros_like(spec.cont_params['x']) x0[0] = a0 mini = Minuit(_cost, x0) mini.errordef = Minuit.LEAST_SQUARES mini.migrad() result = {} result['valid'] = mini.valid result['x'] = np.array(mini.values) result['xcov'] = np.array(mini.covariance) return result
[docs] def _scipy_l_bfgs_b_minimizer(self, spec, a0): x0 = np.zeros_like(spec.cont_params['x']) x0[0] = a0 mini = minimize( self._continuum_costfn, x0, args=(spec.forestwave, spec.forestflux, spec.forestivar_sm, spec.z_qso), method='L-BFGS-B', bounds=None, jac=None ) result = {} result['valid'] = mini.success result['x'] = mini.x result['xcov'] = mini.hess_inv.todense() return result
[docs] def stack_spectra(self, valid_spectra_list, flux_stacker): """Stacks spectra in the observed and rest-frame. Observed-frame and rest-frame stacking is performed over residuals f/C. Arguments --------- valid_spectra_list: list(Spectrum) Valid spectra objects to iterate. flux_stacker: FluxStacker Flux stacker object. """ for spec in valid_spectra_list: for arm, wave_arm in spec.forestwave.items(): wave_rf_arm = wave_arm / (1 + spec.z_qso) cont = spec.cont_params['cont'][arm] weight = spec.forestweight[arm] * cont**2 weighted_flux = weight * spec.forestflux[arm] / cont flux_stacker.add( wave_arm, wave_rf_arm, weighted_flux, weighted_flux, weight )
[docs] def fit_continuum(self, spec): """Fits the continuum for a single Spectrum. This function uses :attr:`forestivar_sm <qsonic.spectrum.Spectrum.forestivar_sm>` in inverse variance, which must be smoothed beforehand. It also modifies :attr:`cont_params <qsonic.spectrum.Spectrum.cont_params>` dictionary's ``valid, cont, x, xcov, chi2, dof`` keys. If the best-fitting continuum is **negative at any point**, the fit is **invalidated**. Chi2 is set separately without using the :meth:`cost function <._continuum_costfn>`. ``x`` key is the best-fitting parameter, and ``xcov`` is their inverse Hessian ``hess_inv`` given by :external+scipy:func:`scipy.optimize.minimize` using 'L-BFGS-B' method. Arguments --------- spec: Spectrum Spectrum object to fit. """ # We can precalculate meanflux and varlss here, # and store them in respective keys to spec.cont_params def get_a0(): a0 = 0 n0 = 1e-6 for arm, ivar_arm in spec.forestivar_sm.items(): a0 += np.dot(spec.forestflux[arm], ivar_arm) n0 += np.sum(ivar_arm) return a0 / n0 result = self.minimizer(spec, get_a0()) spec.cont_params['valid'] = result['valid'] if spec.cont_params['valid']: spec.cont_params['cont'] = {} for arm, wave_arm in spec.forestwave.items(): cont_est = self.get_continuum_model( result['x'], wave_arm / (1 + spec.z_qso)) if any(cont_est < 0): spec.cont_params['valid'] = False break cont_est *= self.meanflux_interp(wave_arm) # cont_est *= self.flux_stacker(wave_arm) spec.cont_params['cont'][arm] = cont_est spec.set_forest_weight(self.varlss_interp, self.eta_interp) # We can further eliminate spectra based chi2 spec.calc_continuum_chi2() if spec.cont_params['valid']: spec.cont_params['x'] = result['x'] spec.cont_params['xcov'] = result['xcov'] else: spec.cont_params['cont'] = None spec.cont_params['chi2'] = -1
[docs] def init_spectra(self, spectra_list): """ Initializes :attr:`cont_params <qsonic.spectrum.Spectrum.cont_params>` for a list of Spectrum objects. Arguments --------- spectra_list: list(Spectrum) Spectrum objects to fit. """ logging.info("Initializing Picca continuum fitting.") for spec in spectra_list: spec.cont_params['method'] = 'picca' spec.cont_params['x'] = np.append( spec.cont_params['x'][0], np.zeros(self.cont_order)) spec.cont_params['xcov'] = np.eye(self.cont_order + 1) spec.cont_params['dof'] = \ spec.get_real_size() - self.cont_order - 1
[docs] def stacks_residual_flux(self): """:meth:`stack_spectra` stacks residual flux values f/C. Returns ------- True: bool """ return True