# -*- coding: utf-8 -*-
# Asynchronous MinIO Client SDK for Python
# (C) 2020 MinIO, Inc.
# (C) 2022 Huseyn Mashadiyev <mashadiyev.huseyn@gmail.com>
# (C) 2022 L-ING <hlf01@icloud.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# NOTICE: This file has been changed and differs from the original
# Author: L-ING
# Date: 2022-07-11
"""Request/response of SelectObjectContent API."""
from __future__ import absolute_import, annotations
from abc import ABCMeta
from binascii import crc32
from io import BytesIO
from typing import AsyncGenerator, Iterable, SupportsBytes, SupportsIndex
from xml.etree import ElementTree as ET
from aiohttp import ClientResponse, ClientSession, StreamReader
from aiohttp_retry import RetryClient
from typing_extensions import Buffer
from .error import MinioException
from .xml import Element, SubElement, findtext
ReadableBuffer = Buffer
COMPRESSION_TYPE_NONE = "NONE"
COMPRESSION_TYPE_GZIP = "GZIP"
COMPRESSION_TYPE_BZIP2 = "BZIP2"
FILE_HEADER_INFO_USE = "USE"
FILE_HEADER_INFO_IGNORE = "IGNORE"
FILE_HEADER_INFO_NONE = "NONE"
JSON_TYPE_DOCUMENT = "DOCUMENT"
JSON_TYPE_LINES = "LINES"
QUOTE_FIELDS_ALWAYS = "ALWAYS"
QUOTE_FIELDS_ASNEEDED = "ASNEEDED"
[docs]
class CSVOutputSerialization:
"""CSV output serialization."""
def __init__(
self,
field_delimiter: str | None = None,
quote_character: str | None = None,
quote_escape_character: str | None = None,
quote_fields: str | None = None,
record_delimiter: str | None = None,
):
self._field_delimiter = field_delimiter
self._quote_character = quote_character
self._quote_escape_character = quote_escape_character
if quote_fields is not None and quote_fields not in [
QUOTE_FIELDS_ALWAYS,
QUOTE_FIELDS_ASNEEDED,
]:
raise ValueError(
"quote fields must be {0} or {1}".format(
QUOTE_FIELDS_ALWAYS,
QUOTE_FIELDS_ASNEEDED,
),
)
self._quote_fields = quote_fields
self._record_delimiter = record_delimiter
[docs]
def toxml(self, element: ET.Element | None):
"""Convert to XML."""
if element is None:
raise ValueError("element must be provided")
element = SubElement(element, "CSV")
if self._field_delimiter is not None:
SubElement(element, "FieldDelimiter", self._field_delimiter)
if self._quote_character is not None:
SubElement(element, "QuoteCharacter", self._quote_character)
if self._quote_escape_character is not None:
SubElement(
element,
"QuoteEscapeCharacter",
self._quote_escape_character,
)
if self._quote_fields is not None:
SubElement(element, "QuoteFields", self._quote_fields)
if self._record_delimiter is not None:
SubElement(element, "RecordDelimiter", self._record_delimiter)
[docs]
class JSONOutputSerialization:
"""JSON output serialization."""
def __init__(self, record_delimiter: str | None = None):
self._record_delimiter = record_delimiter
[docs]
def toxml(self, element: ET.Element | None):
"""Convert to XML."""
if element is None:
raise ValueError("element must be provided")
element = SubElement(element, "JSON")
if self._record_delimiter is not None:
SubElement(element, "RecordDelimiter", self._record_delimiter)
[docs]
class SelectRequest:
"""Select object content request."""
def __init__(
self,
expression: str,
input_serialization: (
CSVInputSerialization | JSONInputSerialization | ParquetInputSerialization
),
output_serialization: CSVOutputSerialization | JSONOutputSerialization,
request_progress: bool = False,
scan_start_range: str | None = None,
scan_end_range: str | None = None,
):
self._expession = expression
if not isinstance(
input_serialization,
(
CSVInputSerialization,
JSONInputSerialization,
ParquetInputSerialization,
),
):
raise ValueError(
"input serialization must be CSVInputSerialization, "
"JSONInputSerialization or ParquetInputSerialization type",
)
self._input_serialization = input_serialization
if not isinstance(
output_serialization,
(CSVOutputSerialization, JSONOutputSerialization),
):
raise ValueError(
"output serialization must be CSVOutputSerialization or "
"JSONOutputSerialization type",
)
self._output_serialization = output_serialization
self._request_progress = request_progress
self._scan_start_range = scan_start_range
self._scan_end_range = scan_end_range
[docs]
def toxml(self, element: ET.Element | None) -> ET.Element:
"""Convert to XML."""
element = Element("SelectObjectContentRequest")
SubElement(element, "Expression", self._expession)
SubElement(element, "ExpressionType", "SQL")
self._input_serialization.toxml(
SubElement(element, "InputSerialization"),
)
self._output_serialization.toxml(
SubElement(element, "OutputSerialization"),
)
if self._request_progress:
SubElement(
SubElement(element, "RequestProgress"),
"Enabled",
"true",
)
if self._scan_start_range or self._scan_end_range:
tag = SubElement(element, "ScanRange")
if self._scan_start_range:
SubElement(tag, "Start", self._scan_start_range)
if self._scan_end_range:
SubElement(tag, "End", self._scan_end_range)
return element
async def _read(reader: StreamReader, size: int) -> bytes:
"""Wrapper to RawIOBase.read() to error out on short reads."""
data = await reader.read(size)
if len(data) != size:
raise IOError("insufficient data")
return data
def _int(data: Iterable[SupportsIndex] | SupportsBytes | ReadableBuffer) -> int:
"""Convert byte data to big-endian int."""
return int.from_bytes(data, byteorder="big")
def _crc32(data: ReadableBuffer) -> int:
"""Wrapper to binascii.crc32()."""
return crc32(data) & 0xFFFFFFFF
def _decode_header(data: ReadableBuffer) -> dict[str, str]:
"""Decode header data."""
reader = BytesIO(data)
headers = {}
while True:
length = reader.read(1)
if not length:
break
name = reader.read(_int(length))
if _int(reader.read(1)) != 7:
raise IOError("header value type is not 7")
value = reader.read(_int(reader.read(2)))
headers[name.decode()] = value.decode()
return headers
[docs]
class Stats:
"""Progress/Stats information."""
def __init__(self, data: bytes):
element = ET.fromstring(data.decode())
self._bytes_scanned = findtext(element, "BytesScanned")
self._bytes_processed = findtext(element, "BytesProcessed")
self._bytes_returned = findtext(element, "BytesReturned")
@property
def bytes_scanned(self) -> str | None:
"""Get bytes scanned."""
return self._bytes_scanned
@property
def bytes_processed(self) -> str | None:
"""Get bytes processed."""
return self._bytes_processed
@property
def bytes_returned(self) -> str | None:
"""Get bytes returned."""
return self._bytes_returned
[docs]
class SelectObjectReader:
"""
BufferedIOBase compatible reader represents response data of
Minio.select_object_content() API.
"""
def __init__(self, response: ClientResponse, session: ClientSession | RetryClient):
self._response = response
self._session = session
self._stats: Stats | None = None
self._payload: bytes | None = None
def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_value, exc_traceback):
return await self.close()
[docs]
def readable(self) -> bool: # pylint: disable=no-self-use
"""Return this is readable."""
return True
[docs]
def writeable(self) -> bool: # pylint: disable=no-self-use
"""Return this is not writeable."""
return False
[docs]
async def close(self):
"""Close response and release network resources."""
self._response.close()
await self._response.release()
await self._session.close()
[docs]
def stats(self) -> Stats | None:
"""Get stats information."""
return self._stats
async def _read(self) -> int:
"""Read and decode response."""
if self._response.closed:
return 0
prelude = await _read(self._response.content, 8)
prelude_crc = await _read(self._response.content, 4)
if _crc32(prelude) != _int(prelude_crc):
raise IOError(
f"prelude CRC mismatch; expected: {_crc32(prelude)}, "
f"got: {_int(prelude_crc)}"
)
total_length = _int(prelude[:4])
data = await _read(self._response.content, total_length - 8 - 4 - 4)
message_crc = _int(await _read(self._response.content, 4))
if _crc32(prelude + prelude_crc + data) != message_crc:
raise IOError(
f"message CRC mismatch; "
f"expected: {_crc32(prelude + prelude_crc + data)}, "
f"got: {message_crc}"
)
header_length = _int(prelude[4:])
headers = _decode_header(data[:header_length])
if headers.get(":message-type") == "error":
raise MinioException(
f"{headers.get(':error-code')}: " f"{headers.get(':error-message')}"
)
if headers.get(":event-type") == "End":
return 0
payload_length = total_length - header_length - 16
if headers.get(":event-type") == "Cont" or payload_length < 1:
return await self._read()
payload = data[header_length : header_length + payload_length]
if headers.get(":event-type") in ["Progress", "Stats"]:
self._stats = Stats(payload)
return await self._read()
if headers.get(":event-type") == "Records":
self._payload = payload
return len(payload)
raise MinioException(
f"unknown event-type {headers.get(':event-type')}",
)
[docs]
async def stream(self, num_bytes: int = 32 * 1024) -> AsyncGenerator[bytes]:
"""
Stream extracted payload from response data. Upon completion, caller
should `call self.close()` to release network resources.
"""
while True:
if self._payload:
result = self._payload
if num_bytes < len(self._payload):
result = self._payload[:num_bytes]
self._payload = self._payload[len(result) :]
yield result
if await self._read() <= 0:
await self.close()
break