From 62dc11fd40460e591c483eb86fb4c8c50a8aa12b Mon Sep 17 00:00:00 2001 From: Daniel Dybing Date: Sat, 21 Feb 2026 11:35:55 +0100 Subject: [PATCH] feat: implement Hamming 8/4 error correction and auto-alignment for T42 files --- src/teletext/io.py | 39 +++++++++++++++++++++++++++------------ src/teletext/models.py | 41 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 63 insertions(+), 17 deletions(-) diff --git a/src/teletext/io.py b/src/teletext/io.py index b8573e8..1aa3b09 100644 --- a/src/teletext/io.py +++ b/src/teletext/io.py @@ -1,22 +1,46 @@ import os from typing import List, Callable, Optional -from .models import Packet, Page, TeletextService +from .models import Packet, Page, TeletextService, decode_hamming_8_4 def load_t42(file_path: str, progress_callback: Optional[Callable[[int, int], None]] = None) -> TeletextService: service = TeletextService() total_bytes = os.path.getsize(file_path) + + # Auto-detect alignment + # Scan first 4096 bytes for the offset that yields the most Row 0 headers + best_offset = 0 + max_headers = 0 + with open(file_path, 'rb') as f: + head_data = f.read(4096 + 42) + for offset in range(42): + headers = 0 + for i in range(offset, len(head_data) - 42, 42): + chunk = head_data[i:i+42] + try: + # We check if Mag/Row decode to something sensible + # AND if it's Row 0, we check if Page Num decodes without errors (ideally) + # But for now, just counting Row 0s is good enough. 
def decode_hamming_8_4(byte_val: int) -> int:
    """Decode one Hamming 8/4 protected byte, correcting single-bit errors.

    Bit layout of the byte, least-significant bit first:

        bit:  0   1   2   3   4   5   6   7
              P1  D1  P2  D2  P3  D3  P4  D4

    Every parity bit gives its group odd parity:

        P1 protects D1, D3, D4
        P2 protects D1, D2, D4
        P3 protects D1, D2, D3
        P4 protects the whole byte

    Args:
        byte_val: The received byte. Only the low 8 bits are examined.

    Returns:
        The 4-bit data value (0-15). A single flipped bit is corrected.
        A double-bit error is detectable (overall parity looks fine while
        the group checks fail) but not correctable; since this interface
        has no error channel, the uncorrected data bits are returned.
    """
    b = [(byte_val >> i) & 1 for i in range(8)]

    # Each check is 0 when its parity group is intact, 1 when violated.
    s1 = 1 ^ b[0] ^ b[1] ^ b[5] ^ b[7]  # P1 group: D1, D3, D4
    s2 = 1 ^ b[2] ^ b[1] ^ b[3] ^ b[7]  # P2 group: D1, D2, D4
    s3 = 1 ^ b[4] ^ b[1] ^ b[3] ^ b[5]  # P3 group: D1, D2, D3
    overall_violated = 1 ^ (sum(b) % 2)  # P4: whole byte must be odd

    syndrome = s1 | (s2 << 1) | (s3 << 2)

    if overall_violated:
        # An odd number of bits flipped; assume exactly one. The syndrome
        # identifies it. Only data-bit errors need repairing: a syndrome
        # with a single check set (P1/P2/P3) or none set (P4) means a
        # parity bit was hit and the data bits are already correct.
        data_bit_for_syndrome = {
            0b111: 1,  # D1 sits in all three groups
            0b110: 3,  # D2 sits in the P2 and P3 groups
            0b101: 5,  # D3 sits in the P1 and P3 groups
            0b011: 7,  # D4 sits in the P1 and P2 groups
        }
        bit_to_flip = data_bit_for_syndrome.get(syndrome)
        if bit_to_flip is not None:
            b[bit_to_flip] ^= 1

    # Reassemble the data nibble: D1 is the least-significant bit.
    return b[1] | (b[3] << 1) | (b[5] << 2) | (b[7] << 3)