""" This module contains functions to read, validate and make necessary changes
to DESI quasar catalogs. Can be imported without a need for MPI. """
import logging
import warnings
import fitsio
from healpy import ang2pix
import numpy as np
from numpy.lib.recfunctions import rename_fields, append_fields
from qsonic import QsonicException
from qsonic.mpi_utils import balance_load, mpi_fnc_bcast
_accepted_extnames = set(['QSO_CAT', 'ZCATALOG', 'METADATA'])
"""set: Accepted extensions for quasar catalog."""
_required_columns = [
set(['TARGETID']), set(['Z']), set(['TARGET_RA', 'RA']),
set(['TARGET_DEC', 'DEC'])
]
"""list(set): Required columns for all cases."""
_required_data_columns = [
set(['SURVEY']),
set(['COADD_LASTNIGHT', 'LAST_NIGHT', 'LASTNIGHT'])
]
"""list(set): Required columns for real data analysis."""
_required_tile_columns = [set(['TILEID']), set(['PETAL_LOC'])]
"""list(set): Required columns for tile data analysis."""
_optional_columns = [
'HPXPIXEL', 'PROGRAM', 'VMIN_CIV_450', 'VMAX_CIV_450', 'VMIN_CIV_2000',
'VMAX_CIV_2000'
]
"""list(str): Optional columns."""
_all_columns = [
col for reqset in (
_required_columns + _required_data_columns + _required_tile_columns
) for col in reqset
] + _optional_columns
[docs]
def read_quasar_catalog(
filename, is_mock=False, is_tile=False, keep_surveys=None,
zmin=0, zmax=100.0
):
""" Returns a quasar catalog object (ndarray).
It is sorted in the following order:
1. HPXPIXEL or TILEID
2. SURVEY or PETAL_LOC (if applicable),
3. TARGETID.
BAL info included if available. It is required for BAL masking.
If 'HPXPIXEL' column is not present, n_side is assumed 16 for mocks, 64 for
data.
Arguments
----------
filename: str
Filename to catalog.
is_mock: bool, default: False
If the catalog is for mocks.
is_tile: bool, default: False
If the catalog is for tiles. Duplicate TARGETIDs are allowed as long as
they are in different TILEIDs.
keep_surveys: None or list(str), default: None
List of surveys to subselect. None keeps all.
zmin: float, default: 0
Minimum quasar redshift
zmax: float, default: 100
Maximum quasar redshift
Returns
----------
catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Sorted catalog.
"""
n_side = 16 if is_mock else 64
catalog = _read(filename)
catalog = _validate_adjust_column_names(catalog, is_mock, is_tile)
catalog = _prime_catalog(
catalog, n_side, keep_surveys, zmin, zmax, is_tile)
return catalog
[docs]
def mpi_read_quasar_catalog(
filename, comm=None, mpi_rank=0, is_mock=False, is_tile=False,
keep_surveys=None, zmin=0, zmax=100
):
""" Returns the same quasar catalog object on all MPI ranks.
It is sorted in the following order: HPXPIXEL, SURVEY (if applicable),
TARGETID. BAL info included if available. It is required for BAL masking.
Can be used without MPI by passing ``comm=None`` and ``mpi_rank=0``.
Arguments
----------
filename: str
Filename to catalog.
comm: MPI comm object or None, default: None
MPI comm object for bcast
mpi_rank: int, default: 0
Rank of the MPI process
is_mock: bool, default: False
If the catalog is for mocks.
is_tile: bool, default: False
If the catalog is for tiles. Duplicate TARGETIDs are allowed.
keep_surveys: None or list(str), default: None
List of surveys to subselect. None keeps all.
zmin: float, default: 0
Minimum quasar redshift
zmax: float, default: 100
Maximum quasar redshift
Returns
----------
catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Sorted catalog on all MPI ranks.
Raises
------
QsonicException
If error occurs while reading the catalog.
"""
catalog = mpi_fnc_bcast(
read_quasar_catalog,
comm, mpi_rank, "Error while reading catalog.",
filename, is_mock, is_tile, keep_surveys, zmin, zmax)
return catalog
[docs]
def mpi_get_local_queue(
filename, comm=None, mpi_rank=0, mpi_size=1, is_mock=False,
is_tile=False, keep_surveys=None, zmin=0, zmax=100.0
):
"""Reads catalog on master and scatter a list of catalogs to every
mpi_rank. If in tile format, sort key is 'TILEID'.
Arguments
----------
filename: str
Filename to catalog.
comm: MPI comm object or None, default: None
MPI comm object for scatter
mpi_rank: int
Rank of the MPI process
mpi_size: int
Size of MPI processes
is_mock: bool, default: False
If the catalog is for mocks.
is_tile: bool, default: False
If the catalog is for tiles. Split key will be 'TILEID'. Duplicate
TARGETIDs are allowed.
keep_surveys: None or list(str), default: None
List of surveys to subselect. None keeps all.
zmin: float, default: 0
Minimum quasar redshift
zmax: float, default: 100
Maximum quasar redshift
Returns
----------
local_queue: list(:external+numpy:py:class:`ndarray <numpy.ndarray>`)
List of sorted catalogs.
"""
# We decide forest filename list
# Group into unique pixels
if mpi_rank == 0:
try:
catalog = read_quasar_catalog(
filename, is_mock, is_tile, keep_surveys, zmin, zmax)
status = True
except Exception as e:
logging.exception(e)
status = False
else:
status = False
status = comm.bcast(status)
if not status:
raise QsonicException("Error while reading catalog.")
if mpi_rank == 0:
split_key = "TILEID" if is_tile else "HPXPIXEL"
unique_pix, s = np.unique(catalog[split_key], return_index=True)
split_catalog = np.split(catalog, s[1:])
logging.info(
f"There are {unique_pix.size} keys ({split_key})."
" Don't use more MPI processes.")
# Roughly equal number of spectra
logging.info("Load balancing.")
# Returns a list of catalog (ndarray)
local_queue = balance_load(split_catalog, mpi_size)
else:
local_queue = None
return comm.scatter(local_queue)
[docs]
def _check_required_columns(required_cols, colnames):
"""Asserts all required columns are present.
Arguments
----------
required_cols: list(set)
Required columns as list of sets.
colnames: list(str)
Present column names
Raises
------
Exception
If none of a required set is in colnames.
"""
for reqset in required_cols:
if reqset.intersection(colnames):
continue
raise Exception(
"One of these columns must be present in the catalog: "
f"{', '.join(reqset)}!")
[docs]
def _validate_adjust_column_names(catalog, is_mock, is_tile):
""" Validate `catalog` for required columns in `_required_columns`.
'SURVEY' is also required for data. 'TARGET_{RA, DEC}' is transformed to
'RA' and 'DEC' in return.
Arguments
----------
catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Catalog.
is_mock: bool
If the catalog is for mocks, does not perform 'SURVEY' check.
is_tile: bool
If the catalog is for tiles, duplicate TARGETIDs are allowed as long as
they are in different TILEIDs.
Returns
----------
catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Catalog. No checks are performed.
"""
colnames = catalog.dtype.names
# Check if required columns are present
_check_required_columns(_required_columns, colnames)
if not is_mock:
_check_required_columns(_required_data_columns, colnames)
if is_tile:
_check_required_columns(_required_tile_columns, colnames)
nqso = catalog.size
logging.info(f"There are {nqso} quasars in the catalog.")
if not is_tile and nqso != np.unique(catalog['TARGETID']).size:
raise Exception("There are duplicate TARGETIDs in catalog!")
if is_tile and nqso != np.unique(catalog[['TARGETID', 'TILEID']]).size:
raise Exception(
"There are duplicate TARGETID - TILEID combinations in catalog!")
# Adjust column names
colname_map = {}
for x in ['RA', 'DEC']:
if (f'TARGET_{x}' in colnames) and (x not in colnames):
colname_map[f'TARGET_{x}'] = x
if ('LASTNIGHT' not in colnames) and ('COADD_LASTNIGHT' in colnames):
colname_map['COADD_LASTNIGHT'] = 'LASTNIGHT'
elif ('LASTNIGHT' not in colnames) and ('LAST_NIGHT' in colnames):
colname_map['LAST_NIGHT'] = 'LASTNIGHT'
if colname_map:
catalog = rename_fields(catalog, colname_map)
return catalog
[docs]
def _read(filename):
""" Reads FITS file catalog with all columns present that are in
`_all_columns`
Arguments
----------
filename: str
Returns
----------
catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Catalog. No checks are performed.
"""
logging.info(f'Reading catalogue from {filename}')
fitsfile = fitsio.FITS(filename)
extnames = [hdu.get_extname() for hdu in fitsfile]
cat_hdu = _accepted_extnames.intersection(extnames)
if not cat_hdu:
cat_hdu = 1
warnings.warn(
"Catalog HDU not found by hduname. Using extension 1.",
RuntimeWarning)
else:
cat_hdu = cat_hdu.pop()
colnames = fitsfile[cat_hdu].get_colnames()
keep_columns = [col for col in colnames if col in _all_columns]
catalog = np.array(fitsfile[cat_hdu].read(columns=keep_columns))
fitsfile.close()
return catalog
[docs]
def _add_healpix(catalog, n_side, keep_columns):
""" Add 'HPXPIXEL' column to catalog if not present.
Arguments
----------
catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Catalog.
n_side: int
Healpix nside.
keep_columns: list(str)
List of surveys to subselect.
Returns
----------
catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>`
'HPXPIXEL' calculated and added catalog.
"""
if 'HPXPIXEL' not in keep_columns:
pixnum = ang2pix(
n_side, catalog['RA'], catalog['DEC'], lonlat=True, nest=True)
catalog = append_fields(
catalog, 'HPXPIXEL', pixnum, dtypes=int, usemask=False)
return catalog
[docs]
def _prime_catalog(
catalog, n_side, keep_surveys, zmin, zmax, is_tile
):
""" Returns quasar catalog object. It is sorted in the following order:
sort_key (HPXPIXEL), SURVEY (if applicable), TARGETID
Arguments
----------
catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Catalog.
n_side: int
Healpix nside.
keep_surveys: list(str)
List of surveys to subselect.
zmin: float
Minimum quasar redshift
zmax: float
Maximum quasar redshift
is_tile: bool
If the catalog is for tiles. Sort oder will be TILEID, PETAL_LOC.
Returns
----------
catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Sorted catalog. BAL info included if available (req. for BAL masking)
"""
colnames = catalog.dtype.names
# Redshift cuts
w = (catalog['Z'] >= zmin) & (catalog['Z'] <= zmax)
logging.info(f"There are {w.sum()} quasars in the redshift range.")
catalog = catalog[w]
if is_tile:
sort_order = ['TILEID', 'PETAL_LOC', 'TARGETID']
else:
sort_order = ["HPXPIXEL", 'TARGETID']
# Filter all the objects in the catalogue not belonging to the specified
# surveys.
if 'SURVEY' in colnames and keep_surveys is not None:
w = np.isin(catalog["SURVEY"], keep_surveys)
catalog = catalog[w]
if len(keep_surveys) > 1 and not is_tile:
sort_order.insert(1, 'SURVEY')
logging.info(
f"There are {w.sum()} quasars in given surveys {keep_surveys}.")
if catalog.size == 0:
raise Exception("Empty quasar catalogue.")
catalog = _add_healpix(catalog, n_side, colnames)
catalog.sort(order=sort_order)
return catalog