# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C)
# 2020 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Response of ListBuckets, ListObjects, ListObjectsV2 and ListObjectVersions API.
"""
from __future__ import absolute_import, annotations
import base64
import io
import itertools
import json
from collections import OrderedDict
from datetime import datetime
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Iterable,
List,
Tuple,
Type,
TypeVar,
cast,
)
from urllib.parse import unquote_plus
from xml.etree import ElementTree as ET
from aiohttp import ClientResponse
from aiohttp.typedefs import LooseHeaders
from multidict import CIMultiDictProxy
from .commonconfig import Tags
from .credentials import Credentials
from .deleteobjects import DeleteError, DeleteObject
from .helpers import DictType, check_bucket_name
from .signer import get_credential_string, post_presign_v4
from .time import from_iso8601utc, to_amz_date, to_iso8601utc, utcnow
from .xml import find, findall, findtext
if TYPE_CHECKING:
from .api import Minio
[docs]
class Bucket:
    """Name and creation time of a single bucket."""

    def __init__(self, name: str, creation_date: datetime | None):
        self._name = name
        self._creation_date = creation_date

    @property
    def name(self) -> str:
        """Bucket name."""
        return self._name

    @property
    def creation_date(self) -> datetime | None:
        """Creation time of the bucket, or ``None`` when none was given."""
        return self._creation_date

    def __repr__(self) -> str:
        class_name = type(self).__name__
        return f"{class_name}('{self.name}')"

    def __str__(self) -> str:
        return self.name

    def __eq__(self, other) -> bool:
        # A bucket equals another bucket, or a plain string, that carries the
        # same name; every other type is left to the other operand.
        if isinstance(other, Bucket):
            other_name = other.name
        elif isinstance(other, str):
            other_name = other
        else:
            return NotImplemented
        return self.name == other_name

    def __hash__(self) -> int:
        # Hash on the name only, consistent with __eq__.
        return hash(self.name)
A = TypeVar("A", bound="ListAllMyBucketsResult")


class ListAllMyBucketsResult:
    """ListBuckets API result."""

    def __init__(self, buckets: list[Bucket]):
        self._buckets = buckets

    @property
    def buckets(self) -> List[Bucket]:
        """Buckets carried by this result."""
        return self._buckets

    @classmethod
    def fromxml(cls: Type[A], element: ET.Element) -> A:
        """Build a result from the root XML element of a ListBuckets response.

        One ``Bucket`` is created per ``Bucket`` child of the ``Buckets``
        element. ``Buckets`` and each ``Name`` are looked up with the helper's
        third argument set to ``True`` (presumably making them mandatory --
        confirm in the ``xml`` helper module); ``CreationDate`` is optional.
        """
        container = cast(ET.Element, find(element, "Buckets", True))

        def parse_bucket(node: ET.Element) -> Bucket:
            bucket_name = cast(str, findtext(node, "Name", True))
            created = findtext(node, "CreationDate")
            return Bucket(
                bucket_name,
                from_iso8601utc(created) if created else None,
            )

        return cls([parse_bucket(node) for node in findall(container, "Bucket")])
B = TypeVar("B", bound="Object")


class Object:
    """Object information."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        bucket_name: str,
        object_name: str | None,
        last_modified: datetime | None = None,
        etag: str | None = None,
        size: int | None = None,
        metadata: LooseHeaders | None = None,
        version_id: str | None = None,
        is_latest: str | None = None,
        storage_class: str | None = None,
        owner_id: str | None = None,
        owner_name: str | None = None,
        content_type: str | None = None,
        is_delete_marker: bool = False,
        tags: Tags | None = None,
    ):
        self._bucket_name = bucket_name
        self._object_name = object_name
        self._last_modified = last_modified
        self._etag = etag
        self._size = size
        self._metadata = metadata
        self._version_id = version_id
        self._is_latest = is_latest
        self._storage_class = storage_class
        self._owner_id = owner_id
        self._owner_name = owner_name
        self._content_type = content_type
        self._is_delete_marker = is_delete_marker
        self._tags = tags

    @property
    def bucket_name(self) -> str:
        """Get bucket name."""
        return self._bucket_name

    @property
    def object_name(self) -> str | None:
        """Get object name."""
        return self._object_name

    @property
    def is_dir(self) -> bool:
        """Get whether this key is a directory (its name ends with ``/``)."""
        return self._object_name is not None and self._object_name.endswith("/")

    @property
    def last_modified(self) -> datetime | None:
        """Get last modified time."""
        return self._last_modified

    @property
    def etag(self) -> str | None:
        """Get etag."""
        return self._etag

    @property
    def size(self) -> int | None:
        """Get size."""
        return self._size

    @property
    def metadata(self) -> LooseHeaders | None:
        """Get metadata."""
        return self._metadata

    @property
    def version_id(self) -> str | None:
        """Get version ID."""
        return self._version_id

    @property
    def is_latest(self) -> str | None:
        """Get is-latest flag (the raw ``IsLatest`` text, not a bool)."""
        return self._is_latest

    @property
    def storage_class(self) -> str | None:
        """Get storage class."""
        return self._storage_class

    @property
    def owner_id(self) -> str | None:
        """Get owner ID."""
        return self._owner_id

    @property
    def owner_name(self) -> str | None:
        """Get owner name."""
        return self._owner_name

    @property
    def is_delete_marker(self) -> bool:
        """Get whether this key is a delete marker."""
        return self._is_delete_marker

    @property
    def content_type(self) -> str | None:
        """Get content type."""
        return self._content_type

    @property
    def tags(self) -> Tags | None:
        """Get the tags."""
        return self._tags

    @classmethod
    def fromxml(
        cls: Type[B],
        element: ET.Element,
        bucket_name: str,
        is_delete_marker: bool = False,
        encoding_type: str | None = None,
    ) -> B:
        """Create new object with values from XML element.

        :param element: XML element describing one listing entry.
        :param bucket_name: Bucket the entry belongs to.
        :param is_delete_marker: Stored on the created object as-is.
        :param encoding_type: When ``"url"``, the ``Key`` text is decoded
            with ``unquote_plus``.
        :return: New instance; ``content_type`` is not populated here.
        """
        tag = findtext(element, "LastModified")
        last_modified = None if tag is None else from_iso8601utc(tag)

        tag = findtext(element, "ETag")
        etag = None if tag is None else tag.replace('"', "")

        tag = findtext(element, "Size")
        size = None if tag is None else int(tag)

        elem = find(element, "Owner")
        owner_id, owner_name = (
            (None, None)
            if elem is None
            else (findtext(elem, "ID"), findtext(elem, "DisplayName"))
        )

        # Compare with None explicitly: truth-testing an Element is
        # deprecated, and an element without children evaluates as false.
        user_metadata = find(element, "UserMetadata")
        metadata: dict[str, str] = {}
        if user_metadata is not None:
            for child in user_metadata:
                # Drop the "{namespace}" prefix from the tag, if present.
                key = child.tag.split("}")[1] if "}" in child.tag else child.tag
                metadata[key] = child.text or ""

        object_name = cast(str, findtext(element, "Key", True))
        if encoding_type == "url":
            object_name = unquote_plus(object_name)

        tags_text = findtext(element, "UserTags")
        tags: Tags | None = None
        if tags_text:
            tags = Tags.new_object_tags()
            pairs: List[Tuple[Any, Any]] = []
            for token in tags_text.split("&"):
                if not token:
                    # Ignore empty segments such as a trailing "&".
                    continue
                # Split on the first "=" only, so a value containing "="
                # stays intact and a token without "=" gets an empty value
                # instead of raising ValueError.
                key, _, value = token.partition("=")
                pairs.append((key, value))
            tags.update(pairs)

        return cls(
            bucket_name,
            object_name,
            last_modified=last_modified,
            etag=etag,
            size=size,
            version_id=findtext(element, "VersionId"),
            is_latest=findtext(element, "IsLatest"),
            storage_class=findtext(element, "StorageClass"),
            owner_id=owner_id,
            owner_name=owner_name,
            metadata=metadata,
            is_delete_marker=is_delete_marker,
            tags=tags,
        )
