Source code for qsonic.io

""" This module contains functions to read spectra and save deltas. Can be
imported without a need for MPI."""

import argparse
import functools
import warnings

import fitsio
import numpy as np
from numpy.lib.recfunctions import drop_fields, merge_arrays

from qsonic import QsonicException
import qsonic.spectrum


[docs] def add_io_parser(parser=None): """ Adds IO related arguments to parser. These arguments are grouped under 'Input/output parameters and selections'. ``--input-dir`` and ``--catalog`` are required. Arguments --------- parser: argparse.ArgumentParser, default: None Returns --------- parser: argparse.ArgumentParser """ if parser is None: parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) ingroup = parser.add_argument_group('Input options') ingroup.add_argument( "--input-dir", '-i', required=True, help="Input directory.") ingroup.add_argument( "--catalog", required=True, help="Catalog filename") ingroup.add_argument( "--tile-format", action="store_true", help="Read tile coadd-*.fits files in tiles/cumulative directory.") ingroup.add_argument( "--mock-analysis", action="store_true", help="Input folder is mock. Uses nside=16") ingroup.add_argument( "--keep-surveys", nargs='+', default=['main'], help="Surveys to keep.") ingroup.add_argument( "--skip-resomat", action="store_true", help="Skip reading resolution matrix for 3D.") ingroup.add_argument( "--arms", default=['B', 'R'], choices=['B', 'R', 'Z'], nargs='+', help="Arms to read.") outgroup = parser.add_argument_group('Output options') outgroup.add_argument( "--outdir", '-o', help="Output directory to save deltas.") outgroup.add_argument( "--coadd-arms", default="before", choices=["before", "after", "disable"], help="Coadds arms before or after continuum fitting or not at all.") outgroup.add_argument( "--exposures", default="disable", choices=["before", "after", "disable"], help=("Reads exposures before or after continuum fitting and saves. " "Tile format not supported. Related function " ":func:`qsonic.scripts.qsonic_fit.mpi_read_exposures_after`." )) outgroup.add_argument( "--save-by-hpx", action="store_true", help="Save by healpix. If not, saves by MPI rank.") return parser
[docs] def get_spectra_reader_function( input_dir, arms_to_keep, mock_analysis, skip_resomat, read_true_continuum, is_tile, read_exposures, program="dark" ): """ Returns a callable object (function) that returns a list of Spectrum objects for a given catalog of a single healpix. Essentially, a wrapper for the following functions: - :meth:`read_onehealpix_file_mock`, - :meth:`read_onetile_coaddfile_data`, - :meth:`read_onehealpix_file_data_uncoadd`, - :meth:`read_onehealpix_file_data_coadd`. If for data, ``SURVEY`` column must be present and sorted when calling the function this returns in the healpix grouping. Tile grouping requires ``TILEID`` and ``PETAL_LOC``. Arguments --------- input_dir: str Input directory arms_to_keep: list(str) Must only contain B, R and Z mock_analysis: bool Reads for mock data if true. skip_resomat: bool If true, do not read resomat. read_true_continuum: bool If true, reads the true continuum for mock analysis. is_tile: bool If true, reads for tile grouping read_exposures: bool If true, reads exposures for healpix format only. program: str, default: "dark" Always use dark program. Returns --------- readerFunction: Callable Call this function with a catalog of quasars in a single healpix. Raises --------- RuntimeWarning If number of quasars in the healpix file does not match the catalog. """ if mock_analysis: return functools.partial( read_onehealpix_file_mock, input_dir=input_dir, arms_to_keep=arms_to_keep, skip_resomat=skip_resomat, read_true_continuum=read_true_continuum ) elif is_tile: return functools.partial( read_onetile_coaddfile_data, input_dir=input_dir, arms_to_keep=arms_to_keep, skip_resomat=skip_resomat ) elif read_exposures: return functools.partial( read_onehealpix_file_data_uncoadd, input_dir=input_dir, arms_to_keep=arms_to_keep, skip_resomat=skip_resomat, program=program ) else: return functools.partial( read_onehealpix_file_data_coadd, input_dir=input_dir, arms_to_keep=arms_to_keep, skip_resomat=skip_resomat, program=program )
[docs] def read_resolution_matrices_onehealpix_data( catalog_hpx, input_dir, spectra_list, program="dark" ): """ Returns a list of Spectrum objects for a given catalog of a single healpix. Arguments --------- catalog_hpx: :external+numpy:py:class:`ndarray <numpy.ndarray>` Catalog of quasars in a single healpix. If for data 'SURVEY' column must be present and sorted. input_dir: str Input directory spectra_list: list(Spectrum) List of spectra to read resolution matrix program: str, default: "dark" Always use dark program. Returns --------- spectra_list: list(Spectrum) Raises --------- RuntimeWarning If number of quasars in the healpix file does not match the catalog. """ # assert (catalog_hpx.size == len(spectra_list)) unique_surveys, s2 = np.unique(catalog_hpx['SURVEY'], return_index=True) survey_split_cat = np.split(catalog_hpx, s2[1:]) s2 = np.append(s2, len(spectra_list)) arms_to_keep = spectra_list[0].wave.keys() for ii, cat_by_survey in enumerate(survey_split_cat): survey = cat_by_survey['SURVEY'][0] pixnum = cat_by_survey['HPXPIXEL'][0] targetids_by_survey = cat_by_survey['TARGETID'] fspec = (f"{input_dir}/{survey}/{program}/{pixnum//100}/" f"{pixnum}/coadd-{survey}-{program}-{pixnum}.fits") i1, i2 = s2[ii], s2[ii + 1] spectra_list[i1:i2] = _read_onehealpix_file_onlyreso( targetids_by_survey, fspec, arms_to_keep, spectra_list[i1:i2]) return spectra_list
[docs] def read_deltas(fname): """ Returns a list of all Delta objects in a file. Arguments --------- fname: str FITS file name Returns --------- deltas_list: list(Delta) Raises --------- RuntimeError If the file is missing certain columns and keys. See :class:`Delta`. """ with fitsio.FITS(fname) as fts: deltas_list = [qsonic.spectrum.Delta(hdu) for hdu in fts[1:]] return deltas_list
[docs] def save_deltas( spectra_list, outdir, save_by_hpx=False, mpi_rank=None ): """ Saves given list of spectra as deltas. NO coaddition of arms. Each arm is saved separately. Only valid spectra are saved. Arguments --------- spectra_list: list(Spectrum) Continuum fitted spectra objects. All must be valid! outdir: str Output directory. Does not save if empty of None save_by_hpx: bool, default: False Saves by healpix if True. Has priority over mpi_rank mpi_rank: int, default: None Rank of the MPI process. Save by `mpi_rank` if passed. Raises --------- QsonicException If both `mpi_rank` and `save_by_hpx` is None/False. QsonicException If blinding is not set. """ if not outdir: return if save_by_hpx: pixnos = np.array([spec.hpix for spec in spectra_list]) sort_idx = np.argsort(pixnos) pixnos = pixnos[sort_idx] unique_pix, s = np.unique(pixnos, return_index=True) split_spectra = np.split(np.array(spectra_list)[sort_idx], s[1:]) elif mpi_rank is not None: unique_pix = [mpi_rank] split_spectra = [spectra_list] else: raise QsonicException("save_by_hpx and mpi_rank can't both be None.") if qsonic.spectrum.Spectrum.blinding_not_set(): raise QsonicException("Blinding is not set. Cannot save delta.") for healpix, hp_specs in zip(unique_pix, split_spectra): results = fitsio.FITS( f"{outdir}/delta-{healpix}.fits", 'rw', clobber=True) for spec in qsonic.spectrum.valid_spectra(hp_specs): spec.write(results) results.close()
def _read_resoimage(imhdu, quasar_indices, nwave): # Reading into sorted order is faster # Since the output catalog is already sorted, we place them in order ndiags = imhdu.get_dims()[1] dtype = imhdu._get_image_numpy_dtype() data = np.empty((quasar_indices.size, ndiags, nwave), dtype=dtype) for idata, iqso in enumerate(quasar_indices): data[idata, :, :] = imhdu[int(iqso), :, :] return data def _read_imagehdu(imhdu, quasar_indices, nwave): dtype = imhdu._get_image_numpy_dtype() data = np.empty((quasar_indices.size, nwave), dtype=dtype) for idata, iqso in enumerate(quasar_indices): data[idata, :] = imhdu[int(iqso), :] return data # Timing showed this is slower # def _read_imagehdu_2(imhdu, quasar_indices): # ndims = imhdu.get_info()['ndims'] # if ndims == 2: # return np.vstack([imhdu[int(idx), :] for idx in quasar_indices]) # return np.vstack([imhdu[int(idx), :, :] for idx in quasar_indices])
[docs] def _read_true_continuum(targetids, fspec): """Read true continuum as dictionary from file. Arguments --------- targetids: :external+numpy:py:class:`ndarray <numpy.ndarray>` Target IDs. fspec: str Filename to open. Returns --------- cont_data: dict( :external+numpy:py:class:`ndarray <numpy.ndarray>` ) This has keys for w1, w2, dwave and data, where data is ndarray and others are floats. Raises --------- RuntimeError If number of quasars in TRUE_CONT does not match the input. """ with fitsio.FITS(fspec) as fitsfile: hdr = fitsfile['TRUE_CONT'].read_header() true_continua = fitsfile['TRUE_CONT'].read() w1 = hdr['WMIN'] w2 = hdr['WMAX'] dw = hdr['DWAVE'] common_targetids, idx_fbr, _ = np.intersect1d( true_continua['TARGETID'], targetids, assume_unique=True, return_indices=True ) if (common_targetids.size != targetids.size): raise RuntimeError( f"Error reading true continua from {fspec}. " "Number of quasars in TRUE_CONT does not match the catalog " f"catalog:{targetids.size} vs " f"healpix:{common_targetids.size}!") cont_data = { 'w1': w1, 'w2': w2, 'dwave': dw, 'data': true_continua['TRUE_CONT'][idx_fbr, :].astype("f8") } return cont_data
[docs] def _read_onehealpix_file( targetids_by_survey, fspec, arms_to_keep, skip_resomat, fbrmap_columns=['TARGETID'] ): """Common function to read a single FITS file in DESI healpix grouping. Arguments --------- targetids_by_survey: :external+numpy:py:class:`ndarray <numpy.ndarray>` Targetids_by_survey (used to be catalog). If data, split by survey and contains only one survey. fspec: str Filename to open. arms_to_keep: list(str) Must only contain B, R and Z. skip_resomat: bool If true, do not read resomat. fbrmap_columns: list(str), default: ['TARGETID'] Columns to read from FIBERMAP. For coadded files, it should be ``['TARGETID']``. For uncoadded files (spectra-), you should read at least ``[TARGETID, PETAL_LOC, FIBER, NIGHT, EXPID, TILEID]``. Returns --------- data: dict( :external+numpy:py:class:`ndarray <numpy.ndarray>` ) Only quasar spectra are read into keywords wave, flux etc. Resolution is read if present. idx_cat: :external+numpy:py:class:`ndarray <numpy.ndarray>` Indices in `targetids_by_survey` there were succesfully read. fbrmap: :external+numpy:py:class:`ndarray <numpy.ndarray>` Catalog with ``columns`` from the FIBERMAP. Note the size will not be equal to ``targetids_by_survey.size`` when reading exposures. Raises --------- RuntimeWarning If number of quasars in the healpix file does not match the catalog. """ fitsfile = fitsio.FITS(fspec) fbrmap = fitsfile['FIBERMAP'].read(columns=fbrmap_columns) common_targetids, _, idx_cat = np.intersect1d( fbrmap['TARGETID'], targetids_by_survey, return_indices=True) if (common_targetids.size != targetids_by_survey.size): warnings.warn( f"Error reading {fspec}. " "Number of quasars in healpix does not match the catalog " f"catalog:{targetids_by_survey.size} vs " f"healpix:{common_targetids.size}!", RuntimeWarning) idx_fbr = np.nonzero(np.isin(fbrmap['TARGETID'], targetids_by_survey))[0] targetids_fbr = fbrmap['TARGETID'][idx_fbr] idx_fbr = idx_fbr[targetids_fbr.argsort()] data = { 'wave': {}, 'flux': {}, 'ivar': {}, 'mask': {}, 'reso': {} } for arm in arms_to_keep: # Cannot read by rows= argument. data['wave'][arm] = fitsfile[f'{arm}_WAVELENGTH'].read() nwave = data['wave'][arm].size data['flux'][arm] = _read_imagehdu( fitsfile[f'{arm}_FLUX'], idx_fbr, nwave) data['ivar'][arm] = _read_imagehdu( fitsfile[f'{arm}_IVAR'], idx_fbr, nwave) data['mask'][arm] = _read_imagehdu( fitsfile[f'{arm}_MASK'], idx_fbr, nwave) if skip_resomat or f'{arm}_RESOLUTION' not in fitsfile: continue data['reso'][arm] = _read_resoimage( fitsfile[f'{arm}_RESOLUTION'], idx_fbr, nwave) fitsfile.close() return data, idx_cat, fbrmap[idx_fbr]
[docs] def _read_onehealpix_file_onlyreso( targetids_by_survey, fspec, arms_to_keep, spectra_list ): """ Function to read forest region resolution matrix for data. Note reading resolution matrix is already fast for mocks. Arguments --------- targetids_by_survey: :external+numpy:py:class:`ndarray <numpy.ndarray>` Targetids_by_survey (used to be catalog). If data, split by survey and contains only one survey. fspec: str Filename to open. arms_to_keep: list(str) Must only contain B, R and Z. spectra_list: list(Spectrum) List of spectra that forest region is set. Modified in place and also returned. Returns --------- spectra_list: list(Spectrum) List of spectra where forestreso is set. Raises --------- RuntimeWarning If number of quasars in the healpix file does not match the catalog. """ fitsfile = fitsio.FITS(fspec) fbrmap = fitsfile['FIBERMAP'].read(columns='TARGETID') common_targetids, idx_fbr, idx_cat = np.intersect1d( fbrmap, targetids_by_survey, assume_unique=True, return_indices=True) if (common_targetids.size != targetids_by_survey.size): warnings.warn( f"Error reading {fspec}. " "Number of quasars in healpix does not match the catalog " f"catalog:{targetids_by_survey.size} vs " f"healpix:{common_targetids.size}!", RuntimeWarning) for arm in arms_to_keep: imhdu = fitsfile[f'{arm}_RESOLUTION'] for jj, spec in zip(idx_fbr, spectra_list): # assert (common_targetids[jj] == spec.targetid) if arm not in spec.forestwave.keys(): continue f1 = spec._f1[arm] f2 = spec._f2[arm] spec._forestreso[arm] = imhdu[int(jj), :, f1:f2][0] fitsfile.close() return spectra_list
[docs] def read_onehealpix_file_data_coadd( catalog_hpx, input_dir, arms_to_keep, skip_resomat, program="dark" ): """Read FITS files for all surveys needed for data. Arguments --------- catalog_hpx: :external+numpy:py:class:`ndarray <numpy.ndarray>` Catalog for a healpix. Ordered by SURVEY and TARGETID. input_dir: str Input directory. arms_to_keep: list(str) Must only contain B, R and Z. skip_resomat: bool If true, do not read resomat. program: str, default: "dark" Program. Returns --------- spectra_list: list(Spectrum) Raises --------- RuntimeWarning If number of quasars in the healpix file does not match the catalog. """ spectra_list = [] survey_split_cat = np.split( catalog_hpx, np.unique(catalog_hpx['SURVEY'], return_index=True)[1][1:] ) pixnum = catalog_hpx['HPXPIXEL'][0] for cat_by_survey in survey_split_cat: survey = cat_by_survey['SURVEY'][0] fspec = (f"{input_dir}/{survey}/{program}/{pixnum//100}/" f"{pixnum}/coadd-{survey}-{program}-{pixnum}.fits") data, idx_cat, _ = _read_onehealpix_file( cat_by_survey['TARGETID'], fspec, arms_to_keep, skip_resomat) if idx_cat.size != cat_by_survey.size: cat_by_survey = cat_by_survey[idx_cat] spectra_list.extend( qsonic.spectrum.generate_spectra_list_from_data( cat_by_survey, data) ) return spectra_list
[docs] def read_onehealpix_file_data_uncoadd( catalog_hpx, input_dir, arms_to_keep, skip_resomat, program="dark" ): """Read uncoadded spectra from FITS files for all surveys needed for data. Arguments --------- catalog_hpx: :external+numpy:py:class:`ndarray <numpy.ndarray>` Catalog for a healpix. Ordered by SURVEY and TARGETID. input_dir: str Input directory. arms_to_keep: list(str) Must only contain B, R and Z. skip_resomat: bool If true, do not read resomat. program: str, default: "dark" Program. Returns --------- spectra_list: list(Spectrum) """ spectra_list = [] survey_split_cat = np.split( catalog_hpx, np.unique(catalog_hpx['SURVEY'], return_index=True)[1][1:] ) pixnum = catalog_hpx['HPXPIXEL'][0] for cat_by_survey in survey_split_cat: survey = cat_by_survey['SURVEY'][0] fspec = (f"{input_dir}/{survey}/{program}/{pixnum//100}/" f"{pixnum}/spectra-{survey}-{program}-{pixnum}.fits") data, idx_cat, fbrmap = _read_onehealpix_file( cat_by_survey['TARGETID'], fspec, arms_to_keep, skip_resomat, ['TARGETID', 'PETAL_LOC', 'FIBER', 'NIGHT', 'EXPID', 'TILEID']) if idx_cat.size != cat_by_survey.size: cat_by_survey = cat_by_survey[idx_cat] new_cat = cat_by_survey[ np.searchsorted(cat_by_survey['TARGETID'], fbrmap['TARGETID'])] new_cat = merge_arrays( (new_cat, drop_fields(fbrmap, 'TARGETID', usemask=False)), flatten=True, usemask=False ) spectra_list.extend( qsonic.spectrum.generate_spectra_list_from_data(new_cat, data) ) return spectra_list
[docs] def read_onetile_coaddfile_data( catalog_tile, input_dir, arms_to_keep, skip_resomat ): """Read all petal coadd FITS files for a given tile. Arguments --------- catalog_tile: :external+numpy:py:class:`ndarray <numpy.ndarray>` Catalog for a tile. Ordered by PETAL_LOC and TARGETID. input_dir: str Input directory. arms_to_keep: list(str) Must only contain B, R and Z. skip_resomat: bool If true, do not read resomat. Returns --------- spectra_list: list(Spectrum) Raises --------- RuntimeWarning If number of quasars in the tile file does not match the catalog. """ tileid = catalog_tile['TILEID'][0] lastnight = catalog_tile['LASTNIGHT'][0] petal_split_cat = np.split( catalog_tile, np.unique( catalog_tile['PETAL_LOC'], return_index=True)[1][1:] ) spectra_list = [] for cat_by_petal in petal_split_cat: petal = cat_by_petal['PETAL_LOC'][0] fspec = (f"{input_dir}/{tileid}/{lastnight}/" f"coadd-{petal}-{tileid}-thru{lastnight}.fits") data, idx_cat, _ = _read_onehealpix_file( cat_by_petal['TARGETID'], fspec, arms_to_keep, skip_resomat) if idx_cat.size != cat_by_petal.size: cat_by_petal = cat_by_petal[idx_cat] spectra_list.extend( qsonic.spectrum.generate_spectra_list_from_data( cat_by_petal, data) ) return spectra_list
[docs] def read_onehealpix_file_mock( catalog_hpx, input_dir, arms_to_keep, skip_resomat, read_true_continuum, nside=16 ): """ Read a single FITS file for mocks. Arguments --------- catalog_hpx: :external+numpy:py:class:`ndarray <numpy.ndarray>` Catalog for a single healpix. Ordered by TARGETID. input_dir: str Input directory. arms_to_keep: list(str) Must only contain B, R and Z. skip_resomat: bool If true, do not read resomat. read_true_continuum: bool If true, reads the true continuum for mock analysis. nside: int, default: 16 NSIDE for healpix. Returns --------- spectra_list: list(Spectrum) Raises --------- RuntimeWarning If number of quasars in the healpix file does not match the catalog. """ pixnum = catalog_hpx['HPXPIXEL'][0] fspec = f"{input_dir}/{pixnum//100}/{pixnum}/spectra-{nside}-{pixnum}.fits" data, idx_cat, _ = _read_onehealpix_file( catalog_hpx['TARGETID'], fspec, arms_to_keep, skip_resomat) if idx_cat.size != catalog_hpx.size: catalog_hpx = catalog_hpx[idx_cat] fspec = f"{input_dir}/{pixnum//100}/{pixnum}/truth-{nside}-{pixnum}.fits" if read_true_continuum: data['cont'] = _read_true_continuum(catalog_hpx['TARGETID'], fspec) if skip_resomat: return qsonic.spectrum.generate_spectra_list_from_data( catalog_hpx, data) with fitsio.FITS(fspec) as fitsfile: for arm in arms_to_keep: data['reso'][arm] = np.array(fitsfile[f'{arm}_RESOLUTION'].read()) return qsonic.spectrum.generate_spectra_list_from_data(catalog_hpx, data)
def _float_range(f1, f2): # Define the function with default arguments def float_range_checker(arg): """New Type function for argparse - a float within predefined range. """ try: f = float(arg) except ValueError: raise argparse.ArgumentTypeError("must be a floating point number") if f < f1 or f > f2: raise argparse.ArgumentTypeError(f"must be in range [{f1}--{f2}]") return f # Return function handle to checking function return float_range_checker