# Source code for the marvin ``Map`` quantity module (header left over from the HTML docs export).

#!/usr/bin/env python
# encoding: utf-8
# @Author: José Sánchez-Gallego
# @Date: Oct 30, 2017
# @Filename:
# @License: BSD 3-Clause
# @Copyright: José Sánchez-Gallego

from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import operator
import os
import warnings

from copy import deepcopy

import astropy.units as units

import numpy as np

import marvin
import marvin.api.api
import marvin.core.marvin_pickle
import marvin.core.exceptions
from marvin.utils.datamodel.dap.base import Property
from marvin.utils.datamodel.dap.plotting import get_default_plot_params
import marvin.utils.general
import marvin.utils.plot.map
from marvin.utils.general.general import add_doc

from .base_quantity import QuantityMixIn

try:
    import sqlalchemy
except ImportError:
    sqlalchemy = None

class Map(units.Quantity, QuantityMixIn):
    """Describes 2D array object with additional features.

    A general-use `~astropy.units.Quantity`-powered 2D array with features
    such as units, inverse variance, mask, quick access to statistics
    methods, and plotting. While `.Map` can be used for any 2D array, it is
    mostly intended to represent an extension in a DAP MAPS file (normally
    known in MaNGA as a map). Unlike a `~marvin.tools.maps.Maps` object,
    which contains all the information from a DAP MAPS file, this class
    represents only one of the multiple 2D maps contained within. For
    instance, `~marvin.tools.maps.Maps` may contain emission line maps for
    multiple channels. A `.Map` would be, for example, the map for
    ``emline_gflux`` and channel ``ha_6564``.

    A `.Map` is normally initialised from a `~marvin.tools.maps.Maps` by
    calling its ``getMap`` method. It can also be initialised directly using
    the `.Map.from_maps` classmethod.

    Parameters:
        array (array-like):
            The 2-D array containing the map values.
        unit (astropy.unit.Unit, optional):
            The unit of the map. ``None`` is treated as dimensionless.
        scale (float, optional):
            The scale factor of the map values; it is folded into the unit.
            If ``None`` the unit is used as given.
        ivar (array-like, optional):
            The inverse variance array for ``value``.
        mask (array-like, optional):
            The mask array for ``value``.
        binid (array-like, optional):
            The associated binid map.
        dtype (optional):
            Passed through to `~astropy.units.Quantity`.
        copy (bool, optional):
            Passed through to `~astropy.units.Quantity`.

    """

    # Operator symbols understood by the arithmetic helpers.
    _ARITH_OPS = {'+': operator.add,
                  '-': operator.sub,
                  '*': operator.mul,
                  '/': operator.truediv}

    # Attributes copied from the parent Maps in ``from_maps``.
    _MAPS_FLAG_ATTRS = ('manga_target1', 'manga_target2', 'manga_target3',
                        'target_flags', 'quality_flag')

    def __new__(cls, array, unit=None, scale=1, ivar=None, mask=None,
                binid=None, dtype=None, copy=True):

        # ``unit`` defaults to None, and ``_unit_propagation`` returns None
        # for mismatched units; treat both as dimensionless instead of
        # failing on ``None.scale`` below.
        if unit is None:
            unit = units.dimensionless_unscaled
        else:
            unit = units.Unit(unit)

        if scale is not None:
            unit = units.CompositeUnit(unit.scale * scale, unit.bases, unit.powers)

        obj = units.Quantity(np.array(array), unit=unit, dtype=dtype, copy=copy)
        obj = obj.view(cls)
        obj._set_unit(unit)

        obj._maps = None
        obj._datamodel = None

        obj.ivar = np.array(ivar) if ivar is not None else None
        obj.mask = np.array(mask) if mask is not None else None
        obj.binid = np.array(binid) if binid is not None else None

        return obj

    def __repr__(self):

        # Read the private attribute: the ``datamodel`` property raises when
        # no Property is attached, which would make repr() itself fail.
        full = self._datamodel.full() if self._datamodel is not None else None

        return '<Marvin Map (property={0!r})>\n{1} {2}'.format(
            full, self.value, self.unit.to_string())

    def __getitem__(self, sl):

        new_obj = super(Map, self).__getitem__(sl)

        # If the parent returned something that is not of our own type,
        # wrap it in a new view so the extra attributes can be attached.
        if type(new_obj) is not type(self):
            new_obj = self._new_view(new_obj)

        new_obj._set_unit(self.unit)

        # Slice the companion arrays in the same way as the values.
        new_obj.ivar = self.ivar[sl] if self.ivar is not None else None
        new_obj.mask = self.mask[sl] if self.mask is not None else None
        new_obj.binid = self.binid[sl] if self.binid is not None else None

        return new_obj

    def __deepcopy__(self, memo):

        new_map = Map(array=deepcopy(self.value, memo),
                      unit=deepcopy(self.unit, memo),
                      ivar=deepcopy(self.ivar, memo),
                      mask=deepcopy(self.mask, memo),
                      binid=deepcopy(self.binid, memo))

        new_map._maps = deepcopy(self._maps, memo)

        # TODO: temporary fix; the datamodel is shared, not deep-copied. Fix.
        new_map._datamodel = self._datamodel

        # These attributes only exist on maps created through ``from_maps``;
        # default to None so that any Map can be deep-copied.
        for attr in self._MAPS_FLAG_ATTRS:
            setattr(new_map, attr, deepcopy(getattr(self, attr, None), memo))

        return new_map

    def __array_finalize__(self, obj):

        if obj is None:
            return

        # Carry the extra attributes over to views and results created
        # from ``obj``.
        self._datamodel = getattr(obj, '_datamodel', None)
        self._maps = getattr(obj, '_maps', None)

        self.ivar = getattr(obj, 'ivar', None)
        self.mask = getattr(obj, 'mask', None)
        self.binid = getattr(obj, 'binid', None)

        self._set_unit(getattr(obj, 'unit', None))

    def getMaps(self):
        """Returns the associated `~marvin.tools.maps.Maps`.

        Raises:
            ValueError: if this map is not linked to any Maps object.

        """

        if self._maps is None:
            raise ValueError('this Map is not linked to any MAPS.')

        return self._maps

    @property
    def datamodel(self):
        """Returns the associated `~marvin.utils.datamodel.dap.Property`.

        Raises:
            ValueError: if this map has no associated Property.

        """

        if self._datamodel is None:
            raise ValueError('this Map does not have an associated Property.')

        return self._datamodel

    @classmethod
    def from_maps(cls, maps, prop, dtype=None, copy=True):
        """Initialise a `.Map` from a `~marvin.tools.maps.Maps`.

        Parameters:
            maps (`~marvin.tools.maps.Maps`):
                The parent Maps object from which to read the data.
            prop (`~marvin.utils.datamodel.dap.base.Property`):
                The property (and channel) to extract.
            dtype, copy:
                Passed through to the `.Map` constructor.

        Returns:
            The new `.Map`, linked to ``maps`` and ``prop``.

        Raises:
            MarvinError: if ``maps.data_origin`` is not one of
                ``'file'``, ``'db'`` or ``'api'``.

        """

        # NOTE(review): function-level import as in the upstream module,
        # presumably to avoid a circular import -- confirm.
        import marvin.tools.maps

        assert isinstance(maps, marvin.tools.maps.Maps)
        assert isinstance(prop, Property)

        assert prop.full() in maps.datamodel, 'failed sanity check. Property does not match.'

        loaders = {'file': cls._get_map_from_file,
                   'db': cls._get_map_from_db,
                   'api': cls._get_map_from_api}

        loader = loaders.get(maps.data_origin)
        if loader is None:
            # Previously this fell through and failed later with an
            # unrelated UnboundLocalError.
            raise marvin.core.exceptions.MarvinError(
                'invalid data_origin {0!r}.'.format(maps.data_origin))

        value, ivar, mask = loader(maps, prop)

        # Gets the binid array for this property.
        if prop.name != 'binid':
            binid = maps.get_binid(prop.binid)
        else:
            binid = None

        obj = cls(value, unit=prop.unit, ivar=ivar, mask=mask, binid=binid,
                  dtype=dtype, copy=copy)

        obj._datamodel = prop
        obj._maps = maps

        obj.manga_target1 = maps.manga_target1
        obj.manga_target2 = maps.manga_target2
        obj.manga_target3 = maps.manga_target3
        obj.target_flags = maps.target_flags
        obj.quality_flag = maps.quality_flag

        return obj

    @staticmethod
    def _get_map_from_file(maps, prop):
        """Initialise the `.Map` from a `~marvin.tools.maps.Maps` file.

        Returns a ``(value, ivar, mask)`` tuple; ``ivar`` and ``mask`` are
        None when the property does not define them.

        """

        # NOTE(review): ``prop.channel``, ``prop.channel.idx`` and
        # ``maps.data[prop.name]`` were reconstructed from a garbled
        # source -- confirm against the upstream module.
        name = prop.name

        if prop.channel is not None:
            channel_idx = prop.channel.idx
            value = maps.data[name].data[channel_idx]
            ivar = maps.data[name + '_ivar'].data[channel_idx] if prop.ivar else None
            mask = maps.data[name + '_mask'].data[channel_idx] if prop.mask else None
        else:
            value = maps.data[name].data
            ivar = maps.data[name + '_ivar'].data if prop.ivar else None
            mask = maps.data[name + '_mask'].data if prop.mask else None

        return value, ivar, mask

    @staticmethod
    def _get_map_from_db(maps, prop):
        """Initialise the `.Map` from the DB.

        Returns a ``(value, ivar, mask)`` tuple; ``ivar`` and ``mask`` are
        None when the property does not define them.

        Raises:
            MarvinError: if no DB is connected or sqlalchemy is missing.

        """

        mdb = marvin.marvindb

        if not mdb.isdbconnected:
            raise marvin.core.exceptions.MarvinError('No db connected')

        if sqlalchemy is None:
            raise marvin.core.exceptions.MarvinError('sqlalchemy required to access the local DB.')

        assert prop.model is not None

        table = getattr(mdb.dapdb, prop.model)

        def _fetch(column_name):
            # NOTE(review): the tail of this query (``maps.data.pk`` and the
            # ordering by ``spaxel_index``) was reconstructed from a garbled
            # source -- confirm against the upstream module.
            return mdb.session.query(getattr(table, column_name)).filter(
                table.file_pk == maps.data.pk).order_by(table.spaxel_index).all()

        value = _fetch(prop.db_column())

        # The rows come back flattened; the map is taken to be square.
        side = int(np.sqrt(len(value)))
        shape = (side, side)

        value = np.array(value).reshape(shape).T
        ivar = None
        mask = None

        if prop.ivar:
            ivar = np.array(_fetch(prop.db_column(ext='ivar'))).reshape(shape).T

        if prop.mask:
            mask = np.array(_fetch(prop.db_column(ext='mask'))).reshape(shape).T

        return value, ivar, mask

    @staticmethod
    def _get_map_from_api(maps, prop):
        """Initialise the `.Map` from the API.

        Returns a ``(value, ivar, mask)`` tuple; ``ivar`` and ``mask`` are
        None when the response does not include them.

        Raises:
            MarvinError: if the request fails or returns no data.

        """

        url = marvin.config.urlmap['api']['getmap']['url']

        # NOTE(review): the values of this mapping were reconstructed from a
        # garbled source -- confirm against the upstream module.
        url_full = url.format(
            **{'name': maps.plateifu,
               'property_name': prop.name,
               'channel': prop.channel.name if prop.channel else None,
               'bintype': maps.bintype.name,
               'template': maps.template.name})

        try:
            response = marvin.api.api.Interaction(url_full,
                                                  params={'release': maps._release})
        except Exception as ee:
            raise marvin.core.exceptions.MarvinError(
                'found a problem when getting the map: {0}'.format(str(ee)))

        data = response.getData()

        if data is None:
            raise marvin.core.exceptions.MarvinError(
                'something went wrong. Error is: {0}'.format(response.results['error']))

        value = np.array(data['value'])
        ivar = np.array(data['ivar']) if data['ivar'] is not None else None
        mask = np.array(data['mask']) if data['mask'] is not None else None

        return value, ivar, mask

    def save(self, path, overwrite=False):
        """Pickle the map to a file.

        This method will fail if the map is associated to a Maps loaded
        from the db.

        Parameters:
            path (str):
                The path of the file to which the ``Map`` will be saved.
                Unlike for other Marvin Tools that derive from
                :class:`~marvin.core.core.MarvinToolsClass`, ``path`` is
                mandatory for ``Map`` given that there is no default path
                for a given map. If it has no file extension, ``.mpf`` is
                appended.
            overwrite (bool):
                If True, and the ``path`` already exists, overwrites it.
                Otherwise it will fail.

        Returns:
            path (str):
                The realpath to which the file has been saved.

        """

        # check for file extension
        if not os.path.splitext(path)[1]:
            path = path + '.mpf'

        return marvin.core.marvin_pickle.save(self, path=path, overwrite=overwrite)

    @classmethod
    def restore(cls, path, delete=False):
        """Restore a Map object from a pickled file.

        If ``delete=True``, the pickled file will be removed after it has
        been unpickled. Note that, for map objects instantiated from a Maps
        object with ``data_origin='file'``, the original file must exist and
        be in the same path as when the object was first created.

        Only restore files from a trusted source: the file is unpickled.

        """

        return marvin.core.marvin_pickle.restore(path, delete=delete)

    @property
    def masked(self):
        """Return a masked array using the recommended masks."""

        assert self.mask is not None, 'mask is None'

        default_params = get_default_plot_params(self._datamodel.parent.release)
        labels = default_params['default']['bitmasks']

        return np.ma.array(self.value, mask=self.pixmask.get_mask(labels, dtype=bool))

    @property
    def error(self):
        """Computes the standard deviation of the measurement.

        Returns None if the map has no inverse variance.

        """

        if self.ivar is None:
            return None

        # Silence divide-by-zero locally instead of changing NumPy's global
        # error state with ``np.seterr``.
        with np.errstate(divide='ignore'):
            return np.sqrt(1. / self.ivar) * self.unit

    @property
    def snr(self):
        """Return the signal-to-noise ratio for each spaxel in the map.

        Returns None if the map has no inverse variance.

        """

        if self.ivar is None:
            return None

        return np.abs(self.value * np.sqrt(self.ivar))

    @staticmethod
    def _add_ivar(ivar1, ivar2, *args, **kwargs):
        """Inverse variance of a sum or difference (variances add)."""

        return 1. / ((1. / ivar1 + 1. / ivar2))

    @staticmethod
    def _mul_ivar(ivar1, ivar2, value1, value2, value12):
        """Inverse variance of a product or quotient ``value12``."""

        with np.errstate(divide='ignore', invalid='ignore'):
            sig1 = 1. / np.sqrt(ivar1)
            sig2 = 1. / np.sqrt(ivar2)

            # NOTE(review): relative errors are added linearly here, whereas
            # ``_add_ivar`` combines errors in quadrature. Left unchanged
            # because it alters published numbers -- confirm the intent.
            sig12 = abs(value12) * ((sig1 / abs(value1)) + (sig2 / abs(value2)))
            ivar12 = 1. / sig12**2

        return ivar12

    @staticmethod
    def _pow_ivar(ivar, value, power):
        """Inverse variance of ``value**power``.

        Returns zeros if ``ivar`` is None. Non-finite results are set to 0.

        """

        if ivar is None:
            return np.zeros(value.shape)

        with np.errstate(divide='ignore', invalid='ignore'):
            sig = np.sqrt(1. / ivar)
            # d(x**p) = p * x**(p - 1) * dx, i.e. divide (not multiply) the
            # x**p term by x. The sign is irrelevant as it is squared below.
            sig_out = value**power * power * sig / value
            ivar_out = 1 / sig_out**2.

        ivar_out[np.isnan(ivar_out) | np.isinf(ivar_out)] = 0

        return ivar_out

    @staticmethod
    def _unit_propagation(unit1, unit2, op):
        """Return the unit resulting from ``unit1 <op> unit2``.

        For ``'+'`` and ``'-'`` the units must match; if they do not, a
        warning is issued and None is returned.

        """

        if op == '*':
            return unit1 * unit2

        if op == '/':
            return unit1 / unit2

        if unit1 == unit2:
            return unit1

        warnings.warn('Units do not match for map arithmetic.', UserWarning)

        return None

    def _arith(self, term2, op):
        """Dispatch to map-map or map-scalar arithmetic."""

        # EnhancedMap is a subclass of Map, so one check covers both.
        if isinstance(term2, Map):
            return self._map_arith(term2, op)

        return self._scalar_arith(term2, op)

    def _scalar_arith(self, scalar, op):
        """Do map arithmetic and correctly handle map attributes."""

        ivar_func = {'+': lambda ivar, c: ivar,
                     '-': lambda ivar, c: ivar,
                     '*': lambda ivar, c: ivar / c**2,
                     '/': lambda ivar, c: ivar * c**2}

        with np.errstate(divide='ignore', invalid='ignore'):
            map_value = self._ARITH_OPS[op](self.value, scalar)

            map1_ivar = self.ivar if self.ivar is not None else np.zeros(self.shape)
            # Copy: for '+' and '-' the function returns its input unchanged
            # and the clean-up below must not touch ``self.ivar``.
            map_ivar = np.array(ivar_func[op](map1_ivar, scalar))

        # Same clean-up as in ``_map_arith`` (e.g. multiplying by zero).
        map_ivar[np.isnan(map_ivar) | np.isinf(map_ivar)] = 0

        # Work on a copy so that flagging bad values does not modify the
        # mask of the original map.
        if self.mask is not None:
            map_mask = np.array(self.mask)
        else:
            map_mask = np.zeros(self.shape, dtype=int)

        bad = np.isnan(map_value) | np.isinf(map_value)
        map_mask[bad] = map_mask[bad] | self.pixmask.labels_to_value('DONOTUSE')

        return EnhancedMap(value=map_value, unit=self.unit, ivar=map_ivar, mask=map_mask,
                           datamodel=self._datamodel, copy=True)

    def _map_arith(self, map2, op):
        """Do map arithmetic and correctly handle map attributes."""

        assert self.shape == map2.shape, 'Cannot do map arithmetic on maps of different shapes.'

        ivar_func = {'+': self._add_ivar,
                     '-': self._add_ivar,
                     '*': self._mul_ivar,
                     '/': self._mul_ivar}

        with np.errstate(divide='ignore', invalid='ignore'):
            map12_value = self._ARITH_OPS[op](self.value, map2.value)

            map1_ivar = self.ivar if self.ivar is not None else np.zeros(self.shape)
            map2_ivar = map2.ivar if map2.ivar is not None else np.zeros(map2.shape)

            map12_ivar = ivar_func[op](map1_ivar, map2_ivar, self.value, map2.value,
                                       map12_value)
            map12_ivar[np.isnan(map12_ivar) | np.isinf(map12_ivar)] = 0

        map1_mask = self.mask if self.mask is not None else np.zeros(self.shape, dtype=int)
        map2_mask = map2.mask if map2.mask is not None else np.zeros(map2.shape, dtype=int)

        # The bitwise OR creates a new array, so the inputs are not modified.
        map12_mask = map1_mask | map2_mask

        bad = np.isnan(map12_value) | np.isinf(map12_value)
        map12_mask[bad] = map12_mask[bad] | self.pixmask.labels_to_value('DONOTUSE')

        map12_unit = self._unit_propagation(self.unit, map2.unit, op)

        return EnhancedMap(value=map12_value, unit=map12_unit, ivar=map12_ivar,
                           mask=map12_mask, datamodel=self._datamodel, copy=True)

    def __add__(self, map2):
        """Add two maps."""
        return self._arith(map2, '+')

    def __sub__(self, map2):
        """Subtract two maps."""
        return self._arith(map2, '-')

    def __mul__(self, map2):
        """Multiply two maps."""
        return self._arith(map2, '*')

    def __div__(self, map2):
        """Divide two maps."""
        return self._arith(map2, '/')

    def __truediv__(self, map2):
        """Divide two maps."""
        return self.__div__(map2)

    def __pow__(self, power):
        """Raise map to power.

        Parameters:
            power (float):
               Power to raise the map values.

        Returns:
            map (`EnhancedMap` object)

        """

        value = self.value**power
        ivar = self._pow_ivar(self.ivar, self.value, power)
        unit = self.unit**power

        return EnhancedMap(value=value, unit=unit, ivar=ivar, mask=self.mask,
                           datamodel=self._datamodel, copy=True)

    def inst_sigma_correction(self):
        """Correct for instrumental broadening.

        Correct observed stellar or emission line velocity dispersion
        for instrumental broadening.

        Raises:
            MarvinError: if the property cannot be corrected, or for
                ``stellar_sigma`` in MPL-4.

        """

        # NOTE(review): ``self.datamodel.name`` and the ``channel`` keyword
        # were reconstructed from a garbled source -- confirm against the
        # upstream module.
        if self.datamodel.name == 'stellar_sigma':

            if self._datamodel.parent.release == 'MPL-4':
                raise marvin.core.exceptions.MarvinError(
                    'Instrumental broadening correction not implemented for MPL-4.')

            map_corr = self.getMaps()['stellar_sigmacorr']

        elif self.datamodel.name == 'emline_gsigma':
            map_corr = self.getMaps().getMap(property_name='emline_instsigma',
                                             channel=self.datamodel.channel.name)

        else:
            raise marvin.core.exceptions.MarvinError(
                'Cannot correct {0} for instrumental broadening.'.format(self.datamodel.full()))

        # Subtract the correction in quadrature.
        return (self**2 - map_corr**2)**0.5

    @property
    def pixmask(self):
        """Maskbit instance for the MANGA_DAPPIXMASK flag.

        See :ref:`marvin-utils-maskbit` for documentation and
        :meth:`marvin.utils.general.maskbit.Maskbit` for API reference.

        """

        pixmask = self._datamodel.parent.bitmasks['MANGA_DAPPIXMASK']
        # Point the bitmask object taken from the datamodel at this map's mask.
        pixmask.mask = getattr(self, 'mask', None)

        return pixmask

    # NOTE(review): the decorator argument and the ``dapmap=self`` keyword
    # were reconstructed from a garbled source -- confirm against the
    # upstream module.
    @add_doc(marvin.utils.plot.map.plot.__doc__)
    def plot(self, *args, **kwargs):
        return marvin.utils.plot.map.plot(dapmap=self, *args, **kwargs)