[docs]
def parse_list_objects(
    response_data: str,
    bucket_name: str | None = None,
) -> tuple[list[Object], bool, str | None, str | None]:
    """Parse a ListObjects/ListObjectsV2/ListObjectVersions response body.

    :param response_data: XML text of the response.
    :param bucket_name: Not used; the bucket name is always taken from the
        ``Name`` element of the response.
    :return: ``(objects, is_truncated, continuation_token,
        version_id_marker)``. ``objects`` holds the ``Contents`` entries,
        then ``Version`` entries, then ``CommonPrefixes`` (as objects that
        only have a name), then ``DeleteMarker`` entries.
    """
    root = ET.fromstring(response_data)
    bucket_name = cast(str, findtext(root, "Name", True))
    encoding_type = findtext(root, "EncodingType")
    url_encoded = encoding_type == "url"

    def parse_entries(tag_name: str, **extra: Any) -> list[Object]:
        return [
            Object.fromxml(
                node, bucket_name, encoding_type=encoding_type, **extra
            )
            for node in findall(root, tag_name)
        ]

    def prefix_name(node: ET.Element) -> str | None:
        if url_encoded:
            return unquote_plus(findtext(node, "Prefix", True) or "")
        return findtext(node, "Prefix", True)

    objects = parse_entries("Contents")
    # Name of the last plain object; used as the fallback marker below.
    last_key = objects[-1].object_name if objects else None
    objects += parse_entries("Version")
    objects += [
        Object(bucket_name, prefix_name(node))
        for node in findall(root, "CommonPrefixes")
    ]
    objects += parse_entries("DeleteMarker", is_delete_marker=True)

    truncated_text = findtext(root, "IsTruncated") or ""
    is_truncated = truncated_text.lower() == "true"

    next_key_marker = findtext(root, "NextKeyMarker")
    if next_key_marker and url_encoded:
        next_key_marker = unquote_plus(next_key_marker)
    version_id_marker = findtext(root, "NextVersionIdMarker")

    # Precedence for the token to continue from: NextKeyMarker, then
    # NextContinuationToken, then NextMarker, and finally (only for a
    # truncated listing) the last object name.
    next_token = findtext(root, "NextContinuationToken")
    if next_key_marker is not None:
        next_token = next_key_marker
    if next_token is None:
        next_token = findtext(root, "NextMarker")
        if next_token and url_encoded:
            next_token = unquote_plus(next_token)
    if is_truncated and next_token is None:
        next_token = last_key

    return objects, is_truncated, next_token, version_id_marker
[docs]
class CompleteMultipartUploadResult:
    """Result of the CompleteMultipartUpload API."""

    def __init__(self, response: ClientResponse, response_data: str):
        root = ET.fromstring(response_data)
        self._bucket_name = findtext(root, "Bucket")
        self._object_name = findtext(root, "Key")
        self._location = findtext(root, "Location")
        etag = findtext(root, "ETag")
        # The double quotes around the ETag value are removed.
        self._etag = etag.replace('"', "") if etag else etag
        self._version_id = response.headers.get("x-amz-version-id")
        self._http_headers = response.headers

    @property
    def bucket_name(self) -> str | None:
        """Bucket name from the response body."""
        return self._bucket_name

    @property
    def object_name(self) -> str | None:
        """Object name (``Key``) from the response body."""
        return self._object_name

    @property
    def location(self) -> str | None:
        """Location from the response body."""
        return self._location

    @property
    def etag(self) -> str | None:
        """ETag from the response body, without double quotes."""
        return self._etag

    @property
    def version_id(self) -> str | None:
        """Value of the ``x-amz-version-id`` response header, if any."""
        return self._version_id

    @property
    def http_headers(self) -> CIMultiDictProxy[str]:
        """Headers of the HTTP response."""
        return self._http_headers
