Source code for dl.queryClient

#!/usr/bin/env python
#
# QUERYCLIENT -- Client routines for the Data Lab Query Manager Service


from __future__ import print_function

__authors__ = 'Mike Fitzpatrick <fitz@noao.edu>, Matthew Graham <graham@noao.edu>, Data Lab <datalab@noao.edu>'
__version__ = 'v2.18.12'


'''
    Client methods for the DataLab Query Manager Service.


Query Manager Client Interface
------------------------------
               isAlive  (svc_url=DEF_SERVICE_URL, timeout=2)
           set_svc_url  (svc_url)
           get_svc_url  ()
           set_profile  (profile)
           get_profile  ()
         list_profiles  (optval, token=None, profile=None, format='text')
         list_profiles  (token=None, profile=None, format='text')

   set_timeout_request  (nsec)
   get_timeout_request  ()
                schema  (value, format='text', profile=None)
                schema  (value='', format='text', profile=None)
              services  (name=None, svc_type=None, format=None,
                         profile='default')

                 query  (token, query, adql=None, sql=None, fmt='csv', out=None,
                         async_=False, profile='default', **kw)
                 query  (optval, adql=None, sql=None, fmt='csv', out=None,
                         async_=False, profile='default', **kw)
                 query  (token=None, adql=None, sql=None, fmt='csv', out=None,
                         async_=False, profile='default', **kw)
                status  (token, jobId)
                status  (optval, jobId=None)
                status  (token=None, jobId=None)
                  jobs  (token, jobId, status='all', option='list')
                  jobs  (optval, jobId=None, status='all', option='list')
                  jobs  (token=None, jobId=None, status='all', option='list')
               results  (token, jobId, delete=True)
               results  (optval, jobId=None, delete=True)
               results  (token=None, jobId=None, delete=True)
                 error  (token, jobId)
                 error  (optval, jobId=None)
                 error  (token=None, jobId=None)
                 abort  (token, jobId)
                 abort  (optval, jobId=None)
                 abort  (token=None, jobId=None)
                  wait  (token, jobId, wait=3, verbose=False)
                  wait  (optval, jobId=None, wait=3, verbose=False)
                  wait  (token=None, jobId=None, wait=3, verbose=False)

             mydb_list  (optval, table=None)
             mydb_list  (table=None, token=None)
           mydb_create  (token, table, schema, **kw)
           mydb_create  (table, schema, token=None, **kw)
           mydb_insert  (token, table, data, **kw)
           mydb_insert  (table, data, token=None, **kw)
           mydb_import  (token, table, data, **kw)
           mydb_import  (table, data, token=None, **kw)
         mydb_truncate  (token, table)
         mydb_truncate  (table, token=None)
            mydb_index  (token, table, column)
            mydb_index  (table, column, token=None)
             mydb_drop  (token, table)
             mydb_drop  (table, token=None)
           mydb_rename  (token, source, target)
           mydb_rename  (source, target, token=None)
             mydb_copy  (token, source, target)
             mydb_copy  (source, target, token=None)

                  list  (token, table)			# DEPRECATED
                  list  (optval, table=None)		# DEPRECATED
                  list  (table=None, token=None)	# DEPRECATED
                  drop  (token, table)			# DEPRECATED
                  drop  (optval, table=None)		# DEPRECATED
                  drop  (table=None, token=None)	# DEPRECATED


Import via

.. code-block:: python

    from dl import queryClient
'''

import requests
try:
    from urllib import quote_plus               # Python 2
except ImportError:
    from urllib.parse import quote_plus         # Python 3
try:
    from cStringIO import StringIO
except:
    from io import StringIO			# Python 2/3 compatible
from io import BytesIO
import socket
import json
import time
import os
import sys
import collections
import ast, csv
import pandas
from tempfile import NamedTemporaryFile

from dl import resClient
from dl import storeClient
from dl.helpers.utils import convert
if os.path.isfile('./Util.py'):			# use local dev copy
    from Util import multimethod
    from Util import def_token
else:						# use distribution copy
    from dl.Util import multimethod
    from dl.Util import def_token

is_py3 = sys.version_info.major == 3



# ####################################
#  Query Manager Configuration
# ####################################


# The URL of the QueryManager service to contact.  This may be changed by
# passing a new URL into the set_svc_url() method before beginning.

DEF_SERVICE_ROOT = 'https://datalab.noao.edu'
DAL_SERVICE_URL = 'https://datalab.noao.edu' 	# The base DAL service URL

# Allow the service URL for dev/test systems to override the default.
THIS_HOST = socket.gethostname()			# host name
sock = socket.socket(type=socket.SOCK_DGRAM)     # host IP address
sock.connect(('8.8.8.8', 1))        # Example IP address, see RFC 5737
THIS_IP, _ = sock.getsockname()


if THIS_HOST[:5] == 'dldev':
    DEF_SERVICE_ROOT = 'http://dldev.datalab.noao.edu'
elif THIS_HOST[:6] == 'dltest':
    DEF_SERVICE_ROOT = 'http://dltest.datalab.noao.edu'

DEF_SERVICE_URL = DEF_SERVICE_ROOT + '/query'
SM_SERVICE_URL  = DEF_SERVICE_ROOT + '/storage'
RM_SERVICE_URL  = DEF_SERVICE_ROOT + '/res'

# The requested query 'profile'.  A profile refers to the specific
# machines and services used by the QueryManager on the server.
DEF_PROFILE 	= 'default'

# Use a /tmp/QM_DEBUG file as a way to turn on debugging in the client code.
DEBUG 		= os.path.isfile('/tmp/QM_DEBUG')

# Default sync query timeout default (300sec)
TIMEOUT_REQUEST = 300



# ####################################################################
#  Query Client error class
# ####################################################################

