#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: Brian Cherinka, José Sánchez-Gallego, and Brett Andrews
# @Date: 2017-11-01
# @Filename: general.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
#
# @Last modified by: José Sánchez-Gallego (gallegoj@uw.edu)
# @Last modified time: 2019-08-29 15:58:00
from __future__ import absolute_import, division, print_function
import collections
import contextlib
import inspect
import os
import re
import sys
import warnings
from builtins import range
from collections import OrderedDict
from functools import wraps
from pkg_resources import parse_version
import matplotlib.pyplot as plt
import numpy as np
import PIL
from astropy import table, wcs
from astropy.units.quantity import Quantity
from brain.core.exceptions import BrainError
from flask_jwt_extended import get_jwt_identity
from scipy.interpolate import griddata
import marvin
from marvin import log
from marvin.core.exceptions import MarvinError, MarvinUserWarning
try:
from sdss_access import Access, AccessError
except ImportError:
Access = None
try:
from sdss_access.path import Path
except ImportError:
Path = None
try:
import pympler.summary
import pympler.muppy
import psutil
except ImportError:
pympler = None
psutil = None
# General utilities
__all__ = ('convertCoords', 'parseIdentifier', 'mangaid2plateifu', 'findClosestVector',
'getWCSFromPng', 'convertImgCoords', 'getSpaxelXY',
'downloadList', 'getSpaxel', 'get_drpall_row', 'getDefaultMapPath',
'getDapRedux', 'get_nsa_data', '_check_file_parameters',
'invalidArgs', 'missingArgs', 'getRequiredArgs', 'getKeywordArgs',
'isCallableWithArgs', 'map_bins_to_column', '_sort_dir', 'get_drpall_path',
'get_dapall_path', 'temp_setattr', 'map_dapall', 'turn_off_ion', 'memory_usage',
'validate_jwt', 'target_status', 'target_is_observed', 'target_is_mastar',
'get_plates', 'get_manga_image', 'check_versions', 'get_drpall_table',
'get_dapall_table', 'get_drpall_file', 'get_dapall_file')
# Module-level caches for the drpall/dapall summary tables so repeated
# lookups don't re-read the FITS files.  Presumably keyed by version and/or
# file path by get_drpall_table/get_dapall_table -- TODO confirm (the cache
# writers are defined elsewhere in this file).
drpTable = {}
dapTable = {}
def validate_jwt(f):
    ''' Decorator to validate a JWT and User

    Wraps an API route so that it only runs when the request carries a
    JWT identifying a user; a valid identity also switches the Marvin
    config to collaboration access.
    '''
    @wraps(f)
    def wrapper(*args, **kwargs):
        # get_jwt_identity returns a falsy value when no user is encoded
        user = get_jwt_identity()
        if not user:
            raise MarvinError('Invalid user from API token!')
        # a valid token implies collaboration-level data access
        marvin.config.access = 'collab'
        return f(*args, **kwargs)
    return wrapper
def getSpaxel(cube=True, maps=True, modelcube=True,
              x=None, y=None, ra=None, dec=None, xyorig=None, **kwargs):
    """Returns the |spaxel| matching certain coordinates.

    The coordinates of the spaxel to return can be input as ``x, y`` pixels
    relative to ``xyorig`` in the cube, or as ``ra, dec`` celestial
    coordinates.

    This function is intended to be called by
    :func:`~marvin.tools.cube.Cube.getSpaxel` or
    :func:`~marvin.tools.maps.Maps.getSpaxel`, and provides shared code for
    both of them.

    Parameters:
        cube (:class:`~marvin.tools.cube.Cube` or None or bool)
            A :class:`~marvin.tools.cube.Cube` object with the DRP cube
            data from which the spaxel spectrum will be extracted. If None,
            the |spaxel| object(s) returned won't contain spectral information.
        maps (:class:`~marvin.tools.maps.Maps` or None or bool)
            As ``cube`` but for the :class:`~marvin.tools.maps.Maps`
            object representing the DAP maps entity. If None, the |spaxel|
            will be returned without DAP information.
        modelcube (:class:`~marvin.tools.modelcube.ModelCube` or None or bool)
            As ``cube`` but for the :class:`~marvin.tools.modelcube.ModelCube`
            object representing the DAP modelcube entity. If None, the |spaxel|
            will be returned without model information.
        x,y (int or array):
            The spaxel coordinates relative to ``xyorig``. If ``x`` is an
            array of coordinates, the size of ``x`` must match that of
            ``y``.
        ra,dec (float or array):
            The coordinates of the spaxel to return. The closest spaxel to
            those coordinates will be returned. If ``ra`` is an array of
            coordinates, the size of ``ra`` must match that of ``dec``.
        xyorig ({'center', 'lower'}):
            The reference point from which ``x`` and ``y`` are measured.
            Valid values are ``'center'``, for the centre of the
            spatial dimensions of the cube, or ``'lower'`` for the
            lower-left corner. This keyword is ignored if ``ra`` and
            ``dec`` are defined. ``xyorig`` defaults to
            ``marvin.config.xyorig.``
        kwargs (dict):
            Arguments to be passed to `~marvin.tools.spaxel.SpaxelBase`.

    Returns:
        spaxels (list):
            The |spaxel| objects for this cube/maps corresponding to the input
            coordinates. The length of the list is equal to the number
            of input coordinates.

    .. |spaxel| replace:: :class:`~marvin.tools.spaxel.Spaxel`
    """
    # TODO: for now let's put these imports here, but we should fix the
    # circular imports soon.
    import marvin.tools.cube
    import marvin.tools.maps
    import marvin.tools.modelcube
    import marvin.tools.spaxel

    # Checks that the cube and maps data are correct
    assert cube or maps or modelcube, \
        'Either cube, maps, or modelcube needs to be specified.'
    assert isinstance(cube, (marvin.tools.cube.Cube, bool)), \
        'cube is not an instance of Cube or a boolean'
    assert isinstance(maps, (marvin.tools.maps.Maps, bool)), \
        'maps is not an instance of Maps or a boolean'
    assert isinstance(modelcube, (marvin.tools.modelcube.ModelCube, bool)), \
        'modelcube is not an instance of ModelCube or a boolean'

    # Checks that we have the correct set of inputs.
    if x is not None or y is not None:
        assert ra is None and dec is None, 'Either use (x, y) or (ra, dec)'
        assert x is not None and y is not None, 'Specify both x and y'

        inputMode = 'pix'
        isScalar = np.isscalar(x)
        x = np.atleast_1d(x)
        y = np.atleast_1d(y)
        # NOTE: np.float was removed in NumPy 1.24; the builtin float is
        # the documented equivalent.
        coords = np.array([x, y], dtype=float).T

    elif ra is not None or dec is not None:
        assert x is None and y is None, 'Either use (x, y) or (ra, dec)'
        assert ra is not None and dec is not None, 'Specify both ra and dec'

        inputMode = 'sky'
        isScalar = np.isscalar(ra)
        ra = np.atleast_1d(ra)
        dec = np.atleast_1d(dec)
        coords = np.array([ra, dec], dtype=float).T

    else:
        raise ValueError('You need to specify either (x, y) or (ra, dec)')

    if not xyorig:
        xyorig = marvin.config.xyorig

    # Pick the WCS and spatial shape from whichever Marvin object was
    # actually passed in (maps takes precedence, then cube, then modelcube).
    if isinstance(maps, marvin.tools.maps.Maps):
        ww = maps.wcs if inputMode == 'sky' else None
        cube_shape = maps._shape
    elif isinstance(cube, marvin.tools.cube.Cube):
        ww = cube.wcs if inputMode == 'sky' else None
        cube_shape = cube._shape
    elif isinstance(modelcube, marvin.tools.modelcube.ModelCube):
        ww = modelcube.wcs if inputMode == 'sky' else None
        cube_shape = modelcube._shape

    # NOTE(review): if all three inputs are plain booleans none of the
    # branches above runs and ``ww``/``cube_shape`` are unbound; callers are
    # expected to pass at least one real Marvin object -- TODO confirm.
    iCube, jCube = zip(convertCoords(coords, wcs=ww, shape=cube_shape,
                                     mode=inputMode, xyorig=xyorig).T)

    _spaxels = []
    for ii in range(len(iCube[0])):
        _spaxels.append(
            marvin.tools.spaxel.Spaxel(jCube[0][ii], iCube[0][ii],
                                       cube=cube, maps=maps, modelcube=modelcube, **kwargs))

    # Scalar input gets a scalar result back
    if len(_spaxels) == 1 and isScalar:
        return _spaxels[0]
    else:
        return _spaxels