C = TypeVar("C", bound="Part")


class Part:
    """One part of a multipart upload."""

    def __init__(
        self,
        part_number: int,
        etag: str,
        last_modified: datetime | None = None,
        size: int | None = None,
    ):
        self._part_number = part_number
        self._etag = etag
        self._last_modified = last_modified
        self._size = size

    @property
    def part_number(self) -> int:
        """Number of this part."""
        return self._part_number

    @property
    def etag(self) -> str:
        """ETag of this part."""
        return self._etag

    @property
    def last_modified(self) -> datetime | None:
        """Last-modified time of this part, if known."""
        return self._last_modified

    @property
    def size(self) -> int | None:
        """Size of this part, if known."""
        return self._size

    @classmethod
    def fromxml(cls: Type[C], element: ET.Element) -> C:
        """Build a part from a ``Part`` XML element.

        ``PartNumber`` and ``ETag`` are looked up with the helper's third
        argument set to ``True``; ``LastModified`` and ``Size`` are optional.
        """
        number = int(cast(str, findtext(element, "PartNumber", True)))
        # Double quotes around the ETag value are removed.
        etag = cast(str, findtext(element, "ETag", True)).replace('"', "")
        modified_text = findtext(element, "LastModified")
        if modified_text is None:
            modified = None
        else:
            modified = from_iso8601utc(modified_text)
        size_text = findtext(element, "Size")
        return cls(number, etag, modified, int(size_text) if size_text else None)
[docs]
class ListPartsResult:
    """ListParts API result."""

    def __init__(self, response_data: str):
        """Parse the XML text of a ListParts response."""
        element = ET.fromstring(response_data)
        self._bucket_name = findtext(element, "Bucket")
        self._object_name = findtext(element, "Key")

        tag = find(element, "Initiator")
        self._initiator_id = None if tag is None else findtext(tag, "ID")
        self._initiator_name = None if tag is None else findtext(tag, "DisplayName")

        tag = find(element, "Owner")
        self._owner_id = None if tag is None else findtext(tag, "ID")
        self._owner_name = None if tag is None else findtext(tag, "DisplayName")

        self._storage_class = findtext(element, "StorageClass")
        # Kept as the raw text, unlike the next marker which is converted.
        self._part_number_marker = findtext(element, "PartNumberMarker")

        next_part_number_marker = findtext(element, "NextPartNumberMarker")
        self._next_part_number_marker = (
            int(next_part_number_marker) if next_part_number_marker else None
        )

        max_parts = findtext(element, "MaxParts")
        self._max_parts = int(max_parts) if max_parts else None

        is_truncated = findtext(element, "IsTruncated")
        self._is_truncated = is_truncated is not None and is_truncated.lower() == "true"

        self._parts = [Part.fromxml(tag) for tag in findall(element, "Part")]

    @property
    def bucket_name(self) -> str | None:
        """Get bucket name."""
        return self._bucket_name

    @property
    def object_name(self) -> str | None:
        """Get object name."""
        return self._object_name

    @property
    def initiator_id(self) -> str | None:
        """Get initiator ID."""
        return self._initiator_id

    @property
    def initiator_name(self) -> str | None:
        """Get initiator name."""
        return self._initiator_name

    @property
    def initator_name(self) -> str | None:
        """Get initiator name.

        Misspelled alias of :attr:`initiator_name`, kept so existing callers
        keep working.
        """
        return self._initiator_name

    @property
    def owner_id(self) -> str | None:
        """Get owner ID."""
        return self._owner_id

    @property
    def owner_name(self) -> str | None:
        """Get owner name."""
        return self._owner_name

    @property
    def storage_class(self) -> str | None:
        """Get storage class."""
        return self._storage_class

    @property
    def part_number_marker(self) -> str | None:
        """Get part number marker (raw text from the response)."""
        return self._part_number_marker

    @property
    def next_part_number_marker(self) -> int | None:
        """Get next part number marker."""
        return self._next_part_number_marker

    @property
    def max_parts(self) -> int | None:
        """Get max parts."""
        return self._max_parts

    @property
    def is_truncated(self) -> bool:
        """Get is-truncated flag."""
        return self._is_truncated

    @property
    def parts(self) -> list[Part]:
        """Get parts."""
        return self._parts
