# huffman.py """ Прототип реализации алгоритма Хаффмана на Python. Особенности: - Статическое хранение частот (массив 256 элементов) - Дерево Хаффмана хранится списком узлов (HTLS) - Кодирование и декодирование побитовое - Совместимо с переносом в C (фиксированные массивы и сдвиги) """ from typing import BinaryIO, List, Tuple from dataclasses import dataclass @dataclass class HTLS: """ Структура узла дерева Хаффмана. Атрибуты: left: индекс левого потомка (None, если лист) right: индекс правого потомка (None, если лист) value: кортеж (byte_value, frequency) для листа, (None, frequency) для внутренних узлов """ left: int | None right: int | None value: Tuple[int | None, int] # ----------------------------------------------------------------------------- # Шаг 1: подсчёт частот байтов # ----------------------------------------------------------------------------- def get_frequencies(fd: BinaryIO) -> List[int]: """ Собирает частоты каждого байта в файле. Аргументы: fd: открытый бинарный файл для чтения Возвращает: Список из 256 элементов, где индекс = значение байта, значение = частота этого байта в файле. """ frequencies = [0] * 256 content = fd.read() for byte in content: frequencies[byte] += 1 return frequencies # ----------------------------------------------------------------------------- # Шаг 2: построение дерева Хаффмана # ----------------------------------------------------------------------------- def make_tree(frequencies_table: List[int]) -> Tuple[List[HTLS], int]: """ Строит дерево Хаффмана на основе массива частот. Аргументы: frequencies_table: массив частот 256 байтов Возвращает: nodes: список всех узлов дерева (HTLS) root_idx: индекс корневого узла """ # создаём узлы-«листья» для каждого байта nodes = [HTLS(left=None, right=None, value=(i, freq)) for i, freq in enumerate(frequencies_table)] # список «живых» узлов для объединения alive = list(range(len(nodes))) while len(alive) > 1: # находим два узла с наименьшей частотой alive.sort(key=lambda idx: nodes[idx].value[1]) i1 = alive.pop(0) i2 = alive.pop(0) freq1 = nodes[i1].value[1] freq2 = nodes[i2].value[1] # создаём новый узел, объединяющий два наименьших new_node = HTLS( left=i1, right=i2, value=(None, freq1 + freq2) ) nodes.append(new_node) alive.append(len(nodes) - 1) return nodes, alive[0] # ----------------------------------------------------------------------------- # Шаг 3: генерация кодов Хаффмана # ----------------------------------------------------------------------------- def make_codes(nodes: List[HTLS], root: int) -> dict[int, Tuple[int, int]]: """ Генерирует побитовые коды для каждого байта по дереву Хаффмана. Аргументы: nodes: список узлов HTLS root: индекс корня дерева Возвращает: Словарь: ключ = байт (0–255), значение = (код, длина) Код хранится как int, длина = количество бит. """ codes = {} stack = [(root, 0, 0)] # (индекс узла, текущий код, длина кода) while stack: idx, code, length = stack.pop() node = nodes[idx] if node.value[0] is not None: # достигнут лист byte = node.value[0] codes[byte] = (code, length) else: # сначала правый потомок (для корректного обхода LIFO) if node.right is not None: stack.append((node.right, (code << 1) | 1, length + 1)) if node.left is not None: stack.append((node.left, (code << 1) | 0, length + 1)) return codes # ----------------------------------------------------------------------------- # Шаг 4: кодирование файла # ----------------------------------------------------------------------------- def encode_flow(input_fd: BinaryIO, output_fd: BinaryIO, codes: dict[int, Tuple[int, int]]): """ Побитово кодирует содержимое input_fd и записывает в output_fd. Аргументы: input_fd: бинарный входной файл output_fd: бинарный выходной файл codes: словарь байт -> (код, длина) """ buffer = 0 buffer_len = 0 input_fd.seek(0) content = input_fd.read() for byte in content: code, length = codes[byte] # добавляем код в буфер buffer = (buffer << length) | code buffer_len += length # пока накоплено >= 8 бит → пишем байт while buffer_len >= 8: buffer_len -= 8 to_write = (buffer >> buffer_len) & 0xFF output_fd.write(bytes([to_write])) # остаток, если есть if buffer_len > 0: to_write = (buffer << (8 - buffer_len)) & 0xFF output_fd.write(bytes([to_write])) # ----------------------------------------------------------------------------- # Шаг 5: декодирование файла # ----------------------------------------------------------------------------- def decode_flow(input_fd: BinaryIO, output_fd: BinaryIO, nodes: List[HTLS], root_idx: int, total_bytes: int): """ Декодирует поток Хаффмана обратно в исходные байты. Аргументы: input_fd: бинарный файл с закодированными данными output_fd: бинарный выходной файл для декодированных байт nodes: список узлов дерева Хаффмана root_idx: индекс корня дерева total_bytes: общее количество исходных байт (чтобы остановиться) """ input_fd.seek(0) current_node = root_idx bytes_written = 0 while True: byte_s = input_fd.read(1) if not byte_s: break b = byte_s[0] for i in reversed(range(8)): bit = (b >> i) & 1 node = nodes[current_node] current_node = node.right if bit else node.left # если достигнут лист if nodes[current_node].value[0] is not None: output_fd.write(bytes([nodes[current_node].value[0]])) bytes_written += 1 if bytes_written >= total_bytes: return current_node = root_idx