#!/usr/bin/env python
#
# STORECLIENT -- Client routines for the Data Lab Storage Manager service
from __future__ import print_function
__authors__ = 'Mike Fitzpatrick <mike.fitzpatrick@noirlab.edu>, Matthew Graham <mjg@caltech.edu>, Data Lab <datalab@noirlab.edu>'
try:
from storagemanager.__version__ import __version__
except ImportError as e:
from dl.__version__ import __version__
'''
Client routines for the DataLab Storage Manager service.
Storage Manager Client Interface
--------------------------------
isAlive (svc_url=DEF_SERVICE_URL)
set_svc_url (svc_url=DEF_SERVICE_URL)
get_svc_url ()
set_profile (profile=DEF_SERVICE_PROFILE)
get_profile ()
services (name=None, svc_type='vos', format=None,
profile='default')
list_profiles (optval, profile=None, format='text', token=None)
list_profiles (token=None, profile=None, format='text')
access (token, path, mode, verbose=True)
access (path, mode, token=None, verbose=True)
access (path, mode=None, token=None, verbose=True)
stat (token, path, verbose=True)
stat (path, token=None, verbose=True)
get (token, fr, to, mode='text', verbose=True, debug=False)
get (opt1, opt2, fr='', to='', token=None, mode='text',
verbose=True, debug=False)
get (optval, fr='', to='', token=None, mode='text',
verbose=True, debug=False)
get (token=None, fr='', to='', mode='text', verbose=True,
debug=False)
put (token, fr, to, verbose=True, debug=False)
put (fr, to, token=None, verbose=True, debug=False)
put (optval, fr='', to='vos://', token=None, verbose=True,
debug=False)
put (fr='', to='vos://', token=None, verbose=True,
debug=False)
cp (token, fr, to, verbose=False)
cp (fr, to, token=None, verbose=False)
cp (token, fr='', to='', verbose=False)
cp (token=None, fr='', to='', verbose=False)
ln (token, fr, target, verbose=False)
ln (fr, target, token=None, verbose=False)
ln (token, fr='', target='', verbose=False)
ls (token, name, format='csv', verbose=False)
ls (optval, name='vos://', token=None, format='csv',
verbose=False)
ls (name='vos://', token=None, format='csv', verbose=False)
mkdir (token, name)
mkdir (optval, name='', token=None)
mv (token, fr, to, verbose=False)
mv (fr, to, token=None, verbose=False)
mv (token, fr='', to='', verbose=False)
mv (token=None, fr='', to='', verbose=False)
rm (token, name, verbose=False)
rm (optval, name='', token=None, verbose=False)
rm (name='', token=None, verbose=False)
rmdir (token, name, verbose=False)
rmdir (optval, name='', token=None, verbose=False)
rmdir (name='', token=None, verbose=False)
saveAs (token, data, name)
saveAs (data, name, token=None)
tag (token, name, tag)
tag (name, tag, token=None)
tag (token, name='', tag='')
load (token, name, endpoint, is_vospace=False)
load (name, endpoint, token=None, is_vospace=False)
pull (token, name, endpoint, is_vospace=False)
pull (name, endpoint, token=None, is_vospace=False)
Import via
.. code-block:: python
from dl import storeClient
'''
import os
import sys
import fnmatch
import requests
import glob
import socket
import json
import time
import re
if os.path.isfile('./Util.py'): # use local dev copy
from Util import multimethod
from Util import def_token, split_auth_token, is_auth_token
else: # use distribution copy
from dl.Util import multimethod
from dl.Util import def_token, split_auth_token, is_auth_token
# Turn off some annoying astropy warnings
#import warnings
#from astropy.utils.exceptions import AstropyWarning
#warnings.simplefilter('ignore', AstropyWarning)
is_py3 = sys.version_info.major == 3


#####################################
#  Storage Manager Configuration
#####################################

# The URL of the Storage Manager service to contact.  This may be changed by
# passing a new URL into the set_svc_url() method before beginning.
DEF_SERVICE_ROOT = 'https://datalab.noirlab.edu'

# Allow the service URL for dev/test systems to override the default.
THIS_HOST = socket.gethostname()                # host name

# Determine the local IP address by "connecting" a datagram socket to a
# public address (8.8.8.8) and reading back the local end of the socket.
# The socket is always closed once the address has been read, and a host
# without a usable network route falls back to the loopback address
# instead of making the import fail.
sock = socket.socket(type=socket.SOCK_DGRAM)
try:
    sock.connect(('8.8.8.8', 1))
    THIS_IP, _ = sock.getsockname()             # host IP address
except socket.error:
    THIS_IP = '127.0.0.1'
finally:
    sock.close()

if THIS_HOST[:5] == 'dldev':
    DEF_SERVICE_ROOT = 'https://dldev.datalab.noirlab.edu'
elif THIS_HOST[:6] == 'dltest':
    DEF_SERVICE_ROOT = 'https://dltest.datalab.noirlab.edu'

DEF_SERVICE_URL = DEF_SERVICE_ROOT + '/storage'
QM_SERVICE_URL = DEF_SERVICE_ROOT + '/query'

# The requested query 'profile'.  A profile refers to the specific
# machines and services used by the Storage Manager on the server.
DEF_SERVICE_PROFILE = 'default'

# Use a /tmp/SM_DEBUG file as a way to turn on debugging in the client code.
DEBUG = os.path.isfile('/tmp/SM_DEBUG')

# Check for a file to override the default service URL.
if os.path.exists('/tmp/SM_SVC_URL'):
    with open('/tmp/SM_SVC_URL') as fd:
        DEF_SERVICE_URL = fd.read().strip()
if os.path.exists('/tmp/QM_SVC_URL'):
    with open('/tmp/QM_SVC_URL') as fd:
        QM_SERVICE_URL = fd.read().strip()

URI_RESERVED = ":;?/@&=+$,"                     # RFC2396 reserved URI chars

# DLC-1818.  Some of the file names we serve contain '+' signs, which
# must be escaped before they can be used in a URL.
# For now only the '+' sign is escaped, to minimize the risk of
# introducing unwanted behaviours.
# A deeper look at which characters are allowed system-wide should be done
# at some point.
PLUS_REGEX = r'\+'
PLUS_URL_ESC_CODE = "%2B"
# ####################################################################
# Store Client error class
# ####################################################################
[docs]
class storeClientError(Exception):
    '''Error class for the Store Client interface.

    Parameters
    ----------
    message : str
        Description of the error.  It is kept on the ``message`` attribute
        and is the value produced by ``str()`` on the exception.
    '''

    def __init__(self, message):
        # Hand the message to Exception as well so that ``args``, repr()
        # and pickling carry it instead of seeing an empty exception.
        super(storeClientError, self).__init__(message)
        self.message = message

    def __str__(self):
        # str() guards against a non-string message, for which returning
        # the raw value from __str__ would raise a TypeError.
        return str(self.message)
# ####################################################################
# Module Functions
# ####################################################################
# --------------------------------------------------------------------
# ISALIVE -- Ping the Storage Manager service to see if it responds.
#
[docs]
def isAlive(svc_url=DEF_SERVICE_URL):
    '''Ping the Storage Manager service to see if it responds.

    Parameters
    ----------
    svc_url : str
        Service URL to check.  Defaults to ``DEF_SERVICE_URL``.

    Returns
    -------
    The result of the module client's ``isAlive()`` method.
    '''
    alive = sc_client.isAlive(svc_url=svc_url)
    return alive
# --------------------------------------------------------------------
# SET_SVC_URL -- Set the service url to use.
#
[docs]
def set_svc_url(svc_url=DEF_SERVICE_URL):
    '''Set the service URL used by the module client.

    Parameters
    ----------
    svc_url : str
        Service URL to use.  Defaults to ``DEF_SERVICE_URL``.

    Returns
    -------
    The result of the module client's ``set_svc_url()`` method.
    '''
    status = sc_client.set_svc_url(svc_url=svc_url)
    return status
# --------------------------------------------------------------------
# GET_SVC_URL -- Get the service url being used.
#
[docs]
def get_svc_url():
    '''Get the service URL being used by the module client.

    Returns
    -------
    The result of the module client's ``get_svc_url()`` method.
    '''
    current_url = sc_client.get_svc_url()
    return current_url
# --------------------------------------------------------------------
# SET_PROFILE -- Set the profile to be used
#
[docs]
def set_profile(profile=DEF_SERVICE_PROFILE):
    '''Set the profile to be used by the module client.

    Parameters
    ----------
    profile : str
        Name of the profile.  Defaults to ``DEF_SERVICE_PROFILE``.

    Returns
    -------
    The result of the module client's ``set_profile()`` method.
    '''
    status = sc_client.set_profile(profile=profile)
    return status
# --------------------------------------------------------------------
# GET_PROFILE -- Get the profile currently being used.
#
[docs]
def get_profile():
    '''Get the profile currently being used by the module client.

    Returns
    -------
    The result of the module client's ``get_profile()`` method.
    '''
    current_profile = sc_client.get_profile()
    return current_profile
# --------------------------------------------------------------------
# SERVICES -- List public storage services
#
[docs]
def services(name=None, svc_type='vos', format=None, profile='default'):
    '''List public storage services.

    All arguments are forwarded unchanged, by keyword, to the module
    client's ``services()`` method.

    Parameters
    ----------
    name : str [Optional]
        Service name.
    svc_type : str
        Service type.  Defaults to ``'vos'``.
    format : str [Optional]
        Output format.
    profile : str
        Service profile.  Defaults to ``'default'``.

    Returns
    -------
    The result of the module client's ``services()`` method.
    '''
    listing = sc_client.services(
        name=name,
        svc_type=svc_type,
        format=format,
        profile=profile,
    )
    return listing
# --------------------------------------------------------------------
# LIST_PROFILES -- List the profiles supported by the Storage Manager service
#
@multimethod('sc', 1, False)
def list_profiles(optval, profile=None, format='text', token=None):
    # One-positional-argument form: list_profiles(token) or
    # list_profiles(profile).
    if optval is not None and is_auth_token(optval):
        # optval looks like a token; the profile comes from the keyword.
        auth, wanted = optval, profile
    else:
        # optval looks like a profile name; the token comes from the keyword.
        auth, wanted = token, optval
    return sc_client._list_profiles(token=def_token(auth), profile=wanted,
                                    format=format)
[docs]
@multimethod('sc', 0, False)
def list_profiles(token=None, profile=None, format='text'):
    '''List the profiles supported by the Storage Manager service.

    Usage::

        list_profiles(token=None, profile=None, format='text')

    MultiMethod Usage::

        storeClient.list_profiles(token)      # list default profile
        storeClient.list_profiles(profile)    # list named profile
        storeClient.list_profiles()           # list default profile

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    profile : str
        A specific profile to list.
    format : str
        Output format forwarded to the service.  Defaults to ``'text'``.

    Returns
    -------
    profiles : list/dict
        A list of the names of the supported profiles, or a dictionary
        describing the specific profile.

    Example
    -------
    .. code-block:: python

        # get the list of available profiles
        profiles = storeClient.list_profiles()
    '''
    auth = def_token(token)
    return sc_client._list_profiles(token=auth, profile=profile,
                                    format=format)
# -----------------------------
# Utility Functions
# -----------------------------
# --------------------------------------------------------------------
# ACCESS -- Determine whether the file can be accessed with the given mode.
# Modes are 'r' (read access), 'w' (write access), or '' or None
# for an existence test.
#
@multimethod('sc', 3, False)
def access(token, path, mode, verbose=True):
    # Three-positional-argument form: access(token, path, mode).
    auth = def_token(token)
    return sc_client._access(path=path, mode=mode, token=auth,
                             verbose=verbose)
@multimethod('sc', 2, False)
def access(path, mode, token=None, verbose=True):
    # Two-positional-argument form: access(path, mode).
    auth = def_token(token)
    return sc_client._access(path=path, mode=mode, token=auth,
                             verbose=verbose)
