# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C)
# 2015, 2016, 2017 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Cryptography to read and write encrypted MinIO Admin payload"""
from __future__ import absolute_import, annotations
import os
from aiohttp import ClientResponse
from argon2.low_level import Type, hash_secret_raw
from Crypto.Cipher import AES, ChaCha20_Poly1305
from Crypto.Cipher._mode_gcm import GcmMode
from Crypto.Cipher.ChaCha20_Poly1305 import ChaCha20Poly1305Cipher
#
# Encrypted Message Format:
#
# | 41 bytes HEADER         |
# |-------------------------|
# | 16 KiB encrypted chunk  |
# |      + 16 bytes TAG     |
# |-------------------------|
# |          ....           |
# |-------------------------|
# | ~16 KiB encrypted chunk |
# |      + 16 bytes TAG     |
# |-------------------------|
#
# HEADER:
#
# | 32 bytes salt  |
# |----------------|
# | 1 byte AEAD ID |
# |----------------|
# | 8 bytes NONCE  |
# |----------------|
#
# Length in bytes of the authentication tag that follows every encrypted chunk.
_TAG_LEN = 16
# Number of payload bytes encrypted per chunk (the final chunk may be shorter).
_CHUNK_SIZE = 16 * 1024
# Size in bytes of one full chunk as read from the stream: data plus its tag.
_MAX_CHUNK_SIZE = _TAG_LEN + _CHUNK_SIZE
# Length in bytes of the random salt stored at the start of the header.
_SALT_LEN = 32
# Length in bytes of the random nonce stored in the header; four more bytes
# (zeros or a chunk counter) are appended to it before it is given to a cipher.
_NONCE_LEN = 8
def _get_cipher(
aead_id: int,
key: bytes,
nonce: bytes,
) -> GcmMode | ChaCha20Poly1305Cipher:
"""Get cipher for AEAD ID."""
if aead_id == 0:
return AES.new(key, AES.MODE_GCM, nonce)
if aead_id == 1:
return ChaCha20_Poly1305.new(key=key, nonce=nonce)
raise ValueError(f"Unknown AEAD ID {aead_id}")
def _generate_key(secret: bytes, salt: bytes) -> bytes:
    """Derive a 32-byte (256-bit) key from ``secret`` and ``salt`` with Argon2id."""
    # Fixed Argon2 settings; both sides of the exchange must use the same
    # values to arrive at the same key.
    argon2_settings = {
        "time_cost": 1,
        "memory_cost": 65536,
        "parallelism": 4,
        "hash_len": 32,
        "type": Type.ID,
        "version": 19,
    }
    return hash_secret_raw(secret=secret, salt=salt, **argon2_settings)
def _generate_additional_data(aead_id: int, key: bytes, padded_nonce: bytes) -> bytes:
    """
    Build the additional (associated) data used for every chunk.

    The result is a zero flag byte followed by the digest of a freshly
    created cipher that has not processed any data.
    """
    empty_message_tag = _get_cipher(aead_id, key, padded_nonce).digest()
    return b"\x00" + empty_message_tag
def _mark_as_last(additional_data: bytes) -> bytes:
"""Mark additional data as the last in the sequence"""
return b"\x80" + additional_data[1:]
def _update_nonce_id(nonce: bytes, idx: int) -> bytes:
"""Set nonce id (4 last bytes)"""
return nonce + idx.to_bytes(4, byteorder="little")
def encrypt(payload: bytes, password: str) -> bytes:
    """
    Encrypt given payload.

    The result is the header (random salt, AEAD ID byte, random nonce)
    followed by one encrypted chunk plus authentication tag for every
    ``_CHUNK_SIZE`` bytes of ``payload``. The key is derived from
    ``password`` and the salt; AEAD ID 0 is always used.

    An empty ``payload`` still produces one (empty) final chunk, so the
    message always ends with an authentication tag.
    NOTE(review): a tagged final chunk for empty input is presumably what
    the server-side reader expects as well -- confirm against the server.
    """
    nonce = os.urandom(_NONCE_LEN)
    salt = os.urandom(_SALT_LEN)
    key = _generate_key(password.encode(), salt)
    aead_id = 0
    padded_nonce = nonce + b"\x00\x00\x00\x00"
    additional_data = _generate_additional_data(aead_id, key, padded_nonce)

    # max(..., 1) guarantees at least one offset, so an empty payload is
    # emitted as a single empty chunk marked as last.
    offsets = range(0, max(len(payload), 1), _CHUNK_SIZE)
    last_offset = offsets[-1]

    # Collect the pieces and join once instead of growing a bytes object
    # with repeated "+=", which copies the whole buffer every time.
    parts = [salt, bytes([aead_id]), nonce]
    # Chunk counters start at 1; the all-zero counter was already used to
    # derive the additional data above.
    for nonce_id, offset in enumerate(offsets, start=1):
        if offset == last_offset:
            additional_data = _mark_as_last(additional_data)
        cipher = _get_cipher(aead_id, key, _update_nonce_id(nonce, nonce_id))
        cipher.update(additional_data)
        encrypted_data, tag = cipher.encrypt_and_digest(
            payload[offset:offset + _CHUNK_SIZE],
        )
        parts.append(encrypted_data)
        parts.append(tag)
    return b"".join(parts)
