# -*- coding: utf-8 -*-
# Asynchronous MinIO Client SDK for Python
# (C) 2020 MinIO, Inc.
# (C) 2022 Huseyn Mashadiyev <mashadiyev.huseyn@gmail.com>
# (C) 2022 L-ING <hlf01@icloud.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# NOTICE: This file has been changed and differs from the original
# Author: L-ING
# Date: 2022-07-11
"""Common request/response configuration of S3 APIs."""
# pylint: disable=invalid-name
from __future__ import absolute_import, annotations
from abc import ABCMeta
from datetime import datetime
from typing import IO, Type, TypeVar, cast
from xml.etree import ElementTree as ET
from .error import MinioException
from .helpers import quote
from .sse import SseCustomerKey
from .time import to_http_header
from .xml import SubElement, find, findall, findtext
# NOTE(review): presumably directive values used by copy-style requests;
# their consumers are not visible in this part of the file -- confirm.
COPY, REPLACE = "COPY", "REPLACE"

# The two status strings accepted by check_status().
ENABLED, DISABLED = "Enabled", "Disabled"

# NOTE(review): presumably object-lock retention modes -- confirm with callers.
GOVERNANCE, COMPLIANCE = "GOVERNANCE", "COMPLIANCE"

# NOTE(review): presumably tagging limits enforced by the Tags class, which is
# not visible in this part of the file -- confirm.
_MAX_KEY_LENGTH = 128
_MAX_VALUE_LENGTH = 256
_MAX_OBJECT_TAG_COUNT = 10
_MAX_TAG_COUNT = 50

# Type variable for alternate constructors that return the Tags subclass
# they are called on.
A = TypeVar("A", bound="Tags")
# Type variable so that Tag.fromxml() returns the subclass it is called on.
B = TypeVar("B", bound="Tag")


class Tag:
    """A single key/value tag.

    Args:
        key: Tag key; must be truthy (non-empty).
        value: Tag value; must not be None (an empty string is accepted).

    Raises:
        ValueError: If ``key`` is empty/None or ``value`` is None.
    """

    def __init__(self, key: str, value: str):
        if not key:
            raise ValueError("key must be provided")
        if value is None:
            raise ValueError("value must be provided")
        self._key = key
        self._value = value

    @property
    def key(self) -> str:
        """Get key."""
        return self._key

    @property
    def value(self) -> str:
        """Get value."""
        return self._value

    @classmethod
    def fromxml(cls: Type[B], element: ET.Element) -> B:
        """Create a new object from the ``Tag`` child of ``element``.

        The ``Key`` and ``Value`` texts of that child become the constructor
        arguments.
        """
        # NOTE(review): the trailing True presumably makes the lookup strict
        # (fail when the node is missing) -- confirm against the .xml helpers.
        tag_element = cast(ET.Element, find(element, "Tag", True))
        key = cast(str, findtext(tag_element, "Key", True))
        value = cast(str, findtext(tag_element, "Value", True))
        return cls(key, value)

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Append a ``Tag`` node holding ``Key``/``Value`` to ``element``.

        Returns:
            The newly created ``Tag`` element.

        Raises:
            ValueError: If ``element`` is None.
        """
        if element is None:
            raise ValueError("element must be provided")
        tag_element = SubElement(element, "Tag")
        SubElement(tag_element, "Key", self._key)
        SubElement(tag_element, "Value", self._value)
        return tag_element
# Type variable so that AndOperator.fromxml() returns the calling subclass.
C = TypeVar("C", bound="AndOperator")


class AndOperator:
    """AND operator combining a prefix and/or a set of tags.

    Args:
        prefix: Optional prefix; an empty string counts as provided.
        tags: Optional tags; an empty/falsy value counts as not provided.

    Raises:
        ValueError: If ``prefix`` is None and ``tags`` is falsy.
    """

    def __init__(self, prefix: str | None = None, tags: Tags | None = None):
        if prefix is None and not tags:
            raise ValueError("at least prefix or tags must be provided")
        self._prefix = prefix
        self._tags = tags

    @property
    def prefix(self) -> str | None:
        """Get prefix."""
        return self._prefix

    @property
    def tags(self) -> Tags | None:
        """Get tags."""
        return self._tags

    @classmethod
    def fromxml(cls: Type[C], element: ET.Element) -> C:
        """Create a new object from the ``And`` child of ``element``.

        ``Prefix`` is read as optional text; tags are parsed only when at
        least one ``Tag`` child exists under ``And``.
        """
        and_element = cast(ET.Element, find(element, "And", True))
        prefix = findtext(and_element, "Prefix")
        tags = None
        if find(and_element, "Tag") is not None:
            tags = Tags.fromxml(and_element)
        return cls(prefix, tags)

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Append an ``And`` node describing this operator to ``element``.

        Returns:
            The newly created ``And`` element.

        Raises:
            ValueError: If ``element`` is None.
        """
        if element is None:
            raise ValueError("element must be provided")
        and_element = SubElement(element, "And")
        if self._prefix is not None:
            SubElement(and_element, "Prefix", self._prefix)
        if self._tags is not None:
            self._tags.toxml(and_element)
        return and_element
