Source code for miniopy_async.datatypes

# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C)
# 2020 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Response of ListBuckets, ListObjects, ListObjectsV2 and ListObjectVersions API.
"""

from __future__ import absolute_import, annotations

import base64
import io
import itertools
import json
from collections import OrderedDict
from datetime import datetime
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncGenerator,
    Iterable,
    List,
    Tuple,
    Type,
    TypeVar,
    cast,
)
from urllib.parse import unquote_plus
from xml.etree import ElementTree as ET

from aiohttp import ClientResponse
from aiohttp.typedefs import LooseHeaders
from multidict import CIMultiDictProxy

from .commonconfig import Tags
from .credentials import Credentials
from .deleteobjects import DeleteError, DeleteObject
from .helpers import DictType, check_bucket_name
from .signer import get_credential_string, post_presign_v4
from .time import from_iso8601utc, to_amz_date, to_iso8601utc, utcnow
from .xml import find, findall, findtext

if TYPE_CHECKING:
    from .api import Minio


class Bucket:
    """A bucket entry, identified by name, with an optional creation time.

    Equality and hashing are based on the bucket name only; a bucket also
    compares equal to a plain string holding the same name.
    """

    def __init__(self, name: str, creation_date: datetime | None):
        self._name = name
        self._creation_date = creation_date

    @property
    def name(self) -> str:
        """Name of the bucket."""
        return self._name

    @property
    def creation_date(self) -> datetime | None:
        """Creation time of the bucket, or ``None`` when not available."""
        return self._creation_date

    def __repr__(self) -> str:
        return "{}('{}')".format(type(self).__name__, self.name)

    def __str__(self) -> str:
        return self.name

    def __eq__(self, other) -> bool:
        if isinstance(other, Bucket):
            other_name = other.name
        elif isinstance(other, str):
            other_name = other
        else:
            # Let Python try the reflected comparison of the other operand.
            return NotImplemented
        return self.name == other_name

    def __hash__(self) -> int:
        return hash(self.name)
A = TypeVar("A", bound="ListAllMyBucketsResult")


class ListAllMyBucketsResult:
    """ListBuckets API result."""

    def __init__(self, buckets: list[Bucket]):
        self._buckets = buckets

    @property
    def buckets(self) -> List[Bucket]:
        """Buckets carried by this result."""
        return self._buckets

    @classmethod
    def fromxml(cls: Type[A], element: ET.Element) -> A:
        """Build a result from the XML element of a ListBuckets response.

        Reads every ``Bucket`` child of the ``Buckets`` element; ``Name`` is
        looked up strictly, ``CreationDate`` is optional.
        """
        container = cast(ET.Element, find(element, "Buckets", True))
        buckets = []
        for node in findall(container, "Bucket"):
            bucket_name = cast(str, findtext(node, "Name", True))
            created = findtext(node, "CreationDate")
            created_at = from_iso8601utc(created) if created else None
            buckets.append(Bucket(bucket_name, created_at))
        return cls(buckets)
B = TypeVar("B", bound="Object")


class Object:
    """Object information.

    Plain value holder for the attributes of one listing entry (object,
    object version, common prefix or delete marker). All attributes are
    exposed through read-only properties.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        bucket_name: str,
        object_name: str | None,
        last_modified: datetime | None = None,
        etag: str | None = None,
        size: int | None = None,
        metadata: LooseHeaders | None = None,
        version_id: str | None = None,
        is_latest: str | None = None,
        storage_class: str | None = None,
        owner_id: str | None = None,
        owner_name: str | None = None,
        content_type: str | None = None,
        is_delete_marker: bool = False,
        tags: Tags | None = None,
    ):
        self._bucket_name = bucket_name
        self._object_name = object_name
        self._last_modified = last_modified
        self._etag = etag
        self._size = size
        self._metadata = metadata
        self._version_id = version_id
        self._is_latest = is_latest
        self._storage_class = storage_class
        self._owner_id = owner_id
        self._owner_name = owner_name
        self._content_type = content_type
        self._is_delete_marker = is_delete_marker
        self._tags = tags

    @property
    def bucket_name(self) -> str:
        """Get bucket name."""
        return self._bucket_name

    @property
    def object_name(self) -> str | None:
        """Get object name."""
        return self._object_name

    @property
    def is_dir(self) -> bool:
        """Get whether this key is a directory (name ends with ``/``)."""
        return self._object_name is not None and self._object_name.endswith("/")

    @property
    def last_modified(self) -> datetime | None:
        """Get last modified time."""
        return self._last_modified

    @property
    def etag(self) -> str | None:
        """Get etag."""
        return self._etag

    @property
    def size(self) -> int | None:
        """Get size."""
        return self._size

    @property
    def metadata(self) -> LooseHeaders | None:
        """Get metadata."""
        return self._metadata

    @property
    def version_id(self) -> str | None:
        """Get version ID."""
        return self._version_id

    @property
    def is_latest(self) -> str | None:
        """Get is-latest flag (kept as the raw text of the XML element)."""
        return self._is_latest

    @property
    def storage_class(self) -> str | None:
        """Get storage class."""
        return self._storage_class

    @property
    def owner_id(self) -> str | None:
        """Get owner ID."""
        return self._owner_id

    @property
    def owner_name(self) -> str | None:
        """Get owner name."""
        return self._owner_name

    @property
    def is_delete_marker(self) -> bool:
        """Get whether this key is a delete marker."""
        return self._is_delete_marker

    @property
    def content_type(self) -> str | None:
        """Get content type."""
        return self._content_type

    @property
    def tags(self) -> Tags | None:
        """Get the tags."""
        return self._tags

    @classmethod
    def fromxml(
        cls: Type[B],
        element: ET.Element,
        bucket_name: str,
        is_delete_marker: bool = False,
        encoding_type: str | None = None,
    ) -> B:
        """Create new object with values from XML element.

        :param element: XML element describing one listing entry.
        :param bucket_name: Bucket the entry belongs to.
        :param is_delete_marker: Whether the entry is a delete marker.
        :param encoding_type: When ``"url"``, the ``Key`` text is decoded
            with ``unquote_plus``.
        :return: New instance of ``cls``.
        """
        tag = findtext(element, "LastModified")
        last_modified = None if tag is None else from_iso8601utc(tag)

        tag = findtext(element, "ETag")
        etag = None if tag is None else tag.replace('"', "")

        tag = findtext(element, "Size")
        size = None if tag is None else int(tag)

        elem = find(element, "Owner")
        owner_id, owner_name = (
            (None, None)
            if elem is None
            else (findtext(elem, "ID"), findtext(elem, "DisplayName"))
        )

        # Compare against None explicitly: truth-testing an Element is
        # deprecated and a childless element is falsy.
        metadata: dict[str, str] = {}
        user_metadata = find(element, "UserMetadata")
        if user_metadata is not None:
            for child in user_metadata:
                # Drop the "{namespace}" prefix ElementTree puts on tags.
                key = child.tag.split("}")[1] if "}" in child.tag else child.tag
                metadata[key] = child.text or ""

        object_name = cast(str, findtext(element, "Key", True))
        if encoding_type == "url":
            object_name = unquote_plus(object_name)

        tags_text = findtext(element, "UserTags")
        tags: Tags | None = None
        if tags_text:
            tags = Tags.new_object_tags()
            # partition() always yields exactly one (key, value) pair, even
            # when a token has no "=" or its value contains further "=".
            tags.update(
                cast(
                    List[Tuple[Any, Any]],
                    [token.partition("=")[::2] for token in tags_text.split("&")],
                ),
            )

        return cls(
            bucket_name,
            object_name,
            last_modified=last_modified,
            etag=etag,
            size=size,
            version_id=findtext(element, "VersionId"),
            is_latest=findtext(element, "IsLatest"),
            storage_class=findtext(element, "StorageClass"),
            owner_id=owner_id,
            owner_name=owner_name,
            metadata=metadata,
            is_delete_marker=is_delete_marker,
            tags=tags,
        )