[docs]
@multimethod('sc', 1, False)
def access(path, mode=None, token=None, verbose=True):
    '''Determine whether the file can be accessed with the given mode.

    Usage::

        access(path, mode=None, token=None, verbose=True)

    MultiMethod Usage::

        storeClient.access(token, path, mode)
        storeClient.access(path, mode)
        storeClient.access(path)

    Parameters
    ----------
    path : str
        A name or file template of the file status to retrieve.
    mode : str
        Requested access mode.  Modes are 'r' (read access), 'w' (write
        access), or 'rw' to test for both read/write access.  If mode
        is None a simple existence check is made.
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    verbose : bool
        Verbose output flag.

    Returns
    -------
    result : bool
        True if the node can be accessed with the requested mode.

    Example
    -------
    .. code-block:: python

        if storeClient.access('/mydata.csv'):
            print('File exists')
        elif storeClient.access('/mydata.csv','rw'):
            print('File is both readable and writable')
    '''
    auth = def_token(token)
    return sc_client._access(path=path, mode=mode, token=auth,
                             verbose=verbose)
# --------------------------------------------------------------------
# STAT -- Get file status. Values are returned as a dictionary of the
# requested node.
#
@multimethod('sc', 2, False)
def stat(token, path, verbose=True):
    # Two-positional-argument form: stat(token, path).
    auth = def_token(token)
    return sc_client._stat(path=path, token=auth, verbose=verbose)
[docs]
@multimethod('sc', 1, False)
def stat(path, token=None, verbose=True):
    '''Get file status information, similar to stat().

    Usage::

        stat(path, token=None, verbose=True)

    MultiMethod Usage::

        storeClient.stat(token, path)
        storeClient.stat(path)

    Parameters
    ----------
    path : str
        A name or file template of the file status to retrieve.
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    verbose : bool
        Verbose output flag.

    Returns
    -------
    stat : dictionary
        A dictionary of node status values.  Returned fields include:

        * ``name``        Name of node.
        * ``groupread``   List of group/owner names with read access.
        * ``groupwrite``  List of group/owner names with write access.
        * ``publicread``  Publicly readable (0=False, 1=True).
        * ``owner``       Owner name.
        * ``perms``       Formatted unix-like permission string.
        * ``target``      Node target if LinkNode.
        * ``size``        Size of file node (bytes).
        * ``type``        Node type (container|data|link).

    Example
    -------
    .. code-block:: python

        # get status information for a specific node
        stat = storeClient.stat('vos://mydata.csv')
        if stat['type'] == 'container':
            print('This is a directory')
        else:
            print('File size is: ' + str(stat['size']))
    '''
    auth = def_token(token)
    return sc_client._stat(path=path, token=auth, verbose=verbose)
# --------------------------------------------------------------------
# GET -- Retrieve a file (or files) from the Storage Manager service
#
@multimethod('sc', 3, False)
def get(token, fr, to, mode='text', verbose=True, debug=False, timeout=30):
    # Three-positional-argument form: get(token, fr, to).
    auth = def_token(token)
    return sc_client._get(fr=fr, to=to, token=auth, mode=mode,
                          verbose=verbose, debug=debug, timeout=timeout)
@multimethod('sc', 2, False)
def get(opt1, opt2, fr='', to='', token=None, mode='text', verbose=True,
        debug=False, timeout=30):
    # Two-positional-argument form: get(token, fr) or get(fr, to).
    if opt1 is not None and is_auth_token(opt1):
        # opt1 looks like a token, so opt2 is the 'fr' value and 'to'
        # comes from the keyword argument.
        source, dest, auth = opt2, to, opt1
    else:
        # opt1 is the 'fr' value, opt2 is the 'to' value.
        source, dest, auth = opt1, opt2, token
    return sc_client._get(fr=source, to=dest, token=def_token(auth),
                          mode=mode, verbose=verbose, debug=debug,
                          timeout=timeout)
@multimethod('sc', 1, False)
def get(optval, fr='', to='', token=None, mode='text', verbose=True,
        debug=False, timeout=30):
    # One-positional-argument form: get(token) or get(fr).
    if optval is not None and is_auth_token(optval):
        # optval looks like a token; 'fr' comes from the keyword argument.
        source, auth = fr, optval
    else:
        # optval is the 'fr' value; the token comes from the keyword.
        source, auth = optval, token
    return sc_client._get(fr=source, to=to, token=def_token(auth),
                          mode=mode, verbose=verbose, debug=debug,
                          timeout=timeout)
[docs]
@multimethod('sc', 0, False)
def get(token=None, fr='', to='', mode='text', verbose=True, debug=False,
        timeout=30):
    '''Retrieve a file from the Storage Manager service.

    Usage::

        get(token=None, fr='', to='', mode='text', verbose=True, debug=False,
            timeout=30)

    MultiMethod Usage::

        storeClient.get(token, fr, to)
        storeClient.get(token, fr)
        storeClient.get(fr, to)
        storeClient.get(fr)

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    fr : str
        A name or file template of the file(s) to retrieve.
    to : str
        Name of the file(s) to locally save to.  If not specified, the
        contents of the file are returned to the caller.
    mode : [binary | text | fileobj]
        Return data type if not saving to file.  If set to 'text' the
        file contents are converted to string -- this is appropriate when
        dealing with unicode but may fail with general binary data.  If
        set to 'binary' the raw content of the HTTP response is returned --
        for Python 2 this will be a 'string', for Python 3 it will be a
        'bytes' data type (the caller is responsible for conversion).
    verbose : bool
        Print verbose output, e.g. progress indicators.
    debug : bool
        Print debug output.
    timeout : integer
        Retry timeout value.  When processing long lists, download will
        pause every ``timeout`` files to lessen server load.  For
        individual files, transfer will retry for ``timeout`` seconds
        before aborting.  Failed transfers are automatically appended to
        the file list so they may be transferred again later.

    Returns
    -------
    result : str
        A list of the names of the files retrieved, or the contents of
        a single file.

    Example
    -------
    .. code-block:: python

        # get a single file to a local file of a different name
        data = storeClient.get('vos://mydata.csv', 'data.csv')

        # get the contents of a single file to a local variable
        data = storeClient.get('vos://mydata.csv')

        # get a list of remote files to a local directory
        flist = storeClient.get('vos://*.fits', './data/')
        flist = storeClient.get('*.fits', './data/')
    '''
    auth = def_token(token)
    return sc_client._get(fr=fr, to=to, token=auth, mode=mode,
                          verbose=verbose, debug=debug, timeout=timeout)
# --------------------------------------------------------------------
# PUT -- Upload a file (or files) to the Storage Manager service
#
@multimethod('sc', 3, False)
def put(token, fr, to, verbose=True, debug=False):
    # Three-positional-argument form: put(token, fr, to).
    auth = def_token(token)
    return sc_client._put(fr=fr, to=to, token=auth, verbose=verbose,
                          debug=debug)
@multimethod('sc', 2, False)
def put(fr, to, token=None, verbose=True, debug=False):
    # Two-positional-argument form: put(fr, to).
    auth = def_token(token)
    return sc_client._put(fr=fr, to=to, token=auth, verbose=verbose,
                          debug=debug)
@multimethod('sc', 1, False)
def put(optval, fr='', to='vos://', token=None, verbose=True, debug=False):
    # One-positional-argument form: put(token) or put(fr).
    if optval is not None and is_auth_token(optval):
        # optval looks like a token; 'fr' comes from the keyword argument.
        source, auth = fr, optval
    else:
        # optval looks like the source name; the token comes from the keyword.
        source, auth = optval, token
    return sc_client._put(fr=source, to=to, token=def_token(auth),
                          verbose=verbose, debug=debug)
[docs]
@multimethod('sc', 0, False)
def put(fr='', to='vos://', token=None, verbose=True, debug=False):
    '''Upload a file to the Storage Manager service.

    Usage::

        put(fr='', to='vos://', token=None, verbose=True, debug=False)

    MultiMethod Usage::

        storeClient.put(token, fr, to)
        storeClient.put(fr, to)
        storeClient.put(fr)
        storeClient.put(fr='',to='')

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    fr : str
        Name or template of local file to upload.
    to : str
        Name of the file for destination directory on remote VOSpace.
    verbose : bool
        Verbose output flag, forwarded to the client.
    debug : bool
        Debug output flag, forwarded to the client.

    Returns
    -------
    result : str
        An ``'OK'`` message or error string for each uploaded file.

    Example
    -------
    .. code-block:: python

        # Put the contents of local file 'data.csv' into the VOSpace file
        # named 'data_uploaded.csv'.
        storeClient.put('data.csv', 'vos://data_uploaded.csv')
    '''
    auth = def_token(token)
    return sc_client._put(fr=fr, to=to, token=auth, verbose=verbose,
                          debug=debug)
# --------------------------------------------------------------------
# CP -- Copy a file/directory within the Storage Manager service
#
@multimethod('sc', 3, False)
def cp(token, fr, to, verbose=False):
    # Three-positional-argument form: cp(token, fr, to).
    auth = def_token(token)
    return sc_client._cp(fr=fr, to=to, token=auth, verbose=verbose)
@multimethod('sc', 2, False)
def cp(fr, to, token=None, verbose=False):
    # Two-positional-argument form: cp(fr, to).
    auth = def_token(token)
    return sc_client._cp(fr=fr, to=to, token=auth, verbose=verbose)
@multimethod('sc', 1, False)
def cp(token, fr='', to='', verbose=False):
    # One-positional-argument form: the positional value is the token and
    # 'fr'/'to' come from keyword arguments.
    auth = def_token(token)
    return sc_client._cp(fr=fr, to=to, token=auth, verbose=verbose)
[docs]
@multimethod('sc', 0, False)
def cp(token=None, fr='', to='', verbose=False):
    '''Copy a file/directory within the Storage Manager service.

    Usage::

        cp(token=None, fr='', to='', verbose=False)

    MultiMethod Usage::

        storeClient.cp(token, fr, to)
        storeClient.cp(fr, to)
        storeClient.cp(token, fr='', to='')
        storeClient.cp(fr='',to='')

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    fr : str
        Name of the file to be copied (may not be a directory).
    to : str
        Name of the file to be created.
    verbose : bool
        Verbose output flag, forwarded to the client.

    Returns
    -------
    result : str
        An ``'OK'`` message or error string for each copied file.

    Example
    -------
    .. code-block:: python

        # Copy a file in VOSpace
        storeClient.cp('foo', 'bar')
        storeClient.cp('vos://foo', 'vos:///new/bar')
    '''
    auth = def_token(token)
    return sc_client._cp(fr=fr, to=to, token=auth, verbose=verbose)
# ----------------------------------------------------------------------
# LN -- Create a link to a file/directory in the Storage Manager service
#
@multimethod('sc', 3, False)
def ln(token, fr, target, verbose=False):
    # Three-positional-argument form: ln(token, fr, target).
    auth = def_token(token)
    return sc_client._ln(fr=fr, target=target, token=auth, verbose=verbose)
@multimethod('sc', 2, False)
def ln(fr, target, token=None, verbose=False):
    # Two-positional-argument form: ln(fr, target).
    auth = def_token(token)
    return sc_client._ln(fr=fr, target=target, token=auth, verbose=verbose)
[docs]
@multimethod('sc', 1, False)
def ln(token, fr='', target='', verbose=False):
    '''Create a link to a file/directory in the Storage Manager service.

    Usage::

        ln(token, fr='', target='', verbose=False)

    MultiMethod Usage::

        storeClient.ln(token, fr, target)
        storeClient.ln(fr, target)
        storeClient.ln(token, fr='', target='')

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    fr : str
        Name of the file to be linked (may not be a directory).
    target : str
        Name of the link to be created.
    verbose : bool
        Verbose output flag, forwarded to the client.

    Returns
    -------
    result : str
        An ``'OK'`` message or error string for each linked file.

    Example
    -------
    .. code-block:: python

        # Link a file in VOSpace
        storeClient.ln('foo', 'bar')
        storeClient.ln('vos://foo', 'vos:///new/bar')
    '''
    auth = def_token(token)
    return sc_client._ln(fr=fr, target=target, token=auth, verbose=verbose)
# --------------------------------------------------------------------
# LS -- Get a file/directory listing from the Storage Manager service
#
@multimethod('sc', 2, False)
def ls(token, name, format='csv', verbose=False):
    # Two-positional-argument form: ls(token, name).
    auth = def_token(token)
    return sc_client._ls(name=name, format=format, token=auth,
                         verbose=verbose)
