Source code for marvin.tools.rss

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: Brian Cherinka, José Sánchez-Gallego, and Brett Andrews
# @Date: 2016-04-11
# @Filename: rss.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
#
# @Last modified by: José Sánchez-Gallego (gallegoj@uw.edu)
# @Last modified time: 2018-07-30 19:42:17


from __future__ import division, print_function

import os
import warnings

import astropy.io.ascii
import astropy.table
import astropy.units
import astropy.wcs
import numpy
from astropy.io import fits

import marvin
from marvin.core.exceptions import MarvinError, MarvinUserWarning
from marvin.utils.datamodel.drp import datamodel_rss
from marvin.utils.datamodel.drp.base import Spectrum as SpectrumDataModel

from .core import MarvinToolsClass
from .cube import Cube
from .mixins import NSAMixIn
from .quantities.spectrum import Spectrum


[docs]class RSS(MarvinToolsClass, NSAMixIn, list): """A class to interface with a MaNGA DRP row-stacked spectra file. This class represents a fully reduced DRP row-stacked spectra object, initialised either from a file, a database, or remotely via the Marvin API. Instances of `.RSS` are a list of `.RSSFiber` objects, one for each fibre and exposure. `.RSSFiber` are initialised lazily, containing only basic information. They need to be initialised by calling `.RSSFiber.load` (unless `.RSS.autoload` is ``True``, in which case the instance is loaded when first accessed). In addition to the input arguments supported by `~.MarvinToolsClass` and `~.NSAMixIn`, this class accepts an ``autoload`` keyword argument that defines whether `.RSSFiber` objects should be automatically loaded when they are accessed. """ _qualflag = 'DRP3QUAL' def __init__(self, input=None, filename=None, mangaid=None, plateifu=None, mode=None, data=None, release=None, autoload=True, drpall=None, download=None, nsa_source='auto'): MarvinToolsClass.__init__(self, input=input, filename=filename, mangaid=mangaid, plateifu=plateifu, mode=mode, data=data, release=release, drpall=drpall, download=download) NSAMixIn.__init__(self, nsa_source=nsa_source) #: An `astropy.table.Table` with the observing information associated #: with this RSS object. self.obsinfo = None #: If True, unloaded `.RSSFiber` instances are automatically loaded #: when accessed. Otherwise, they need to be loaded via `.RSSFiber.load`. self.autoload = autoload if self.data_origin == 'file': self._load_rss_from_file(data=self.data) elif self.data_origin == 'db': self._load_rss_from_db(data=self.data) elif self.data_origin == 'api': self._load_rss_from_api() Cube._init_attributes(self) # Checks that the drpver set in MarvinToolsClass matches the header header_drpver = self.header['VERSDRP3'].strip() header_drpver = 'v1_5_1' if header_drpver == 'v1_5_0' else header_drpver assert header_drpver == self._drpver, ('mismatch between cube._drpver={0} ' 'and header drpver={1}'.format(self._drpver, header_drpver)) # EXPNUM in obsinfo is a string. Cast it to int self.obsinfo['EXPNUM'] = self.obsinfo['EXPNUM'].astype(numpy.int32) # Inits self as an empty list. list.__init__(self, []) self._populate_fibres() def _set_datamodel(self): """Sets the datamodel for DRP.""" self.datamodel = datamodel_rss[self.release.upper()] self._bitmasks = datamodel_rss[self.release.upper()].bitmasks def __repr__(self): """Representation for RSS.""" return ('<Marvin RSS (mangaid={self.mangaid!r}, plateifu={self.plateifu!r}, ' 'mode={self.mode!r}, data_origin={self.data_origin!r})>'.format(self=self)) def __getitem__(self, fiberid): """Returns the `.RSSFiber` whose fiberid matches the input.""" rssfiber = super(RSS, self).__getitem__(fiberid) if self.autoload and not rssfiber.loaded: rssfiber.load() return rssfiber def _getFullPath(self): """Returns the full path of the file in the tree.""" if not self.plateifu: return None plate, ifu = self.plateifu.split('-') return super(RSS, self)._getFullPath('mangarss', ifu=ifu, drpver=self._drpver, plate=plate, wave='LOG')
[docs] def download(self): """Downloads the cube using sdss_access - Rsync""" if not self.plateifu: return None plate, ifu = self.plateifu.split('-') return super(RSS, self).download('mangarss', ifu=ifu, drpver=self._drpver, plate=plate, wave='LOG')
[docs] def getCube(self): """Returns the `~marvin.tools.cube.Cube` associated with this RSS.""" return Cube(plateifu=self.plateifu, mode=self.mode, release=self.release)
[docs] def load_all(self): """Loads all the `.RSSFiber` associated to this `.RSS` instance.""" for rssfiber in self: if not rssfiber.loaded: rssfiber.load()
[docs] def select_fibers(self, exposure_no=None, set=None, mjd=None): """Selects fibres that match one or multiple of the input parameters. Parameters ---------- exposure_no : int The exposure number. Ignored if ``None``. set : int The set id of the exposure. Ignored if ``None``. mjd : int The MJD of the exposure. Ignored if ``None``. Returns ------- rssfibers : list A list of `.RSSFiber` instances whose obsinfo matches all the input parameters. The `.RSS.autoload` option is respected. Example ------- >>> rss = marvin.tools.RSS('8485-1901') >>> fibers = rss.select_fibers(set=2) >>> fibers [<RSSFiber [ 2.22306705, 11.84955406, 9.65761662, ..., 0. , 0. , 0. ] 1e-17 erg / (Angstrom cm2 fiber s)>, <RSSFiber [2.18669987, 1.4861778 , 2.55065155, ..., 0. , 0. , 0. ] 1e-17 erg / (Angstrom cm2 fiber s)>, <RSSFiber [2.75228763, 5.53485441, 2.31695175, ..., 0. , 0. , 0. ] 1e-17 erg / (Angstrom cm2 fiber s)>] """ mask_exp = (self.obsinfo['EXPNUM'].astype(int) == exposure_no) if exposure_no else True mask_set = (self.obsinfo['SET'].astype(int) == set) if set else True mask_mjd = (self.obsinfo['MJD'].astype(int) == mjd) if mjd else True mask = mask_exp & mask_set & mask_mjd valid_exposures = numpy.where(mask)[0] n_exposures = len(self.obsinfo) n_fibres_per_exposure = self._nfibers // n_exposures fibre_to_exposure = numpy.arange(self._nfibers) // n_fibres_per_exposure fibres_in_valid_exposures = numpy.where(numpy.in1d(fibre_to_exposure, valid_exposures))[0] return [self[ii] for ii in fibres_in_valid_exposures]
def _load_rss_from_file(self, data=None): """Initialises the RSS object from a file.""" if data is not None: assert isinstance(data, fits.HDUList), 'data is not an HDUList object' else: try: self.data = fits.open(self.filename) except (IOError, OSError) as err: raise OSError('filename {0} cannot be found: {1}'.format(self.filename, err)) self.header = self.data[1].header self.wcs = astropy.wcs.WCS(self.header) self.wcs = self.wcs.dropaxis(1) # The header creates an empty axis for the exposures. # Confirm that this is a RSS file assert 'XPOS' in self.data and self.header['CTYPE1'] == 'WAVE-LOG', \ 'invalid file type. It does not appear to be a LOGRSS.' self._wavelength = self.data['WAVE'].data self._shape = None self._nfibers = self.data['FLUX'].shape[0] self.obsinfo = astropy.table.Table(self.data['OBSINFO'].data) Cube._do_file_checks(self) def _load_rss_from_db(self, data=None): """Initialises the RSS object from the DB. At this time the DB does not contain enough information to successfully instantiate a RSS object so we hack the data access mode to try to use files. For users this should be irrelevant since they rarely will have a Marvin DB. For the API, it means the access to RSS data will happen via files. """ warnings.warn('DB mode is not working for RSS. Trying file access mode.', MarvinUserWarning) fullpath = self._getFullPath() if fullpath and os.path.exists(fullpath): self.filename = fullpath self.data_origin = 'file' self._load_rss_from_file() else: raise MarvinError('cannot find a valid RSS file for ' 'plateifu={self.plateifu!r}, release={self.release!r}' .format(self=self)) def _load_rss_from_api(self): """Initialises the RSS object using the remote API.""" # Checks that the RSS exists. routeparams = {'name': self.plateifu} url = marvin.config.urlmap['api']['getRSS']['url'].format(**routeparams) try: response = self._toolInteraction(url.format(name=self.plateifu)) except Exception as ee: raise MarvinError('found a problem when checking if remote RSS ' 'exists: {0}'.format(str(ee))) data = response.getData() self.header = fits.Header.fromstring(data['header']) self.wcs = astropy.wcs.WCS(fits.Header.fromstring(data['wcs_header'])) self._wavelength = data['wavelength'] self._nfibers = data['nfibers'] self.obsinfo = astropy.io.ascii.read(data['obsinfo']) if self.plateifu != data['plateifu']: raise MarvinError('remote RSS has a different plateifu!') return def _populate_fibres(self): """Populates the internal list of fibres.""" n_exposures = len(self.obsinfo) n_fibres_per_exposure = self._nfibers // n_exposures for fiberid in range(self._nfibers): exp_index = fiberid // n_fibres_per_exposure exp_obsinfo = self.obsinfo[[exp_index]] self.append(RSSFiber(fiberid, self, self._wavelength, load=False, obsinfo=exp_obsinfo, pixmask_flag=self.header['MASKNAME']))
[docs]class RSSFiber(Spectrum): """A `~astropy.units.Quantity` representing a fibre observation. Represents the spectral flux observed though a fibre, and associated with an `.RSS` object. In addition to the flux, it contains information about the inverse variance, mask, and other associated spectra defined in the datamodel. Parameters ---------- fiberid : int The fiberid (0-indexed row in the parent `.RSS` object) for this fibre observation. rss : `.RSS` The parent `.RSS` object with which this fibre observation is associated. wavelength : numpy.ndarray The wavelength positions of each array element, in Angstrom. load : bool Whether the information in the `.RSSFiber` should be loaded during instantiation. Defaults to lazy loading (use `.RSSFiber.load` to load the fibre information). obsinfo : astropy.table.Table A `~astropy.table.Table` with the information for the exposure to which this fibre observation belongs. kwargs : dict Additional keyword arguments to be passed to `.Spectrum`. """ def __new__(cls, fiberid, rss, wavelength, pixmask_flag=None, load=False, obsinfo=None, **kwargs): # For now we instantiate a mostly empty Spectrum. Proper instantiation # will happen in load(). array_size = len(wavelength) obj = super(RSSFiber, cls).__new__( cls, numpy.zeros(array_size, dtype=numpy.float64), wavelength, scale=None, unit=None,) obj._extra_attributes = ['fiberid', 'rss', 'loaded', 'obsinfo'] obj._spectra = [] return obj def __init__(self, fiberid, rss, wavelength, pixmask_flag=None, load=False, obsinfo=None, **kwargs): self.fiberid = fiberid self.rss = rss self.obsinfo = obsinfo self.pixmask_flag = pixmask_flag self.loaded = False if load: self.load() def __repr__(self): if not self.loaded: return ('<RSSFiber (plateifu={self.rss.plateifu!r}, ' 'fiberid={self.fiberid!r}, loaded={self.loaded!r})>'.format(self=self)) else: return super(RSSFiber, self).__repr__() def __array_finalize__(self, obj): if obj is None: return super(RSSFiber, self).__array_finalize__(obj) # Adds _extra_attributes from the previous object. if hasattr(obj, '_extra_attributes'): for attr in obj._extra_attributes: setattr(self, attr, getattr(obj, attr, None)) self._extra_attributes = getattr(obj, '_extra_attributes', None) # Adds the additional spectra from the previous object. if hasattr(obj, '_spectra'): for spectrum in obj._spectra: setattr(self, spectrum, getattr(obj, spectrum, None)) self._spectra = getattr(obj, '_spectra', None) def __getitem__(self, sl): new_obj = super(RSSFiber, self).__getitem__(sl) for spectra_name in self._spectra: current_spectrum = getattr(self, spectra_name, None) new_spectrum = None if current_spectrum is None else current_spectrum.__getitem__(sl) setattr(new_obj, spectra_name, new_spectrum) return new_obj
[docs] def load(self): """Loads the fibre information.""" assert self.loaded is False, 'object already loaded.' # Depending on whether the parent RSS is a file or API-populated, we # select the data to use. if self.rss.data_origin == 'file': # If the data origin is a file we use the HDUList in rss.data rss_data = self.rss.data elif self.rss.data_origin == 'api': # If data origin is the API, we make a request for the data # associated with this fiberid for all the extensions in the file. url = marvin.config.urlmap['api']['getRSSFiber']['url'] try: response = self.rss._toolInteraction(url.format(name=self.rss.plateifu, fiberid=self.fiberid)) except Exception as ee: raise MarvinError('found a problem retrieving RSS fibre data for ' 'plateifu={!r}, fiberid={!r}: {}'.format( self.rss.plateifu, self.fiberid, str(ee))) api_data = response.getData() # Create a quick and dirty HDUList from the API data so that we # can parse it in the same way as if the data origin is file. rss_data = astropy.io.fits.HDUList([astropy.io.fits.PrimaryHDU()]) for ext in api_data: rss_data.append(astropy.io.fits.ImageHDU(data=api_data[ext], name=ext.upper())) else: raise ValueError('invalid data_origin={!r}'.format(self.rss.data_origin)) # Compile a list of all RSS datamodel extensions, either RSS or spectra datamodel_extensions = self.rss.datamodel.rss + self.rss.datamodel.spectra for extension in datamodel_extensions: # Retrieve the value (and mask and ivar, if associated) for each extension. value, ivar, mask = self._get_extension_data(extension, rss_data, data_origin=self.rss.data_origin) if extension.name == 'flux': self.value[:] = value[:] self.ivar = ivar self.mask = mask self._set_unit(extension.unit) else: new_spectrum = Spectrum(value, self.wavelength, ivar=ivar, mask=mask, unit=extension.unit) setattr(self, extension.name, new_spectrum) self._spectra.append(extension.name) self.loaded = True
def _get_extension_data(self, extension, data, data_origin='file'): """Returns the value of an extension for this fibre, either from file or API. Parameters ---------- extension : datamodel object The datamodel object containing the information for the extension we want to retrieve. data : ~astropy.io.fits.HDUList An `~astropy.io.fits.HDUList` object containing the RSS information. """ # Determine if this is an RSS datamodel object or an spectrum. # If the origin is the API, the extension data contains a single spectrum, # not a row-stacked array, so we consider it a 1D array. is_extension_data_1D = isinstance(extension, SpectrumDataModel) or data_origin == 'api' value = data[extension.fits_extension()].data if extension.has_mask(): mask = data[extension.fits_extension('mask')].data else: mask = None if hasattr(extension, 'has_ivar') and extension.has_ivar(): ivar = data[extension.fits_extension('ivar')].data elif hasattr(extension, 'has_std') and extension.has_std(): std = data[extension.fits_extension('std')].data ivar = 1. / (std**2) else: ivar = None # If this is an RSS, gets the right row in the stacked spectra. if not is_extension_data_1D: value = value[self.fiberid, :] mask = mask[self.fiberid, :] if mask is not None else None ivar = ivar[self.fiberid, :] if ivar is not None else None return value, ivar, mask @property def masked(self): """Return a masked array where the mask is greater than zero.""" assert self.mask is not None, 'mask is not set' return numpy.ma.array(self.value, mask=(self.mask > 0))
[docs] def descale(self): """Returns a copy of the object in which the scale is unity. Note that this only affects to the core value of this quantity. Associated array attributes will not be modified. Example: >>> fiber.unit Unit("1e-17 erg / (Angstrom cm2 fiber s)") >>> fiber[100] <RSSFiber 0.270078063011169 1e-17 erg / (Angstrom cm2 fiber s)> >>> fiber_descaled = fiber.descale() >>> fiber_descaled.unit Unit("Angstrom cm2 fiber s") >>> fiber[100] <RSSFiber 2.70078063011169e-18 erg / (Angstrom cm2 fiber s)> """ if self.unit.scale == 1: return self value_descaled = self.value * self.unit.scale value_unit = astropy.units.CompositeUnit(1, self.unit.bases, self.unit.powers) if self.ivar is not None: ivar_descaled = self.ivar / (self.unit.scale ** 2) else: ivar_descaled = None copy_of_self = self.copy() copy_of_self.value[:] = value_descaled copy_of_self.ivar = ivar_descaled copy_of_self._set_unit(value_unit) return copy_of_self