def convertCoords(coords, mode='sky', wcs=None, xyorig='center', shape=None):
    """Convert input coordinates to array indices.

    Converts input positions in x, y or RA, Dec coordinates to array indices
    (in Numpy style) or spaxel extraction. In case of pixel coordinates, the
    origin of reference (either the center of the cube or the lower left
    corner) can be specified via ``xyorig``.

    If ``shape`` is defined (mandatory for ``mode='pix'``, optional for
    ``mode='sky'``) and one or more of the resulting indices are outside the
    size of the input shape, an error is raised.

    This functions is mostly intended for internal use.

    Parameters:
        coords (array):
            The input coordinates, as an array of shape Nx2.
        mode ({'sky', 'pix'}:
            The type of input coordinates, either `'sky'` for celestial
            coordinates (in the format defined in the WCS header information),
            or `'pix'` for pixel coordinates.
        wcs (None or ``astropy.wcs.WCS`` object):
            If ``mode='sky'``, the WCS solution from which the cube coordinates
            can be derived.
        xyorig (str):
            If ``mode='pix'``, the reference point from which the coordinates
            are measured. Valid values are ``'center'``, for the centre of the
            spatial dimensions of the cube, or ``'lower'`` for the lower-left
            corner.
        shape (None or array):
            If ``mode='pix'``, the shape of the spatial dimensions of the cube,
            so that the central position can be calculated.

    Returns:
        result (Numpy array):
            An array with the same shape as ``coords``, containing the cube
            index positions for the input coordinates, in Numpy style (i.e.,
            the first element being the row and the second the column).
    """
    coords = np.atleast_2d(coords)
    assert coords.shape[1] == 2, 'coordinates must be an array Nx2'

    if mode == 'sky':
        assert wcs, 'if mode==sky, wcs must be defined.'
        # pad a dummy spectral axis so the 3D WCS can be evaluated
        coordsSpec = np.ones((coords.shape[0], 3), np.float32)
        coordsSpec[:, :-1] = coords
        cubeCoords = wcs.wcs_world2pix(coordsSpec, 0)
        # flip (x, y) -> (row, col).  np.int was removed in NumPy 1.24;
        # the builtin int is the documented replacement.
        cubeCoords = np.fliplr(np.array(np.round(cubeCoords[:, :-1]), dtype=int))

    elif mode in ['pix', 'pixel']:
        assert xyorig, 'if mode==pix, xyorig must be defined.'
        x = coords[:, 0]
        y = coords[:, 1]

        assert shape is not None, 'if mode==pix, shape must be defined.'
        shape = np.atleast_1d(shape)

        if xyorig == 'center':
            yMid, xMid = shape / 2.
            xCube = np.round(xMid + x)
            yCube = np.round(yMid + y)
        elif xyorig == 'lower':
            xCube = np.round(x)
            yCube = np.round(y)
        else:
            raise ValueError('xyorig must be center or lower.')

        cubeCoords = np.array([yCube, xCube], dtype=int).T

    else:
        raise ValueError('mode must be pix or sky.')

    # bounds check against the spatial shape, when known
    if shape is not None:
        if ((cubeCoords < 0).any() or
                (cubeCoords[:, 0] > (shape[0] - 1)).any() or
                (cubeCoords[:, 1] > (shape[1] - 1)).any()):
            # fix: compare strings with ==, not identity (`is`), which is
            # implementation-dependent for string literals.
            raise MarvinError('some indices are out of limits.'
                              '``xyorig`` is currently set to "{0}". '
                              'Try setting ``xyorig`` to "{1}".'
                              .format(xyorig, 'center' if xyorig == 'lower' else 'lower'))

    return cubeCoords
def mangaid2plateifu(mangaid, mode='auto', drpall=None, drpver=None):
    """Return the plate-ifu for a certain mangaid.

    Uses either the DB or the drpall file to determine the plate-ifu for
    a mangaid. If more than one plate-ifu are available for a certain ifu,
    and ``mode='drpall'``, the one with the higher SN2 (calculated as the sum
    of redSN2 and blueSN2) will be used. If ``mode='db'``, the most recent one
    will be used.

    Parameters:
        mangaid (str):
            The mangaid for which the plate-ifu will be returned.
        mode ({'auto', 'drpall', 'db', 'remote'}):
            If `'drpall'` or ``'db'``, the drpall file or the local database,
            respectively, will be used. If ``'remote'``, a request to the API
            will be issued. If ``'auto'``, the local modes will be tried before
            the remote mode.
        drpall (str or None):
            The path to the drpall file to use. If None, the file in
            ``config.drpall`` will be used.
        drpver (str or None):
            The DRP version to use. If None, the one in ``config.drpver`` will
            be used. If ``drpall`` is defined, this value is ignored.

    Returns:
        plateifu (str):
            The plate-ifu string for the input ``mangaid``.

    Raises:
        ValueError: if no plate-ifu matches ``mangaid``.
        MarvinError: on DB/API failures, or when no mode succeeds in auto.
    """
    from marvin import config, marvindb
    from marvin.api.api import Interaction

    # The modes and order over which the auto mode will loop.
    autoModes = ['db', 'drpall', 'remote']

    assert mode in autoModes + ['auto'], 'mode={0} is not valid'.format(mode)

    config_drpver, __ = config.lookUpVersions()
    drpver = drpver if drpver else config_drpver
    drpall = drpall if drpall else config._getDrpAllPath(drpver=drpver)

    if mode == 'drpall':
        # Get the drpall table from cache or fresh
        drpall_table = get_drpall_table(drpver=drpver, drpall=drpall)
        mangaids = np.array([mm.strip() for mm in drpall_table['mangaid']])

        plateifus = drpall_table[np.where(mangaids == mangaid)]

        if len(plateifus) > 1:
            warnings.warn('more than one plate-ifu found for mangaid={0}. '
                          'Using the one with the highest SN2.'.format(mangaid),
                          MarvinUserWarning)
            plateifus = plateifus[
                [np.argmax(plateifus['bluesn2'] + plateifus['redsn2'])]]

        if len(plateifus) == 0:
            raise ValueError('no plate-ifus found for mangaid={0}'.format(mangaid))

        return plateifus['plateifu'][0]

    elif mode == 'db':
        if not marvindb.isdbconnected:
            raise MarvinError('no DB connection found')

        if not drpver:
            raise MarvinError('drpver not set.')

        cubes = marvindb.session.query(marvindb.datadb.Cube).join(
            marvindb.datadb.PipelineInfo, marvindb.datadb.PipelineVersion).filter(
                marvindb.datadb.Cube.mangaid == mangaid,
                marvindb.datadb.PipelineVersion.version == drpver).use_cache().all()

        if len(cubes) == 0:
            raise ValueError('no plate-ifus found for mangaid={0}'.format(mangaid))
        elif len(cubes) > 1:
            # fixed garbled warning text; same message as the drpall branch
            warnings.warn('more than one plate-ifu found for mangaid={0}. '
                          'Using the one with the highest SN2.'.format(mangaid),
                          MarvinUserWarning)
            total_sn2 = [float(cube.header['BLUESN2']) + float(cube.header['REDSN2'])
                         for cube in cubes]
            cube = cubes[np.argmax(total_sn2)]
        else:
            cube = cubes[0]

        return '{0}-{1}'.format(cube.plate, cube.ifu.name)

    elif mode == 'remote':
        try:
            url = marvin.config.urlmap['api']['mangaid2plateifu']['url']
            response = Interaction(url.format(mangaid=mangaid))
        except MarvinError as e:
            raise MarvinError('API call to mangaid2plateifu failed: {0}'.format(e))
        else:
            plateifu = response.getData(astype=str)
            if not plateifu:
                if 'error' in response.results and response.results['error']:
                    raise MarvinError(response.results['error'])
                else:
                    raise MarvinError('API call to mangaid2plateifu failed with error unknown.')
            return plateifu

    elif mode == 'auto':
        for mm in autoModes:
            try:
                plateifu = mangaid2plateifu(mangaid, mode=mm, drpver=drpver, drpall=drpall)
                return plateifu
            except Exception:
                # this mode failed; fall through to the next one.  A bare
                # except here would also swallow KeyboardInterrupt/SystemExit.
                continue

        raise MarvinError(
            'mangaid2plateifu was not able to find a plate-ifu for '
            'mangaid={0} either local or remotely.'.format(mangaid))
