feat: implement Hamming 8/4 error correction and auto-alignment for T42 files
All checks were successful
Build Linux / Build Linux (push) Successful in 1m33s
Build Windows / Build Windows (push) Successful in 4m50s

This commit is contained in:
2026-02-21 11:35:55 +01:00
parent 18fef7b049
commit 62dc11fd40
2 changed files with 63 additions and 17 deletions

View File

@@ -1,22 +1,46 @@
import os import os
from typing import List, Callable, Optional from typing import List, Callable, Optional
from .models import Packet, Page, TeletextService from .models import Packet, Page, TeletextService, decode_hamming_8_4
def load_t42(file_path: str, progress_callback: Optional[Callable[[int, int], None]] = None) -> TeletextService: def load_t42(file_path: str, progress_callback: Optional[Callable[[int, int], None]] = None) -> TeletextService:
service = TeletextService() service = TeletextService()
total_bytes = os.path.getsize(file_path) total_bytes = os.path.getsize(file_path)
# Auto-detect alignment
# Scan first 4096 bytes for the offset that yields the most Row 0 headers
best_offset = 0
max_headers = 0
with open(file_path, 'rb') as f:
head_data = f.read(4096 + 42)
for offset in range(42):
headers = 0
for i in range(offset, len(head_data) - 42, 42):
chunk = head_data[i:i+42]
try:
# We check if Mag/Row decode to something sensible
# AND if it's Row 0, we check if Page Num decodes without errors (ideally)
# But for now, just counting Row 0s is good enough.
mag, row = decode_packet_header(chunk[0], chunk[1])
if row == 0 and mag != 8: # Mag 8 is often just nulls
headers += 1
except:
pass
if headers > max_headers:
max_headers = headers
best_offset = offset
# Each packet is 42 bytes # Each packet is 42 bytes
total_packets = total_bytes // 42 total_packets = (total_bytes - best_offset) // 42
processed_packets = 0 processed_packets = 0
with open(file_path, 'rb') as f: with open(file_path, 'rb') as f:
f.seek(best_offset)
while True: while True:
chunk = f.read(42) chunk = f.read(42)
if not chunk: if not chunk:
break break
if len(chunk) < 42: if len(chunk) < 42:
# Should not happen in a valid T42 stream, or we just ignore incomplete tail
break break
processed_packets += 1 processed_packets += 1
@@ -27,9 +51,6 @@ def load_t42(file_path: str, progress_callback: Optional[Callable[[int, int], No
service.all_packets.append(packet) service.all_packets.append(packet)
# Logic to group into pages. # Logic to group into pages.
# This is non-trivial because packets for a page might be interleaved or sequential.
# Standard implementation: Packets arrive in order. Row 0 starts a new page/subpage.
if packet.row == 0: if packet.row == 0:
# Start of a new page header. # Start of a new page header.
# Byte 2-9 of header contain Page Number, Subcode, Control bits etc. # Byte 2-9 of header contain Page Number, Subcode, Control bits etc.
@@ -175,12 +196,6 @@ def save_t42(file_path: str, service: TeletextService, progress_callback: Option
f.write(header + packet.data) f.write(header + packet.data)
def decode_hamming_8_4(byte_val):
    """
    Extracts the four data bits of a Hamming 8/4 byte.

    The data bits are read from bit positions 1, 3, 5 and 7 and packed
    into bits 0-3 of the result. The remaining bits are ignored, so no
    error detection or correction takes place.
    """
    nibble = 0
    # Source bit 1 -> result bit 0, 3 -> 1, 5 -> 2, 7 -> 3.
    for target_bit, source_bit in enumerate((1, 3, 5, 7)):
        nibble |= ((byte_val >> source_bit) & 1) << target_bit
    return nibble
def parse_header(data: bytearray): def parse_header(data: bytearray):
# Data is 40 bytes. # Data is 40 bytes.
# Bytes 0-7 are Page Num (2), Subcode (4), Control (2) - ALL Hamming encoded. # Bytes 0-7 are Page Num (2), Subcode (4), Control (2) - ALL Hamming encoded.

View File

@@ -2,11 +2,42 @@ from dataclasses import dataclass, field
from typing import List, Optional from typing import List, Optional
def decode_hamming_8_4(byte_val: int) -> int:
    """
    Decodes a byte using Hamming 8/4 with single-bit error correction.

    Bit layout, least significant bit first (ETSI EN 300 706):
    P1, D1, P2, D2, P3, D3, P4, D4.

    A single flipped bit is located via the syndrome and corrected before
    the data bits are extracted. When the overall parity is intact but the
    syndrome is non-zero (an even number of flipped bits), no correction is
    attempted and the data bits are returned as received, because the
    return value has no way to signal an error.

    Args:
        byte_val: The received byte; only the low 8 bits are examined.

    Returns:
        The 4-bit data value (0-15).
    """
    b = [(byte_val >> i) & 1 for i in range(8)]

    # Parity checks (ETSI EN 300 706, Hamming 8/4). Each check is 0 when
    # the protected group is consistent:
    #   P1 (b0) covers D1, D3, D4 (b1, b5, b7)
    #   P2 (b2) covers D1, D2, D4 (b1, b3, b7)
    #   P3 (b4) covers D1, D2, D3 (b1, b3, b5)
    #   P4 (b6) makes the parity of the whole byte odd
    s1 = 1 ^ b[0] ^ b[1] ^ b[5] ^ b[7]
    s2 = 1 ^ b[2] ^ b[1] ^ b[3] ^ b[7]
    s3 = 1 ^ b[4] ^ b[1] ^ b[3] ^ b[5]
    s4 = 1 ^ (sum(b) % 2)

    syndrome = s1 | (s2 << 1) | (s3 << 2)

    if s4 != 0:
        # Overall parity is wrong: assume exactly one bit flipped. The
        # syndrome names the set of checks that bit takes part in.
        mapping = {
            0b001: 0,  # p1 (b0): only check 1
            0b111: 1,  # d1 (b1): checks 1, 2, 3
            0b010: 2,  # p2 (b2): only check 2
            0b110: 3,  # d2 (b3): checks 2, 3
            0b100: 4,  # p3 (b4): only check 3
            0b101: 5,  # d3 (b5): checks 1, 3
            0b011: 7,  # d4 (b7): checks 1, 2
        }
        # Syndrome 0 means the flipped bit is P4 (b6) itself; the data
        # bits are untouched, so there is nothing to repair.
        bit_to_flip = mapping.get(syndrome)
        if bit_to_flip is not None:
            b[bit_to_flip] ^= 1

    # Return 4 data bits: D1, D2, D3, D4
    return b[1] | (b[3] << 1) | (b[5] << 2) | (b[7] << 3)
@dataclass @dataclass
class Packet: class Packet: