Source code for dl.dltasks

#!/usr/bin/env python
#
# DLTASKS.PY -- Task routines for the 'datalab' command-line client.
#

from __future__ import print_function

__authors__ = 'Matthew Graham <graham@noao.edu>, Mike Fitzpatrick <fitz@noao.edu>, Data Lab <datalab@noao.edu>'
__version__ = '20170520'  # yyyymmdd


"""
    Task routines for the 'datalab' command-line client.

Import via

.. code-block:: python

    from dl import datalab
"""

#print ('DLTASKS DEV')

import sys
from sys import platform
try:
    from astropy.samp import SAMPIntegratedClient
except Exception:
    from astropy.vo.samp import SAMPIntegratedClient
import glob
import os
import logging
import shutil
import socket
from subprocess import Popen, PIPE
from time import gmtime, strftime, sleep
import httplib2
from optparse import Values

# raw_input() was renamed input() in python3
try:
    _raw_input = raw_input
except NameError:
    raw_input = input

try:
    import ConfigParser                         # Python 2
    from urllib import quote_plus               # Python 2
except ImportError:
    import configparser as ConfigParser          # Python 2
    from urllib.parse import quote_plus         # Python 3

try:
    from http.client import HTTPConnection # py3
except ImportError:
    from httplib import HTTPConnection # py2

import requests             # need to standarize on one library at some point

is_py3 = sys.version_info.major == 3
if not is_py3:
    # VOSpace imports
    import vos as vos
    from vos.fuse import FUSE
#    from vos.__version__ import version
    from vos.vofs import VOFS
    DAEMON_TIMEOUT = 60                             # Mount timeout
    CAPS_DIR = "../caps"                            # Capability directory
version = "2.2.0"                  		# VOS version


ANON_TOKEN = "anonymous.0.0.anon_access"        # default tokens
DEMO_TOKEN = "dldemo.99999.99999.demo_access"
TEST_TOKEN = "dltest.99998.99998.test_access"


# Data Lab Client interfaces
from dl import authClient
from dl import queryClient
from dl import storeClient
from dl import resClient


# Uncomment to print HTTP and response headers
httplib2.debuglevel = 0
HTTPConnection.debuglevel = 0

# Allow the service URL for dev/test systems to override the default.
THIS_HOST = socket.gethostname()                        # host name
sock = socket.socket(type=socket.SOCK_DGRAM)     # host IP address
sock.connect(('8.8.8.8', 1))        # Example IP address, see RFC 5737
THIS_IP, _ = sock.getsockname()


if THIS_HOST[:5] == 'dldev':
    DEF_SERVICE_ROOT = 'http://dldev.datalab.noao.edu'
    AM_URL = "http://dldev.datalab.noao.edu/auth"       # Auth Manager
    SM_URL = "http://dldev.datalab.noao.edu/storage"    # Storage Manager
    QM_URL = "http://dldev.datalab.noao.edu/query"      # Query Manager
    RES_URL = "http://dldev.datalab.noao.edu/res"       # Resource Manager
elif THIS_HOST[:6] == 'dltest':
    DEF_SERVICE_ROOT = 'http://dltest.datalab.noao.edu'
    AM_URL = "http://dltest.datalab.noao.edu/auth"       # Auth Manager
    SM_URL = "http://dltest.datalab.noao.edu/storage"    # Storage Manager
    QM_URL = "http://dltest.datalab.noao.edu/query"      # Query Manager
    RES_URL = "http://dltest.datalab.noao.edu/res"       # Resource Manager