def findClosestVector(point, arr_shape=None, pixel_shape=None, xyorig=None):
    """Find the closest array coordinates from pixel coordinates.

    Find the closest vector of array coordinates (x, y) from an input
    vector of pixel coordinates (x, y).  Builds a linear interpolation
    between the array index grid and the image pixel grid, then picks the
    array index whose interpolated pixel position is nearest ``point``.

    Parameters:
        point : tuple
            Original point of interest in pixel units, order of (x,y)
        arr_shape : tuple
            Shape of data array in (x,y) order
        pixel_shape : tuple
            Shape of image in pixels in (x,y) order
        xyorig : str
            Indicates the origin point of coordinates.  Set to
            "relative" switches to an array coordinate system relative
            to galaxy center.  Default is absolute array coordinates
            (x=0, y=0) = upper left corner

    Returns:
        minind : tuple
            A tuple of array coordinates in x, y order
    """
    # set as numpy arrays
    arr_shape = np.array(arr_shape, dtype=int)
    pixel_shape = np.array(pixel_shape, dtype=int)
    # compute midpoints of the array and of the image
    xmid, ymid = arr_shape / 2
    xpixmid, ypixmid = pixel_shape / 2
    # default absolute array coordinates
    xcoords = np.array([0, arr_shape[0]], dtype=int)
    ycoords = np.array([0, arr_shape[1]], dtype=int)
    # split x,y coords and pixel coords
    x1, x2 = xcoords
    y1, y2 = ycoords
    xpix, ypix = pixel_shape
    # build interpolates between array coordinates and pixel coordinates;
    # the five anchor points are the four corners plus the center.  Note the
    # y axis flips between array space and pixel space (top-left origin).
    points = [[x1, y1], [x1, y2], [xmid, ymid], [x2, y1], [x2, y2]]
    values = [[0, ypix], [0, 0], [xpixmid, ypixmid], [xpix, ypix], [xpix, 0]]  # full image
    # make 2d array of array indices in absolute or (our) relative coordinates
    arrinds = np.mgrid[x1:x2, y1:y2].swapaxes(0, 2).swapaxes(0, 1)
    # interpolate a new 2d pixel coordinate array
    final = griddata(points, values, arrinds)
    # find minimum array vector closest to input coordinate point
    # (product of |dx| * |dy| as the closeness measure)
    diff = np.abs(point - final)
    prod = diff[:, :, 0] * diff[:, :, 1]
    minind = np.unravel_index(prod.argmin(), arr_shape)
    # toggle relative array coordinates (offsets from the array center)
    if xyorig in ['relative', 'center']:
        minind = np.array(minind, dtype=int)
        xmin = minind[0] - xmid
        ymin = ymid - minind[1]
        minind = (xmin, ymin)
    return minind
def getWCSFromPng(filename=None, image=None):
    """Extract any WCS info from the metadata of a PNG image.

    Extracts the WCS metadata info from the PNG optical
    image of the galaxy using PIL (Python Imaging Library).
    Converts it to an Astropy WCS object.

    Parameters:
        image (object):
            An existing PIL image object
        filename (str):
            The full path to the image

    Returns:
        pngwcs (WCS):
            an Astropy WCS object, or None if the image carries no WCS
            metadata

    Raises:
        MarvinError: if ``filename`` cannot be opened.
    """
    assert any([image, filename]), 'Must provide either a PIL image object, or the full image filepath'

    pngwcs = None
    if filename and not image:
        try:
            image = PIL.Image.open(filename)
        except Exception as e:
            raise MarvinError('Cannot open image {0}: {1}'.format(filename, e))
    else:
        # Close the passed-in image; its .info metadata dict remains
        # accessible after closing.
        image.close()

    # get metadata
    meta = image.info if image else None

    # parse the image metadata; PNG text values are strings, so convert
    # the numeric ones to float for astropy
    mywcs = {}
    if meta and 'WCSAXES' in meta:
        for key, val in meta.items():
            try:
                val = float(val)
            except (TypeError, ValueError):
                # non-numeric metadata (or tuples like dpi) stay as-is
                pass
            mywcs[key] = val

        # WCSAXES is inferred by astropy and must not be passed in the header
        mywcs.pop('WCSAXES')

    # Construct Astropy WCS
    if mywcs:
        pngwcs = wcs.WCS(mywcs)

    return pngwcs
def convertImgCoords(coords, image, to_pix=None, to_radec=None):
    """Transform the WCS info in an image.

    Convert image pixel coordinates to RA/Dec based on
    PNG image metadata or vice_versa

    Parameters:
        coords (tuple):
            The input coordindates to transform
        image (str):
            The full path to the image
        to_pix (bool):
            Set to convert to pixel coordinates
        to_radec (bool):
            Set to convert to RA/Dec coordinates

    Returns:
        newcoords (tuple):
            Tuple of either (x, y) pixel coordinates
            or (RA, Dec) coordinates

    Raises:
        MarvinError: if the image WCS cannot be read, if the conversion
            fails, or if neither ``to_pix`` nor ``to_radec`` is set.
    """
    try:
        # local name must not shadow the module-level astropy ``wcs`` import
        image_wcs = getWCSFromPng(image)
    except Exception as e:
        raise MarvinError('Cannot get wcs info from image {0}: {1}'.format(image, e))

    # fix: previously, calling with neither flag set raised an opaque
    # NameError on the return; fail with an explicit error instead.
    if not to_pix and not to_radec:
        raise MarvinError('Must specify either to_pix or to_radec to convert coordinates')

    if to_radec:
        try:
            newcoords = image_wcs.all_pix2world([coords], 1)[0]
        except AttributeError as e:
            raise MarvinError('Cannot convert coords to RA/Dec. No wcs! {0}'.format(e))
    if to_pix:
        try:
            newcoords = image_wcs.all_world2pix([coords], 1)[0]
        except AttributeError as e:
            raise MarvinError('Cannot convert coords to image pixels. No wcs! {0}'.format(e))

    return newcoords
def parseIdentifier(galid):
    """Determine if a string input is a plate, plateifu, or manga-id.

    Parses a string object id and determines whether it is a
    plate ID, a plate-IFU, or MaNGA-ID designation.

    Parameters:
        galid (str):
            The string of an id name to parse

    Returns:
        idtype (str):
            String indicating either plate, plateifu, mangaid, or None
    """
    galid = str(galid)

    # dashed ids are either plate-ifu (e.g. 8485-1901) or manga-id
    # (e.g. 1-209232); MaNGA plate numbers start above 6500
    if '-' in galid:
        prefix = galid.split('-')[0]
        return 'plateifu' if int(prefix) > 6500 else 'mangaid'

    # a bare number longer than three digits is a plate id
    if galid.isdigit() and len(galid) > 3:
        return 'plate'

    return None
def getSpaxelXY(cube, plateifu, x, y):
    """Get a spaxel from a cube in the DB.

    This function is mostly intended for internal use.

    Parameters:
        cube (SQLAlchemy object):
            The SQLAlchemy object representing the cube from which to extract
            the spaxel.
        plateifu (str):
            The corresponding plateifu of ``cube``.
        x,y (int):
            The coordinates of the spaxel in the database.

    Returns:
        spaxel (SQLAlchemy object):
            The SQLAlchemy spaxel with coordinates ``(x, y)``.
    """
    import sqlalchemy

    mdb = marvin.marvindb
    query = mdb.session.query(mdb.datadb.Spaxel).filter_by(cube=cube, x=x, y=y)
    try:
        # exactly one row is expected for a given (cube, x, y)
        spaxel = query.use_cache().one()
    except sqlalchemy.orm.exc.NoResultFound as e:
        raise MarvinError('Could not retrieve spaxel for plate-ifu {0} at position {1},{2}: No Results Found: {3}'.format(plateifu, x, y, e))
    except Exception as e:
        raise MarvinError('Could not retrieve cube for plate-ifu {0} at position {1},{2}: Unknown exception: {3}'.format(plateifu, x, y, e))
    return spaxel