[docs]
class Upload:
    """Upload information of a multipart upload."""

    def __init__(self, element: ET.Element, encoding_type: str | None = None):
        """Read the upload details from an ``Upload`` XML element.

        :param element: XML element describing one upload.
        :param encoding_type: When ``"url"``, the ``Key`` text is decoded
            with ``unquote_plus``.
        """
        self._encoding_type = encoding_type
        object_name = cast(str, findtext(element, "Key", True))
        self._object_name = (
            unquote_plus(object_name) if self._encoding_type == "url" else object_name
        )
        self._upload_id = findtext(element, "UploadId")

        tag = find(element, "Initiator")
        self._initiator_id = None if tag is None else findtext(tag, "ID")
        self._initiator_name = None if tag is None else findtext(tag, "DisplayName")

        tag = find(element, "Owner")
        self._owner_id = None if tag is None else findtext(tag, "ID")
        self._owner_name = None if tag is None else findtext(tag, "DisplayName")

        self._storage_class = findtext(element, "StorageClass")
        initiated_time = findtext(element, "Initiated")
        self._initiated_time = (
            from_iso8601utc(initiated_time) if initiated_time else None
        )

    @property
    def object_name(self) -> str:
        """Get object name."""
        return self._object_name

    @property
    def initiator_id(self) -> str | None:
        """Get initiator ID."""
        return self._initiator_id

    @property
    def initiator_name(self) -> str | None:
        """Get initiator name."""
        return self._initiator_name

    @property
    def initator_name(self) -> str | None:
        """Get initiator name.

        Misspelled alias of :attr:`initiator_name`, kept so existing callers
        keep working.
        """
        return self._initiator_name

    @property
    def owner_id(self) -> str | None:
        """Get owner ID."""
        return self._owner_id

    @property
    def owner_name(self) -> str | None:
        """Get owner name."""
        return self._owner_name

    @property
    def storage_class(self) -> str | None:
        """Get storage class."""
        return self._storage_class

    @property
    def upload_id(self) -> str | None:
        """Get upload ID."""
        return self._upload_id

    @property
    def initiated_time(self) -> datetime | None:
        """Get initiated time."""
        return self._initiated_time
[docs]
class ListMultipartUploadsResult:
    """Result of the ListMultipartUploads API."""

    def __init__(self, response_data: str):
        root = ET.fromstring(response_data)
        self._encoding_type = findtext(root, "EncodingType")
        url_encoded = self._encoding_type == "url"

        def decode_key(text: str | None) -> str | None:
            # Key markers are URL-decoded only when they are non-empty and
            # the response declares the "url" encoding type.
            if text and url_encoded:
                return unquote_plus(text)
            return text

        self._bucket_name = findtext(root, "Bucket")
        self._key_marker = decode_key(findtext(root, "KeyMarker"))
        self._upload_id_marker = findtext(root, "UploadIdMarker")
        self._next_key_marker = decode_key(findtext(root, "NextKeyMarker"))
        self._next_upload_id_marker = findtext(root, "NextUploadIdMarker")

        max_uploads_text = findtext(root, "MaxUploads")
        self._max_uploads = int(max_uploads_text) if max_uploads_text else None

        truncated_text = findtext(root, "IsTruncated")
        self._is_truncated = (
            truncated_text is not None and truncated_text.lower() == "true"
        )

        self._uploads = [
            Upload(node, self._encoding_type) for node in findall(root, "Upload")
        ]

    @property
    def bucket_name(self) -> str | None:
        """Bucket name from the response."""
        return self._bucket_name

    @property
    def key_marker(self) -> str | None:
        """Key marker of this page."""
        return self._key_marker

    @property
    def upload_id_marker(self) -> str | None:
        """Upload ID marker of this page."""
        return self._upload_id_marker

    @property
    def next_key_marker(self) -> str | None:
        """Key marker for the next page."""
        return self._next_key_marker

    @property
    def next_upload_id_marker(self) -> str | None:
        """Upload ID marker for the next page."""
        return self._next_upload_id_marker

    @property
    def max_uploads(self) -> int | None:
        """Maximum number of uploads, if the response states one."""
        return self._max_uploads

    @property
    def is_truncated(self) -> bool:
        """Whether the response is marked as truncated."""
        return self._is_truncated

    @property
    def encoding_type(self) -> str | None:
        """Encoding type declared by the response."""
        return self._encoding_type

    @property
    def uploads(self) -> list[Upload]:
        """Uploads listed in the response."""
        return self._uploads
# Condition elements that PostPolicy manages itself; add_equals_condition and
# add_starts_with_condition reject them with "cannot be set".
_RESERVED_ELEMENTS = (
    "bucket",
    "x-amz-algorithm",
    "x-amz-credential",
    "x-amz-date",
    "policy",
    "x-amz-signature",
)
# Operator names used as the first item of a policy condition.
_EQ = "eq"
_STARTS_WITH = "starts-with"
# Value emitted for the x-amz-algorithm form field and policy condition.
_ALGORITHM = "AWS4-HMAC-SHA256"
def _trim_dollar(value: str) -> str:
    """Return *value* without its first character when that is ``$``."""
    if value.startswith("$"):
        return value[1:]
    return value
