# -*- coding: utf-8 -*-
# Asynchronous MinIO Client SDK for Python
# (C) 2020 MinIO, Inc.
# (C) 2022 Huseyn Mashadiyev <mashadiyev.huseyn@gmail.com>
# (C) 2022 L-ING <hlf01@icloud.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Request/response of PutBucketReplication and GetBucketReplication APIs."""
from __future__ import absolute_import, annotations
from abc import ABCMeta
from typing import Type, TypeVar, cast
from xml.etree import ElementTree as ET
from .commonconfig import DISABLED, BaseRule, Filter, check_status
from .xml import Element, SubElement, find, findall, findtext
A = TypeVar("A", bound="Status")


# Python 3 ignores a ``__metaclass__`` class attribute; the metaclass has to
# be named in the class header to take effect.
class Status(metaclass=ABCMeta):
    """Base for XML elements that wrap a single ``Status`` child.

    The element tag is taken from the concrete class name, so a subclass
    gets its own tag simply by existing.
    """

    def __init__(self, status: str):
        """Store *status* after passing it through ``check_status``."""
        check_status(status)
        self._status = status

    @property
    def status(self) -> str:
        """Get status."""
        return self._status

    @classmethod
    def fromxml(cls: Type[A], element: ET.Element) -> A:
        """Create new object with values from XML element.

        Looks up the child of *element* named after ``cls`` and reads its
        ``Status`` text.  Both lookups pass ``True`` as the third argument
        (presumably "required" -- confirm against the ``.xml`` helpers).
        """
        element = cast(ET.Element, find(element, cls.__name__, True))
        status = cast(str, findtext(element, "Status", True))
        return cls(status)

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Convert to XML.

        Appends an element named after the concrete class to *element*,
        with a ``Status`` child, and returns the new element.

        Raises:
            ValueError: if *element* is ``None``.
        """
        if element is None:
            raise ValueError("element must be provided")
        element = SubElement(element, self.__class__.__name__)
        SubElement(element, "Status", self._status)
        return element
class SseKmsEncryptedObjects(Status):
    """SSE KMS encrypted objects.

    Adds nothing to ``Status``; it exists so the element is read and
    written under the ``SseKmsEncryptedObjects`` tag.
    """
B = TypeVar("B", bound="SourceSelectionCriteria")


class SourceSelectionCriteria:
    """Source selection criteria."""

    def __init__(
        self,
        sse_kms_encrypted_objects: SseKmsEncryptedObjects | None = None,
    ):
        """Store the optional SSE KMS encrypted objects setting."""
        self._sse_kms_encrypted_objects = sse_kms_encrypted_objects

    @property
    def sse_kms_encrypted_objects(self) -> SseKmsEncryptedObjects | None:
        """Get SSE KMS encrypted objects."""
        return self._sse_kms_encrypted_objects

    @classmethod
    def fromxml(cls: Type[B], element: ET.Element) -> B:
        """Create new object with values from XML element.

        Reads the ``SourceSelectionCriteria`` child of *element*; the
        nested ``SseKmsEncryptedObjects`` is parsed only when present.
        """
        element = cast(
            ET.Element,
            find(element, "SourceSelectionCriteria", True),
        )
        if find(element, "SseKmsEncryptedObjects") is None:
            return cls(None)
        return cls(SseKmsEncryptedObjects.fromxml(element))

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Convert to XML.

        Appends a ``SourceSelectionCriteria`` element to *element* and
        returns it.

        Raises:
            ValueError: if *element* is ``None``.
        """
        if element is None:
            raise ValueError("element must be provided")
        element = SubElement(element, "SourceSelectionCriteria")
        if self._sse_kms_encrypted_objects:
            self._sse_kms_encrypted_objects.toxml(element)
        return element
class ExistingObjectReplication(Status):
    """Existing object replication.

    Adds nothing to ``Status``; it exists so the element is read and
    written under the ``ExistingObjectReplication`` tag.
    """
class DeleteMarkerReplication(Status):
    """Delete marker replication."""

    def __init__(self, status: str = DISABLED):
        """Initialise with *status*, which defaults to ``DISABLED``."""
        super().__init__(status)