def getDapRedux(release=None):
    """Retrieve SAS url link to the DAP redux directory.

    Parameters:
        release (str):
            The release version of the data to download.
            Defaults to Marvin config.release.

    Returns:
        dapredux (str):
            The full redux path to the DAP MAPS

    Raises:
        MarvinError: if sdss_access is not installed.
    """
    if not Path:
        raise MarvinError('sdss_access is not installed')

    # fix: resolve the release *before* constructing Path, so sdss_access
    # sees the same release used for the version lookup (previously Path
    # could be built with release=None while the default was applied after).
    release = release or marvin.config.release
    sdss_path = Path(release=release)

    drpver, dapver = marvin.config.lookUpVersions(release=release)
    # hack a url version of MANGA_SPECTRO_ANALYSIS rather than using a
    # template name, since the DAP redux root has no dedicated path template
    dappath = os.path.join(os.getenv("MANGA_SPECTRO_ANALYSIS"), drpver, dapver)
    dapredux = sdss_path.url('', full=dappath)
    return dapredux
def getDefaultMapPath(**kwargs):
    """Retrieve the default Maps path.

    Uses sdss_access Path to generate a url download link to the
    default MAPS file for a given MPL.

    Parameters:
        release (str):
            The release version of the data to download.
            Defaults to Marvin config.release.
        plate (int):
            The plate id
        ifu (int):
            The ifu number
        mode (str):
            The bintype of the default file to grab, i.e. MAPS or LOGCUBE. Defaults to MAPS
        daptype (str):
            The daptype of the default map to grab. Defaults to SPX-GAU-MILESHC

    Returns:
        maplink (str):
            The sas url to download the default maps file
    """
    # gather the inputs
    release = kwargs.get('release', marvin.config.release)
    drpver, dapver = marvin.config.lookUpVersions(release=release)
    plate = kwargs.get('plate', None)
    ifu = kwargs.get('ifu', None)
    daptype = kwargs.get('daptype', 'SPX-GAU-MILESHC')
    mode = kwargs.get('mode', 'MAPS')
    assert mode in ['MAPS', 'LOGCUBE'], 'mode can either be MAPS or LOGCUBE'

    # sdss_access is required to build the url
    if not Path:
        raise MarvinError('sdss_access is not installed')
    sdss_path = Path(release=release)

    # pick the sdss_path template name by MPL
    # TODO: this is likely to break in future MPL/DRs. Just a heads up.
    name = 'mangadefault' if '4' in release else 'mangadap'

    # construct the url link to default maps file
    maplink = sdss_path.url(name, drpver=drpver, dapver=dapver, mpl=release,
                            plate=plate, ifu=ifu, daptype=daptype, mode=mode)
    return maplink
def downloadList(inputlist, dltype='cube', **kwargs):
    """Download a list of MaNGA objects.

    Uses sdss_access to download a list of objects
    via rsync. Places them in your local sas path mimicking
    the Utah SAS.

    i.e. $SAS_BASE_DIR/mangawork/manga/spectro/redux

    Can download cubes, rss files, maps, modelcubes, mastar cubes,
    png images, default maps, or the entire plate directory.
    dltype=`dap` is a special keyword that downloads all DAP files. It sets binmode
    and daptype to '*'

    Parameters:
        inputlist (list):
            Required. A list of objects to download. Must be a list of plate IDs,
            plate-IFUs, or manga-ids
        dltype ({'cube', 'maps', 'modelcube', 'dap', 'image', 'rss', 'mastar', 'default', 'plate'}):
            Indicated type of object to download. Can be any of
            plate, cube, image, mastar, rss, map, modelcube, or default (default map).
            If not specified, the dltype defaults to cube.
        release (str):
            The MPL/DR version of the data to download.
            Defaults to Marvin config.release.
        bintype (str):
            The bin type of the DAP maps to download. Defaults to *
        binmode (str):
            The bin mode of the DAP maps to download. Defaults to *
        n (int):
            The plan id number [1-12] of the DAP maps to download. Defaults to *
        daptype (str):
            The daptype of the default map to grab. Defaults to *
        dir3d (str):
            The directory where the images are located. Either 'stack' or 'mastar'. Defaults to *
        verbose (bool):
            Turns on verbosity during rsync
        limit (int):
            A limit to the number of items to download
        test (bool):
            If True, tests the download path construction but does not download

    Returns:
        If test=True, returns the list of full filepaths that will be downloaded
    """
    assert isinstance(inputlist, (list, np.ndarray)), 'inputlist must be a list or numpy array'

    # Get some possible keywords
    # Necessary rsync variables:
    #   drpver, plate, ifu, dir3d, [mpl, dapver, bintype, n, mode]
    verbose = kwargs.get('verbose', None)
    as_url = kwargs.get('as_url', None)
    release = kwargs.get('release', marvin.config.release)
    drpver, dapver = marvin.config.lookUpVersions(release=release)
    bintype = kwargs.get('bintype', '*')
    binmode = kwargs.get('binmode', None)
    daptype = kwargs.get('daptype', '*')
    dir3d = kwargs.get('dir3d', '*')
    n = kwargs.get('n', '*')
    limit = kwargs.get('limit', None)
    test = kwargs.get('test', None)
    # all MaNGA spectra use logarithmic wavelength sampling
    wave = 'LOG'

    # check for sdss_access
    if not Access:
        raise MarvinError('sdss_access not installed.')

    # Assert correct dltype
    dltype = 'cube' if not dltype else dltype
    assert dltype in ['plate', 'cube', 'mastar', 'modelcube', 'dap', 'rss', 'maps', 'image',
                      'default'], ('dltype must be one of plate, cube, mastar, '
                                   'image, rss, maps, modelcube, dap, default')

    assert binmode in [None, '*', 'MAPS', 'LOGCUBE'], 'binmode can only be *, MAPS or LOGCUBE'

    # Assert correct dir3d
    if dir3d != '*':
        assert dir3d in ['stack', 'mastar'], 'dir3d must be either stack or mastar'

    # Parse and retrieve the input type and the download type
    idtype = parseIdentifier(inputlist[0])
    if not idtype:
        raise MarvinError('Input list must be a list of plates, plate-ifus, or mangaids')

    # Map the download type onto the sdss_access template name; some types
    # also pin binmode/daptype.
    if dltype == 'cube':
        name = 'mangacube'
    elif dltype == 'rss':
        name = 'mangarss'
    elif dltype == 'default':
        name = 'mangadefault'
    elif dltype == 'plate':
        name = 'mangaplate'
    elif dltype == 'maps':
        # needs to change to include DR
        if '4' in release:
            name = 'mangamap'
        else:
            name = 'mangadap'
            binmode = 'MAPS'
    elif dltype == 'modelcube':
        name = 'mangadap'
        binmode = 'LOGCUBE'
    elif dltype == 'dap':
        name = 'mangadap'
        binmode = '*'
        daptype = '*'
    elif dltype == 'mastar':
        name = 'mangamastar'
    elif dltype == 'image':
        name = 'mangaimage'

    # create rsync
    rsync_access = Access(label='marvin_download', verbose=verbose, release=release)
    rsync_access.remote()

    # Add objects
    for item in inputlist:
        if idtype == 'mangaid':
            # NOTE(review): if mangaid2plateifu fails, plateifu is None and
            # plateid/ifu are left unset (or stale from a previous iteration)
            # when rsync_access.add is called below -- TODO confirm intended.
            try:
                plateifu = mangaid2plateifu(item)
            except MarvinError:
                plateifu = None
            else:
                plateid, ifu = plateifu.split('-')
        elif idtype == 'plateifu':
            plateid, ifu = item.split('-')
        elif idtype == 'plate':
            plateid = item
            ifu = '*'

        rsync_access.add(name, plate=plateid, drpver=drpver, ifu=ifu, dapver=dapver, dir3d=dir3d,
                         mpl=release, bintype=bintype, n=n, mode=binmode, daptype=daptype,
                         wave=wave)

    # set the stream
    try:
        rsync_access.set_stream()
    except AccessError as e:
        raise MarvinError('Error with sdss_access rsync.set_stream. AccessError: {0}'.format(e))

    # get the list and download
    listofitems = rsync_access.get_urls() if as_url else rsync_access.get_paths()

    # print download location (everything up to and including the version dir)
    item = listofitems[0] if listofitems else None
    if item:
        ver = dapver if dapver in item else drpver
        dlpath = item[:item.rfind(ver) + len(ver)]
        if verbose:
            print('Target download directory: {0}'.format(dlpath))

    if test:
        return listofitems
    else:
        rsync_access.commit(limit=limit)