# Type variable so that Filter.fromxml() returns the calling subclass.
D = TypeVar("D", bound="Filter")


class Filter:
    """Lifecycle rule filter.

    Exactly one of ``and_operator``, ``prefix`` or ``tag`` must be given.

    Args:
        and_operator: AND operator combining several conditions.
        prefix: Prefix condition; an empty string counts as provided.
        tag: Single tag condition.

    Raises:
        ValueError: If none, or more than one, of the arguments is provided.
    """

    def __init__(
        self,
        and_operator: AndOperator | None = None,
        prefix: str | None = None,
        tag: Tag | None = None,
    ):
        # Count explicitly: chaining XOR over three flags is also true when
        # all three are set, which would let an invalid filter through.
        provided = sum(
            arg is not None for arg in (and_operator, prefix, tag)
        )
        if provided != 1:
            raise ValueError("only one of and, prefix or tag must be provided")
        self._and_operator = and_operator
        self._prefix = prefix
        self._tag = tag

    @property
    def and_operator(self) -> AndOperator | None:
        """Get AND operator."""
        return self._and_operator

    @property
    def prefix(self) -> str | None:
        """Get prefix."""
        return self._prefix

    @property
    def tag(self) -> Tag | None:
        """Get tag."""
        return self._tag

    @classmethod
    def fromxml(cls: Type[D], element: ET.Element) -> D:
        """Create a new object from the ``Filter`` child of ``element``.

        ``And`` and ``Tag`` children are parsed only when present; ``Prefix``
        is read as optional text.
        """
        filter_element = cast(ET.Element, find(element, "Filter", True))
        and_operator = None
        if find(filter_element, "And") is not None:
            and_operator = AndOperator.fromxml(filter_element)
        prefix = findtext(filter_element, "Prefix")
        tag = None
        if find(filter_element, "Tag") is not None:
            tag = Tag.fromxml(filter_element)
        return cls(and_operator, prefix, tag)

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Append a ``Filter`` node describing this filter to ``element``.

        Returns:
            The newly created ``Filter`` element.

        Raises:
            ValueError: If ``element`` is None.
        """
        if element is None:
            raise ValueError("element must be provided")
        filter_element = SubElement(element, "Filter")
        if self._and_operator is not None:
            self._and_operator.toxml(filter_element)
        if self._prefix is not None:
            SubElement(filter_element, "Prefix", self._prefix)
        if self._tag is not None:
            self._tag.toxml(filter_element)
        return filter_element
class BaseRule:
    """Base rule class for Replication and Lifecycle.

    Args:
        rule_filter: Optional filter attached to the rule.
        rule_id: Optional rule ID. Surrounding whitespace is stripped before
            validation and storage.

    Raises:
        ValueError: If ``rule_id`` is empty after stripping or longer than
            255 characters.
    """

    # NOTE: this Python 2 style attribute does not install a metaclass on
    # Python 3; it is kept so the class attribute stays unchanged.
    __metaclass__ = ABCMeta

    def __init__(
        self,
        rule_filter: Filter | None = None,
        rule_id: str | None = None,
    ):
        if rule_id is not None:
            rule_id = rule_id.strip()
            if not rule_id:
                raise ValueError("rule ID must be non-empty string")
            if len(rule_id) > 255:
                raise ValueError("rule ID must not exceed 255 characters")
        self._rule_filter = rule_filter
        self._rule_id = rule_id

    @property
    def rule_filter(self) -> Filter | None:
        """Get replication rule filter."""
        return self._rule_filter

    @property
    def rule_id(self) -> str | None:
        """Get rule ID."""
        return self._rule_id

    @staticmethod
    def parsexml(element: ET.Element) -> tuple[Filter | None, str | None]:
        """Parse XML and return filter and ID.

        The filter is None when ``element`` has no ``Filter`` child; the ID is
        whatever optional text lookup of ``ID`` yields.
        """
        rule_filter = None
        if find(element, "Filter") is not None:
            rule_filter = Filter.fromxml(element)
        return rule_filter, findtext(element, "ID")

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Write the filter and ID (when set) into ``element``.

        Returns:
            The same ``element`` that was passed in.

        Raises:
            ValueError: If ``element`` is None.
        """
        if element is None:
            raise ValueError("element must be provided")
        if self._rule_filter is not None:
            self._rule_filter.toxml(element)
        if self._rule_id is not None:
            SubElement(element, "ID", self._rule_id)
        return element
