Refactor the storage base API to use requests' support for streaming (file-like) request bodies and remove the callback-based upload system
Project: http://git-wip-us.apache.org/repos/asf/libcloud/repo Commit: http://git-wip-us.apache.org/repos/asf/libcloud/commit/5e04dbce Tree: http://git-wip-us.apache.org/repos/asf/libcloud/tree/5e04dbce Diff: http://git-wip-us.apache.org/repos/asf/libcloud/diff/5e04dbce Branch: refs/heads/trunk Commit: 5e04dbce554830eca3f9812272076a2fbdbe7cdc Parents: 6287bf1 Author: Anthony Shaw <anthonys...@apache.org> Authored: Fri Jan 6 14:43:41 2017 +1100 Committer: Anthony Shaw <anthonys...@apache.org> Committed: Fri Jan 6 14:43:41 2017 +1100 ---------------------------------------------------------------------- libcloud/common/base.py | 10 +- libcloud/storage/base.py | 238 +++++---------------------- libcloud/storage/drivers/atmos.py | 4 - libcloud/storage/drivers/azure_blobs.py | 10 +- libcloud/storage/drivers/cloudfiles.py | 27 +-- libcloud/storage/drivers/oss.py | 38 +---- libcloud/storage/drivers/s3.py | 36 +--- libcloud/test/__init__.py | 1 + 8 files changed, 65 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/libcloud/blob/5e04dbce/libcloud/common/base.py ---------------------------------------------------------------------- diff --git a/libcloud/common/base.py b/libcloud/common/base.py index 9778ca9..d893cf0 100644 --- a/libcloud/common/base.py +++ b/libcloud/common/base.py @@ -151,6 +151,7 @@ class Response(object): self.headers = lowercase_keys(dict(response.headers)) self.error = response.reason self.status = response.status_code + self.request = response.request self.body = response.text.strip() \ if response.text is not None else '' @@ -576,15 +577,6 @@ class Connection(object): if data: data = self.encode_data(data) - headers['Content-Length'] = str(len(data)) - elif method.upper() in ['POST', 'PUT'] and not raw: - # Only send Content-Length 0 with POST and PUT request. 
- # - # Note: Content-Length is not added when using "raw" mode means - # means that headers are upfront and the body is sent at some point - # later on. With raw mode user can specify Content-Length with - # "data" not being set. - headers['Content-Length'] = '0' params, headers = self.pre_connect_hook(params, headers) http://git-wip-us.apache.org/repos/asf/libcloud/blob/5e04dbce/libcloud/storage/base.py ---------------------------------------------------------------------- diff --git a/libcloud/storage/base.py b/libcloud/storage/base.py index 4bfe209..c290ffd 100644 --- a/libcloud/storage/base.py +++ b/libcloud/storage/base.py @@ -589,9 +589,11 @@ class StorageDriver(BaseDriver): return True - def _upload_object(self, object_name, content_type, upload_func, - upload_func_kwargs, request_path, request_method='PUT', - headers=None, file_path=None, iterator=None): + def _upload_object(self, object_name, content_type, request_path, + request_method='PUT', + headers=None, file_path=None, stream=None, + upload_func=None, upload_func_kwargs=None, + chunked=False, multipart=False): """ Helper function for setting common request headers and calling the passed in callback which uploads an object. @@ -601,8 +603,8 @@ class StorageDriver(BaseDriver): if file_path and not os.path.exists(file_path): raise OSError('File %s does not exist' % (file_path)) - if iterator is not None and not hasattr(iterator, 'next') and not \ - hasattr(iterator, '__next__'): + if stream is not None and not hasattr(stream, 'next') and not \ + hasattr(stream, '__next__'): raise AttributeError('iterator object must implement next() ' + 'method.') @@ -622,200 +624,46 @@ class StorageDriver(BaseDriver): # Fallback to a content-type content_type = DEFAULT_CONTENT_TYPE - file_size = None - - if iterator: - if self.supports_chunked_encoding: - headers['Transfer-Encoding'] = 'chunked' - upload_func_kwargs['chunked'] = True - else: - # Chunked transfer encoding is not supported. 
Need to buffer - # all the data in memory so we can determine file size. - iterator = libcloud.utils.files.read_in_chunks( - iterator=iterator) - data = libcloud.utils.files.exhaust_iterator(iterator=iterator) - - file_size = len(data) - upload_func_kwargs['data'] = data - else: - file_size = os.path.getsize(file_path) - upload_func_kwargs['chunked'] = False - - if file_size is not None and 'Content-Length' not in headers: - headers['Content-Length'] = str(file_size) - headers['Content-Type'] = content_type - response = self.connection.request(request_path, - method=request_method, data=None, - headers=headers, raw=True) - - upload_func_kwargs['response'] = response - success, data_hash, bytes_transferred = upload_func( - **upload_func_kwargs) - - if not success: + if stream: + response = self.connection.request( + request_path, + method=request_method, data=stream, + headers=headers, raw=False) + stream_hash, stream_length = self._hash_buffered_stream( + stream, + self._get_hash_function()) + else: + with open(file_path, 'rb') as file_stream: + response = self.connection.request( + request_path, + method=request_method, data=file_stream, + headers=headers, raw=False) + with open(file_path, 'rb') as file_stream: + stream_hash, stream_length = self._hash_buffered_stream( + file_stream, + self._get_hash_function()) + + if not response.success(): raise LibcloudError( value='Object upload failed, Perhaps a timeout?', driver=self) - result_dict = {'response': response, 'data_hash': data_hash, - 'bytes_transferred': bytes_transferred} - return result_dict - - def _upload_data(self, response, data, calculate_hash=True): - """ - Upload data stored in a string. - - :param response: RawResponse object. - :type response: :class:`RawResponse` - - :param data: Data to upload. - :type data: ``str`` - - :param calculate_hash: True to calculate hash of the transferred data. - (defaults to True). 
- :type calculate_hash: ``bool`` - - :return: First item is a boolean indicator of success, second - one is the uploaded data MD5 hash and the third one - is the number of transferred bytes. - :rtype: ``tuple`` - """ - bytes_transferred = 0 - data_hash = None - - if calculate_hash: - data_hash = self._get_hash_function() - data_hash.update(b(data)) - - try: - response.connection.connection.send(b(data)) - except Exception: - # TODO: let this exception propagate - # Timeout, etc. - return False, None, bytes_transferred - - bytes_transferred = len(data) - - if calculate_hash: - data_hash = data_hash.hexdigest() - - return True, data_hash, bytes_transferred - - def _stream_data(self, response, iterator, chunked=False, - calculate_hash=True, chunk_size=None, data=None): - """ - Stream a data over an http connection. - - :param response: RawResponse object. - :type response: :class:`RawResponse` - - :param response: An object which implements an iterator interface - or a File like object with read method. - :type iterator: :class:`object` - - :param chunked: True if the chunked transfer encoding should be used - (defaults to False). - :type chunked: ``bool`` - - :param calculate_hash: True to calculate hash of the transferred data. - (defaults to True). - :type calculate_hash: ``bool`` - - :param chunk_size: Optional chunk size (defaults to ``CHUNK_SIZE``) - :type chunk_size: ``int`` - - :rtype: ``tuple`` - :return: First item is a boolean indicator of success, second - one is the uploaded data MD5 hash and the third one - is the number of transferred bytes. 
- """ - - chunk_size = chunk_size or CHUNK_SIZE - - data_hash = None - if calculate_hash: - data_hash = self._get_hash_function() - - generator = libcloud.utils.files.read_in_chunks(iterator, chunk_size, - fill_size=True) - - bytes_transferred = 0 - try: - chunk = next(generator) - except StopIteration: - # Special case when StopIteration is thrown on the first iteration - # create a 0-byte long object - chunk = '' - if chunked: - response.connection.connection.send(b('%X\r\n' % - (len(chunk)))) - response.connection.connection.send(chunk) - response.connection.connection.send(b('\r\n')) - response.connection.connection.send(b('0\r\n\r\n')) - else: - response.connection.connection.send(chunk) - return True, data_hash.hexdigest(), bytes_transferred - - while len(chunk) > 0: - try: - if chunked: - response.connection.connection.send(b('%X\r\n' % - (len(chunk)))) - response.connection.connection.send(b(chunk)) - response.connection.connection.send(b('\r\n')) - else: - response.connection.connection.send(b(chunk)) - except Exception: - # TODO: let this exception propagate - # Timeout, etc. - return False, None, bytes_transferred - - bytes_transferred += len(chunk) - if calculate_hash: - data_hash.update(b(chunk)) - - try: - chunk = next(generator) - except StopIteration: - chunk = '' - - if chunked: - response.connection.connection.send(b('0\r\n\r\n')) - - if calculate_hash: - data_hash = data_hash.hexdigest() - - return True, data_hash, bytes_transferred - - def _upload_file(self, response, file_path, chunked=False, - calculate_hash=True): - """ - Upload a file to the server. - - :type response: :class:`RawResponse` - :param response: RawResponse object. - - :type file_path: ``str`` - :param file_path: Path to a local file. - - :type iterator: :class:`object` - :param response: An object which implements an iterator interface (File - object, etc.) 
- - :rtype: ``tuple`` - :return: First item is a boolean indicator of success, second - one is the uploaded data MD5 hash and the third one - is the number of transferred bytes. - """ - with open(file_path, 'rb') as file_handle: - success, data_hash, bytes_transferred = ( - self._stream_data( - response=response, - iterator=iter(file_handle), - chunked=chunked, - calculate_hash=calculate_hash)) - - return success, data_hash, bytes_transferred + if upload_func: + upload_func(**upload_func_kwargs) + + return {'response': response, + 'bytes_transferred': stream_length, + 'data_hash': stream_hash} + + def _hash_buffered_stream(self, stream, hasher, blocksize=65536): + with stream: + buf = stream.read(blocksize) + total_len = 0 + while len(buf) > 0: + total_len = total_len + len(buf) + hasher.update(buf) + buf = stream.read(blocksize) + return (hasher.hexdigest(), total_len) def _get_hash_function(self): """ http://git-wip-us.apache.org/repos/asf/libcloud/blob/5e04dbce/libcloud/storage/drivers/atmos.py ---------------------------------------------------------------------- diff --git a/libcloud/storage/drivers/atmos.py b/libcloud/storage/drivers/atmos.py index c52be03..f524c1a 100644 --- a/libcloud/storage/drivers/atmos.py +++ b/libcloud/storage/drivers/atmos.py @@ -211,8 +211,6 @@ class AtmosDriver(StorageDriver): def upload_object(self, file_path, container, object_name, extra=None, verify_hash=True): - upload_func = self._upload_file - upload_func_kwargs = {'file_path': file_path} method = 'PUT' extra = extra or {} @@ -232,8 +230,6 @@ class AtmosDriver(StorageDriver): result_dict = self._upload_object( object_name=object_name, content_type=content_type, - upload_func=upload_func, - upload_func_kwargs=upload_func_kwargs, request_path=request_path, request_method=method, headers={}, file_path=file_path) http://git-wip-us.apache.org/repos/asf/libcloud/blob/5e04dbce/libcloud/storage/drivers/azure_blobs.py 
---------------------------------------------------------------------- diff --git a/libcloud/storage/drivers/azure_blobs.py b/libcloud/storage/drivers/azure_blobs.py index 4e10df6..793746b 100644 --- a/libcloud/storage/drivers/azure_blobs.py +++ b/libcloud/storage/drivers/azure_blobs.py @@ -781,11 +781,6 @@ class AzureBlobsStorageDriver(StorageDriver): 'object_path': object_path, 'blob_type': ex_blob_type, 'lease': None} - else: - upload_func = self._stream_data - upload_func_kwargs = {'iterator': iterator, - 'chunked': False, - 'calculate_hash': verify_hash} return self._put_object(container=container, object_name=object_name, @@ -927,12 +922,9 @@ class AzureBlobsStorageDriver(StorageDriver): lease.update_headers(headers) - iterator = iter('') result_dict = self._upload_object(object_name, content_type, - upload_func, upload_func_kwargs, object_path, headers=headers, - file_path=file_path, - iterator=iterator) + file_path=file_path) response = result_dict['response'] bytes_transferred = result_dict['bytes_transferred'] http://git-wip-us.apache.org/repos/asf/libcloud/blob/5e04dbce/libcloud/storage/drivers/cloudfiles.py ---------------------------------------------------------------------- diff --git a/libcloud/storage/drivers/cloudfiles.py b/libcloud/storage/drivers/cloudfiles.py index 2502d42..22ed856 100644 --- a/libcloud/storage/drivers/cloudfiles.py +++ b/libcloud/storage/drivers/cloudfiles.py @@ -424,12 +424,8 @@ class CloudFilesStorageDriver(StorageDriver, OpenStackDriverMixin): Note: This will override file with a same name if it already exists. 
""" - upload_func = self._upload_file - upload_func_kwargs = {'file_path': file_path} return self._put_object(container=container, object_name=object_name, - upload_func=upload_func, - upload_func_kwargs=upload_func_kwargs, extra=extra, file_path=file_path, verify_hash=verify_hash, headers=headers) @@ -439,13 +435,8 @@ class CloudFilesStorageDriver(StorageDriver, OpenStackDriverMixin): if isinstance(iterator, file): iterator = iter(iterator) - upload_func = self._stream_data - upload_func_kwargs = {'iterator': iterator} - return self._put_object(container=container, object_name=object_name, - upload_func=upload_func, - upload_func_kwargs=upload_func_kwargs, - extra=extra, iterator=iterator, + extra=extra, stream=iterator, headers=headers) def delete_object(self, obj): @@ -646,16 +637,12 @@ class CloudFilesStorageDriver(StorageDriver, OpenStackDriverMixin): def _upload_object_part(self, container, object_name, part_number, iterator, verify_hash=True): - upload_func = self._stream_data - upload_func_kwargs = {'iterator': iterator} part_name = object_name + '/%08d' % part_number extra = {'content_type': 'application/octet-stream'} self._put_object(container=container, object_name=part_name, - upload_func=upload_func, - upload_func_kwargs=upload_func_kwargs, - extra=extra, iterator=iterator, + extra=extra, stream=iterator, verify_hash=verify_hash) def _upload_object_manifest(self, container, object_name, extra=None, @@ -756,9 +743,8 @@ class CloudFilesStorageDriver(StorageDriver, OpenStackDriverMixin): raise LibcloudError('Unexpected status code: %s' % (response.status)) - def _put_object(self, container, object_name, upload_func, - upload_func_kwargs, extra=None, file_path=None, - iterator=None, verify_hash=True, headers=None): + def _put_object(self, container, object_name, extra=None, file_path=None, + stream=None, verify_hash=True, headers=None): extra = extra or {} container_name_encoded = self._encode_container_name(container.name) object_name_encoded = 
self._encode_object_name(object_name) @@ -778,11 +764,10 @@ class CloudFilesStorageDriver(StorageDriver, OpenStackDriverMixin): request_path = '/%s/%s' % (container_name_encoded, object_name_encoded) result_dict = self._upload_object( object_name=object_name, content_type=content_type, - upload_func=upload_func, upload_func_kwargs=upload_func_kwargs, request_path=request_path, request_method='PUT', - headers=headers, file_path=file_path, iterator=iterator) + headers=headers, file_path=file_path, stream=stream) - response = result_dict['response'].response + response = result_dict['response'] bytes_transferred = result_dict['bytes_transferred'] server_hash = result_dict['response'].headers.get('etag', None) http://git-wip-us.apache.org/repos/asf/libcloud/blob/5e04dbce/libcloud/storage/drivers/oss.py ---------------------------------------------------------------------- diff --git a/libcloud/storage/drivers/oss.py b/libcloud/storage/drivers/oss.py index 33b155a..2ebba6e 100644 --- a/libcloud/storage/drivers/oss.py +++ b/libcloud/storage/drivers/oss.py @@ -459,12 +459,7 @@ class OSSStorageDriver(StorageDriver): def upload_object(self, file_path, container, object_name, extra=None, verify_hash=True, headers=None): - upload_func = self._upload_file - upload_func_kwargs = {'file_path': file_path} - return self._put_object(container=container, object_name=object_name, - upload_func=upload_func, - upload_func_kwargs=upload_func_kwargs, extra=extra, file_path=file_path, verify_hash=verify_hash) @@ -474,29 +469,12 @@ class OSSStorageDriver(StorageDriver): params = None if self.supports_multipart_upload: - # Initiate the multipart request and get an upload id - upload_func = self._upload_multipart - upload_func_kwargs = {'iterator': iterator, - 'container': container, - 'object_name': object_name} method = 'POST' - iterator = iter('') params = 'uploads' - elif self.supports_chunked_encoding: - upload_func = self._stream_data - upload_func_kwargs = {'iterator': iterator} - 
else: - # In this case, we have to load the entire object to - # memory and send it as normal data - upload_func = self._upload_data - upload_func_kwargs = {} - return self._put_object(container=container, object_name=object_name, - upload_func=upload_func, - upload_func_kwargs=upload_func_kwargs, extra=extra, method=method, query_args=params, - iterator=iterator, verify_hash=False) + stream=iterator, verify_hash=False) def delete_object(self, obj): object_path = self._get_object_path(obj.container, obj.name) @@ -611,10 +589,9 @@ class OSSStorageDriver(StorageDriver): name = urlquote(name) return name - def _put_object(self, container, object_name, upload_func, - upload_func_kwargs, method='PUT', query_args=None, - extra=None, file_path=None, iterator=None, - verify_hash=False): + def _put_object(self, container, object_name, method='PUT', + query_args=None, extra=None, file_path=None, + stream=None, verify_hash=False): """ Create an object and upload data using the given function. """ @@ -646,15 +623,14 @@ class OSSStorageDriver(StorageDriver): # user does not have correct permission result_dict = self._upload_object( object_name=object_name, content_type=content_type, - upload_func=upload_func, upload_func_kwargs=upload_func_kwargs, request_path=request_path, request_method=method, - headers=headers, file_path=file_path, iterator=iterator, + headers=headers, file_path=file_path, stream=stream, container=container) response = result_dict['response'] bytes_transferred = result_dict['bytes_transferred'] headers = response.headers - response = response.response + response = response server_hash = headers['etag'].replace('"', '') if (verify_hash and result_dict['data_hash'].upper() != server_hash): @@ -705,7 +681,7 @@ class OSSStorageDriver(StorageDriver): object_path = self._get_object_path(container, object_name) # Get the upload id from the response xml - response.body = response.response.read() + response.body = response.read() body = response.parse_body() 
upload_id = body.find(fixxpath(xpath='UploadId', namespace=self.namespace)).text http://git-wip-us.apache.org/repos/asf/libcloud/blob/5e04dbce/libcloud/storage/drivers/s3.py ---------------------------------------------------------------------- diff --git a/libcloud/storage/drivers/s3.py b/libcloud/storage/drivers/s3.py index 10da21a..34fc455 100644 --- a/libcloud/storage/drivers/s3.py +++ b/libcloud/storage/drivers/s3.py @@ -430,12 +430,7 @@ class BaseS3StorageDriver(StorageDriver): :param ex_storage_class: Storage class :type ex_storage_class: ``str`` """ - upload_func = self._upload_file - upload_func_kwargs = {'file_path': file_path} - return self._put_object(container=container, object_name=object_name, - upload_func=upload_func, - upload_func_kwargs=upload_func_kwargs, extra=extra, file_path=file_path, verify_hash=verify_hash, storage_class=ex_storage_class) @@ -642,29 +637,12 @@ class BaseS3StorageDriver(StorageDriver): # Amazon provides a different (complex?) mechanism to do multipart # uploads if self.supports_s3_multipart_upload: - # Initiate the multipart request and get an upload id - upload_func = self._upload_multipart - upload_func_kwargs = {'iterator': iterator, - 'container': container, - 'object_name': object_name} method = 'POST' - iterator = iter('') params = 'uploads' - elif self.supports_chunked_encoding: - upload_func = self._stream_data - upload_func_kwargs = {'iterator': iterator} - else: - # In this case, we have to load the entire object to - # memory and send it as normal data - upload_func = self._upload_data - upload_func_kwargs = {} - return self._put_object(container=container, object_name=object_name, - upload_func=upload_func, - upload_func_kwargs=upload_func_kwargs, extra=extra, method=method, query_args=params, - iterator=iterator, verify_hash=False, + stream=iterator, verify_hash=False, storage_class=ex_storage_class) def delete_object(self, obj): @@ -783,10 +761,9 @@ class BaseS3StorageDriver(StorageDriver): name = 
urlquote(name) return name - def _put_object(self, container, object_name, upload_func, - upload_func_kwargs, method='PUT', query_args=None, - extra=None, file_path=None, iterator=None, - verify_hash=True, storage_class=None): + def _put_object(self, container, object_name, method='PUT', + query_args=None, extra=None, file_path=None, + stream=None, verify_hash=True, storage_class=None): headers = {} extra = extra or {} storage_class = storage_class or 'standard' @@ -820,14 +797,13 @@ class BaseS3StorageDriver(StorageDriver): # user does not have correct permission result_dict = self._upload_object( object_name=object_name, content_type=content_type, - upload_func=upload_func, upload_func_kwargs=upload_func_kwargs, request_path=request_path, request_method=method, - headers=headers, file_path=file_path, iterator=iterator) + headers=headers, file_path=file_path, stream=stream) response = result_dict['response'] bytes_transferred = result_dict['bytes_transferred'] headers = response.headers - response = response.response + response = response server_hash = headers.get('etag', '').replace('"', '') if (verify_hash and result_dict['data_hash'] != server_hash): http://git-wip-us.apache.org/repos/asf/libcloud/blob/5e04dbce/libcloud/test/__init__.py ---------------------------------------------------------------------- diff --git a/libcloud/test/__init__.py b/libcloud/test/__init__.py index 701530a..2024f30 100644 --- a/libcloud/test/__init__.py +++ b/libcloud/test/__init__.py @@ -81,6 +81,7 @@ class MockResponse(object): status = 0 reason = '' version = 11 + request = None def __init__(self, status, body=None, headers=None, reason=None): self.status = status