# !usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed under a 3-clause BSD license.
#
# @Author: José Sánchez-Gallego
# @Date: 2014-04-18 00:21:02
# @Last modified by: Brian Cherinka
# @Last Modified time: 2018-08-08 14:05:43
from __future__ import absolute_import, division, print_function, unicode_literals
import os
import sys
import numpy as np
import requests
import warnings
import PIL
from astropy import table
from astropy.io import ascii
from astropy.wcs import WCS
from marvin.core.exceptions import MarvinError, MarvinUserWarning
from marvin.extern import yanny
if sys.version_info.major == 2:
from cStringIO import StringIO as stringio
else:
from io import StringIO as stringio
from io import BytesIO as bytesio
# cached yanny metrology and plateholes objects
SIMOBJ = None
PLATEHOLES = None
[docs]class Bundle(object):
"""The location, size, and shape of a MaNGA IFU bundle.
A bundle of a given size at the RA/Dec coordinates.
Setting envelope creates the hexagon at the outer boundry of the fibers,
otherwise the hexagon runs through the centers of the outer fibers.
Parameters:
ra (float):
The Right Ascension of the target
dec (float):
The Declination of the target
size (int):
The fiber size of the IFU, e.g. 19 or 127
plateifu (str):
The plateifu of the target
use_envelope (bool):
Expands the hexagon area to include an envelope surrounding the hex border
local (bool):
If True, grabs the fiducial metrology file from a local MANGACORE
Attributes:
fibers (ndarray):
An Nx3 array of IFU fiber coordinates
skies (ndarray):
An Nx2 array of sky fiber coordinates. Default is unloaded. Use :meth:`get_sky_coordinates` to load.
hexagon (ndarray):
An Nx2 array of coordinates defining the hexagon vertices
"""
def __init__(self, ra, dec, size=127, use_envelope=True,
local=None, plateifu=None, **kwargs):
self.ra = float(ra)
self.dec = float(dec)
self.size = size
self.plateifu = plateifu
self.simbMapFile = None
self.plateholes_file = None
if self.size not in [7, 19, 37, 61, 91, 127]:
self.hexagon = None
return
self.simbMap = self._get_simbmap_file(local=local)
# get the fiber coordinates
self.skies = None
self.fibers = self.get_fiber_coordinates(size=size)
# get the hexagon coordinates
self.hexagon = self._calculate_hexagon(use_envelope=use_envelope)
def __repr__(self):
return '<Bundle (ra={0}, dec={1}, ifu={2})>'.format(self.ra, self.dec, self.size)
def _get_a_file(self, filename, local=None):
''' Retrieve a file served locally or remotely over the internet
Will retrieve a local filename or grab the file
contents remotely from the SAS
Parameters:
filename (str):
Full filepath to load
local (bool):
If True, does a local system check
Returns:
A file object to be read in by Yanny
'''
if local:
if not os.path.exists(filename):
raise MarvinError('No {0} file found locally.'.format(filename))
else:
fileobj = filename
else:
r = requests.get(filename)
if not r.ok:
raise MarvinError('Could not retrieve {0} file remotely'.format(filename))
else:
fileobj = stringio(r.content.decode())
return fileobj
def _read_in_yanny(self, filename, local=None):
''' Read in a yanny file object
Reads in a local or remote data object as a Yanny file
Parameters:
filename (str):
Full filepath to load
local (bool):
If True, does a local system check
Returns:
A Yanny object
'''
fileobj = self._get_a_file(filename, local=local)
try:
data = yanny.yanny(fileobj, np=True)
except Exception as e:
raise MarvinError('Cannot read file {0}. {1}'.format(filename, e))
return data
def _get_simbmap_file(self, local=None):
''' Retrieves the metrology file locally or remotely
Reads in fresh metrology data or grabs it from
cache if available
Parameters:
local (bool):
If True, does a local system check
Returns:
The metrology object data
'''
# create the file path
rel_path = u'metrology/fiducial/manga_simbmap_127.par'
if local:
base_path = os.environ['MANGACORE_DIR']
else:
base_path = u'https://svn.sdss.org/public/repo/manga/mangacore/tags/v1_2_3'
self.simbMapFile = os.path.join(base_path, rel_path)
# read in the Yanny object
global SIMOBJ
if SIMOBJ is None:
simbmap_data = self._read_in_yanny(self.simbMapFile, local=local)
SIMOBJ = simbmap_data['SIMBMAP']
return SIMOBJ
def _get_plateholes_file(self, plate=None, local=None):
''' Retrieves the platesholes file locally or remotely
Reads in fresh plateHoles data or grabs it from
cache if available
Parameters:
plate (int):
The plateid to load the plateHoles file for
local (bool):
If True, does a local system check
Returns:
The plateHoles object data
'''
# check for plate
if not plate and not self.plateifu:
raise MarvinError('Must have the plate id to get the correct plateholes file')
elif not plate and self.plateifu:
plate, ifu = self.plateifu.split('-')
# create the file path
pltgrp = '{:04d}XX'.format(int(plate) // 100)
rel_path = u'platedesign/plateholes/{0}/plateHolesSorted-{1:06d}.par'.format(pltgrp, int(plate))
if local:
base_path = os.environ['MANGACORE_DIR']
else:
base_path = u'https://svn.sdss.org/public/repo/manga/mangacore/tags/v1_2_3'
self.platesholes_file = os.path.join(base_path, rel_path)
# read in the Yanny object
global PLATEHOLES
if PLATEHOLES is None:
plateholes_data = self._read_in_yanny(self.platesholes_file, local=local)
PLATEHOLES = plateholes_data['STRUCT1']
return PLATEHOLES
@staticmethod
def _get_sky_fibers(data, ifu):
""" Return the sky fibers associated with each galaxy.
Parameters:
data (object):
the plateHoles object data
ifu (int):
The ifu design of the target
Returns:
A numpy array of tuples of sky fiber RA, Dec coordinates
"""
sky_fibers = data[data['targettype'] == 'SKY']
sky_block = sky_fibers['block'] == int(ifu)
zip_data = list(zip(sky_fibers[sky_block]['target_ra'], sky_fibers[sky_block]['target_dec']))
skies = np.array(zip_data)
return skies
[docs] def get_sky_coordinates(self, plateifu=None):
''' Returns the RA, Dec coordinates of the sky fibers
An Nx2 numpy array, with each row the [RA, Dec] coordinate of
the sky fiber.
Parameters:
plateifu (str):
The plateifu of the target
'''
plateifu = plateifu or self.plateifu
if not plateifu:
raise MarvinError('Need the plateifu to get the corresponding sky fibers')
else:
plate, ifu = plateifu.split('-')
data = self._get_plateholes_file(plate=plate)
self.skies = self._get_sky_fibers(data, ifu)
[docs] def get_fiber_coordinates(self, size=None):
""" Returns the RA, Dec coordinates for each fiber.
Parameters:
size (int):
The IFU size. This extracts only the fibers for the given size
Returns:
An Nx3 numpy array, each row containing [fiberid, RA, Dec]
"""
fibreWorld = np.zeros((len(self.simbMap), 3), float)
raOffDeg = self.simbMap['raoff'] / 3600. / np.cos(self.dec *
np.pi / 180.)
decOffDeg = self.simbMap['decoff'] / 3600.
fibreWorld[:, 0] = self.simbMap['fnumdesign']
fibreWorld[:, 1] = self.ra + raOffDeg
fibreWorld[:, 2] = self.dec + decOffDeg
# extract only the fibers for the specified IFU size
if size:
fibreWorld = fibreWorld[fibreWorld[:, 0] <= size]
return fibreWorld
def _calculate_hexagon(self, use_envelope=True):
""" Calculates the vertices of the bundle hexagon. """
simbMapSize = self.simbMap[self.simbMap['fnumdesign'] <= self.size]
middleRow = simbMapSize[simbMapSize['decoff'] == 0]
topRow = simbMapSize[simbMapSize['decoff'] ==
np.max(simbMapSize['decoff'])]
bottopRow = simbMapSize[simbMapSize['decoff'] ==
np.min(simbMapSize['decoff'])]
vertice0 = middleRow[middleRow['raoff'] == np.max(middleRow['raoff'])]
vertice3 = middleRow[middleRow['raoff'] == np.min(middleRow['raoff'])]
vertice1 = topRow[topRow['raoff'] == np.max(topRow['raoff'])]
vertice2 = topRow[topRow['raoff'] == np.min(topRow['raoff'])]
vertice5 = bottopRow[bottopRow['raoff'] == np.max(bottopRow['raoff'])]
vertice4 = bottopRow[bottopRow['raoff'] == np.min(bottopRow['raoff'])]
hexagonOff = np.array(
[[vertice0['raoff'][0], vertice0['decoff'][0]],
[vertice1['raoff'][0], vertice1['decoff'][0]],
[vertice2['raoff'][0], vertice2['decoff'][0]],
[vertice3['raoff'][0], vertice3['decoff'][0]],
[vertice4['raoff'][0], vertice4['decoff'][0]],
[vertice5['raoff'][0], vertice5['decoff'][0]]])
# This array increases the area of the hexagon so that it is an
# envelope of the bundle.
if use_envelope:
halfFibre = 1.
hexagonExtra = np.array(
[[halfFibre, 0.0],
[halfFibre, halfFibre],
[-halfFibre, halfFibre],
[-halfFibre, 0.0],
[-halfFibre, -halfFibre],
[halfFibre, -halfFibre]])
hexagonOff += hexagonExtra
raOffDeg = hexagonOff[:, 0] / 3600. / np.cos(self.dec * np.pi / 180.)
decOffDeg = hexagonOff[:, 1] / 3600.
hexagon = hexagonOff.copy()
hexagon[:, 0] = self.ra + raOffDeg
hexagon[:, 1] = self.dec + decOffDeg
return hexagon
[docs] def create_DS9_regions(self, outputFile=None):
''' Writes out the bundle positions into a DS9 region file
Parameters:
outputFile (str):
The output filename
'''
if outputFile is None:
outputFile = 'bundlePositionsDS9.reg'
radius = 1. / 3600.
template = """
global color=green font="helvetica 10 normal roman" wcs=wcs
"""
template.strip().replace('\n', ' ')
for fibre in self.fibers:
template += ('\nfk5;circle({0:.10f},{1:.10f},'
'{2:.10f}) #text = {{{3}}}').format(
fibre[1], fibre[2], radius, int(fibre[0]))
template += '\n'
out = open(outputFile, 'w')
out.write(template)
out.close()
[docs] def print_bundle(self):
''' Print the bundle to an Astropy Table '''
tt = table.Table(self.fibers, names=['fnumdesign', 'RA', 'Dec'],
dtype=[int, float, float])
ascii.write(tt, format='fixed_width_two_line',
formats={'RA': '{0:.12f}', 'Dec': '{0:.12f}'})
[docs] def print_hexagon(self):
''' Print the hexagon to an Astropy Table '''
tt = table.Table(self.hexagon, names=['RA', 'Dec'],
dtype=[float, float])
ascii.write(tt, format='fixed_width_two_line',
formats={'RA': '{0:.12f}', 'Dec': '{0:.12f}'})
[docs]class Cutout(object):
""" A Generic SDSS Cutout Image
Tool which allows to generate an image using the SDSS Skyserver
Image Cutout service. See http://skyserver.sdss.org/public/en/help/docs/api.aspx#imgcutout
for details.
Parameters:
ra (float):
The central Right Ascension of the cutout
dec (float):
The central Declination of the cutout
width (int):
width of cutout in arcsec
height (int):
height in cutout in arcsec
scale (float):
pixel scale in arcsec/pixel. Default is 0.198 "/pix.
kwargs:
Allowed boolean keyword arguments to the SDSS Image Cutout Service. Set desired
keyword parameters to True to enable.
Available kwargs are:
- 'photo': PhotoObjs
- 'bound': BoundingBox
- 'masks': Masks
- 'grid': Grid
- 'outline': Outline
- 'target': TargetObjs
- 'fields': Fields
- 'invert': Invert Image
- 'spectra': SpecObjs
- 'label': Label
- 'plates': Plates
Attributes:
rawurl (str):
The raw url of the cutout
wcs (WCS):
The WCS of the generated image
image (PIL.Image):
The cutout image object
size (int):
The image size in arcsec
size_pix (int):
The image size in pixels
center (tuple):
The image center RA, Dec
"""
def __init__(self, ra, dec, width, height, scale=None, **kwargs):
self.rawurl = ("http://skyserver.sdss.org/public/SkyServerWS/ImgCutout/getjpeg?"
"ra={ra}&dec={dec}&scale={scale}&width={width_pix}&height={height_pix}&opt={opts}&query=")
self.ra = ra
self.dec = dec
self.scale = scale or 0.198 # default arcsec/pixel
self.image = None
self.center = np.array([ra, dec])
self.size = np.array([width, height], dtype=int)
self.coords = {'ra': ra, 'dec': dec,
'width': width, 'height': height,
'scale': self.scale}
self._get_pix_size()
if max(self.size_pix) >= 2048:
raise MarvinError('Requested image size is too large. '
'The Skyserver image cutout can only return a size up to 2048 pixels')
self._define_wcs()
self._get_cutout(**kwargs)
def __repr__(self):
return ('<Cutout (ra={0}, dec={1}, scale={2}, height={3}, '
'width={4})>'.format(self.ra, self.dec, self.scale, *self.size_pix))
def _get_pix_size(self):
"""height,width converted from arcsec->pixels"""
self.coords['height_pix'] = int(round(self.coords['height'] / self.scale))
self.coords['width_pix'] = int(round(self.coords['width'] / self.scale))
self.size_pix = np.array((self.coords['height_pix'], self.coords['width_pix']))
def _define_wcs(self):
"""
Given what we know about the scale of the image,
define a nearly-correct world coordinate system to use with it.
"""
w = WCS(naxis=2)
w.wcs.crpix = self.size_pix / 2
w.wcs.crval = self.center
w.wcs.cd = np.array([[-1, 0], [0, 1]]) * self.scale / 3600.
w.wcs.ctype = ['RA---TAN', 'DEC--TAN']
w.wcs.cunit = ['deg', 'deg']
w.wcs.radesys = 'ICRS'
w.wcs.equinox = 2000.0
self.wcs = w
def _wcs_to_dict(self):
''' Convert and return the WCS as a dictionary'''
wcshdr = None
if self.wcs:
wcshdr = self.wcs.to_header()
wcshdr = dict(wcshdr)
wcshdr = {key: str(val) for key, val in wcshdr.items()}
return wcshdr
def _make_metadata(self, filetype=None):
''' Make the meta data for the image '''
if 'png' in filetype:
meta = PIL.PngImagePlugin.PngInfo()
else:
meta = None
warnings.warn('Can only save WCS metadata with PNG filetype', MarvinUserWarning)
if meta:
info = {key: str(val) for key, val in self.image.info.items()}
for row in info:
meta.add_text(row, info[row])
return meta
def _update_info(self):
''' Update the image info dictionary '''
for key, value in self.image.info.items():
if isinstance(value, tuple):
self.image.info[key] = value[0]
wcsdict = self._wcs_to_dict()
self.image.info = wcsdict
self.image.info.update(self.coords)
self.image.info['wdthpix'] = self.image.info.pop('width_pix')
self.image.info['hghtpix'] = self.image.info.pop('height_pix')
def _add_options(self, **kwargs):
allowed = {'grid': 'G', 'label': 'L', 'photo': 'P', 'spectra': 'S',
'target': 'T', 'outline': 'O', 'bound': 'B', 'fields': 'F',
'masks': 'M', 'plates': 'Q', 'invert': 'I'}
opts = []
for key, value in kwargs.items():
assert key in allowed.keys(), 'Cutout keyword must be one of: {0}'.format(allowed.keys())
assert isinstance(value, (bool, type(None))), 'Cutout value can only be a Boolean'
if value:
opts.append(allowed[key])
self.coords['opts'] = ''.join(opts)
def _get_cutout(self, **kwargs):
""" Gets an image cutout
Get a cutout around a point, centered at some RA, Dec (in decimal
degrees), and spanning width,height (in arcseconds) in size.
Parameters:
kwargs:
Allowed keywords into the SDSS Skyserver Image Cutout
"""
# add options
self._add_options(**kwargs)
# retrieve the image
url = self.rawurl.format(**self.coords)
response = requests.get(url)
if not response.ok:
raise MarvinError('Cannot retrieve image cutout')
else:
base_image = response.content
ioop = stringio if sys.version_info.major == 2 else bytesio
self.image = PIL.Image.open(ioop(base_image))
self._update_info()
[docs] def save(self, filename, filetype='png'):
''' Save the image cutout to a file
If the filetype is PNG, it will also save the WCS and coordinate
information as metadata in the image.
Parameters:
filename (str):
The output filename
filetype (str):
The output file extension
'''
filename, fileext = os.path.splitext(filename)
extlist = ['.png', '.bmp', '.jpg', '.jpeg', '.tiff', '.gif', '.ppm']
assert fileext.lower() in extlist, 'Specified filename not of allowed image type: png, gif, tiff, jpeg, bmp'
meta = self._make_metadata(filetype=filetype)
self.image.save(filename, filetype, pnginfo=meta)
[docs] def show(self):
''' Show the image cutout '''
if self.image:
self.image.show()