@multimethod('sc',1,False)
def ls(optval, name='vos://', token=None, format='csv', verbose=False):
    # One-positional-argument form: ls(token) or ls(name).
    if optval is not None and is_auth_token(optval):
        # optval looks like a token; 'name' comes from the keyword argument.
        return sc_client._ls(name=name, format=format,
                             token=def_token(optval), verbose=verbose)
    else:
        # optval is the name to list.  Honor a token supplied by keyword
        # rather than always discarding it in favor of def_token(None);
        # this matches the one-argument forms of rm(), rmdir() and mkdir().
        return sc_client._ls(name=optval, format=format,
                             token=def_token(token), verbose=verbose)
[docs]
@multimethod('sc', 0, False)
def ls(name='vos://', token=None, format='csv', verbose=False):
    '''Get a file/directory listing from the Storage Manager service.

    Usage::

        ls(name='vos://', token=None, format='csv', verbose=False)

    MultiMethod Usage::

        storeClient.ls(token, name)
        storeClient.ls(name)
        storeClient.ls()

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    name : str [Optional]
        Valid name of file or directory, e.g. ``vos://somedir``.  If no
        name is provided, it will return the top directory listing.
    format : str
        Default ``csv``.  The ``long`` option produces an output similar
        to ``ls -l``.
    verbose : bool
        Verbose output flag, forwarded to the client.

    Example
    -------
    .. code-block:: python

        # Get the file/directory listing of the top directory
        listing = storeClient.ls()

        # Get the file/directory listing of the ``somedir`` directory
        listing = storeClient.ls('vos://somedir')
        print(listing)

    This prints for instance:

    .. code::

        bar2.fits,foo1.csv,fancyfile.dat
    '''
    auth = def_token(token)
    return sc_client._ls(name=name, format=format, token=auth,
                         verbose=verbose)
# --------------------------------------------------------------------
# MKDIR -- Create a directory in the Storage Manager service
#
@multimethod('sc', 2, False)
def mkdir(token, name):
    # Two-positional-argument form: mkdir(token, name).
    auth = def_token(token)
    return sc_client._mkdir(name=name, token=auth)
[docs]
@multimethod('sc', 1, False)
def mkdir(optval, name='', token=None):
    '''Make a directory in the Storage Manager service.

    Usage::

        mkdir(optval, name='', token=None)

    MultiMethod Usage::

        storeClient.mkdir(token, name)
        storeClient.mkdir(name)

    Parameters
    ----------
    optval : str
        Either an authentication token or the name of the directory to
        create.  If it looks like a token the directory name is taken
        from ``name``; otherwise it is used as the directory name.
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    name : str
        Name of the container (directory) to create.

    Returns
    -------
    result : str
        An ``'OK'`` message or error string.

    Example
    -------
    .. code-block:: python

        # Create a directory in VOSpace called ``foo``
        storeClient.mkdir('foo')
    '''
    if optval is not None and is_auth_token(optval):
        dirname, auth = name, optval
    else:
        dirname, auth = optval, token
    return sc_client._mkdir(name=dirname, token=def_token(auth))
# --------------------------------------------------------------------
# MV -- Move/rename a file/directory within the Storage Manager service
#
@multimethod('sc', 3, False)
def mv(token, fr, to, verbose=False):
    # Three-positional-argument form: mv(token, fr, to).
    auth = def_token(token)
    return sc_client._mv(fr=fr, to=to, token=auth, verbose=verbose)
@multimethod('sc', 2, False)
def mv(fr, to, token=None, verbose=False):
    # Two-positional-argument form: mv(fr, to).
    auth = def_token(token)
    return sc_client._mv(fr=fr, to=to, token=auth, verbose=verbose)
@multimethod('sc', 1, False)
def mv(token, fr='', to='', verbose=False):
    # One-positional-argument form: the positional value is the token and
    # 'fr'/'to' come from keyword arguments.
    auth = def_token(token)
    return sc_client._mv(fr=fr, to=to, token=auth, verbose=verbose)
[docs]
@multimethod('sc', 0, False)
def mv(token=None, fr='', to='', verbose=False):
    '''Move/rename a file/directory within the Storage Manager service.

    Usage::

        mv(token=None, fr='', to='', verbose=False)

    MultiMethod Usage::

        storeClient.mv(token, fr, to)
        storeClient.mv(fr, to)
        storeClient.mv(token)
        storeClient.mv(fr='',to='')

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    fr : str
        Name of the file or directory to be moved or renamed.
    to : str
        New name or destination for the ``'fr'`` file or directory.  If
        given as a directory the original filename is preserved.
    verbose : bool
        Verbose output flag, forwarded to the client.

    Returns
    -------
    result : str
        An ``'OK'`` message or error string for each moved or renamed
        file/directory.

    Example
    -------
    .. code-block:: python

        # Move a file in VOSpace
        storeClient.mv('foo', 'bar')             # rename file
        storeClient.mv('foo', 'vos://newdir/')   # move to new directory
        storeClient.mv('foo', 'newdir')
    '''
    auth = def_token(token)
    return sc_client._mv(fr=fr, to=to, token=auth, verbose=verbose)
# --------------------------------------------------------------------
# RM -- Delete a file from the Storage Manager service
#
@multimethod('sc', 2, False)
def rm(token, name, verbose=False):
    # Two-positional-argument form: rm(token, name).
    auth = def_token(token)
    return sc_client._rm(name=name, token=auth, verbose=verbose)
@multimethod('sc', 1, False)
def rm(optval, name='', token=None, verbose=False):
    # One-positional-argument form: rm(token) or rm(name).
    if optval is not None and is_auth_token(optval):
        # optval looks like a token; 'name' comes from the keyword argument.
        victim, auth = name, optval
    else:
        # optval is the name to be removed.
        victim, auth = optval, token
    return sc_client._rm(name=victim, token=def_token(auth),
                         verbose=verbose)
[docs]
@multimethod('sc', 0, False)
def rm(name='', token=None, verbose=False):
    '''Delete a file from the Storage Manager service.

    Usage::

        rm(name='', token=None, verbose=False)

    MultiMethod Usage::

        storeClient.rm(token, name)
        storeClient.rm(name)

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    name : str
        Name of the file to delete.
    verbose : bool
        Verbose output flag, forwarded to the client.

    Returns
    -------
    result : str
        An ``'OK'`` message or error string for each file deleted.

    Example
    -------
    .. code-block:: python

        # Remove a file from VOSpace
        storeClient.rm('foo.csv')
        storeClient.rm('vos://foo.csv')
    '''
    auth = def_token(token)
    return sc_client._rm(name=name, token=auth, verbose=verbose)
# --------------------------------------------------------------------
# RMDIR -- Delete a directory from the Storage Manager service
#
@multimethod('sc', 2, False)
def rmdir(token, name, verbose=False):
    # Two-positional-argument form: rmdir(token, name).
    auth = def_token(token)
    return sc_client._rmdir(name=name, token=auth, verbose=verbose)
@multimethod('sc', 1, False)
def rmdir(optval, name='', token=None, verbose=False):
    # One-positional-argument form: rmdir(token) or rmdir(name).
    if optval is not None and is_auth_token(optval):
        # optval looks like a token; 'name' comes from the keyword argument.
        dirname, auth = name, optval
    else:
        # optval is the directory to be removed.
        dirname, auth = optval, token
    return sc_client._rmdir(name=dirname, token=def_token(auth),
                            verbose=verbose)
[docs]
@multimethod('sc', 0, False)
def rmdir(name='', token=None, verbose=False):
    '''Delete a directory from the Storage Manager service.

    Usage::

        rmdir(name='', token=None, verbose=False)

    MultiMethod Usage::

        storeClient.rmdir(token, name)
        storeClient.rmdir(name)

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    name : str
        Name of the container (directory) to delete.
    verbose : bool
        Verbose output flag, forwarded to the client.

    Returns
    -------
    result : str
        An ``'OK'`` message or error string for each directory deleted.

    Example
    -------
    .. code-block:: python

        # Remove an empty directory from VOSpace
        storeClient.rmdir('datadir')
        storeClient.rmdir('vos://datadir')
    '''
    auth = def_token(token)
    return sc_client._rmdir(name=name, token=auth, verbose=verbose)
# --------------------------------------------------------------------
# SAVEAS -- Save the string representation of a data object as a file.
#
@multimethod('sc', 3, False)
def saveAs(token, data, name):
    # Three-positional-argument form: saveAs(token, data, name).
    auth = def_token(token)
    return sc_client._saveAs(data=data, name=name, token=auth)
[docs]
@multimethod('sc', 2, False)
def saveAs(data, name, token=None):
    '''Save the string representation of a data object as a file.

    Usage::

        saveAs(data, name, token=None)

    MultiMethod Usage::

        storeClient.saveAs(token, data, name)
        storeClient.saveAs(data, name)

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    data : str
        Data object to be saved.
    name : str
        Name of the file to create containing the string representation
        of the data.

    Returns
    -------
    result : str
        An ``'OK'`` message or error string for each file saved.

    Example
    -------
    .. code-block:: python

        # Save a data object called ``table_data`` to VOSpace as
        # ``table.example``
        storeClient.saveAs(table_data, 'table.example')
    '''
    auth = def_token(token)
    return sc_client._saveAs(data=data, name=name, token=auth)
# --------------------------------------------------------------------
# TAG -- Annotate a file/directory in the Storage Manager service
#
@multimethod('sc', 3, False)
def tag(token, name, tag):
    # Three-positional-argument form: tag(token, name, tag).
    auth = def_token(token)
    return sc_client._tag(name=name, tag=tag, token=auth)
@multimethod('sc', 2, False)
def tag(name, tag, token=None):
    # Two-positional-argument form: tag(name, tag).
    auth = def_token(token)
    return sc_client._tag(name=name, tag=tag, token=auth)
[docs]
@multimethod('sc', 1, False)
def tag(token, name='', tag=''):
    '''Annotate a file/directory in the Storage Manager service.

    Usage::

        tag(token, name='', tag='')

    MultiMethod Usage::

        storeClient.tag(token, name, tag)
        storeClient.tag(name, tag)
        storeClient.tag(token, name='foo', tag='bar')

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    name : str
        Name of the file to tag.
    tag : str
        Annotation string for file.

    Returns
    -------
    result : str
        An ``'OK'`` message or error string for each file annotated.

    Example
    -------
    .. code-block:: python

        # Annotate a file in VOSpace
        storeClient.tag('foo.csv', 'This is a test')
    '''
    auth = def_token(token)
    return sc_client._tag(name=name, tag=tag, token=auth)
# --------------------------------------------------------------------
# LOAD/PULL -- Load a file from a remote endpoint to the Storage Manager service
#
@multimethod('sc', 3, False)
def load(token, name, endpoint, is_vospace=False):
    # Three-positional-argument form: load(token, name, endpoint).
    auth = def_token(token)
    return sc_client._load(name=name, endpoint=endpoint, token=auth,
                           is_vospace=is_vospace)
[docs]
@multimethod('sc', 2, False)
def load(name, endpoint, token=None, is_vospace=False):
    '''Load a file from a remote endpoint to the Storage Manager service.

    Usage::

        load(name, endpoint, token=None, is_vospace=False)

    MultiMethod Usage::

        storeClient.load(token, name, endpoint)
        storeClient.load(name, endpoint)

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    name : str
        Name of the file to create in the Storage Manager service,
        e.g. ``vos://mydata.fits``.
    endpoint : str
        Remote endpoint the file is loaded from, e.g. a URL.
    is_vospace : bool
        Forwarded unchanged to the client.  Defaults to ``False``.

    Returns
    -------
    result : str
        An ``'OK'`` message or error string for each loaded file.

    Example
    -------
    .. code-block:: python

        # Load a file to VOSpace called ``mydata.fits`` from a remote URL
        storeClient.load('vos://mydata.fits', 'http://example.com/data.fits')
    '''
    auth = def_token(token)
    return sc_client._load(name=name, endpoint=endpoint, token=auth,
                           is_vospace=is_vospace)