class EnhancedMap(Map):
    """Creates a Map that has been modified.

    This is the type returned by `.Map` arithmetic. It keeps a reference to
    the datamodel passed as the ``datamodel`` keyword so that ``pixmask``
    works, but its public ``datamodel`` property always returns None.

    """

    def __new__(cls, value, unit, *args, **kwargs):
        # ``datamodel`` is consumed by ``__init__``; ``Map.__new__`` does not
        # accept it.
        kwargs.pop('datamodel', None)

        # Pass ``unit`` positionally: with ``unit=unit`` any extra positional
        # argument would collide with it ("multiple values for 'unit'").
        return super(EnhancedMap, cls).__new__(cls, value, unit, *args, **kwargs)

    def __init__(self, *args, **kwargs):
        self._datamodel = kwargs.get('datamodel', None)

    def __repr__(self):
        return ('<Marvin EnhancedMap>\n{0!r} {1!r}').format(self.value, self.unit.to_string())

    def __deepcopy__(self, memo):
        # Keep the binid and the (shared, not copied) datamodel so that
        # ``pixmask`` keeps working on the copy.
        return EnhancedMap(value=deepcopy(self.value, memo),
                           unit=deepcopy(self.unit, memo),
                           ivar=deepcopy(self.ivar, memo),
                           mask=deepcopy(self.mask, memo),
                           binid=deepcopy(self.binid, memo),
                           datamodel=self._datamodel,
                           copy=True)

    def _init_map_from_maps(self):
        raise AttributeError("'EnhancedMap' has no attribute '_init_map_from_maps'.")

    def _get_from_file(self):
        raise AttributeError("'EnhancedMap' has no attribute '_get_from_file'.")

    def _get_from_db(self):
        raise AttributeError("'EnhancedMap' has no attribute '_get_from_db'.")

    def _get_from_api(self):
        raise AttributeError("'EnhancedMap' has no attribute '_get_from_api'.")

    def inst_sigma_correction(self):
        """Override Map.inst_sigma_correction with AttributeError."""

        raise AttributeError("'EnhancedMap' has no attribute 'inst_sigma_correction'.")

    @property
    def pixmask(self):
        """Maskbit instance for the MANGA_DAPPIXMASK flag.

        See :ref:`marvin-utils-maskbit` for documentation and
        :meth:`marvin.utils.general.maskbit.Maskbit` for API reference.

        """

        pixmask = self._datamodel.parent.bitmasks['MANGA_DAPPIXMASK']
        pixmask.mask = self.mask

        return pixmask

    @property
    def datamodel(self):
        """Always None: a modified map has no public datamodel."""

        return None