def parse_list_objects(
    response_data: str,
    bucket_name: str | None = None,
) -> tuple[list[Object], bool, str | None, str | None]:
    """Parse ListObjects/ListObjectsV2/ListObjectVersions response.

    :param response_data: XML body of the list response.
    :param bucket_name: Ignored: the value is immediately replaced by the
        ``Name`` element of the response.
    :return: Tuple of ``(objects, is_truncated, continuation_token,
        version_id_marker)``. ``objects`` holds the ``Contents``, ``Version``,
        ``CommonPrefixes`` and ``DeleteMarker`` entries, in that order.
    """
    element = ET.fromstring(response_data)
    # The bucket name always comes from the response itself.
    bucket_name = cast(str, findtext(element, "Name", True))
    encoding_type = findtext(element, "EncodingType")
    elements = findall(element, "Contents")
    objects = [
        Object.fromxml(tag, bucket_name, encoding_type=encoding_type)
        for tag in elements
    ]
    # Name of the last "Contents" entry; used as a fallback continuation
    # token at the end of this function.
    marker = objects[-1].object_name if objects else None

    elements = findall(element, "Version")
    objects += [
        Object.fromxml(tag, bucket_name, encoding_type=encoding_type)
        for tag in elements
    ]

    # Common prefixes become Object entries that carry only a name.
    elements = findall(element, "CommonPrefixes")
    objects += [
        Object(
            bucket_name,
            (
                unquote_plus(findtext(tag, "Prefix", True) or "")
                if encoding_type == "url"
                else findtext(tag, "Prefix", True)
            ),
        )
        for tag in elements
    ]

    elements = findall(element, "DeleteMarker")
    objects += [
        Object.fromxml(
            tag, bucket_name, is_delete_marker=True, encoding_type=encoding_type
        )
        for tag in elements
    ]

    is_truncated = (findtext(element, "IsTruncated") or "").lower() == "true"
    key_marker = findtext(element, "NextKeyMarker")
    if key_marker and encoding_type == "url":
        key_marker = unquote_plus(key_marker)
    version_id_marker = findtext(element, "NextVersionIdMarker")
    continuation_token = findtext(element, "NextContinuationToken")
    # A NextKeyMarker, when present, takes precedence over
    # NextContinuationToken.
    if key_marker is not None:
        continuation_token = key_marker
    if continuation_token is None:
        continuation_token = findtext(element, "NextMarker")
        # NOTE(review): the URL-decoding below is assumed to apply only to
        # the NextMarker fallback (key_marker was already decoded above);
        # confirm the nesting against the original file.
        if continuation_token and encoding_type == "url":
            continuation_token = unquote_plus(continuation_token)
    # Last resort for a truncated listing without any marker element.
    if continuation_token is None and is_truncated:
        continuation_token = marker
    return objects, is_truncated, continuation_token, version_id_marker
