Files
Teletext-Editor/src/teletext/io.py
Daniel Dybing fe4253c8df
All checks were successful
Build Linux / Build Linux (push) Successful in 1m31s
Build Windows / Build Windows (push) Successful in 4m47s
fix: prevent data leakage between pages by splitting on row sequence reversal
2026-02-21 12:17:14 +01:00

315 lines
12 KiB
Python

import os
from typing import List, Callable, Optional
from .models import Packet, Page, TeletextService, decode_hamming_8_4
def load_t42(file_path: str, progress_callback: Optional[Callable[[int, int], None]] = None) -> TeletextService:
service = TeletextService()
total_bytes = os.path.getsize(file_path)
# Auto-detect alignment
# Scan first 4096 bytes for the offset that yields the most Row 0 headers
best_offset = 0
max_headers = 0
with open(file_path, 'rb') as f:
head_data = f.read(4096 + 42)
for offset in range(42):
headers = 0
for i in range(offset, len(head_data) - 42, 42):
chunk = head_data[i:i+42]
try:
# We check if Mag/Row decode to something sensible
# AND if it's Row 0, we check if Page Num decodes without errors (ideally)
# But for now, just counting Row 0s is good enough.
mag, row = decode_packet_header(chunk[0], chunk[1])
if row == 0 and mag != 8: # Mag 8 is often just nulls
headers += 1
except:
pass
if headers > max_headers:
max_headers = headers
best_offset = offset
# Each packet is 42 bytes
total_packets = (total_bytes - best_offset) // 42
processed_packets = 0
# Track current active page for each magazine to handle interleaved packets
current_pages_by_mag = {} # mag -> Page object
last_row_by_mag = {} # mag -> int
with open(file_path, 'rb') as f:
f.seek(best_offset)
while True:
chunk = f.read(42)
if not chunk:
break
if len(chunk) < 42:
break
processed_packets += 1
if progress_callback and processed_packets % 100 == 0:
progress_callback(processed_packets, total_packets)
packet = Packet(chunk)
service.all_packets.append(packet)
mag = packet.magazine
row = packet.row
# Logic to group into pages.
if row == 0:
# Start of a new page header.
p_num, sub_code, language = parse_header(packet.data)
# Create new page
new_page = Page(magazine=mag, page_number=p_num, sub_code=sub_code, language=language)
new_page.packets.append(packet)
service.pages.append(new_page)
# Update tracking
current_pages_by_mag[mag] = new_page
last_row_by_mag[mag] = 0
else:
# Add to the "current" page of this magazine.
target_page = current_pages_by_mag.get(mag)
prev_row = last_row_by_mag.get(mag, -1)
# Robustness check for VHS captures:
# If we see a row number that has already passed (e.g. Row 1 after Row 25)
# AND we didn't see a Row 0, it means a new page started but we missed the header.
# We should split into a new Page object to avoid data corruption.
if target_page and row <= prev_row and row != prev_row: # Strictly less than (or handle duplicate rows?)
# In some captures, we might see the same row twice (Field 1/2).
# If it's the SAME row number, we just append (overwrites in renderer).
# If it's a LOWER row number, it's definitely a new cycle.
# Create a "Lost Header" page
# We use page_number=0xFF to indicate unknown, but we keep mag.
target_page = Page(magazine=mag, page_number=0xFF, sub_code=0, language=0)
service.pages.append(target_page)
current_pages_by_mag[mag] = target_page
if target_page:
target_page.packets.append(packet)
last_row_by_mag[mag] = row
else:
# Packet without a header? Orphaned. Just keep in all_packets
pass
return service
def encode_hamming_8_4(value):
# Value is 4 bits (0-15)
d1 = (value >> 0) & 1
d2 = (value >> 1) & 1
d3 = (value >> 2) & 1
d4 = (value >> 3) & 1
# Parity bits (Odd parity default? Or standard Hamming?)
# Teletext spec:
# P1 = 1 + D1 + D2 + D4 (mod 2) -> Inverse of even parity check?
# Actually, simpler to look up or calculate.
# Let's match typical implementation:
# P1 (b0) covers 1,3,7 (D1, D2, D4)
# P2 (b2) covers 1,5,7 (D1, D3, D4)
# P3 (b4) covers 3,5,7 (D2, D3, D4)
# P4 (b6) covers all.
# Teletext uses ODD parity for the hamming bits usually?
# "Hamming 8/4 with odd parity"
p1 = 1 ^ d1 ^ d2 ^ d4
p2 = 1 ^ d1 ^ d3 ^ d4
p3 = 1 ^ d2 ^ d3 ^ d4
res = (p1 << 0) | (d1 << 1) | \
(p2 << 2) | (d2 << 3) | \
(p3 << 4) | (d3 << 5) | \
(d4 << 7)
# P4 (bit 6) makes total bits odd
# Count set bits so far
set_bits = bin(res).count('1')
p4 = 1 if (set_bits % 2 == 0) else 0
res |= (p4 << 6)
return res
def decode_packet_header(b1, b2):
"""
Decodes the Magazine and Row from the first 2 bytes of a T42 packet.
"""
d1 = decode_hamming_8_4(b1)
d2 = decode_hamming_8_4(b2)
mag = (d1 & 0b0111)
if mag == 0: mag = 8
row = (d2 << 1) | ((d1 >> 3) & 1)
return mag, row
def save_t42(file_path: str, service: TeletextService, progress_callback: Optional[Callable[[int, int], None]] = None):
total_packets = len(service.all_packets)
processed = 0
with open(file_path, 'wb') as f:
for packet in service.all_packets:
processed += 1
if progress_callback and processed % 100 == 0:
progress_callback(processed, total_packets)
# Check if we can reuse the original header (preserving parity/integrity)
use_original_header = False
if hasattr(packet, 'original_data') and len(packet.original_data) >= 2:
# Try to decode the original header
try:
orig_mag, orig_row = decode_packet_header(packet.original_data[0], packet.original_data[1])
if orig_mag == packet.magazine and orig_row == packet.row:
use_original_header = True
except:
pass
if use_original_header:
header = packet.original_data[:2]
else:
# Reconstruct header bytes
mag = packet.magazine
if mag == 8: mag = 0 # 0 encoded as 8
# Bits:
# B1 data: M1(0) M2(1) M3(2) R1(3)
m1 = (mag >> 0) & 1
m2 = (mag >> 1) & 1
m3 = (mag >> 2) & 1
r1 = (packet.row >> 0) & 1
b1_val = m1 | (m2 << 1) | (m3 << 2) | (r1 << 3)
b1_enc = encode_hamming_8_4(b1_val)
# B2 data: R2(0) R3(1) R4(2) R5(3)
r2 = (packet.row >> 1) & 1
r3 = (packet.row >> 2) & 1
r4 = (packet.row >> 3) & 1
r5 = (packet.row >> 4) & 1
b2_val = r2 | (r3 << 1) | (r4 << 2) | (r5 << 3)
b2_enc = encode_hamming_8_4(b2_val)
header = bytes([b1_enc, b2_enc])
f.write(header + packet.data)
def parse_header(data: bytearray):
# Data is 40 bytes.
# Bytes 0-7 are Page Num (2), Subcode (4), Control (2) - ALL Hamming encoded.
# 0: Page Units (PU)
# 1: Page Tens (PT)
pu = decode_hamming_8_4(data[0])
pt = decode_hamming_8_4(data[1])
# Use BCD/Hex-like storage: High nibble is Tens, Low nibble is Units.
# This preserves Hex pages (A-F) without colliding with decimal pages.
# E.g. Page 1FF -> Tens=F(15), Units=F(15) -> 0xFF (255)
# Page 12E -> Tens=2, Units=E(14) -> 0x2E (46)
# Page 134 -> Tens=3, Units=4 -> 0x34 (52)
# 0x2E != 0x34. No collision.
page_num = ((pt & 0xF) << 4) | (pu & 0xF)
# Subcode: S1, S2, S3, S4
# S1 (low), S2, S3, S4 (high)
s1 = decode_hamming_8_4(data[2])
s2 = decode_hamming_8_4(data[3])
s3 = decode_hamming_8_4(data[4])
s4 = decode_hamming_8_4(data[5])
# Subcode logic is a bit complex with specific bit mapping for "Time" vs "Subcode"
# But usually just combining them gives the raw subcode value.
# S1: bits 0-3
# S2: bits 4-6 (bit 4 is C4) -> actually S2 has 3 bits of subcode + 1 control bit usually?
# Let's simplify and just concat them for a unique identifier.
sub_code = s1 | (s2 << 4) | (s3 << 8) | (s4 << 12)
# Control bits C12, C13, C14 are in Byte 8 (index 8)
# They determine the National Option (Language)
c_bits_2 = decode_hamming_8_4(data[8])
# Fix for Language Detection:
# It seems C12 and C13 are swapped in the Hamming decoding or file format relative to expected values.
# C12 is bit 0, C13 is bit 1.
# We swap them so D1 maps to C13 (Swedish bit) and D2 maps to C12 (German bit).
# Original: language = c_bits_2 & 0b111
language = ((c_bits_2 & 1) << 1) | ((c_bits_2 & 2) >> 1) | (c_bits_2 & 4)
return page_num, sub_code, language
def save_tti(file_path: str, page: Page):
"""
Saves a single Page object to a TTI file.
"""
with open(file_path, 'w', encoding='latin-1') as f:
# Header Info
# DS - Source? Description?
f.write(f"DS,Teletext Editor Export\n")
f.write(f"SP,{file_path}\n")
# PN - Page Number mpp00
# Typically TTI uses decimal integer for mppss?
# Or mppss as hex digits?
# Standard convention: mppss where m is 1-8, pp is 00-FF, ss is 00-99
# Example: Page 100 -> PN,10000
# Example: Page 1F0 -> PN,1F000
f.write(f"PN,{page.magazine}{page.page_number:02X}00\n")
# SC - Subcode ssss
f.write(f"SC,{page.sub_code:04X}\n")
# PS - Page Status
# 8000 is typical for "Transmission"
f.write(f"PS,8000\n")
# RE - Region (Language)
f.write(f"RE,{page.language}\n")
# Lines
# We need to construct the 40-char string for each row
# Row 0 is special (Header)
# Get all packets for this page
# Map row -> packet
rows = {}
for p in page.packets:
rows[p.row] = p
for r in range(26): # 0 to 25
if r in rows:
packet = rows[r]
# Packet data is 40 bytes (after the 2-byte header we stripped in Packet class? No wait)
# Packet.data in our model IS the 40 bytes of character data (we strip MRAG in __post_init__)
# So we just decode it as latin-1 to get the chars
# However, we must ensure we don't have newlines or nulls breaking the text file structure
# TTI format usually accepts raw bytes 0x00-0xFF if strictly handled, but often expects
# mapped control codes.
# Standard VBIT2 TTI handling treats it as a binary-safe string if mapped to char.
# Row 0 special handling: The first 8 bytes of Row 0 are usually header flags in the packet,
# but visually they are "P100 ".
# TTI usually expects the visual representation for the line content.
# But for transmission, we want the raw bytes.
# OL,r,String
data_str = packet.data.decode('latin-1')
f.write(f"OL,{r},{data_str}\n")
else:
# Empty line? Usually omitted or written as empty
pass