# Aliases for load() calls.
@multimethod('sc', 3, False)
def pull(token, name, endpoint, is_vospace=False):
    # Three-positional-argument form: pull(token, name, endpoint).
    # pull() is an alias for load() and calls the same client method.
    auth = def_token(token)
    return sc_client._load(name=name, endpoint=endpoint, token=auth,
                           is_vospace=is_vospace)
[docs]
@multimethod('sc', 2, False)
def pull(name, endpoint, token=None, is_vospace=False):
    '''Pull a file from a remote endpoint to the Storage Manager service.

    Same as :func:`storeClient.load()` function.

    Usage::

        pull(name, endpoint, token=None, is_vospace=False)

    MultiMethod Usage::

        storeClient.pull(token, name, endpoint)
        storeClient.pull(name, endpoint)

    Parameters
    ----------
    token : str [Optional]
        Authentication token (see function :func:`authClient.login()`).
    name : str
        Name of the file to create in the Storage Manager service,
        e.g. ``vos://mydata.fits``.
    endpoint : str
        Remote endpoint the file is pulled from, e.g. a URL.
    is_vospace : bool
        Forwarded unchanged to the client.  Defaults to ``False``.

    Returns
    -------
    result : str
        An ``'OK'`` message or error string for each pulled file.

    Example
    -------
    .. code-block:: python

        # Pull a file to VOSpace called ``mydata.fits`` from a remote URL
        storeClient.pull('vos://mydata.fits', 'http://example.com/data.fits')
    '''
    auth = def_token(token)
    return sc_client._load(name=name, endpoint=endpoint, token=auth,
                           is_vospace=is_vospace)
# ####################################################################
# Store Client Class
# ####################################################################
[docs]
class storeClient(object):
'''
STORECLIENT -- Client-side methods to access the Data Lab
Storage Manager service.
'''
def __init__(self, profile=DEF_SERVICE_PROFILE, svc_url=DEF_SERVICE_URL):
    '''Set up service endpoints, profile, host identity and local state.
    '''
    # Service endpoints and profile.
    self.svc_url = svc_url.strip('/')       # StoreMgr service URL
    self.qm_svc_url = QM_SERVICE_URL        # QueryMgr service URL
    self.svc_profile = profile              # StoreMgr service profile

    # Identity of the calling host, sent along in request headers.
    self.hostip, self.hostname = THIS_IP, THIS_HOST

    self.async_wait = False

    # The $HOME/.datalab directory.
    self.home = '%s/.datalab' % os.path.expanduser('~')

    self.debug = DEBUG                      # interface debug flag
# --------------------------------------------------------------------
# ISALIVE -- Ping the Storage Manager service to see if it responds.
#
[docs]
def isAlive(self, svc_url=None, timeout=2):
    '''Check whether the Storage Manager service at the given URL is
    alive and responding. This issues a GET on the root service URL.

    Parameters
    ----------
    svc_url : str [Optional]
        The service URL of the Storage Manager to use. When ``None`` the
        URL stored on this client is used.
    timeout : int [Optional]
        Passed to ``requests.get()`` as its ``timeout`` argument.

    Returns
    -------
    result : bool
        ``True`` when the request succeeds with an OK status and the
        reply text starts with "hello world" (case-insensitive);
        ``False`` otherwise, including when any exception is raised.

    Example
    -------
    .. code-block:: python

        # Check if service is responding
        storeClient.isAlive()

    This prints

    .. code::

        True
    '''
    target = self.svc_url if svc_url is None else svc_url
    try:
        reply = requests.get(target.strip('/'), timeout=timeout)
        body = scToString(reply.content)
        if reply.status_code != requests.codes.ok:
            return False
        greeted = reply.text.lower().startswith("hello world")
        if body is not None and not greeted:
            return False
    except Exception:
        # Any failure at all is reported as "not alive".
        return False
    return True
[docs]
def set_svc_url(self, svc_url):
    '''Set the Storage Manager service URL.

    Parameters
    ----------
    svc_url : str
        The service URL of the Storage Manager to use. ``None`` and the
        empty string are ignored; leading/trailing slashes are stripped.

    Returns
    -------
    Nothing

    Example
    -------
    .. code-block:: python

        storeClient.set_svc_url("http://demo.datalab.noirlab.edu:7003")
    '''
    if svc_url is None or svc_url == '':
        return
    self.svc_url = scToString(svc_url.strip('/'))
[docs]
def get_svc_url(self):
    '''Return the currently-used Storage Manager service URL.

    Parameters
    ----------
    None

    Returns
    -------
    result : str
        Current Storage Manager service URL.

    Example
    -------
    .. code-block:: python

        print(storeClient.get_svc_url())
    '''
    current_url = self.svc_url
    return scToString(current_url)
[docs]
def set_profile(self, profile):
    '''Set the service profile to be used.

    Parameters
    ----------
    profile : str
        The name of the profile to use. The list of available ones can
        be retrieved from the service (see function
        :func:`storeClient.list_profiles()`). ``None`` and the empty
        string are ignored.

    Returns
    -------
    Nothing

    Example
    -------
    .. code-block:: python

        storeClient.set_profile('test')
    '''
    if profile is None or profile == '':
        return
    self.svc_profile = scToString(profile)
[docs]
def get_profile(self):
    '''Get the Storage Manager service profile being used.

    Parameters
    ----------
    None

    Returns
    -------
    profile : str
        The name of the current profile used with the Storage Manager.

    Example
    -------
    .. code-block:: python

        print('Store Service profile = ' + storeClient.get_profile())
    '''
    current_profile = self.svc_profile
    return scToString(current_profile)
@multimethod('_sc',1,True)
def list_profiles(self, token, profile=None, format='text'):
    '''One-positional-argument form: the token is given first.

        Usage:: storeClient.list_profiles(token, ....)
    '''
    auth = def_token(token)
    return self._list_profiles(token=auth, profile=profile, format=format)
[docs]
@multimethod('_sc',0,True)
def list_profiles(self, token=None, profile=None, format='text'):
    '''Keyword-only form; the token is resolved through ``def_token()``.

        Usage:: storeClient.list_profiles(....)
    '''
    auth = def_token(token)
    return self._list_profiles(token=auth, profile=profile, format=format)
def _list_profiles(self, token=None, profile=None, format='text'):
    '''Implementation of the ``list_profiles()`` method.

    Builds a ``/profiles`` request (optionally restricted to a single
    profile), sends it to the service and returns the reply as a string.
    A reply containing ``{`` is parsed as JSON before conversion.
    '''
    pieces = ['/profiles?']
    # None, the literal string 'None' and '' all mean "no profile filter".
    if profile not in (None, 'None', ''):
        pieces.append("profile=%s&" % profile)
    pieces.append("format=%s" % format)
    query = ''.join(pieces)

    reply = self.getFromURL(self.svc_url, query, def_token(token))
    listing = scToString(reply.content)
    if '{' in listing:
        listing = json.loads(listing)
    return scToString(listing)
# --------------------------------------------------------------------
# SERVICES -- List public storage services
#
[docs]
def services(self, name=None, svc_type='vos', format=None,
             profile='default'):
    '''List storage services; thin public wrapper over ``_services()``.
    '''
    options = dict(name=name, svc_type=svc_type, format=format,
                   profile=profile)
    return self._services(**options)
def _services(self, name=None, svc_type='vos', format=None,
              profile='default'):
    '''Implementation of the ``services()`` method.

    Builds a ``/services`` query from the supplied filters, sends it to
    the Query Manager URL held on this client and returns the reply as a
    string. A reply containing ``{`` is parsed as JSON before conversion.
    '''
    def given(value):
        # None, the literal string 'None' and '' all count as "not set".
        return value is not None and value != 'None' and value != ''

    query = '/services?'
    if given(profile):
        query = query + ("profile=%s" % profile)
    if given(name):
        # Escape literal '%' characters in the name pattern.
        query = query + ("&name=%s" % name.replace('%','%25'))
    if given(svc_type):
        query = query + ("&type=%s" % svc_type)
    if given(format):
        query = query + "&format=%s" % format

    reply = self.getFromURL(self.qm_svc_url, query, def_token(None))
    listing = scToString(reply.content)
    if '{' in listing:
        listing = json.loads(listing)
    return scToString(listing)
# -----------------------------
# Utility Methods
# -----------------------------
# --------------------------------------------------------------------
# ACCESS -- Determine whether the file can be accessed with the given mode.
#
@multimethod('_sc',3,True)
def access(self, token, path, mode, verbose=True):
    '''Three-positional-argument form: token, path, mode.

        Usage:: storeClient.access(token, path, mode)
    '''
    auth = def_token(token)
    return self._access(path=path, mode=mode, token=auth, verbose=verbose)
@multimethod('_sc',2,True)
def access(self, path, mode, token=None, verbose=True):
    '''Two-positional-argument form: path, mode.

        Usage:: storeClient.access(path, mode)
    '''
    auth = def_token(token)
    return self._access(path=path, mode=mode, token=auth, verbose=verbose)
[docs]
@multimethod('_sc',1,True)
def access(self, path, mode=None, token=None, verbose=True):
    '''One-positional-argument form: path only, mode by keyword.

        Usage:: storeClient.access(path, mode=None)
    '''
    auth = def_token(token)
    return self._access(path=path, mode=mode, token=auth, verbose=verbose)
def _access(self, path='', mode='', token=None, verbose=True):
    '''Implementation of the ``access()`` method.

    Sends an ``/access`` request for ``path`` (prefixed with ``vos://``
    when it carries no URI scheme) and returns ``True`` only when the
    service answers with an OK status and the text ``true``
    (case-insensitive); otherwise ``False``.
    '''
    node = path if path.count('://') > 0 else 'vos://' + path
    query = "/access?name=%s&mode=%s&verbose=%s" % (node, mode, verbose)
    auth_hdr = {'X-DL-AuthToken': def_token(token)}

    reply = requests.get(self.svc_url + query, headers=auth_hdr)
    if reply.status_code == requests.codes.ok:
        return scToString(reply.content).lower() == 'true'
    return False
# --------------------------------------------------------------------
# STAT -- Get file status. Values are returned as a dictionary of the
# requested node.
#
@multimethod('_sc',2,True)
def stat(self, token, path, verbose=True):
    '''Two-positional-argument form: token, path.

        Usage:: storeClient.stat(token, path)
    '''
    auth = def_token(token)
    return self._stat(path=path, token=auth, verbose=verbose)
[docs]
@multimethod('_sc',1,True)
def stat(self, path, token=None, verbose=True):
    '''One-positional-argument form: path only.

        Usage:: storeClient.stat(path)
    '''
    auth = def_token(token)
    return self._stat(path=path, token=auth, verbose=verbose)
def _stat(self, path='', token=None, verbose=True):
    '''Implementation of the ``stat()`` method.

    Sends a ``/stat`` request for ``path`` (prefixed with ``vos://`` when
    it carries no URI scheme). Returns the JSON-decoded reply on an OK
    status and an empty dictionary otherwise.
    '''
    node = path if path.count('://') > 0 else 'vos://' + path
    query = "/stat?name=%s&verbose=%s" % (node, verbose)
    auth_hdr = {'X-DL-AuthToken': def_token(token)}

    reply = requests.get(self.svc_url + query, headers=auth_hdr)
    if reply.status_code == requests.codes.ok:
        return json.loads(scToString(reply.content))
    return {}
# --------------------------------------------------------------------
# GET -- Retrieve a file from the Storage Manager service
# --------------------------------------------------------------------
@multimethod('_sc',3,True)
def get(self, token, fr, to, mode='text', verbose=True, debug=False,
        timeout=30):
    '''Three-positional-argument form: token, source, destination.

        Usage:: storeClient.get(token, fr, to)
    '''
    auth = def_token(token)
    return self._get(fr=fr, to=to, token=auth, mode=mode,
                     verbose=verbose, debug=debug, timeout=timeout)