class CompleteMultipartUploadResult:
    """CompleteMultipartUpload API result.

    Combines the fields of the XML response body with the version ID and
    headers of the HTTP response.
    """

    def __init__(self, response: ClientResponse, response_data: str):
        root = ET.fromstring(response_data)
        self._bucket_name = findtext(root, "Bucket")
        self._object_name = findtext(root, "Key")
        self._location = findtext(root, "Location")

        # The ETag is stored without its surrounding double quotes.
        raw_etag = findtext(root, "ETag")
        self._etag = raw_etag.replace('"', "") if raw_etag else raw_etag

        self._version_id = response.headers.get("x-amz-version-id")
        self._http_headers = response.headers

    @property
    def bucket_name(self) -> str | None:
        """Bucket name from the response body."""
        return self._bucket_name

    @property
    def object_name(self) -> str | None:
        """Object name (``Key``) from the response body."""
        return self._object_name

    @property
    def location(self) -> str | None:
        """Location from the response body."""
        return self._location

    @property
    def etag(self) -> str | None:
        """ETag with double quotes removed."""
        return self._etag

    @property
    def version_id(self) -> str | None:
        """Value of the ``x-amz-version-id`` response header."""
        return self._version_id

    @property
    def http_headers(self) -> CIMultiDictProxy[str]:
        """Headers of the HTTP response."""
        return self._http_headers
C = TypeVar("C", bound="Part")


class Part:
    """One part of a multipart upload."""

    def __init__(
        self,
        part_number: int,
        etag: str,
        last_modified: datetime | None = None,
        size: int | None = None,
    ):
        self._part_number = part_number
        self._etag = etag
        self._last_modified = last_modified
        self._size = size

    @property
    def part_number(self) -> int:
        """Number of this part."""
        return self._part_number

    @property
    def etag(self) -> str:
        """ETag of this part."""
        return self._etag

    @property
    def last_modified(self) -> datetime | None:
        """Last-modified time of this part, if known."""
        return self._last_modified

    @property
    def size(self) -> int | None:
        """Size of this part, if known."""
        return self._size

    @classmethod
    def fromxml(cls: Type[C], element: ET.Element) -> C:
        """Build a part from a ``Part`` XML element.

        ``PartNumber`` and ``ETag`` are looked up strictly; ``LastModified``
        and ``Size`` are optional. Double quotes are removed from the ETag.
        """
        number_text = cast(str, findtext(element, "PartNumber", True))
        part_number = int(number_text)

        etag = cast(str, findtext(element, "ETag", True)).replace('"', "")

        modified_text = findtext(element, "LastModified")
        last_modified = (
            from_iso8601utc(modified_text) if modified_text is not None else None
        )

        size_text = findtext(element, "Size")
        size = int(size_text) if size_text else None

        return cls(part_number, etag, last_modified, size)
class ListPartsResult:
    """ListParts API result.

    Parses the XML body of a ListParts response and exposes its fields as
    read-only properties.
    """

    def __init__(self, response_data: str):
        element = ET.fromstring(response_data)
        self._bucket_name = findtext(element, "Bucket")
        self._object_name = findtext(element, "Key")

        tag = find(element, "Initiator")
        self._initiator_id = None if tag is None else findtext(tag, "ID")
        self._initiator_name = None if tag is None else findtext(tag, "DisplayName")

        tag = find(element, "Owner")
        self._owner_id = None if tag is None else findtext(tag, "ID")
        self._owner_name = None if tag is None else findtext(tag, "DisplayName")

        self._storage_class = findtext(element, "StorageClass")
        # Kept as text, unlike the next marker which is converted to int.
        self._part_number_marker = findtext(element, "PartNumberMarker")
        next_part_number_marker = findtext(element, "NextPartNumberMarker")
        self._next_part_number_marker = (
            int(next_part_number_marker) if next_part_number_marker else None
        )
        max_parts = findtext(element, "MaxParts")
        self._max_parts = int(max_parts) if max_parts else None
        is_truncated = findtext(element, "IsTruncated")
        self._is_truncated = is_truncated is not None and is_truncated.lower() == "true"
        self._parts = [Part.fromxml(tag) for tag in findall(element, "Part")]

    @property
    def bucket_name(self) -> str | None:
        """Get bucket name."""
        return self._bucket_name

    @property
    def object_name(self) -> str | None:
        """Get object name."""
        return self._object_name

    @property
    def initiator_id(self) -> str | None:
        """Get initiator ID."""
        return self._initiator_id

    @property
    def initiator_name(self) -> str | None:
        """Get initiator name."""
        return self._initiator_name

    @property
    def initator_name(self) -> str | None:
        """Get initiator name.

        Misspelled alias of :attr:`initiator_name`, kept so existing callers
        keep working.
        """
        return self._initiator_name

    @property
    def owner_id(self) -> str | None:
        """Get owner ID."""
        return self._owner_id

    @property
    def owner_name(self) -> str | None:
        """Get owner name."""
        return self._owner_name

    @property
    def storage_class(self) -> str | None:
        """Get storage class."""
        return self._storage_class

    @property
    def part_number_marker(self) -> str | None:
        """Get part number marker."""
        return self._part_number_marker

    @property
    def next_part_number_marker(self) -> int | None:
        """Get next part number marker."""
        return self._next_part_number_marker

    @property
    def max_parts(self) -> int | None:
        """Get max parts."""
        return self._max_parts

    @property
    def is_truncated(self) -> bool:
        """Get is-truncated flag."""
        return self._is_truncated

    @property
    def parts(self) -> list[Part]:
        """Get parts."""
        return self._parts