[docs]def parseSelf(obj): opt = Values() for attr in dir(obj): if isinstance(getattr(obj, attr), Option): opt.ensure_value(attr, getattr(obj, attr).value) return opt
[docs]def getUserName (self): ''' Get the currently logged-in user token. If we haven't logged in return the anonymous token. ''' _user = self.dl.get("login", "user") if _user is None or _user == '': return "anonymous" else: return _user
[docs]def getUserToken (self): ''' Get the currently logged-in user token. If we haven't logged in return the anonymous token. ''' _token = self.dl.get("login", "authtoken") if _token is None or _token == '': return ANON_TOKEN else: return _token
[docs]class Option: ''' Represents an option ''' def __init__(self, name, value, description, display=None, default=None, required=False): self.name = name self.value = value self.display = display self.description = description self.default = default self.required = required
[docs]class DataLab: ''' Main class for Data Lab interactions ''' def __init__(self): self.home = '%s/.datalab' % os.path.expanduser('~') # Check that $HOME/.datalab exists if not os.path.exists(self.home): os.makedirs(self.home) # See if datalab conf file exists self.config = ConfigParser.RawConfigParser(allow_no_value=True) if not os.path.exists('%s/dl.conf' % self.home): self.config.add_section('datalab') self.config.set('datalab', 'created', strftime( '%Y-%m-%d %H:%M:%S', gmtime())) self.config.add_section('login') self.config.set('login', 'status', 'loggedout') self.config.set('login', 'user', '') self.config.set('login', 'authtoken', '') self.config.add_section('auth') self.config.set('auth', 'profile', 'default') self.config.set('auth', 'svc_url', AM_URL) self.config.add_section('query') self.config.set('query', 'profile', 'default') self.config.set('query', 'svc_url', QM_URL) self.config.add_section('storage') self.config.set('storage', 'profile', 'default') self.config.set('storage', 'svc_url', SM_URL) self.config.add_section('res') self.config.set('storage', 'profile', 'default') self.config.set('storage', 'svc_url', RES_URL) self.config.add_section('vospace') self.config.set('vospace', 'mount', '') self._write() else: self.config.read('%s/dl.conf' % self.home) # Set script variables CAPS_DIR = os.getenv('VOSPACE_CAPSDIR', '../caps') try: # Older versions of dl.conf may not have these configs. authClient.set_svc_url(self.config.get('auth','svc_url')) queryClient.set_svc_url(self.config.get('query','svc_url')) storeClient.set_svc_url(self.config.get('storage','svc_url')) resClient.set_svc_url(self.config.get('res','svc_url')) except Exception: pass
[docs] def save(self, section, param, value): ''' Save the configuration file. ''' if not self.config.has_section(section): self.config.add_section(section) self.config.set(section, param, value) self._write()
[docs] def get(self, section, param): ''' Get a value from the configuration file. ''' return self.config.get(section, param)
def _write(self): ''' Write out the configuration file to disk. ''' with open('%s/dl.conf' % self.home, 'w') as configfile: self.config.write(configfile)
[docs]class Task: ''' Superclass to represent a Task ''' def __init__(self, datalab, name, description): self.dl = datalab self.name = name self.tname = name self.description = description self.logger = None self.params = []
[docs] def run(self): pass
[docs] def addOption(self, name, option): ''' Add an option to the Task. ''' self.params.append(name) setattr(self, name, option) # Set the default value if provided. if option.default is not None: self.setOption(name, option.default)
[docs] def addStdOptions(self): self.addOption(" ", Option( " ", "", " ", default=False)) self.addOption("verbose", Option( "verbose", "", "print verbose level log messages", default=False)) self.addOption("debug", Option("debug", "", "print debug log level messages", default=False)) self.addOption("warning", Option( "warning", "", "print warning level log messages", default=False))
[docs] def addLogger(self, logLevel, logFile): ''' Add a Logger to the Task. ''' logFormat = ("%(asctime)s %(thread)d vos-" + str(version) + "%(module)s.%(funcName)s.%(lineno)d %(message)s") logging.basicConfig(level=logLevel, format=logFormat, filename=os.path.abspath(logFile)) self.logger = logging.getLogger() self.logger.setLevel(logLevel) self.logger.addHandler(logging.StreamHandler())
[docs] def setLogger(self, logLevel=None): ''' Set the logger to be used. ''' if logLevel is None: logLevel = logging.ERROR if self.verbose: logLevel = logging.INFO if self.debug: logLevel = logging.DEBUG if self.warning: logLevel = logging.WARNING else: logLevel = logLevel self.addLogger(logLevel, "%s/.datalab/datalab.err" % os.path.expanduser('~'))
[docs] def setOption(self, name, value): ''' Set a Task option. ''' if hasattr(self, name): opt = getattr(self, name) opt.value = value else: print ("Task '%s' has no option '%s'" % (self.name, name))
[docs] def getSampConnect(self): ''' Get a SAMP listening client connection. ''' # Get a SAMP client client = SAMPIntegratedClient() client.connect() r = Receiver(client) return client
[docs] def broadcast(self, client, messageType, params, app=None): ''' Broadcast a SAMP message. ''' # Broadcast to SAMP clients message = {} message["samp.mtype"] = messageType message["samp.params"] = params try: if app is None: client.notify_all(message) else: for c in client.get_registered_clients(): metadata = client.get_metadata(c) if metadata["samp.name"] in app: client.notify(c, message) finally: client.disconnect()
[docs] def listen(self, client): # notification type, params, run? ''' Setup a listener for a specific SAMP message. ''' client.bind_receive_notification("coord.pointAt.sky", r.point_select) try: while True: sleep(0.1) if r.received: # Parse message ra = r.params['ra'] dec = r.params['dec'] self.run() r.received = False finally: client.disconnect()
[docs] def report(self, resp): ''' Handle call response ''' #if resp.status_code != 200: # print (resp.text) #elif self.verbose.value or self.debug.value: # print (resp.text) pass
################################################ # Print Current Service URLs Tasks ################################################
[docs]class Version(Task): ''' Print the task version. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'version', 'Print task version') self.addStdOptions()
[docs] def run(self): from dl import __version__ as dlver print ("Task Version: " + dlver.version)
[docs]class Services(Task): ''' Print the available data services. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'services', 'Print available data services') self.addOption("svc_type", Option("svc_type", "", "Service type (vos|scs|sia|ssa)", required=False, default=None)) self.addStdOptions()
[docs] def run(self): print (queryClient.services (svc_type=self.svc_type.value))
[docs]class SvcURLs(Task): ''' Print the current service URLS in use. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'svc_urls', 'Print service URLs in use') self.addStdOptions()
[docs] def run(self): print (' Auth Mgr: ' + authClient.get_svc_url()) print (' Query Mgr: ' + queryClient.get_svc_url()) print (' Storage Mgr: ' + storeClient.get_svc_url()) print (' Resource Mgr: ' + resClient.get_svc_url())
################################################ # Initialize Data Lab config information ################################################
[docs]class Init(Task): ''' Print the current service URLS in use. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'init', 'Initialize the Data Lab config') self.addOption("verify", Option("verify", False, "Verify actions?", required=False)) self.addStdOptions() self.home = '%s/.datalab' % os.path.expanduser('~') self.datalab = datalab
[docs] def run(self): ''' Initialize the Data Lab configuration. ''' # TODO -- Logout any existing users .... if self.verify.value: answer = raw_input("Logout existing users (Y/N)? (default: Y): ") if answer == '' or answer is None or answer.lower()[:1] == 'y': if self.verbose.value == True: print ('Removing %s ....' % self.home) logout_task = tasks['logout'](self.datalab) logout_task.run() # Check that $HOME/.datalab exists if self.verify.value: answer = raw_input("Delete $HOME/.datalab (Y/N)? (default: Y): ") if answer == '' or answer is None or answer.lower()[:1] == 'y': if self.verbose.value == True: print ('Removing %s ....' % self.home) shutil.rmtree(self.home) if not os.path.exists(self.home): os.makedirs(self.home) # See if datalab conf file exists self.config = ConfigParser.RawConfigParser(allow_no_value=True) if not os.path.exists('%s/dl.conf' % self.home): if self.verbose.value == True: print ('Initializing %s ....' % self.home) self.config.add_section('datalab') self.config.set('datalab', 'created', strftime( '%Y-%m-%d %H:%M:%S', gmtime())) self.config.add_section('login') self.config.set('login', 'status', 'loggedout') self.config.set('login', 'user', '') self.config.set('login', 'authtoken', '') self.config.add_section('auth') self.config.set('auth', 'profile', 'default') self.config.set('auth', 'svc_url', AM_URL) self.config.add_section('query') self.config.set('query', 'profile', 'default') self.config.set('query', 'svc_url', QM_URL) self.config.add_section('storage') self.config.set('storage', 'profile', 'default') self.config.set('storage', 'svc_url', SM_URL) self.config.add_section('vospace') self.config.set('vospace', 'mount', '') self._write() else: self.config.read('%s/dl.conf' % self.home)
################################################ # Account Login Tasks ################################################
[docs]class Login(Task): ''' Log into the Data Lab ''' def __init__(self, datalab): Task.__init__(self, datalab, 'login', 'Login to the Data Lab') self.addOption("user", Option("user", "", "Username of account in Data Lab", required=True)) self.addOption("password", Option("password", "", "Password for account in Data Lab", required=True)) self.addOption("mount", Option("mount", "", "Mountpoint of remote Virtual Storage")) self.addStdOptions() if self.dl is not None: self.status = self.dl.get("login", "status") self.login_error = None
[docs] def run(self): ''' Execute the Login Task. ''' # Check if we are already logged in. The 'user' field of the # configuration contains the currently active user and token, # however previous logins will have preserved tokens from other # accounts we may be able to use. if self.status == "loggedin": _user = self.dl.get("login", "user") if self.user.value == _user: # See whether current token is still valid for this user. _token = self.dl.get("login", "authtoken") if not authClient.isValidToken (_token): if self.do_login() != "OK": print (self.login_error) sys.exit (-1) else: print ("User '%s' is already logged in to the Data Lab" % \ self.user.value) else: # We're logging in as a different user. if self.do_login() != "OK": print (self.login_error) sys.exit (-1) else: # Log a user into the Data Lab print ("Welcome back to the Data Lab, %s" % self.user.value) else: # If we're not logged in, do so using the name/password provided. if self.do_login() != "OK": print (self.login_error) sys.exit (-1) else: # Log a user into the Data Lab print ("Welcome to the Data Lab, %s" % self.user.value) # Default parameters if the VOSpace mount is requested. if self.mount.value != "": print ("Initializing virtual storage mount") mount = Mountvofs(self.dl) mount.setOption('vospace', 'vos:') mount.setOption('mount', self.mount.value) mount.setOption('cache_dir', None) mount.setOption('cache_limit', 50 * 2 ** (10 + 10 + 10)) mount.setOption('cache_nodes', False) mount.setOption('max_flush_threads', 10) mount.setOption('secure_get', False) mount.setOption('allow_other', False) mount.setOption('foreground', False) mount.setOption('nothreads', True) mount.run()
[docs] def do_login(self): if self.login(): self.dl.save("login", "status", "loggedin") self.dl.save("login", "user", self.user.value) self.dl.save("login", "authtoken", self.token) self.dl.save(self.user.value, "authtoken", self.token) return "OK" else: return self.login_error
[docs] def login(self): ''' Login to the Data Lab. ''' try: self.token = self.dl.get(self.user.value,'authtoken') if authClient.isValidToken (self.token): # FIXME -- What we really want here is a login-by-token call # to the AuthMgr so the login is recorded on the server. self.login_error = None return True except Exception as e: pass # Get the security token for the user self.token = authClient.login (self.user.value,self.password.value) if not authClient.isValidToken (self.token): self.dl.save("login", "status", "loggedout") self.dl.save("login", "user", '') self.dl.save("login", "authtoken", '') self.dl.save(self.user.value, "authtoken", self.token) self.login_error = self.token print ('login error: tok = ' + self.token) return False else: self.login_error = None return True
[docs]class Logout(Task): ''' Logout out of the Data Lab ''' def __init__(self, datalab): Task.__init__(self, datalab, 'logout', 'Logout of the Data Lab') self.addOption("unmount", Option( "unmount", "", "Mount point of remote Virtual Storage")) self.addStdOptions() if self.dl is not None: self.status = self.dl.get("login", "status")
[docs] def run(self): if self.status == 'loggedout': print ("No user is currently logged into the Data Lab") return else: token = getUserToken(self) user, uid, gid, hash = token.strip().split('.', 3) res = authClient.logout (token) if res != "OK": print ("Error: %s" % res) #sys.exit (-1) if self.unmount.value != "": print ("Unmounting remote space") cmd = "umount %s" % self.unmount.value pipe = Popen(cmd, shell=True, stdout=PIPE) output = pipe.stdout.read() self.dl.save("vospace", "mount", "") self.dl.save("login", "status", "loggedout") self.dl.save("login", "user", "") self.dl.save("login", "authtoken", "") print ("'%s' is now logged out of the Data Lab" % user)
[docs]class Status(Task): ''' Status of the Data Lab connection ''' def __init__(self, datalab): Task.__init__(self, datalab, "status", "Report on the user status") self.addStdOptions()
[docs] def run(self): status = self.dl.get("login", "status") if status == "loggedout": print ("No user is currently logged into the Data Lab") else: print ("User %s is logged into the Data Lab" % \ self.dl.get("login", "user")) if self.dl.get("vospace", "mount") != "": if status != "loggedout": print ("The user's Virtual Storage is mounted at %s" % \ self.dl.get("vospace", "mount")) else: print ("The last user's Virtual Storage is still mounted at %s" % \ self.dl.get("vospace", "mount"))
[docs]class WhoAmI(Task): ''' Print the current active user. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'whoami', 'Print the current active user') self.addStdOptions()
[docs] def run(self): print (getUserName(self))
################################################ # Schema Discovery Tasks ################################################
[docs]class Schema(Task): ''' Print information about data servicce schema ''' def __init__(self, datalab): Task.__init__(self, datalab, 'schema', 'Print data service schema info') self.addOption("val", Option("val", "", "Value to list ([[<schema>][.<table>][.<col>]])", required=False, default=None)) self.addOption("profile", Option("profile", "", "Service profile", required=False, default="default")) self.addStdOptions() # NOT YET IMPLEMENTED #self.addOption("format", # Option("format", "", "Output format (csv|text|json)", # required=False, default=None))
[docs] def run(self): print (queryClient.schema (value=self.val.value, format='text', profile=self.profile.value))
################################################ # Storage Manager Capability Tasks ################################################
[docs]class AddCapability(Task): ''' Add a capability to a VOSpace container ''' def __init__(self, datalab): Task.__init__(self, datalab, 'addcapability', 'Activate a capability on a Virtual Storage container') self.addOption("fmt", Option("fmt", "", "List of formats to accept", required=True)) self.addOption("dir", Option("dir", "", "Container name", required=True)) self.addOption("cap", Option("cap", "", "Capability name", required=True)) self.addOption("listcap", Option("listcap", "", "List available capabilities", default=False)) self.addStdOptions() self.capsdir = CAPS_DIR
[docs] def run(self): if self.listcap.value: print ("The available capabilities are: ") for file in glob.glob(self.capsdir): print (file[:file.index("_cap.conf")]) else: mountpoint = self.dl.get('vospace', 'mount') if mountpoint is None: print ("No mounted Virtual Storage can be found") else: if not os.path.exists("%s/%s_cap.conf" % \ (self.capsdir, self.cap.value)): print ("The capability '%s' is not known" % \ self.cap.value) else: shutil.copy("%s/%s_cap.conf" % (self.capsdir, self.cap.value), "%s/%s" % (mountpoint, self.dir.value))
[docs]class ListCapability(Task): ''' Add a capability to a VOSpace container ''' def __init__(self, datalab): Task.__init__(self, datalab, 'listcapability', 'List the capabilities supported by this Virtual Storage') self.addStdOptions() self.capsdir = CAPS_DIR
[docs] def run(self): print ("The available capabilities are: ") for file in glob.glob("%s/*_cap.conf" % self.capsdir): print (" %s" % file[file.rindex("/") + 1:file.index("_cap.conf")])
################################################ # Query Manager Tasks ################################################
[docs]class Query2 (Task): ''' Send a query to a remote query service. [Note: placeholder name until we figure out what to do with the old Query() functionality.] ''' def __init__(self, datalab): Task.__init__(self, datalab, 'query', 'Query a remote data service in the Data Lab') self.addOption("adql", Option("adql", "", "ADQL statement", default='', required=False)) self.addOption("sql", Option("sql", "", "Input SQL string or filename", default='', required=False)) self.addOption("fmt", Option("fmt", "csv", "Output format (ascii|csv|tsv|fits|votable)", required=False)) self.addOption("out", Option("out", "", "Output filename or destination", required=False)) self.addOption("async", Option("async", "", "Asynchronous query?", default=False, required=False)) self.addOption("drop", Option("drop", "", "Overwrite existing table?", default=False, required=False)) self.addOption("profile", Option("profile", "", "Service profile to use", required=False, default="default")) self.addOption("timeout", Option("timeout", "", "Requested query timeout", required=False, default="120")) self.addStdOptions()
[docs] def run(self): token = getUserToken (self) # Get the query string to be used. This can either be supplied # directly or by specifying a filename. sql = None adql = None res = None if self.adql.value is None or self.adql.value == '': if self.sql.value is None or self.sql.value == '': print ("Error: At least one of 'adql' or 'sql' is required.") sys.exit (-1) elif os.path.exists (self.sql.value): with open (self.sql.value, "r") as fd: sql = fd.read (os.path.getsize(self.sql.value)+1) fd.close() else: sql = self.sql.value elif os.path.exists (self.adql.value): with open (self.adql.value, "r") as fd: adql = fd.read (os.path.getsize(self.adql.value)+1) fd.close() else: adql = self.adql.value # Execute the query. if self.profile.value != "default": if self.profile.value != "" and self.profile.value is not None: queryClient.set_profile (profile=self.profile.value) # Workarounds for the "async" option to the query using getattr(). try: res = queryClient.query (token, adql=adql, sql=sql, fmt=self.fmt.value, out=self.out.value, async_=getattr(self,"async").value, drop=self.drop.value, timeout=self.timeout.value) if getattr(self,"async").value: print (res) # Return the JobID elif self.out.value== '' or self.out.value is None: print (res) # Return the results except Exception as e: if not getattr(self,"async").value and str(e) is not None: err = str(e) if err.find("Time-out") > 0: print ("Error: Sync query timeout, try an async query") else: print (str(e)) else: print (str(e))
class QueryStatus(Task): ''' Get the async query job status. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'qstatus', 'Get an async query job status') self.addOption("jobId", Option("jobId", "", "Query Job ID", required=True)) self.addStdOptions() def run(self): token = getUserToken(self) print (queryClient.status (token, jobId=self.jobId.value))
[docs]class QueryResults(Task): ''' Get the async query results. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'qresults', 'Get the async query results') self.addOption("jobId", Option("jobId", "", "Query Job ID", required=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) print (queryClient.results (token, jobId=self.jobId.value))
[docs]class QueryStatus(Task): ''' Get the async query job status. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'qstatus', 'Get an async query job status') self.addOption("jobId", Option("jobId", "", "Query Job ID", required=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) print (queryClient.status (token, jobId=self.jobId.value))
[docs]class QueryProfiles(Task): ''' List the available Query Manager profiles. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'profiles', 'List the available Query Manager profiles') self.addOption("profile", Option("profile", "", "Profile to list", required=False, default=None)) self.addOption("format", Option("format", "", "Output format (csv|text)", required=False, default='text')) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) if self.debug.value: print (self.__dict__) if self.format.value == 'text': print ('\nQuery Manager Profiles:\n-----------------------') print (str(queryClient.list_profiles (token, profile=self.profile.value, format=self.format.value)))
[docs]class Query(Task): ''' Send a query to a remote query service (OLD VERSION - NOT USED)) ''' def __init__(self, datalab): Task.__init__(self, datalab, 'query', 'Query a remote data service in the Data Lab') self.addOption("adql", Option("adql", "", "ADQL statement", required=False)) self.addOption("sql", Option("sql", "", "Input SQL string or filename", required=False)) self.addOption("uri", Option("uri", "", "Remote dataset URI", required=False, default="dldb")) self.addOption("fmt", Option("fmt", "", "Output format (ascii|csv|tsv|fits|votable)", required=False)) self.addOption("out", Option("out", "", "Output filename or destination", required=False)) self.addOption("in", Option("in", "", "Input filename", required=False)) self.addOption("async", Option("async", "", "Asynchronous query?", required=False, default="false")) self.addOption("addArgs", Option("addArgs", "", "Additional arguments to pass to the query service", required=False)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) h = httplib2.Http() colon = self.uri.value.find(":") scheme = ('' if colon < 0 else self.uri.value[:colon]) if scheme == '': self.dbquery(h, QM_URL, token) elif scheme == 'mydb': self.dbquery(h, url, token) elif scheme == 'http': self.httpquery(h, self.uri.value, token) elif scheme == 'ivo': self.ivoquery(h, self.uri.value, token, self.out.value) else: print ("'uri' parameter does not begin with a recognized scheme")
[docs] def dbquery(self, h, url, token): # Query the Data Lab query service headers = {'Content-Type': 'text/ascii', 'X-DL-AuthToken': token} # application/x-sql if 'mydb' in self.adql.value: # Demo hack out = self.out.value.replace("vos://", "/tmp/vospace/") shutil.copyfile("demo/ltg.csv", out) elif self.sql.value != '': sql = open(self.sql.value).read() dburl = '%s/query?ofmt=%s&out=%s' % \ (url, self.fmt.value, self.out.value) resp, content = h.request(dburl, 'POST', body=sql, headers=headers) else: query = quote_plus(self.adql.value) dburl = '%s/query?adql=%s&ofmt=%s&out=%s' % ( url, query, self.fmt.value, self.out.value) resp, content = h.request(dburl, 'GET', headers=headers) output = self.out.value if output != '': if output[:output.index(':')] not in ['vos', 'mydb']: file = open(output, 'wb') file.write(content) file.close() else: print (content)
[docs] def httpquery(self, h, url, token): # Send a query to remote URL # Hack for CoDR demo count = 0 for line in open(self.input.value): if '#' not in line: parts = line.split(",") httpurl = url + \ "?POS=%s,%s&SIZE=0.0000555" % (parts[0], parts[2].strip()) print (url) resp, content = h.request(httpurl, 'GET') if "vos://" in self.out.value: out = self.out.value.replace("vos://", "/tmp/vospace/") file = open(out + "_%s" % count, 'wb') file.write(content) file.close() count += 1
[docs] def ivoquery(self, h, uri, token, out): # Send a query to remote IVOA service headers = {'X-DL-AuthToken': token} url = "%s/query?uri=%s&out=%s" % (QM_URL, uri, out) resp, content = h.request(url, 'GET', headers=headers) if out == '': print (content)
################################################ # MyDB Tasks ################################################
[docs]class ListMyDB(Task): ''' List the user's MyDB tables. [DEPRECATED] ''' def __init__(self, datalab): Task.__init__(self, datalab, 'listdb', 'List the user MyDB tables') self.addOption("table", Option("table", "", "Table name", required=False)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) try: res = queryClient.list (token=token, table=self.table.value) except Exception as e: print ("Error listing MyDB tables: " % str(e)) else: print (res)
[docs]class DropMyDB(Task): ''' Drop a user's MyDB table. [DEPRECATED] ''' def __init__(self, datalab): Task.__init__(self, datalab, 'dropdb', 'Drop a user MyDB table') self.addOption("table", Option("table", "", "Table name", required=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) try: queryClient.drop (token, table=self.table.value) except Exception as e: print ("Error dropping table '%s': %s" % (self.table.value, str(e)))
[docs]class MyDB_List(Task): ''' List the user's MyDB tables. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'mydb_list', 'List the user MyDB tables') self.addOption("table", Option("table", "", "Table name", required=False)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) try: res = queryClient.mydb_list (token, table=self.table.value) except Exception as e: print ("Error listing MyDB tables: %s" % str(e)) else: print (res)
[docs]class MyDB_Drop(Task): ''' Drop a user's MyDB table. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'mydb_drop', 'Drop a user MyDB table') self.addOption("table", Option("table", "", "Table name", required=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) try: res = queryClient.mydb_drop (token, self.table.value) except Exception as e: print ("Error dropping table '%s': %s" % (self.table.value,str(e))) else: print (res)
[docs]class MyDB_Create(Task): ''' Create a user MyDB table. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'mydb_create', 'Create a user MyDB table') self.addOption("table", Option("table", "", "Table name", required=True)) self.addOption("schema", Option("schema", "", "Schema file", required=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) try: res = queryClient.mydb_create (token, self.table.value, self.schema.value) except Exception as e: print ("Error creating table '%s': %s" % (self.table.value,str(e))) else: print (res)
[docs]class MyDB_Import(Task): ''' Import data into a user MyDB table. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'mydb_import', 'Import data into a user MyDB table') self.addOption("table", Option("table", "", "Table name to create", required=True)) self.addOption("data", Option("data", "", "Data file to load", required=True)) self.addOption("append", Option("append", False, "Append existing table?", required=False)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) try: res = queryClient.mydb_import (token, self.table.value, self.data.value, append=self.append.value) except Exception as e: print ("Error importing table '%s': %s" % \ (self.table.value, str(e))) else: print (res)
[docs]class MyDB_Insert(Task): ''' Insert data into a user MyDB table. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'mydb_insert', 'Insert data into a user MyDB table') self.addOption("table", Option("table", "", "Table name to create", required=True)) self.addOption("data", Option("data", "", "Data file to load", required=True)) self.addOption("csv_header", Option("csv_header", True, "Data file has csv header?", required=False)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) try: res = queryClient.mydb_list (token, table=self.table.value) if res.find('not known') > 0: res = "Error: MyDB table '%s' does not exist" % self.table.value else: # Table exists, so just insert the data. res = queryClient.mydb_insert (token, self.table.value, self.data.value, csv_header=self.csv_header.value) except Exception as e: print (str(e)) else: print (res)
[docs]class MyDB_Index(Task): ''' Index data in a user's MyDB table. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'mydb_index', 'Index data in a MyDB table') self.addOption("table", Option("table", "", "Table name to create", required=True)) self.addOption("column", Option("column", "", "Column to index", required=True)) self.addOption("q3c", Option("q3c", "", "Column to index using Q3C", required=False)) self.addOption("cluster", Option("cluster", "", "Cluster table on Q3C index?", required=False)) self.addOption("async_", Option("async_", "", "Run asynchronously?", required=False)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) try: res = queryClient.mydb_index (token, self.table.value, self.column.value, q3c=self.q3c.value, cluster=self.cluster.value, async_=self.async_.value) except Exception as e: print ("Error indexing table '%s': %s" % (self.table.value,str(e))) else: print (res)
[docs]class MyDB_Truncate(Task): ''' Truncate a user MyDB table. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'mydb_truncate', 'Truncate a user MyDB table') self.addOption("table", Option("table", "", "Table name to truncate", required=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) try: res = queryClient.mydb_truncate (token, self.table.value) except Exception as e: print ("Error truncating table '%s': %s" % \ (self.table.value,str(e))) else: print (res)
[docs]class MyDB_Rename(Task): ''' Rename a user MyDB table. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'mydb_rename', 'Rename a user MyDB table') self.addOption("old", Option("old", "", "Old table name", required=True)) self.addOption("new", Option("new", "", "New table name", required=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) try: res = queryClient.mydb_rename(token, self.old.value, self.new.value) except Exception as e: print ("Error renaming table '%s': %s" % (self.old.value,str(e))) else: print (res)
[docs]class MyDB_Copy(Task): ''' Copy a user MyDB table. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'mydb_rename', 'Rename a user MyDB table') self.addOption("source", Option("source", "", "Original table name", required=True)) self.addOption("target", Option("target", "", "New table name", required=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) try: res = queryClient.mydb_copy (token, self.source.value, self.target.value) except Exception as e: print ("Error copying table '%s': %s" % (self.old.value,str(e))) else: print (res)
################################################ # Task Execution Tasks ################################################
[docs]class LaunchJob(Task): ''' Execute a remote processing job in the Data Lab ''' def __init__(self, datalab): Task.__init__(self, datalab, 'exec', 'launch a remote task in the Data Lab') self.addOption("cmd", Option("cmd", "", "name of remote task to run", required=True)) self.addOption("args", Option("args", "", "list of key-value arguments to submit to remote task", required=False)) self.addStdOptions()
[docs] def run(self): job = self.cmd.value if job in self.validJob(job): params = {} for param in self.params.value.split(','): key, value = param.split('=') params[key] = value job = self.getJob(job) job.run(params) else: print ("The remote task '%s' is not supported" % self.cmd.value)
[docs] def validJob(self, job): return False
[docs] def getJob(self, job): return None
################################################ # FUSE Mounting Tasks ################################################
[docs]class Mountvofs(Task): ''' Mount a VOSpace via FUSE ''' def __init__(self, datalab): Task.__init__(self, datalab, 'mount', 'mount the default Virtual Storage') self.addOption("vospace", Option("vospace", "", "Space to mount", required=True, default="vos:")) self.addOption("mount", Option("mount", "", "Mount point for Virtual Storage", required=True, default="/tmp/vospace")) self.addOption("foreground", Option("foreground", "", "Mount the filesystem as a foreground operation", default=True)) self.addOption("cache_limit", Option("cache_limit", "", "Limit on local diskspace to use for file caching (in MB)", default=0 * 2 ** (10 + 10 + 10))) #default=50 * 2 ** (10 + 10 + 10))) self.addOption("cache_dir", Option("cache_dir", "", "Local directory to use for file caching", default=None)) self.addOption("readonly", Option("readonly", "", "Mount as read-only", default=False)) self.addOption("cache_nodes", Option("cache_nodes", "", "Cache dataNode Properties", default=True)) self.addOption("allow_other", Option("allow_other", "", "Allow all users access to this mountpoint", default=True)) self.addOption("max_flush_threads", Option("max_flush_threads", "", "Limit on number of flush (upload) threads", default=10)) self.addOption("secure_get", Option("secure_get", "", "use HTTPS instead of HTTP",default=False)) self.addOption("nothreads", Option("nothreads", "", "Only run in a single thread, causes some blocking.", default=True)) #default=None)) self.addStdOptions()
[docs] def run(self): # readonly = False token = getUserToken(self) user = getUserName(self) root = self.vospace.value + "/" + user mount = os.path.abspath(self.mount.value) self.dl.save('vospace', 'mount', mount) if self.cache_dir.value is None: self.cache_dir.value = os.path.normpath(os.path.join( os.getenv('HOME', '.'), root.replace(":", "_"))) if not os.access(mount, os.F_OK): os.makedirs(mount) opt = parseSelf(self) conn = vos.Connection(vospace_token=token) if platform == "darwin": print ("mounting darwin fuse....") fuse = FUSE(VOFS(root, self.cache_dir.value, opt, conn=conn, cache_limit=self.cache_limit.value, cache_nodes=self.cache_nodes.value, cache_max_flush_threads=self.max_flush_threads.value, secure_get=self.secure_get.value), mount, fsname=root, volname=root, nothreads=opt.nothreads, defer_permissions=True, daemon_timeout=DAEMON_TIMEOUT, readonly=self.readonly.value, allow_other=self.allow_other.value, noapplexattr=True, noappledouble=True, foreground=self.foreground.value) else: try: print ("mounting linux fuse....") fuse = FUSE(VOFS(root, self.cache_dir.value, opt, conn=conn, cache_limit=self.cache_limit.value, cache_nodes=self.cache_nodes.value, cache_max_flush_threads=self.max_flush_threads.value, secure_get=self.secure_get.value), mount, fsname=root, nothreads=opt.nothreads, readonly=self.readonly.value, allow_other=self.allow_other.value, foreground=self.foreground.value) print ("done mounting linux fuse....") except Exception as e: print ("FUSE MOUNT EXCEPTION: " + str(e)) print ("fuse = " + str(fuse)) if not fuse: self.dl.save('vospace', 'mount', '')
################################################ # Storage Manager Tasks ################################################
[docs]class List(Task): ''' List files in Data Lab ''' def __init__(self, datalab): Task.__init__(self, datalab, 'ls', 'list a location in Data Lab') self.addOption("name", Option("name", "", "Location in Data Lab to list", required=False, default="vos://", display="name")) self.addOption("format", Option("format", "", "Format for listing (ascii|csv|raw)", required=False, default="csv")) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) return storeClient.ls (token, name=self.name.value, format=self.format.value)
[docs]class Get(Task): ''' Get one or more files from Data Lab. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'get', 'get a file from Data Lab') self.addOption("fr", Option("fr", "", "Remote Data Lab file name", required=True, default="vos://")) self.addOption("to", Option("to", "", "Local disk file name", required=False, default="./")) self.addOption("verbose", Option("verbose", "", "Verbose output?", required=False, default=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) storeClient.get (token, fr=self.fr.value, to=self.to.value, verbose=self.verbose.value)
[docs]class Put(Task): ''' Put files into Data Lab. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'put', 'Put a file into Data Lab') self.addOption("fr", Option("fr", "", "Local disk file name", required=True, default="")) self.addOption("to", Option("to", "", "Remote Data Lab file name", required=True, default="")) self.addOption("verbose", Option("verbose", "", "Verbose output?", required=False, default=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) storeClient.put (token, fr=self.fr.value, to=self.to.value, verbose=self.verbose.value)
[docs]class Move(Task): ''' Move files in Data Lab ''' def __init__(self, datalab): Task.__init__(self, datalab, 'move', 'move a file in Data Lab') self.addOption("fr", Option("fr", "", "Source location in Data Lab", required=True, default="", display="from")) self.addOption("to", Option("to", "", "Destination location in Data Lab", required=True, default="")) self.addOption("verbose", Option("verbose", "", "Verbose output?", required=False, default=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) storeClient.mv (token, fr=self.fr.value, to=self.to.value, verbose=self.verbose.value)
[docs]class Copy(Task): ''' Copy a file in Data Lab ''' def __init__(self, datalab): Task.__init__(self, datalab, 'copy', 'copy a file in Data Lab') self.addOption("fr", Option("fr", "", "Source location in Data Lab", required=True, default="", display="from")) self.addOption("to", Option("to", "", "Destination location in Data Lab", required=True, default="")) self.addOption("verbose", Option("verbose", "", "Verbose output?", required=False, default=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) storeClient.cp (token, fr=self.fr.value, to=self.to.value, verbose=self.verbose.value)
[docs]class Delete(Task): ''' Delete files in Data Lab ''' def __init__(self, datalab): Task.__init__(self, datalab, 'delete', 'delete a file in Data Lab') self.addOption("name", Option("name", "", "File in Data Lab to delete", required=True, default="")) self.addOption("verbose", Option("verbose", "", "Verbose output?", required=False, default=True)) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) storeClient.rm (token, name=self.name.value, verbose=self.verbose.value)
[docs]class Tag(Task): ''' Tag a file in Data Lab ''' def __init__(self, datalab): Task.__init__(self, datalab, 'tag', 'tag a file in Data Lab') self.addOption("name", Option("name", "", "File in Data Lab to tag", required=True, default="vos://")) self.addOption("tag", Option("tag", "", "Tag to add to file", required=True, default="")) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) storeClient.tag (token, name=self.name.value, tag=self.tag.value)
[docs]class MkDir(Task): ''' Create a directory in Data Lab ''' def __init__(self, datalab): Task.__init__(self, datalab, 'mkdir', 'create a directory in Data Lab') self.addOption("name", Option("name", "", "Directory in Data Lab to create", required=True, default="")) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) storeClient.mkdir (token, name=self.name.value)
[docs]class RmDir(Task): ''' Delete a directory in Data Lab ''' def __init__(self, datalab): Task.__init__(self, datalab, 'rmdir', 'delete a directory in Data Lab') self.addOption("name", Option("name", "", "Directory in Data Lab to delete", required=True, default="")) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) storeClient.rmdir (token, name=self.name.value)
[docs]class Resolve(Task): ''' Resolve a vos short form identifier -- FIXME ''' def __init__(self, datalab): Task.__init__(self, datalab, 'resolve', 'Resolve a short form Virtual Storage identifier') self.addOption("name", Option("name", "", "Short form identifier", required=True, default="vos://")) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) r = requests.get(SM_URL + "resolve?name=%s" % self.name.value, headers={'X-DL-AuthToken': token})
[docs]class StorageProfiles(Task): ''' List the available Storage Manager profiles. ''' def __init__(self, datalab): Task.__init__(self, datalab, 'list_storage_profiles', 'List the available Storage Manager profiles') self.addOption("profile", Option("profile", "", "Profile to list", required=False, default=None)) self.addOption("format", Option("format", "", "Output format (csv|text)", required=False, default='text')) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) print (str(storeClient.list_profiles (token, profile=self.profile.value, format=self.format.value)))
################################################ # SAMP Tasks ################################################
[docs]class Broadcast(Task): ''' Broadcast a SAMP message ''' def __init__(self, datalab): Task.__init__(self, datalab, 'broadcast', 'broadcast a SAMP message') self.addOption("type", Option("mtype", "", "SAMP message type", required=True)) self.addOption("pars", Option("pars", "", "Message parameters", required=True)) self.addStdOptions()
[docs] def run(self): client = self.getSampClient() mtype = self.mtype.value params = {} for param in self.pars.split(","): key, value = param.split("=") params[key] = value self.broadcast(client, mtype, params)
[docs]class Launch(Task): ''' Launch a plugin in Data Lab ''' def __init__(self, datalab): Task.__init__(self, datalab, 'launch', 'launch a plugin') self.addOption("dir", Option("dir", "", "directory in Data Lab to delete", required=True, default="vos://")) self.addStdOptions()
[docs] def run(self): token = getUserToken(self) r = requests.get(SM_URL + "/rmdir?dir=%s" % self.dir.value, headers={'X-DL-AuthToken': token})
[docs]class Receiver(): ''' SAMP listener ''' def __init__(self, client): self.client = client self.received = False
[docs] def receive_notifications(self, private_key, sender_id, mtype, params, extra): self.params = params self.received = True print ('Notification:', private_key, sender_id, mtype, params, extra)
[docs] def receiver_call(self, private_key, sender_id, msg_id, mtype, params, extra): self.params = params self.received = True print ('Call:', private_key, sender_id, msg_id, mtype, params, extra) self.client.reply( msg_id, {'samp.status': 'samp.ok', 'samp.result': {}})
[docs] def point_select(self, private_key, send_id, mtype, params, extra): self.params = params self.received = True
################################################ # SIA Tasks ################################################
[docs]class SiaQuery(Task): ''' SIA query with an uploaded file ''' def __init__(self, datalab): Task.__init__(self, datalab, 'siaquery', 'query a SIA service in the Data Lab') self.addOption("out", Option("out", "", "Output filename", required=False)) self.addOption("input", Option("input", "", "Input filename", required=False)) self.addOption("search", Option("search", "", "Search radius", required=False, default=0.5)) self.addStdOptions()
[docs] def getUID(self): ''' Get the UID for this user ''' token = getUserToken(self) parts = token.strip().split(".") uid = parts[1] return uid
[docs] def run(self): token = getUserToken(self) # If local input file, upload it input = self.input.value shortname = '%s_%s' % (self.getUID(), input[input.rfind('/') + 1:]) if input[:input.find(':')] not in ['vos', 'mydb']: # target = 'vos://nvo.caltech!vospace/siawork/%s' % shortname # Need to set this from config target = 'vos://datalab.noao.edu!vospace/siawork/%s' % shortname r = requests.get(SM_URL + "/put?name=%s" % target, headers={'X-DL-AuthToken': token}) file = open(input).read() resp = requests.put(r.content, data=file, headers={ 'Content-type': 'application/octet-stream', 'X-DL-AuthToken': token}) # Query the Data Lab query service headers = {'Content-Type': 'text/ascii', 'X-DL-AuthToken': token} dburl = '%s/sia?in=%s&radius=%s&out=%s' % ( QM_URL, shortname, self.search.value, self.out.value) r = requests.get(dburl, headers=headers) # Output value output = self.out.value if output != '': if output[:output.index(':')] not in ['vos']: file = open(output, 'wb') file.write(r.content) file.close() else: print (r.content)