# -*- coding: utf-8 -*-
# Asynchronous MinIO Client SDK for Python
# (C) 2020 MinIO, Inc.
# (C) 2022 Huseyn Mashadiyev <mashadiyev.huseyn@gmail.com>
# (C) 2022 L-ING <hlf01@icloud.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# NOTICE: This file has been changed and differs from the original
# Author: L-ING
# Date: 2022-07-11
"""
Response of ListBuckets, ListObjects, ListObjectsV2 and ListObjectVersions API.
"""
from __future__ import absolute_import, annotations
import base64
import itertools
import json
from collections import OrderedDict
from datetime import datetime
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Iterable,
List,
Tuple,
Type,
TypeVar,
cast,
)
from urllib.parse import unquote_plus
from xml.etree import ElementTree as ET
from aiohttp import ClientResponse, ClientSession
from aiohttp.typedefs import LooseHeaders
from aiohttp_retry import RetryClient
from multidict import CIMultiDictProxy
from .commonconfig import Tags
from .credentials import Credentials
from .deleteobjects import DeleteError, DeleteObject
from .helpers import DictType, check_bucket_name
from .signer import get_credential_string, post_presign_v4
from .time import from_iso8601utc, to_amz_date, to_iso8601utc, utcnow
from .xml import find, findall, findtext
if TYPE_CHECKING:
from .api import Minio
[docs]
class Bucket:
"""Bucket information."""
def __init__(self, name: str, creation_date: datetime | None):
self._name = name
self._creation_date = creation_date
@property
def name(self) -> str:
"""Get name."""
return self._name
@property
def creation_date(self) -> datetime | None:
"""Get creation date."""
return self._creation_date
def __repr__(self) -> str:
return f"{type(self).__name__}('{self.name}')"
def __str__(self) -> str:
return self.name
def __eq__(self, other) -> bool:
if isinstance(other, Bucket):
return self.name == other.name
if isinstance(other, str):
return self.name == other
return NotImplemented
def __hash__(self) -> int:
return hash(self.name)
A = TypeVar("A", bound="ListAllMyBucketsResult")
[docs]
class ListAllMyBucketsResult:
"""LissBuckets API result."""
def __init__(self, buckets: list[Bucket]):
self._buckets = buckets
@property
def buckets(self) -> List[Bucket]:
"""Get buckets."""
return self._buckets
[docs]
@classmethod
def fromxml(cls: Type[A], element: ET.Element) -> A:
"""Create new object with values from XML element."""
element = cast(ET.Element, find(element, "Buckets", True))
buckets = []
elements = findall(element, "Bucket")
for bucket in elements:
name = cast(str, findtext(bucket, "Name", True))
creation_date = findtext(bucket, "CreationDate")
buckets.append(
Bucket(
name,
from_iso8601utc(creation_date) if creation_date else None,
)
)
return cls(buckets)
B = TypeVar("B", bound="Object")
[docs]
class Object:
"""Object information."""
def __init__( # pylint: disable=too-many-arguments
self,
bucket_name: str,
object_name: str | None,
last_modified: datetime | None = None,
etag: str | None = None,
size: int | None = None,
metadata: LooseHeaders | None = None,
version_id: str | None = None,
is_latest: str | None = None,
storage_class: str | None = None,
owner_id: str | None = None,
owner_name: str | None = None,
content_type: str | None = None,
is_delete_marker: bool = False,
tags: Tags | None = None,
):
self._bucket_name = bucket_name
self._object_name = object_name
self._last_modified = last_modified
self._etag = etag
self._size = size
self._metadata = metadata
self._version_id = version_id
self._is_latest = is_latest
self._storage_class = storage_class
self._owner_id = owner_id
self._owner_name = owner_name
self._content_type = content_type
self._is_delete_marker = is_delete_marker
self._tags = tags
@property
def bucket_name(self) -> str:
"""Get bucket name."""
return self._bucket_name
@property
def object_name(self) -> str | None:
"""Get object name."""
return self._object_name
@property
def is_dir(self) -> bool:
"""Get whether this key is a directory."""
return self._object_name is not None and self._object_name.endswith("/")
@property
def last_modified(self) -> datetime | None:
"""Get last modified time."""
return self._last_modified
@property
def etag(self) -> str | None:
"""Get etag."""
return self._etag
@property
def size(self) -> int | None:
"""Get size."""
return self._size
@property
def metadata(self) -> LooseHeaders | None:
"""Get metadata."""
return self._metadata
@property
def version_id(self) -> str | None:
"""Get version ID."""
return self._version_id
@property
def is_latest(self) -> str | None:
"""Get is-latest flag."""
return self._is_latest
@property
def storage_class(self) -> str | None:
"""Get storage class."""
return self._storage_class
@property
def owner_id(self) -> str | None:
"""Get owner ID."""
return self._owner_id
@property
def owner_name(self) -> str | None:
"""Get owner name."""
return self._owner_name
@property
def is_delete_marker(self) -> bool:
"""Get whether this key is a delete marker."""
return self._is_delete_marker
@property
def content_type(self) -> str | None:
"""Get content type."""
return self._content_type
@property
def tags(self) -> Tags | None:
"""Get the tags"""
return self._tags
[docs]
@classmethod
def fromxml(
cls: Type[B],
element: ET.Element,
bucket_name: str,
is_delete_marker: bool = False,
encoding_type: str | None = None,
) -> B:
"""Create new object with values from XML element."""
tag = findtext(element, "LastModified")
last_modified = None if tag is None else from_iso8601utc(tag)
tag = findtext(element, "ETag")
etag = None if tag is None else tag.replace('"', "")
tag = findtext(element, "Size")
size = None if tag is None else int(tag)
elem = find(element, "Owner")
owner_id, owner_name = (
(None, None)
if elem is None
else (findtext(elem, "ID"), findtext(elem, "DisplayName"))
)
elems = find(element, "UserMetadata") or []
metadata: dict[str, str] = {}
for child in elems:
key = child.tag.split("}")[1] if "}" in child.tag else child.tag
metadata[key] = child.text or ""
object_name = cast(str, findtext(element, "Key", True))
if encoding_type == "url":
object_name = unquote_plus(object_name)
tags_text = findtext(element, "UserTags")
tags: Tags | None = None
if tags_text:
tags = Tags.new_object_tags()
tags.update(
cast(
List[Tuple[Any, Any]],
[tokens.split("=") for tokens in tags_text.split("&")],
),
)
return cls(
bucket_name,
object_name,
last_modified=last_modified,
etag=etag,
size=size,
version_id=findtext(element, "VersionId"),
is_latest=findtext(element, "IsLatest"),
storage_class=findtext(element, "StorageClass"),
owner_id=owner_id,
owner_name=owner_name,
metadata=metadata,
is_delete_marker=is_delete_marker,
tags=tags,
)
[docs]
def parse_list_objects(
response_data: str,
bucket_name: str | None = None,
) -> tuple[list[Object], bool, str | None, str | None]:
"""Parse ListObjects/ListObjectsV2/ListObjectVersions response."""
element = ET.fromstring(response_data)
bucket_name = cast(str, findtext(element, "Name", True))
encoding_type = findtext(element, "EncodingType")
elements = findall(element, "Contents")
objects = [
Object.fromxml(tag, bucket_name, encoding_type=encoding_type)
for tag in elements
]
marker = objects[-1].object_name if objects else None
elements = findall(element, "Version")
objects += [
Object.fromxml(tag, bucket_name, encoding_type=encoding_type)
for tag in elements
]
elements = findall(element, "CommonPrefixes")
objects += [
Object(
bucket_name,
(
unquote_plus(findtext(tag, "Prefix", True) or "")
if encoding_type == "url"
else findtext(tag, "Prefix", True)
),
)
for tag in elements
]
elements = findall(element, "DeleteMarker")
objects += [
Object.fromxml(
tag, bucket_name, is_delete_marker=True, encoding_type=encoding_type
)
for tag in elements
]
is_truncated = (findtext(element, "IsTruncated") or "").lower() == "true"
key_marker = findtext(element, "NextKeyMarker")
if key_marker and encoding_type == "url":
key_marker = unquote_plus(key_marker)
version_id_marker = findtext(element, "NextVersionIdMarker")
continuation_token = findtext(element, "NextContinuationToken")
if key_marker is not None:
continuation_token = key_marker
if continuation_token is None:
continuation_token = findtext(element, "NextMarker")
if continuation_token and encoding_type == "url":
continuation_token = unquote_plus(continuation_token)
if continuation_token is None and is_truncated:
continuation_token = marker
return objects, is_truncated, continuation_token, version_id_marker
[docs]
class CompleteMultipartUploadResult:
"""CompleteMultipartUpload API result."""
def __init__(self, response: ClientResponse, response_data: str):
element = ET.fromstring(response_data)
self._bucket_name = findtext(element, "Bucket")
self._object_name = findtext(element, "Key")
self._location = findtext(element, "Location")
self._etag = findtext(element, "ETag")
if self._etag:
self._etag = self._etag.replace('"', "")
self._version_id = response.headers.get("x-amz-version-id")
self._http_headers = response.headers
@property
def bucket_name(self) -> str | None:
"""Get bucket name."""
return self._bucket_name
@property
def object_name(self) -> str | None:
"""Get object name."""
return self._object_name
@property
def location(self) -> str | None:
"""Get location."""
return self._location
@property
def etag(self) -> str | None:
"""Get etag."""
return self._etag
@property
def version_id(self) -> str | None:
"""Get version ID."""
return self._version_id
@property
def http_headers(self) -> CIMultiDictProxy[str]:
"""Get HTTP headers."""
return self._http_headers
C = TypeVar("C", bound="Part")
[docs]
class Part:
"""Part information of a multipart upload."""
def __init__(
self,
part_number: int,
etag: str,
last_modified: datetime | None = None,
size: int | None = None,
):
self._part_number = part_number
self._etag = etag
self._last_modified = last_modified
self._size = size
@property
def part_number(self) -> int:
"""Get part number."""
return self._part_number
@property
def etag(self) -> str:
"""Get etag."""
return self._etag
@property
def last_modified(self) -> datetime | None:
"""Get last-modified."""
return self._last_modified
@property
def size(self) -> int | None:
"""Get size."""
return self._size
[docs]
@classmethod
def fromxml(cls: Type[C], element: ET.Element) -> C:
"""Create new object with values from XML element."""
part_number = int(cast(str, findtext(element, "PartNumber", True)))
etag = cast(str, findtext(element, "ETag", True))
etag = etag.replace('"', "")
tag = findtext(element, "LastModified")
last_modified = None if tag is None else from_iso8601utc(tag)
size = findtext(element, "Size")
return cls(
part_number,
etag,
last_modified,
int(size) if size else None,
)
[docs]
class ListPartsResult:
"""ListParts API result."""
def __init__(self, response_data: str):
element = ET.fromstring(response_data)
self._bucket_name = findtext(element, "Bucket")
self._object_name = findtext(element, "Key")
tag = find(element, "Initiator")
self._initiator_id = None if tag is None else findtext(tag, "ID")
self._initiator_name = None if tag is None else findtext(tag, "DisplayName")
tag = find(element, "Owner")
self._owner_id = None if tag is None else findtext(tag, "ID")
self._owner_name = None if tag is None else findtext(tag, "DisplayName")
self._storage_class = findtext(element, "StorageClass")
self._part_number_marker = findtext(element, "PartNumberMarker")
next_part_number_marker = findtext(element, "NextPartNumberMarker")
self._next_part_number_marker = (
int(next_part_number_marker) if next_part_number_marker else None
)
max_parts = findtext(element, "MaxParts")
self._max_parts = int(max_parts) if max_parts else None
is_truncated = findtext(element, "IsTruncated")
self._is_truncated = is_truncated is not None and is_truncated.lower() == "true"
self._parts = [Part.fromxml(tag) for tag in findall(element, "Part")]
@property
def bucket_name(self) -> str | None:
"""Get bucket name."""
return self._bucket_name
@property
def object_name(self) -> str | None:
"""Get object name."""
return self._object_name
@property
def initiator_id(self) -> str | None:
"""Get initiator ID."""
return self._initiator_id
@property
def initator_name(self) -> str | None:
"""Get initiator name."""
return self._initiator_name
@property
def owner_id(self) -> str | None:
"""Get owner ID."""
return self._owner_id
@property
def owner_name(self) -> str | None:
"""Get owner name."""
return self._owner_name
@property
def storage_class(self) -> str | None:
"""Get storage class."""
return self._storage_class
@property
def part_number_marker(self) -> str | None:
"""Get part number marker."""
return self._part_number_marker
@property
def next_part_number_marker(self) -> int | None:
"""Get next part number marker."""
return self._next_part_number_marker
@property
def max_parts(self) -> int | None:
"""Get max parts."""
return self._max_parts
@property
def is_truncated(self) -> bool:
"""Get is-truncated flag."""
return self._is_truncated
@property
def parts(self) -> list[Part]:
"""Get parts."""
return self._parts
[docs]
class Upload:
"""Upload information of a multipart upload."""
def __init__(self, element: ET.Element, encoding_type: str | None = None):
self._encoding_type = encoding_type
object_name = cast(str, findtext(element, "Key", True))
self._object_name = (
unquote_plus(object_name) if self._encoding_type == "url" else object_name
)
self._upload_id = findtext(element, "UploadId")
tag = find(element, "Initiator")
self._initiator_id = None if tag is None else findtext(tag, "ID")
self._initiator_name = None if tag is None else findtext(tag, "DisplayName")
tag = find(element, "Owner")
self._owner_id = None if tag is None else findtext(tag, "ID")
self._owner_name = None if tag is None else findtext(tag, "DisplayName")
self._storage_class = findtext(element, "StorageClass")
initiated_time = findtext(element, "Initiated")
self._initiated_time = (
from_iso8601utc(initiated_time) if initiated_time else None
)
@property
def object_name(self) -> str:
"""Get object name."""
return self._object_name
@property
def initiator_id(self) -> str | None:
"""Get initiator ID."""
return self._initiator_id
@property
def initator_name(self) -> str | None:
"""Get initiator name."""
return self._initiator_name
@property
def owner_id(self) -> str | None:
"""Get owner ID."""
return self._owner_id
@property
def owner_name(self) -> str | None:
"""Get owner name."""
return self._owner_name
@property
def storage_class(self) -> str | None:
"""Get storage class."""
return self._storage_class
@property
def upload_id(self) -> str | None:
"""Get upload ID."""
return self._upload_id
@property
def initiated_time(self) -> datetime | None:
"""Get initiated time."""
return self._initiated_time
[docs]
class ListMultipartUploadsResult:
"""ListMultipartUploads API result."""
def __init__(self, response_data: str):
element = ET.fromstring(response_data)
self._encoding_type = findtext(element, "EncodingType")
self._bucket_name = findtext(element, "Bucket")
self._key_marker = findtext(element, "KeyMarker")
if self._key_marker:
self._key_marker = (
unquote_plus(self._key_marker)
if self._encoding_type == "url"
else self._key_marker
)
self._upload_id_marker = findtext(element, "UploadIdMarker")
self._next_key_marker = findtext(element, "NextKeyMarker")
if self._next_key_marker:
self._next_key_marker = (
unquote_plus(self._next_key_marker)
if self._encoding_type == "url"
else self._next_key_marker
)
self._next_upload_id_marker = findtext(element, "NextUploadIdMarker")
max_uploads = findtext(element, "MaxUploads")
self._max_uploads = int(max_uploads) if max_uploads else None
is_truncated = findtext(element, "IsTruncated")
self._is_truncated = is_truncated is not None and is_truncated.lower() == "true"
self._uploads = [
Upload(tag, self._encoding_type) for tag in findall(element, "Upload")
]
@property
def bucket_name(self) -> str | None:
"""Get bucket name."""
return self._bucket_name
@property
def key_marker(self) -> str | None:
"""Get key marker."""
return self._key_marker
@property
def upload_id_marker(self) -> str | None:
"""Get upload ID marker."""
return self._upload_id_marker
@property
def next_key_marker(self) -> str | None:
"""Get next key marker."""
return self._next_key_marker
@property
def next_upload_id_marker(self) -> str | None:
"""Get next upload ID marker."""
return self._next_upload_id_marker
@property
def max_uploads(self) -> int | None:
"""Get max uploads."""
return self._max_uploads
@property
def is_truncated(self) -> bool:
"""Get is-truncated flag."""
return self._is_truncated
@property
def encoding_type(self) -> str | None:
"""Get encoding type."""
return self._encoding_type
@property
def uploads(self) -> list[Upload]:
"""Get uploads."""
return self._uploads
_RESERVED_ELEMENTS = (
"bucket",
"x-amz-algorithm",
"x-amz-credential",
"x-amz-date",
"policy",
"x-amz-signature",
)
_EQ = "eq"
_STARTS_WITH = "starts-with"
_ALGORITHM = "AWS4-HMAC-SHA256"
def _trim_dollar(value: str) -> str:
"""Trim dollar character if present."""
return value[1:] if value.startswith("$") else value
[docs]
class PostPolicy:
"""
Post policy information to be used to generate presigned post policy
form-data. Condition elements and respective condition for Post policy
is available at
https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions
"""
def __init__(self, bucket_name: str, expiration: datetime):
check_bucket_name(bucket_name)
if not isinstance(expiration, datetime):
raise ValueError("expiration must be datetime type")
self._bucket_name = bucket_name
self._expiration = expiration
self._conditions: OrderedDict = OrderedDict()
self._conditions[_EQ] = OrderedDict()
self._conditions[_STARTS_WITH] = OrderedDict()
self._lower_limit: int | None = None
self._upper_limit: int | None = None
[docs]
def add_equals_condition(self, element: str, value: str):
"""Add equals condition of an element and value."""
if not element:
raise ValueError("condition element cannot be empty")
element = _trim_dollar(element)
if element in [
"success_action_redirect",
"redirect",
"content-length-range",
]:
raise ValueError(element + " is unsupported for equals condition")
if element in _RESERVED_ELEMENTS:
raise ValueError(element + " cannot be set")
self._conditions[_EQ][element] = value
[docs]
def remove_equals_condition(self, element: str):
"""Remove previously set equals condition of an element."""
if not element:
raise ValueError("condition element cannot be empty")
self._conditions[_EQ].pop(element)
[docs]
def add_starts_with_condition(self, element: str, value: str):
"""
Add starts-with condition of an element and value. Value set to empty
string does matching any content condition.
"""
if not element:
raise ValueError("condition element cannot be empty")
element = _trim_dollar(element)
if element in ["success_action_status", "content-length-range"] or (
element.startswith("x-amz-") and not element.startswith("x-amz-meta-")
):
raise ValueError(
f"{element} is unsupported for starts-with condition",
)
if element in _RESERVED_ELEMENTS:
raise ValueError(element + " cannot be set")
self._conditions[_STARTS_WITH][element] = value
[docs]
def remove_starts_with_condition(self, element: str):
"""Remove previously set starts-with condition of an element."""
if not element:
raise ValueError("condition element cannot be empty")
self._conditions[_STARTS_WITH].pop(element)
[docs]
def add_content_length_range_condition( # pylint: disable=invalid-name
self, lower_limit: int, upper_limit: int
):
"""Add content-length-range condition with lower and upper limits."""
if lower_limit < 0:
raise ValueError("lower limit cannot be negative number")
if upper_limit < 0:
raise ValueError("upper limit cannot be negative number")
if lower_limit > upper_limit:
raise ValueError("lower limit cannot be greater than upper limit")
self._lower_limit = lower_limit
self._upper_limit = upper_limit
# pylint: disable=invalid-name
[docs]
def remove_content_length_range_condition(self):
"""Remove previously set content-length-range condition."""
self._lower_limit = None
self._upper_limit = None
[docs]
def form_data(self, creds: Credentials, region: str) -> dict[str, Any]:
"""
Return form-data of this post policy. The returned dict contains
x-amz-algorithm, x-amz-credential, x-amz-security-token, x-amz-date,
policy and x-amz-signature.
"""
if not isinstance(creds, Credentials):
raise ValueError("credentials must be Credentials type")
if not region:
raise ValueError("region cannot be empty")
if (
"key" not in self._conditions[_EQ]
and "key" not in self._conditions[_STARTS_WITH]
):
raise ValueError("key condition must be set")
policy = OrderedDict()
policy["expiration"] = to_iso8601utc(self._expiration)
policy["conditions"] = [[_EQ, "$bucket", self._bucket_name]]
for cond_key, conditions in self._conditions.items():
for key, value in conditions.items():
policy["conditions"].append([cond_key, "$" + key, value])
if self._lower_limit is not None and self._upper_limit is not None:
policy["conditions"].append(
[
"content-length-range",
str(self._lower_limit),
str(self._upper_limit),
],
)
credential = get_credential_string(creds.access_key, utcnow(), region)
amz_date = to_amz_date(utcnow())
policy["conditions"].append([_EQ, "$x-amz-algorithm", _ALGORITHM])
policy["conditions"].append([_EQ, "$x-amz-credential", credential])
if creds.session_token:
policy["conditions"].append(
[_EQ, "$x-amz-security-token", creds.session_token],
)
policy["conditions"].append([_EQ, "$x-amz-date", amz_date])
policy_encoded = base64.b64encode(
json.dumps(policy).encode(),
).decode("utf-8")
signature = post_presign_v4(
policy_encoded,
creds.secret_key,
utcnow(),
region,
)
form_data = {
"x-amz-algorithm": _ALGORITHM,
"x-amz-credential": credential,
"x-amz-date": amz_date,
"policy": policy_encoded,
"x-amz-signature": signature,
}
if creds.session_token:
form_data["x-amz-security-token"] = creds.session_token
return form_data
@property
def bucket_name(self) -> str:
"""Get bucket name."""
return self._bucket_name
[docs]
def parse_copy_object(response_data: str) -> tuple[str, datetime | None]:
"""Parse CopyObject/UploadPartCopy response."""
element = ET.fromstring(response_data)
etag = cast(str, findtext(element, "ETag", True)).replace('"', "")
last_modified = findtext(element, "LastModified")
return etag, from_iso8601utc(last_modified) if last_modified else None
[docs]
class AsyncEventIterable:
"""Context manager friendly event iterable."""
def __init__(
self,
response: ClientResponse,
session: ClientSession | RetryClient,
):
self._response = response
self._session = session
def __aiter__(self):
return self
async def _get_records(self) -> dict | list | None:
"""Get event records from response stream."""
line = await self._response.content.readline()
if not line:
return None
event: dict = json.loads(line)
if event["Records"]:
return event
return None
async def __anext__(self) -> dict | list:
records = await self._get_records()
if not records:
raise StopAsyncIteration
return records
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, value, traceback):
self._response.close()
await self._session.close()
[docs]
class PeerSite:
"""Represents a cluster/site to be added to the set of replicated sites."""
def __init__(
self,
name: str,
endpoint: str,
access_key: str,
secret_key: str,
):
self._name = name
self._endpoint = endpoint
self._access_key = access_key
self._secret_key = secret_key
[docs]
def to_dict(self) -> dict[str, str]:
"""Convert to dictionary."""
return {
"name": self._name,
"endpoints": self._endpoint,
"accessKey": self._access_key,
"secretKey": self._secret_key,
}
[docs]
class SiteReplicationStatusOptions:
"""Represents site replication status options."""
ENTITY_TYPE = Enum(
"ENTITY_TYPE",
{
"BUCKET": "bucket",
"POLICY": "policy",
"USER": "user",
"GROUP": "group",
},
)
def __init__(self):
self._buckets = False
self._policies = False
self._users = False
self._groups = False
self._metrics = False
self._entity = None
self._entity_value = None
self._show_deleted = False
@property
def buckets(self) -> bool:
"""Get buckets."""
return self._buckets
@buckets.setter
def buckets(self, value: bool):
"""Set buckets."""
self._buckets = value
@property
def policies(self) -> bool:
"""Get policies."""
return self._policies
@policies.setter
def policies(self, value: bool):
"""Set policies."""
self._policies = value
@property
def users(self) -> bool:
"""Get users."""
return self._users
@users.setter
def users(self, value: bool):
"""Set users."""
self._users = value
@property
def groups(self) -> bool:
"""Get groups."""
return self._groups
@groups.setter
def groups(self, value: bool):
"""Set groups."""
self._groups = value
@property
def metrics(self) -> bool:
"""Get metrics."""
return self._metrics
@metrics.setter
def metrics(self, value: bool):
"""Set metrics."""
self._metrics = value
@property
def entity(self) -> str | None:
"""Get entity."""
return self._entity
@entity.setter
def entity(self, value: str):
"""Set entity."""
self._entity = value
@property
def entity_value(self) -> str | None:
"""Get entity value."""
return self._entity_value
@entity_value.setter
def entity_value(self, value: str):
"""Set entity value."""
self._entity_value = value
@property
def show_deleted(self) -> bool:
"""Get show deleted."""
return self._show_deleted
@show_deleted.setter
def show_deleted(self, value: bool):
"""Set show deleted."""
self._show_deleted = value
[docs]
def to_query_params(self) -> dict[str, str]:
"""Convert this options to query parameters."""
params = {
"buckets": str(self._buckets).lower(),
"policies": str(self._policies).lower(),
"users": str(self._users).lower(),
"groups": str(self._groups).lower(),
"metrics": str(self._metrics).lower(),
"showDeleted": str(self._show_deleted).lower(),
}
if self._entity and self._entity_value:
params["entityvalue"] = self._entity_value
params["entity"] = self._entity
return params
[docs]
class PeerInfo:
"""Site replication peer information."""
def __init__(
self,
deployment_id: str,
endpoint: str,
bucket_bandwidth_limit: str,
bucket_bandwidth_set: str,
):
self._deployment_id = deployment_id
self._endpoint = endpoint
self._name: str | None = None
self._sync_status: str | None = None
self._bucket_bandwidth_limit = bucket_bandwidth_limit
self._bucket_bandwidth_set = bucket_bandwidth_set
self._bucket_bandwidth_updated_at: datetime | None = None
@property
def deployment_id(self) -> str:
"""Get deployment ID."""
return self._deployment_id
@deployment_id.setter
def deployment_id(self, value: str):
"""Set deployment ID."""
self._deployment_id = value
@property
def endpoint(self) -> str:
"""Get endpoint."""
return self._endpoint
@endpoint.setter
def endpoint(self, value: str):
"""Set endpoint."""
self._endpoint = value
@property
def name(self) -> str | None:
"""Get name."""
return self._name
@name.setter
def name(self, value: str):
"""Set name."""
self._name = value
@property
def sync_status(self) -> str | None:
"""Get sync status."""
return self._sync_status
@sync_status.setter
def sync_status(self, value: str):
"""Set sync status."""
self._sync_status = value
@property
def bucket_bandwidth_limit(self) -> str:
"""Get bucket bandwidth limit."""
return self._bucket_bandwidth_limit
@bucket_bandwidth_limit.setter
def bucket_bandwidth_limit(self, value: str):
"""Set bucket bandwidth limit."""
self._bucket_bandwidth_limit = value
@property
def bucket_bandwidth_set(self) -> str:
"""Get bucket bandwidth set."""
return self._bucket_bandwidth_set
@bucket_bandwidth_set.setter
def bucket_bandwidth_set(self, value: str):
"""Set bucket bandwidth set."""
self._bucket_bandwidth_set = value
@property
def bucket_bandwidth_updated_at(self) -> datetime | None:
"""Get bucket bandwidth updated at."""
return self._bucket_bandwidth_updated_at
@bucket_bandwidth_updated_at.setter
def bucket_bandwidth_updated_at(self, value: datetime | None):
"""Set bucket bandwidth updated at."""
self._bucket_bandwidth_updated_at = value
[docs]
def to_dict(self):
"""Converts peer information to dictionary."""
data = {
"endpoint": self._endpoint,
"deploymentID": self._deployment_id,
"defaultbandwidth": {
"bandwidthLimitPerBucket": self._bucket_bandwidth_limit,
"set": self._bucket_bandwidth_set,
},
}
if self._name:
data["name"] = self._name
if self._sync_status is not None:
data["sync"] = "enable" if self._sync_status else "disable"
if self._bucket_bandwidth_updated_at:
data["defaultbandwidth"]["updatedAt"] = to_iso8601utc(
self._bucket_bandwidth_updated_at,
)
return data
[docs]
class ListObjects:
def __init__(
self,
client: Minio,
bucket_name: str,
prefix: str | None = None,
recursive: bool = False,
start_after: str | None = None,
include_user_meta: bool = False,
include_version: bool = False,
use_api_v1: bool = False,
use_url_encoding_type: bool = True,
fetch_owner: bool = False,
extra_headers: DictType | None = None,
extra_query_params: DictType | None = None,
):
self.client = client
self.bucket_name = bucket_name
self.prefix = prefix
self.recursive = recursive
self.start_after = start_after
self.include_user_meta = include_user_meta
self.include_version = include_version
self.use_api_v1 = use_api_v1
self.use_url_encoding_type = use_url_encoding_type
self.fetch_owner = fetch_owner
self.extra_headers = extra_headers
self.extra_query_params = extra_query_params
self.iterator: AsyncGenerator[Object] | None = None
[docs]
def gen_iterator(self) -> AsyncGenerator[Object]:
return self.client._list_objects(
self.bucket_name,
delimiter=None if self.recursive else "/",
include_user_meta=self.include_user_meta,
prefix=self.prefix,
start_after=self.start_after,
use_api_v1=self.use_api_v1,
include_version=self.include_version,
encoding_type="url" if self.use_url_encoding_type else None,
fetch_owner=self.fetch_owner,
extra_headers=self.extra_headers,
extra_query_params=self.extra_query_params,
)
def __aiter__(self):
self.iterator = self.gen_iterator()
return self
async def __anext__(self) -> Object:
if self.iterator is None:
self.gen_iterator()
try:
return await cast(AsyncGenerator, self.iterator).__anext__()
except StopAsyncIteration:
raise StopAsyncIteration
def __await__(self):
return self._collect_objects().__await__()
async def _collect_objects(self) -> List[Object]:
if self.iterator is None:
self.gen_iterator()
objects = []
async for object in self:
objects.append(object)
return objects
[docs]
class DeleteErrors:
def __init__(
self,
client: Minio,
bucket_name: str,
delete_object_list: Iterable[DeleteObject],
bypass_governance_mode: bool = False,
):
self.client = client
self.bucket_name = bucket_name
# turn list like objects into an iterator.
self.delete_object_list = itertools.chain(delete_object_list)
self.bypass_governance_mode = bypass_governance_mode
self.iterator: AsyncGenerator[DeleteError] | None = None
[docs]
async def gen_iterator(self) -> AsyncGenerator[DeleteError]:
check_bucket_name(self.bucket_name, s3_check=self.client._base_url.is_aws_host)
while True:
# get 1000 entries or whatever available.
objects = [
delete_object
for _, delete_object in zip(
range(1000),
self.delete_object_list,
)
]
if not objects:
return
result = await self.client._delete_objects(
self.bucket_name,
objects,
quiet=True,
bypass_governance_mode=self.bypass_governance_mode,
)
for error in result.error_list:
# AWS S3 returns "NoSuchVersion" error when
# version doesn't exist ignore this error
# yield all errors otherwise
if error.code != "NoSuchVersion":
yield error
def __aiter__(self):
self.iterator = self.gen_iterator()
return self
async def __anext__(self) -> DeleteError:
if self.iterator is None:
self.gen_iterator()
try:
return await cast(
AsyncGenerator[DeleteError, DeleteError], self.iterator
).__anext__()
except StopAsyncIteration:
raise StopAsyncIteration
def __await__(self):
return self._collect_errors().__await__()
async def _collect_errors(self) -> List[DeleteError]:
if self.iterator is None:
self.gen_iterator()
errors = []
async for error in self:
errors.append(error)
return errors