[docs]class queryClientError(Exception): def __init__(self, message): self.message = message def __str__(self): return self.message
# #################################################################### # Module Functions # #################################################################### # -------------------------------------------------------------------- # ISALIVE -- Ping the Query Manager service to see if it responds. #
[docs]def isAlive(svc_url=DEF_SERVICE_URL, timeout=5): return qc_client.isAlive(svc_url=svc_url, timeout=timeout)
# -------------------------------------------------------------------- # SET_SVC_URL -- Set the Query Manager ServiceURL to call. #
[docs]def set_svc_url(svc_url): return qc_client.set_svc_url(svc_url)
# -------------------------------------------------------------------- # GET_SVC_URL -- Get the Query Manager ServiceURL being called. #
[docs]def get_svc_url(): return qc_client.get_svc_url()
# -------------------------------------------------------------------- # SET_PROFILE -- Set the Query Manager service profile to be used. #
[docs]def set_profile(profile): return qc_client.set_profile(profile)
# -------------------------------------------------------------------- # GET_PROFILE -- Get the Query Manager service profile being used. #
[docs]def get_profile(): return qc_client.get_profile()
# ----------------------------- # Utility Functions # ----------------------------- # -------------------------------------------------------------------- # LIST_PROFILES -- List the available service profiles. # @multimethod('qc',1,False) def list_profiles(optval, token=None, profile=None, format='text'): if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return qc_client._list_profiles (token=def_token(optval), profile=profile, format=format) else: # optval looks like a profile name return qc_client._list_profiles (token=def_token(token), profile=optval, format=format)
[docs]@multimethod('qc',0,False) def list_profiles(token=None, profile=None, format='text'): '''Retrieve the profiles supported by the query manager service. Usage: list_profiles (token=None, profile=None, format='text') MultiMethod Usage: ------------------ queryClient.list_profiles (token) queryClient.list_profiles () Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) profile : str A specific profile configuration to list. If None, a list of profiles available to the given auth token is returned. format : str Result format: One of 'text' or 'json' Returns ------- profiles : list/dict A list of the names of the supported profiles or a dictionary of the specific profile Example ------- .. code-block:: python profiles = queryClient.list_profiles() profiles = queryClient.list_profiles(token) ''' return qc_client._list_profiles (token=def_token(token), profile=profile, format=format)
# -------------------------------------------------------------------- # SET_TIMEOUT_REQUEST -- Set the Synchronous query timeout value (in sec). #
[docs]def set_timeout_request(nsec): return qc_client.set_timeout_request (nsec)
# -------------------------------------------------------------------- # GET_TIMEOUT_REQUEST -- Get the Synchronous query timeout value (in sec). #
[docs]def get_timeout_request(): return qc_client.get_timeout_request ()
# -------------------------------------------------------------------- # SCHEMA -- Return information about a data service schema value. # @multimethod('qc',1,False) def schema(value, format='text', profile=None): return qc_client._schema (value=value, format=format, profile=profile)
[docs]@multimethod('qc',0,False) def schema(value='', format='text', profile=None): '''Return information about a data service schema. Usage: schema (value='', format='text', profile=None) Parameters ---------- value : str Schema object to return: Of the form <schema>[.<table>[.<column]] profile : str The name of the service profile to use. The list of available profiles can be retrieved from the service (see function :func:`queryClient.list_profiles()`) format : str Result format: One of 'text' or 'json' (NOT CURRENTLY USED) Returns ------- Example ------- .. code-block:: python # List the available schema queryClient.schema("", "text", "default") # List the tables in the USNO schema queryClient.schema("usno", "text", "default") # List the columns of the USNO-A2 table queryClient.schema("usno.a2", "text", "default") # List the attributes of the USNO-A2 'raj2000' column queryClient.schema("usno.a2.raj2000", "text", "default") ''' return qc_client._schema (value=value, format=format, profile=profile)
# -------------------------------------------------------------------- # SERVICES -- List public storage services #
[docs]def services(name=None, svc_type=None, mode='list', profile='default'): '''Search or list available data services. Usage: services (name=None, svc_type=None, mode='list', profile='default') Parameters ---------- name : str Schema object to return: Of the form <schema>[.<table>[.<column]] svc_type : str Limit results to specified service type. Supported options are 'tap', 'sia', 'scs', or 'vos'. mode : str Query mode: profile : str The name of the service profile to use. The list of available profiles can be retrieved from the service (see function :func:`queryClient.list_profiles()`) Returns ------- If mode is 'list' then a human-readable list of matching services is returned. If mode is 'resolve' then a JSON string of matching services is returned un the form "{<svc_name> : <svc_url>, ....}" Example ------- .. code-block:: python # List the available SIA services queryClient.services(svc_type="sia") # List the available USNO services, note the '%' matching metacharacter queryClient.services(name="usno%") # Get the serviceURL of the USNO-A2 table queryClient.services(name="usno/a2", mode="resolve") ''' return qc_client.services (name=name, svc_type=svc_type, mode=mode, profile=profile)
# ----------------------------- # Query Functions # ----------------------------- # -------------------------------------------------------------------- # QUERY -- Send a query to the Query Manager service # @multimethod('qc',2,False) def query(token, query, adql=None, sql=None, fmt='csv', out=None, async_=False, drop=False, profile='default', **kw): return qc_client._query (token=def_token(token), adql=adql, sql=query, fmt=fmt, out=out, async_=async_, drop=drop, profile=profile, **kw) @multimethod('qc',1,False) def query(optval, adql=None, sql=None, fmt='csv', out=None, async_=False, drop=False, token=None, profile='default', **kw): if optval is not None and optval.lower()[:6] == 'select': # optval looks like a query string return qc_client._query (token=def_token(None), adql=adql, sql=optval, fmt=fmt, out=out, async_=async_, drop=drop, profile=profile, **kw) else: # optval is (probably) a token return qc_client._query (token=def_token(optval), adql=adql, sql=sql, fmt=fmt, out=out, async_=async_, drop=drop, profile=profile, **kw)
[docs]@multimethod('qc',0,False) def query(token=None, adql=None, sql=None, fmt='csv', out=None, async_=False, drop=False, profile='default', **kw): '''Send an SQL or ADQL query to the database or TAP service. Usage: query (token=None, adql=None, sql=None, fmt='csv', out=None, async_=False, drop=False, profile='default', **kw): MultiMethod Usage: ------------------ queryClient.query (token, query, <args>) queryClient.query (token | query, <args>) Parameters ---------- token : str Secure token obtained via :func:`authClient.login()` adql : str or None ADQL query string that will be passed to the DB query manager, e.g. .. code-block:: python adql='select top 3 ra,dec from gaia_dr1.gaia_source' sql : str or None SQL query string that will be passed to the DB query manager, e.g. .. code-block:: python sql='select ra,dec from gaia_dr1.gaia_source limit 3' fmt : str Format of result to be returned by the query. Permitted values are: * 'csv' The returned result is a comma-separated string that looks like a csv file (newlines at the end of every row) [DEFAULT] * 'csv-noheader' A csv result with no column headers (data only) * 'ascii' Same, but the column separator is a tab \t * 'array' Returns a NumPy array * 'pandas' Returns a Pandas DataFrame * 'structarray' Numpy structured array (aka 'record array') * 'table' Returns an Astropy Table object The following formats may be used when saving a file to virtual storage on the server: * 'fits' FITS binary * 'hdf5' HDF5 file (NOT YET IMPLEMENTED) * 'votable' An XML-formatted VOTable out : str or None The output filename to create on the local machine, the URI of a VOSpace or MyDB resource to create, or ``None`` if the result is to be returned directly to the caller. async_ : bool If ``True``, the query is Asynchronous, i.e. a job is submitted to the DB, and a jobID token is returned the caller. The jobID must be then used to check the query's status and to retrieve the result (when the job status is ``COMPLETED``) or the error message (when the job status is ``ERROR``). Default is ``False``, i.e. the task runs a Synchroneous query. ``async_`` replaces the previous ``async`` parameter, because ``async`` was promoted to a keyword in Python 3.7. Users of Python versions prior to 3.7 can continue to use the ``async`` keyword. drop : bool If ``True``, then if the query is saving to mydb where the same table name already exists, it will overwrite the old mydb table. profile : str or None The Query Manager profile to use for this call. If ``None`` then the default profile is used. Available profiles may be listed using the :func:`queryClient.list_profiles()` **kw : dict Optional keyword arguments. Supported keywords currently include: wait = False Wait for asynchronous queries to complete? If enabled, the query() method will submit the job in async mode and then poll for results internally before returning. the default is to return the job ID immediately and let the client poll for job status and return results. timeout = 300 Requested timeout (in seconds) for a query. For a Sync query, this value sets a session timeout request in the database that will abort the query at the specified time. A maximum value of 600 seconds is permitted. If the ``wait`` option is enabled for an ASync query, this is the maximum time the query will be allowed to run before an abort() is issued on the job. The maximum timeout for an ASync job is 24-hrs (86400 sec). poll = 1 ASync job polling time in seconds. verbose = False Print verbose messages during ASync job. Returns ------- result : str If ``async=False``, the return value is the result of the query as a formatted string (see ``fmt``). Otherwise the result string is a job token, with which later the Asynchroneaous query's status can be checked (:func:`queryClient.status()`), and the result retrieved (see :func:`queryClient.result()`. Example ------- Get security token first, see :func:`authClient.login()`. Then: .. code-block:: python query = 'select ra,dec from gaia_dr1.gaia_source limit 3' response = queryClient.query(token, adql=query, fmt='csv') print response This prints .. code:: ra,dec 315.002571989537842,35.2662974820284489 315.00408275885701,35.2665448169895797 314.996334457679438,35.2673478725552698 ''' return qc_client._query (token=def_token(token), adql=adql, sql=sql, fmt=fmt, out=out, async_=async_, drop=drop, profile=profile, **kw)
# -------------------------------------------------------------------- # STATUS -- Get the status of an Asynchronous query # @multimethod('qc',2,False) def status(token, jobId): return qc_client._status (token=def_token(token), jobId=jobId) @multimethod('qc',1,False) def status(optval, jobId=None): if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return qc_client._status (token=def_token(optval), jobId=jobId) else: # optval is probably a jobId return qc_client._status (token=def_token(None), jobId=optval)
[docs]@multimethod('qc',0,False) def status(token=None, jobId=None): '''Get the status of an asynchronous query. Usage: status (token=None, jobId=None) MultiMethod Usage: ------------------ queryClient.status (jobId) queryClient.status (token, jobId) queryClient.status (token, jobId=<id>) queryClient.status (jobId=<str>) Use the authentication token and the jobId of a previously issued asynchronous query to check the query's current status. Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) jobId : str The jobId returned when issuing an asynchronous query via :func:`queryClient.query()` with ``async=True``. Returns ------- status : str Either 'QUEUED' or 'EXECUTING' or 'COMPLETED'. If the token & jobId combination does not correspond to an actual job, then a HTML-formatted error message is returned. If there is a problem with the backend, the returned value can be 'ERROR'. When status is 'COMPLETED', you can retrieve the results of the query via :func:`queryClient.results()` Example ------- .. code-block:: python import time query = 'select ra,dec from gaia_dr1.gaia_source limit 200000' jobId = queryClient.query(token, adql=query, fmt='csv', async=True) while True: status = queryClient.status(token, jobId) print "time index =", time.localtime()[5], " status =", status if status == 'COMPLETED': break time.sleep(1) This prints .. code:: time index = 16 status = EXECUTING time index = 17 status = EXECUTING time index = 18 status = COMPLETED ''' return qc_client._status (token=def_token(token), jobId=jobId)
# -------------------------------------------------------------------- # JOBS -- Get a list of the user's Async jobs. # @multimethod('qc',2,False) def jobs(token, jobId, format='text', status='all', option='list'): return qc_client._jobs (token=def_token(token), jobId=jobId, format=format, status=status, option=option) @multimethod('qc',1,False) def jobs(optval, jobId=None, format='text', status='all', option='list'): if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return qc_client._jobs (token=def_token(optval), jobId=jobId, format=format, status=status, option=option) else: # optval is probably a jobId return qc_client._jobs (token=def_token(None), jobId=optval, format=format, status=status, option=option)
[docs]@multimethod('qc',0,False) def jobs(token=None, jobId=None, format='text', status='all', option='list'): '''Get a list of the user's Async jobs. Usage: jobs (token=None, jobId=None, format='text', status='all') MultiMethod Usage: ------------------ queryClient.jobs (jobId) queryClient.jobs (token, jobId) queryClient.jobs (token, jobId=<id>) queryClient.jobs (jobId=<str>) Use the authentication token and the jobId of a previously issued asynchronous query to check the query's current status. Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) jobId : str The jobId returned when issuing an asynchronous query via :func:`queryClient.query()` with ``async=True``. format : str Format of the result. Support values include 'text' for a simple formatted table suitable for printing, or 'json' for a JSON string of the full matching record(s). status : str If status='all' then all async jobs are returned, otherwise this value may be used to return only those jobs with the specified status. Allowed values are: all Return all jobs EXECUTING Job is still running COMPLETED Job completed successfully ERROR Job exited with an error ABORTED Job was aborted by the user option : str If 'list' then the matching records are returned, if 'delete' then the records are removed from the database (e.g. to clear up long job lists of completed jobs). Returns ------- joblist : str Returns a list of async query jobs submitted by the user in the last 30 days, possibly filtered by the 'status' parameter. The 'json' format option allows the caller to format the full contents of the job record beyond the supplied simple 'text' option. Example ------- .. code-block:: python print (queryClient.jobs(token, jobId)) This prints .. code:: JobID Start End Status tfu8zpn2tkrlfyr9e 07-22-20T13:10:22 07-22-20T13:34:12 COMPLETED k8uznptrkkl29ryef 07-22-20T14:09:45 EXECUTING : : : : ''' return qc_client._jobs (token=def_token(token), jobId=jobId, format=format, status=status, option=option)
# -------------------------------------------------------------------- # RESULTS -- Get the results of an Asynchronous query # @multimethod('qc',2,False) def results(token, jobId, delete=True): return qc_client._results (token=def_token(token), jobId=jobId, delete=True) @multimethod('qc',1,False) def results(optval, jobId=None, delete=True): if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return qc_client._results (token=def_token(optval), jobId=jobId, delete=delete) else: # optval is probably a jobId return qc_client._results (token=def_token(None), jobId=optval, delete=delete)
[docs]@multimethod('qc',0,False) def results(token=None, jobId=None, delete=True): '''Retrieve the results of an asynchronous query, once completed. Usage: results (token=None, jobId=None, delete=True) MultiMethod Usage: ------------------ queryClient.results (jobId) queryClient.results (token, jobId) queryClient.results (token, jobId=<id>) queryClient.results (jobId=<str>) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) jobId : str The jobId returned when issuing an asynchronous query via :func:`queryClient.query()` with ``async=True``. Returns ------- results : str Example ------- .. code-block:: python # issue an async query (here a tiny one just for this example) query = 'select ra,dec from gaia_dr1.gaia_source limit 3' jobId = queryClient.query(token, adql=query, fmt='csv', async=True) # ensure job completes...then check status and retrieve results time.sleep(4) if queryClient.status(token, jobId) == 'COMPLETED': results = queryClient.results(token,jobId) print type(results) print results This prints .. code:: <type 'str'> ra,dec 301.37502633933002,44.4946851014515588 301.371102372343785,44.4953207577355698 301.385106974224186,44.4963443903961604 ''' return qc_client._results (token=def_token(token), jobId=jobId, delete=True)
# -------------------------------------------------------------------- # ERROR -- Get the error message of a failed Asynchronous query # @multimethod('qc',2,False) def error(token, jobId): return qc_client._error (token=def_token(token), jobId=jobId) @multimethod('qc',1,False) def error(optval, jobId=None): if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return qc_client._error (token=def_token(optval), jobId=jobId) else: # optval is probably a jobId return qc_client._error (token=def_token(None), jobId=optval)
[docs]@multimethod('qc',0,False) def error(token=None, jobId=None): '''Retrieve the error of an asynchronous query, once completed. Usage: error (token=None, jobId=None) MultiMethod Usage: ------------------ queryClient.error (jobId) queryClient.error (token, jobId) queryClient.error (token, jobId=<id>) queryClient.error (jobId=<str>) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) jobId : str The jobId returned when issuing an asynchronous query via :func:`queryClient.query()` with ``async=True``. Returns ------- error : str Example ------- .. code-block:: python # issue an async query (here a tiny one just for this example) query = 'select ra,dec from gaia_dr1.gaia_source limit 3' jobId = queryClient.query(token, adql=query, fmt='csv', async=True) # ensure job completes...then check status and retrieve error time.sleep(4) if queryClient.status(token, jobId) == 'ERROR': error = queryClient.error(token,jobId) print type(error) print error This prints .. code:: <type 'str'> ra,dec 301.37502633933002,44.4946851014515588 301.371102372343785,44.4953207577355698 301.385106974224186,44.4963443903961604 ''' return qc_client._error (token=def_token(token), jobId=jobId)
# -------------------------------------------------------------------- # ABORT -- Abort the specified Asynchronous job. # @multimethod('qc',2,False) def abort(token, jobId): return qc_client._abort (token=def_token(token), jobId=jobId) @multimethod('qc',1,False) def abort(optval, jobId=None): if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return qc_client._abort (token=def_token(optval), jobId=jobId) else: # optval is probably a jobId return qc_client._abort (token=def_token(None), jobId=optval)
[docs]@multimethod('qc',0,False) def abort(token=None, jobId=None): '''Abort the specified asynchronous job. Usage: abort (token=None, jobId=None) MultiMethod Usage: ------------------ queryClient.abort (token, jobId) queryClient.abort (jobId) queryClient.abort (token, jobId=<id>) queryClient.abort (jobId=<str>) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) jobId : str The jobId to abort. Returns ------- results : str Example ------- .. code-block:: python # issue an async query (here a tiny one just for this example) query = 'select ra,dec from gaia_dr1.gaia_source limit 3' jobId = queryClient.query(token, adql=query, fmt='csv', async=True) # ensure job completes...then check status and retrieve results time.sleep(4) if queryClient.status(token, jobId) == 'COMPLETED': results = queryClient.results(token,jobId) print type(results) print results This prints .. code:: <type 'str'> ra,dec 301.37502633933002,44.4946851014515588 301.371102372343785,44.4953207577355698 301.385106974224186,44.4963443903961604 ''' return qc_client._abort (token=def_token(token), jobId=jobId)
# -------------------------- # -------------------------- # -------------------------------------------------------------------- # WAIT -- Wait for completion of asynchronous job. # @multimethod('qc',2,False) def wait(token, jobId, wait=3, verbose=False): '''Usage: queryClient.wait (token, jobID) ''' return qc_client._wait (token=def_token(token), jobId=jobId, wait=wait, verbose=verbose) @multimethod('qc',1,False) def wait(optval, jobId=None, wait=3, verbose=False): '''Usage: queryClient.wait (jobID) queryClient.wait (token, jobId=<id>) ''' if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return qc_client._wait (token=def_token(optval), jobId=jobId, wait=wait, verbose=verbose) else: # optval is probably a jobId return qc_client._wait (token=def_token(None), jobId=optval, wait=wait, verbose=verbose)
[docs]@multimethod('qc',0,False) def wait(token=None, jobId=None, wait=3, verbose=False): '''Usage: queryClient.wait (jobID=<str>) ''' '''Loop until an async job has completed. Parameters ---------- jobid : str The job ID string of a submitted query job. wait : int | float Wait for `wait` seconds before checking status again. Default: 3sec ''' return qc_client._wait (token=def_token(token), jobId=jobId, wait=wait, verbose=verbose)
# ----------------------------- # MyDB Functions (deprecated) # ----------------------------- # -------------------------------------------------------------------- # LIST -- List the tables or table schema in a user's MyDB. # @multimethod('qc',2,False) def list(token, table): return qc_client.mydb_list (token=def_token(token), table=table) @multimethod('qc',1,False) def list(optval, table=None): if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return qc_client.mydb_list (token=def_token(optval), table=table) else: # optval is likely a table return qc_client.mydb_list (token=def_token(None), table=optval)
[docs]@multimethod('qc',0,False) def list(table=None, token=None): '''List the tables or table schema in the user's MyDB. Usage: list (table=None, token=None) MultiMethod Usage: ------------------ queryClient.list (token, table) queryClient.list (table) queryClient.list (token, table=<id>) queryClient.list () Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) table: str The specific table to list (returns the table schema), or an empty string to return a list of the names of all tables. Returns ------- listing : str The list of tables in the user's MyDB or the schema of the named table Example ------- .. code-block:: python # List the tables queryClient.list() ''' return qc_client.mydb_list (token=def_token(token), table=table)
# -------------------------------------------------------------------- # DROP -- Drop the named table from a user's MyDB. # @multimethod('qc',2,False) def drop(token, table): return qc_client.mydb_drop (token=def_token(token), table=table) @multimethod('qc',1,False) def drop(optval, table=None): if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return qc_client.mydb_drop (token=def_token(optval), table=table) else: # optval is likely a table return qc_client.mydb_drop (token=def_token(None), table=optval)
[docs]@multimethod('qc',0,False) def drop(table=None, token=None): '''Drop the specified table from the user's MyDB Usage: drop (table=None, token=None) MultiMethod Usage: ------------------ queryClient.drop (token, table) queryClient.drop (table) queryClient.drop (token, table=<id>) queryClient.drop (table=<str>) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) table: str The specific table to drop Returns ------- Example ------- .. code-block:: python # List the tables queryClient.drop('foo1') ''' return qc_client.mydb_drop (token=def_token(token), table=table)
# ----------------------------- # MyDB Functions (New API) # ----------------------------- # -------------------------------------------------------------------- # MYDB_LIST -- List the tables or table schema in a user's MyDB. # @multimethod('qc',1,False) def mydb_list(optval, table=None, index=False): if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return qc_client._mydb_list (token=def_token(optval), table=table, index=index) else: # optval is likely a table name return qc_client._mydb_list (token=def_token(None), table=optval, index=index)
[docs]@multimethod('qc',0,False) def mydb_list(table=None, token=None, index=False): '''List the tables or table schema in the user's MyDB. Usage: mydb_list (table=None, token=None) MultiMethod Usage: ------------------ queryClient.mydb_list (table) queryClient.mydb_list (token, table=<str>) queryClient.mydb_list (table=<str>) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) table: str The specific table to list (returns the table schema), or an empty string to return a list of the names of all tables. Returns ------- listing : str The list of tables in the user's MyDB or the schema of the named table Example ------- .. code-block:: python # List the tables queryClient.mydb_list() ''' return qc_client._mydb_list (token=def_token(token), table=table, index=index)
# -------------------------------------------------------------------- # MYDB_CREATE -- Create a table in the user's MyDB from a local file # or python data object. # @multimethod('qc',3,False) def mydb_create(token, table, schema, **kw): return qc_client._mydb_create (token=def_token(None), table=table, schema=schema, **kw)
[docs]@multimethod('qc',2,False) def mydb_create(table, schema, token=None, **kw): '''Create a table in the user's MyDB Usage: mydb_create (table, schema, token=None, **kw) MultiMethod Usage: ------------------ queryClient.mydb_create (token, table, <schema_dict>) queryClient.mydb_create (table, <schema_dict>) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) table: str The name of the table to create schema: str or dict The schema is CSV text containing the name of the column and it's PostgreSQL data type. If set as a 'str' type it is either a CSV string, or the name of a file containing the CSV. If passed as a 'dict' type, it is a dictionary object where keys are the column names and values are the data types. drop: bool (optional) Drop any existing table of the same name before creating new one. Returns ------- Example ------- .. code-block:: python # Create table in MyDB named 'foo' with columns defined by # the file 'schema.txt'. The schema file contains: # # id,text # ra,double precision # dec,double precision # queryClient.mydb_create ('foo', 'schema.txt') ''' return qc_client._mydb_create (token=def_token(None), table=table, schema=schema, **kw)
#-------------------------------------------------------------------- # MYDB_INSERT -- Insert data into a table in the user's MyDB from a local # file or python data object. # @multimethod('qc',3,False) def mydb_insert(token, table, data, **kw): return qc_client._mydb_insert (token=def_token(token), table=table, data=data, **kw)
[docs]@multimethod('qc',2,False) def mydb_insert(table, data, token=None, **kw): '''Insert data into a table in the user's MyDB Usage: mydb_insert (table, data, token=None, **kw) MultiMethod Usage: ------------------ queryClient.mydb_insert (token, table, <filename>) queryClient.mydb_insert (token, table, <data_object>) queryClient.mydb_insert (table, <filename>) queryClient.mydb_insert (table, <data_object>) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) table: str The name of the table to append data: str or data object The schema is CSV text containing the name of the column and it's PostgreSQL data type. If set as a 'str' type it is either a CSV string, or the name of a file containing the CSV data. If passed as a tabular data object, it is converted to CSV and sent to the service. csv_header: bool [OPTIONAL] If True, then the CSV data object contains a CSV header line, i.e. the first line is a row of column names. Otherwise, no column names are assumed and the column order must match the table schema. Returns ------- Example ------- .. code-block:: python # Insert data into a MyDB table named 'foo'. queryClient.mydb_insert ('foo', 'data.csv') ''' return qc_client._mydb_insert (token=def_token(token), table=table, data=data, **kw)
# -------------------------------------------------------------------- # MYDB_IMPORT -- Import a file or Python object to a MyDB table. # @multimethod('qc',3,False) def mydb_import(token, table, data, **kw): try: result = qc_client._mydb_import (token=def_token(token), table=table, data=data, **kw) except Exception as e: return (str(e)) return result
[docs]@multimethod('qc',2,False) def mydb_import(table, data, token=None, **kw): '''Import data into a table in the user's MyDB Usage: mydb_import (table, data, token=None, **kw) MultiMethod Usage: ------------------ queryClient.mydb_import (token, table, data) queryClient.mydb_import (table, data) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) table: str The name of the table to be loaded data: str or data object The data file or python object to be loaded. The 'data' value may be one of the following types: filename A CSV file of data string A string containing CSV data Pandas DataFrame A Pandas DataFrame object : : : : Additional object types can be added provided the data can be converted to a CSV format. schema: str [OPTIONAL] If set, this is a filename or string containing a schema for the data table to be created. A schema contains a comma-delimited row for each column containing the column name and it's Postgres data type. If not set, the schema is determined automatically from the data. drop: bool [Optional] Drop any existing table of the same name before creating new one. verbose: bool [Optional] Be verbose about operations. Returns ------- schema A string containing the table schema data_obj The CSV data to be imported (possibly converted) Example ------- .. code-block:: python # Import data into a MyDB table named 'foo' from file 'data.csv'. schema, data = queryClient.mydb_import ('foo', 'data.csv') ''' try: result = qc_client._mydb_import (token=def_token(token), table=table, data=data, **kw) except Exception as e: return (str(e)) return result
# -------------------------------------------------------------------- # MYDB_TRUNCATE -- Truncate a table in the user's MyDB. # @multimethod('qc',2,False) def mydb_truncate(token, table): return qc_client._mydb_truncate (token=def_token(token), table=table)
[docs]@multimethod('qc',1,False) def mydb_truncate(table, token=None): '''Truncate the specified table in the user's MyDB Usage: mydb_truncate (table, token=None) MultiMethod Usage: ------------------ queryClient.mydb_truncate (token, table) queryClient.mydb_truncate (table) queryClient.mydb_truncate (token, table=<id>) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) table: str The specific table to drop Returns ------- Example ------- .. code-block:: python # Truncate the table 'foo' queryClient.truncate('foo') ''' return qc_client._mydb_truncate (token=def_token(token), table=table)
# -------------------------------------------------------------------- # MYDB_INDEX -- Index a column in a user's MyDB table. # @multimethod('qc',3,False) def mydb_index(token, table, column, q3c=None, cluster=False, async_=False): return qc_client._mydb_index(token=def_token(token), table=table, column=column, q3c=q3c, cluster=cluster, async_=async_) @multimethod('qc',2,False) def mydb_index(table, column, token=None, q3c=None, cluster=False, async_=False): return qc_client._mydb_index(token=def_token(token), table=table, column=column, q3c=q3c, cluster=cluster, async_=async_)
[docs]@multimethod('qc',1,False) def mydb_index(table, column='', token=None, q3c=None, cluster=False, async_=False): '''Index the specified column in a table in the user's MyDB MultiMethod Usage: ------------------ queryClient.mydb_index (table, colunm) queryClient.mydb_index (token, table, column) queryClient.mydb_index (table, column, token=None) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) table: str The table to be indexed column: str The column q3c: str A comma-delimited list of two column names giving the RA and Dec positions (decimal degrees) to be used to Q3C index the table. If None, no Q3C index will be computed. cluster: bool If enabled, data table will be rewritten to cluster on the Q3C index for efficiency. Only used when 'q3c' columns are specified. async_: bool If enabled, index commands will be submitted asynchronously. Returns ------- command status Example ------- .. code-block:: python # Index the table's "id" column queryClient.index('foo1', 'id') # Index and cluster the table by position queryClient.index('foo1', q3c='ra,dec', cluster=True) ''' return qc_client._mydb_index(token=def_token(token), table=table, column=column, q3c=q3c, cluster=cluster, async_=async_)
# -------------------------------------------------------------------- # MYDB_DROP -- Drop the named table from a user's MyDB. # @multimethod('qc',2,False) def mydb_drop(token, table): return qc_client._mydb_drop (token=def_token(token), table=table)
[docs]@multimethod('qc',1,False) def mydb_drop(table, token=None): '''Drop the specified table from the user's MyDB Usage: mydb_drop (table, token=None) MultiMethod Usage: ------------------ queryClient.mydb_drop (token, table) queryClient.mydb_drop (table) queryClient.mydb_drop (token, table=<id>) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) table: str The specific table to drop Returns ------- Example ------- .. code-block:: python # Drop the 'foo1' table queryClient.drop('foo1') ''' return qc_client._mydb_drop (token=def_token(token), table=table)
# -------------------------------------------------------------------- # MYDB_FLUSH -- Flush user's MyDB tables from temporary space #
[docs]def mydb_flush(token=None): return qc_client._mydb_flush(token=def_token(token))
# -------------------------------------------------------------------- # MYDB_RENAME -- Rename a table in the user's MyDB. # @multimethod('qc',3,False) def mydb_rename(token, source, target): return qc_client._mydb_rename (token=def_token(token), source=source, target=target)
[docs]@multimethod('qc',2,False) def mydb_rename(source, target, token=None): '''Rename a table in the user's MyDB to a new name Usage: mydb_rename (source, target, token=None) MultiMethod Usage: ------------------ queryClient.mydb_rename (token, source, target) queryClient.mydb_rename (source, target) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) source: str The old table name target: str The new table name Returns ------- Example ------- .. code-block:: python # Copy table 'foo' to a new table, 'bar' queryClient.mydb_rename ('foo', 'bar') ''' return qc_client._mydb_rename (source, target, token=def_token(token))
# -------------------------------------------------------------------- # MYDB_COPY -- Copy a table in the user's MyDB. # @multimethod('qc',3,False) def mydb_copy(token, source, target): return qc_client._mydb_copy (token=def_token(token), source=source, target=target)
[docs]@multimethod('qc',2,False) def mydb_copy(source, target, token=None): '''Copy a table in the user's MyDB to a new name Usage: mydb_copy (source, target, token=None) MultiMethod Usage: ------------------ queryClient.mydb_copy (token, source, target) queryClient.mydb_copy (source, target) Parameters ---------- token : str Authentication token (see function :func:`authClient.login()`) source: str The old table name, i.e. the table to be copied target: str The new table name, i.e. the table to be created Returns ------- Example ------- .. code-block:: python # Copy table 'foo' to a new table, 'bar' queryClient.mydb_copy ('foo', 'bar') ''' return qc_client._mydb_copy (source, target, token=def_token(token))
# ################################### # Query Class procedures # ###################################
[docs]class queryClient (object): ''' QUERYCLIENT -- Client-side methods to access the Data Lab Query Manager Service. ''' def __init__(self, profile=DEF_PROFILE, svc_url=DEF_SERVICE_URL): '''Initialize the query client. ''' self.svc_url = svc_url.strip('/') # QueryMgr service URL self.svc_profile = profile # QueryMgr service profile self.sm_svc_url = SM_SERVICE_URL # StorageMgr service URL self.rm_svc_url = RM_SERVICE_URL # ResMgr service URL self.hostip = THIS_IP self.hostname = THIS_HOST self.timeout_request = TIMEOUT_REQUEST self.async_wait = False # Get the $HOME/.datalab directory. self.home = '%s/.datalab' % os.path.expanduser('~') self.debug = DEBUG # interface debug flag resClient.set_svc_url(self.rm_svc_url) storeClient.set_svc_url(self.sm_svc_url)
[docs] def isAlive(self, svc_url=None, timeout=5): '''Check whether the QueryManager service at the given URL is alive and responding. This is a simple call to the root service URL or ping() method. Parameters ---------- svc_url : str The Query Service URL to ping. timeout : int Call will assume to have failed if 'timeout' seconds pass. Returns ------- result : bool True if service responds properly, False otherwise Example ------- .. code-block:: python if queryClient.isAlive(): print ("Query Manager is alive") ''' if svc_url is None: svc_url = self.svc_url try: r = requests.get (svc_url, timeout=timeout) resp = r.text if r.status_code != 200: return False elif resp is not None and r.text.lower()[:11] != "hello world": return False except Exception: return False return True
[docs] def set_svc_url(self, svc_url): '''Set the Query Manager service URL. Parameters ---------- svc_url : str The service URL of the Query Manager to call. Returns ------- Nothing Example ------- .. code-block:: python queryClient.set_svc_url ("http://localhost:7002") ''' self.svc_url = qcToString(svc_url.strip('/'))
[docs] def get_svc_url(self): '''Get the currently-used Query Manager serice URL. Parameters ---------- None Returns ------- service_url : str The currently-used Query Service URL. Example ------- .. code-block:: python print (queryClient.get_svc_url()) ''' return qcToString(self.svc_url)
[docs] def set_profile(self, profile): '''Set the service profile to be used. Parameters ---------- profile : str The name of the profile to use. The list of available profiles can be retrieved from the service (see function func:`queryClient.list_profiles()`) Returns ------- Nothing Example ------- .. code-block:: python queryClient.set_profile('test') ''' self.svc_profile = qcToString(profile)
[docs] def get_profile(self): '''Get the current query profile. Parameters ---------- None Returns ------- profile : str The name of the current profile used with the Query Manager service Example ------- .. code-block:: python print ("Query Service profile = " + queryClient.get_profile()) ''' return qcToString(self.svc_profile)
[docs] def set_timeout_request(self, nsec): '''Set the requested Sync query timeout value (in seconds). Parameters ---------- nsec : int The number of seconds requested before a sync query timeout occurs. The service may cap this as a server defined maximum. Returns ------- Nothing Example ------- .. code-block:: python # set the sync query timeout request to 30 seconds queryClient.set_timeout_request(30) ''' self.timeout_request = nsec
[docs] def get_timeout_request(self): '''Get the current Sync query timeout value. Parameters ---------- None Returns ------- result: int Current sync query timeout value. Example ------- .. code-block:: python # get the current timeout value print (queryClient.get_timeout_request()) ''' return self.timeout_request
# ########################### # Utility Methods # ########################### @multimethod('_qc',1,True) def list_profiles(self, optval, token=None, profile=None, format='text'): '''Usage: queryClient.client.list_profiles (token, ...) ''' if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return self._list_profiles (token=def_token(optval), profile=profile, format=format) else: # optval looks like a token return self._list_profiles (token=def_token(token), profile=optval, format=format)
[docs] @multimethod('_qc',0,True) def list_profiles(self, token=None, profile=None, format='text'): '''Usage: queryClient.client.list_profiles (...) ''' return self._list_profiles(token=def_token(token), profile=profile, format=format)
def _list_profiles(self, token=None, profile=None, format='text'): '''Implementation of the list_profiles() method. ''' headers = self.getHeaders (token) dburl = '%s/profiles?' % self.svc_url if profile != None and profile != 'None' and profile != '': dburl += "profile=%s&" % profile dburl += "format=%s" % format r = requests.get (dburl, headers=headers) profiles = qcToString(r.content) if '{' in profiles: profiles = json.loads(profiles) return qcToString(profiles) @multimethod('_qc',1,True) def schema(self, value, format='text', profile=None): '''Usage: queryClient.schema ([value]) ''' return self._schema (value=value, format=format, profile=profile)
[docs] @multimethod('_qc',0,True) def schema(self, value='', format='text', profile=None): '''Usage: queryClient.schema ([value]) ''' return self._schema (value=value, format=format, profile=profile)
def _schema(self, value='', format='text', profile=None, **kw): '''Implementation of the schema() method. ''' if profile is None: profile = self.svc_profile url = '%s/schema?value=%s&format=%s&profile=%s' % \ (self.svc_url, (value), str(format), str(profile)) try: r = requests.get (url, timeout=2) resp = r.text return resp except Exception: raise queryClientError("Error getting schema: " + value) return qcToString(resp)
[docs] def services(self, name=None, svc_type=None, mode='list', profile='default'): '''Usage: queryClient.services () ''' return self._services (name=name, svc_type=svc_type, mode=mode, profile=profile)
def _services(self, name=None, svc_type=None, mode='list', profile='default'): '''Implementation of the services() method. ''' dburl = '/services?' if profile is not None and profile != 'None' and profile != '': dburl += ("profile=%s" % profile) if name is not None and name != 'None' and name != '': dburl += ("&name=%s" % name.replace('%','%25')) if svc_type is not None and svc_type != 'None' and svc_type != '': dburl += ("&type=%s" % svc_type) dburl += "&mode=%s" % mode r = self.getFromURL(self.svc_url, dburl, def_token(None)) svcs = qcToString(r.content) return svcs # ########################### # Query Methods # ########################### @multimethod('_qc',2,True) def query(self, token, query, adql=None, sql=None, fmt='csv', out=None, async_=False, drop=False, profile='default', **kw): '''Usage: queryClient.query (token) ''' return self._query (token=def_token(token), adql=adql, sql=query, fmt=fmt, out=out, async_=async_, drop=drop, profile=profile, **kw) @multimethod('_qc',1,True) def query(self, optval, adql=None, sql=None, fmt='csv', out=None, async_=False, token=None, drop=False, profile='default', **kw): '''Usage: queryClient.client.query (token, ...) ''' if optval is not None and optval.lower()[:6] == 'select': # optval looks like a query string return self._query (token=def_token(None), adql=adql, sql=optval, fmt=fmt, out=out, async_=async_, drop=drop, profile=profile, **kw) else: # optval is (probably) a token return self._query (token=def_token(optval), adql=adql, sql=sql, fmt=fmt, out=out, async_=async_, drop=drop, profile=profile, **kw)
[docs] @multimethod('_qc',0,True) def query(self, token=None, adql=None, sql=None, fmt='csv', out=None, async_=False, drop=False, profile='default', **kw): '''Usage: queryClient.client.query (...) ''' return self._query (token=def_token(token), adql=adql, sql=sql, fmt=fmt, out=out, async_=async_, drop=drop, profile=profile, **kw)
def _query(self, token=None, adql=None, sql=None, fmt='csv', out=None, async_=False, drop=False, profile='default', **kw): '''Implementation of the query() method. ''' # Process optional keyword arguments. if 'async' in kw: async_ = kw['async'] if 'timeout' in kw: # set requested timeout on the query timeout = int(kw['timeout']) else: timeout = self.timeout_request self.set_timeout_request (timeout) wait = self.async_wait # see if we wait for an Async result if async_ and 'wait' in kw: self.async_wait = wait = kw['wait'] stream = False # set a streaming request and adjust if 'stream' in kw: stream = kw['stream'] if stream: timeout = 0 async_ = False self.set_timeout_request(0) poll_time = 1 # set polling interval if async_ and 'poll' in kw: self.async_poll = poll_time = int(kw['poll']) verbose = False # set verbose output if async_ and 'verbose' in kw: verbose = kw['verbose'] # Set service call headers. headers = {'Content-Type': 'text/ascii', 'X-DL-TimeoutRequest': str(timeout), 'X-DL-ClientVersion': __version__, 'X-DL-OriginIP': self.hostip, 'X-DL-OriginHost': self.hostname, 'X-DL-AuthToken': def_token(token)} # application/x-sql if fmt in ['pandas','array','structarray','table','csv-noheader']: qfmt = 'csv' else: qfmt = fmt if adql is not None and adql != '': query = quote_plus(adql) # URL-encode the query string dburl = '%s/query?adql=%s&ofmt=%s&out=%s&async=%s&drop=%s' % ( self.svc_url, query, qfmt, out, async_, drop) elif sql is not None and sql != '': query = quote_plus(sql) # URL-encode the query string dburl = '%s/query?sql=%s&ofmt=%s&out=%s&async=%s&drop=%s' % ( self.svc_url, query, qfmt, out, async_, drop) else: raise queryClientError("No query specified") if profile != "default": # append the service profile dburl += "&profile=%s" % profile else: dburl += "&profile=%s" % self.svc_profile # Make the service call. In a streaming request we force a Sync # operation and by setting the timeout to zero let it run as long as # needed. Once a JM is implemented an ASync save will be possible. # Note: Results may still be limited by the memory available to # contain the result string, especially in notebook environments. if stream: if (out is not None and out != '') and not async_: # If we're saving a local file (e.g. in a notebook directory), # save the file here. Results saved to VOSpace/MyDB are handled # on the server side. if out[:7] == 'file://': out = out[7:] if ':' in out and out[:out.index(':')] in ['vos', 'mydb']: out = None try: resp = self.getStreamURL(dburl, headers=headers, fname=out) except Exception as e: print ('Error in getStreamURL: %s' % qcToString(str(e))) return qcToString(str(e)) if 'noheader' in fmt: strval = qcToString(resp).strip() strval = strval[strval.find('\n')+1:] else: strval = qcToString(resp) if (out is not None and out != ''): return strval else: # Otherwise, simply return the result of the query. if fmt in ['pandas','array','structarray','table']: return convert (strval,fmt) else: return strval # If we're not streaming the request result, process it here. r = requests.get (dburl, headers=headers, timeout=timeout) if r.status_code != 200: raise queryClientError (r.text) resp = qcToString(r.content) if async_ and wait: # Sync query timeouts are handled on the server. If waiting # for an async query, loop until job is completed or the timeout # expires. jobId = resp stat = self._status (token=token, jobId=jobId) tval = 0 while (stat not in ['COMPLETED','ERROR']): if verbose: print (stat) time.sleep (poll_time) try: stat = self._status (token=token, jobId=jobId) except Exception as e: raise queryClientError (str(e)) else: if tval > self.timeout_request: stat = self._abort (token=token, jobId=jobId) break if verbose: tim = tval + poll_time rem = timeout - tim print ('Status = %s; elapsed time: %d, timeout in %d' % (stat, tim, rem)) tval = tval + poll_time if tval > self.timeout_request: if verbose: print ('Timeout (%d sec) exceeded' % self.timeout_request) raise queryClientError ('Query timeout exceeded') elif stat not in ['COMPLETED','ERROR']: resp = stat elif stat == 'ERROR': # Retrieve Async error. if verbose: print ('Retrieving error') resp = self._error (token=token, jobId=jobId) elif stat == 'COMPLETED': # Retrieve Async results. A save to vos/mydb is handled below. if verbose: print ('Retrieving results') resp = self._results (token=token, jobId=jobId) if (out is not None and out != '') and not async_: # If we're saving to a local file (e.g. in a notebook directory), # save the file here. Results saved to VOSpace or MyDB are handled # on the server side. if out[:7] == 'file://': out = out[7:] if ':' not in out or out[:out.index(':')] not in ['vos', 'mydb']: with open(out, 'wb') as file: file.write(resp.encode("utf-8")) return 'OK' else: # Otherwise, simply return the result of the query. if 'noheader' in fmt: strval = qcToString(resp).strip() strval = strval[strval.find('\n')+1:] else: strval = qcToString(resp) if fmt in ['pandas','array','structarray','table']: return convert (strval, fmt) else: return strval # -------------------------- # Async jobs status() # -------------------------- @multimethod('_qc',2,True) def status(self, token, jobId): '''Usage: queryClient.status (token, jobID) ''' return self._status (token=def_token(token), jobId=jobId) @multimethod('_qc',1,True) def status(self, optval, jobId=None): '''Usage: queryClient.status (jobID) queryClient.status (token, jobId=<id>) ''' if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return self._status (token=def_token(optval), jobId=jobId) else: # optval is probably a jobId return self._status (token=def_token(None), jobId=optval)
[docs] @multimethod('_qc',0,True) def status(self, token=None, jobId=None): '''Usage: queryClient.status (jobID=<str>) ''' return self._status (token=def_token(token), jobId=jobId)
def _status(self, token=None, jobId=None): '''Implementation of the status() method. ''' headers = self.getHeaders (token) dburl = '%s/status?jobid=%s' % (self.svc_url, jobId) if self.svc_profile != "default": dburl += "&profile=%s" % self.svc_profile r = requests.get (dburl, headers=headers) return qcToString(r.content) # -------------------------- # Async jobs list # -------------------------- @multimethod('_qc',2,True) def jobs(self, token, jobId, format='text', status='all', option='list'): '''Usage: queryClient.jobs (token, jobID) ''' return self._jobs (token=def_token(token), jobId=jobId, format=format, status=status, option=option) @multimethod('_qc',1,True) def jobs(self, optval, jobId=None, format='text', status='all', option='list'): '''Usage: queryClient.jobs (jobID) queryClient.jobs (token, jobId=<id>) ''' if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return self._jobs (token=def_token(optval), jobId=jobId, format=format, status=status, option=option) else: # optval is probably a jobId return self._jobs (token=def_token(None), jobId=optval, format=format, status=status, option=option)
[docs] @multimethod('_qc',0,True) def jobs(self, token=None, jobId=None, format='text', status='all', option='list'): '''Usage: queryClient.jobs (jobID=<str>) ''' return self._jobs (token=def_token(token), jobId=jobId, format=format, status=status, option=option)
def _jobs(self, token=None, jobId=None, format='text', status='all', option='list'): '''Implementation of the jobs() method. ''' from datetime import datetime res = resClient.findJobs(token, jobId, format=format, status=status, option=option) if option == 'delete': return qcToString(res) return res # -------------------------- # Async jobs results() # -------------------------- @multimethod('_qc',2,True) def results(self, token, jobId, delete=True): '''Usage: queryClient.results (token, jobID) ''' return self._results (token=def_token(token), jobId=jobId, delete=delete) @multimethod('_qc',1,True) def results(self, optval, jobId=None, delete=True): '''Usage: queryClient.results (jobID) queryClient.results (token, jobId=<id>) ''' if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return self._results (token=def_token(optval), jobId=jobId, delete=delete) else: # optval is probably a jobId return self._results (token=def_token(None), jobId=optval, delete=delete)
[docs] @multimethod('_qc',0,True) def results(self, token=None, jobId=None, delete=True): '''Usage: queryClient.results (jobID=<str>) ''' return self._results (token=def_token(token), jobId=jobId, delete=delete)
def _results(self, token=None, jobId=None, delete=True): '''Implementation of the results() method. ''' headers = self.getHeaders (token) dburl = '%s/results?jobid=%s&delete=%s' % (self.svc_url, jobId, delete) if self.svc_profile != "default": dburl += "&profile=%s" % self.svc_profile r = requests.get (dburl, headers=headers) return qcToString(r.content) # -------------------------- # Async jobs error() # -------------------------- @multimethod('_qc',2,True) def error(self, token, jobId): '''Usage: queryClient.error (token, jobID) ''' return self._error (token=def_token(token), jobId=jobId) @multimethod('_qc',1,True) def error(self, optval, jobId=None): '''Usage: queryClient.error (jobID) queryClient.error (token, jobId=<id>) ''' if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return self._error (token=def_token(optval), jobId=jobId) else: # optval is probably a jobId return self._error (token=def_token(None), jobId=optval)
[docs] @multimethod('_qc',0,True) def error(self, token=None, jobId=None): '''Usage: queryClient.error (jobID=<str>) ''' return self._error (token=def_token(token), jobId=jobId)
def _error(self, token=None, jobId=None): '''Implementation of the error() method. ''' headers = self.getHeaders (token) dburl = '%s/error?jobid=%s' % (self.svc_url, jobId) if self.svc_profile != "default": dburl += "&profile=%s" % self.svc_profile r = requests.get (dburl, headers=headers) return qcToString(r.content) # -------------------------- # Async job abort() # -------------------------- @multimethod('_qc',2,True) def abort(self, token, jobId): '''Usage: queryClient.abort (token, jobID) ''' return self._abort (token=def_token(token), jobId=jobId) @multimethod('_qc',1,True) def abort(self, optval, jobId=None): '''Usage: queryClient.abort (jobID) queryClient.abort (token, jobId=<id>) ''' if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return self._abort (token=def_token(optval), jobId=jobId) else: # optval is probably a jobId return self._abort (token=def_token(None), jobId=optval)
[docs] @multimethod('_qc',0,True) def abort(self, token=None, jobId=None): '''Usage: queryClient.abort (jobID=<str>) ''' return self._abort (token=def_token(token), jobId=jobId)
def _abort(self, token=None, jobId=None): '''Implementation of the abort() method. ''' headers = self.getHeaders (token) dburl = '%s/abort?jobid=%s' % (self.svc_url, jobId) if self.svc_profile != "default": dburl += "&profile=%s" % self.svc_profile r = requests.get (dburl, headers=headers) return qcToString(r.content) # -------------------------- # Async job wait() # -------------------------- @multimethod('_qc',2,True) def wait(self, token, jobId, wait=3, verbose=False): '''Usage: queryClient.wait (token, jobID) ''' return self._wait (token=def_token(token), jobId=jobId, wait=wait, verbose=verbose) @multimethod('_qc',1,True) def wait(self, optval, jobId=None, wait=3, verbose=False): '''Usage: queryClient.wait (jobID) queryClient.wait (token, jobId=<id>) ''' if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return self._wait (token=def_token(optval), jobId=jobId, wait=wait, verbose=verbose) else: # optval is probably a jobId return self._wait (token=def_token(None), jobId=optval, wait=wait, verbose=verbose)
[docs] @multimethod('_qc',0,True) def wait(self, token=None, jobId=None, wait=3, verbose=False): '''Usage: queryClient.wait (jobID=<str>) ''' return self._wait (token=def_token(token), jobId=jobId, wait=wait, verbose=verbose)
def _wait(self, token=None, jobId=None, wait=3, verbose=False): '''Implementation of the wait() method. Loop until an async job has completed. Parameters ---------- jobid : str The job ID string of a submitted query job. wait : int | float Wait for `wait` seconds before checking status again. Default: 3sec ''' while True: status = qc_client._status(token=token, jobId=jobId) if verbose: print(status) if status in ('QUEUED','EXECUTING'): if verbose: print('Waiting %g seconds...' % wait) time.sleep(wait) else: return status #========================================================================= # ########################### # MyDB Methods # ########################### # LIST -- List the tables or table schema in the user's MyDB. # @multimethod('_qc',2,True) def list(self, token, table): '''Usage: queryClient.list (token, table) ''' return self.mydb_list (token=def_token(token), table=table) @multimethod('_qc',1,True) def list(self, optval, table=None): '''Usage: queryClient.list (table) queryClient.list (token, table=<id>) ''' if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return self.mydb_list (token=def_token(optval), table=table) else: # optval is probably a table return self.mydb_list (token=def_token(None), table=optval)
[docs] @multimethod('_qc',0,True) def list(self, token=None, table=None): '''Usage: queryClient.list (table=<str>) ''' return self.mydb_list (token=def_token(token), table=table)
# DROP -- Drop the specified table from the user's MyDB # @multimethod('_qc',2,True) def drop(self, token, table): '''Usage: queryClient.drop (token, table) ''' return self.mydb_drop (token=def_token(token), table=table) @multimethod('_qc',1,True) def drop(self, optval, table=None): '''Usage: queryClient.drop (table) queryClient.drop (token, table=<id>) ''' if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return self.mydb_drop (token=def_token(optval), table=table) else: # optval is probably a table return self._drop (token=def_token(None), table=optval)
[docs] @multimethod('_qc',0,True) def drop(self, token=None, table=None): '''Usage: queryClient.drop (table=<str>) ''' return self.mydb_drop (token=def_token(token), table=table)
# ----------------------------- # MyDB Functions (New API) # ----------------------------- # -------------------------------------------------------------------- # MYDB_LIST -- List the tables or table schema in a user's MyDB. # @multimethod('_qc',1,True) def mydb_list(self, optval, table=None, index=False): '''Usage: queryClient.mydb_list (table) queryClient.mydb_list (token, table=<str>) ''' if optval is not None and len(optval.split('.')) >= 4: # optval looks like a token return self._mydb_list (token=def_token(optval), table=table, index=index) else: # optval is probably a table return self._mydb_list (token=def_token(None), table=optval, index=index)
[docs] @multimethod('_qc',0,True) def mydb_list(self, token=None, table=None, index=False): '''Usage: queryClient.mydb_list (table=<str>) ''' return self._mydb_list (token=def_token(token), table=table, index=index)
def _mydb_list(self, token=None, table=None, index=False): '''Implementation of the mydb_list() method. ''' headers = self.getHeaders (token) if table is None: table = '' dburl = '%s/list?table=%s&index=%s' % (self.svc_url, table, str(index)) if self.svc_profile != "default": dburl += "&profile=%s" % self.svc_profile r = requests.get (dburl, headers=headers) return qcToString(r.content) # -------------------------------------------------------------------- # MYDB_CREATE -- Copy a table in the user's MyDB. # @multimethod('_qc',3,True) def mydb_create(self, token, table, schema, **kw): '''Usage: queryClient.mydb_create (token, table, <schema_dict>) ''' return self._mydb_create (token=def_token(None), table=table, schema=schema, **kw)
[docs] @multimethod('_qc',2,True) def mydb_create(self, table, schema, token=None, **kw): '''Usage: queryClient.mydb_create (table, <schema_dict>) ''' return self._mydb_create (token=def_token(None), table=table, schema=schema, **kw)
def _mydb_create(self, token, table, schema, **kw): '''Implementation of the mydb_create() method. ''' # Set the request headers. headers = self.getHeaders (token) headers['Content'] = 'text/ascii' params = { 'table' : table} dburl = '%s/create' % (self.svc_url) drop = True # drop table if exists verbose = False verbose = False # verbose output if 'verbose' in kw: verbose = kw['verbose'] if 'drop' in kw: drop = kw['drop'] params['verbose'] = str(verbose) params['drop'] = str(drop) params['profile'] = self.svc_profile # Schema can be a dictionary, a CSV string, or the name of a file. if isinstance (schema,str): if os.path.exists (schema): with open(schema, 'r') as f: s = f.read() params['schema'] = s else: params['schema'] = schema elif isinstance (schema,collections.OrderedDict): # We can't use a regular 'dict' object because the key ordered isn't # guaranteed to be preserved, but allow OrderedDict. s = '' for i in schema: s += i + ',' + schema[i] + '\n' params['schema'] = s r = requests.post (dburl, params=params, headers=headers) if r.content[:5].lower() == 'error': raise queryClientError (qcToString(r.content)) else: return 'OK' # -------------------------------------------------------------------- # MYDB_INSERT -- Insert data into a table in the user's MyDB from a local # file or python data object. # @multimethod('_qc',3,True) def mydb_insert(self, token, table, data, **kw): '''Usage: queryClient.mydb_insert (token, table, <filename>) queryClient.mydb_insert (token, table, <data_object>) ''' return self._mydb_insert (token=def_token(token), table=table, data=data, **kw)
[docs] @multimethod('_qc',2,True) def mydb_insert(self, table, data, token=None, **kw): '''Usage: queryClient.mydb_insert (table, <filename>) queryClient.mydb_insert (table, <data_object>) ''' return self._mydb_insert (token=def_token(token), table=table, data=data, **kw)
def _mydb_insert(self, token=None, table=None, data=None, **kw): '''Implementation of the mydb_create() method. ''' # Get optional parameters. csv_header = (kw['csv_header'] if 'csv_header' in kw else True) verbose = (kw['drop'] if 'drop' in kw else False) # Set up the request headers and initialize. params = {} headers = self.getHeaders (token) params['profile'] = self.svc_profile dburl = '%s/ingest' % (self.svc_url) # Data can be the name of a CSV file or a python tablular object that # can be converted. tmp_file = NamedTemporaryFile(delete=True, dir='/tmp').name if isinstance (data, str): params = { 'table' : table, 'csv_header' : str(csv_header) } if data.startswith ('http://') or data.startswith('https://') or \ data.startswith ('vos://'): # Passing a URI in the filename to be loaded on server-side params['filename'] = data elif os.path.exists (data): # Upload the file to the staging area. data_name = os.path.basename (data) storeClient.chunked_upload (token, data, data_name) params['filename'] = data_name else: # Upload the file to the staging area. with open(tmp_file, 'w') as f: f.write(data) f.close() tmp_name = os.path.basename (tmp_file) storeClient.chunked_upload(token, tmp_file, tmp_name) params['filename'] = tmp_name elif isinstance (data, pandas.core.frame.DataFrame): # Convert a DataFrame to a CSV string object. schema, data_to_load = self.getSchema (data) with open (tmp_file, 'w') as f: f.write(data_to_load) f.close() tmp_name = os.path.basename (tmp_file) storeClient.chunked_upload(token, tmp_file, tmp_name) params['filename'] = tmp_name else: pass r = requests.post (dburl, params=params, headers=headers) if tmp_file is not None and os.path.exists(tmp_file): os.remove (tmp_file) if verbose or self.debug: print (str(r.text)) if r.content[:5].lower() == 'error': raise queryClientError (qcToString(r.content)) else: return 'OK' # -------------------------------------------------------------------- # MYDB_IMPORT -- Import a file or Python object to a MyDB table. # @multimethod('_qc',3,True) def mydb_import(self, token, table, data, **kw): '''Usage: queryClient.mydb_import (token, table, data) ''' return self._mydb_import (token=def_token(token), table=table, data=data, **kw)
[docs] @multimethod('_qc',2,True) def mydb_import(self, table, data, token=None, **kw): '''Usage: queryClient.mydb_import (table, data) ''' return self._mydb_import (token=def_token(token), table=table, data=data, **kw)
def _mydb_import(self, token=None, table=None, data=None, **kw): '''Implementation of the mydb_create() method. ''' # Get optional parameters. csv_header = (kw['csv_header'] if 'csv_header' in kw else True) verbose = (kw['verbose'] if 'verbose' in kw else False) append = (kw['append'] if 'append' in kw else False) delimiter = (kw['delimiter'] if 'delimiter' in kw else ',') # Set up the request headers and initialize. headers = self.getHeaders (token) dburl = '%s/import' % (self.svc_url) # Data can be the name of a CSV file or a python tablular object that # can be converted. tmp_file = NamedTemporaryFile(delete=True, dir='/tmp').name params = { 'table' : table, 'delimiter' : str(delimiter), 'append' : str(append), 'profile' : self.svc_profile, 'csv_header' : str(csv_header) } if isinstance (data, str): if data.find('[') > 0: fname = data.split('[')[0] extn = data.split('[')[1].split(']')[0] extn = extn.replace("'","").replace('"','') par = 'extnum' if extn[0].isdigit() else 'extname' params[par] = extn else: fname, extn = data, None if data.startswith ('http://') or \ data.startswith ('https://') or \ data.startswith ('vos://'): # Passing a URI in the filename to be loaded on server-side params['filename'] = fname if data.find('[') > 0 else data elif os.path.exists (fname): # Upload the file to the staging area. data_name = os.path.basename (fname) storeClient.chunked_upload (token, fname, data_name) params['filename'] = data_name else: # Upload the CSV string to the staging area. with open (tmp_file, 'w') as f: f.write(data) f.close() tmp_name = os.path.basename (tmp_file) storeClient.chunked_upload(token, tmp_file, tmp_name) params['filename'] = tmp_name elif isinstance (data, pandas.core.frame.DataFrame): # Convert a DataFrame to a CSV string object. schema, data_to_load = self.getSchema (data) with open (tmp_file, 'w') as f: f.write(data_to_load) f.close() tmp_name = os.path.basename (tmp_file) storeClient.chunked_upload(token, tmp_file, tmp_name) params['filename'] = tmp_name else: # Reserved for future format support. pass # Execute the service call. r = requests.post (dburl, params=params, headers=headers) if verbose or self.debug: print (qcToString (r.content)) if tmp_file is not None and os.path.exists(tmp_file): os.remove(tmp_file) if r.content[:5].lower() == 'error' or r.status_code != 200: raise queryClientError (qcToString(r.content)) else: return 'OK' # -------------------------------------------------------------------- # MYDB_TRUNCATE -- Truncate a table in the user's MyDB. # @multimethod('_qc',2,True) def mydb_truncate(self, token, table): '''Usage: queryClient.mydb_truncate (token, table) ''' return self._mydb_truncate (token=def_token(token), table=table)
[docs] @multimethod('_qc',1,True) def mydb_truncate(self, table, token=None): '''Usage: queryClient.mydb_truncate (table) ''' return self._mydb_truncate (token=def_token(token), table=table)
def _mydb_truncate(self, token=None, table=None): '''Implementation of the mydb_truncate() method. ''' headers = self.getHeaders (token) dburl = '%s/truncate?table=%s' % (self.svc_url, table) if self.svc_profile != "default": dburl += "&profile=%s" % self.svc_profile r = requests.get (dburl, headers=headers) if r.content[:5].lower() == 'error': return qcToString(r.content) else: return 'OK' # -------------------------------------------------------------------- # MYDB_INDEX -- Index a column in a user's MyDB table. # @multimethod('_qc',3,True) def mydb_index(self, token, table, column, q3c=None, cluster=False, async_=False): '''Usage: queryClient.mydb_index (token, table, column) ''' return self._mydb_index(token=def_token(token), table=table, column=column, q3c=q3c, cluster=cluster, async_=async_)
[docs] @multimethod('_qc',2,True) def mydb_index(self, table, column, token=None, q3c=None, cluster=False, async_=False): '''Usage: queryClient.mydb_index (table, colunm) ''' return self._mydb_index(token=def_token(token), table=table, column=column, q3c=q3c, cluster=cluster, async_=async_)
def _mydb_index(self, token=None, table=None, column='', q3c=None, cluster=False, async_=False): '''Implementation of the mydb_index() method. ''' headers = self.getHeaders (token) if async_: def async_call (url, params, headers): r = requests.get (url, params=params, headers=headers) return qcToString(r.content) from multiprocessing.pool import ThreadPool params = { 'tbl' : table, 'col' : column, 'profile' : self.svc_profile } if q3c is not None: params['q3c'] = q3c params['cluster'] = cluster pool = ThreadPool(processes=1) dburl = '%s/index' % self.svc_url res = pool.apply_async (async_call, (dburl, params, headers)) return 'OK' else: dburl = '%s/index?tbl=%s&col=%s' % (self.svc_url, table, column) if q3c is not None: dburl += '&q3c=%s&cluster=%s' % (q3c, cluster) if self.svc_profile != "default": dburl += "&profile=%s" % self.svc_profile r = requests.get (dburl, headers=headers) if r.content[:5].lower() == 'error': return qcToString(r.content) else: return 'OK' # -------------------------------------------------------------------- # MYDB_DROP -- Drop the named table from a user's MyDB. # @multimethod('_qc',2,True) def mydb_drop(self, token, table): '''Usage: queryClient.mydb_drop (token, table) ''' return self._mydb_drop (token=def_token(token), table=table)
[docs] @multimethod('_qc',1,True) def mydb_drop(self, table, token=None): '''Usage: queryClient.mydb_drop (table) queryClient.mydb_drop (token, table=<id>) ''' return self._mydb_drop (token=def_token(token), table=table)
def _mydb_drop(self, token=None, table=None): '''Implementation of the mydb_drop() method. ''' headers = self.getHeaders (token) dburl = '%s/delete?table=%s' % (self.svc_url, table) if self.svc_profile != "default": dburl += "&profile=%s" % self.svc_profile r = requests.get (dburl, headers=headers) if r.content[:5].lower() == 'error': return qcToString(r.content) else: return 'OK' # -------------------------------------------------------------------- # MYDB_FLUSH -- Drop the temporary tables in mydb schema in tapdb DB # def _mydb_flush(self, token=None): '''Usage: queryClient.mydb_flush () ''' headers = self.getHeaders(token) dburl = '%s/flush' % (self.svc_url) if self.svc_profile != "default": dburl += "?profile=%s" % self.svc_profile r = requests.get (dburl, headers=headers) if r.content[:5].lower() == 'error': return qcToString(r.content) else: return 'OK' # -------------------------------------------------------------------- # MYDB_RENAME -- Rename a table in the user's MyDB. # @multimethod('_qc',3,True) def mydb_rename(self, token, source, target): '''Usage: queryClient.mydb_rename (token, source, target) ''' return self._mydb_rename (token=def_token(token), source=source, target=target)
[docs] @multimethod('_qc',2,True) def mydb_rename(self, source, target, token=None): '''Usage: queryClient.mydb_rename (source, target) ''' return self._mydb_rename (source=source, target=target, token=def_token(token))
def _mydb_rename(self, source='', target='', token=None): '''Implementation of the mydb_rename() method. ''' headers = self.getHeaders (token) dburl = '%s/rename?source=%s&target=%s' % (self.svc_url, source, target) if self.svc_profile != "default": dburl += "&profile=%s" % self.svc_profile r = requests.get (dburl, headers=headers) if r.content[:5].lower() == 'error': return qcToString(r.content) else: return 'OK' # -------------------------------------------------------------------- # MYDB_COPY -- Copy a table in the user's MyDB. # @multimethod('_qc',3,True) def mydb_copy(self, token, source, target): '''Usage: queryClient.mydb_copy (token, source, target) ''' return self._mydb_copy (token=def_token(token), source=source, target=target)
[docs] @multimethod('_qc',2,True) def mydb_copy(self, source, target, token=None): '''Usage: queryClient.mydb_copy (source, target) ''' return self._mydb_copy (source=source, target=target, token=def_token(token))
def _mydb_copy(self, source='', target='', token=None): '''Implementation of the mydb_copy() method. ''' headers = self.getHeaders (token) dburl = '%s/copy?source=%s&target=%s' % (self.svc_url, source, target) if self.svc_profile != "default": dburl += "&profile=%s" % self.svc_profile r = requests.get (dburl, headers=headers) if r.content[:5].lower() == 'error': return qcToString(r.content) else: return 'OK'
[docs] @staticmethod def pretty_print_POST(req): ''' At this point it is completely built and ready to be fired; it is "prepared". However pay attention at the formatting used in this function because it is programmed to be pretty printed and may differ from the actual request. ''' print('{}\n{}\n{}\n\n{}'.format( '-----------START-----------', req.method + ' ' + req.url, '\n'.join('{}: {}'.format(k, v) for k, v in req.headers.items()), req.body, ))
# ############################################ # DEPRECATED / NOT-YET-IMPLEMENTED # ############################################ # SIAQUERY -- Send a SIA query to the query manager service #
[docs] def siaquery(self, token, input=None, out=None, search=0.5): '''Send a SIA (Simple Image Access) query to the query manager service ''' headers = self.getHeaders (token) user, uid, gid, hash = token.strip().split('.', 3) shortname = '%s_%s' % (uid, input[input.rfind('/') + 1:]) if input[:input.find(':')] not in ['vos', 'mydb']: # Need to set this from config? target = 'vos://datalab.noao.edu!vospace/siawork/%s' % shortname r = requests.get (SM_SERVICE_URL + "/put?name=%s" % target, headers=headers) file = open(input).read() headers2 = {'Content-type': 'application/octet-stream', 'X-DL-AuthToken': token} requests.put(r.content, data=file, headers=headers2) dburl = '%s/sia?in=%s&radius=%s&out=%s' % ( self.svc_url, shortname, search, out) r = requests.get (dburl, headers=headers) if out is not None: if out[:out.index(':')] not in ['vos', 'mydb']: with open(out, 'wb') as file: file.write(r.content.encode("utf-8")) else: return qcToString(r.content)
# CONEQUERY -- Send a cone search query to the query manager service #
[docs] def conequery(self, token, input=None, out=None, schema=None, table=None, ra=None, dec=None, search=0.5): '''Send a cone search query to the consearch service ''' headers = self.getHeaders (token) user, uid, gid, hash = token.strip().split('.', 3) # shortname = '%s_%s' % (uid, input[input.rfind('/') + 1:]) # if input[:input.find(':')] not in ['vos', 'mydb']: # # Need to set this from config? # target = 'vos://datalab.noao.edu!vospace/siawork/%s' % shortname # r = requests.get (SM_SERVICE_URL + "/put?name=%s" % # target, headers={'X-DL-AuthToken': token}) # file = open(input).read() # # headers2 = {'Content-type': 'application/octet-stream', # 'X-DL-AuthToken': token} # requests.put(r.content, data=file, headers=headers2) dburl = '%s/scs/%s/%s?ra=%s&dec=%s&radius=%s' % ( DAL_SERVICE_URL, schema, table, ra, dec) r = requests.get (dburl, headers=headers) if out is not None: if out[:out.index(':')] not in ['vos', 'mydb']: with open(out, 'wb') as file: file.write(r.content.encode("utf-8")) else: return qcToString(r.content)
# ------------------------------------------------------- # Private Utility Methods # -------------------------------------------------------
[docs] def getHeaders(self, token): '''Get default tracking headers, ''' tok = def_token(token) user, uid, gid, hash = tok.strip().split('.', 3) hdrs = {#'Content-Type': 'text/ascii', 'X-DL-ClientVersion': __version__, 'X-DL-OriginIP': self.hostip, 'X-DL-OriginHost': self.hostname, 'X-DL-User': user, 'X-DL-AuthToken': tok} # application/x-sql return hdrs
[docs] def getFromURL(self, svc_url, path, token): '''Get something from a URL. Return a 'response' object ''' try: hdrs = self.getHeaders (token) resp = requests.get("%s%s" % (svc_url, path), headers=hdrs) except Exception as e: raise queryClientError(str(e)) return resp
[docs] def getStreamURL (self, url, headers, fname=None, chunk_size=1048576): ''' Get the specified URL in a streaming fashion. This allows for large downloads without hitting timeout limits. ''' r = requests.get(url, headers=headers, stream=True) if r.status_code != 200: return r.status_code, r.content else: try: # Download the request in chunks to avoid timeouts. #clen = min(chunk_size, r.headers.get('content-length')) if fname is not None and fname != '': with open(fname, 'wb', 0) as fd: for chunk in r.iter_content(chunk_size=chunk_size): if chunk: fd.write(chunk) return 'OK' else: resp = '' for chunk in r.iter_content(chunk_size=chunk_size): if chunk: resp = resp + chunk return resp except IOError as e: print ('IOError in getStreamURL: %s' % qcToString(str(e))) raise queryClientError(str(e)) except Exception as e: print ('Error in getStreamURL: %s' % qcToString(str(e))) raise queryClientError(str(e))
[docs] def chunked_upload(self, token, local_file, remote_file): '''A streaming file uploader. ''' debug = False init = True CHUNK_SIZE = 4 * 1024 * 1024 # 16MB chunks url = '%s/xfer' % (self.svc_url) # Get the size of the file to be transferred. fsize = os.stat(local_file).st_size nchunks = fsize / CHUNK_SIZE + 1 if (debug): print ('Upload in %d chunks ....' % nchunks) with open(local_file, 'rb') as f: try: nsent = 0 while nsent < fsize: data = f.read(CHUNK_SIZE) requests.post (url, data, headers={'Content-type': 'application/octet-stream', 'X-DL-FileName': remote_file, 'X-DL-InitXfer': str(init), 'X-DL-AuthToken': token}) nsent += len(data) if init: init = False except Exception as e: raise queryClientError ('Upload error: ' + str(e))
[docs] def dataType(self, val, current_type): '''Lexically scan a value to determine the datatype. ''' try: # Evaluates numbers to an appropriate type, and strings an error t = ast.literal_eval(val.strip()) except ValueError: return 'text' except SyntaxError: return 'text' if type(t) in [int, float]: if (type(t) in [int]) and current_type not in ['float', 'varchar']: # Use smallest possible int type if (-32768 < t < 32767) and current_type not in ['int', 'bigint']: return 'smallint' elif (-2147483648 < t < 2147483647) and current_type not in ['bigint']: return 'int' else: return 'bigint' if type(t) is float and current_type not in ['varchar']: return ('float' if len(val) < 6 else 'double precision') else: return 'text'
[docs] def getSchema(self, data, **kw): '''Generate a schema for mydb_create() from a CSV file or data object. ''' data_obj = data if isinstance (data, str): # If a file, assume it is CSV but allow the 'delimiter' et al opts if os.path.exists (data): reader = csv.reader(open(data, 'r'), **kw) else: reader = csv.reader(StringIO(data), **kw) elif isinstance (data, pandas.core.frame.DataFrame): data_obj = data.to_csv(index=False) reader = csv.reader(StringIO(data_obj), **kw) else: print ('Unsupported data format') return '', data # TODO: Check behavior when no CSV header and data row contains floats longest, headers, type_list = [], [], [] nrows = 10 for row in reader: if len(headers) == 0: # First row of CSV headers = row for col in row: longest.append(0) type_list.append('') else: for i in range(len(row)): if type_list[i] == 'varchar' or row[i] == 'NA': # NA is the csv null value pass else: var_type = self.dataType(row[i], type_list[i]) type_list[i] = var_type if len(row[i]) > longest[i]: longest[i] = len(row[i]) if nrows == 0: # Only read first 10 rows break nrows = nrows - 1 schema = '' for i in range(len(headers)): if type_list[i] == 'text': schema += '{},text\n'.format(headers[i].lower().replace('.','_')) else: schema += '{},{}\n'.format(headers[i].lower().replace('.','_'), type_list[i]) return schema, data_obj
# ################################### # Query Client Handles # ###################################
[docs]def getClient(profile=DEF_PROFILE, svc_url=DEF_SERVICE_URL): '''Create a new queryClient object and set a default profile. ''' return queryClient(profile=profile, svc_url=svc_url)
# The default client handle for the module. qc_client = getClient(profile=DEF_PROFILE, svc_url=DEF_SERVICE_URL) # ########################################## # Patch the docstrings for module functions # that aren't MultiMethods. # ########################################## isAlive.__doc__ = qc_client.isAlive.__doc__ services.__doc__ = qc_client.services.__doc__ set_svc_url.__doc__ = qc_client.set_svc_url.__doc__ get_svc_url.__doc__ = qc_client.get_svc_url.__doc__ set_profile.__doc__ = qc_client.set_profile.__doc__ get_profile.__doc__ = qc_client.get_profile.__doc__ set_timeout_request.__doc__ = qc_client.set_timeout_request.__doc__ get_timeout_request.__doc__ = qc_client.get_timeout_request.__doc__ # #################################################################### # Py2/Py3 Compatability Utilities # ####################################################################
[docs]def qcToString(s): '''qcToString -- Force a return value to be type 'string' for all Python versions. ''' strval = s if is_py3 and isinstance(s,bytes): strval = str(s.decode()) elif not is_py3 and (isinstance(s,bytes) or isinstance(s,unicode)): strval = str(s) else: strval = s return strval