class Upload:
    """Upload information of a multipart upload.

    :param element: ``Upload`` XML element of a ListMultipartUploads response.
    :param encoding_type: When ``"url"``, the ``Key`` text is decoded with
        ``unquote_plus``.
    """

    def __init__(self, element: ET.Element, encoding_type: str | None = None):
        self._encoding_type = encoding_type
        object_name = cast(str, findtext(element, "Key", True))
        self._object_name = (
            unquote_plus(object_name) if self._encoding_type == "url" else object_name
        )
        self._upload_id = findtext(element, "UploadId")

        tag = find(element, "Initiator")
        self._initiator_id = None if tag is None else findtext(tag, "ID")
        self._initiator_name = None if tag is None else findtext(tag, "DisplayName")

        tag = find(element, "Owner")
        self._owner_id = None if tag is None else findtext(tag, "ID")
        self._owner_name = None if tag is None else findtext(tag, "DisplayName")

        self._storage_class = findtext(element, "StorageClass")
        initiated_time = findtext(element, "Initiated")
        self._initiated_time = (
            from_iso8601utc(initiated_time) if initiated_time else None
        )

    @property
    def object_name(self) -> str:
        """Get object name."""
        return self._object_name

    @property
    def initiator_id(self) -> str | None:
        """Get initiator ID."""
        return self._initiator_id

    @property
    def initiator_name(self) -> str | None:
        """Get initiator name."""
        return self._initiator_name

    @property
    def initator_name(self) -> str | None:
        """Get initiator name.

        Misspelled alias of :attr:`initiator_name`, kept so existing callers
        keep working.
        """
        return self._initiator_name

    @property
    def owner_id(self) -> str | None:
        """Get owner ID."""
        return self._owner_id

    @property
    def owner_name(self) -> str | None:
        """Get owner name."""
        return self._owner_name

    @property
    def storage_class(self) -> str | None:
        """Get storage class."""
        return self._storage_class

    @property
    def upload_id(self) -> str | None:
        """Get upload ID."""
        return self._upload_id

    @property
    def initiated_time(self) -> datetime | None:
        """Get initiated time."""
        return self._initiated_time
class ListMultipartUploadsResult:
    """ListMultipartUploads API result.

    Parses the XML response body; key markers are URL-decoded when the
    response declares ``EncodingType`` ``url``.
    """

    def __init__(self, response_data: str):
        root = ET.fromstring(response_data)
        self._encoding_type = findtext(root, "EncodingType")
        url_encoded = self._encoding_type == "url"

        def decode_key(text):
            # Only non-empty values of a URL-encoded response are decoded.
            if text and url_encoded:
                return unquote_plus(text)
            return text

        self._bucket_name = findtext(root, "Bucket")
        self._key_marker = decode_key(findtext(root, "KeyMarker"))
        self._upload_id_marker = findtext(root, "UploadIdMarker")
        self._next_key_marker = decode_key(findtext(root, "NextKeyMarker"))
        self._next_upload_id_marker = findtext(root, "NextUploadIdMarker")

        max_uploads_text = findtext(root, "MaxUploads")
        self._max_uploads = int(max_uploads_text) if max_uploads_text else None

        truncated_text = findtext(root, "IsTruncated")
        self._is_truncated = (
            truncated_text is not None and truncated_text.lower() == "true"
        )

        self._uploads = [
            Upload(node, self._encoding_type) for node in findall(root, "Upload")
        ]

    @property
    def bucket_name(self) -> str | None:
        """Bucket name from the response."""
        return self._bucket_name

    @property
    def key_marker(self) -> str | None:
        """Key marker from the response."""
        return self._key_marker

    @property
    def upload_id_marker(self) -> str | None:
        """Upload ID marker from the response."""
        return self._upload_id_marker

    @property
    def next_key_marker(self) -> str | None:
        """Next key marker from the response."""
        return self._next_key_marker

    @property
    def next_upload_id_marker(self) -> str | None:
        """Next upload ID marker from the response."""
        return self._next_upload_id_marker

    @property
    def max_uploads(self) -> int | None:
        """Maximum number of uploads, if present in the response."""
        return self._max_uploads

    @property
    def is_truncated(self) -> bool:
        """Whether the response reported ``IsTruncated`` as true."""
        return self._is_truncated

    @property
    def encoding_type(self) -> str | None:
        """Encoding type declared by the response."""
        return self._encoding_type

    @property
    def uploads(self) -> list[Upload]:
        """Uploads listed in the response."""
        return self._uploads