C = TypeVar("C", bound="ReplicationTimeValue")


# Python 3 ignores a ``__metaclass__`` class attribute; the metaclass has to
# be named in the class header to take effect.
class ReplicationTimeValue(metaclass=ABCMeta):
    """Replication time value.

    The element tag is taken from the concrete class name, so a subclass
    gets its own tag simply by existing.
    """

    def __init__(self, minutes: None | int = 15):
        """Store *minutes*; ``None`` means no ``Minutes`` child is written."""
        self._minutes = minutes

    @property
    def minutes(self) -> int | None:
        """Get minutes."""
        return self._minutes

    @classmethod
    def fromxml(cls: Type[C], element: ET.Element) -> C:
        """Create new object with values from XML element.

        Looks up the child of *element* named after ``cls``.  A missing or
        empty ``Minutes`` text yields ``minutes=None``.
        """
        element = cast(ET.Element, find(element, cls.__name__, True))
        minutes = findtext(element, "Minutes")
        return cls(int(minutes) if minutes else None)

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Convert to XML.

        Appends an element named after the concrete class to *element* and
        returns it; ``Minutes`` is written only when it is not ``None``.

        Raises:
            ValueError: if *element* is ``None``.
        """
        if element is None:
            raise ValueError("element must be provided")
        element = SubElement(element, self.__class__.__name__)
        if self._minutes is not None:
            SubElement(element, "Minutes", str(self._minutes))
        return element
class Time(ReplicationTimeValue):
    """Time.

    Adds nothing to ``ReplicationTimeValue``; it exists so the value is
    read and written under the ``Time`` tag.
    """
D = TypeVar("D", bound="ReplicationTime")


class ReplicationTime:
    """Replication time."""

    def __init__(self, time: Time, status: str):
        """Store *time* and *status*.

        Raises:
            ValueError: if *time* is falsy.  *status* is additionally
                passed through ``check_status``.
        """
        if not time:
            raise ValueError("time must be provided")
        check_status(status)
        self._time = time
        self._status = status

    @property
    def time(self) -> Time:
        """Get time value."""
        return self._time

    @property
    def status(self) -> str:
        """Get status."""
        return self._status

    @classmethod
    def fromxml(cls: Type[D], element: ET.Element) -> D:
        """Create new object with values from XML element.

        Reads the ``ReplicationTime`` child of *element*, then its nested
        ``Time`` element and ``Status`` text.
        """
        element = cast(ET.Element, find(element, "ReplicationTime", True))
        time = Time.fromxml(element)
        status = cast(str, findtext(element, "Status", True))
        return cls(time, status)

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Convert to XML.

        Appends a ``ReplicationTime`` element (time value first, then
        ``Status``) to *element* and returns it.

        Raises:
            ValueError: if *element* is ``None``.
        """
        if element is None:
            raise ValueError("element must be provided")
        element = SubElement(element, "ReplicationTime")
        self._time.toxml(element)
        SubElement(element, "Status", self._status)
        return element
class EventThreshold(ReplicationTimeValue):
    """Event threshold.

    Adds nothing to ``ReplicationTimeValue``; it exists so the value is
    read and written under the ``EventThreshold`` tag.
    """
E = TypeVar("E", bound="Metrics")


