Source code for qsonic.masks

""" Masking module with classes for sky, BAL and DLA."""
import argparse

from astropy.io.ascii import read as asread
import numpy as np
from numpy.lib.recfunctions import rename_fields
import fitsio

from qsonic import QsonicException
from qsonic.mpi_utils import mpi_fnc_bcast


LIGHT_SPEED = 299792.458
"""float: Speed of light in km/s."""
sqrt_pi = 1.77245385091
"""float: Square root of pi."""
sqrt_2 = 1.41421356237
"""float: Square root of 2."""


[docs] def add_mask_parser(parser=None): """ Adds masking related arguments to parser. These arguments are grouped under 'Masking options'. Arguments --------- parser: argparse.ArgumentParser, default: None Returns --------- parser: argparse.ArgumentParser """ if parser is None: parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) mask_group = parser.add_argument_group('Masking options') mask_group.add_argument( "--sky-mask", help="Sky mask file.") mask_group.add_argument( "--bal-mask", action="store_true", help="Mask BALs (assumes it is in catalog).") mask_group.add_argument( "--dla-mask", help="DLA catalog to mask.") return parser
[docs] class SkyMask(): """ Sky line masking object. Parameters ---------- fname: str Filename to read by `astropy.io.ascii`. Must have four columns ordered as type, minimum wavelength, maximum wavelength and frame (must be 'RF' or 'OBS'). comm: None or MPI.COMM_WORLD, default: None mpi_rank: int, default: 0 """ column_names = ('type', 'wave_min', 'wave_max', 'frame') """tuple: Assumed ordering of columns in the text file.""" def __init__(self, fname, comm=None, mpi_rank=0): mask = mpi_fnc_bcast( asread, comm, mpi_rank, f"Error loading SkyMask from mask file {fname}.", fname, names=SkyMask.column_names) self.mask_rest_frame = mask[mask['frame'] == 'RF'] self.mask_obs_frame = mask[mask['frame'] == 'OBS']
[docs] def apply(self, spec): """ Apply the mask by setting **only** ``spec.forestivar`` and ``spec.forestflux`` to zero. Arguments ---------- spec: Spectrum Spectrum object to mask. """ for arm, wave_arm in spec.forestwave.items(): w = np.zeros(wave_arm.size, dtype=bool) m1 = np.searchsorted( wave_arm, [self.mask_obs_frame['wave_min'], self.mask_obs_frame['wave_max']] ).T m2 = np.searchsorted( wave_arm / (1.0 + spec.z_qso), [self.mask_rest_frame['wave_min'], self.mask_rest_frame['wave_max']] ).T mask_idx_ranges = np.concatenate((m1, m2)) for idx1, idx2 in mask_idx_ranges: w[idx1:idx2] = 1 spec.forestflux[arm][w] = 0 spec.forestivar[arm][w] = 0
[docs] class BALMask(): """ BAL masking object. Does not need construction. Assumes BAL related columns are present in the catalog. """ lines = np.array([ ("lCIV", 1549), ("lSiIV2", 1403), ("lSiIV1", 1394), ("lNV", 1240.81), ("lLya", 1216.1), ("lCIII", 1175), ("lPV2", 1128), ("lPV1", 1117), ("lSIV2", 1074), ("lSIV1", 1062), ("lOIV", 1031), ("lOVI", 1037), ("lOI", 1039), ("lLyb", 1025.7), ("lLy3", 972.5), ("lCIII", 977.0), ("lNIII", 989.9), ("lLy4", 949.7)], dtype=[("name", "U10"), ("value", 'f8')]) """:external+numpy:py:class:`ndarray <numpy.ndarray>`: Ion transition wavelengths in A.""" expected_columns = [ 'VMIN_CIV_450', 'VMAX_CIV_450', 'VMIN_CIV_2000', 'VMAX_CIV_2000' ] """list(str): Columns needed in catalog to mask wavelength ranges."""
[docs] @staticmethod def check_catalog(catalog): """ Asserts if the required columns are present in the catalog. Arguments ---------- catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>` """ if not all(col in catalog.dtype.names for col in BALMask.expected_columns): raise QsonicException("Input catalog is missing BAL columns.")
[docs] @staticmethod def apply(spec): """ Apply the mask by setting **only** ``spec.forestivar`` and ``spec.forestflux`` to zero. Arguments ---------- spec: Spectrum Spectrum object to mask. """ min_velocities = np.concatenate( (spec.catrow['VMIN_CIV_450'], spec.catrow['VMIN_CIV_2000'])) max_velocities = np.concatenate( (spec.catrow['VMAX_CIV_450'], spec.catrow['VMAX_CIV_2000'])) w = (min_velocities > 0) & (max_velocities > 0) min_velocities = min_velocities[w] max_velocities = max_velocities[w] num_velocities = min_velocities.size if num_velocities == 0: return bal_obs_lines = BALMask.lines['value'] * (1 + spec.z_qso) min_velocities = 1 - min_velocities / LIGHT_SPEED max_velocities = 1 - max_velocities / LIGHT_SPEED mask = np.empty(num_velocities * BALMask.lines.size, dtype=[('wave_min', 'f8'), ('wave_max', 'f8')]) mask['wave_min'] = np.outer(bal_obs_lines, max_velocities).ravel() mask['wave_max'] = np.outer(bal_obs_lines, min_velocities).ravel() for arm, wave_arm in spec.forestwave.items(): w = np.zeros(wave_arm.size, dtype=bool) mask_idx_ranges = np.searchsorted( wave_arm, [mask['wave_min'], mask['wave_max']] ).T # Make sure first index comes before the second mask_idx_ranges.sort(axis=1) for idx1, idx2 in mask_idx_ranges: w[idx1:idx2] = 1 spec.forestflux[arm][w] = 0 spec.forestivar[arm][w] = 0
[docs] class DLAMask(): """ DLA masking object. Maximum numbers for oscillator strengths and Einstein coefficients are picked from NIST. Parameters ---------- fname: str FITS filename to read. local_targetids: :class:`ndarray <numpy.ndarray>` or None, default: None Remove DLAs if they are present in these TARGETIDs. comm: None or MPI.COMM_WORLD, default: None mpi_rank: int, default: 0 dla_mask_limit: float, default: 0.8 """ qe = 4.803204e-10 """float: Charge of electron in statC (cm^3/2 g^1/2 s^-1).""" me = 9.109384e-28 """float: Mass of electron in g.""" c_cms = LIGHT_SPEED * 1e5 """float: Speed of light in cm/s.""" aij_coeff = np.pi * qe**2 / me / c_cms """float: Derived quantity for transition strength in cm^2 s^-1.""" wave_lya_A = 1215.67 """float: Lya wavelength in A.""" # I suppose we are to pick maximum for each from NIST? f12_lya = 4.1641e-01 """float: Lya oscillator strength.""" A12_lya = 6.2648e+08 """float: Lya Einstein coefficient.""" wave_lyb_A = 1025.7220 """float: Lyb wavelength in A.""" f12_lyb = 7.9142e-02 """float: Lyb oscillator strength.""" A12_lyb = 1.6725e+08 """float: Lyb Einstein coefficient.""" accepted_zcolnames = ["Z_DLA", "Z"] """list(str): Column names for the DLA redshift."""
[docs] @staticmethod def H_tepper_garcia(a, u): """ Tepper-Garcia H function. Arguments --------- a: float Gamma / sigma. u: :external+numpy:py:class:`ndarray <numpy.ndarray>` (wave / lambda_12 - 1) / sigma. Returns ------- :external+numpy:py:class:`ndarray <numpy.ndarray>` """ P = u**2 + 1e-12 Q = 1.5 / P R = np.exp(-P) corr = (R**2 * (4 * P**2 + 7 * P + 4 + Q) - Q - 1) corr *= a / P / sqrt_pi return R - corr
[docs] @staticmethod def voigt_tepper_garcia(x, sigma, gamma): """Tepper-Garcia voight function. Arguments --------- x: :external+numpy:py:class:`ndarray <numpy.ndarray>` wave / lambda_12 - 1. sigma: float b / c. gamma: float Returns ------- :external+numpy:py:class:`ndarray <numpy.ndarray>` Voight profile. """ a = gamma / sigma u = x / sigma return DLAMask.H_tepper_garcia(a, u)
[docs] @staticmethod def get_optical_depth(wave_A, lambda12_A, log10N, b, f12, A12): """Optical depth for a transition line. Arguments --------- wave_A: :external+numpy:py:class:`ndarray <numpy.ndarray>` Wavelength array in A. lambda12_A: float Transition wavelength in A. log10N: float Log10 of column density. b: float Doppler parameter in km/s. f12: float Oscillator strength. A12: float Einstein coefficient. Returns ------- tau: :external+numpy:py:class:`ndarray <numpy.ndarray>` Optical depth. """ a12 = DLAMask.aij_coeff * f12 gamma = A12 * lambda12_A * 1e-8 / 4 / np.pi / DLAMask.c_cms sigma_gauss = b / LIGHT_SPEED tau = DLAMask.voigt_tepper_garcia( wave_A / lambda12_A - 1, sigma_gauss, gamma) tau *= a12 * 10**(log10N - 13) * (lambda12_A / b) / sqrt_pi return tau
[docs] @staticmethod def get_dla_flux(wave, z_dla, nhi, b=10.): """Normalized flux from Lya and Lyb wings of DLA. Arguments --------- wave: :external+numpy:py:class:`ndarray <numpy.ndarray>` Wavelength array in A. z_dla: float DLA redshift nhi: float Log10 of DLA column density. b: float, default: 10 Doppler parameter in km/s. Returns ------- F: :external+numpy:py:class:`ndarray <numpy.ndarray>` Normalized flux for DLA. """ wave_rf = wave / (1 + z_dla) tau = DLAMask.get_optical_depth( wave_rf, DLAMask.wave_lya_A, nhi, b, DLAMask.f12_lya, DLAMask.A12_lya) tau += DLAMask.get_optical_depth( wave_rf, DLAMask.wave_lyb_A, nhi, b, DLAMask.f12_lyb, DLAMask.A12_lyb) return np.exp(-tau)
[docs] @staticmethod def get_all_dlas(wave, spec_dlas): """Normalized flux from all DLAs in a sightline. Arguments --------- wave: :external+numpy:py:class:`ndarray <numpy.ndarray>` Wavelength array in A. spec_dlas: :external+numpy:py:class:`ndarray <numpy.ndarray>` Named ndarray with 'Z_DLA' and 'NHI'. Returns ------- F: :external+numpy:py:class:`ndarray <numpy.ndarray>` Normalized flux. """ transmission = np.ones(wave.size) for z_dla, nhi in spec_dlas[['Z_DLA', 'NHI']]: transmission *= DLAMask.get_dla_flux(wave, z_dla, nhi) return transmission
[docs] @staticmethod def _read_catalog(fname): """ Read and return DLA catalog. Should be run on master. If error occurs, returns None. Arguments --------- fname: str FITS filename to read. Returns ------- catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>` DLA catalog. """ z_colname = DLAMask.accepted_zcolnames[0] fts = fitsio.FITS(fname) fts_colnames = set(fts["DLACAT"].get_colnames()) z_colname = fts_colnames.intersection(DLAMask.accepted_zcolnames) if not z_colname: fts.close() raise ValueError( "DLA mask error::Z colname has to be one of " f"{', '.join(DLAMask.accepted_zcolnames)}") z_colname = z_colname.pop() columns_list = ["TARGETID", z_colname, "NHI"] catalog = fts['DLACAT'].read(columns=columns_list) if z_colname != 'Z_DLA': catalog = rename_fields(catalog, {z_colname: 'Z_DLA'}) fts.close() return catalog
def __init__( self, fname, local_targetids=None, comm=None, mpi_rank=0, dla_mask_limit=0.8 ): self.dla_mask_limit = dla_mask_limit catalog = mpi_fnc_bcast( DLAMask._read_catalog, comm, mpi_rank, f"Error loading DLAMask from file {fname}.", fname) if local_targetids is not None: w = np.isin(catalog['TARGETID'], local_targetids) catalog = catalog[w] catalog.sort(order='TARGETID') # Group DLA catalog into targetids self.unique_targetids, s = np.unique( catalog['TARGETID'], return_index=True) self.split_catalog = np.split(catalog, s[1:])
[docs] def apply(self, spec): """ Apply the mask by setting ``spec.forestivar`` and ``spec.forestflux`` to zero. Arguments ---------- spec: Spectrum Spectrum object to mask. """ w = np.nonzero(self.unique_targetids == spec.targetid)[0] if w.size == 0: return idx = w[0] spec_dlas = self.split_catalog[idx] for arm, wave_arm in spec.forestwave.items(): transmission = DLAMask.get_all_dlas(wave_arm, spec_dlas) # Turn off DLA correction for l_rf > l_lya transmission[np.searchsorted( wave_arm, (1 + spec.z_qso) * DLAMask.wave_lya_A): ] = 1 w = transmission < self.dla_mask_limit transmission[w] = 1 spec.forestivar[arm][w] = 0 spec.forestflux[arm][w] = 0 spec.forestflux[arm] /= transmission spec.forestivar[arm] *= transmission**2