Source code for marvin.tools.modelcube

#!/usr/bin/env python
# encoding: utf-8
#
# @Author: José Sánchez-Gallego
# @Date: Nov 1, 2017
# @Filename: modelcube.py
# @License: BSD 3-Clause
# @Copyright: José Sánchez-Gallego


from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import distutils
import warnings

from astropy.io import fits
from astropy.wcs import WCS
import numpy as np

import marvin
import marvin.core.exceptions
import marvin.tools.spaxel
import marvin.utils.general.general
import marvin.tools.maps

from marvin.core.core import MarvinToolsClass, NSAMixIn, DAPallMixIn
from marvin.core.exceptions import MarvinError
from marvin.tools.quantities import DataCube, Spectrum
from marvin.utils.datamodel.dap import datamodel, Model
from marvin.utils.general import FuzzyDict
from marvin.utils.general.maskbit import get_manga_target


[docs]class ModelCube(MarvinToolsClass, NSAMixIn, DAPallMixIn): """A class to interface with MaNGA DAP model cubes. This class represents a DAP model cube, initialised either from a file, a database, or remotely via the Marvin API. In addition to the parameters and variables defined for `~.MarvinToolsClass`, the following parameters and attributes are specific to `.Maps`. Parameters: bintype (str or None): The binning type. For MPL-4, one of the following: ``'NONE', 'RADIAL', 'STON'`` (if ``None`` defaults to ``'NONE'``). For MPL-5, one of, ``'ALL', 'NRE', 'SPX', 'VOR10'`` (defaults to ``'SPX'``). MPL-6 also accepts the ``'HYB10'`` binning schema. template (str or None): The stellar template used. For MPL-4, one of ``'M11-STELIB-ZSOL', 'MILES-THIN', 'MIUSCAT-THIN'`` (if ``None``, defaults to ``'MIUSCAT-THIN'``). For MPL-5 and successive, the only option in ``'GAU-MILESHC'`` (``None`` defaults to it). Attributes: header (`astropy.io.fits.Header`): The header of the datacube. wcs (`astropy.wcs.WCS`): The WCS solution for this plate """ def __init__(self, input=None, filename=None, mangaid=None, plateifu=None, mode=None, data=None, release=None, drpall=None, download=None, nsa_source='auto', bintype=None, template=None, template_kin=None): if template_kin is not None: warnings.warn('template_kin is deprecated and will be removed in a future version.', DeprecationWarning) template = template_kin if template is None else template # _set_datamodel will replace these strings with datamodel objects. self.bintype = bintype self.template = template self.datamodel = None self._bitmasks = None MarvinToolsClass.__init__(self, input=input, filename=filename, mangaid=mangaid, plateifu=plateifu, mode=mode, data=data, release=release, drpall=drpall, download=download) NSAMixIn.__init__(self, nsa_source=nsa_source) # Checks that DAP is at least MPL-5 MPL5 = distutils.version.StrictVersion('2.0.2') if self.filename is None and distutils.version.StrictVersion(self._dapver) < MPL5: raise MarvinError('ModelCube requires at least dapver=\'2.0.2\'') self.header = None self.wcs = None self._wavelength = None self._redcorr = None self._shape = None # Model extensions self._extension_data = {} self._binned_flux = None self._redcorr = None self._full_fit = None self._emline_fit = None self._stellarcont_fit = None if self.data_origin == 'file': self._load_modelcube_from_file() elif self.data_origin == 'db': self._load_modelcube_from_db() elif self.data_origin == 'api': self._load_modelcube_from_api() else: raise marvin.core.exceptions.MarvinError( 'data_origin={0} is not valid'.format(self.data_origin)) # Confirm that drpver and dapver match the ones from the header. marvin.tools.maps.Maps._check_versions(self) def __repr__(self): """Representation for ModelCube.""" return ('<Marvin ModelCube (plateifu={0!r}, mode={1!r}, data_origin={2!r}, ' 'bintype={3!r}, template={4!r})>'.format(self.plateifu, self.mode, self.data_origin, str(self.bintype), str(self.template))) def __getitem__(self, xy): """Returns the spaxel for ``(x, y)``""" return self.getSpaxel(x=xy[1], y=xy[0], xyorig='lower') def _set_datamodel(self): """Sets the datamodel, template, and bintype.""" self.datamodel = datamodel[self.release].models self._bitmasks = datamodel[self.release].bitmasks self.bintype = self.datamodel.parent.get_bintype(self.bintype) self.template = self.datamodel.parent.get_template(self.template) def _getFullPath(self): """Returns the full path of the file in the tree.""" if not self.plateifu: return None plate, ifu = self.plateifu.split('-') daptype = '{0}-{1}'.format(self.bintype, self.template) return super(ModelCube, self)._getFullPath('mangadap5', ifu=ifu, drpver=self._drpver, dapver=self._dapver, plate=plate, mode='LOGCUBE', daptype=daptype)
[docs] def download(self): """Downloads the cube using sdss_access - Rsync""" if not self.plateifu: return None plate, ifu = self.plateifu.split('-') daptype = '{0}-{1}'.format(self.bintype, self.template) return super(ModelCube, self).download('mangadap5', ifu=ifu, drpver=self._drpver, dapver=self._dapver, plate=plate, mode='LOGCUBE', daptype=daptype)
def _load_modelcube_from_file(self): """Initialises a model cube from a file.""" if self.data is not None: assert isinstance(self.data, fits.HDUList), 'data is not an HDUList object' else: try: self.data = fits.open(self.filename) except IOError as err: raise IOError('filename {0} cannot be found: {1}'.format(self.filename, err)) self.header = self.data[0].header self._check_file(self.header, self.data, 'ModelCube') self.wcs = WCS(self.data['FLUX'].header) self._wavelength = self.data['WAVE'].data self._redcorr = self.data['REDCORR'].data self._shape = (self.data['FLUX'].header['NAXIS2'], self.data['FLUX'].header['NAXIS1']) self.plateifu = self.header['PLATEIFU'] self.mangaid = self.header['MANGAID'] # Checks and populates release. file_drpver = self.header['VERSDRP3'] file_drpver = 'v1_5_1' if file_drpver == 'v1_5_0' else file_drpver file_ver = marvin.config.lookUpRelease(file_drpver) assert file_ver is not None, 'cannot find file version.' if file_ver != self._release: warnings.warn('mismatch between file version={0} and object release={1}. ' 'Setting object release to {0}'.format(file_ver, self._release), marvin.core.exceptions.MarvinUserWarning) self._release = file_ver self._drpver, self._dapver = marvin.config.lookUpVersions(release=self._release) # Updates datamodel, bintype, and template with the versions from the header. self.datamodel = datamodel[self._dapver].models self.bintype = self.datamodel.parent.get_bintype(self.header['BINKEY'].strip().upper()) self.template = self.datamodel.parent.get_template(self.header['SCKEY'].strip().upper()) def _load_modelcube_from_db(self): """Initialises a model cube from the DB.""" mdb = marvin.marvindb plate, ifu = self.plateifu.split('-') if not mdb.isdbconnected: raise MarvinError('No db connected') else: datadb = mdb.datadb dapdb = mdb.dapdb dm = datamodel[self.release] if dm.db_only: if self.bintype not in dm.db_only: raise marvin.core.exceptions.MarvinError('Specified bintype {0} is not available in the DB'.format(self.bintype.name)) if self.data: assert isinstance(self.data, dapdb.ModelCube), \ 'data is not an instance of marvindb.dapdb.ModelCube.' else: # Initial query for version version_query = mdb.session.query(dapdb.ModelCube).join( dapdb.File, datadb.PipelineInfo, datadb.PipelineVersion).filter( datadb.PipelineVersion.version == self._dapver).from_self() # Query for model cube parameters db_modelcube = version_query.join( dapdb.File, datadb.Cube, datadb.IFUDesign).filter( datadb.Cube.plate == plate, datadb.IFUDesign.name == str(ifu)).from_self().join( dapdb.File, dapdb.FileType).filter(dapdb.FileType.value == 'LOGCUBE').join( dapdb.Structure, dapdb.BinType).join( dapdb.Template, dapdb.Structure.template_kin_pk == dapdb.Template.pk).filter( dapdb.BinType.name == self.bintype.name, dapdb.Template.name == self.template.name).all() if len(db_modelcube) > 1: raise MarvinError('more than one ModelCube found for ' 'this combination of parameters.') elif len(db_modelcube) == 0: raise MarvinError('no ModelCube found for this combination of parameters.') self.data = db_modelcube[0] self.header = self.data.file.primary_header self.wcs = WCS(self.data.file.cube.wcs.makeHeader()) self._wavelength = np.array(self.data.file.cube.wavelength.wavelength, dtype=np.float) self._redcorr = np.array(self.data.redcorr[0].value, dtype=np.float) self._shape = self.data.file.cube.shape.shape self.plateifu = str(self.header['PLATEIFU'].strip()) self.mangaid = str(self.header['MANGAID'].strip()) def _load_modelcube_from_api(self): """Initialises a model cube from the API.""" url = marvin.config.urlmap['api']['getModelCube']['url'] url_full = url.format(name=self.plateifu, bintype=self.bintype.name, template=self.template.name) try: response = self._toolInteraction(url_full) except Exception as ee: raise MarvinError('found a problem when checking if remote model cube ' 'exists: {0}'.format(str(ee))) data = response.getData() self.header = fits.Header.fromstring(data['header']) self.wcs = WCS(fits.Header.fromstring(data['wcs_header'])) self._wavelength = np.array(data['wavelength']) self._redcorr = np.array(data['redcorr']) self._shape = tuple(data['shape']) self.plateifu = str(self.header['PLATEIFU'].strip()) self.mangaid = str(self.header['MANGAID'].strip()) @property def manga_target1(self): """Return MANGA_TARGET1 flag.""" return get_manga_target('1', self._bitmasks, self.header) @property def manga_target2(self): """Return MANGA_TARGET2 flag.""" return get_manga_target('2', self._bitmasks, self.header) @property def manga_target3(self): """Return MANGA_TARGET3 flag.""" return get_manga_target('3', self._bitmasks, self.header) @property def target_flags(self): """Bundle MaNGA targeting flags.""" return [self.manga_target1, self.manga_target2, self.manga_target3] @property def quality_flag(self): """Return ModelCube DAPQUAL flag.""" dapqual = self._bitmasks['MANGA_DAPQUAL'] dapqual.mask = int(self.header['DAPQUAL']) return dapqual @property def pixmask(self): """Return the DAPSPECMASK flag.""" pixmask = self._bitmasks['MANGA_DAPSPECMASK'] pixmask.mask = getattr(self.binned_flux, 'mask', None) return pixmask
[docs] def getSpaxel(self, x=None, y=None, ra=None, dec=None, drp=True, properties=True, **kwargs): """Returns the :class:`~marvin.tools.spaxel.Spaxel` matching certain coordinates. The coordinates of the spaxel to return can be input as ``x, y`` pixels relative to``xyorig`` in the cube, or as ``ra, dec`` celestial coordinates. If ``spectrum=True``, the returned |spaxel| will be instantiated with the DRP spectrum of the spaxel for the DRP cube associated with this ModelCube. The same is true for ``properties=True`` for the DAP properties of the spaxel in the Maps associated with these coordinates. Parameters: x,y (int or array): The spaxel coordinates relative to ``xyorig``. If ``x`` is an array of coordinates, the size of ``x`` must much that of ``y``. ra,dec (float or array): The coordinates of the spaxel to return. The closest spaxel to those coordinates will be returned. If ``ra`` is an array of coordinates, the size of ``ra`` must much that of ``dec``. xyorig ({'center', 'lower'}): The reference point from which ``x`` and ``y`` are measured. Valid values are ``'center'`` (default), for the centre of the spatial dimensions of the cube, or ``'lower'`` for the lower-left corner. This keyword is ignored if ``ra`` and ``dec`` are defined. drpa (bool): If ``True``, the |spaxel| will be initialised with the corresponding DRP data. properties (bool): If ``True``, the |spaxel| will be initialised with the corresponding DAP properties for this spaxel. Returns: spaxels (list): The |spaxel|_ objects for this cube/maps corresponding to the input coordinates. The length of the list is equal to the number of input coordinates. .. |spaxel| replace:: :class:`~marvin.tools.spaxel.Spaxel` """ return marvin.utils.general.general.getSpaxel( x=x, y=y, ra=ra, dec=dec, cube=drp, maps=properties, modelcube=self, **kwargs)
def _get_extension_data(self, name, ext=None): """Returns the data from an extension.""" model = self.datamodel[name] ext_name = model.fits_extension(ext) if ext_name in self._extension_data: return self._extension_data[ext_name] if self.data_origin == 'file': ext_data = self.data[model.fits_extension(ext)].data elif self.data_origin == 'db': # If the table is "spaxel", this must be a 3D cube. If it is "cube", # uses self.data, which is basically the DataModelClass.Cube instance. ext_data = self.data.get3DCube(model.db_column(ext)) elif self.data_origin == 'api': params = {'release': self._release} url = marvin.config.urlmap['api']['getModelCubeExtension']['url'] try: response = self._toolInteraction( url.format(name=self.plateifu, modelcube_extension=model.fits_extension(ext).lower(), bintype=self.bintype.name, template=self.template.name), params=params) except Exception as ee: raise MarvinError('found a problem when checking if remote ' 'modelcube exists: {0}'.format(str(ee))) data = response.getData() cube_ext_data = data['extension_data'] ext_data = np.array(cube_ext_data) if cube_ext_data is not None else None self._extension_data[ext_name] = ext_data return ext_data def _get_spaxel_quantities(self, x, y): """Returns a dictionary of spaxel quantities.""" modelcube_quantities = FuzzyDict({}) if self.data_origin == 'db': session = marvin.marvindb.session dapdb = marvin.marvindb.dapdb if self.data_origin == 'file' or self.data_origin == 'db': _db_row = None for dm in self.datamodel: data = {'value': None, 'ivar': None, 'mask': None} for key in data: if key == 'ivar' and not dm.has_ivar(): continue if key == 'mask' and not dm.has_mask(): continue if self.data_origin == 'file': extname = dm.fits_extension(None if key == 'value' else key) data[key] = self.data[extname].data[:, y, x] elif self.data_origin == 'db': colname = dm.db_column(None if key == 'value' else key) if not _db_row: _db_row = session.query(dapdb.ModelSpaxel).filter( dapdb.ModelSpaxel.modelcube_pk == self.data.pk, dapdb.ModelSpaxel.x == x, dapdb.ModelSpaxel.y == y).one() data[key] = np.array(getattr(_db_row, colname)) modelcube_quantities[dm.name] = Spectrum(data['value'], ivar=data['ivar'], mask=data['mask'], wavelength=self._wavelength, unit=dm.unit) if self.data_origin == 'api': params = {'release': self._release} url = marvin.config.urlmap['api']['getModelCubeQuantitiesSpaxel']['url'] try: response = self._toolInteraction(url.format(name=self.plateifu, x=x, y=y, bintype=self.bintype.name, template=self.template.name, params=params)) except Exception as ee: raise MarvinError('found a problem when checking if remote modelcube ' 'exists: {0}'.format(str(ee))) data = response.getData() for dm in self.datamodel: modelcube_quantities[dm.name] = Spectrum(data[dm.name]['value'], ivar=data[dm.name]['ivar'], mask=data[dm.name]['mask'], wavelength=data['wavelength'], unit=dm.unit) return modelcube_quantities
[docs] def get_binid(self, model=None): """Returns the 2D array for the binid map associated with ``model``.""" assert model is None or isinstance(model, Model), 'invalid model type.' if model is not None: binid_prop = model.binid else: binid_prop = self.datamodel.parent.default_binid # Before MPL-6, the modelcube does not include the binid extension, # so we need to get the binid map from the associated MAPS. if (distutils.version.StrictVersion(self._dapver) < distutils.version.StrictVersion('2.1')): return self.getMaps().get_binid() if self.data_origin == 'file': if binid_prop.channel is None: return self.data[binid_prop.name].data[:, :] else: return self.data[binid_prop.name].data[binid_prop.channel.idx, :, :] elif self.data_origin == 'db': mdb = marvin.marvindb table = mdb.dapdb.ModelSpaxel column = getattr(table, binid_prop.db_column()) binid_list = mdb.session.query(column).filter( table.modelcube_pk == self.data.pk).order_by(table.x, table.y).all() nx = ny = int(np.sqrt(len(binid_list))) binid_array = np.array(binid_list) return binid_array.transpose().reshape((ny, nx)).transpose(1, 0) elif self.data_origin == 'api': params = {'release': self._release} url = marvin.config.urlmap['api']['getModelCubeBinid']['url'] extension = model.fits_extension().lower() if model is not None else 'flux' try: response = self._toolInteraction( url.format(name=self.plateifu, modelcube_extension=extension, bintype=self.bintype.name, template=self.template.name), params=params) except Exception as ee: raise MarvinError('found a problem when checking if remote ' 'modelcube exists: {0}'.format(str(ee))) if response.results['error'] is not None: raise MarvinError('found a problem while getting the binid from API: {}' .format(str(response.results['error']))) return np.array(response.getData()['binid'])
@property def binned_flux(self): """Returns the binned flux datacube.""" model = self.datamodel['binned_flux'] binned_flux_array = self._get_extension_data('flux') binned_flux_ivar = self._get_extension_data('flux', 'ivar') binned_flux_mask = self._get_extension_data('flux', 'mask') return DataCube(binned_flux_array, np.array(self._wavelength), ivar=binned_flux_ivar, mask=binned_flux_mask, redcorr=self._redcorr, binid=self.get_binid(model), unit=model.unit) @property def full_fit(self): """Returns the full fit datacube.""" model = self.datamodel['full_fit'] model_array = self._get_extension_data('full_fit') model_mask = self._get_extension_data('flux', 'mask') return DataCube(model_array, np.array(self._wavelength), ivar=None, mask=model_mask, redcorr=self._redcorr, binid=self.get_binid(model), unit=model.unit) @property def emline_fit(self): """Returns the emission line fit.""" model = self.datamodel['emline_fit'] emline_array = self._get_extension_data('emline_fit') emline_mask = self._get_extension_data('emline_fit', 'mask') return DataCube(emline_array, np.array(self._wavelength), ivar=None, mask=emline_mask, redcorr=self._redcorr, binid=self.get_binid(model), unit=model.unit) @property def stellarcont_fit(self): """Returns the stellar continuum fit.""" array = (self._get_extension_data('full_fit') - self._get_extension_data('emline_fit') - self._get_extension_data('emline_base_fit')) model = self.datamodel['full_fit'] stellarcont_mask = self._get_extension_data('flux', 'mask') return DataCube(array, np.array(self._wavelength), ivar=None, mask=stellarcont_mask, redcorr=self._redcorr, binid=self.get_binid(model), unit=model.unit)
[docs] def getCube(self): """Returns the associated `~marvin.tools.cube.Cube`.""" if self.data_origin == 'db': cube_data = self.data.file.cube else: cube_data = None return marvin.tools.cube.Cube(data=cube_data, plateifu=self.plateifu, release=self.release)
[docs] def getMaps(self): """Returns the associated`~marvin.tools.maps.Maps`.""" return marvin.tools.maps.Maps(plateifu=self.plateifu, bintype=self.bintype, template=self.template, release=self.release)
[docs] def is_binned(self): """Returns True if the ModelCube is not unbinned.""" return self.bintype.binned
[docs] def get_unbinned(self): """Returns a version of ``self`` corresponding to the unbinned ModelCube.""" if not self.is_binned: return self else: return ModelCube(plateifu=self.plateifu, release=self.release, bintype=self.datamodel.parent.get_unbinned(), template=self.template, mode=self.mode)