class Metrics:
    """Metrics."""

    def __init__(self, event_threshold: EventThreshold, status: str):
        """Store *event_threshold* and *status*.

        Raises:
            ValueError: if *event_threshold* is falsy.  *status* is
                additionally passed through ``check_status``.
        """
        if not event_threshold:
            raise ValueError("event threshold must be provided")
        check_status(status)
        self._event_threshold = event_threshold
        self._status = status

    @property
    def event_threshold(self) -> EventThreshold:
        """Get event threshold."""
        return self._event_threshold

    @property
    def status(self) -> str:
        """Get status."""
        return self._status

    @classmethod
    def fromxml(cls: Type[E], element: ET.Element) -> E:
        """Create new object with values from XML element.

        Reads the ``Metrics`` child of *element*, then its nested
        ``EventThreshold`` element and ``Status`` text.
        """
        element = cast(ET.Element, find(element, "Metrics", True))
        event_threshold = EventThreshold.fromxml(element)
        status = cast(str, findtext(element, "Status", True))
        return cls(event_threshold, status)

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Convert to XML.

        Appends a ``Metrics`` element (event threshold first, then
        ``Status``) to *element* and returns it.

        Raises:
            ValueError: if *element* is ``None``.
        """
        if element is None:
            raise ValueError("element must be provided")
        element = SubElement(element, "Metrics")
        self._event_threshold.toxml(element)
        SubElement(element, "Status", self._status)
        return element
F = TypeVar("F", bound="EncryptionConfig")


class EncryptionConfig:
    """Encryption configuration."""

    def __init__(self, replica_kms_key_id: str | None = None):
        """Store the optional replica KMS key ID."""
        self._replica_kms_key_id = replica_kms_key_id

    @property
    def replica_kms_key_id(self) -> str | None:
        """Get replica KMS key ID."""
        return self._replica_kms_key_id

    @classmethod
    def fromxml(cls: Type[F], element: ET.Element) -> F:
        """Create new object with values from XML element.

        Reads the ``EncryptionConfiguration`` child of *element* and its
        ``ReplicaKmsKeyID`` text.
        """
        element = cast(
            ET.Element,
            find(element, "EncryptionConfiguration", True),
        )
        return cls(findtext(element, "ReplicaKmsKeyID"))

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Convert to XML.

        Appends an ``EncryptionConfiguration`` element to *element* and
        returns it.

        Raises:
            ValueError: if *element* is ``None``.
        """
        if element is None:
            raise ValueError("element must be provided")
        element = SubElement(element, "EncryptionConfiguration")
        # NOTE(review): the key ID may be None here and is passed through
        # unguarded; what the SubElement helper does with a None text is
        # not visible from this file -- confirm.
        SubElement(element, "ReplicaKmsKeyID", self._replica_kms_key_id)
        return element
G = TypeVar("G", bound="AccessControlTranslation")


