import os
import decimal
import datetime
import numpy as np
import json
import yaml
from pkg_resources import parse_version
from brain.core.exceptions import BrainError, BrainWarning
from hashlib import md5
from passlib.apache import HtpasswdFile

from flask import url_for
    from urllib import unquote
except ImportError:
    from urllib.parse import unquote

# General utilities

__all__ = ['getDbMachine', 'merge', 'convertIvarToErr', 'compress_data',
           'uncompress_data', 'inspection_authenticate', 'validate_user',
           'get_db_user', 'build_routemap', 'collaboration_authenticate',

[docs]def getDbMachine(): ''' Get the machine that the app is running on. This determines correct database and app configuration ''' # Get machine machine = os.environ.get('HOSTNAME', None) # Check if localhost or not try: localhost = bool(os.environ['MANGA_LOCALHOST']) except: localhost = machine == 'manga' # Check if Utah or not try: utah = os.environ['UUFSCELL'] == 'kingspeak.peaks' except: utah = None # Check if sas-vm or not sasvm = 'sas-vm' in machine if machine else None # Check if lore or not lore = 'lore' in machine if machine else None # Check if jhu or not sciserver = os.environ.get('SCISERVER', None) jhu = sciserver in ['true', 'True', '1'] # Set the dbconfig variable if localhost: return 'local' elif utah or sasvm: return 'utah' elif lore: return 'lore' elif jhu: return 'jhu' else: return None
[docs]def merge(user, default): """Merges a user configuration with the default one. Merges two dictionaries, replacing default values of similar matching keys from user. Parameters: user (dict): A user defined dictionary default (dict): Returns: A new merged dictionary """ if isinstance(user, dict) and isinstance(default, dict): for kk, vv in default.items(): if kk not in user: user[kk] = vv else: user[kk] = merge(user[kk], vv) return user
[docs]def convertIvarToErr(ivar): ''' Converts a list of inverse variance into an a list of standard errors ''' assert isinstance(ivar, (list, np.ndarray)), 'Input ivar is not of type list or an Numpy ndarray' if isinstance(ivar, list): ivar = np.array(ivar) error = np.zeros(ivar.shape) notnull = ivar != 0.0 error[notnull] = 1 / np.sqrt(ivar[notnull]) error = list(error) return error
def alchemyencoder(obj): """JSON encoder function for SQLAlchemy special classes.""" if isinstance(obj, return obj.isoformat() elif isinstance(obj, decimal.Decimal): return float(obj) def _compress_json(data, uncompress=None): ''' Compress/Uncompress JSON data ''' try: if uncompress: comp_data = json.loads(data) else: comp_data = json.dumps(data) except Exception as e: raise BrainError('Cannot (un)compress JSON data. {0}'.format(e)) else: return comp_data def _compress_msgpack(data, uncompress=None): ''' Compress/Uncompress msgpack data ''' # import the package try: import msgpack import msgpack_numpy as m except ImportError as e: compress_with = 'json' raise BrainWarning('Must have Python packages msgpack and msgpack_numpy ' 'installed to use msgpack compression. Defaulting to json') else: m.patch() # do the compression try: if uncompress: comp_data = msgpack.unpackb(data, raw=False) else: comp_data = msgpack.packb(data, use_bin_type=True) except Exception as e: raise BrainError('Cannot (un)compress msgpack data. {0}'.format(e)) else: return comp_data
[docs]def compress_data(data, compress_with=None, uncompress=None): ''' Compress data via json or msgpack Parameters: data (obj) The input data to compress or uncompress compress_with (str): Compression algorithm. Defaults to config.compression. uncompress (bool): If True, uncompresses the data instead of compressing. Default is False Returns: Data compressed with with json or msgpack ''' from brain import bconfig # check compression if not compress_with: compress_with = bconfig.compression if compress_with == 'msgpack': try: import msgpack except ImportError as e: compress_with = 'json' raise BrainWarning('Must have Python packages msgpack and msgpack_numpy ' 'installed to use msgpack compression. Defaulting to json') assert compress_with in bconfig._compression_types, 'compress_with must be one of {0}'.format(bconfig._compression_types) # compress the data if compress_with == 'json': comp_data = _compress_json(data, uncompress=uncompress) elif compress_with == 'msgpack': comp_data = _compress_msgpack(data, uncompress=uncompress) else: raise BrainError('Unrecognized compression algorithm {0}'.format(compress_with)) return comp_data
[docs]def uncompress_data(data, uncompress_with=None): ''' Compress data via json or msgpack Parameters: data (obj) The data to compress uncompress_with (str): Compression algorithm. Defaults to config.compression. Returns: Data compressed with with json or msgpack ''' return compress_data(data, compress_with=uncompress_with, uncompress=True)
[docs]def inspection_authenticate(session, username=None, password=None): ''' Authenticate with Trac using Inspection .. deprecated:: 2.3.0 Use :class:`brain.utils.general.collaboration_authenticate` instead. Parameters: session (dict): A dict or Flask session object to collect parameters username (str): The Trac username password (str): The Trac user password Returns: A dictionary of user info specifying if the user has authenticated and is valid ''' try: from inspection.marvin import Inspection except ImportError as e: from brain.core.inspection import Inspection auth = md5("{0}:AS3Trac:{1}".format(username, password).encode('utf-8')).hexdigest() if username and password else None result = {'is_valid': False} try: inspection = Inspection(session, username=username, auth=auth) except Exception as e: result['status'] = -1 result['message'] = str(e) else: result = inspection.result() result['is_valid'] = inspection.ready return result
[docs]def collaboration_authenticate(username=None, password=None, verbose=None): ''' Authenticate with Trac using Collaboration Authenticate using the SDSS collaboration python package Parameters: username (str): The Trac username password (str): The Trac user password Returns: A dictionary of user info specifying if the user has authenticated and is valid ''' result = {'is_valid': False, 'status': -1} # try to import the package try: from import Credential except ImportError as e: result['message'] = str(e) return result # get credentials try: cred = Credential(username=username, password=password, verbose=verbose) except Exception as e: result['message'] = str(e) else: cred.authenticate_via_trac() result['is_valid'] = cred.authenticated is True if cred.authenticated: result['status'] = 1 cred.set_member() result['member'] = cred.member if cred.member: result['user'] = cred.member.username result['fullname'] = cred.member.fullname else: result['user'] = username return result
[docs]def validate_user(username, password, htpassfile=None, request=None): ''' Validate the User with htpassfile or Trac Tries to validate a user first with a user login from the htpassfile, and second from a Trac wiki account. Parameters: username (str): The login user id password (str): The login user password htpassfile (str): Optional. The full path to an htpassfile. request (Request): The Flask request object Returns: A tuple of (boolean if the user is valid, the username, and the results dictionary) ''' from brain import bconfig result = {} # validate the user with htpassfile or trac username is_valid = False user = None # try from request if not htpassfile and request: htpassfile = request.environ.get('HTPASSFILE', None) # try from brain config if not htpassfile and hasattr(bconfig, '_htpass_path'): htpassfile = bconfig._htpass_path # validate user if username == 'sdss': if htpassfile: htpass = HtpasswdFile(htpassfile) is_valid = htpass.check_password(username, password) user = username else: result['error'] = 'No valid htpasswd file found!' else: result = collaboration_authenticate(username=username, password=password) is_valid = result['is_valid'] user = result.get('user', None) return is_valid, user, result
[docs]def get_db_user(username, password, dbsession=None, user_model=None, request=None): ''' Get a User from a database session Gets a User object from the database User table. If the User does not exists, adds the User. Parameters: username (str): The user id password (str): The user password dbsession (db.Session): The SQLAlchemy database session object user_model (Model): The SQLALchemy User ModelClass request (Request): The Flask request object Returns: The database User object. ''' if not dbsession: return None if user_model: assert hasattr(user_model, 'set_password'), 'User Model must have the set_password method!' assert hasattr(user_model, 'check_password'), 'User Model must have the check_password method!' user = dbsession.query(user_model).filter(user_model.username == username).one_or_none() with dbsession.begin(): if not user: # add new user user = user_model(username=username, login_count=1) user.set_password(password) dbsession.add(user) else: user.update_stats(request=request) return user
[docs]def build_routemap(app): ''' Builds a Flask Web App's dictionary of routes Constructs a dictionary containing all the routes defined inside a given Flask Web App. The route endpoints are deconstructed into a set of nested dictionaries of the form [blueprint][endpoint], which contains a methods and a url key. The url key returns the full route path. E.g. the API route to get a cube, which has a name "getCube" is expressed as ['api']['getCube']. To access the url, ['api']['getCube']['url'] returns "/marvin/api/cubes/{name}/" Parameters: app (Flask Application): The Flask app to extract routes from Returns: A dict of all routes ''' output = {} for rule in app.url_map.iter_rules(): # get options options = {} for arg in rule.arguments: options[arg] = '[{0}]'.format(arg) options['_external'] = False # get endpoint fullendpoint = rule.endpoint esplit = fullendpoint.split('.') grp, endpoint = esplit[0], None if len(esplit) == 1 else esplit[1] output.setdefault(grp, {}).update({endpoint: {}}) # get methods methods = ','.join(rule.methods) output[grp][endpoint]['methods'] = methods # build url try: rawurl = url_for(fullendpoint, **options) except ValueError as e: raise BrainError('Error generating url for {0}: {1}'.format(fullendpoint, e)) url = unquote(rawurl).replace('[', '{').replace(']', '}') output[grp][endpoint]['url'] = url return output
[docs]def get_yaml_loader(): ''' Get a yaml loader based on the yaml package version ''' if parse_version(yaml.__version__) >= parse_version('5.1'): loader = yaml.FullLoader else: loader = yaml.SafeLoader return loader