[docs]
class PostPolicy:
    """
    Post policy information to be used to generate presigned post policy
    form-data. Condition elements and respective condition for Post policy
    is available at
    https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions
    """

    def __init__(self, bucket_name: str, expiration: datetime):
        """
        :param bucket_name: Bucket the policy applies to; validated with
            ``check_bucket_name``.
        :param expiration: Expiration time of the policy.
        :raises ValueError: If ``expiration`` is not a ``datetime``.
        """
        check_bucket_name(bucket_name)
        if not isinstance(expiration, datetime):
            raise ValueError("expiration must be datetime type")
        self._bucket_name = bucket_name
        self._expiration = expiration
        # Conditions grouped by operator; element names are stored without
        # the leading "$".
        self._conditions: OrderedDict = OrderedDict()
        self._conditions[_EQ] = OrderedDict()
        self._conditions[_STARTS_WITH] = OrderedDict()
        self._lower_limit: int | None = None
        self._upper_limit: int | None = None

    def add_equals_condition(self, element: str, value: str):
        """Add equals condition of an element and value.

        A leading ``$`` of ``element`` is ignored.

        :raises ValueError: If ``element`` is empty, unsupported for an
            equals condition, or reserved.
        """
        if not element:
            raise ValueError("condition element cannot be empty")
        element = _trim_dollar(element)
        if element in [
            "success_action_redirect",
            "redirect",
            "content-length-range",
        ]:
            raise ValueError(element + " is unsupported for equals condition")
        if element in _RESERVED_ELEMENTS:
            raise ValueError(element + " cannot be set")
        self._conditions[_EQ][element] = value

    def remove_equals_condition(self, element: str):
        """Remove previously set equals condition of an element.

        A leading ``$`` of ``element`` is ignored, as when adding.

        :raises ValueError: If ``element`` is empty.
        :raises KeyError: If no such condition is set.
        """
        if not element:
            raise ValueError("condition element cannot be empty")
        self._remove_condition(_EQ, element)

    def add_starts_with_condition(self, element: str, value: str):
        """
        Add starts-with condition of an element and value. Value set to empty
        string does matching any content condition.

        A leading ``$`` of ``element`` is ignored.

        :raises ValueError: If ``element`` is empty, unsupported for a
            starts-with condition, or reserved.
        """
        if not element:
            raise ValueError("condition element cannot be empty")
        element = _trim_dollar(element)
        if element in ["success_action_status", "content-length-range"] or (
            element.startswith("x-amz-") and not element.startswith("x-amz-meta-")
        ):
            raise ValueError(
                f"{element} is unsupported for starts-with condition",
            )
        if element in _RESERVED_ELEMENTS:
            raise ValueError(element + " cannot be set")
        self._conditions[_STARTS_WITH][element] = value

    def remove_starts_with_condition(self, element: str):
        """Remove previously set starts-with condition of an element.

        A leading ``$`` of ``element`` is ignored, as when adding.

        :raises ValueError: If ``element`` is empty.
        :raises KeyError: If no such condition is set.
        """
        if not element:
            raise ValueError("condition element cannot be empty")
        self._remove_condition(_STARTS_WITH, element)

    def _remove_condition(self, operator: str, element: str):
        """Pop ``element`` from the conditions stored under ``operator``."""
        conditions = self._conditions[operator]
        # The add_* methods store names without the leading "$", so the same
        # spelling must work for removal. The exact name is tried first so a
        # key that matches as given is still removed.
        key = element if element in conditions else _trim_dollar(element)
        conditions.pop(key)

    def add_content_length_range_condition(  # pylint: disable=invalid-name
        self, lower_limit: int, upper_limit: int
    ):
        """Add content-length-range condition with lower and upper limits.

        :raises ValueError: If a limit is negative or ``lower_limit`` is
            greater than ``upper_limit``.
        """
        if lower_limit < 0:
            raise ValueError("lower limit cannot be negative number")
        if upper_limit < 0:
            raise ValueError("upper limit cannot be negative number")
        if lower_limit > upper_limit:
            raise ValueError("lower limit cannot be greater than upper limit")
        self._lower_limit = lower_limit
        self._upper_limit = upper_limit

    # pylint: disable=invalid-name
    def remove_content_length_range_condition(self):
        """Remove previously set content-length-range condition."""
        self._lower_limit = None
        self._upper_limit = None

    def form_data(self, creds: Credentials, region: str) -> dict[str, Any]:
        """
        Return form-data of this post policy. The returned dict contains
        x-amz-algorithm, x-amz-credential, x-amz-security-token, x-amz-date,
        policy and x-amz-signature.

        x-amz-security-token is only present when ``creds`` has a session
        token.

        :raises ValueError: If ``creds`` is not a ``Credentials``, ``region``
            is empty, or no ``key`` condition has been set.
        """
        if not isinstance(creds, Credentials):
            raise ValueError("credentials must be Credentials type")
        if not region:
            raise ValueError("region cannot be empty")
        if (
            "key" not in self._conditions[_EQ]
            and "key" not in self._conditions[_STARTS_WITH]
        ):
            raise ValueError("key condition must be set")

        conditions: list = [[_EQ, "$bucket", self._bucket_name]]
        for cond_key, stored in self._conditions.items():
            for key, value in stored.items():
                conditions.append([cond_key, "$" + key, value])
        if self._lower_limit is not None and self._upper_limit is not None:
            conditions.append(
                [
                    "content-length-range",
                    str(self._lower_limit),
                    str(self._upper_limit),
                ],
            )

        # Read the clock once: the credential string, x-amz-date and the
        # date passed to the signer must describe the same instant, which
        # separate clock reads cannot guarantee.
        now = utcnow()
        credential = get_credential_string(creds.access_key, now, region)
        amz_date = to_amz_date(now)
        conditions.append([_EQ, "$x-amz-algorithm", _ALGORITHM])
        conditions.append([_EQ, "$x-amz-credential", credential])
        if creds.session_token:
            conditions.append(
                [_EQ, "$x-amz-security-token", creds.session_token],
            )
        conditions.append([_EQ, "$x-amz-date", amz_date])

        policy: OrderedDict = OrderedDict()
        policy["expiration"] = to_iso8601utc(self._expiration)
        policy["conditions"] = conditions

        policy_encoded = base64.b64encode(
            json.dumps(policy).encode(),
        ).decode("utf-8")
        signature = post_presign_v4(
            policy_encoded,
            creds.secret_key,
            now,
            region,
        )
        form_data = {
            "x-amz-algorithm": _ALGORITHM,
            "x-amz-credential": credential,
            "x-amz-date": amz_date,
            "policy": policy_encoded,
            "x-amz-signature": signature,
        }
        if creds.session_token:
            form_data["x-amz-security-token"] = creds.session_token
        return form_data

    @property
    def bucket_name(self) -> str:
        """Get bucket name."""
        return self._bucket_name