class AccessControlTranslation:
    """Access control translation."""

    def __init__(self, owner: str = "Destination"):
        """Store *owner*.

        Raises:
            ValueError: if *owner* is falsy.
        """
        if not owner:
            raise ValueError("owner must be provided")
        self._owner = owner

    @property
    def owner(self) -> str:
        """Get owner."""
        return self._owner

    @classmethod
    def fromxml(cls: Type[G], element: ET.Element) -> G:
        """Create new object with values from XML element.

        Reads the ``AccessControlTranslation`` child of *element* and its
        ``Owner`` text.
        """
        element = cast(
            ET.Element,
            find(element, "AccessControlTranslation", True),
        )
        owner = cast(str, findtext(element, "Owner", True))
        return cls(owner)

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Convert to XML.

        Appends an ``AccessControlTranslation`` element with an ``Owner``
        child to *element* and returns it.

        Raises:
            ValueError: if *element* is ``None``.
        """
        if element is None:
            raise ValueError("element must be provided")
        element = SubElement(element, "AccessControlTranslation")
        SubElement(element, "Owner", self._owner)
        return element
H = TypeVar("H", bound="Destination")


class Destination:
    """Replication destination."""

    def __init__(
        self,
        bucket_arn: str,
        access_control_translation: AccessControlTranslation | None = None,
        account: str | None = None,
        encryption_config: EncryptionConfig | None = None,
        metrics: Metrics | None = None,
        replication_time: ReplicationTime | None = None,
        storage_class: str | None = None,
    ):
        """Store the destination settings.

        Only *bucket_arn* is mandatory; every other argument is optional.

        Raises:
            ValueError: if *bucket_arn* is falsy.
        """
        if not bucket_arn:
            raise ValueError("bucket ARN must be provided")
        self._bucket_arn = bucket_arn
        self._access_control_translation = access_control_translation
        self._account = account
        self._encryption_config = encryption_config
        self._metrics = metrics
        self._replication_time = replication_time
        self._storage_class = storage_class

    @property
    def bucket_arn(self) -> str:
        """Get bucket ARN."""
        return self._bucket_arn

    @property
    def access_control_translation(self) -> AccessControlTranslation | None:
        """Get access control translation."""
        return self._access_control_translation

    @property
    def account(self) -> str | None:
        """Get account."""
        return self._account

    @property
    def encryption_config(self) -> EncryptionConfig | None:
        """Get encryption configuration."""
        return self._encryption_config

    @property
    def metrics(self) -> Metrics | None:
        """Get metrics."""
        return self._metrics

    @property
    def replication_time(self) -> ReplicationTime | None:
        """Get replication time."""
        return self._replication_time

    @property
    def storage_class(self) -> str | None:
        """Get storage class."""
        return self._storage_class

    @classmethod
    def fromxml(cls: Type[H], element: ET.Element) -> H:
        """Create new object with values from XML element.

        Reads the ``Destination`` child of *element*.  Each optional nested
        element is parsed only when it is present; ``Bucket`` is looked up
        with ``True`` as the third argument like other mandatory fields.
        """
        element = cast(ET.Element, find(element, "Destination", True))
        access_control_translation = (
            None
            if find(element, "AccessControlTranslation") is None
            else AccessControlTranslation.fromxml(element)
        )
        account = findtext(element, "Account")
        bucket_arn = cast(str, findtext(element, "Bucket", True))
        encryption_config = (
            None
            if find(element, "EncryptionConfiguration") is None
            else EncryptionConfig.fromxml(element)
        )
        metrics = (
            None
            if find(element, "Metrics") is None
            else Metrics.fromxml(element)
        )
        replication_time = (
            None
            if find(element, "ReplicationTime") is None
            else ReplicationTime.fromxml(element)
        )
        storage_class = findtext(element, "StorageClass")
        return cls(
            bucket_arn,
            access_control_translation,
            account,
            encryption_config,
            metrics,
            replication_time,
            storage_class,
        )

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Convert to XML.

        Appends a ``Destination`` element to *element* and returns it.
        ``Bucket`` is always written; the optional children are written
        only when set.

        Raises:
            ValueError: if *element* is ``None``.
        """
        if element is None:
            raise ValueError("element must be provided")
        element = SubElement(element, "Destination")
        if self._access_control_translation:
            self._access_control_translation.toxml(element)
        if self._account is not None:
            SubElement(element, "Account", self._account)
        SubElement(element, "Bucket", self._bucket_arn)
        if self._encryption_config:
            self._encryption_config.toxml(element)
        if self._metrics:
            self._metrics.toxml(element)
        if self._replication_time:
            self._replication_time.toxml(element)
        if self._storage_class:
            SubElement(element, "StorageClass", self._storage_class)
        return element
I = TypeVar("I", bound="Rule")