@multimethod('_sc',2,True)
def get(self, opt1, opt2, fr='', to='', token=None, verbose=True,
        mode='text', debug=False, timeout=30):
    '''Two-positional-argument form.

    The arguments are read either as ``(token, fr)`` when the first one
    looks like an auth token, or as ``(fr, to)`` otherwise.

        Usage:: storeClient.get(fr, to)
    '''
    shared = dict(mode=mode, verbose=verbose, debug=debug, timeout=timeout)
    if opt1 is not None and is_auth_token(opt1):
        # opt1 looks like a token, so opt2 is the 'fr' value.
        return self._get(fr=opt2, to=to, token=def_token(opt1), **shared)
    # opt1 is the 'fr' value, opt2 is the 'to' value.
    return self._get(fr=opt1, to=opt2, token=def_token(token), **shared)
@multimethod('_sc',1,True)
def get(self, optval, fr='', to='', token=None, mode='text',
        verbose=True, debug=False, timeout=30):
    '''One-positional-argument form.

    The argument is read as the token when it looks like an auth token,
    and as the ``fr`` value otherwise.

        Usage:: storeClient.get(fr)
    '''
    shared = dict(mode=mode, verbose=verbose, debug=debug, timeout=timeout)
    if optval is not None and is_auth_token(optval):
        # optval looks like a token.
        return self._get(fr=fr, to=to, token=def_token(optval), **shared)
    # optval is the 'fr' value.
    return self._get(fr=optval, to=to, token=def_token(token), **shared)
[docs]
@multimethod('_sc',0,True)
def get(self, fr='', to='', token=None, mode='text', verbose=True,
        debug=False, timeout=30):
    '''Keyword-only form.

        Usage:: storeClient.get(fr='', to='')
    '''
    auth = def_token(token)
    return self._get(fr=fr, to=to, token=auth, mode=mode,
                     verbose=verbose, debug=debug, timeout=timeout)
def _get(self, token=None, fr='', to='', mode='text', verbose=True,
         debug=False, timeout=30):
    '''Implementation of the ``get()`` method.

    Two modes of operation, selected by ``to``:

    * ``to`` is a non-empty string: the source name is expanded to a file
      list and each file is downloaded to disk in chunks. A list with
      one status string per processed file is returned (or a plain
      error string when expansion fails or matches nothing).
    * ``to`` is empty/None: a single file is fetched and its contents
      are returned according to ``mode`` (``'text'``, ``'binary'`` or
      ``'fileobj'``; anything else behaves like ``'text'``).

    Raises ``storeClientError`` when ``fr`` contains metacharacters but
    ``to`` is missing, does not exist, or is not a directory.
    '''
    def sizeof_fmt(num):
        '''Local pretty-printer for file sizes.
        '''
        for unit in ['B','K','M','G','T','P','E','Z']:
            if abs(num) < 1024.0:
                if unit == 'B':
                    return "%5d%s" % (num, unit)
                else:
                    return "%3.1f%s" % (num, unit)
            num /= 1024.0
        return "%.1f%s" % (num, 'Y')

    tok = def_token(token)
    user, uid, gid, hash = split_auth_token(tok.strip())
    hdrs = {'Content-Type': 'text/ascii',
            'X-DL-ClientVersion': __version__,
            'X-DL-OriginIP': self.hostip,
            'X-DL-OriginHost': self.hostname,
            'X-DL-User': user,
            'X-DL-AuthToken': tok}                  # application/x-sql

    # Patch the names with the default URI prefix if needed.
    nm = (fr if fr.count("://") > 0 else ("vos://" + fr))
    nm = nm.replace('///','//')

    if debug:
        print("get(): nm = %s" % nm)
    # A template source can match many files, so it needs an existing
    # local directory to download into.
    if hasmeta(fr):
        if to == '' or to is None:
            raise storeClientError("Multi-file requests require a download location")
        if not os.path.exists(to):
            raise storeClientError("Download directory does not exist")
        if not os.path.isdir(to):
            raise storeClientError("Location must be specified as a directory")

    if to != '' and to is not None:
        # Expand metacharacters to create a file list for download.
        try:
            flist = expandFileList(self.svc_url, token, nm, "csv",
                                   full=True)
        except Exception as e:
            return str(e)
        nfiles = len(flist)
        if nfiles < 1:
            return 'A Node does not exist with the requested URI.'
        if debug:
            print("get: flist = %s" % flist)
            print("get: nfiles = %s" % nfiles)

        fnum = 1
        resp = []
        # NOTE(review): 'flist' is appended to while being iterated (see
        # the re-queue branches below); no cap on re-queues is visible here.
        for f in flist:
            nfiles = len(flist)         # recompute in case list was modified
            # Generate the download file path.
            junk, fn = os.path.split(f)
            if to.endswith("/"):
                dlname = ((to + fn) if hasmeta(fr) else to)
            else:
                dlname = ((to + "/" + fn) if hasmeta(fr) else to)

            # Get a single file.  The first request returns, as its text,
            # the URL the data is then downloaded from.
            res = requests.get(self.svc_url + "/get?name=%s" % re.sub(PLUS_REGEX, PLUS_URL_ESC_CODE, f),
                               headers=hdrs)

            if res.status_code != 200:
                resp.append("Error: " + scToString(res.text))
            else:
                r = None
                # Try the download up to (timeout - 1) times, sleeping one
                # second after each failed attempt.
                for i in range(1,timeout):
                    try:
                        r = requests.get(res.text, stream=True)
                    except Exception as e:
                        if "No connection adapters" in str(e) and i%5 == 0:
                            print('GET error %d: retrying' % i)
                        if "Internal Server Error" in str(e) and i%5 == 0:
                            print('GET internal error %d: retrying' % i)
                        time.sleep(1)
                        if i == (timeout-1):
                            r = None
                            break
                    else:
                        break

                if r is None:
                    # If the download failed, put it back at the end
                    # of the list so we can retry later.
                    flist.append(f)
                elif r.status_code != 200:
                    resp.append(scToString(r.content))
                else:
                    clen = r.headers.get('content-length')
                    total_length = (0 if clen is None else int(clen))

                    # Download the file in chunks so we can have a progress
                    # indicator on each.
                    dl = 0
                    done = 0
                    with open(dlname, 'wb', 0) as fd:
                        while 1:
                            buf = r.raw.read((8*1024))
                            dl += len(buf)
                            if not buf:
                                break
                            fd.write(buf)
                            if total_length > 0:
                                done = int(20 * dl / total_length)

                            if verbose:     # Print a progress indicator
                                sys.stdout.write(
                                    "\r(%d/%d) [%s%s] [%7s] %s" % \
                                    (fnum, nfiles, '='*done, ' '*(20-done),
                                     sizeof_fmt(dl), f[6:]))
                                sys.stdout.flush()

                        # If the download failed, put it back at the end
                        # of the list so we can retry later.
                        if total_length > 0 and dl == 0:
                            flist.append(f)

                        # Handle a zero-length file download.
                        if verbose:
                            if dl == 0:
                                print("\r(%d/%d) [%s] [%7s] %s" % \
                                    (fnum, nfiles, '=' * 20, "0 B", f[6:]))
                            else:
                                print('')
                    # NOTE(review): redundant if the 'with' above already
                    # closed 'fd'; nesting reconstructed -- confirm.
                    fd.close()
                    resp.append('OK')
            fnum += 1
            # Pause for five seconds each time the counter reaches a
            # multiple of 'timeout'.
            if fnum % timeout == 0:
                time.sleep(5)
        return resp
    else:
        # Get a single file, return the raw contents to the caller.
        # NOTE(review): neither response status is checked on this path.
        url = requests.get(self.svc_url + "/get?name=%s" % nm,
                           headers=hdrs)
        r = requests.get(url.text, stream=False, headers=hdrs)
        if mode == 'text':
            return scToString(r.content)
        elif mode == 'binary':
            return r.content
        elif mode == 'fileobj':
            from astropy.utils.data import get_readable_fileobj
            from io import BytesIO
            try:
                fileobj = BytesIO(r.content)
                # NOTE(review): 'f' is returned from inside the 'with';
                # confirm it is still usable once the context has exited.
                with get_readable_fileobj(fileobj, encoding='binary',
                                          cache=True) as f:
                    return f
            except Exception as e:
                raise storeClientError(str(e))
        else:
            return scToString(r.content)
# --------------------------------------------------------------------
# PUT -- Upload a file to the Storage Manager service
# --------------------------------------------------------------------
@multimethod('_sc',3,True)
def put(self, token, fr, to, verbose=True, debug=False):
    '''Three-positional-argument form: token, source, destination.

        Usage:: storeClient.put(token, fr, to)
    '''
    # Forward the caller's 'debug' flag (it used to be forced to False).
    return self._put(fr=fr, to=to, token=def_token(token),
                     verbose=verbose, debug=debug)
@multimethod('_sc',2,True)
def put(self, fr, to, token=None, verbose=True, debug=False):
    '''Two-positional-argument form: source, destination.

        Usage:: storeClient.put(fr, to)
    '''
    # Forward the caller's 'debug' flag (it used to be forced to False).
    return self._put(fr=fr, to=to, token=def_token(token),
                     verbose=verbose, debug=debug)
@multimethod('_sc',1,True)
def put(self, optval, fr='', to='vos://', token=None, verbose=True,
        debug=False):
    '''One-positional-argument form.

    The argument is read as the token when it looks like an auth token,
    and as the source name otherwise.

        Usage:: storeClient.put(fr)
    '''
    # In both branches the caller's 'debug' flag is forwarded (it used
    # to be forced to False).
    if optval is not None and is_auth_token(optval):
        # optval looks like a token
        return self._put(fr=fr, to=to, token=def_token(optval),
                         verbose=verbose, debug=debug)
    else:
        # optval looks like a source name
        return self._put(fr=optval, to=to, token=def_token(token),
                         verbose=verbose, debug=debug)
[docs]
@multimethod('_sc',0,True)
def put(self,fr='', to='vos://', token=None, verbose=True, debug=False):
    '''Keyword-only form.

        Usage:: storeClient.put(fr='',to='')
    '''
    auth = def_token(token)
    return self._put(fr=fr, to=to, token=auth, verbose=verbose,
                     debug=debug)
def _put(self, token=None, fr='', to='vos://', verbose=True, debug=False):
    '''Implementation of the ``put()`` method.

    Uploads the local file(s) matched by ``fr`` to the VOSpace location
    ``to``. When ``fr`` is a directory its contents are uploaded, and if
    it ends with ``/`` the target directory is created first.

    Returns ``'OK'`` when no per-file status was recorded, otherwise a
    list with one status string (``'OK'`` or an error message) per
    processed file. When several files are to be uploaded and the target
    is missing or not a container, a one-element error list is returned.
    '''
    tok = def_token(token)
    user, uid, gid, hash = split_auth_token(tok.strip())
    hdrs = {'Content-Type': 'text/ascii',
            'X-DL-ClientVersion': __version__,
            'X-DL-OriginIP': self.hostip,
            'X-DL-OriginHost': self.hostname,
            'X-DL-User': user,
            'X-DL-AuthToken': tok}                  # application/x-sql

    # Always bind 'dname' so the debug print below cannot fail for a
    # directory given without a trailing slash.
    dname = ''

    # If the 'fr' is a directory, create it first and then transfer the
    # contents.
    if os.path.isdir(fr):
        if fr.endswith("/"):
            dname = (to if to.count("://") > 0 else to[:-1])
            self._mkdir(token=token, name=dname)
        flist = glob.glob(fr+"/*")
    else:
        flist = glob.glob(fr)
    nfiles = len(flist)
    if debug:
        print("fr=%s to=%s dname=%s" % (fr, to, dname))
        print(flist)

    if nfiles > 1:
        # Several files need a container to land in.
        # NOTE(review): 'stat' here is the bare module-level name, not a
        # method of this object, and receives no token -- confirm intent.
        ptype = stat(to).get('type')
        if ptype is None:
            return ['Error: target directory not exist']
        elif ptype != 'container':
            return ['Error: target must be a container']

    fnum = 1
    resp = []
    for f in flist:
        if debug:
            print("put: f=%s" % (f))
        fr_dir, fr_name = os.path.split(f)
        if any(i in fr_name for i in URI_RESERVED):
            resp.append('Error: URI reserved char in source filename: '+f)
            continue

        # Patch the names with the URI prefix if needed.
        nm = (to if to.count("://") > 0 else ("vos://" + to))
        if to.endswith("/"):
            nm = nm + fr_name
        if is_vosDir(self.svc_url, token, nm):
            # Uploading into an existing container: keep the source name.
            nm = nm + '/' + fr_name
        nm = nm.replace('///','//')     # fix extra path indicators
        if any(i in nm[nm.rfind('/')+1:] for i in URI_RESERVED):
            resp.append('Error: URI reserved char in target filename: '+nm)
            continue

        if debug:
            print("put: fr_dir=%s fr_name=%s" % (fr_dir,fr_name))
            print("put: f=%s nm(to)=%s" % (f,nm))

        if not os.path.exists(f):
            # Skip files that don't exist
            resp.append("Error: Local file '%s' does not exist" % f)
            if verbose:
                print("Error: Local file '%s' does not exist" % f)
            continue

        # The reply body of this request is the URL to upload the data to.
        r = requests.get(self.svc_url + "/put?name=%s" % nm, headers=hdrs)

        # Uploading directly to a container is handled above, where the
        # target is checked with is_vosDir().
        if r.status_code == requests.codes.server_error:
            resp.append(scToString(r.content))
        else:
            try:
                if verbose:
                    sys.stdout.write("(%d / %d) %s -> " % (fnum, nfiles, f))

                # This *should* work for large data files - MJG 05/24/17
                # Send the resolved token so a None 'token' argument does
                # not silently drop the auth header.
                with open(f, 'rb') as file:
                    requests.put(r.content, data=file,
                                 headers={'Content-type': 'application/octet-stream',
                                          'X-DL-AuthToken': tok})
                if verbose:
                    sys.stdout.write("%s\n" % nm)
            except Exception as e:
                resp.append(str(e))
            else:
                resp.append('OK')
        fnum += 1

    return 'OK' if not resp else resp