[docs]
def parse_copy_object(response_data: str) -> tuple[str, datetime | None]:
    """Parse the XML text of a CopyObject/UploadPartCopy response.

    :return: ``(etag, last_modified)``; the ETag has its double quotes
        removed and ``last_modified`` is ``None`` when the response carries
        no ``LastModified`` text.
    """
    root = ET.fromstring(response_data)
    raw_etag = findtext(root, "ETag", True)
    etag = cast(str, raw_etag).replace('"', "")
    modified_text = findtext(root, "LastModified")
    if modified_text:
        return etag, from_iso8601utc(modified_text)
    return etag, None
[docs]
class AsyncEventIterable:
    """Context manager friendly event iterable.

    Iterating reads the response body line by line, decodes each line as
    JSON and yields the events that carry records. Used as an async context
    manager, it closes the response on exit.
    """

    def __init__(
        self,
        response: ClientResponse,
    ):
        self._response = response

    def __aiter__(self):
        return self

    async def _get_records(self) -> dict | list | None:
        """Get the next event with records from the response stream.

        :return: The decoded event, or ``None`` once the stream is exhausted.
        """
        while True:
            line = await self._response.content.readline()
            if not line:
                # An empty read means the stream is exhausted.
                return None
            line = line.strip()
            if not line:
                # Whitespace-only line: nothing to decode, keep reading
                # instead of failing in json.loads.
                continue
            event = json.loads(line)
            # Events without records are skipped rather than ending the
            # iteration; only the end of the stream stops it.
            if isinstance(event, dict) and event.get("Records"):
                return event

    async def __anext__(self) -> dict | list:
        records = await self._get_records()
        if not records:
            raise StopAsyncIteration
        return records

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, value, traceback):
        self._response.close()
[docs]
class PeerSite:
    """A cluster/site to be added to the set of replicated sites."""

    def __init__(
        self,
        name: str,
        endpoint: str,
        access_key: str,
        secret_key: str,
    ):
        self._name = name
        self._endpoint = endpoint
        self._access_key = access_key
        self._secret_key = secret_key

    def to_dict(self) -> dict[str, str]:
        """Return the site as a dictionary.

        The keys are ``name``, ``endpoints``, ``accessKey`` and
        ``secretKey``; ``endpoints`` holds the single endpoint given to the
        constructor.
        """
        keys = ("name", "endpoints", "accessKey", "secretKey")
        values = (
            self._name,
            self._endpoint,
            self._access_key,
            self._secret_key,
        )
        return dict(zip(keys, values))
[docs]
class SiteReplicationStatusOptions:
    """Represents site replication status options."""

    # Entity kinds; each member's value is the string sent as the "entity"
    # query parameter.
    ENTITY_TYPE = Enum(
        "ENTITY_TYPE",
        {
            "BUCKET": "bucket",
            "POLICY": "policy",
            "USER": "user",
            "GROUP": "group",
        },
    )

    def __init__(self):
        self._buckets = False
        self._policies = False
        self._users = False
        self._groups = False
        self._metrics = False
        self._entity: str | Enum | None = None
        self._entity_value: str | None = None
        self._show_deleted = False

    @property
    def buckets(self) -> bool:
        """Get buckets."""
        return self._buckets

    @buckets.setter
    def buckets(self, value: bool):
        """Set buckets."""
        self._buckets = value

    @property
    def policies(self) -> bool:
        """Get policies."""
        return self._policies

    @policies.setter
    def policies(self, value: bool):
        """Set policies."""
        self._policies = value

    @property
    def users(self) -> bool:
        """Get users."""
        return self._users

    @users.setter
    def users(self, value: bool):
        """Set users."""
        self._users = value

    @property
    def groups(self) -> bool:
        """Get groups."""
        return self._groups

    @groups.setter
    def groups(self, value: bool):
        """Set groups."""
        self._groups = value

    @property
    def metrics(self) -> bool:
        """Get metrics."""
        return self._metrics

    @metrics.setter
    def metrics(self, value: bool):
        """Set metrics."""
        self._metrics = value

    @property
    def entity(self) -> str | Enum | None:
        """Get entity, exactly as it was set."""
        return self._entity

    @entity.setter
    def entity(self, value: str | Enum):
        """Set entity, either as a string or as an ``ENTITY_TYPE`` member."""
        self._entity = value

    @property
    def entity_value(self) -> str | None:
        """Get entity value."""
        return self._entity_value

    @entity_value.setter
    def entity_value(self, value: str):
        """Set entity value."""
        self._entity_value = value

    @property
    def show_deleted(self) -> bool:
        """Get show deleted."""
        return self._show_deleted

    @show_deleted.setter
    def show_deleted(self, value: bool):
        """Set show deleted."""
        self._show_deleted = value

    def to_query_params(self) -> dict[str, str]:
        """Convert this options to query parameters.

        Boolean options are rendered as ``"true"``/``"false"``. ``entity``
        and ``entityvalue`` are included only when both are set.
        """
        params = {
            "buckets": str(self._buckets).lower(),
            "policies": str(self._policies).lower(),
            "users": str(self._users).lower(),
            "groups": str(self._groups).lower(),
            "metrics": str(self._metrics).lower(),
            "showDeleted": str(self._show_deleted).lower(),
        }
        entity = self._entity
        if entity and self._entity_value:
            params["entityvalue"] = self._entity_value
            # An ENTITY_TYPE member is sent as its string value; a plain
            # string is passed through unchanged.
            params["entity"] = entity.value if isinstance(entity, Enum) else entity
        return params
