Source code for marvin.tools.query

#!/usr/bin/env python
# encoding: utf-8

# Licensed under a 3-clause BSD license.

# Revision History:
#     Initial Version: 2016-02-17 14:13:28 by Brett Andrews
#     2016-02-23 - Modified to test a programmatic query using a test sample form - B. Cherinka
#     2016-03-02 - Generalized to many parameters and many forms - B. Cherinka
#                - Added config drpver info
#     2016-03-12 - Changed parameter input to be a natural language string

from __future__ import print_function, division, unicode_literals
from marvin.core.exceptions import MarvinError, MarvinUserWarning, MarvinBreadCrumb
from marvin.utils.general.structs import string_folding_wrapper
from sqlalchemy_boolean_search import parse_boolean_search, BooleanSearchException
from sqlalchemy import func
from marvin import config, marvindb
from marvin.tools.results import Results
from marvin.utils.datamodel.query import datamodel
from marvin.utils.datamodel.query.base import query_params
from marvin.utils.general import temp_setattr
from marvin.api.api import Interaction
from marvin.core import marvin_pickle
from marvin.tools.results import remote_mode_only
from sqlalchemy import bindparam
from sqlalchemy.orm import aliased
from sqlalchemy.dialects import postgresql
from sqlalchemy.sql.expression import desc
from operator import le, ge, gt, lt, eq, ne
from collections import defaultdict, OrderedDict
import datetime
import numpy as np
import warnings
import os
import re
import six
from functools import wraps

try:
    import cPickle as pickle
except:
    import pickle

__all__ = ['Query', 'doQuery']
opdict = {'<=': le, '>=': ge, '>': gt, '<': lt, '!=': ne, '=': eq, '==': eq}

breadcrumb = MarvinBreadCrumb()


def tree():
    return defaultdict(tree)