# -------------------------------------------------------------------------
# LOAD -- Load a file from a remote endpoint to the Storage Manager service
# -------------------------------------------------------------------------
@multimethod('_sc',3,True)
def load(self, token, name, endpoint, is_vospace=False):
    '''Three-positional-argument form: token, name, endpoint.

        Usage:: storeClient.load(token, name, endpoint)
    '''
    auth = def_token(token)
    return self._load(name=name, endpoint=endpoint, token=auth,
                      is_vospace=is_vospace)
[docs]
@multimethod('_sc',2,True)
def load(self, name, endpoint, token=None, is_vospace=False):
    '''Two-positional-argument form: name, endpoint.

        Usage:: storeClient.load(name, endpoint)
    '''
    auth = def_token(token)
    return self._load(name=name, endpoint=endpoint, token=auth,
                      is_vospace=is_vospace)
# Aliases for load() calls.
@multimethod('_sc',3,True)
def pull(self, token, name, endpoint, is_vospace=False):
    '''Alias of ``load()``; three-positional-argument form.

        Usage:: storeClient.pull(token, name, endpoint)
    '''
    auth = def_token(token)
    return self._load(name=name, endpoint=endpoint, token=auth,
                      is_vospace=is_vospace)
[docs]
@multimethod('_sc',2,True)
def pull(self, name, endpoint, token=None, is_vospace=False):
    '''Alias of ``load()``; two-positional-argument form.

        Usage:: storeClient.pull(name, endpoint)
    '''
    auth = def_token(token)
    return self._load(name=name, endpoint=endpoint, token=auth,
                      is_vospace=is_vospace)
def _load(self, token=None, name='', endpoint='', is_vospace=False):
    '''Implementation of the ``load()`` method.

    Sends a ``/load`` request naming the VOSpace target (``name``,
    prefixed with ``vos://`` when it carries no URI scheme) and the
    URL-quoted ``endpoint``, and returns the service reply as a string.
    '''
    try:
        from urllib import quote_plus               # Python 2
    except ImportError:
        from urllib.parse import quote_plus         # Python 3

    target = name if name.count('://') > 0 else 'vos://' + name
    source = quote_plus(endpoint)
    query = "/load?name=%s&endpoint=%s&is_vospace=%s" % \
            (target, source, str(is_vospace))
    reply = self.getFromURL(self.svc_url, query, def_token(token))
    return scToString(reply.content)
# --------------------------------------------------------------------
# CP -- Copy a file/directory within the Storage Manager service
# --------------------------------------------------------------------
@multimethod('_sc',3,True)
def cp(self, token, fr, to, verbose=False):
    '''Three-positional-argument form: token, source, destination.

        Usage:: storeClient.cp(token, fr, to)
    '''
    auth = def_token(token)
    return self._cp(fr=fr, to=to, token=auth, verbose=verbose)
@multimethod('_sc',2,True)
def cp(self, fr, to, token=None, verbose=False):
    '''Two-positional-argument form: source, destination.

        Usage:: storeClient.cp(fr, to)
    '''
    auth = def_token(token)
    return self._cp(fr=fr, to=to, token=auth, verbose=verbose)
@multimethod('_sc',1,True)
def cp(self, token, fr='', to='', verbose=False):
    '''One-positional-argument form: token first, names by keyword.

        Usage:: storeClient.cp(token, fr='', to='')
    '''
    auth = def_token(token)
    return self._cp(fr=fr, to=to, token=auth, verbose=verbose)
[docs]
@multimethod('_sc',0,True)
def cp(self, token=None, fr='', to='', verbose=False):
    '''Keyword-only form.

        Usage:: storeClient.cp(fr='', to='')
    '''
    auth = def_token(token)
    return self._cp(fr=fr, to=to, token=auth, verbose=verbose)
def _cp(self, token=None, fr='', to='', verbose=False):
    '''Implementation of the ``cp()`` method.

    Copies within the storage service only. When exactly one of the two
    names carries a URI scheme an explanatory string is returned and
    nothing is copied. A source without metacharacters yields a single
    status string; a template source yields a list with one status per
    matched file (or an error string if the expansion fails).
    '''
    # Patch the names with the URI prefix if needed.
    src_has_scheme = fr.count("://") > 0
    dst_has_scheme = to.count("://") > 0
    if src_has_scheme and dst_has_scheme:
        src, dest = fr, to
    elif not src_has_scheme and not dst_has_scheme:
        src, dest = "vos://" + fr, "vos://" + to
    elif not src_has_scheme:
        return "Cannot copy from local to remote; use put() instead."
    else:
        return "Cannot copy from remote to local; use get() instead."

    if hasmeta(fr):
        # Template source: expand it and copy each match individually.
        try:
            matches = expandFileList(self.svc_url, token, src, "csv",
                                     full=True)
        except Exception as e:
            return str(e)

        total = len(matches)
        statuses = []
        for count, node in enumerate(matches, 1):
            base = os.path.split(node)[1]
            target = (dest + ('/%s' % base)).replace('///','//')
            if verbose:
                print("(%d / %d) %s -> %s" % (count, total, node, target))
            reply = self.getFromURL(self.svc_url, "/cp?from=%s&to=%s" % \
                                    (node, target), def_token(token))
            text = scToString(reply.content)
            statuses.append("OK" if 'COMPLETED' in text else text)
        return statuses

    # Plain source: a single copy request.
    src = src.replace('///','//')
    dest = dest.replace('///','//')
    reply = self.getFromURL(self.svc_url, "/cp?from=%s&to=%s" % \
                            (src, dest), def_token(token))
    text = scToString(reply.content)
    return "OK" if 'COMPLETED' in text else text
# ----------------------------------------------------------------------
# LN -- Create a link to a file/directory in the Storage Manager service
# ----------------------------------------------------------------------
@multimethod('_sc',3,True)
def ln(self, token, fr, target, verbose=False):
    '''Three-positional-argument form: token, link name, target.

        Usage:: storeClient.ln(token, fr, target)
    '''
    auth = def_token(token)
    return self._ln(fr=fr, target=target, token=auth, verbose=verbose)
@multimethod('_sc',2,True)
def ln(self, fr, target, token=None, verbose=False):
    '''Two-positional-argument form: link name, target.

        Usage:: storeClient.ln(fr, target)
    '''
    auth = def_token(token)
    return self._ln(fr=fr, target=target, token=auth, verbose=verbose)
[docs]
@multimethod('_sc',1,True)
def ln(self, token, fr='', target='', verbose=False):
    '''One-positional-argument form: token first, names by keyword.

        Usage:: storeClient.ln(token, fr='', target='')
    '''
    auth = def_token(token)
    return self._ln(fr=fr, target=target, token=auth, verbose=verbose)
def _ln(self, token=None, fr='', target='', verbose=True):
    '''Implementation of the ``ln()`` method.

    Sends an ``/ln`` request linking ``fr`` to ``target`` (each prefixed
    with ``vos://`` when it carries no URI scheme).

    Returns ``'OK'`` when the service answers with a "created" status,
    otherwise the reply body as a string.

    Raises ``storeClientError`` when the request cannot be made or
    processed.
    '''
    try:
        fro = (fr if fr.count('://') > 0 else 'vos://' + fr)
        to = (target if target.count('://') > 0 else 'vos://' + target)
        r = self.getFromURL(self.svc_url, "/ln?from=%s&to=%s" % \
                            (fro, to), def_token(token))
        if r.status_code != requests.codes.created:
            return scToString(r.content)
        return 'OK'
    except Exception as e:
        # Report the actual failure; the response object may not exist
        # yet at this point, so it must not be referenced here.
        raise storeClientError(str(e))
# --------------------------------------------------------------------
# LS -- Get a file/directory listing from the Storage Manager service
# --------------------------------------------------------------------
@multimethod('_sc',2,True)
def ls(self, token, name, format='csv', verbose=False):
    '''Two-positional-argument form: token, name.

        Usage:: storeClient.ls(token, name)
    '''
    auth = def_token(token)
    return self._ls(name=name, format=format, token=auth, verbose=verbose)
@multimethod('_sc',1,True)
def ls(self, optval, name='vos://', token=None, format='csv',
       verbose=False):
    '''One-positional-argument form.

    The argument is read as the token when it looks like an auth token,
    and as the name to list otherwise.

        Usage:: storeClient.ls(name)
                storeClient.ls(token, name='foo')
    '''
    if optval is not None and is_auth_token(optval):
        # optval looks like a token
        return self._ls(name=name, format=format,
                        token=def_token(optval), verbose=verbose)
    else:
        # optval is the name; honour an explicit 'token' keyword instead
        # of always falling back to the default token.
        return self._ls(name=optval, format=format, token=def_token(token),
                        verbose=verbose)
[docs]
@multimethod('_sc',0,True)
def ls(self, name='vos://', token=None, format='csv', verbose=False):
    '''Keyword-only form.

        Usage:: storeClient.ls()
    '''
    auth = def_token(token)
    return self._ls(name=name, format=format, token=auth, verbose=verbose)
def _ls(self, token=None, name='vos://', format='csv', verbose=False):
    '''Implementation of the ``ls()`` method.

    Sends an ``/ls`` request for ``name`` (``None`` is treated as the
    empty string; a ``vos://`` prefix is added when the name carries no
    URI scheme).

    Returns the listing as a string on a 200 status, otherwise an
    ``'Error <code>: "<uri>" <reason>'`` string.

    Raises ``Exception`` carrying the underlying error message when the
    request cannot be made.
    '''
    name = '' if name is None else name
    try:
        uri = (name if name.count('://') > 0 else 'vos://' + name)
        r = self.getFromURL(self.svc_url,
                            "/ls?name=%s&format=%s&verbose=%s" % \
                            (uri, format, verbose), def_token(token))
    except Exception as e:
        # No response object exists when the request itself failed, so
        # report the caught error rather than a response body.
        raise Exception(str(e))
    else:
        if r.status_code != 200:
            return('Error %d: "%s" %s' % (r.status_code,uri,r.reason))
        else:
            return(scToString(r.content))
# --------------------------------------------------------------------
# MKDIR -- Create a directory in the Storage Manager service
# --------------------------------------------------------------------
@multimethod('_sc',2,True)
def mkdir(self, token, name):
    '''Two-positional-argument form: token, name.

        Usage:: storeClient.mkdir(token, name)
    '''
    auth = def_token(token)
    return self._mkdir(name=name, token=auth)