def _get_summary_file(name, summary_path=None, drpver=None, dapver=None):
    ''' Check for/download the drpall or dapall file

    Checks for existence of a local summary file for the
    current release set.  If not found, uses sdss_access
    to download it.

    Parameters:
        name (str):
            The name of the summary file.  Either drpall or dapall
        summary_path (str):
            The local path to either the drpall or dapall file
        drpver (str):
            The DRP version
        dapver (str):
            The DAP version

    Raises:
        MarvinError: if sdss_access is unavailable or the download fails.
    '''
    assert name in ['drpall', 'dapall'], 'name must be either drpall or dapall'

    from marvin import config

    # resolve versions from the current release when not explicitly given
    config_drpver, config_dapver = config.lookUpVersions(config.release)
    drpver = drpver if drpver else config_drpver
    dapver = dapver if dapver else config_dapver

    # default the local path for the requested summary file
    if name == 'drpall' and not summary_path:
        summary_path = get_drpall_path(drpver)
    elif name == 'dapall' and not summary_path:
        summary_path = get_dapall_path(drpver, dapver)

    if os.path.isfile(summary_path):
        return

    warnings.warn('{0} file not found. Downloading it.'.format(name), MarvinUserWarning)

    # fix: Access may be None if the sdss_access import failed at module
    # load; raise a clear error, consistent with downloadList/getDapRedux
    if not Access:
        raise MarvinError('sdss_access is not installed')

    rsync = Access(label='get_summary_file', release=config.release)
    rsync.remote()
    rsync.add(name, drpver=drpver, dapver=dapver)
    try:
        rsync.set_stream()
    except Exception as e:
        raise MarvinError('Could not download the {4} file with sdss_access: '
                          '{0}\nTry manually downloading it for version ({1},{2}) and '
                          'placing it {3}'.format(e, drpver, dapver, summary_path, name))
    else:
        rsync.commit()
def get_drpall_file(drpver=None, drpall=None):
    ''' Ensure a local drpall file exists, downloading it if needed.

    Thin wrapper around `_get_summary_file`, which checks for a local
    drpall file for the current release set and uses sdss_access to
    download it when missing.

    Parameters:
        drpver (str):
            The DRP version
        drpall (str):
            The local path to the drpall file
    '''
    _get_summary_file('drpall', drpver=drpver, summary_path=drpall)
def get_dapall_file(drpver=None, dapver=None, dapall=None):
    ''' Ensure a local dapall file exists, downloading it if needed.

    Thin wrapper around `_get_summary_file`, which checks for a local
    dapall file for the current release set and uses sdss_access to
    download it when missing.

    Parameters:
        drpver (str):
            The DRP version
        dapver (str):
            The DAP version
        dapall (str):
            The local path to the dapall file
    '''
    _get_summary_file('dapall', drpver=drpver, dapver=dapver, summary_path=dapall)
def get_drpall_row(plateifu, drpver=None, drpall=None):
    """Returns a dictionary from drpall matching the plateifu.

    Looks the plate-ifu up in the MANGA extension of the drpall table
    first, then falls back to the MASTAR extension.
    """
    for hdu in ('MANGA', 'MASTAR'):
        tbl = get_drpall_table(drpver=drpver, drpall=drpall, hdu=hdu)
        if plateifu in tbl['plateifu']:
            # return the (single) matching row
            return tbl[tbl['plateifu'] == plateifu][0]

    raise ValueError('No results found for {0} in drpall table'.format(plateifu))
def _db_row_to_dict(row, remove_columns=False):
    """Converts a DB object to a dictionary.

    Collects the row's plain table columns plus any hybrid properties,
    skipping any names listed in ``remove_columns``.
    """
    from sqlalchemy.inspection import inspect as sa_inspect
    from sqlalchemy.ext.hybrid import hybrid_property

    columns = row.__table__.columns.keys()

    # append hybrid properties after the plain columns
    mapper = sa_inspect(row.__class__)
    columns.extend(key for key, item in mapper.all_orm_descriptors.items()
                   if isinstance(item, hybrid_property))

    skip = remove_columns or ()
    return collections.OrderedDict(
        (col, getattr(row, col)) for col in columns if col not in skip)
def get_nsa_data(mangaid, source='nsa', mode='auto', drpver=None, drpall=None):
    """Returns a dictionary of NSA data from the DB or from the drpall file.

    Parameters:
        mangaid (str):
            The mangaid of the target for which the NSA information will be returned.
        source ({'nsa', 'drpall'}):
            The data source. If ``source='nsa'``, the full NSA catalogue from the DB will
            be used. If ``source='drpall'``, the subset of NSA columns included in the drpall
            file will be returned.
        mode ({'auto', 'local', 'remote'}):
            See :ref:`mode-decision-tree`.
        drpver (str or None):
            The version of the DRP to use, if ``source='drpall'``. If ``None``, uses the
            version set by ``marvin.config.release``.
        drpall (str or None):
            A path to the drpall file to use if ``source='drpall'``. If not defined, the
            default drpall file matching ``drpver`` will be used.

    Returns:
        nsa_data (dict):
            A dictionary containing the columns and values from the NSA catalogue for
            ``mangaid``.

    Raises:
        MarvinError: If no NSA data can be retrieved for ``mangaid`` with the
            requested mode and source.
    """

    from marvin import config, marvindb
    from .structs import DotableCaseInsensitive

    valid_modes = ['auto', 'local', 'remote']
    assert mode in valid_modes, 'mode must be one of {0}'.format(valid_modes)

    valid_sources = ['nsa', 'drpall']
    assert source in valid_sources, 'source must be one of {0}'.format(valid_sources)

    log.debug('get_nsa_data: getting NSA data for mangaid=%r with source=%r, mode=%r',
              mangaid, source, mode)

    if mode == 'auto':
        log.debug('get_nsa_data: running auto mode mode.')
        try:
            nsa_data = get_nsa_data(mangaid, mode='local', source=source,
                                    drpver=drpver, drpall=drpall)
            return nsa_data
        except MarvinError as ee:
            log.debug('get_nsa_data: local mode failed with error %s', str(ee))
            try:
                nsa_data = get_nsa_data(mangaid, mode='remote', source=source,
                                        drpver=drpver, drpall=drpall)
                return nsa_data
            except MarvinError as ee:
                # Exceptions do not support logging-style lazy '%' interpolation,
                # so the message must be formatted explicitly here.
                raise MarvinError('get_nsa_data: failed to get NSA data for '
                                  'mangaid={0!r} in auto mode with error: {1}'
                                  .format(mangaid, str(ee)))

    elif mode == 'local':

        if source == 'nsa':

            if config.db is not None:

                session = marvindb.session
                sampledb = marvindb.sampledb

                nsa_row = session.query(sampledb.NSA).join(sampledb.MangaTargetToNSA,
                                                           sampledb.MangaTarget).filter(
                    sampledb.MangaTarget.mangaid == mangaid).use_cache().all()

                if len(nsa_row) == 1:
                    return DotableCaseInsensitive(
                        _db_row_to_dict(nsa_row[0], remove_columns=['pk', 'catalogue_pk']))
                elif len(nsa_row) > 1:
                    warnings.warn('get_nsa_data: multiple NSA rows found for mangaid={0}. '
                                  'Using the first one.'.format(mangaid), MarvinUserWarning)
                    return DotableCaseInsensitive(
                        _db_row_to_dict(nsa_row[0], remove_columns=['pk', 'catalogue_pk']))
                elif len(nsa_row) == 0:
                    raise MarvinError('get_nsa_data: cannot find NSA row for mangaid={0}'
                                      .format(mangaid))

            else:
                raise MarvinError('get_nsa_data: cannot find a valid DB connection.')

        elif source == 'drpall':

            plateifu = mangaid2plateifu(mangaid, drpver=drpver, drpall=drpall, mode='drpall')
            log.debug('get_nsa_data: found plateifu=%r for mangaid=%r', plateifu, mangaid)

            drpall_row = get_drpall_row(plateifu, drpall=drpall, drpver=drpver)

            nsa_data = collections.OrderedDict()
            for col in drpall_row.colnames:
                if col.startswith('nsa_'):
                    value = drpall_row[col]
                    if isinstance(value, np.ndarray):
                        value = value.tolist()
                    else:
                        # In Astropy 2 the value is an array of size 1, while in
                        # Astropy 3 it is already a scalar. np.asscalar was removed
                        # in NumPy 1.23, so use the ndarray.item() method; plain
                        # Python scalars have no .item() and are left unchanged.
                        try:
                            value = value.item()
                        except AttributeError:
                            pass
                    nsa_data[col[4:]] = value

            return DotableCaseInsensitive(nsa_data)

    elif mode == 'remote':

        from marvin.api.api import Interaction

        try:
            if source == 'nsa':
                request_name = 'nsa_full'
            else:
                request_name = 'nsa_drpall'
            url = marvin.config.urlmap['api'][request_name]['url']
            response = Interaction(url.format(mangaid=mangaid))
        except MarvinError as ee:
            raise MarvinError('API call to {0} failed: {1}'.format(request_name, str(ee)))
        else:
            if response.results['status'] == 1:
                return DotableCaseInsensitive(collections.OrderedDict(response.getData()))
            else:
                # The error message lives in the results dictionary (the response
                # object itself is not subscriptable) and '%s' placeholders are
                # not interpolated by exceptions, so format explicitly.
                raise MarvinError('get_nsa_data: {0}'.format(response.results['error']))