def check_status(status: str) -> None:
    """Validate status.

    Raises:
        ValueError: If ``status`` is neither ``ENABLED`` nor ``DISABLED``.
    """
    if status not in (ENABLED, DISABLED):
        raise ValueError("status must be 'Enabled' or 'Disabled'")
class ObjectConditionalReadArgs:
    """Base argument class holds condition properties for reading object.

    Args:
        bucket_name: Name of the bucket.
        object_name: Name of the object.
        region: Optional region.
        version_id: Optional version ID.
        ssec: Optional SSE-C key; must be an ``SseCustomerKey`` when given.
        offset: Optional start offset; must be zero or greater.
        length: Optional length; must be greater than zero.
        match_etag: Optional match ETag condition; must not be empty.
        not_match_etag: Optional not-match ETag condition; must not be empty.
        modified_since: Optional modified-since condition; must be a
            ``datetime`` when given.
        unmodified_since: Optional unmodified-since condition; must be a
            ``datetime`` when given.

    Raises:
        ValueError: If any of the constraints above is violated.
    """

    # NOTE: this Python 2 style attribute does not install a metaclass on
    # Python 3; it is kept so the class attribute stays unchanged.
    __metaclass__ = ABCMeta

    def __init__(
        self,
        bucket_name: str,
        object_name: str,
        region: str | None = None,
        version_id: str | None = None,
        ssec: SseCustomerKey | None = None,
        offset: int | None = None,
        length: int | None = None,
        match_etag: str | None = None,
        not_match_etag: str | None = None,
        modified_since: datetime | None = None,
        unmodified_since: datetime | None = None,
    ):
        if ssec is not None and not isinstance(ssec, SseCustomerKey):
            raise ValueError("ssec must be SseCustomerKey type")
        if offset is not None and offset < 0:
            raise ValueError("offset should be zero or greater")
        if length is not None and length <= 0:
            raise ValueError("length should be greater than zero")
        # None never equals "", so no separate None guard is needed here.
        if match_etag == "":
            raise ValueError("match_etag must not be empty")
        if not_match_etag == "":
            raise ValueError("not_match_etag must not be empty")
        if modified_since is not None and not isinstance(
            modified_since, datetime
        ):
            raise ValueError("modified_since must be datetime.datetime type")
        if unmodified_since is not None and not isinstance(
            unmodified_since, datetime
        ):
            raise ValueError("unmodified_since must be datetime.datetime type")

        self._bucket_name = bucket_name
        self._object_name = object_name
        self._region = region
        self._version_id = version_id
        self._ssec = ssec
        self._offset = offset
        self._length = length
        self._match_etag = match_etag
        self._not_match_etag = not_match_etag
        self._modified_since = modified_since
        self._unmodified_since = unmodified_since

    @property
    def bucket_name(self) -> str:
        """Get bucket name."""
        return self._bucket_name

    @property
    def object_name(self) -> str:
        """Get object name."""
        return self._object_name

    @property
    def region(self) -> str | None:
        """Get region."""
        return self._region

    @property
    def version_id(self) -> str | None:
        """Get version ID."""
        return self._version_id

    @property
    def ssec(self) -> SseCustomerKey | None:
        """Get SSE-C."""
        return self._ssec

    @property
    def offset(self) -> int | None:
        """Get offset."""
        return self._offset

    @property
    def length(self) -> int | None:
        """Get length."""
        return self._length

    @property
    def match_etag(self) -> str | None:
        """Get match ETag condition."""
        return self._match_etag

    @property
    def not_match_etag(self) -> str | None:
        """Get not-match ETag condition."""
        return self._not_match_etag

    @property
    def modified_since(self) -> datetime | None:
        """Get modified since condition."""
        return self._modified_since

    @property
    def unmodified_since(self) -> datetime | None:
        """Get unmodified since condition."""
        return self._unmodified_since
# Type variable so that CopySource.of() returns the calling subclass.
E = TypeVar("E", bound="CopySource")


class CopySource(ObjectConditionalReadArgs):
    """A source object definition for copy_object method."""

    @classmethod
    def of(cls: Type[E], src: ObjectConditionalReadArgs) -> E:
        """Create CopySource from another source.

        Every condition property of ``src`` is passed positionally to the
        constructor, so the new object goes through the same validation.
        """
        return cls(
            src.bucket_name,
            src.object_name,
            src.region,
            src.version_id,
            src.ssec,
            src.offset,
            src.length,
            src.match_etag,
            src.not_match_etag,
            src.modified_since,
            src.unmodified_since,
        )