[docs]
@multimethod('_sc',1,True)
def mkdir(self, optval, name='', token=None):
    '''One-positional-argument form.

    The argument is read as the token when it looks like an auth token,
    and as the directory name otherwise.

        Usage:: storeClient.mkdir(name)
    '''
    looks_like_token = optval is not None and is_auth_token(optval)
    if looks_like_token:
        return self._mkdir(name=name, token=def_token(optval))
    return self._mkdir(name=optval, token=def_token(token))
def _mkdir(self, token=None, name=''):
    '''Implementation of the ``mkdir()`` method.

    Sends a ``/mkdir`` request for ``name`` (prefixed with ``vos://``
    when it carries no URI scheme; one trailing ``/`` is removed).

    Returns ``'OK'`` when the service answers with a "created" status,
    otherwise the reply body as a string.

    Raises ``storeClientError`` when the request cannot be made or
    processed.
    '''
    nm = (name if name.count("://") > 0 else ("vos://" + name))
    if nm and nm[-1] == '/':
        nm = nm[:-1]

    try:
        r = self.getFromURL(self.svc_url, "/mkdir?dir=%s" % nm,
                            def_token(token))
        if r.status_code != requests.codes.created:
            return scToString(r.content)
        return 'OK'
    except Exception as e:
        # Report the actual failure; the response object may not exist
        # yet at this point, so it must not be referenced here.
        raise storeClientError(str(e))
# ---------------------------------------------------------------------
# MV -- Move/rename a file/directory within the Storage Manager service
# ---------------------------------------------------------------------
@multimethod('_sc',3,True)
def mv(self, token, fr, to, verbose=False):
    '''Three-positional-argument form: token, source, destination.

        Usage:: storeClient.mv(token, fr, to)
    '''
    auth = def_token(token)
    return self._mv(fr=fr, to=to, token=auth, verbose=verbose)
@multimethod('_sc',2,True)
def mv(self, fr, to, token=None, verbose=False):
    '''Two-positional-argument form: source, destination.

        Usage:: storeClient.mv(fr, to)
    '''
    auth = def_token(token)
    return self._mv(fr=fr, to=to, token=auth, verbose=verbose)
@multimethod('_sc',1,True)
def mv(self, token, fr='', to='', verbose=False):
    '''One-positional-argument form: token first, names by keyword.

        Usage:: storeClient.mv(token, fr='', to='')
    '''
    auth = def_token(token)
    return self._mv(fr=fr, to=to, token=auth, verbose=verbose)
[docs]
@multimethod('_sc',0,True)
def mv(self, token=None, fr='', to='', verbose=False):
    '''Keyword-only form.

        Usage:: storeClient.mv(fr='', to='')
    '''
    auth = def_token(token)
    return self._mv(fr=fr, to=to, token=auth, verbose=verbose)
def _mv(self, token=None, fr='', to='', verbose=False):
    '''Implementation of the ``mv()`` method.

    Moves within the storage service only. When exactly one of the two
    names carries a URI scheme an explanatory string is returned and
    nothing is moved. A source without metacharacters yields a single
    status string; a template source yields a list with one status per
    matched file (or an error string if the expansion fails).
    '''
    # Patch the names with the URI prefix if needed.
    fr_rem = fr.count("://") > 0
    to_rem = to.count("://") > 0
    if not fr_rem and not to_rem:
        src = "vos://" + fr
        dest = "vos://" + to
    elif not fr_rem:
        return "Cannot move from local to remote; use put() instead."
    elif not to_rem:
        return "Cannot move from remote to local; use get() instead."
    else:
        src = fr
        dest = to

    # If the 'from' string has no metachars we're moving a single file,
    # otherwise expand the file list and process the matches individually.
    if not hasmeta(fr):
        src = src.replace('///','//')
        dest = dest.replace('///','//')
        r = self.getFromURL(self.svc_url, "/mv?from=%s&to=%s" % \
                            (src, dest), def_token(token))
        if 'COMPLETED' in scToString(r.content):
            return "OK"
        else:
            return scToString(r.content)
    else:
        try:
            flist = expandFileList(self.svc_url, token, src, "csv",
                                   full=True)
        except Exception as e:
            return str(e)

        # Progress output is produced only under 'verbose'; the
        # unconditional debugging prints were removed.
        nfiles = len(flist)
        fnum = 1
        resp = []
        for f in flist:
            junk, fn = os.path.split(f)
            to_fname = (dest + ('/%s' % fn)).replace('///','//')
            if verbose:
                print("(%d / %d) %s -> %s" % (fnum, nfiles, f, to_fname))
            r = self.getFromURL(self.svc_url, "/mv?from=%s&to=%s" % \
                                (f,to_fname), def_token(token))
            fnum += 1
            if 'COMPLETED' in scToString(r.content):
                resp.append("OK")
            else:
                resp.append(scToString(r.content))
        return resp
# --------------------------------------------------------------------
# RM -- Delete a file from the Storage Manager service
# --------------------------------------------------------------------
@multimethod('_sc',2,True)
def rm(self, token, name, verbose=False):
    '''Two-positional-argument form: token, name.

        Usage:: storeClient.rm(token, name)
    '''
    auth = def_token(token)
    return self._rm(name=name, token=auth, verbose=verbose)
@multimethod('_sc',1,True)
def rm(self, optval, name='', token=None, verbose=False):
    '''One-positional-argument form.

    The argument is read as the token when it looks like an auth token,
    and as the name to remove otherwise.

        Usage:: storeClient.rm(name)
    '''
    looks_like_token = optval is not None and is_auth_token(optval)
    if looks_like_token:
        # optval looks like a token
        return self._rm(name=name, token=def_token(optval),
                        verbose=verbose)
    return self._rm(name=optval, token=def_token(token), verbose=verbose)
[docs]
@multimethod('_sc',0,True)
def rm(self, name='', token=None, verbose=False):
    '''Keyword-only form.

        Usage:: storeClient.rm(name='')
    '''
    auth = def_token(token)
    return self._rm(name=name, token=auth, verbose=verbose)
def _rm(self, token=None, name='', verbose=False):
    '''Implementation of the ``rm()`` method.

    Removes a single file, or every file matching a template. The roots
    ``vos://``, ``vos://tmp`` and ``vos://public`` are refused, and
    directories are not removed by this method.

    Returns a status string for a plain name, or a list of status
    strings (one per match) for a template.
    '''
    # Patch the names with the URI prefix if needed.
    nm = name if name.count("://") > 0 else ("vos://" + name)
    if nm in ("vos://", "vos://tmp", "vos://public"):
        return "Error: operation not permitted"

    if hasmeta(nm):
        # Template: expand it and remove each match via a recursive call.
        try:
            matches = expandFileList(self.svc_url, token, nm, "csv",
                                     full=True)
        except Exception as e:
            return str(e)

        total = len(matches)
        if total < 1:
            return 'A Node does not exist with the requested URI.'

        statuses = []
        for count, node in enumerate(matches, 1):
            if verbose:
                print("(%d / %d) %s" % (count, total, node))
            statuses.append(self._rm(token=token, name=node,
                                     verbose=verbose))
        return statuses

    # Plain name: refuse directories, then issue the delete request.
    dir_check = is_vosDir(self.svc_url, token, nm)
    if not isinstance(dir_check, bool):
        # A non-bool result is the failed response object itself.
        return scToString(dir_check.content)
    if dir_check:
        return "%s is a directory." % name

    reply = self.getFromURL(self.svc_url, "/rm?file=%s" % nm,
                            def_token(token))
    if reply.status_code == requests.codes.no_content:
        return 'OK'
    return scToString(reply.content)
# --------------------------------------------------------------------
# RMDIR -- Delete a directory from the Storage Manager service
# --------------------------------------------------------------------
@multimethod('_sc',2,True)
def rmdir(self, token, name, verbose=False):
    '''Two-positional-argument form: token, name.

        Usage:: storeClient.rmdir(token, name)
    '''
    auth = def_token(token)
    return self._rmdir(name=name, token=auth, verbose=verbose)
@multimethod('_sc',1,True)
def rmdir(self, optval, name='', token=None, verbose=False):
    '''One-positional-argument form.

    The argument is read as the token when it looks like an auth token,
    and as the directory name otherwise.

        Usage:: storeClient.rmdir(name)
    '''
    looks_like_token = optval is not None and is_auth_token(optval)
    if looks_like_token:
        return self._rmdir(name=name, token=def_token(optval),
                           verbose=verbose)
    return self._rmdir(name=optval, token=def_token(token),
                       verbose=verbose)
[docs]
@multimethod('_sc',0,True)
def rmdir(self, name='', token=None, verbose=False):
    '''Keyword-only form.

        Usage:: storeClient.rmdir(name='')
    '''
    auth = def_token(token)
    return self._rmdir(name=name, token=auth, verbose=verbose)
def _rmdir(self, token=None, name='', verbose=False):
    '''Implementation of the ``rmdir()`` method.

    Removes the directory ``name`` (prefixed with ``vos://`` when it
    carries no URI scheme; one trailing ``/`` is removed). The roots
    ``vos://``, ``vos://tmp`` and ``vos://public`` are refused.

    Returns ``'OK'`` when the service answers with a "no content"
    status, otherwise an error or reply string.

    Raises ``storeClientError`` when the delete request cannot be made
    or processed.
    '''
    # FIXME - Should handle file templates(?)

    # Patch the names with the URI prefix if needed.
    nm = (name if name.count("://") > 0 else ("vos://" + name))
    if nm == "vos://" or nm == "vos://tmp" or nm == "vos://public":
        return "Error: operation not permitted"
    if nm and nm[-1] == '/':
        nm = nm[:-1]

    # A non-bool result from is_vosDir() is the failed response object.
    is_dir = is_vosDir(self.svc_url, token, nm)
    if not isinstance(is_dir, bool):
        return scToString(is_dir.content)
    elif not is_dir:
        return "%s is not a directory." % name

    try:
        r = self.getFromURL(self.svc_url, "/rmdir?dir=%s" % nm,
                            def_token(token))
        if r.status_code != requests.codes.no_content:
            return scToString(r.content)
        return 'OK'
    except Exception as e:
        # Log the caught error itself: no usable response object is
        # guaranteed to exist when the request failed.
        print('storeClient._rmdir: error: ' + str(e))
        raise storeClientError(str(e))
# --------------------------------------------------------------------
# SAVEAS -- Save the string representation of a data object as a file.
# --------------------------------------------------------------------
@multimethod('_sc',3,True)
def saveAs(self, token, data, name):
    '''Three-positional-argument form: token, data, name.

        Usage:: storeClient.saveAs(token, data, name)
    '''
    auth = def_token(token)
    return self._saveAs(data=data, name=name, token=auth)
[docs]
@multimethod('_sc',2,True)
def saveAs(self, data, name, token=None):
    '''Two-positional-argument form: data, name.

        Usage:: storeClient.saveAs(data, name)
    '''
    auth = def_token(token)
    return self._saveAs(data=data, name=name, token=auth)
def _saveAs(self, token=None, data='', name=''):
    '''Implementation of the ``saveAs()`` method.

    Writes ``str(data)`` to a local temporary file, uploads that file to
    ``name`` (prefixed with ``vos://`` when it carries no URI scheme)
    via ``_put()`` and returns the upload result. The temporary file is
    removed whether or not the write or the upload succeeds.

    Raises ``storeClientError`` when the temporary file cannot be
    created or written.
    '''
    import tempfile

    tmp_path = None
    try:
        try:
            # delete=False so the file survives the 'with' and can be
            # uploaded by name; leaving the 'with' flushes and closes it.
            with tempfile.NamedTemporaryFile(mode='w',delete=False) as tfd:
                tmp_path = tfd.name
                tfd.write(str(data))
        except Exception as e:
            raise storeClientError(str(e))

        # Patch the names with the URI prefix if needed.
        nm = (name if name.count("://") > 0 else ("vos://" + name))

        # Put the temp file to the VOSpace.
        return self._put(token=token, fr=tmp_path, to=nm, verbose=False)
    finally:
        # Clean up, even when the write or the upload raised.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.unlink(tmp_path)