_RESERVED_ELEMENTS = ( "bucket", "x-amz-algorithm", "x-amz-credential", "x-amz-date", "policy", "x-amz-signature", ) _EQ = "eq" _STARTS_WITH = "starts-with" _ALGORITHM = "AWS4-HMAC-SHA256" def _trim_dollar(value: str) -> str: """Trim dollar character if present.""" return value[1:] if value.startswith("$") else value
[docs] class PostPolicy: """ Post policy information to be used to generate presigned post policy form-data. Condition elements and respective condition for Post policy is available at https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions """ def __init__(self, bucket_name: str, expiration: datetime): check_bucket_name(bucket_name) if not isinstance(expiration, datetime): raise ValueError("expiration must be datetime type") self._bucket_name = bucket_name self._expiration = expiration self._conditions: OrderedDict = OrderedDict() self._conditions[_EQ] = OrderedDict() self._conditions[_STARTS_WITH] = OrderedDict() self._lower_limit: int | None = None self._upper_limit: int | None = None
[docs] def add_equals_condition(self, element: str, value: str): """Add equals condition of an element and value.""" if not element: raise ValueError("condition element cannot be empty") element = _trim_dollar(element) if element in [ "success_action_redirect", "redirect", "content-length-range", ]: raise ValueError(element + " is unsupported for equals condition") if element in _RESERVED_ELEMENTS: raise ValueError(element + " cannot be set") self._conditions[_EQ][element] = value
[docs] def remove_equals_condition(self, element: str): """Remove previously set equals condition of an element.""" if not element: raise ValueError("condition element cannot be empty") self._conditions[_EQ].pop(element)
[docs] def add_starts_with_condition(self, element: str, value: str): """ Add starts-with condition of an element and value. Value set to empty string does matching any content condition. """ if not element: raise ValueError("condition element cannot be empty") element = _trim_dollar(element) if element in ["success_action_status", "content-length-range"] or ( element.startswith("x-amz-") and not element.startswith("x-amz-meta-") ): raise ValueError( f"{element} is unsupported for starts-with condition", ) if element in _RESERVED_ELEMENTS: raise ValueError(element + " cannot be set") self._conditions[_STARTS_WITH][element] = value
[docs] def remove_starts_with_condition(self, element: str): """Remove previously set starts-with condition of an element.""" if not element: raise ValueError("condition element cannot be empty") self._conditions[_STARTS_WITH].pop(element)
[docs] def add_content_length_range_condition( # pylint: disable=invalid-name self, lower_limit: int, upper_limit: int ): """Add content-length-range condition with lower and upper limits.""" if lower_limit < 0: raise ValueError("lower limit cannot be negative number") if upper_limit < 0: raise ValueError("upper limit cannot be negative number") if lower_limit > upper_limit: raise ValueError("lower limit cannot be greater than upper limit") self._lower_limit = lower_limit self._upper_limit = upper_limit
# pylint: disable=invalid-name
[docs] def remove_content_length_range_condition(self): """Remove previously set content-length-range condition.""" self._lower_limit = None self._upper_limit = None
[docs] def form_data(self, creds: Credentials, region: str) -> dict[str, Any]: """ Return form-data of this post policy. The returned dict contains x-amz-algorithm, x-amz-credential, x-amz-security-token, x-amz-date, policy and x-amz-signature. """ if not isinstance(creds, Credentials): raise ValueError("credentials must be Credentials type") if not region: raise ValueError("region cannot be empty") if ( "key" not in self._conditions[_EQ] and "key" not in self._conditions[_STARTS_WITH] ): raise ValueError("key condition must be set") policy = OrderedDict() policy["expiration"] = to_iso8601utc(self._expiration) policy["conditions"] = [[_EQ, "$bucket", self._bucket_name]] for cond_key, conditions in self._conditions.items(): for key, value in conditions.items(): policy["conditions"].append([cond_key, "$" + key, value]) if self._lower_limit is not None and self._upper_limit is not None: policy["conditions"].append( [ "content-length-range", str(self._lower_limit), str(self._upper_limit), ], ) credential = get_credential_string(creds.access_key, utcnow(), region) amz_date = to_amz_date(utcnow()) policy["conditions"].append([_EQ, "$x-amz-algorithm", _ALGORITHM]) policy["conditions"].append([_EQ, "$x-amz-credential", credential]) if creds.session_token: policy["conditions"].append( [_EQ, "$x-amz-security-token", creds.session_token], ) policy["conditions"].append([_EQ, "$x-amz-date", amz_date]) policy_encoded = base64.b64encode( json.dumps(policy).encode(), ).decode("utf-8") signature = post_presign_v4( policy_encoded, creds.secret_key, utcnow(), region, ) form_data = { "x-amz-algorithm": _ALGORITHM, "x-amz-credential": credential, "x-amz-date": amz_date, "policy": policy_encoded, "x-amz-signature": signature, } if creds.session_token: form_data["x-amz-security-token"] = creds.session_token return form_data
@property def bucket_name(self) -> str: """Get bucket name.""" return self._bucket_name
def parse_copy_object(response_data: str) -> tuple[str, datetime | None]:
    """Parse CopyObject/UploadPartCopy response.

    :param response_data: XML body of the response.
    :return: Tuple of the ETag (double quotes removed) and the last-modified
        time, which is ``None`` when the element is missing or empty.
    """
    root = ET.fromstring(response_data)
    raw_etag = cast(str, findtext(root, "ETag", True))
    etag = raw_etag.replace('"', "")
    modified_text = findtext(root, "LastModified")
    if modified_text:
        return etag, from_iso8601utc(modified_text)
    return etag, None
class AsyncEventIterable:
    """Context manager friendly event iterable.

    Reads one line at a time from the response stream and yields each line's
    decoded JSON event. Iteration stops at the first empty read or the first
    event whose ``Records`` value is falsy. Leaving the ``async with`` block
    closes the response.
    """

    def __init__(
        self,
        response: ClientResponse,
    ):
        self._response = response

    def __aiter__(self):
        return self

    async def _get_records(self) -> dict | list | None:
        """Read the next line and return its event, or ``None``."""
        raw = await self._response.content.readline()
        if raw:
            event: dict = json.loads(raw)
            return event if event["Records"] else None
        return None

    async def __anext__(self) -> dict | list:
        event = await self._get_records()
        if event:
            return event
        raise StopAsyncIteration

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, value, traceback):
        self._response.close()