[docs]
class PeerInfo:
    """Information about one peer of a site replication setup."""

    def __init__(
        self,
        deployment_id: str,
        endpoint: str,
        bucket_bandwidth_limit: str,
        bucket_bandwidth_set: str,
    ):
        self._deployment_id = deployment_id
        self._endpoint = endpoint
        self._bucket_bandwidth_limit = bucket_bandwidth_limit
        self._bucket_bandwidth_set = bucket_bandwidth_set
        # Optional values; they start unset and are filled through setters.
        self._name: str | None = None
        self._sync_status: str | None = None
        self._bucket_bandwidth_updated_at: datetime | None = None

    @property
    def deployment_id(self) -> str:
        """Deployment ID of the peer."""
        return self._deployment_id

    @deployment_id.setter
    def deployment_id(self, value: str):
        """Replace the deployment ID."""
        self._deployment_id = value

    @property
    def endpoint(self) -> str:
        """Endpoint of the peer."""
        return self._endpoint

    @endpoint.setter
    def endpoint(self, value: str):
        """Replace the endpoint."""
        self._endpoint = value

    @property
    def name(self) -> str | None:
        """Name of the peer, if set."""
        return self._name

    @name.setter
    def name(self, value: str):
        """Replace the name."""
        self._name = value

    @property
    def sync_status(self) -> str | None:
        """Sync status of the peer, if set."""
        return self._sync_status

    @sync_status.setter
    def sync_status(self, value: str):
        """Replace the sync status."""
        self._sync_status = value

    @property
    def bucket_bandwidth_limit(self) -> str:
        """Bandwidth limit per bucket."""
        return self._bucket_bandwidth_limit

    @bucket_bandwidth_limit.setter
    def bucket_bandwidth_limit(self, value: str):
        """Replace the bandwidth limit per bucket."""
        self._bucket_bandwidth_limit = value

    @property
    def bucket_bandwidth_set(self) -> str:
        """The bucket bandwidth "set" value."""
        return self._bucket_bandwidth_set

    @bucket_bandwidth_set.setter
    def bucket_bandwidth_set(self, value: str):
        """Replace the bucket bandwidth "set" value."""
        self._bucket_bandwidth_set = value

    @property
    def bucket_bandwidth_updated_at(self) -> datetime | None:
        """Time the bucket bandwidth was updated, if set."""
        return self._bucket_bandwidth_updated_at

    @bucket_bandwidth_updated_at.setter
    def bucket_bandwidth_updated_at(self, value: datetime | None):
        """Replace the bucket bandwidth update time."""
        self._bucket_bandwidth_updated_at = value

    def to_dict(self):
        """Return the peer information as a dictionary.

        ``name`` is included when it is truthy, ``sync`` when a sync status
        has been set (``"enable"`` for a truthy status, ``"disable"``
        otherwise) and ``defaultbandwidth.updatedAt`` when an update time is
        set.
        """
        bandwidth = {
            "bandwidthLimitPerBucket": self._bucket_bandwidth_limit,
            "set": self._bucket_bandwidth_set,
        }
        data = {
            "endpoint": self._endpoint,
            "deploymentID": self._deployment_id,
            "defaultbandwidth": bandwidth,
        }
        if self._name:
            data["name"] = self._name
        if self._sync_status is not None:
            if self._sync_status:
                data["sync"] = "enable"
            else:
                data["sync"] = "disable"
        if self._bucket_bandwidth_updated_at:
            bandwidth["updatedAt"] = to_iso8601utc(
                self._bucket_bandwidth_updated_at,
            )
        return data
[docs]
class ListObjects:
    """Lazy listing of the objects of a bucket.

    An instance can be consumed with ``async for`` to receive objects one at
    a time, or awaited directly to collect all of them into a list.
    """

    def __init__(
        self,
        client: Minio,
        bucket_name: str,
        prefix: str | None = None,
        recursive: bool = False,
        start_after: str | None = None,
        include_user_meta: bool = False,
        include_version: bool = False,
        use_api_v1: bool = False,
        use_url_encoding_type: bool = True,
        fetch_owner: bool = False,
        extra_headers: DictType | None = None,
    ):
        self.client = client
        self.bucket_name = bucket_name
        self.prefix = prefix
        self.recursive = recursive
        self.start_after = start_after
        self.include_user_meta = include_user_meta
        self.include_version = include_version
        self.use_api_v1 = use_api_v1
        self.use_url_encoding_type = use_url_encoding_type
        self.fetch_owner = fetch_owner
        self.extra_headers = extra_headers
        # Generator of the listing in progress; created on first use.
        self.iterator: AsyncGenerator[Object, None] | None = None

    def gen_iterator(self) -> AsyncGenerator[Object, None]:
        """Return a new listing generator from ``client._list_objects``.

        A non-recursive listing uses ``"/"`` as delimiter, a recursive one
        uses none; the ``"url"`` encoding type is requested unless
        ``use_url_encoding_type`` is false.
        """
        return self.client._list_objects(
            self.bucket_name,
            delimiter=None if self.recursive else "/",
            include_user_meta=self.include_user_meta,
            prefix=self.prefix,
            start_after=self.start_after,
            use_api_v1=self.use_api_v1,
            include_version=self.include_version,
            encoding_type="url" if self.use_url_encoding_type else None,
            fetch_owner=self.fetch_owner,
            extra_headers=self.extra_headers,
        )

    def __aiter__(self):
        # Every ``async for`` starts a new listing.
        self.iterator = self.gen_iterator()
        return self

    async def __anext__(self) -> Object:
        iterator = self.iterator
        if iterator is None:
            # Called without a prior __aiter__: create the generator and
            # keep it, so later calls continue the same listing.
            iterator = self.iterator = self.gen_iterator()
        # StopAsyncIteration from the generator propagates unchanged.
        return await iterator.__anext__()

    def __await__(self):
        return self._collect_objects().__await__()

    async def _collect_objects(self) -> List[Object]:
        """Run a complete listing and return every object as a list."""
        return [obj async for obj in self]