# --------------------------------------------------------------------
# TAG -- Annotate a file/directory in the Storage Manager service
# --------------------------------------------------------------------
@multimethod('_sc',3,True)
def tag(self, token, name, tag):
    '''Annotate a file/directory in the storage manager service
    (token-first calling form).

    Usage:: storeClient.tag(token, name, tag)

    Parameters
    ----------
    token : str
        Authentication token; resolved with ``def_token()`` before use.
    name : str
        Name of the file or directory to annotate.
    tag : str
        Annotation to attach.

    Returns
    -------
    The result of ``_tag()``.
    '''
    auth = def_token(token)
    return self._tag(token=auth, name=name, tag=tag)
@multimethod('_sc',2,True)
def tag(self, name, tag, token=None):
    '''Annotate a file/directory in the storage manager service
    (name-first calling form, token optional).

    Usage:: storeClient.tag(name, tag)

    Parameters
    ----------
    name : str
        Name of the file or directory to annotate.
    tag : str
        Annotation to attach.
    token : str, optional
        Authentication token; resolved with ``def_token()`` before use.

    Returns
    -------
    The result of ``_tag()``.
    '''
    auth = def_token(token)
    return self._tag(token=auth, name=name, tag=tag)
[docs]
@multimethod('_sc',1,True)
def tag(self, token, name='', tag=''):
    '''Annotate a file/directory in the storage manager service
    (token positional, remaining arguments by keyword).

    Usage:: storeClient.tag(token, name='foo', tag='bar')

    Parameters
    ----------
    token : str
        Authentication token; resolved with ``def_token()`` before use.
    name : str
        Name of the file or directory to annotate.
    tag : str
        Annotation to attach.

    Returns
    -------
    The result of ``_tag()``.
    '''
    auth = def_token(token)
    return self._tag(token=auth, name=name, tag=tag)
def _tag(self, token=None, name='', tag=''):
    '''Implementation of the ``tag()`` method.

    Issues a ``/tag`` request to the service for the given name and tag.

    Parameters
    ----------
    token : str
        Authentication token; resolved with ``def_token()`` before use.
    name : str
        Name of the file or directory to annotate.
    tag : str
        Annotation to attach.

    Returns
    -------
    str
        ``'OK'`` when the service answers with an OK status, otherwise
        the body of the service response.

    Raises
    ------
    storeClientError
        If the request itself fails.
    '''
    try:
        r = self.getFromURL(self.svc_url,
                            "/tag?name=%s&tag=%s" % (name, tag),
                            def_token(token))
    except Exception as e:
        # No response object exists when the request raised, so report
        # the exception itself rather than a (non-existent) response body.
        raise storeClientError(str(e))

    if r.status_code == requests.codes.ok:
        return 'OK'
    return scToString(r.content)
[docs]
def getFromURL(self, svc_url, path, token):
    '''Get something from a URL.  Return a 'response' object.

    Parameters
    ----------
    svc_url : str
        Base URL of the service.
    path : str
        Path (and query string) appended directly to ``svc_url``.
    token : str
        Authentication token; resolved with ``def_token()`` and sent in
        the ``X-DL-AuthToken`` header.

    Returns
    -------
    The response object returned by ``requests.get()``.

    Raises
    ------
    storeClientError
        If the token cannot be resolved/split or the request fails.
    '''
    try:
        auth_token = def_token(token)
        # Only the user name is used below; the token must still split
        # into exactly four fields.
        user, _uid, _gid, _hash = split_auth_token(auth_token.strip())
        headers = {'Content-Type': 'text/ascii',
                   'X-DL-ClientVersion': __version__,
                   'X-DL-OriginIP': self.hostip,
                   'X-DL-OriginHost': self.hostname,
                   'X-DL-User': user,
                   'X-DL-AuthToken': auth_token}
        target = "%s%s" % (svc_url, path)
        response = requests.get(target, headers=headers)
    except Exception as err:
        raise storeClientError(str(err))
    else:
        return response
# -------------------------------------------------------
# Utility Methods
# -------------------------------------------------------
[docs]
def is_vosDir(svc_url, token, path):
    '''Determine whether 'path' is a ContainerNode in the VOSpace.

    Parameters
    ----------
    svc_url : str
        Base URL of the storage manager service.
    token : str
        Authentication token; resolved with ``def_token()``.
    path : str
        Path to test.

    Returns
    -------
    bool or response
        True/False according to the service reply when the request
        succeeds; otherwise the response object itself, so the caller can
        inspect the error.
    '''
    query_url = svc_url + ("/isdir?name=%s" % (path))
    auth_hdr = {'X-DL-AuthToken': def_token(token)}
    resp = requests.get(query_url, headers=auth_hdr)

    if resp.status_code == requests.codes.ok:
        return scToString(resp.content).lower() == 'true'
    # Non-OK status: hand the raw response back to the caller.
    return resp
[docs]
def expandFileList(svc_url, token, pattern, format, full=False):
    '''Expand a filename pattern in a VOSpace URI to a list of files.  We
    do this by getting a listing of the parent container contents from
    the service and then match the pattern on the client side.

    Parameters
    ----------
    svc_url : str
        Base URL of the storage manager service.
    token : str
        Authentication token; resolved with ``def_token()`` for the
        listing request.
    pattern : str
        File name or pattern, with or without a ``<scheme>://`` prefix
        (``vos://`` is assumed when absent).
    format : str
        Not used; the parent listing is always requested as ``csv``.
    full : bool
        If True, each match is returned prefixed with the URI and the
        parent directory; otherwise only the matching names are returned.

    Returns
    -------
    list
        ``[pattern]`` when the pattern has no metacharacters and no
        trailing '/'; otherwise the sorted list of matching entries.

    Raises
    ------
    Exception
        If the parent of the pattern is not a container.
    '''
    debug = False

    # Check first that we're only getting a single file.
    if not hasmeta(pattern) and not pattern.endswith('/'):
        return [pattern]

    # The URI prefix is constant whether it's included in the pattern string
    # or not.  The storage manager controls a specific instance of VOSpace,
    # so at the moment the expansion to the VOSpace URI is handled on the
    # server.  We'll prepend this to the service call as needed to ensure a
    # correct argument and give the calling routine the option of leaving
    # it off.
    if '://' in pattern:
        split_at = pattern.index('://') + 3
        uri, path = pattern[:split_at], pattern[split_at:]
    else:
        uri, path = 'vos://', pattern

    # Extract the directory and filename/pattern from the string.
    # os.path.split() always returns two strings (possibly empty).
    parent, name = os.path.split(path)
    if debug:
        print("-----------------------------------------")
        print("INPUT PATTERN = '" + path + "'")
        print("split: '%s' '%s'" % (parent, name))

    # Normalize the parent directory.
    if parent == "/":
        # NOTE(review): for a root-level pattern such as '/*.fits' this
        # folds the pattern itself into the parent path -- confirm intent.
        parent = parent + name
    else:
        if parent.endswith("/"):
            parent = parent[:-1]                # trim trailing '/'
        if not parent.startswith("/"):
            parent = "/" + parent               # prepend '/'
    if parent == "/":
        parent = ""

    # Pattern matched against each entry: the name itself, or '*' when the
    # name is empty (i.e. the input ended with '/').
    if hasmeta(name):
        pstr = name
    else:
        pstr = (name if name != '' else "*")

    # Check to make sure the parent exists and is a container
    if debug:
        print('stat of dir : ' + (uri + parent))
    # NOTE(review): stat() is called without the caller's token here, unlike
    # the listing request below -- confirm that is intended.
    pstat = stat(uri + parent)
    if pstat.get('type') == 'link':
        # Follow the link: continue with the target's path (URI stripped).
        parent = pstat['target']
        parent = parent[parent.index('://')+3:]
        pstat = stat(parent)
    if pstat.get('type') != 'container':
        raise Exception('A Container does not exist with the requested URI')

    # Make the service call to get a listing of the parent directory.
    url = svc_url + "/ls?name=%s%s&format=%s" % (uri, parent, "csv")
    r = requests.get(url, headers={'X-DL-AuthToken': def_token(token)})

    # Filter the directory contents list using the filename pattern.
    matches = []
    for entry in scToString(r.content).split(','):
        if entry and (fnmatch.fnmatch(entry, pstr) or entry == pstr):
            furi = ((uri + parent + "/" + entry) if full else entry)
            matches.append(furi.replace("///", "//"))

    if debug:
        print(url)
        print("%s --> '%s' '%s' '%s' => '%s'" %
              (pattern, uri, parent, name, pstr))

    return sorted(matches)
# ###################################
# Utility Methods
# ###################################
[docs]
def chunked_upload(token, local_file, remote_file):
    '''A streaming file uploader.

    The local file is read in fixed-size chunks and each chunk is POSTed
    to the ``/xfer`` endpoint of the default client's service URL.  The
    first chunk is flagged with ``X-DL-InitXfer: True``, later ones with
    ``False``.

    Parameters
    ----------
    token : str
        Authentication token, sent as-is in the ``X-DL-AuthToken`` header.
    local_file : str
        Path of the local file to upload.
    remote_file : str
        Destination name, sent in the ``X-DL-FileName`` header.

    Raises
    ------
    storeClientError
        If reading or posting a chunk fails.
    '''
    debug = False
    CHUNK_SIZE = 4 * 1024 * 1024                # 4MB chunks
    url = '%s/xfer' % (sc_client.svc_url)

    # Get the size of the file to be transferred.
    fsize = os.stat(local_file).st_size
    if debug:
        # Ceiling division: exact multiples need no extra chunk.
        nchunks = (fsize + CHUNK_SIZE - 1) // CHUNK_SIZE
        print('Upload in %d chunks' % nchunks)

    init = True
    with open(local_file, 'rb') as f:
        try:
            nsent = 0
            while nsent < fsize:
                data = f.read(CHUNK_SIZE)
                if not data:
                    # The file is shorter than when it was stat'ed; stop
                    # here instead of looping forever on empty reads.
                    break
                requests.post(url, data,
                    headers={'Content-type': 'application/octet-stream',
                             'X-DL-FileName': remote_file,
                             'X-DL-InitXfer': str(init),
                             'X-DL-AuthToken': token})
                nsent += len(data)
                init = False
        except Exception as e:
            raise storeClientError('Upload error: ' + str(e))
# ###################################
# Store Client Handles
# ###################################
[docs]
def getClient(profile=DEF_SERVICE_PROFILE, svc_url=DEF_SERVICE_URL):
    '''Create a new storeClient object and set a default profile.

    Parameters
    ----------
    profile : str
        Profile name handed to the ``storeClient`` constructor.
    svc_url : str
        Service URL handed to the ``storeClient`` constructor.

    Returns
    -------
    storeClient
        The newly created client object.
    '''
    client = storeClient(profile=profile, svc_url=svc_url)
    return client
# The default client handle for the module.
sc_client = getClient(profile=DEF_SERVICE_PROFILE, svc_url=DEF_SERVICE_URL)

# ##########################################
#  Patch the docstrings for module functions
#  that aren't MultiMethods.
# ##########################################
# Each listed module function takes the docstring of the client method
# with the same name.
for _fn_name in ('isAlive', 'services', 'set_svc_url', 'get_svc_url',
                 'set_profile', 'get_profile'):
    globals()[_fn_name].__doc__ = getattr(sc_client, _fn_name).__doc__
del _fn_name        # don't leave the loop variable in the module namespace
# ####################################################################
# Py2/Py3 Compatibility Utilities
# ####################################################################
[docs]
def scToString(s):
    '''scToString -- Force a return value to be type 'string' for all
                     Python versions.  If there is an error, return the
                     original.

    Parameters
    ----------
    s : object
        Value to convert, typically the ``bytes`` content of a response.

    Returns
    -------
    The native ``str`` form of ``s`` when ``s`` is a byte or text string.
    Any other value, and any value whose conversion fails, is returned
    unchanged.
    '''
    try:
        if str is bytes:
            # Python 2: 'str' is the byte-string type and 'unicode' exists.
            if isinstance(s, (bytes, unicode)):     # noqa: F821
                return str(s)
        elif isinstance(s, bytes):
            # Python 3: decode bytes to text.
            return str(s.decode())
    except Exception:
        # Conversion failed (e.g. undecodable bytes): return the original.
        return s

    # Already a native string, or not a string at all: return unchanged.
    return s