Source code for marvin.tools.modelcube

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: Brian Cherinka, José Sánchez-Gallego, and Brett Andrews
# @Date: 2017-11-01
# @Filename: modelcube.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
#
# @Last modified by: José Sánchez-Gallego (gallegoj@uw.edu)
# @Last modified time: 2018-11-14 11:48:16


from __future__ import absolute_import, division, print_function

import distutils
import warnings

import numpy as np
from pkg_resources import parse_version
from astropy.io import fits
from astropy.wcs import WCS

import marvin
import marvin.core.exceptions
import marvin.tools.maps
import marvin.tools.spaxel
import marvin.utils.general.general
from marvin.core.exceptions import MarvinError
from marvin.tools.quantities import DataCube, Map, Spectrum
from marvin.utils.datamodel.dap import Model, datamodel
from marvin.utils.general import FuzzyDict, gunzip, check_versions

from .core import MarvinToolsClass
from .mixins import DAPallMixIn, GetApertureMixIn, NSAMixIn


[docs]class ModelCube(MarvinToolsClass, NSAMixIn, DAPallMixIn, GetApertureMixIn): """A class to interface with MaNGA DAP model cubes. This class represents a DAP model cube, initialised either from a file, a database, or remotely via the Marvin API. In addition to the parameters and variables defined for `~.MarvinToolsClass`, the following parameters and attributes are specific to `.Maps`. Parameters: bintype (str or None): The binning type. For MPL-4, one of the following: ``'NONE', 'RADIAL', 'STON'`` (if ``None`` defaults to ``'NONE'``). For MPL-5, one of, ``'ALL', 'NRE', 'SPX', 'VOR10'`` (defaults to ``'SPX'``). MPL-6 also accepts the ``'HYB10'`` binning schema. template (str or None): The stellar template used. For MPL-4, one of ``'M11-STELIB-ZSOL', 'MILES-THIN', 'MIUSCAT-THIN'`` (if ``None``, defaults to ``'MIUSCAT-THIN'``). For MPL-5 and successive, the only option in ``'GAU-MILESHC'`` (``None`` defaults to it). Attributes: header (`astropy.io.fits.Header`): The header of the datacube. wcs (`astropy.wcs.WCS`): The WCS solution for this plate """ def __init__(self, input=None, filename=None, mangaid=None, plateifu=None, mode=None, data=None, release=None, drpall=None, download=None, nsa_source='auto', bintype=None, template=None, template_kin=None): if template_kin is not None: warnings.warn('template_kin is deprecated and will be removed in a future version.', DeprecationWarning) template = template_kin if template is None else template # _set_datamodel will replace these strings with datamodel objects. self.bintype = bintype self.template = template self.datamodel = None self._bitmasks = None MarvinToolsClass.__init__(self, input=input, filename=filename, mangaid=mangaid, plateifu=plateifu, mode=mode, data=data, release=release, drpall=drpall, download=download) NSAMixIn.__init__(self, nsa_source=nsa_source) # Checks that DAP is at least MPL-5 MPL5 = distutils.version.StrictVersion('2.0.2') if self.filename is None and distutils.version.StrictVersion(self._dapver) < MPL5: raise MarvinError('ModelCube requires at least dapver=\'2.0.2\'') self.header = None self.wcs = None self._wavelength = None self._redcorr = None self._shape = None # Model extensions self._extension_data = {} self._binned_flux = None self._redcorr = None self._full_fit = None self._emline_fit = None self._stellarcont_fit = None if self.data_origin == 'file': self._load_modelcube_from_file() elif self.data_origin == 'db': self._load_modelcube_from_db() elif self.data_origin == 'api': self._load_modelcube_from_api() else: raise marvin.core.exceptions.MarvinError( 'data_origin={0} is not valid'.format(self.data_origin)) # Confirm that drpver and dapver match the ones from the header. marvin.tools.maps.Maps._check_versions(self) def __repr__(self): """Representation for ModelCube.""" return ('<Marvin ModelCube (plateifu={0!r}, mode={1!r}, data_origin={2!r}, ' 'bintype={3!r}, template={4!r})>'.format(self.plateifu, self.mode, self.data_origin, str(self.bintype), str(self.template))) def __getitem__(self, xy): """Returns the spaxel for ``(x, y)``""" return self.getSpaxel(x=xy[1], y=xy[0], xyorig='lower') def _set_datamodel(self): """Sets the datamodel, template, and bintype.""" self.datamodel = datamodel[self.release].models self._bitmasks = datamodel[self.release].bitmasks self.bintype = self.datamodel.parent.get_bintype(self.bintype) self.template = self.datamodel.parent.get_template(self.template) def _getFullPath(self): """Returns the full path of the file in the tree.""" if not self.plateifu: return None plate, ifu = self.plateifu.split('-') daptype = '{0}-{1}'.format(self.bintype, self.template) return super(ModelCube, self)._getFullPath('mangadap', ifu=ifu, drpver=self._drpver, dapver=self._dapver, plate=plate, mode='LOGCUBE', daptype=daptype)
[docs] def download(self): """Downloads the cube using sdss_access - Rsync""" if not self.plateifu: return None plate, ifu = self.plateifu.split('-') daptype = '{0}-{1}'.format(self.bintype, self.template) return super(ModelCube, self).download('mangadap', ifu=ifu, drpver=self._drpver, dapver=self._dapver, plate=plate, mode='LOGCUBE', daptype=daptype)
def _load_modelcube_from_file(self): """Initialises a model cube from a file.""" if self.data is not None: assert isinstance(self.data, fits.HDUList), 'data is not an HDUList object' else: try: with gunzip(self.filename) as gg: self.data = fits.open(gg.name) except IOError as err: raise IOError('filename {0} cannot be found: {1}'.format(self.filename, err)) self.header = self.data[0].header self._check_file(self.header, self.data, 'ModelCube') self.wcs = WCS(self.data['FLUX'].header) self._wavelength = self.data['WAVE'].data self._redcorr = self.data['REDCORR'].data self._shape = (self.data['FLUX'].header['NAXIS2'], self.data['FLUX'].header['NAXIS1']) self.plateifu = self.header['PLATEIFU'] self.mangaid = self.header['MANGAID'] # Checks and populates release. file_drpver = self.header['VERSDRP3'] file_drpver = 'v1_5_1' if file_drpver == 'v1_5_0' else file_drpver file_ver = marvin.config.lookUpRelease(file_drpver) assert file_ver is not None, 'cannot find file version.' if file_drpver != self._drpver: warnings.warn('mismatch between file version={0} and object release={1}. ' 'Setting object release to {0}'.format(file_ver, self._release), marvin.core.exceptions.MarvinUserWarning) self._release = file_ver self._drpver, self._dapver = marvin.config.lookUpVersions(release=self._release) # Updates datamodel, bintype, and template with the versions from the header. self.datamodel = datamodel[self._dapver].models self.bintype = self.datamodel.parent.get_bintype(self.header['BINKEY'].strip().upper()) if check_versions(self._dapver, datamodel['MPL-8'].release): tempkey = self.header['DAPTYPE'].split('-', 1)[-1] else: tempkey = self.header['SCKEY'] self.template = self.datamodel.parent.get_template(tempkey.strip().upper()) def _load_modelcube_from_db(self): """Initialises a model cube from the DB.""" mdb = marvin.marvindb plate, ifu = self.plateifu.split('-') if not mdb.isdbconnected: raise MarvinError('No db connected') else: datadb = mdb.datadb dapdb = mdb.dapdb dm = datamodel[self.release] if dm.db_only: if self.bintype not in dm.db_only: raise marvin.core.exceptions.MarvinError( 'Specified bintype {0} is not ' 'available in the DB'.format(self.bintype.name)) if self.data: assert isinstance(self.data, dapdb.ModelCube), \ 'data is not an instance of marvindb.dapdb.ModelCube.' else: # Initial query for version version_query = mdb.session.query(dapdb.ModelCube).join( dapdb.File, datadb.PipelineInfo, datadb.PipelineVersion).filter( datadb.PipelineVersion.version == self._dapver).from_self() # Query for model cube parameters db_modelcube = version_query.join( dapdb.File, datadb.Cube, datadb.IFUDesign).filter( datadb.Cube.plate == plate, datadb.IFUDesign.name == str(ifu)).from_self().join( dapdb.File, dapdb.FileType).filter(dapdb.FileType.value == 'LOGCUBE').join( dapdb.Structure, dapdb.BinType).join( dapdb.Template, dapdb.Structure.template_kin_pk == dapdb.Template.pk).filter( dapdb.BinType.name == self.bintype.name, dapdb.Template.name == self.template.name).use_cache(self.cache_region).all() if len(db_modelcube) > 1: raise MarvinError('more than one ModelCube found for ' 'this combination of parameters.') elif len(db_modelcube) == 0: raise MarvinError('no ModelCube found for this combination of parameters.') self.data = db_modelcube[0] self.header = self.data.file.primary_header self.wcs = WCS(self.data.file.cube.wcs.makeHeader()) self._wavelength = np.array(self.data.file.cube.wavelength.wavelength, dtype=np.float) self._redcorr = np.array(self.data.redcorr[0].value, dtype=np.float) self._shape = self.data.file.cube.shape.shape self.plateifu = str(self.header['PLATEIFU'].strip()) self.mangaid = str(self.header['MANGAID'].strip()) def _load_modelcube_from_api(self): """Initialises a model cube from the API.""" url = marvin.config.urlmap['api']['getModelCube']['url'] url_full = url.format(name=self.plateifu, bintype=self.bintype.name, template=self.template.name) try: response = self._toolInteraction(url_full) except Exception as ee: raise MarvinError('found a problem when checking if remote model cube ' 'exists: {0}'.format(str(ee))) data = response.getData() self.header = fits.Header.fromstring(data['header']) self.wcs = WCS(fits.Header.fromstring(data['wcs_header'])) self._wavelength = np.array(data['wavelength']) self._redcorr = np.array(data['redcorr']) self._shape = tuple(data['shape']) self.plateifu = str(self.header['PLATEIFU'].strip()) self.mangaid = str(self.header['MANGAID'].strip())
[docs] def getSpaxel(self, x=None, y=None, ra=None, dec=None, cube=False, maps=False, **kwargs): """Returns the :class:`~marvin.tools.spaxel.Spaxel` matching certain coordinates. The coordinates of the spaxel to return can be input as ``x, y`` pixels relative to``xyorig`` in the cube, or as ``ra, dec`` celestial coordinates. If ``spectrum=True``, the returned |spaxel| will be instantiated with the DRP spectrum of the spaxel for the DRP cube associated with this ModelCube. The same is true for ``properties=True`` for the DAP properties of the spaxel in the Maps associated with these coordinates. Parameters: x,y (int or array): The spaxel coordinates relative to ``xyorig``. If ``x`` is an array of coordinates, the size of ``x`` must much that of ``y``. ra,dec (float or array): The coordinates of the spaxel to return. The closest spaxel to those coordinates will be returned. If ``ra`` is an array of coordinates, the size of ``ra`` must much that of ``dec``. xyorig ({'center', 'lower'}): The reference point from which ``x`` and ``y`` are measured. Valid values are ``'center'`` (default), for the centre of the spatial dimensions of the cube, or ``'lower'`` for the lower-left corner. This keyword is ignored if ``ra`` and ``dec`` are defined. cube (bool): If ``True``, the |spaxel| will be initialised with the corresponding DRP cube data. maps (bool): If ``True``, the |spaxel| will be initialised with the corresponding DAP Maps properties for this spaxel. Returns: spaxels (list): The |spaxel|_ objects for this cube/maps corresponding to the input coordinates. The length of the list is equal to the number of input coordinates. .. |spaxel| replace:: :class:`~marvin.tools.spaxel.Spaxel` """ for old_param in ['drp', 'properties']: if old_param in kwargs: raise marvin.core.exceptions.MarvinDeprecationError( 'the {0} parameter has been deprecated. ' 'Use cube or maps.'.format(old_param)) return marvin.utils.general.general.getSpaxel( x=x, y=y, ra=ra, dec=dec, cube=cube, maps=maps, modelcube=self, **kwargs)
def _get_extension_data(self, name, ext=None): """Returns the data from an extension.""" model = self.datamodel[name] ext_name = model.fits_extension(ext) if ext_name in self._extension_data: return self._extension_data[ext_name] if self.data_origin == 'file': ext_data = self.data[model.fits_extension(ext)].data elif self.data_origin == 'db': # If the table is "spaxel", this must be a 3D cube. If it is "cube", # uses self.data, which is basically the DataModelClass.Cube instance. ext_data = self.data.get3DCube(model.db_column(ext)) elif self.data_origin == 'api': params = {'release': self._release} url = marvin.config.urlmap['api']['getModelCubeExtension']['url'] try: response = self._toolInteraction( url.format(name=self.plateifu, modelcube_extension=model.fits_extension(ext).lower(), bintype=self.bintype.name, template=self.template.name), params=params) except Exception as ee: raise MarvinError('found a problem when checking if remote ' 'modelcube exists: {0}'.format(str(ee))) data = response.getData() cube_ext_data = data['extension_data'] ext_data = np.array(cube_ext_data) if cube_ext_data is not None else None self._extension_data[ext_name] = ext_data return ext_data def _get_spaxel_quantities(self, x, y, spaxel=None): """Returns a dictionary of spaxel quantities.""" modelcube_quantities = FuzzyDict({}) if self.data_origin == 'db': session = marvin.marvindb.session dapdb = marvin.marvindb.dapdb if self.data_origin == 'file' or self.data_origin == 'db': _db_row = None for dm in self.datamodel: data = {'value': None, 'ivar': None, 'mask': None} for key in data: if key == 'ivar' and not dm.has_ivar(): continue if key == 'mask' and not dm.has_mask(): continue if self.data_origin == 'file': extname = dm.fits_extension(None if key == 'value' else key) data[key] = self.data[extname].data[:, y, x] elif self.data_origin == 'db': colname = dm.db_column(None if key == 'value' else key) if not _db_row: _db_row = session.query(dapdb.ModelSpaxel).filter( dapdb.ModelSpaxel.modelcube_pk == self.data.pk, dapdb.ModelSpaxel.x == x, dapdb.ModelSpaxel.y == y).use_cache(self.cache_region).one() data[key] = np.array(getattr(_db_row, colname)) quantity = Spectrum(data['value'], ivar=data['ivar'], mask=data['mask'], wavelength=self._wavelength, unit=dm.unit, pixmask_flag=dm.pixmask_flag) if spaxel: quantity._init_bin(spaxel=spaxel, parent=self, datamodel=dm) modelcube_quantities[dm.full()] = quantity if self.data_origin == 'api': params = {'release': self._release} url = marvin.config.urlmap['api']['getModelCubeQuantitiesSpaxel']['url'] try: response = self._toolInteraction(url.format(name=self.plateifu, x=x, y=y, bintype=self.bintype.name, template=self.template.name, params=params)) except Exception as ee: raise MarvinError('found a problem when checking if remote modelcube ' 'exists: {0}'.format(str(ee))) data = response.getData() for dm in self.datamodel: quantity = Spectrum(data[dm.name]['value'], ivar=data[dm.name]['ivar'], mask=data[dm.name]['mask'], wavelength=data['wavelength'], unit=dm.unit, pixmask_flag=dm.pixmask_flag) if spaxel: quantity._init_bin(spaxel=spaxel, parent=self, datamodel=dm) modelcube_quantities[dm.full()] = quantity return modelcube_quantities
[docs] def get_binid(self, model=None): """Returns the binid map associated with a model. Parameters ---------- model : `datamodel.Model` or None The model for which the associated binid map will be returned. If ``binid=None``, the default binid is returned. Returns ------- binid : `Map` A `Map` with the binid associated with ``model`` or the default binid. """ assert model is None or isinstance(model, Model), 'invalid model type.' if model is not None: binid_prop = model.binid else: binid_prop = self.datamodel.parent.default_binid # Before MPL-6, the modelcube does not include the binid extension, # so we need to get the binid map from the associated MAPS. if (distutils.version.StrictVersion(self._dapver) < distutils.version.StrictVersion('2.1')): return self.getMaps().get_binid() if self.data_origin == 'file': if binid_prop.channel is None: binid_map_data = self.data[binid_prop.name].data[:, :] else: binid_map_data = self.data[binid_prop.name].data[binid_prop.channel.idx, :, :] elif self.data_origin == 'db': mdb = marvin.marvindb table = mdb.dapdb.ModelSpaxel column = getattr(table, binid_prop.db_column()) binid_list = mdb.session.query(column).filter( table.modelcube_pk == self.data.pk).order_by(table.x, table.y).all() nx = ny = int(np.sqrt(len(binid_list))) binid_array = np.array(binid_list) binid_map_data = binid_array.transpose().reshape((ny, nx)).transpose(1, 0) elif self.data_origin == 'api': params = {'release': self._release} url = marvin.config.urlmap['api']['getModelCubeBinid']['url'] extension = model.fits_extension().lower() if model is not None else 'flux' try: response = self._toolInteraction( url.format(name=self.plateifu, modelcube_extension=extension, bintype=self.bintype.name, template=self.template.name), params=params) except Exception as ee: raise MarvinError('found a problem when checking if remote ' 'modelcube exists: {0}'.format(str(ee))) if response.results['error'] is not None: raise MarvinError('found a problem while getting the binid from API: {}' .format(str(response.results['error']))) binid_map_data = np.array(response.getData()['binid']) binid_map = Map(binid_map_data, unit=binid_prop.unit) binid_map._datamodel = binid_prop return binid_map
@property def binned_flux(self): """Returns the binned flux datacube.""" model = self.datamodel['binned_flux'] binned_flux_array = self._get_extension_data('flux') binned_flux_ivar = self._get_extension_data('flux', 'ivar') binned_flux_mask = self._get_extension_data('flux', 'mask') return DataCube(binned_flux_array, np.array(self._wavelength), ivar=binned_flux_ivar, mask=binned_flux_mask, redcorr=self._redcorr, binid=self.get_binid(model), unit=model.unit, pixmask_flag=model.pixmask_flag) @property def full_fit(self): """Returns the full fit datacube.""" model = self.datamodel['full_fit'] model_array = self._get_extension_data('full_fit') model_mask = self._get_extension_data('flux', 'mask') return DataCube(model_array, np.array(self._wavelength), ivar=None, mask=model_mask, redcorr=self._redcorr, binid=self.get_binid(model), unit=model.unit, pixmask_flag=model.pixmask_flag) @property def emline_fit(self): """Returns the emission line fit.""" model = self.datamodel['emline_fit'] emline_array = self._get_extension_data('emline_fit') if model.has_mask(): emline_mask = self._get_extension_data('emline_fit', 'mask') else: emline_mask = None return DataCube(emline_array, np.array(self._wavelength), ivar=None, mask=emline_mask, redcorr=self._redcorr, binid=self.get_binid(model), unit=model.unit, pixmask_flag=model.pixmask_flag) @property def stellarcont_fit(self): """Returns the stellar continuum fit.""" isMPL8 = check_versions(self._dapver, datamodel['MPL-8'].release) if isMPL8: array = self._get_extension_data('stellar_fit') model = self.datamodel['stellar_fit'] mask = self._get_extension_data('stellar_fit', 'mask') else: array = (self._get_extension_data('full_fit') - self._get_extension_data('emline_fit') - self._get_extension_data('emline_base_fit')) model = self.datamodel['full_fit'] mask = self._get_extension_data('flux', 'mask') return DataCube(array, np.array(self._wavelength), ivar=None, mask=mask, redcorr=self._redcorr, binid=self.get_binid(model), unit=model.unit, pixmask_flag=model.pixmask_flag) @property def lsf(self): """Returns the pre-pixelized LSF""" isMPL10 = check_versions(self._dapver, datamodel['MPL-10'].release) if not isMPL10: raise MarvinError('LSF property only available for MPLs >= 10') model = self.datamodel['lsf'] lsf_array = self._get_extension_data('lsf') if model.has_mask(): lsf_mask = self._get_extension_data('lsf', 'mask') else: lsf_mask = None return DataCube(lsf_array, np.array(self._wavelength), ivar=None, mask=lsf_mask, redcorr=self._redcorr, binid=self.get_binid(model), unit=model.unit, pixmask_flag=model.pixmask_flag)
[docs] def getCube(self): """Returns the associated `~marvin.tools.cube.Cube`.""" if self.data_origin == 'db': cube_data = self.data.file.cube else: cube_data = None return marvin.tools.cube.Cube(data=cube_data, plateifu=self.plateifu, release=self.release)
[docs] def getMaps(self): """Returns the associated`~marvin.tools.maps.Maps`.""" return marvin.tools.maps.Maps(plateifu=self.plateifu, bintype=self.bintype, template=self.template, release=self.release)
[docs] def is_binned(self): """Returns True if the ModelCube is not unbinned.""" return self.bintype.binned
[docs] def get_unbinned(self): """Returns a version of ``self`` corresponding to the unbinned ModelCube.""" if not self.is_binned: return self else: unbinned = self.datamodel.parent.get_unbinned() if self.datamodel.parent.db_only: in_db = unbinned in self.datamodel.parent.db_only else: in_db = unbinned in self.datamodel.parent.bintypes if self.mode == 'remote' and not in_db: raise marvin.core.exceptions.MarvinError( "Bintype {0} for release {1} not available in remote database. Use " "Marvin's local file access mode instead.".format(unbinned.name, self.release)) return ModelCube(plateifu=self.plateifu, release=self.release, bintype=self.datamodel.parent.get_unbinned(), template=self.template, mode=self.mode)