[docs]
class DeleteErrors:
    """Lazy deletion of many objects that reports the resulting errors.

    An instance can be consumed with ``async for`` to receive delete errors
    one at a time, or awaited directly to collect all of them into a list.
    The delete requests are only sent while the instance is being consumed.
    """

    def __init__(
        self,
        client: Minio,
        bucket_name: str,
        delete_object_list: Iterable[DeleteObject],
        bypass_governance_mode: bool = False,
    ):
        self.client = client
        self.bucket_name = bucket_name
        # turn list like objects into an iterator.
        self.delete_object_list = itertools.chain(delete_object_list)
        self.bypass_governance_mode = bypass_governance_mode
        # Generator of the deletion in progress; created on first use.
        self.iterator: AsyncGenerator[DeleteError, None] | None = None

    async def gen_iterator(self) -> AsyncGenerator[DeleteError, None]:
        """Delete the objects in batches and yield the errors reported.

        The bucket name is checked first. Objects are then taken from the
        shared iterator in batches of at most 1000 and passed to
        ``client._delete_objects``; errors whose code is ``NoSuchVersion``
        are not yielded.
        """
        check_bucket_name(self.bucket_name, s3_check=self.client._base_url.is_aws_host)
        while True:
            # get 1000 entries or whatever available.
            objects = list(itertools.islice(self.delete_object_list, 1000))
            if not objects:
                return
            result = await self.client._delete_objects(
                self.bucket_name,
                objects,
                quiet=True,
                bypass_governance_mode=self.bypass_governance_mode,
            )
            for error in result.error_list:
                # AWS S3 returns "NoSuchVersion" error when
                # version doesn't exist ignore this error
                # yield all errors otherwise
                if error.code != "NoSuchVersion":
                    yield error

    def __aiter__(self):
        # A new generator is created, but it draws from the same underlying
        # object iterator, so objects already consumed are not revisited.
        self.iterator = self.gen_iterator()
        return self

    async def __anext__(self) -> DeleteError:
        iterator = self.iterator
        if iterator is None:
            # Called without a prior __aiter__: create the generator and
            # keep it, so later calls continue the same run.
            iterator = self.iterator = self.gen_iterator()
        # StopAsyncIteration from the generator propagates unchanged.
        return await iterator.__anext__()

    def __await__(self):
        return self._collect_errors().__await__()

    async def _collect_errors(self) -> List[DeleteError]:
        """Run the whole deletion and return every reported error as a list."""
        return [error async for error in self]
[docs]
class MultipartUploader:
    """Drive a multipart upload of one object through a client.

    Used as an async context manager, the upload is created on entry and
    completed on a clean exit. If the body raises, the upload is aborted
    instead and the exception propagates.
    """

    def __init__(
        self,
        client: Minio,
        bucket_name: str,
        object_name: str,
        headers: DictType,
    ):
        self._client = client
        self._bucket_name = bucket_name
        self._object_name = object_name
        self._headers = headers
        # Empty until the upload has been created.
        self._upload_id = ""
        self._parts: list[Part] = []
        self._result: CompleteMultipartUploadResult | None = None
        # Set once abort() has run, so the upload is neither aborted twice
        # nor completed afterwards.
        self._aborted = False

    @property
    def result(self) -> CompleteMultipartUploadResult:
        """
        Get multipart upload result.

        :return: Multipart upload result.
        :rtype: CompleteMultipartUploadResult
        :raises ValueError: If multipart upload is not completed.
        """
        if not self._result:
            raise ValueError("Multipart upload not completed")
        return self._result

    async def _init(self):
        """Create the multipart upload and remember its upload ID."""
        self._upload_id = await self._client._create_multipart_upload(
            self._bucket_name, self._object_name, self._headers
        )

    async def complete(self) -> CompleteMultipartUploadResult:
        """
        Complete multipart upload.

        The uploaded parts are submitted in ascending part-number order,
        whatever order they were uploaded in.

        :return: Multipart upload result.
        :rtype: CompleteMultipartUploadResult
        """
        ordered_parts = sorted(self._parts, key=lambda part: part.part_number)
        self._result = await self._client._complete_multipart_upload(
            self._bucket_name, self._object_name, self._upload_id, ordered_parts
        )
        return self.result

    async def abort(self):
        """Abort multipart upload."""
        await self._client._abort_multipart_upload(
            self._bucket_name, self._object_name, self._upload_id
        )
        self._aborted = True

    async def __aenter__(self):
        await self._init()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # The body failed: drop the upload rather than completing an
            # object from an incomplete set of parts. upload_part() may have
            # aborted already, so do not abort twice.
            if not self._aborted:
                await self.abort()
            return
        # Nothing left to do if the caller already completed or aborted
        # the upload inside the block.
        if self._result is None and not self._aborted:
            await self.complete()

    async def upload_part(self, data: bytes | io.BytesIO, part_number: int) -> Part:
        """
        Upload a part of the object.

        The data size must be larger than 5 MiB and less than 5 GiB except the last part.

        If the upload has not been created yet it is created first. If
        uploading the part fails, the whole multipart upload is aborted and
        the error is re-raised.

        :param bytes | io.BytesIO data: Data to upload as part.
        :param int part_number: Number of the part.
        :return: Part information.
        :rtype: Part
        :raises ValueError: If part_number is not a positive integer.
        """
        # Validate before any request is sent, so a bad argument does not
        # create (and then abort) an upload.
        if not isinstance(part_number, int) or part_number < 1:
            raise ValueError("part_number must be a positive integer")
        if not self._upload_id:
            await self._init()
        try:
            etag = await self._client._upload_part(
                self._bucket_name,
                self._object_name,
                data,
                self._headers,
                self._upload_id,
                part_number,
            )
        except Exception:
            await self.abort()
            raise
        part = Part(part_number, etag)
        self._parts.append(part)
        return part