# Type variable so that ComposeSource.of() returns the calling subclass.
F = TypeVar("F", bound="ComposeSource")


class ComposeSource(ObjectConditionalReadArgs):
    """A source object definition for compose_object method.

    On top of the inherited condition properties, it keeps an object size and
    a header mapping. Both start unset, and their accessors raise
    ``MinioException`` until they have been populated.
    """

    def __init__(
        self,
        bucket_name: str,
        object_name: str,
        region: str | None = None,
        version_id: str | None = None,
        ssec: SseCustomerKey | None = None,
        offset: int | None = None,
        length: int | None = None,
        match_etag: str | None = None,
        not_match_etag: str | None = None,
        modified_since: datetime | None = None,
        unmodified_since: datetime | None = None,
    ):
        super().__init__(
            bucket_name,
            object_name,
            region,
            version_id,
            ssec,
            offset,
            length,
            match_etag,
            not_match_etag,
            modified_since,
            unmodified_since,
        )
        # NOTE(review): presumably filled in by build_headers(), which is not
        # defined in the visible part of this file -- confirm.
        self._object_size: int | None = None
        self._headers: dict[str, str] | None = None

    def _validate_size(self, object_size: int):
        """Validate object size with offset and length.

        Raises:
            ValueError: If the offset is at or past ``object_size``, the
                length exceeds it, or offset plus length exceeds it.
        """

        def make_error(name, value):
            # Include the version in the message only when one is set.
            ver = ("?versionId=" + self._version_id) if self._version_id else ""
            return ValueError(
                f"Source {self._bucket_name}/{self._object_name}{ver}: "
                f"{name} {value} is beyond object size {object_size}"
            )

        if self._offset is not None and self._offset >= object_size:
            raise make_error("offset", self._offset)
        if self._length is None:
            return
        if self._length > object_size:
            raise make_error("length", self._length)
        end = (self._offset or 0) + self._length
        if end > object_size:
            raise make_error("compose size", end)

    @property
    def object_size(self) -> int:
        """Get object size.

        Raises:
            MinioException: If the size has not been set yet.
        """
        if self._object_size is None:
            raise MinioException(
                "build_headers() must be called prior to this method invocation",
            )
        return self._object_size

    @property
    def headers(self) -> dict[str, str]:
        """Get a copy of the headers.

        Raises:
            MinioException: If the headers have not been set yet.
        """
        if self._headers is None:
            raise MinioException(
                "build_headers() must be called prior to this method invocation",
            )
        # Return a copy so callers cannot mutate the stored mapping.
        return self._headers.copy()

    @classmethod
    def of(cls: Type[F], src: ObjectConditionalReadArgs) -> F:
        """Create ComposeSource from another source.

        Every condition property of ``src`` is passed positionally to the
        constructor; object size and headers start unset on the new object.
        """
        return cls(
            src.bucket_name,
            src.object_name,
            src.region,
            src.version_id,
            src.ssec,
            src.offset,
            src.length,
            src.match_etag,
            src.not_match_etag,
            src.modified_since,
            src.unmodified_since,
        )
class SnowballObject:
    """A source object definition for upload_snowball_objects method.

    Exactly one of ``filename`` or ``data`` must be given.

    Args:
        object_name: Name of the object.
        filename: Name of the file supplying the content.
        data: Binary stream supplying the content.
        length: Length of ``data``; required when ``data`` is given.
        mod_time: Optional modification time; must be a ``datetime``.

    Raises:
        ValueError: If both or neither of ``filename``/``data`` are given,
            ``data`` is given without ``length``, or ``mod_time`` is not a
            ``datetime``.
    """

    def __init__(
        self,
        object_name: str,
        filename: str | None = None,
        data: IO[bytes] | None = None,
        length: int | None = None,
        mod_time: datetime | None = None,
    ):
        # Equal flags mean both sources were given or both are missing.
        if (filename is not None) == (data is not None):
            raise ValueError("only one of filename or data must be provided")
        if data is not None and length is None:
            raise ValueError("length must be provided for data")
        if mod_time is not None and not isinstance(mod_time, datetime):
            raise ValueError("mod_time must be datetime type")

        self._object_name = object_name
        self._filename = filename
        self._data = data
        self._length = length
        self._mod_time = mod_time

    @property
    def object_name(self) -> str:
        """Get object name."""
        return self._object_name

    @property
    def filename(self) -> str | None:
        """Get filename."""
        return self._filename

    @property
    def data(self) -> IO[bytes] | None:
        """Get data."""
        return self._data

    @property
    def length(self) -> int | None:
        """Get length."""
        return self._length

    @property
    def mod_time(self) -> datetime | None:
        """Get modification time."""
        return self._mod_time