class PeerSite:
    """Represents a cluster/site to be added to the set of replicated sites."""

    def __init__(
        self,
        name: str,
        endpoint: str,
        access_key: str,
        secret_key: str,
    ):
        self._name = name
        self._endpoint = endpoint
        self._access_key = access_key
        self._secret_key = secret_key

    def to_dict(self) -> dict[str, str]:
        """Return this site as a dictionary.

        The endpoint is stored under the ``endpoints`` key.
        """
        return dict(
            name=self._name,
            endpoints=self._endpoint,
            accessKey=self._access_key,
            secretKey=self._secret_key,
        )
class SiteReplicationStatusOptions:
    """Represents site replication status options.

    Every option is a plain read/write property. ``to_query_params`` renders
    them as the string values used in a query string.
    """

    ENTITY_TYPE = Enum(
        "ENTITY_TYPE",
        [
            ("BUCKET", "bucket"),
            ("POLICY", "policy"),
            ("USER", "user"),
            ("GROUP", "group"),
        ],
    )

    def __init__(self):
        self._buckets = False
        self._policies = False
        self._users = False
        self._groups = False
        self._metrics = False
        self._entity = None
        self._entity_value = None
        self._show_deleted = False

    @property
    def buckets(self) -> bool:
        """Buckets flag."""
        return self._buckets

    @buckets.setter
    def buckets(self, value: bool):
        self._buckets = value

    @property
    def policies(self) -> bool:
        """Policies flag."""
        return self._policies

    @policies.setter
    def policies(self, value: bool):
        self._policies = value

    @property
    def users(self) -> bool:
        """Users flag."""
        return self._users

    @users.setter
    def users(self, value: bool):
        self._users = value

    @property
    def groups(self) -> bool:
        """Groups flag."""
        return self._groups

    @groups.setter
    def groups(self, value: bool):
        self._groups = value

    @property
    def metrics(self) -> bool:
        """Metrics flag."""
        return self._metrics

    @metrics.setter
    def metrics(self, value: bool):
        self._metrics = value

    @property
    def entity(self) -> str | None:
        """Entity, or ``None`` when unset."""
        return self._entity

    @entity.setter
    def entity(self, value: str):
        self._entity = value

    @property
    def entity_value(self) -> str | None:
        """Entity value, or ``None`` when unset."""
        return self._entity_value

    @entity_value.setter
    def entity_value(self, value: str):
        self._entity_value = value

    @property
    def show_deleted(self) -> bool:
        """Show-deleted flag."""
        return self._show_deleted

    @show_deleted.setter
    def show_deleted(self, value: bool):
        self._show_deleted = value

    def to_query_params(self) -> dict[str, str]:
        """Render the options as query parameters.

        Flags become lower-cased ``"true"``/``"false"`` strings. ``entity``
        and ``entityvalue`` are included only when both are set.
        """
        flags = (
            ("buckets", self._buckets),
            ("policies", self._policies),
            ("users", self._users),
            ("groups", self._groups),
            ("metrics", self._metrics),
            ("showDeleted", self._show_deleted),
        )
        params = {key: str(flag).lower() for key, flag in flags}
        if self._entity and self._entity_value:
            params.update(entityvalue=self._entity_value, entity=self._entity)
        return params