def _check_file_parameters(obj1, obj2):
for param in ['plateifu', 'mangaid', 'plate', '_release', 'drpver', 'dapver']:
assert_msg = ('{0} is different between {1} {2}:\n {1}.{0}: {3} {2}.{0}:{4}'
.format(param, obj1.__repr__, obj2.__repr__, getattr(obj1, param),
getattr(obj2, param)))
assert getattr(obj1, param) == getattr(obj2, param), assert_msg
def add_doc(value):
    """Wrap method to programatically add docstring."""
    def _attach(func):
        # Overwrite the wrapped function's docstring and hand it back untouched.
        func.__doc__ = value
        return func
    return _attach
def use_inspect(func):
    ''' Inspect a function of arguments and keywords.

    Inspects a function or class method.  Uses a different inspect for
    Python 2 vs 3.  Only tested to work with args and defaults.  On Python 3
    varargs (variable arguments) and varkw (keyword arguments) are always
    returned as ``None``.

    Parameters:
        func (func):
            The function or method to inspect

    Returns:
        A tuple of arguments, variable arguments, keywords, and default values
    '''
    if sys.version_info.major == 2:
        # getargspec only exists on old Pythons (removed in 3.11); this branch
        # is unreachable on Python 3.
        args, varargs, varkw, defaults = inspect.getargspec(func)
    else:
        # Use the signature API on Python 3 and beyond (the original 'elif
        # pyver == 3' left the names unbound for any other major version).
        sig = inspect.signature(func)
        args = []
        defaults = []
        varargs = varkw = None
        for par in sig.parameters.values():
            # most parameters seem to be of this kind
            if par.kind == par.POSITIONAL_OR_KEYWORD:
                args.append(par.name)
                # Parameters whose default is the "empty" sentinel are required.
                # Use the public Parameter.empty instead of the private
                # inspect._empty, and compare the sentinel by identity.
                if par.default is not par.empty:
                    defaults.append(par.default)
    return args, varargs, varkw, defaults
def getRequiredArgs(func):
    ''' Gets the required arguments from a function or method

    Uses the difference between all arguments and those carrying defaults
    to determine which arguments are required.

    Parameters:
        func (func):
            The function or method to inspect

    Returns:
        A list of required arguments

    Example:
        >>> import matplotlib.pyplot as plt
        >>> getRequiredArgs(plt.scatter)
        >>> ['x', 'y']
    '''
    args, __, __, defaults = use_inspect(func)
    n_optional = len(defaults) if defaults else 0
    # Defaults always align with the tail of the argument list.
    return args[:len(args) - n_optional] if n_optional else args
def getKeywordArgs(func):
    ''' Gets the keyword arguments from a function or method

    Parameters:
        func (func):
            The function or method to inspect

    Returns:
        A list of keyword arguments (order is not guaranteed)

    Example:
        >>> import matplotlib.pyplot as plt
        >>> getKeywordArgs(plt.scatter)
        >>> ['edgecolors', 'c', 'vmin', 'linewidths', 'marker', 's', 'cmap',
        >>>  'verts', 'vmax', 'alpha', 'hold', 'data', 'norm']
    '''
    all_args, __, __, __ = use_inspect(func)
    required = set(getRequiredArgs(func))
    # Optional arguments are everything that is not required.
    return list(set(all_args) - required)
def missingArgs(func, argdict, arg_type='args'):
    ''' Return missing arguments from an input dictionary

    Parameters:
        func (func):
            The function or method to inspect
        argdict (dict):
            The argument dictionary to test against
        arg_type (str):
            The type of arguments to test. Either (args|kwargs|req|opt). Default is required.

    Returns:
        A set of missing arguments

    Example:
        >>> import matplotlib.pyplot as plt
        >>> testdict = {'edgecolors': 'black', 'c': 'r', 'xlim': 5, 'xlabel': 9, 'ylabel': 'y', 'ylim': 6}
        >>> # test for missing required args
        >>> missingArgs(plt.scatter, testdict)
        >>> {'x', 'y'}
        >>> # test for missing optional args
        >>> missingArgs(plt.scatter, testdict, arg_type='opt')
        >>> ['vmin', 'linewidths', 'marker', 's', 'cmap', 'verts', 'vmax', 'alpha', 'hold', 'data', 'norm']
    '''
    assert arg_type in ['args', 'req', 'kwargs', 'opt'], 'arg_type must be one of (args|req|kwargs|opt)'
    # 'args'/'req' check the required arguments; 'kwargs'/'opt' the optional ones.
    wants_required = arg_type in ['args', 'req']
    expected = getRequiredArgs(func) if wants_required else getKeywordArgs(func)
    return set(expected).difference(argdict)
def invalidArgs(func, argdict):
    ''' Return invalid arguments from an input dictionary

    Parameters:
        func (func):
            The function or method to inspect
        argdict (dict):
            The argument dictionary to test against

    Returns:
        A set of invalid arguments

    Example:
        >>> import matplotlib.pyplot as plt
        >>> testdict = {'edgecolors': 'black', 'c': 'r', 'xlim': 5, 'xlabel': 9, 'ylabel': 'y', 'ylim': 6}
        >>> # test for invalid args
        >>> invalidArgs(plt.scatter, testdict)
        >>> {'xlabel', 'xlim', 'ylabel', 'ylim'}
    '''
    valid_args = use_inspect(func)[0]
    # Anything in the input dict that the function does not accept is invalid.
    return set(argdict).difference(valid_args)
def isCallableWithArgs(func, argdict, arg_type='opt', strict=False):
    ''' Test if the function is callable with the an input dictionary

    Parameters:
        func (func):
            The function or method to inspect
        argdict (dict):
            The argument dictionary to test against
        arg_type (str):
            The type of arguments to test. Either (args|kwargs|req|opt). Default is required.
        strict (bool):
            If True, validates input dictionary against both missing and invalid keyword arguments. Default is False

    Returns:
        Boolean indicating whether the function is callable

    Example:
        >>> import matplotlib.pyplot as plt
        >>> testdict = {'edgecolors': 'black', 'c': 'r', 'xlim': 5, 'xlabel': 9, 'ylabel': 'y', 'ylim': 6}
        >>> # test for invalid args
        >>> isCallableWithArgs(plt.scatter, testdict)
        >>> False
    '''
    if strict:
        # Strict mode requires no missing AND no invalid arguments.
        no_missing = not missingArgs(func, argdict, arg_type=arg_type)
        return no_missing and not invalidArgs(func, argdict)
    # Non-strict mode only rejects arguments the function does not accept.
    return not invalidArgs(func, argdict)
