Source code for pfio.v2.s3

import base64
import hashlib
import io
import os
import urllib.parse
from types import TracebackType
from typing import Optional, Type

import boto3
from botocore.exceptions import ClientError

from .fs import FS, FileStat, format_repr

DEFAULT_MAX_BUFFER_SIZE = 16 * 1024 * 1024


def _normalize_key(key: str) -> str:
    key = os.path.normpath(key)
    if key.startswith("/"):
        return key[1:]
    else:
        return key
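
# A minimal sketch of the intended behavior (illustrative only):
#
#   _normalize_key('/dir/./file.txt')   -> 'dir/file.txt'
#   _normalize_key('dir/sub/../a.bin')  -> 'dir/a.bin'
#
# os.path.normpath() collapses '.' and '..' components, and the leading
# '/' is stripped because S3 object keys do not start with a slash.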


class S3ObjectStat(FileStat):
    def __init__(self, key, head):
        self.filename = key
        self.last_modified = head['LastModified'].timestamp()
        self.size = head.get('ContentLength', head.get('Size'))
        self.metadata = head.get('Metadata', {})
        self._head = head

    def isdir(self):
        return False


class S3PrefixStat(FileStat):
    def __init__(self, key):
        self.filename = key
        self.last_modified = 0
        self.size = -1

    def isdir(self):
        return True


class _ObjectReader(io.RawIOBase):
    def __init__(self, client, bucket, key, mode, kwargs):
        super(_ObjectReader, self).__init__()

        self.client = client
        self.bucket = bucket
        self.key = key

        res = self.client.head_object(Bucket=bucket, Key=key)
        if res.get('DeleteMarker'):
            raise FileNotFoundError()

        self._mode = mode
        self.pos = 0
        self.content_length = res['ContentLength']
        self._closed = False

    def read(self, size=-1) -> bytes:
        # Always returns bytes; this object is wrapped with
        # TextIOWrapper when opened in text mode.

        s = self.pos

        if self.pos >= self.content_length:
            return b''
        elif size <= 0:
            # An empty end produces an open-ended range ("bytes=<pos>-"),
            # which reads to the end of the object.
            e = ''
        else:
            # HTTP ranges are inclusive; body.read(size) below caps the
            # result at ``size`` bytes anyway.
            e = min(self.pos + size, self.content_length)

        r = 'bytes={}-{}'.format(s, e)
        res = self.client.get_object(Bucket=self.bucket,
                                     Key=self.key,
                                     Range=r)
        body = res['Body']

        if size < 0:
            data = body.read()
        else:
            data = body.read(size)

        self.pos += len(data)

        return data

    def readline(self):
        raise NotImplementedError()

    def close(self):
        self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type: Optional[Type[BaseException]],
                 exc_value: Optional[BaseException],
                 traceback: Optional[TracebackType]):
        self.close()

    def flush(self):
        pass

    @property
    def closed(self):
        return self._closed

    def isatty(self):
        return False

    def readable(self):
        return True

    def seekable(self):
        return True

    def tell(self):
        return self.pos

    def truncate(self, size=None):
        raise io.UnsupportedOperation('truncate')

    def seek(self, pos, whence=io.SEEK_SET):
        if whence in [0, io.SEEK_SET]:
            if pos < 0:
                raise OSError(22, "[Errno 22] Invalid argument")
        elif whence in [1, io.SEEK_CUR]:
            pos += self.pos
        elif whence in [2, io.SEEK_END]:
            pos += self.content_length
        else:
            raise ValueError('Wrong whence value: {}'.format(whence))

        if pos < 0:
            raise OSError(22, "[Errno 22] Invalid argument")
        self.pos = pos
        return self.pos

    def writable(self):
        return False

    def write(self, data):
        raise io.UnsupportedOperation('not writable')

    def readall(self):
        return self.read(-1)

    def readinto(self, b):
        buf = self.read(len(b))
        b[:len(buf)] = buf
        return len(buf)


class _ObjectWriter:
    def __init__(self, client, bucket, key, mode, mpu_chunksize, kwargs):
        self.client = client
        self.bucket = bucket
        self.key = key
        self.mode = mode
        self._init_buf()
        self.mpu_chunksize = mpu_chunksize
        self.mpu_id = None
        self.parts = []

    def _init_buf(self):
        if 'b' in self.mode:
            self.buf = io.BytesIO()
        else:
            self.buf = io.StringIO()

    def flush(self):
        # S3 requires every part except the last to be at least 5 MiB;
        # keep buffering until the data reaches 8 MiB before sending a part.
        if len(self.buf.getvalue()) < 8 * 1024 * 1024:
            return
        self._flush()

    def _flush(self):
        # Send buffer as a part
        c = self.client
        b = self.bucket
        k = self.key

        if self.mpu_id is None:
            res = c.create_multipart_upload(Bucket=b, Key=k)
            self.mpu_id = res['UploadId']
            # boto3.set_stream_logger()  # debug logging; keep disabled

        assert self.mpu_id is not None

        data = self.buf.getvalue()
        if 'b' in self.mode:
            md5 = base64.b64encode(
                hashlib.md5(data).digest()
            ).decode()
        else:
            md5 = base64.b64encode(
                hashlib.md5(data.encode()).digest()
            ).decode()
        num = len(self.parts) + 1

        res = c.upload_part(Body=data, Bucket=b, Key=k,
                            PartNumber=num,
                            UploadId=self.mpu_id,
                            ContentLength=len(data),
                            ContentMD5=md5)
        self.parts.append({'ETag': res['ETag'], 'PartNumber': num})

        self._init_buf()

    def write(self, buf):
        written = 0
        overflow = len(self.buf.getvalue()) + len(buf) - self.mpu_chunksize
        if overflow > 0:
            # Fill the buffer up to the chunk size, send it as a part,
            # then keep the remainder for the next part.
            head = len(buf) - overflow
            written += self.buf.write(buf[:head])
            self.flush()
            buf = buf[head:]

        written += self.buf.write(buf)
        if len(self.buf.getvalue()) >= self.mpu_chunksize:
            self.flush()

        return written

    def close(self):
        # See:  https://boto3.amazonaws.com/v1/documentation/
        # api/latest/reference/services/s3.html#S3.Client.put_object
        if len(self.parts) == 0:
            self.client.put_object(Body=self.buf.getvalue(),
                                   Bucket=self.bucket,
                                   Key=self.key)
        else:
            self._flush()
            # Complete the multipart upload (MPU)
            c = self.client
            max_parts = len(self.parts) + 1
            res = c.list_parts(Bucket=self.bucket,
                               Key=self.key,
                               UploadId=self.mpu_id, MaxParts=max_parts)

            if res['IsTruncated']:
                next_part = res['NextPartNumberMarker']
                raise RuntimeError('Unexpectedly truncated: ' +
                                   'next={}/maxparts={}'.format(next_part,
                                                                max_parts))

            parts = [{'ETag': part['ETag'], 'PartNumber': part['PartNumber']}
                     for part in res.get('Parts', [])]
            parts = sorted(parts, key=lambda x: int(x['PartNumber']))
            assert self.parts == parts

            res = c.complete_multipart_upload(Bucket=self.bucket,
                                              Key=self.key,
                                              UploadId=self.mpu_id,
                                              MultipartUpload={'Parts': parts})
            # logger.info("Upload done.", res['Location'])

        self.buf = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type: Optional[Type[BaseException]],
                 exc_value: Optional[BaseException],
                 traceback: Optional[TracebackType]):
        self.close()

    @property
    def closed(self):
        return self.buf is None

    def isatty(self):
        return False

    def readable(self):
        return False

    def seekable(self):
        return False

    def writable(self):
        return True
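
    # Usage sketch for _ObjectWriter (illustrative, not part of the module):
    # writes are buffered locally; once the buffer reaches ``mpu_chunksize``
    # bytes it is sent as a multipart-upload part, and close() either issues
    # a single put_object (small objects) or completes the multipart upload.
    # The bucket and key names below are assumptions.
    #
    #   client = boto3.client('s3')
    #   with _ObjectWriter(client, 'example-bucket', 'out/large.bin', 'wb',
    #                      32 * 1024 * 1024, {}) as writer:
    #       for block in blocks:          # any iterable of bytes
    #           writer.write(block)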


class S3(FS):
    '''S3 FileSystem wrapper

    Takes three arguments as well as environment variables for the
    constructor. The priority is (1) the arguments, (2) the environment
    variables, and (3) boto3's defaults. Available arguments are:

    - ``aws_access_key_id``, ``AWS_ACCESS_KEY_ID``
    - ``aws_secret_access_key``, ``AWS_SECRET_ACCESS_KEY``
    - ``endpoint``, ``S3_ENDPOINT``

    It supports buffering when opening a file in binary read mode ("rb").
    When ``buffering`` is set to -1 (default), the buffer size will be
    the size of the file or ``pfio.v2.S3.DEFAULT_MAX_BUFFER_SIZE``,
    whichever is smaller. ``buffering=0`` disables buffering, and
    ``buffering>0`` forcibly sets the specified value as the buffer size
    in bytes.
    '''

    def __init__(self, bucket, prefix=None, endpoint=None,
                 create_bucket=False,
                 aws_access_key_id=None,
                 aws_secret_access_key=None,
                 mpu_chunksize=32 * 1024 * 1024,
                 buffering=-1,
                 create=False,
                 _skip_connect=None,  # For test purpose
                 **_):
        super().__init__()

        self.bucket = bucket
        self.create_bucket = create_bucket
        if prefix is not None:
            self.cwd = prefix
        else:
            self.cwd = ''

        # In S3, the create flag can be disregarded
        del create

        self.mpu_chunksize = mpu_chunksize
        self.buffering = buffering

        kwargs = {}

        # If these arguments are not given, the library automatically
        # retrieves them from AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
        self.aws_access_key_id = aws_access_key_id
        if aws_access_key_id is not None:
            kwargs['aws_access_key_id'] = os.path.expandvars(
                aws_access_key_id)
        self.aws_secret_access_key = aws_secret_access_key
        if aws_secret_access_key is not None:
            kwargs['aws_secret_access_key'] = os.path.expandvars(
                aws_secret_access_key)

        # boto3 does not support an environment variable for S3 endpoints.
        # Instead, we take S3_ENDPOINT in case the argument ``endpoint`` is
        # not given. Otherwise, it falls back to boto3's default by passing
        # ``None``.
        #
        # See also:
        # https://github.com/boto/boto3/issues/1375
        # https://github.com/boto/boto3/pull/2746
        self.endpoint = endpoint
        if self.endpoint is None:
            self.endpoint = os.getenv('S3_ENDPOINT')
        if self.endpoint is not None:
            kwargs['endpoint_url'] = self.endpoint

        self.kwargs = kwargs

        if not _skip_connect:
            self._connect()

        if self.endpoint is not None:
            parsed = urllib.parse.urlparse(self.endpoint)
            if parsed.scheme == "":
                parsed = urllib.parse.urlparse(f"http://{self.endpoint}")
            self.hostname = parsed.hostname
        else:
            # self.endpoint is not defined in a moto (mock S3) environment
            self.hostname = "undefined"
            print("S3 endpoint is not defined")

    def _reset(self):
        self._connect()

    def _connect(self):
        self.client = boto3.client('s3', **self.kwargs)

        try:
            self.client.head_bucket(Bucket=self.bucket)
        except ClientError as e:
            if e.response['Error']['Code'] == '404' and self.create_bucket:
                res = self.client.create_bucket(Bucket=self.bucket)
                print("Bucket", self.bucket, "created:", res)
            else:
                raise e

    def __getstate__(self):
        state = self.__dict__.copy()
        state['client'] = None
        return state

    def __setstate__(self, state):
        self.__dict__ = state

    def __repr__(self) -> str:
        return format_repr(
            S3,
            {
                "bucket": self.bucket,
                "prefix": self.cwd,
                "endpoint": self.endpoint,
            },
        )

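    # Construction sketch (illustrative only): credentials and the endpoint
    # may be passed as arguments or taken from AWS_ACCESS_KEY_ID,
    # AWS_SECRET_ACCESS_KEY and S3_ENDPOINT. The bucket, prefix and endpoint
    # below are assumptions.
    #
    #   fs = S3(bucket='example-bucket',
    #           prefix='datasets/imagenet',
    #           endpoint='http://localhost:9000')
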
    def open(self, path, mode='r', **kwargs):
        '''Opens an object accessor for read or write

        .. note:: Multi-part upload is not yet available.

        Arguments:
            path (str): relative path from basedir

            mode (str): open mode
        '''
        self._checkfork()

        if 'a' in mode:
            raise io.UnsupportedOperation('Append is not supported')
        if 'r' in mode and 'w' in mode:
            raise io.UnsupportedOperation('Read-write mode is not supported')

        path = os.path.join(self.cwd, path)
        path = _normalize_key(path)

        if 'r' in mode:
            obj = _ObjectReader(self.client, self.bucket, path, mode, kwargs)

            bs = self.buffering
            if bs < 0:
                bs = min(obj.content_length, DEFAULT_MAX_BUFFER_SIZE)

            if 'b' in mode:
                if self.buffering and bs != 0:
                    obj = io.BufferedReader(obj, buffer_size=bs)
            else:
                obj = io.TextIOWrapper(obj)
                if self.buffering:
                    # This is an undocumented property, but it has been
                    # resident at least since 2009 (the merge of the io-c
                    # branch). We'll use it until the day of removal.
                    if bs == 0:
                        # Empty file case: _CHUNK_SIZE must be positive
                        bs = DEFAULT_MAX_BUFFER_SIZE
                    obj._CHUNK_SIZE = bs

        elif 'w' in mode:
            obj = _ObjectWriter(self.client, self.bucket, path, mode,
                                self.mpu_chunksize, kwargs)
            if 'b' in mode:
                obj = io.BufferedWriter(obj)

        else:
            raise RuntimeError(f'Unknown option: {mode}')

        return obj

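    # open() usage sketch (illustrative only), assuming ``fs`` is an S3
    # instance as above and the keys exist under the configured prefix:
    #
    #   with fs.open('train/sample.jpg', 'rb') as f:   # buffered binary read
    #       data = f.read()
    #   with fs.open('results/log.txt', 'w') as f:     # text write
    #       f.write('done\n')
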
    def list(self, prefix: Optional[str] = "", recursive=False, detail=False):
        '''List all objects (and prefixes)

        Although there is no concept of directories in the AWS S3 API,
        common prefixes show up like directories.
        '''
        self._checkfork()
        key = os.path.join(self.cwd, "" if prefix is None else prefix)
        key = _normalize_key(key)
        if key == '.':
            key = ''
        elif key != '' and not key.endswith('/'):
            key += '/'

        if '/../' in key or key.startswith('..'):
            raise ValueError('Invalid S3 key: {} as {}'.format(prefix, key))

        page_size = 1000
        paginator = self.client.get_paginator('list_objects_v2')
        paging_args = {
            'Bucket': self.bucket, 'Prefix': key,
            'PaginationConfig': {'PageSize': page_size}
        }
        if not recursive:
            paging_args['Delimiter'] = '/'

        iterator = paginator.paginate(**paging_args)
        for res in iterator:
            for common_prefix in res.get('CommonPrefixes', []):
                if detail:
                    yield S3PrefixStat(common_prefix['Prefix'][len(key):])
                else:
                    yield common_prefix['Prefix'][len(key):]
            for content in res.get('Contents', []):
                if detail:
                    yield S3ObjectStat(content['Key'][len(key):], content)
                else:
                    yield content['Key'][len(key):]

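    # list() usage sketch (illustrative only): a non-recursive listing yields
    # immediate children (common prefixes end with '/'), while recursive=True
    # walks every object under the prefix.
    #
    #   for name in fs.list('train/', recursive=True):
    #       print(name)
    #   for st in fs.list(detail=True):   # FileStat objects instead of names
    #       print(st.filename, st.isdir())
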
    def stat(self, path):
        '''Imitate FileStat with S3 Object metadata
        '''
        self._checkfork()
        key = os.path.join(self.cwd, path)
        key = _normalize_key(key)
        try:
            res = self.client.head_object(Bucket=self.bucket,
                                          Key=key)
            if res.get('DeleteMarker'):
                raise FileNotFoundError()
            return S3ObjectStat(key, res)
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                if self.isdir(path):
                    return S3PrefixStat(key)
                raise FileNotFoundError()
            else:
                raise e

    def isdir(self, file_path: str):
        '''Imitate isdir by handling a common prefix ending with "/" as a
        directory

        AWS S3 does not have a concept of a directory tree, but this class
        imitates other file systems to increase compatibility.
        '''
        self._checkfork()
        key = _normalize_key(os.path.join(self.cwd, file_path))
        if key == '.':
            key = ''
        elif key.endswith('/'):
            key = key[:-1]
        if '/../' in key or key.startswith('..'):
            raise ValueError('Invalid S3 key: {} as {}'.format(file_path,
                                                               key))

        if len(key) == 0:
            return True

        res = self.client.list_objects_v2(
            Bucket=self.bucket,
            Prefix=key,
            Delimiter="/",
            MaxKeys=1,
        )
        for common_prefix in res.get('CommonPrefixes', []):
            if common_prefix['Prefix'] == key + "/":
                return True
        return False

    def mkdir(self, file_path: str, mode=0o777, *args, dir_fd=None):
        '''Does nothing

        .. note:: AWS S3 does not have a concept of a directory tree; what
            should this function (and ``makedirs()``) do and return? To be
            strict, it would be straightforward to raise an
            ``io.UnsupportedOperation`` exception. But that just breaks
            users' applications that expect quasi-compatible behaviour.
            Thus, imitating other file systems and returning ``None`` is
            nicer.
        '''
        # raise io.UnsupportedOperation("S3 doesn't have directory")
        pass

    def makedirs(self, file_path: str, mode=0o777, exist_ok=False):
        '''Does nothing

        .. note:: see discussion in ``mkdir()``.
        '''
        # raise io.UnsupportedOperation("S3 doesn't have directory")
        pass

    def exists(self, file_path: str):
        '''Returns the existence of objects

        For common prefixes, it does nothing. See discussion in ``isdir()``.
        '''
        self._checkfork()
        try:
            key = os.path.join(self.cwd, file_path)
            key = _normalize_key(key)
            res = self.client.head_object(Bucket=self.bucket,
                                          Key=key)
            return not res.get('DeleteMarker')
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                if self.isdir(file_path):
                    return True
                return False
            else:
                raise e

    def rename(self, src, dst):
        '''Copies & removes the object

        Source and destination must be in the same bucket for
        ``pfio``, although AWS S3 supports inter-bucket copying.
        '''
        self._checkfork()
        source = {
            'Bucket': self.bucket,
            'Key': _normalize_key(os.path.join(self.cwd, src)),
        }
        dst = os.path.join(self.cwd, dst)
        dst = _normalize_key(dst)
        self.client.copy(Bucket=self.bucket, CopySource=source, Key=dst)
        return self.remove(src)

    def remove(self, file_path: str, recursive=False):
        '''Removes an object

        It raises a FileNotFoundError when the specified file doesn't exist.
        '''
        if recursive:
            raise io.UnsupportedOperation("Recursive delete not supported")

        if not self.exists(file_path):
            msg = "No such S3 object: '{}'".format(file_path)
            raise FileNotFoundError(msg)

        self._checkfork()
        key = os.path.join(self.cwd, file_path)
        key = _normalize_key(key)
        return self.client.delete_object(Bucket=self.bucket,
                                         Key=key)

    def _canonical_name(self, file_path: str) -> str:
        path = os.path.join(self.cwd, file_path)
        norm_path = _normalize_key(path)
        return f"s3://{self.hostname}/{self.bucket}/{norm_path}"
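
# End-to-end sketch (illustrative only; the bucket and key names are
# assumptions): S3 mimics a file system, so existence checks, metadata
# queries and deletion follow the usual pfio FS interface.
#
#   fs = S3(bucket='example-bucket')
#   if fs.exists('data/sample.bin'):
#       print(fs.stat('data/sample.bin').size)
#       fs.remove('data/sample.bin')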