class PeerInfo:
    """Site replication peer information.

    All fields are read/write properties. ``name``, ``sync_status`` and
    ``bucket_bandwidth_updated_at`` start as ``None`` and are set after
    construction.
    """

    def __init__(
        self,
        deployment_id: str,
        endpoint: str,
        bucket_bandwidth_limit: str,
        bucket_bandwidth_set: str,
    ):
        self._deployment_id = deployment_id
        self._endpoint = endpoint
        self._name: str | None = None
        self._sync_status: str | None = None
        self._bucket_bandwidth_limit = bucket_bandwidth_limit
        self._bucket_bandwidth_set = bucket_bandwidth_set
        self._bucket_bandwidth_updated_at: datetime | None = None

    @property
    def deployment_id(self) -> str:
        """Deployment ID."""
        return self._deployment_id

    @deployment_id.setter
    def deployment_id(self, value: str):
        self._deployment_id = value

    @property
    def endpoint(self) -> str:
        """Endpoint."""
        return self._endpoint

    @endpoint.setter
    def endpoint(self, value: str):
        self._endpoint = value

    @property
    def name(self) -> str | None:
        """Name, or ``None`` when unset."""
        return self._name

    @name.setter
    def name(self, value: str):
        self._name = value

    @property
    def sync_status(self) -> str | None:
        """Sync status, or ``None`` when unset."""
        return self._sync_status

    @sync_status.setter
    def sync_status(self, value: str):
        self._sync_status = value

    @property
    def bucket_bandwidth_limit(self) -> str:
        """Bucket bandwidth limit."""
        return self._bucket_bandwidth_limit

    @bucket_bandwidth_limit.setter
    def bucket_bandwidth_limit(self, value: str):
        self._bucket_bandwidth_limit = value

    @property
    def bucket_bandwidth_set(self) -> str:
        """Bucket bandwidth set."""
        return self._bucket_bandwidth_set

    @bucket_bandwidth_set.setter
    def bucket_bandwidth_set(self, value: str):
        self._bucket_bandwidth_set = value

    @property
    def bucket_bandwidth_updated_at(self) -> datetime | None:
        """Time the bucket bandwidth was updated, or ``None`` when unset."""
        return self._bucket_bandwidth_updated_at

    @bucket_bandwidth_updated_at.setter
    def bucket_bandwidth_updated_at(self, value: datetime | None):
        self._bucket_bandwidth_updated_at = value

    def to_dict(self):
        """Convert peer information to a dictionary.

        ``name``, ``sync`` and ``defaultbandwidth.updatedAt`` are included
        only when the corresponding value has been set.
        """
        bandwidth = {
            "bandwidthLimitPerBucket": self._bucket_bandwidth_limit,
            "set": self._bucket_bandwidth_set,
        }
        data = {
            "endpoint": self._endpoint,
            "deploymentID": self._deployment_id,
            "defaultbandwidth": bandwidth,
        }
        if self._name:
            data["name"] = self._name
        if self._sync_status is not None:
            # Any truthy status maps to "enable", any other non-None value
            # to "disable".
            data["sync"] = "enable" if self._sync_status else "disable"
        if self._bucket_bandwidth_updated_at:
            bandwidth["updatedAt"] = to_iso8601utc(
                self._bucket_bandwidth_updated_at,
            )
        return data
class ListObjects:
    """Lazy listing of the objects of a bucket.

    An instance can be consumed with ``async for`` to stream the objects,
    or awaited directly to collect them all into a list. The listing itself
    is delegated to ``client._list_objects``.
    """

    def __init__(
        self,
        client: Minio,
        bucket_name: str,
        prefix: str | None = None,
        recursive: bool = False,
        start_after: str | None = None,
        include_user_meta: bool = False,
        include_version: bool = False,
        use_api_v1: bool = False,
        use_url_encoding_type: bool = True,
        fetch_owner: bool = False,
        extra_headers: DictType | None = None,
    ):
        self.client = client
        self.bucket_name = bucket_name
        self.prefix = prefix
        self.recursive = recursive
        self.start_after = start_after
        self.include_user_meta = include_user_meta
        self.include_version = include_version
        self.use_api_v1 = use_api_v1
        self.use_url_encoding_type = use_url_encoding_type
        self.fetch_owner = fetch_owner
        self.extra_headers = extra_headers
        # Created lazily, and recreated by every __aiter__ call.
        self.iterator: AsyncGenerator[Object, None] | None = None

    def gen_iterator(self) -> AsyncGenerator[Object, None]:
        """Create a new listing generator from the stored options."""
        return self.client._list_objects(
            self.bucket_name,
            # A non-recursive listing groups keys on "/".
            delimiter=None if self.recursive else "/",
            include_user_meta=self.include_user_meta,
            prefix=self.prefix,
            start_after=self.start_after,
            use_api_v1=self.use_api_v1,
            include_version=self.include_version,
            encoding_type="url" if self.use_url_encoding_type else None,
            fetch_owner=self.fetch_owner,
            extra_headers=self.extra_headers,
        )

    def __aiter__(self):
        # Each iteration starts a fresh listing.
        self.iterator = self.gen_iterator()
        return self

    async def __anext__(self) -> Object:
        if self.iterator is None:
            # Allow __anext__ to be called without a prior __aiter__.
            self.iterator = self.gen_iterator()
        return await self.iterator.__anext__()

    def __await__(self):
        return self._collect_objects().__await__()

    async def _collect_objects(self) -> List[Object]:
        """Run a fresh listing to completion and return all objects."""
        return [obj async for obj in self]