class DecryptReader:
    """
    BufferedIOBase compatible reader representing decrypted data of
    MinioAdmin APIs.

    The header (salt, AEAD ID byte, nonce) is parsed on construction; the
    body is then read from ``response.content`` and decrypted one chunk at
    a time.
    """

    def __init__(self, response: ClientResponse, header: bytes, secret: bytes):
        """
        Parse ``header`` and derive the decryption key from ``secret``.

        Raises ``IOError`` when ``header`` does not have the expected
        length and ``ValueError`` when it names an unknown AEAD ID.
        """
        self._response = response
        self._secret = secret
        # Never read inside this class; retained so the attribute set
        # stays unchanged.
        self._payload = None
        if len(header) != self._header_len():
            raise IOError("insufficient data")
        self._salt = header[:_SALT_LEN]
        self._aead_id = header[_SALT_LEN]
        self._nonce = header[_SALT_LEN + 1:]
        self._key = _generate_key(self._secret, self._salt)
        padded_nonce = self._nonce + b"\x00\x00\x00\x00"
        self._additional_data = _generate_additional_data(
            self._aead_id, self._key, padded_nonce
        )
        self._chunk = b""  # bytes read from the response, not yet decrypted
        self._count = 0  # number of chunks decrypted so far
        self._is_closed = False  # True once the end of the response was seen

    @staticmethod
    def _header_len() -> int:
        """Return the header size: salt, one AEAD ID byte and the nonce."""
        return _SALT_LEN + 1 + _NONCE_LEN

    @classmethod
    async def init_async(cls, response: ClientResponse, secret: bytes):
        """
        Initialize DecryptReader asynchronously.

        Reads the header from ``response.content`` and passes it to the
        constructor, which raises ``IOError`` if the response ended before
        a complete header was received.
        """
        wanted = cls._header_len()
        header = b""
        # A single read() may hand back fewer bytes than requested before
        # the end of the stream, so keep reading until the header is
        # complete or no more data arrives.
        while len(header) < wanted:
            data = await response.content.read(wanted - len(header))
            if not data:
                break
            header += data
        return cls(response, header, secret)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        return self.close()

    def readable(self):  # pylint: disable=no-self-use
        """Return this is readable."""
        return True

    def writable(self):  # pylint: disable=no-self-use
        """Return this is not writable."""
        return False

    def writeable(self):  # pylint: disable=no-self-use
        """Return this is not writeable (older spelling of ``writable``)."""
        return False

    def close(self):
        """Close response and release network resources."""
        self._response.close()
        if self._response.connection:
            self._response.connection.close()

    def _decrypt(self, payload: bytes, last_chunk: bool = False) -> bytes:
        """
        Decrypt given payload.

        ``payload`` is one encrypted chunk followed by its tag. When
        ``last_chunk`` is true the additional data is switched to its
        "last" form before the tag is verified.
        """
        self._count += 1
        if last_chunk:
            self._additional_data = _mark_as_last(self._additional_data)
        padded_nonce = _update_nonce_id(self._nonce, self._count)
        cipher = _get_cipher(self._aead_id, self._key, padded_nonce)
        cipher.update(self._additional_data)
        tag = payload[-_TAG_LEN:]
        encrypted_data = payload[:-_TAG_LEN]
        return cipher.decrypt_and_verify(encrypted_data, tag)

    async def _read_chunk(self) -> bool:
        """
        Fill the buffer with one full chunk plus one look-ahead byte.

        The extra byte shows whether more data follows the buffered chunk.
        Returns True once the end of the response has been reached and
        False otherwise.
        """
        if self._is_closed:
            return True
        wanted = 1 + _MAX_CHUNK_SIZE
        while len(self._chunk) < wanted:
            chunk = await self._response.content.read(wanted - len(self._chunk))
            self._chunk += chunk
            if not chunk:
                self._is_closed = True
                return True
        return False

    async def _read(self) -> bytes:
        """
        Read and decrypt response.

        Returns the decrypted bytes of the next chunk, or empty bytes when
        nothing is left to decrypt.
        """
        stop = await self._read_chunk()
        if not self._chunk:
            return self._chunk
        payload = self._chunk[:_MAX_CHUNK_SIZE]
        self._chunk = self._chunk[_MAX_CHUNK_SIZE:]
        # A chunk shorter than the full size can only be the final one.
        if len(payload) < _MAX_CHUNK_SIZE:
            stop = True
        return self._decrypt(payload, stop)

    async def stream(self, num_bytes=32 * 1024):
        """
        Stream extracted payload from response data. Upon completion, caller
        should call self.close() to release network resources.

        Yields pieces of at most ``num_bytes`` bytes; raises ``ValueError``
        when ``num_bytes`` is smaller than 1.
        """
        # A non-positive size would never make progress through the data.
        if num_bytes < 1:
            raise ValueError("num_bytes must be a positive integer")
        while True:
            data = await self._read()
            if not data:
                break
            for start in range(0, len(data), num_bytes):
                yield data[start:start + num_bytes]
async def decrypt(response: ClientResponse, secret_key: str) -> bytes:
    """
    Decrypt response data.

    Reads and decrypts the whole body of ``response`` using a key derived
    from ``secret_key`` and returns the plaintext. The reader, and with it
    the response, is closed when decryption finishes or fails.
    """
    with await DecryptReader.init_async(response, secret_key.encode()) as reader:
        # Gather the pieces and join once rather than growing a bytes
        # object with "+=", which re-copies the accumulated data each time.
        pieces = [data async for data in reader.stream()]
    return b"".join(pieces)