def map_bins_to_column(column, indices):
    ''' Maps a dictionary of array indices to column data

    Takes a given data column and a dictionary of indices (see the indices key
    from output of the histogram data in :meth:`marvin.utils.plot.scatter.hist`),
    and produces a dictionary with the data values from column mapped into
    individual bins.

    Parameters:
        column (list):
            A column of data
        indices (dict):
            A dictionary providing a list of array indices belonging to each
            bin in a histogram.

    Returns:
        A dictionary containing, for each binid, a list of column data in that bin.

    Example:
        >>> x = np.random.random(10) * 10
        >>> hdata = hist(x, bins=3, return_fig=False)
        >>> pmap = map_bins_to_column(x, hdata['indices'])
    '''
    assert isinstance(indices, dict) is True, 'indices must be a dictionary of binids'
    assert len(column) == sum(map(len, indices.values())), 'input column and indices values must have same len'

    data = np.array(column)
    # Fancy-index the column with each bin's index list, preserving bin order.
    return OrderedDict((binid, data[idx].tolist()) for binid, idx in indices.items())
def _sort_dir(instance, class_):
    """Sort `dir()` to return child class attributes and members first.

    Return the attributes and members of the child class, so that
    ipython tab completion lists those first.

    Parameters:
        instance: Instance of `class_` (usually self).
        class_: Class of `instance`.

    Returns:
        list: Child class attributes and members.
    """
    # Members inherited from ndarray/Quantity are pushed to the back of dir().
    parent_members = {name for name, __ in inspect.getmembers(np.ndarray)}
    parent_members |= {name for name, __ in inspect.getmembers(Quantity)}

    child_members = [name for name, __ in inspect.getmembers(class_)
                     if name not in parent_members]
    child_members += vars(instance).keys()
    child_members += ['value']
    return child_members
def _get_summary_path(name, drpver, dapver=None):
    ''' Return the path for either the DRP or DAP summary file

    Parameters:
        name (str):
            The name of the summary file, either drpall or dapall
        drpver (str):
            The DRP version
        dapver (str):
            The DAP version

    Returns:
        The full local path to the requested summary file.

    Raises:
        MarvinError: If ``sdss_access`` is not installed, since its ``Path``
            class is required to build the file location.
    '''
    assert name in ['drpall', 'dapall'], 'name must be either drpall or dapall'

    # Path is set to None at the top of the module when sdss_access fails to
    # import; raise a clear error instead of a bare "NoneType is not callable".
    if Path is None:
        raise MarvinError('sdss_access is not installed. Cannot determine the '
                          'path of the {0} file.'.format(name))

    release = marvin.config.lookUpRelease(drpver)
    path = Path(release=release)
    return path.full(name, drpver=drpver, dapver=dapver)
def get_drpall_path(drpver):
    """Return the full path to the drpall file for ``drpver``."""
    return _get_summary_path('drpall', drpver=drpver)
def get_dapall_path(drpver, dapver):
    """Return the full path to the dapall file for ``(drpver, dapver)``."""
    return _get_summary_path('dapall', drpver, dapver)
@contextlib.contextmanager
def turn_off_ion(show_plot=True):
    ''' Turns off the Matplotlib plt interactive mode

    Context manager to temporarily disable the interactive
    Matplotlib plotting functionality.  Useful for only returning
    Figure and Axes objects.

    Parameters:
        show_plot (bool):
            If True, plots are shown on exit; if False, interactive mode is
            disabled and any figure created inside the context is closed.

    Example:
        >>> with turn_off_ion(show_plot=False):
        >>>     do_some_stuff
    '''
    was_interactive = plt.isinteractive()
    if not show_plot and was_interactive:
        plt.ioff()

    # Remember which figures already existed before entering the context.
    initial_figures = plt.get_fignums()

    yield plt

    if show_plot:
        plt.ioff()
        plt.show()
    else:
        # Close only the figures that were opened inside the context.
        created = set(plt.get_fignums()) - set(initial_figures)
        for fignum in sorted(created):
            plt.close(fignum)

    # Restore the original interactive state.
    if was_interactive and not plt.isinteractive():
        plt.ion()
@contextlib.contextmanager
def temp_setattr(ob, attrs, new_values):
    """ Temporarily set attributes on an object

    Temporarily set one or more attributes on an object for the duration of
    the context manager, restoring (or deleting) them on exit.

    Parameters:
        ob (object):
            A class instance to set attributes on
        attrs (str|list):
            A list of attribute names to replace
        new_values (list):
            A list of new values to set as new attribute. If new_values is
            None, all attributes in attrs will be set to None.

    Example:
        >>> c = Cube(plateifu='8485-1901')
        >>> print('before', c.mangaid)
        >>> with temp_setattr(c, 'mangaid', None):
        >>>     # do stuff
        >>>     print('new', c.mangaid)
        >>> print('after', c.mangaid)
        >>>
        >>> # Output
        >>> before '1-209232'
        >>> new None
        >>> after '1-209232'
    """
    # Normalise both inputs to parallel lists.
    attrs = [attrs] if not isinstance(attrs, list) else attrs
    if new_values:
        new_values = [new_values] if not isinstance(new_values, list) else new_values
    else:
        # A falsy new_values means "set every attribute to that value".
        new_values = [new_values] * len(attrs)

    assert len(attrs) == len(new_values), 'attrs and new_values must have the same length'

    replaced = []
    old_values = []

    # Save the current values (if any) and install the temporary ones.
    for attr, new_value in zip(attrs, new_values):
        was_set = False
        previous = None
        if hasattr(ob, attr):
            try:
                # Instance attribute stored in __dict__ ...
                was_set = attr in ob.__dict__
            except AttributeError:
                # ... or a __slots__-based attribute.
                was_set = attr in ob.__slots__
        if was_set:
            previous = getattr(ob, attr)
        replaced.append(was_set)
        old_values.append(previous)
        setattr(ob, attr, new_value)

    yield replaced, old_values

    # Put the object back the way we found it.
    for i, attr in enumerate(attrs):
        if replaced[i]:
            setattr(ob, attr, old_values[i])
        else:
            delattr(ob, attr)
def map_dapall(header, row):
    ''' Retrieves a dictionary of DAPall db column names

    For a given row in the DAPall file, returns a dictionary
    of corresponding DAPall database column names with the
    appropriate values.

    Parameters:
        header (Astropy header):
            The primary header of the DAPall file
        row (recarray):
            A row of the DAPall binary table data

    Returns:
        A dictionary with db column names as keys and row data as values

    Example:
        >>> hdu = fits.open('dapall-v2_3_1-2.1.1.fits')
        >>> header = hdu[0].header
        >>> row = hdu[1].data[0]
        >>> dbdict = map_dapall(header, row)
    '''

    def _normalize(channel):
        # db-friendly channel names: lowercase with '-' and '.' as underscores
        return channel.lower().replace('-', '_').replace('.', '_')

    # Collect channel names from the primary header keywords.
    emline_schannels = []
    emline_gchannels = []
    specindex_channels = []
    for key, val in header.items():
        if 'ELS' in key:
            emline_schannels.append(_normalize(val))
        elif 'ELG' in key:
            emline_gchannels.append(_normalize(val))
        elif re.search('SPI([0-9])', key):
            specindex_channels.append(_normalize(val))

    dbdict = {}
    for col in row.array.names:
        name = col.lower()
        values = row[col]
        shape = values.shape if hasattr(values, 'shape') else ()

        if not shape:
            # Scalar column: store as-is.
            dbdict[name] = values
            continue

        # Array column: one db column per channel.
        for i in range(shape[0]):
            if 'emline_s' in name:
                channel = emline_schannels[i]
            elif 'emline_g' in name:
                channel = emline_gchannels[i]
            elif 'specindex' in name:
                channel = specindex_channels[i]
            else:
                channel = i + 1
            dbdict['{0}_{1}'.format(name, channel)] = values[i]

    return dbdict
def get_virtual_memory_usage_kb():
    """
    The process's current virtual memory size in Kb, as a float.

    Returns:
        A float of the virtual memory usage
    """
    assert psutil is not None, 'the psutil python package is required to run this function'
    vms_bytes = psutil.Process().memory_info().vms
    return vms_bytes / 1024.0
def memory_usage(where):
    """
    Print out a basic summary of memory usage.

    Parameters:
        where (str):
            A string description of where in the code you are summarizing memory usage
    """
    assert pympler is not None, 'the pympler python package is required to run this function'
    all_objects = pympler.muppy.get_objects()
    rows = pympler.summary.summarize(all_objects)
    print("Memory summary: {0}".format(where))
    pympler.summary.print_(rows, limit=2)
    vm_mb = get_virtual_memory_usage_kb() / 1024.0
    print("VM: {0:.2f}Mb".format(vm_mb))