class DeleteErrors:
    """Lazy bulk delete that yields the errors reported by the server.

    An instance can be consumed with ``async for`` to stream the errors, or
    awaited directly to collect them all into a list. Objects are deleted in
    batches of up to 1000 through ``client._delete_objects``.
    """

    def __init__(
        self,
        client: Minio,
        bucket_name: str,
        delete_object_list: Iterable[DeleteObject],
        bypass_governance_mode: bool = False,
    ):
        self.client = client
        self.bucket_name = bucket_name
        # turn list like objects into an iterator.
        self.delete_object_list = itertools.chain(delete_object_list)
        self.bypass_governance_mode = bypass_governance_mode
        # Created lazily, and recreated by every __aiter__ call.
        self.iterator: AsyncGenerator[DeleteError, None] | None = None

    async def gen_iterator(self) -> AsyncGenerator[DeleteError, None]:
        """Delete the objects batch by batch, yielding reported errors."""
        check_bucket_name(self.bucket_name, s3_check=self.client._base_url.is_aws_host)
        while True:
            # get 1000 entries or whatever available.
            objects = list(itertools.islice(self.delete_object_list, 1000))
            if not objects:
                return
            result = await self.client._delete_objects(
                self.bucket_name,
                objects,
                quiet=True,
                bypass_governance_mode=self.bypass_governance_mode,
            )
            for error in result.error_list:
                # AWS S3 returns "NoSuchVersion" error when
                # version doesn't exist ignore this error
                # yield all errors otherwise
                if error.code != "NoSuchVersion":
                    yield error

    def __aiter__(self):
        self.iterator = self.gen_iterator()
        return self

    async def __anext__(self) -> DeleteError:
        if self.iterator is None:
            # Allow __anext__ to be called without a prior __aiter__.
            self.iterator = self.gen_iterator()
        return await self.iterator.__anext__()

    def __await__(self):
        return self._collect_errors().__await__()

    async def _collect_errors(self) -> List[DeleteError]:
        """Run the deletion to completion and return all errors."""
        return [error async for error in self]
class MultipartUploader:
    """Helper that drives one multipart upload.

    Used as an async context manager, the upload is created on entry and
    completed on a clean exit. If the block raises, the upload is aborted
    instead, so a partial object is not committed and the original error is
    not replaced by a failing ``complete()`` call.
    """

    def __init__(
        self,
        client: Minio,
        bucket_name: str,
        object_name: str,
        headers: DictType,
    ):
        self._client = client
        self._bucket_name = bucket_name
        self._object_name = object_name
        self._headers = headers
        self._upload_id = ""
        self._parts = []
        self._result: CompleteMultipartUploadResult | None = None
        # Set once abort() has succeeded; lets __aexit__ skip a second abort
        # or a complete() of an upload that was already aborted.
        self._aborted = False

    @property
    def result(self) -> CompleteMultipartUploadResult:
        """
        Get multipart upload result.

        :return: Multipart upload result.
        :rtype: CompleteMultipartUploadResult
        :raises ValueError: If multipart upload is not completed.
        """
        if not self._result:
            raise ValueError("Multipart upload not completed")
        return self._result

    async def _init(self):
        """Create the multipart upload and remember its upload ID."""
        self._upload_id = await self._client._create_multipart_upload(
            self._bucket_name, self._object_name, self._headers
        )

    async def complete(self) -> CompleteMultipartUploadResult:
        """
        Complete multipart upload.

        :return: Multipart upload result.
        :rtype: CompleteMultipartUploadResult
        """
        self._result = await self._client._complete_multipart_upload(
            self._bucket_name, self._object_name, self._upload_id, self._parts
        )
        return self.result

    async def abort(self):
        """Abort multipart upload."""
        await self._client._abort_multipart_upload(
            self._bucket_name, self._object_name, self._upload_id
        )
        self._aborted = True

    async def __aenter__(self):
        await self._init()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._aborted:
            # upload_part() or the caller already aborted; nothing is left
            # to complete or abort.
            return
        if exc_type is not None:
            # The block failed: discard the upload rather than commit it.
            if self._upload_id:
                await self.abort()
            return
        if self._result is None:
            await self.complete()

    async def upload_part(self, data: bytes | io.BytesIO, part_number: int) -> Part:
        """
        Upload a part of the object.
        The data size must be larger than 5 MiB and less than 5 GiB except
        the last part.

        Any failure, including an invalid ``part_number``, aborts the whole
        multipart upload before the error is re-raised.

        :param bytes | io.BytesIO data: Data to upload as part.
        :param int part_number: Number of the part.
        :return: Part information.
        :rtype: Part
        :raises ValueError: If part_number is not a positive integer.
        """
        if not self._upload_id:
            await self._init()
        try:
            if not isinstance(part_number, int) or part_number < 1:
                raise ValueError("part_number must be a positive integer")
            etag = await self._client._upload_part(
                self._bucket_name,
                self._object_name,
                data,
                self._headers,
                self._upload_id,
                part_number,
            )
            part = Part(part_number, etag)
            self._parts.append(part)
            return part
        except Exception:
            await self.abort()
            raise