[docs]def doQuery(*args, **kwargs): """Convenience function for building a Query and retrieving the Results. Parameters: N/A: See the :class:`~marvin.tools.query.Query` class for a list of inputs. Returns: query, results: A tuple containing the built :class:`~marvin.tools.query.Query` instance, and the :class:`~marvin.tools.results.Results` instance. """ start = kwargs.pop('start', None) end = kwargs.pop('end', None) q = Query(*args, **kwargs) try: res = q.run(start=start, end=end) except TypeError as e: warnings.warn('Cannot run, query object is None: {0}.'.format(e), MarvinUserWarning) res = None return q, res
def updateConfig(f): """Decorator that updates query object with new config drpver version.""" @wraps(f) def wrapper(self, *args, **kwargs): if self.query and self.mode == 'local': self.query = self.query.params({'drpver': self._drpver, 'dapver': self._dapver}) return f(self, *args, **kwargs) return wrapper def makeBaseQuery(f): """Decorator that makes the base query if it does not already exist.""" @wraps(f) def wrapper(self, *args, **kwargs): if not self.query and self.mode == 'local': self._createBaseQuery() return f(self, *args, **kwargs) return wrapper def checkCondition(f): """Decorator that checks if filter is set, if it does not already exist.""" @wraps(f) def wrapper(self, *args, **kwargs): if self.mode == 'local' and self.filterparams and not self._alreadyInFilter(self.filterparams.keys()): self.add_condition() return f(self, *args, **kwargs) return wrapper
[docs]class Query(object): ''' A class to perform queries on the MaNGA dataset. This class is the main way of performing a query. A query works minimally by specifying a list of desired parameters, along with a string filter condition in a natural language SQL format. A local mode query assumes a local database. A remote mode query uses the API to run a query on the Utah server, and return the results. By default, the query returns a list of tupled parameters. The parameters are a combination of user-defined parameters, parameters used in the filter condition, and a set of pre-defined default parameters. The object plate-IFU or mangaid is always returned by default. Parameters: returnparams (str list): A list of string parameters names desired to be returned in the query searchfilter (str): A (natural language) string containing the filter conditions in the query; written as you would say it. returntype (str): The requested Marvin Tool object that the results are converted into mode ({'local', 'remote', 'auto'}): The load mode to use. See :doc:`Mode secision tree</mode_decision>`. sort (str): The parameter name to sort the query on order ({'asc', 'desc'}): The sort order. Can be either ascending or descending. limit (int): The number limit on the number of returned results Returns: results: An instance of the :class:`~marvin.tools.query.results.Results` class containing the results of your Query. Example: >>> # filter of "NSA redshift less than 0.1 and IFU names starting with 19" >>> searchfilter = 'nsa.z < 0.1 and ifu.name = 19*' >>> returnparams = ['cube.ra', 'cube.dec'] >>> q = Query(searchfilter=searchfilter, returnparams=returnparams) >>> results = q.run() ''' def __init__(self, *args, **kwargs): self._release = kwargs.pop('release', config.release) self._drpver, self._dapver = config.lookUpVersions(release=self._release) self.query = None self.params = [] self.filterparams = {} self.queryparams = None self.myparamtree = tree() self._paramtree = None self.session = marvindb.session self.filter = None self.joins = [] self.myforms = defaultdict(str) self.quiet = kwargs.get('quiet', None) self._errors = [] self._basetable = None self._modelgraph = marvindb.modelgraph self._returnparams = [] self._caching = kwargs.get('caching', True) self.verbose = kwargs.get('verbose', True) self.count_threshold = kwargs.get('count_threshold', 1000) self.allspaxels = kwargs.get('allspaxels', None) self.mode = kwargs.get('mode', None) self.limit = int(kwargs.get('limit', 100)) self.sort = kwargs.get('sort', 'mangaid') self.order = kwargs.get('order', 'asc') self.return_all = kwargs.get('return_all', False) self.datamodel = datamodel[self._release] self.marvinform = self.datamodel._marvinform # drop breadcrumb breadcrumb.drop(message='Initializing MarvinQuery {0}'.format(self.__class__), category=self.__class__) # set the mode if self.mode is None: self.mode = config.mode if self.mode == 'local': self._doLocal() if self.mode == 'remote': self._doRemote() if self.mode == 'auto': try: self._doLocal() except Exception as e: warnings.warn('local mode failed. Trying remote now.', MarvinUserWarning) self._doRemote() # get return type self.returntype = kwargs.get('returntype', None) # set default parameters self.set_defaultparams() # get user-defined input parameters returnparams = kwargs.get('returnparams', []) if returnparams: self.set_returnparams(returnparams) # if searchfilter is set then set the parameters searchfilter = kwargs.get('searchfilter', None) if searchfilter: self.set_filter(searchfilter=searchfilter) self._isdapquery = self._checkInFilter(name='dapdb') # Don't do anything if nothing specified allnot = [not searchfilter, not returnparams] if not all(allnot) and self.mode == 'local': # create query parameter ModelClasses self._create_query_modelclasses() # this adds spaxel x, y into default for query 1 dap zonal query self._adjust_defaults() # join tables self._join_tables() # add condition if searchfilter: self.add_condition() # add PipelineInfo self._addPipeline() # check if query if a dap query if self._isdapquery: self._buildDapQuery() self._check_dapall_query() def __repr__(self): return ('Marvin Query(filter={4}, mode={0}, limit={1}, sort={2}, order={3})' .format(repr(self.mode), self.limit, self.sort, repr(self.order), self.searchfilter)) def _doLocal(self): ''' Tests if it is possible to perform queries locally. ''' if not config.db or not self.session: warnings.warn('No local database found. Cannot perform queries.', MarvinUserWarning) raise MarvinError('No local database found. Query cannot be run in local mode') else: self.mode = 'local' def _doRemote(self): ''' Sets up to perform queries remotely. ''' if not config.urlmap: raise MarvinError('No URL Map found. Cannot make remote query calls!') else: self.mode = 'remote' def _check_query(self, name): ''' Check if string is inside the query statement ''' qstate = str(self.query.statement.compile(compile_kwargs={'literal_binds':True})) return name in qstate def _checkInFilter(self, name='dapdb'): ''' Check if the given name is in the schema of any of the filter params ''' if self.mode == 'local': fparams = self.marvinform._param_form_lookup.mapToColumn(self.filterparams.keys()) fparams = [fparams] if not isinstance(fparams, list) else fparams inschema = [name in c.class_.__table__.schema for c in fparams] elif self.mode == 'remote': inschema = [] return True if any(inschema) else False def _check_shortcuts_in_filter(self, strfilter): ''' Check for shortcuts in string filter Replaces shortcuts in string searchfilter with the full tables and names. is there a better way? ''' # table shortcuts # for key in self.marvinform._param_form_lookup._tableShortcuts.keys(): # #if key in strfilter: # if re.search('{0}.[a-z]'.format(key), strfilter): # strfilter = strfilter.replace(key, self.marvinform._param_form_lookup._tableShortcuts[key]) # name shortcuts for key in self.marvinform._param_form_lookup._nameShortcuts.keys(): if key in strfilter: # strfilter = strfilter.replace(key, self.marvinform._param_form_lookup._nameShortcuts[key]) param_form_lookup = self.marvinform._param_form_lookup strfilter = re.sub(r'\b{0}\b'.format(key), '{0}'.format(param_form_lookup._nameShortcuts[key]), strfilter) return strfilter def _adjust_defaults(self): ''' Adjust the default parameters to include necessary parameters For any query involving DAP DB, always return the spaxel index TODO: change this to spaxel x and y TODO: change this entirely ''' dapschema = ['dapdb' in c.class_.__table__.schema for c in self.queryparams] if any(dapschema): dapcols = ['spaxelprop.x', 'spaxelprop.y', 'bintype.name', 'template.name'] self.defaultparams.extend(dapcols) self.params.extend(dapcols) self.params = list(OrderedDict.fromkeys(self.params)) self._create_query_modelclasses() # qpdap = self.marvinform._param_form_lookup.mapToColumn(dapcols) # self.queryparams.extend(qpdap) # self.queryparams_order.extend([q.key for q in qpdap])
[docs] def set_returnparams(self, returnparams): ''' Loads the user input parameters into the query params limit Adds a list of string parameter names into the main list of query parameters to return Parameters: returnparams (list): A string list of the parameters you wish to return in the query ''' if returnparams: returnparams = [returnparams] if not isinstance(returnparams, list) else returnparams # look up shortcut names for the return parameters full_returnparams = [self.marvinform._param_form_lookup._nameShortcuts[rp] if rp in self.marvinform._param_form_lookup._nameShortcuts else rp for rp in returnparams] self._returnparams = full_returnparams self.params.extend(full_returnparams)
[docs] def set_defaultparams(self): ''' Loads the default params for a given return type TODO - change mangaid to plateifu once plateifu works in cube, maps, rss, modelcube - file objects spaxel, map, rssfiber - derived objects (no file) these are also the default params except any query on spaxelprop should return spaxel_index (x/y) Minimum parameters to instantiate a Marvin Tool cube - return plateifu/mangaid modelcube - return plateifu/mangaid, bintype, template rss - return plateifu/mangaid maps - return plateifu/mangaid, bintype, template spaxel - return plateifu/mangaid, spaxel x and y map - do not instantiate directly (plateifu/mangaid, bintype, template, property name, channel) rssfiber - do not instantiate directly (plateifu/mangaid, fiberid) return any of our tools ''' assert self.returntype in [None, 'cube', 'spaxel', 'maps', 'rss', 'modelcube'], 'Query returntype must be either cube, spaxel, maps, modelcube, rss' self.defaultparams = ['cube.mangaid', 'cube.plate', 'cube.plateifu', 'ifu.name'] if self.returntype == 'spaxel': pass #self.defaultparams.extend(['spaxel.x', 'spaxel.y']) elif self.returntype == 'modelcube': self.defaultparams.extend(['bintype.name', 'template.name']) elif self.returntype == 'rss': pass elif self.returntype == 'maps': self.defaultparams.extend(['bintype.name', 'template.name']) # self.defaultparams.extend(['spaxelprop.x', 'spaxelprop.y']) # add to main set of params self.params.extend(self.defaultparams)
def _create_query_modelclasses(self): ''' Creates a list of database ModelClasses from a list of parameter names ''' self.params = [item for item in self.params if item in set(self.params)] self.queryparams = self.marvinform._param_form_lookup.mapToColumn(self.params) self.queryparams = [item for item in self.queryparams if item in set(self.queryparams)] self.queryparams_order = [q.key for q in self.queryparams]
[docs] def get_available_params(self, paramdisplay='best'): ''' Retrieve the available parameters to query on Retrieves a list of the available query parameters. Can either retrieve a list of all the parameters or only the vetted parameters. Parameters: paramdisplay (str {all|best}): String indicating to grab either all or just the vetted parameters. Default is to only return 'best', i.e. vetted parameters Returns: qparams (list): a list of all of the available queryable parameters ''' assert paramdisplay in ['all', 'best'], 'paramdisplay can only be either "all" or "best"!' if paramdisplay == 'all': qparams = self.datamodel.groups.list_params('full') elif paramdisplay == 'best': qparams = query_params return qparams
[docs] @remote_mode_only def save(self, path=None, overwrite=False): ''' Save the query as a pickle object Parameters: path (str): Filepath and name of the pickled object overwrite (bool): Set this to overwrite an existing pickled file Returns: path (str): The filepath and name of the pickled object ''' sf = self.searchfilter.replace(' ', '') if self.searchfilter else 'anon' # set the path if not path: path = os.path.expanduser('~/marvin_query_{0}.mpf'.format(sf)) # check for file extension if not os.path.splitext(path)[1]: path = os.path.join(path + '.mpf') path = os.path.realpath(path) if os.path.isdir(path): raise MarvinError('path must be a full route, including the filename.') if os.path.exists(path) and not overwrite: warnings.warn('file already exists. Not overwriting.', MarvinUserWarning) return dirname = os.path.dirname(path) if not os.path.exists(dirname): os.makedirs(dirname) # set bad pickled attributes to None attrs = ['session', 'datamodel', 'marvinform', 'myform', '_modelgraph'] # pickle the query try: with temp_setattr(self, attrs, None): pickle.dump(self, open(path, 'wb'), protocol=-1) except Exception as ee: if os.path.exists(path): os.remove(path) raise MarvinError('Error found while pickling: {0}'.format(str(ee))) return path
[docs] @classmethod def restore(cls, path, delete=False): ''' Restore a pickled object Parameters: path (str): The filename and path to the pickled object delete (bool): Turn this on to delete the pickled fil upon restore Returns: Query (instance): The instantiated Marvin Query class ''' obj = marvin_pickle.restore(path, delete=delete) obj._modelgraph = marvindb.modelgraph obj.session = marvindb.session obj.datamodel = datamodel[obj._release] # if obj.allspaxels: # obj.datamodel.use_all_spaxels() obj.marvinform = obj.datamodel._marvinform return obj
[docs] def set_filter(self, searchfilter=None): ''' Parses a filter string and adds it into the query. Parses a natural language string filter into the appropriate SQL filter syntax. String is a boolean join of one or more conditons of the form "PARAMETER_NAME OPERAND VALUE" Parameter names must be uniquely specified. For example, nsa.z is a unique parameter name in the database and can be specified thusly. On the other hand, name is not a unique parameter name in the database, and must be clarified with the desired table. Parameter Naming Convention: NSA redshift == nsa.z IFU name == ifu.name Pipeline name == pipeline_info.name Allowed Joins: AND | OR | NOT In the absence of parantheses, the precedence of joins follow: NOT > AND > OR Allowed Operands: == | != | <= | >= | < | > | = Notes: Operand == maps to a strict equality (x == 5 --> x is equal to 5) Operand = maps to SQL LIKE (x = 5 --> x contains the string 5; x = '%5%') (x = 5* --> x starts with the string 5; x = '5%') (x = *5 --> x ends with the string 5; x = '%5') Parameters: searchfilter (str): A (natural language) string containing the filter conditions in the query; written as you would say it. Example: >>> # Filter string >>> filter = "nsa.z < 0.012 and ifu.name = 19*" >>> # Converts to >>> and_(nsa.z<0.012, ifu.name=19*) >>> # SQL syntax >>> mangasampledb.nsa.z < 0.012 AND lower(mangadatadb.ifudesign.name) LIKE lower('19%') >>> # Filter string >>> filter = 'cube.plate < 8000 and ifu.name = 19 or not (nsa.z > 0.1 or not cube.ra > 225.)' >>> # Converts to >>> or_(and_(cube.plate<8000, ifu.name=19), not_(or_(nsa.z>0.1, not_(cube.ra>225.)))) >>> # SQL syntax >>> mangadatadb.cube.plate < 8000 AND lower(mangadatadb.ifudesign.name) LIKE lower(('%' || '19' || '%')) >>> OR NOT (mangasampledb.nsa.z > 0.1 OR mangadatadb.cube.ra <= 225.0) ''' if searchfilter: # if params is a string, then parse and filter if isinstance(searchfilter, six.string_types): searchfilter = self._check_shortcuts_in_filter(searchfilter) try: parsed = parse_boolean_search(searchfilter) except BooleanSearchException as e: raise MarvinError('Your boolean expression contained a syntax error: {0}'.format(e)) else: raise MarvinError('Input parameters must be a natural language string!') # update the parameters dictionary self.searchfilter = searchfilter self._parsed = parsed self._checkParsed() self.strfilter = str(parsed) self.filterparams.update(parsed.params) filterkeys = [key for key in parsed.uniqueparams if key not in self.params] self.params.extend(filterkeys) # print filter if not self.quiet: print('Your parsed filter is: ') print(parsed) # Perform local vs remote modes if self.mode == 'local': # Pass into Marvin Forms try: self._setForms() except KeyError as e: self.reset() raise MarvinError('Could not set parameters. Multiple entries found for key. Be more specific: {0}'.format(e)) elif self.mode == 'remote': # Is it possible to build a query remotely but still allow for user manipulation? pass
def _setForms(self): ''' Set the appropriate WTForms in myforms and set the parameters ''' self._paramtree = self.marvinform._paramtree for key in self.filterparams.keys(): self.myforms[key] = self.marvinform.callInstance(self.marvinform._param_form_lookup[key], params=self.filterparams) self.myparamtree[self.myforms[key].Meta.model.__name__][key] def _validateForms(self): ''' Validate all the data in the forms ''' formkeys = list(self.myforms.keys()) isgood = [form.validate() for form in self.myforms.values()] if not all(isgood): inds = np.where(np.invert(isgood))[0] for index in inds: self._errors.append(list(self.myforms.values())[index].errors) raise MarvinError('Parameters failed to validate: {0}'.format(self._errors))
[docs] def add_condition(self): ''' Loop over all input forms and add a filter condition based on the input parameter form data. ''' # validate the forms self._validateForms() # build the actual filter self.build_filter() # add the filter to the query if not isinstance(self.filter, type(None)): self.query = self.query.filter(self.filter)
@makeBaseQuery def _join_tables(self): ''' Build the join statement from the input parameters ''' self._modellist = [param.class_ for param in self.queryparams] # Gets the list of joins from ModelGraph. Uses Cube as nexus, so that # the order of the joins is the correct one. # TODO: at some point, all the queries should be generalised so that # we don't assume that we are querying a cube. joinmodellist = self._modelgraph.getJoins(self._modellist, format_out='models', nexus=marvindb.datadb.Cube) # sublist = [model for model in modellist if model.__tablename__ not in self._basetable and not self._tableInQuery(model.__tablename__)] # self.joins.extend([model.__tablename__ for model in sublist]) # self.query = self.query.join(*sublist) for model in joinmodellist: name = '{0}.{1}'.format(model.__table__.schema, model.__tablename__) if not self._tableInQuery(name): self.joins.append(model.__tablename__) if 'template' not in model.__tablename__: self.query = self.query.join(model) else: # assume template_kin only now, TODO deal with template_pop later self.query = self.query.join(model, marvindb.dapdb.Structure.template_kin)
[docs] def build_filter(self): ''' Builds a filter condition to load into sqlalchemy filter. ''' try: self.filter = self._parsed.filter(self._modellist) except BooleanSearchException as e: raise MarvinError('Your boolean expression could not me mapped to model: {0}'.format(e))
[docs] def update_params(self, param): ''' Update the input parameters ''' # param = {key: unicode(val) if '*' not in unicode(val) else unicode(val.replace('*', '%')) for key, val in param.items() if key in self.filterparams.keys()} param = {key: val.decode('UTF-8') if '*' not in val.decode('UTF-8') else val.replace('*', '%').decode('UTF-8') for key, val in param.items() if key in self.filterparams.keys()} self.filterparams.update(param) self._setForms()
def _update_params(self, param): ''' this is now broken, this should update the boolean params in the filter condition ''' ''' Update any input parameters that have been bound already. Input is a dictionary of key, value pairs representing parameter name to update, and the value (number only) to update. This does not allow to change the operand. Does not update self.params e.g. original input parameters {'nsa.z': '< 0.012'} newparams = {'nsa.z': '0.2'} update_params(newparams) new condition will be nsa.z < 0.2 ''' param = {key: unicode(val) if '*' not in unicode(val) else unicode(val.replace('*', '%')) for key, val in param.items() if key in self.filterparams.keys()} self.query = self.query.params(param) def _alreadyInFilter(self, names): ''' Checks if the parameter name already added into the filter ''' infilter = None if names: if not isinstance(self.query, type(None)): if not isinstance(self.query.whereclause, type(None)): wc = str(self.query.whereclause.compile(dialect=postgresql.dialect(), compile_kwargs={'literal_binds': True})) infilter = any([name in wc for name in names]) return infilter
[docs] @makeBaseQuery @checkCondition @updateConfig def run(self, start=None, end=None, raw=None, orm=None, core=None): ''' Runs a Marvin Query Runs the query and return an instance of Marvin Results class to deal with results. Parameters: start (int): Starting value of a subset. Default is None end (int): Ending value of a subset. Default is None Returns: results (object): An instance of the Marvin Results class containing the results from the Query. ''' if self.mode == 'local': # Check for adding a sort self._sortQuery() # Check to add the cache if self._caching: from marvin.core.caching_query import FromCache self.query = self.query.options(FromCache("default")).\ options(*marvindb.cache_bits) # turn on streaming of results self.query = self.query.execution_options(stream_results=True) # get total count, and if more than 150 results, paginate and only return the first 100 starttime = datetime.datetime.now() # check for query and get count if marvindb.isdbconnected: qm = self._check_history(check_only=True) self.totalcount = qm.count if qm else None # run count if it doesn't exist if self.totalcount is None: self.totalcount = self.query.count() # get the new count if start and end exist if start and end: count = (end - start) else: count = self.totalcount # # run the query # res = self.query.slice(start, end).all() # count = len(res) # self.totalcount = count if not self.totalcount else self.totalcount # check history if marvindb.isdbconnected: query_meta = self._check_history() if count > self.count_threshold and self.return_all is False: # res = res[0:self.limit] start = 0 end = self.limit count = (end - start) warnings.warn('Results contain more than {0} entries. ' 'Only returning first {1}'.format(self.count_threshold, self.limit), MarvinUserWarning) elif self.return_all is True: warnings.warn('Warning: Attempting to return all results. This may take a long time or crash.', MarvinUserWarning) start = None end = None elif start and end: warnings.warn('Getting subset of data {0} to {1}'.format(start, end), MarvinUserWarning) # slice the query query = self.query.slice(start, end) # run the query if not any([raw, core, orm]): raw = True if raw: # use the db api cursor sql = str(self._get_sql(query)) conn = marvindb.db.engine.raw_connection() cursor = conn.cursor('query_cursor') cursor.execute(sql) res = self._fetch_data(cursor) conn.close() elif core: # use the core connection sql = str(self._get_sql(query)) with marvindb.db.engine.connect() as conn: results = conn.execution_options(stream_results=True).execute(sql) res = self._fetch_data(results) elif orm: # use the orm query yield_num = int(10**(np.floor(np.log10(self.totalcount)))) results = string_folding_wrapper(query.yield_per(yield_num), keys=self.params) res = list(results) # get the runtime endtime = datetime.datetime.now() self.runtime = (endtime - starttime) # clear the session self.session.close() # pass the results into Marvin Results final = Results(results=res, query=query, count=count, mode=self.mode, returntype=self.returntype, queryobj=self, totalcount=self.totalcount, chunk=self.limit, runtime=self.runtime, start=start, end=end) # get the final time posttime = datetime.datetime.now() self.finaltime = (posttime - starttime) return final elif self.mode == 'remote': # Fail if no route map initialized if not config.urlmap: raise MarvinError('No URL Map found. Cannot make remote call') if self.return_all: warnings.warn('Warning: Attempting to return all results. This may take a long time or crash.') # Get the query route url = config.urlmap['api']['querycubes']['url'] params = {'searchfilter': self.searchfilter, 'params': ','.join(self._returnparams) if self._returnparams else None, 'returntype': self.returntype, 'limit': self.limit, 'sort': self.sort, 'order': self.order, 'release': self._release, 'return_all': self.return_all, 'start': start, 'end': end, 'caching': self._caching} try: ii = Interaction(route=url, params=params, stream=True) except Exception as e: # if a remote query fails for any reason, then try to clean them up # self._cleanUpQueries() raise MarvinError('API Query call failed: {0}'.format(e)) else: res = ii.getData() self.queryparams_order = ii.results['queryparams_order'] self.params = ii.results['params'] self.query = ii.results['query'] count = ii.results['count'] chunk = int(ii.results['chunk']) totalcount = ii.results['totalcount'] query_runtime = ii.results['runtime'] resp_runtime = ii.response_time if self.return_all: msg = 'Returning all {0} results'.format(totalcount) else: msg = 'Only returning the first {0} results.'.format(count) if not self.quiet: print('Results contain of a total of {0}. {1}'.format(totalcount, msg)) return Results(results=res, query=self.query, mode=self.mode, queryobj=self, count=count, returntype=self.returntype, totalcount=totalcount, chunk=chunk, runtime=query_runtime, response_time=resp_runtime, start=start, end=end)
def _fetch_data(self, obj): ''' Fetch query using fetchall or fetchmany ''' res = [] if not self.return_all: res = obj.fetchall() else: while True: rows = obj.fetchmany(100000) if rows: res.extend(rows) else: break return res def _check_history(self, check_only=None): ''' Check the query against the query history schema ''' sf = self.marvinform._param_form_lookup.mapToColumn('searchfilter') stringfilter = self.searchfilter.strip().replace(' ', '') qm = self.session.query(sf.class_).filter(sf == stringfilter, sf.class_.release == self._release).one_or_none() if check_only: return qm with self.session.begin(): if not qm: qm = sf.class_(searchfilter=stringfilter, n_run=1, release=self._release, count=self.totalcount) self.session.add(qm) else: qm.n_run += 1 return qm def _cleanUpQueries(self): ''' Attempt to clean up idle queries on the server This is a hack to try to kill all idl processes on the server. Using pg_terminate_backend and pg_stat_activity it terminates all transactions that are in an idle, or idle in transaction, state that have running for > 1 minute, and whose application_name is not psql, and the process is not the one initiating the terminate. The rank part ranks the processes and originally killed all > 1, to leave one alive as a warning to the others. I've changed this to 0 to kill everything. I think this will sometimes also leave a newly orphaned idle ROLLBACK transaction. Not sure why. ''' if self.mode == 'local': sql = ("with inactive as (select p.pid, rank() over (partition by \ p.client_addr order by p.backend_start ASC) as rank from \ pg_stat_activity as p where p.application_name !~ 'psql' \ and p.state ilike '%idle%' and p.pid <> pg_backend_pid() and \ current_timestamp-p.state_change > interval '1 minutes') \ select pg_terminate_backend(pid) from inactive where rank > 0;") self.session.expire_all() self.session.expunge_all() res = self.session.execute(sql) tmp = res.fetchall() #self.session.close() #marvindb.db.engine.dispose() elif self.mode == 'remote': # Fail if no route map initialized if not config.urlmap: raise MarvinError('No URL Map found. Cannot make remote call') # Get the query route url = config.urlmap['api']['cleanupqueries']['url'] params = {'task': 'clean', 'release': self._release} try: ii = Interaction(route=url, params=params) except Exception as e: raise MarvinError('API Query call failed: {0}'.format(e)) else: res = ii.getData() def _getIdleProcesses(self): ''' Get a list of all idle processes on server This grabs a list of all processes in a state of idle, or idle in transaction using pg_stat_activity and returns the process id, the state, and the query ''' if self.mode == 'local': sql = ("select p.pid,p.state,p.query from pg_stat_activity as p \ where p.state ilike '%idle%';") res = self.session.execute(sql) procs = res.fetchall() elif self.mode == 'remote': # Fail if no route map initialized if not config.urlmap: raise MarvinError('No URL Map found. Cannot make remote call') # Get the query route url = config.urlmap['api']['cleanupqueries']['url'] params = {'task': 'getprocs', 'release': self._release} try: ii = Interaction(route=url, params=params) except Exception as e: raise MarvinError('API Query call failed: {0}'.format(e)) else: procs = ii.getData() return procs def _sortQuery(self): ''' Sort the query by a given parameter ''' if not isinstance(self.sort, type(None)): # set the sort variable ModelClass parameter if '.' in self.sort: param = self.datamodel.parameters[str(self.sort)].full else: param = self.datamodel.parameters.get_full_from_remote(self.sort) sortparam = self.marvinform._param_form_lookup.mapToColumn(param) # If order is specified, then do the sort if self.order: assert self.order in ['asc', 'desc'], 'Sort order parameter must be either "asc" or "desc"' # Check if order by already applied if 'ORDER' in str(self.query.statement): self.query = self.query.order_by(None) # Do the sorting if 'desc' in self.order: self.query = self.query.order_by(desc(sortparam)) else: self.query = self.query.order_by(sortparam)
[docs] @updateConfig def show(self, prop=None): ''' Prints into to the console Displays the query to the console with parameter variables plugged in. Works only in local mode. Input prop can be one of Can be one of query, tables, joins, or filter. Only works in LOCAL mode. Allowed Values for Prop: query - displays the entire query (default if nothing specified) tables - displays the tables that have been joined in the query joins - same as table filter - displays only the filter used on the query Parameters: prop (str): The type of info to print. Example: TODO add example ''' assert prop in [None, 'query', 'tables', 'joins', 'filter'], 'Input must be query, tables, joins, or filter' if self.mode == 'local': if not prop or 'query' in prop: sql = self._get_sql(self.query) elif prop == 'tables': sql = self.joins elif prop == 'filter': '''oddly this does not update when bound parameters change, but the statement above does ''' sql = self.query.whereclause.compile(dialect=postgresql.dialect(), compile_kwargs={'literal_binds': True}) else: sql = self.__getattribute__(prop) return str(sql) elif self.mode == 'remote': sql = 'Cannot show full SQL query in remote mode, use the Results showQuery' warnings.warn(sql, MarvinUserWarning) return sql
def _get_sql(self, query): ''' Get the sql for a given query Parameters: query (object): An SQLAlchemy Query object Returms: A raw sql string ''' return query.statement.compile(dialect=postgresql.dialect(), compile_kwargs={'literal_binds': True})
[docs] def reset(self): ''' Resets all query attributes ''' self.__init__()
@updateConfig def _createBaseQuery(self): ''' Create the base query session object. Passes in a list of parameters defined in returnparams, filterparams, and defaultparams ''' labeledqps = [qp.label(self.params[i]) for i, qp in enumerate(self.queryparams)] self.query = self.session.query(*labeledqps) def _query_column(self, column_name): ''' query and return a specific column from the current query ''' qp = self.marvinform._param_form_lookup.mapToColumn(column_name) qp = qp.label(column_name) return self.query.from_self(qp).all() def _getPipeInfo(self, pipename): ''' Retrieve the pipeline Info for a given pipeline version name ''' assert pipename.lower() in ['drp', 'dap'], 'Pipeline Name must either be DRP or DAP' # bindparam values bindname = 'drpver' if pipename.lower() == 'drp' else 'dapver' bindvalue = self._drpver if pipename.lower() == 'drp' else self._dapver # class names if pipename.lower() == 'drp': inclasses = self._tableInQuery('cube') or 'cube' in str(self.query.statement.compile()) elif pipename.lower() == 'dap': inclasses = self._tableInQuery('file') or 'file' in str(self.query.statement.compile()) # set alias pipealias = self._drp_alias if pipename.lower() == 'drp' else self._dap_alias # get the pipeinfo if inclasses: pipeinfo = marvindb.session.query(pipealias).\ join(marvindb.datadb.PipelineName, marvindb.datadb.PipelineVersion).\ filter(marvindb.datadb.PipelineName.label == pipename.upper(), marvindb.datadb.PipelineVersion.version == bindparam(bindname, bindvalue)).one() else: pipeinfo = None return pipeinfo def _addPipeline(self): ''' Adds the DRP and DAP Pipeline Info into the Query ''' self._drp_alias = aliased(marvindb.datadb.PipelineInfo, name='drpalias') self._dap_alias = aliased(marvindb.datadb.PipelineInfo, name='dapalias') drppipe = self._getPipeInfo('drp') dappipe = self._getPipeInfo('dap') # Add DRP pipeline version if drppipe: self.query = self.query.join(self._drp_alias, marvindb.datadb.Cube.pipelineInfo).\ filter(self._drp_alias.pk == drppipe.pk) # Add DAP pipeline version if dappipe: self.query = self.query.join(self._dap_alias, marvindb.dapdb.File.pipelineinfo).\ filter(self._dap_alias.pk == dappipe.pk) @makeBaseQuery def _tableInQuery(self, name): ''' Checks if a given SQL table is already in the SQL query ''' # do the check try: isin = name in str(self.query._from_obj[0]) except IndexError as e: isin = False except AttributeError as e: if isinstance(self.query, six.string_types): isin = name in self.query else: isin = False return isin def _group_by(self, params=None): ''' Group the query by a set of parameters Parameters: params (list): A list of string parameter names to group the query by Returns: A new SQLA Query object ''' if not params: params = [d for d in self.defaultparams if 'spaxelprop' not in d] newdefaults = self.marvinform._param_form_lookup.mapToColumn(params) self.params = params newq = self.query.from_self(*newdefaults).group_by(*newdefaults) return newq # ------------------------------------------------------ # DAP Specific Query Modifiers - subqueries, etc go below here # ----------------------------------------------------- def _buildDapQuery(self): ''' Builds a DAP zonal query ''' # get the appropriate Junk (SpaxelProp) ModelClass self._junkclass = self.marvinform.\ _param_form_lookup['spaxelprop.file'].Meta.model # get good spaxels # bingood = self.getGoodSpaxels() # self.query = self.query.\ # join(bingood, bingood.c.binfile == marvindb.dapdb.Junk.file_pk) # check for additional modifier criteria if self._parsed.functions: # loop over all functions for fxn in self._parsed.functions: # look up the function name in the marvinform dictionary try: methodname = self.marvinform._param_fxn_lookup[fxn.fxnname] except KeyError as e: self.reset() raise MarvinError('Could not set function: {0}'.format(e)) else: # run the method methodcall = self.__getattribute__(methodname) methodcall(fxn) def _check_dapall_query(self): ''' Checks if the query is on the DAPall table. ''' isdapall = self._check_query('dapall') if isdapall: self.query = self._group_by() def _getGoodSpaxels(self): ''' Subquery - Counts the number of good spaxels Counts the number of good spaxels with binid != -1 Uses the junk.bindid_pk != 9999 since this is known and set. Removes need to join to the binid table Returns: bincount (subquery): An SQLalchemy subquery to be joined into the main query object ''' spaxelname = self._junkclass.__name__ bincount = self.session.query(self._junkclass.file_pk.label('binfile'), func.count(self._junkclass.pk).label('goodcount')) # optionally add the filter if the table is SpaxelProp if 'CleanSpaxelProp' not in spaxelname: bincount = bincount.filter(self._junkclass.binid != -1) # group the results by file_pk bincount = bincount.group_by(self._junkclass.file_pk).subquery('bingood', with_labels=True) return bincount def _getCountOf(self, expression): ''' Subquery - Counts spaxels satisfying an expression Counts the number of spaxels of a given parameter above a certain value. Parameters: expression (str): The filter expression to parse Returns: valcount (subquery): An SQLalchemy subquery to be joined into the main query object Example: >>> expression = 'junk.emline_gflux_ha_6564 >= 25' ''' # parse the expression into name, operator, value param, ops, value = self._parseExpression(expression) # look up the InstrumentedAttribute, Operator, and convert Value attribute = self.marvinform._param_form_lookup.mapToColumn(param) op = opdict[ops] value = float(value) # Build the subquery valcount = self.session.query(self._junkclass.file_pk.label('valfile'), (func.count(self._junkclass.pk)).label('valcount')).\ filter(op(attribute, value)).\ group_by(self._junkclass.file_pk).subquery('goodhacount', with_labels=True) return valcount
[docs] def getPercent(self, fxn, **kwargs): ''' Query - Computes count comparisons Retrieves the number of objects that have satisfy a given expression in x% of good spaxels. Expression is of the form Parameter Operand Value. This function is mapped to the "npergood" filter name. Syntax: fxnname(expression) operator value Parameters: fxn (str): The function condition used in the query filter Example: >>> fxn = 'npergood(junk.emline_gflux_ha_6564 > 25) >= 20' >>> Syntax: npergood() - function name >>> npergood(expression) operator value >>> >>> Select objects that have Ha flux > 25 in more than >>> 20% of their (good) spaxels. ''' # parse the function into name, condition, operator, and value name, condition, ops, value = self._parseFxn(fxn) percent = float(value) / 100. op = opdict[ops] # Retrieve the necessary subqueries bincount = self._getGoodSpaxels() valcount = self._getCountOf(condition) # Join to the main query self.query = self.query.join(bincount, bincount.c.binfile == self._junkclass.file_pk).\ join(valcount, valcount.c.valfile == self._junkclass.file_pk).\ filter(op(valcount.c.valcount, percent * bincount.c.goodcount)) # Group the results by main defaultdatadb parameters, # so as not to include all spaxels newdefs = [d for d in self.defaultparams if 'spaxelprop' not in d] self.query = self._group_by(params=newdefs)
# newdefaults = self.marvinform._param_form_lookup.mapToColumn(newdefs) # self.params = newdefs # self.query = self.query.from_self(*newdefaults).group_by(*newdefaults) def _parseFxn(self, fxn): ''' Parse a fxn condition ''' return fxn.fxnname, fxn.fxncond, fxn.op, fxn.value def _parseExpression(self, expr): ''' Parse an expression ''' return expr.fullname, expr.op, expr.value def _checkParsed(self): ''' Check the boolean parsed object check for function conditions vs normal. This should be moved into SQLalchemy Boolean Search ''' # Triggers for only one filter and it is a function condition if hasattr(self._parsed, 'fxn'): self._parsed.functions = [self._parsed] # Checks for shortcut names and replaces them in params # now redundant after pre-check on searchfilter for key, val in self._parsed.params.items(): if key in self.marvinform._param_form_lookup._nameShortcuts.keys(): newkey = self.marvinform._param_form_lookup._nameShortcuts[key] self._parsed.params.pop(key) self._parsed.params.update({newkey: val})