197 lines
No EOL
7.6 KiB
Python
197 lines
No EOL
7.6 KiB
Python
# huffman.py
|
||
"""
|
||
Прототип реализации алгоритма Хаффмана на Python.
|
||
|
||
Особенности:
|
||
- Статическое хранение частот (массив 256 элементов)
|
||
- Дерево Хаффмана хранится списком узлов (HTLS)
|
||
- Кодирование и декодирование побитовое
|
||
- Совместимо с переносом в C (фиксированные массивы и сдвиги)
|
||
"""
|
||
|
||
from typing import BinaryIO, List, Tuple
|
||
from dataclasses import dataclass
|
||
|
||
@dataclass
|
||
class HTLS:
|
||
"""
|
||
Структура узла дерева Хаффмана.
|
||
|
||
Атрибуты:
|
||
left: индекс левого потомка (None, если лист)
|
||
right: индекс правого потомка (None, если лист)
|
||
value: кортеж (byte_value, frequency) для листа,
|
||
(None, frequency) для внутренних узлов
|
||
"""
|
||
left: int | None
|
||
right: int | None
|
||
value: Tuple[int | None, int]
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Шаг 1: подсчёт частот байтов
|
||
# -----------------------------------------------------------------------------
|
||
def get_frequencies(fd: BinaryIO) -> List[int]:
|
||
"""
|
||
Собирает частоты каждого байта в файле.
|
||
|
||
Аргументы:
|
||
fd: открытый бинарный файл для чтения
|
||
|
||
Возвращает:
|
||
Список из 256 элементов, где индекс = значение байта,
|
||
значение = частота этого байта в файле.
|
||
"""
|
||
frequencies = [0] * 256
|
||
content = fd.read()
|
||
|
||
for byte in content:
|
||
frequencies[byte] += 1
|
||
|
||
return frequencies
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Шаг 2: построение дерева Хаффмана
|
||
# -----------------------------------------------------------------------------
|
||
def make_tree(frequencies_table: List[int]) -> Tuple[List[HTLS], int]:
|
||
"""
|
||
Строит дерево Хаффмана на основе массива частот.
|
||
|
||
Аргументы:
|
||
frequencies_table: массив частот 256 байтов
|
||
|
||
Возвращает:
|
||
nodes: список всех узлов дерева (HTLS)
|
||
root_idx: индекс корневого узла
|
||
"""
|
||
# создаём узлы-«листья» для каждого байта
|
||
nodes = [HTLS(left=None, right=None, value=(i, freq))
|
||
for i, freq in enumerate(frequencies_table)]
|
||
|
||
# список «живых» узлов для объединения
|
||
alive = list(range(len(nodes)))
|
||
|
||
while len(alive) > 1:
|
||
# находим два узла с наименьшей частотой
|
||
alive.sort(key=lambda idx: nodes[idx].value[1])
|
||
i1 = alive.pop(0)
|
||
i2 = alive.pop(0)
|
||
freq1 = nodes[i1].value[1]
|
||
freq2 = nodes[i2].value[1]
|
||
|
||
# создаём новый узел, объединяющий два наименьших
|
||
new_node = HTLS(
|
||
left=i1,
|
||
right=i2,
|
||
value=(None, freq1 + freq2)
|
||
)
|
||
nodes.append(new_node)
|
||
alive.append(len(nodes) - 1)
|
||
|
||
return nodes, alive[0]
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Шаг 3: генерация кодов Хаффмана
|
||
# -----------------------------------------------------------------------------
|
||
def make_codes(nodes: List[HTLS], root: int) -> dict[int, Tuple[int, int]]:
|
||
"""
|
||
Генерирует побитовые коды для каждого байта по дереву Хаффмана.
|
||
|
||
Аргументы:
|
||
nodes: список узлов HTLS
|
||
root: индекс корня дерева
|
||
|
||
Возвращает:
|
||
Словарь: ключ = байт (0–255), значение = (код, длина)
|
||
Код хранится как int, длина = количество бит.
|
||
"""
|
||
codes = {}
|
||
stack = [(root, 0, 0)] # (индекс узла, текущий код, длина кода)
|
||
|
||
while stack:
|
||
idx, code, length = stack.pop()
|
||
node = nodes[idx]
|
||
|
||
if node.value[0] is not None:
|
||
# достигнут лист
|
||
byte = node.value[0]
|
||
codes[byte] = (code, length)
|
||
else:
|
||
# сначала правый потомок (для корректного обхода LIFO)
|
||
if node.right is not None:
|
||
stack.append((node.right, (code << 1) | 1, length + 1))
|
||
if node.left is not None:
|
||
stack.append((node.left, (code << 1) | 0, length + 1))
|
||
|
||
return codes
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Шаг 4: кодирование файла
|
||
# -----------------------------------------------------------------------------
|
||
def encode_flow(input_fd: BinaryIO, output_fd: BinaryIO, codes: dict[int, Tuple[int, int]]):
|
||
"""
|
||
Побитово кодирует содержимое input_fd и записывает в output_fd.
|
||
|
||
Аргументы:
|
||
input_fd: бинарный входной файл
|
||
output_fd: бинарный выходной файл
|
||
codes: словарь байт -> (код, длина)
|
||
"""
|
||
buffer = 0
|
||
buffer_len = 0
|
||
|
||
input_fd.seek(0)
|
||
content = input_fd.read()
|
||
|
||
for byte in content:
|
||
code, length = codes[byte]
|
||
# добавляем код в буфер
|
||
buffer = (buffer << length) | code
|
||
buffer_len += length
|
||
|
||
# пока накоплено >= 8 бит → пишем байт
|
||
while buffer_len >= 8:
|
||
buffer_len -= 8
|
||
to_write = (buffer >> buffer_len) & 0xFF
|
||
output_fd.write(bytes([to_write]))
|
||
|
||
# остаток, если есть
|
||
if buffer_len > 0:
|
||
to_write = (buffer << (8 - buffer_len)) & 0xFF
|
||
output_fd.write(bytes([to_write]))
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Шаг 5: декодирование файла
|
||
# -----------------------------------------------------------------------------
|
||
def decode_flow(input_fd: BinaryIO, output_fd: BinaryIO, nodes: List[HTLS], root_idx: int, total_bytes: int):
|
||
"""
|
||
Декодирует поток Хаффмана обратно в исходные байты.
|
||
|
||
Аргументы:
|
||
input_fd: бинарный файл с закодированными данными
|
||
output_fd: бинарный выходной файл для декодированных байт
|
||
nodes: список узлов дерева Хаффмана
|
||
root_idx: индекс корня дерева
|
||
total_bytes: общее количество исходных байт (чтобы остановиться)
|
||
"""
|
||
input_fd.seek(0)
|
||
current_node = root_idx
|
||
bytes_written = 0
|
||
|
||
while True:
|
||
byte_s = input_fd.read(1)
|
||
if not byte_s:
|
||
break
|
||
b = byte_s[0]
|
||
|
||
for i in reversed(range(8)):
|
||
bit = (b >> i) & 1
|
||
node = nodes[current_node]
|
||
current_node = node.right if bit else node.left
|
||
|
||
# если достигнут лист
|
||
if nodes[current_node].value[0] is not None:
|
||
output_fd.write(bytes([nodes[current_node].value[0]]))
|
||
bytes_written += 1
|
||
if bytes_written >= total_bytes:
|
||
return
|
||
current_node = root_idx |