class Rule(BaseRule):
    """Replication rule."""

    def __init__(
        self,
        destination: Destination,
        status: str,
        delete_marker_replication: DeleteMarkerReplication | None = None,
        existing_object_replication: ExistingObjectReplication | None = None,
        rule_filter: Filter | None = None,
        rule_id: str | None = None,
        prefix: str | None = None,
        priority: int | None = None,
        source_selection_criteria: SourceSelectionCriteria | None = None,
    ):
        """Store the rule settings.

        *rule_filter* and *rule_id* are handed to ``BaseRule``.  When a
        filter is given without *delete_marker_replication*, a default
        ``DeleteMarkerReplication()`` is used.

        Raises:
            ValueError: if *destination* is falsy.  *status* is
                additionally passed through ``check_status``.
        """
        if not destination:
            raise ValueError("destination must be provided")

        check_status(status)
        super().__init__(rule_filter, rule_id)

        self._destination = destination
        self._status = status
        if rule_filter and not delete_marker_replication:
            delete_marker_replication = DeleteMarkerReplication()
        self._delete_marker_replication = delete_marker_replication
        self._existing_object_replication = existing_object_replication
        self._prefix = prefix
        self._priority = priority
        self._source_selection_criteria = source_selection_criteria

    @property
    def destination(self) -> Destination:
        """Get destination."""
        return self._destination

    @property
    def status(self) -> str:
        """Get status."""
        return self._status

    @property
    def delete_marker_replication(self) -> DeleteMarkerReplication | None:
        """Get delete marker replication."""
        return self._delete_marker_replication

    @property
    def existing_object_replication(self) -> ExistingObjectReplication | None:
        """Get existing object replication."""
        return self._existing_object_replication

    @property
    def prefix(self) -> str | None:
        """Get prefix."""
        return self._prefix

    @property
    def priority(self) -> int | None:
        """Get priority."""
        return self._priority

    @property
    def source_selection_criteria(self) -> SourceSelectionCriteria | None:
        """Get source selection criteria."""
        return self._source_selection_criteria

    @classmethod
    def fromxml(cls: Type[I], element: ET.Element) -> I:
        """Create new object with values from XML element.

        Unlike the nested types, the children are read directly from
        *element*, with no lookup of an enclosing ``Rule`` child first.
        Optional nested elements are parsed only when present, and a
        missing or empty ``Priority`` yields ``priority=None``.
        """
        delete_marker_replication = (
            None
            if find(element, "DeleteMarkerReplication") is None
            else DeleteMarkerReplication.fromxml(element)
        )
        destination = Destination.fromxml(element)
        existing_object_replication = (
            None
            if find(element, "ExistingObjectReplication") is None
            else ExistingObjectReplication.fromxml(element)
        )
        # Filter and ID parsing is delegated to the inherited helper.
        rule_filter, rule_id = cls.parsexml(element)
        prefix = findtext(element, "Prefix")
        priority = findtext(element, "Priority")
        source_selection_criteria = (
            None
            if find(element, "SourceSelectionCriteria") is None
            else SourceSelectionCriteria.fromxml(element)
        )
        status = cast(str, findtext(element, "Status", True))

        return cls(
            destination,
            status,
            delete_marker_replication,
            existing_object_replication,
            rule_filter,
            rule_id,
            prefix,
            int(priority) if priority else None,
            source_selection_criteria,
        )

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Convert to XML.

        Appends a ``Rule`` element to *element* and returns it.  The
        destination and ``Status`` are always written; the other children
        are written only when set.

        Raises:
            ValueError: if *element* is ``None``.
        """
        if element is None:
            raise ValueError("element must be provided")
        element = SubElement(element, "Rule")
        if self._delete_marker_replication:
            self._delete_marker_replication.toxml(element)
        self._destination.toxml(element)
        if self._existing_object_replication:
            self._existing_object_replication.toxml(element)
        # BaseRule contributes its own children (presumably the filter and
        # rule ID -- confirm in commonconfig).
        super().toxml(element)
        if self._prefix is not None:
            SubElement(element, "Prefix", self._prefix)
        if self._priority is not None:
            SubElement(element, "Priority", str(self._priority))
        if self._source_selection_criteria:
            self._source_selection_criteria.toxml(element)
        SubElement(element, "Status", self._status)
        return element
J = TypeVar("J", bound="ReplicationConfig")


class ReplicationConfig:
    """Replication configuration."""

    def __init__(self, role: str, rules: list[Rule]):
        """Store *role* and *rules*.

        Raises:
            ValueError: if *rules* is empty or holds more than 1000 rules.
        """
        if not rules:
            raise ValueError("rules must be provided")
        if len(rules) > 1000:
            raise ValueError("more than 1000 rules are not supported")
        self._role = role
        self._rules = rules

    @property
    def role(self) -> str:
        """Get role."""
        return self._role

    @property
    def rules(self) -> list[Rule]:
        """Get rules."""
        return self._rules

    @classmethod
    def fromxml(cls: Type[J], element: ET.Element) -> J:
        """Create new object with values from XML element.

        Reads ``Role`` and every ``Rule`` child directly from *element*.
        """
        role = cast(str, findtext(element, "Role", True))
        rules = [Rule.fromxml(tag) for tag in findall(element, "Rule")]
        return cls(role, rules)

    def toxml(self, element: ET.Element | None) -> ET.Element:
        """Convert to XML.

        The *element* argument is ignored: a new root
        ``ReplicationConfiguration`` element is always created, filled
        with ``Role`` and one child per rule, and returned.
        """
        element = Element("ReplicationConfiguration")
        SubElement(element, "Role", self._role)
        for rule in self._rules:
            rule.toxml(element)
        return element