def target_status(mangaid, mode='auto', source='nsa', drpall=None, drpver=None):
    ''' Check the status of a MaNGA target

    Given a mangaid, checks the status of a target.  Will check if
    target exists in the NSA catalog (i.e. is a proper target) and checks if
    target has a corresponding plate-IFU designation (i.e. has been observed).
    Returns a string status indicating if a target has been observed, has not
    yet been observed, or is not a valid MaNGA target.

    Parameters:
        mangaid (str):
            The mangaid of the target to check for observed status
        mode ({'auto', 'drpall', 'db', 'remote', 'local'}):
            See mode in :func:`mangaid2plateifu` and :func:`get_nsa_data`.
        source ({'nsa', 'drpall'}):
            The NSA catalog data source. See source in :func:`get_nsa_data`.
        drpall (str or None):
            The drpall file to use. See drpall in :func:`mangaid2plateifu` and :func:`get_nsa_data`.
        drpver (str or None):
            The DRP version to use. See drpver in :func:`mangaid2plateifu` and :func:`get_nsa_data`.

    Returns:
        A status of "observed", "not yet observed", or "not valid target"
    '''
    # check for plateifu - target has been observed
    try:
        plateifu = mangaid2plateifu(mangaid, mode=mode, drpver=drpver, drpall=drpall)
    except (MarvinError, BrainError):
        plateifu = None

    # check if target in NSA catalog - proper manga target
    try:
        nsa = get_nsa_data(mangaid, source=source, mode=mode, drpver=drpver, drpall=drpall)
    except (MarvinError, BrainError):
        nsa = None

    # A valid target must be in the NSA catalog; an observed target also has
    # a plate-IFU.  The previous implementation did not cover the case of a
    # plateifu without NSA data, which left ``status`` undefined and raised
    # an UnboundLocalError.
    if not nsa:
        status = 'not valid target'
    elif plateifu:
        status = 'observed'
    else:
        status = 'not yet observed'
    return status
def target_is_observed(mangaid, mode='auto', source='nsa', drpall=None, drpver=None):
    ''' Check if a MaNGA target has been observed or not

    See :func:`target_status` for full documentation.

    Returns:
        True if the target has been observed.
    '''
    return target_status(mangaid, source=source, mode=mode,
                         drpver=drpver, drpall=drpall) == 'observed'
def target_is_mastar(plateifu, drpver=None, drpall=None):
    ''' Check if a target is bright-time MaStar target

    Uses the local drpall file to check if a plateifu is a MaStar target

    Parameters:
        plateifu (str):
            The plateifu of the target
        drpver (str):
            The drpver version to check against
        drpall (str):
            The drpall file path

    Returns:
        True if it is
    '''
    row = get_drpall_row(plateifu, drpver=drpver, drpall=drpall)
    is_mastar = row['srvymode'] == 'APOGEE lead'
    return is_mastar
def get_drpall_table(drpver=None, drpall=None, hdu='MANGA'):
    ''' Gets the drpall table

    Gets the drpall table either from cache or loads it.  For releases
    of MPL-8 and up, galaxies are in the MANGA extension, and mastar
    targets are in the MASTAR extension, specified with the hdu keyword.  For
    MPLs 1-7, there is only one data extension, which is read.

    Parameters:
        drpver (str):
            The DRP release version to load. Defaults to current marvin release
        drpall (str):
            The full path to the drpall table. Defaults to current marvin release.
        hdu (str):
            The name of the HDU to read in. Default is 'MANGA'

    Returns:
        An Astropy Table
    '''
    from marvin import config

    assert hdu.lower() in ['manga', 'mastar'], 'hdu can either be MANGA or MASTAR'
    hdu = hdu.upper()

    # get the drpall file
    get_drpall_file(drpall=drpall, drpver=drpver)

    # Loads the drpall table if it was not cached from a previous session.
    config_drpver, __ = config.lookUpVersions()
    drpver = drpver if drpver else config_drpver

    # ensure a cache entry exists for this version
    if drpver not in drpTable:
        drpTable[drpver] = {}

    # Releases before v2_5_3 ship a single data extension, always read as
    # MANGA, regardless of the requested hdu.
    hduext = hdu if check_versions(drpver, 'v2_5_3') else 'MANGA'

    # Cache on the extension actually read (hduext, not hdu); checking with
    # hdu meant a MASTAR request on an old release missed the cache and
    # re-read the file on every call.
    if hduext not in drpTable[drpver]:
        drpall = drpall if drpall else get_drpall_path(drpver=drpver)
        drpTable[drpver][hduext] = table.Table.read(drpall, hdu=hduext)

    return drpTable[drpver][hduext]
def get_dapall_table(drpver=None, dapver=None, dapall=None):
    ''' Gets the dapall table

    Gets the dapall table either from cache or loads it.  For releases
    of MPL-6 and up.

    Parameters:
        drpver (str):
            The DRP release version to load. Defaults to current marvin release
        dapver (str):
            The DAP release version to load. Defaults to current marvin release
        dapall (str):
            The full path to the dapall table. Defaults to current marvin release.

    Returns:
        An Astropy Table
    '''
    from marvin import config

    # make sure the dapall file is available locally
    get_dapall_file(dapall=dapall, drpver=drpver, dapver=dapver)

    # resolve versions, falling back to the currently configured release
    config_drpver, config_dapver = config.lookUpVersions(config.release)
    drpver = drpver or config_drpver
    dapver = dapver or config_dapver

    # load and cache the table on first access for this dapver
    if dapver not in dapTable:
        dapall = dapall or get_dapall_path(drpver=drpver, dapver=dapver)
        dapTable[dapver] = table.Table.read(dapall, hdu=1)

    return dapTable[dapver]
def get_plates(drpver=None, drpall=None, release=None):
    ''' Get a list of unique plates from the drpall file

    Parameters:
        drpver (str):
            The DRP release version to load. Defaults to current marvin release
        drpall (str):
            The full path to the drpall table. Defaults to current marvin release.
        release (str):
            The marvin release

    Returns:
        A list of plate ids
    '''
    assert not all([drpver, release]), 'Cannot set both drpver and release '

    # a release, if given, is translated into its DRP version
    if release:
        drpver, __ = marvin.config.lookUpVersions(release)

    drpall_table = get_drpall_table(drpver=drpver, drpall=drpall)
    unique_plates = set(drpall_table['plate'])
    return list(unique_plates)
def check_versions(version1, version2):
    ''' Compare two version ids against each other

    Checks if version1 is greater than or equal to version2.

    Parameters:
        version1 (str):
            The version to check
        version2 (str):
            The version to check against

    Returns:
        A boolean indicating if version1 is >= version2
    '''
    first, second = parse_version(version1), parse_version(version2)
    return first >= second
def get_manga_image(cube=None, drpver=None, plate=None, ifu=None, dir3d=None, local=None, public=None):
    ''' Get a MaNGA IFU optical PNG image

    Parameters:
        cube (Cube):
            A Marvin Cube instance
        drpver (str):
            The drpver version
        plate (str|int):
            The plate id
        ifu (str|int):
            An IFU designation
        dir3d (str):
            The directory for 3d data, either 'stack' or 'mastar'
        local (bool):
            If True, return the local file path to the image
        public (bool):
            If True, use only DR releases

    Returns:
        A url to an IFU MaNGA image
    '''
    # a Cube instance, if given, takes precedence over the individual keywords
    if cube:
        drpver = cube._drpver
        plate = cube.plate
        ifu = cube.ifu
        dir3d = cube.dir3d
    assert all([drpver, plate, ifu]), 'drpver, plate, and ifu must be specified'

    # create the sdss Path for the appropriate release
    release = cube.release if cube else marvin.config.lookUpRelease(drpver, public_only=public)
    path = Path(release=release)

    dir3d = dir3d or 'stack'
    assert dir3d in ['stack', 'mastar'], 'dir3d can only be stack or mastar'

    # return either the local file path or the remote url
    resolver = path.full if local else path.url
    return resolver('mangaimage', drpver=drpver, plate=plate, ifu=ifu, dir3d=dir3d)