feat: Optimize .t42 loading and improve decoder fidelity
All checks were successful
Build Linux / Build Linux (push) Successful in 1m29s
Build Windows / Build Windows (push) Successful in 4m46s

This commit is contained in:
Daniel Dybing
2026-02-21 20:44:26 +01:00
parent 18fef7b049
commit a15ba67b1a
4 changed files with 161 additions and 130 deletions

View File

@@ -5,69 +5,82 @@ from .models import Packet, Page, TeletextService
def load_t42(file_path: str, progress_callback: Optional[Callable[[int, int], None]] = None) -> TeletextService:
    """Read a .t42 capture and group its packets into page snapshots.

    The file is consumed as consecutive 42-byte packets; a trailing fragment
    shorter than 42 bytes is ignored.  Every packet is appended to
    ``service.all_packets``.  Each row-0 (header) packet appends a new ``Page``
    to ``service.pages``, pre-filled with clones of the rows currently buffered
    for that magazine.  Rows 1-31 that arrive afterwards replace, or are added
    to, the rows of the most recent page of the same magazine.

    Args:
        file_path: Path of the .t42 file.  A path that does not exist yields
            an empty service instead of an error.
        progress_callback: Optional ``callback(processed, total)`` invoked
            after every 500 packets.

    Returns:
        The populated ``TeletextService``.
    """
    service = TeletextService()
    if not os.path.exists(file_path):
        return service

    packet_size = 42
    progress_step = 500
    total_packets = os.path.getsize(file_path) // packet_size
    processed_packets = 0

    # Per-magazine state, created on first use so that an unexpected magazine
    # value cannot raise KeyError:
    #   row_buffers:  magazine -> {row number: latest Packet seen for that row}
    #   active_pages: magazine -> Page created by the most recent header
    #   active_slots: magazine -> {row number: index into that Page's packets}
    row_buffers = {}
    active_pages = {}
    active_slots = {}

    with open(file_path, 'rb') as f:
        while True:
            chunk = f.read(packet_size)
            # Covers both end-of-file (empty read) and an incomplete tail.
            if len(chunk) < packet_size:
                break

            processed_packets += 1
            if progress_callback and processed_packets % progress_step == 0:
                progress_callback(processed_packets, total_packets)

            packet = Packet(chunk)
            service.all_packets.append(packet)

            mag = packet.magazine
            row = packet.row

            if row == 0:
                p_num, sub_code, control_bits, language = parse_header(packet.data)

                # C4 (Erase Page) is bit 0 of control_bits: drop the rows
                # buffered so far for this magazine before taking the snapshot.
                if control_bits & 1:
                    row_buffers[mag] = {}
                buffer = row_buffers.setdefault(mag, {})
                buffer[0] = packet

                # NOTE(review): the buffer is keyed by magazine only, so when
                # C4 is clear a header for a *different* page number inherits
                # the previous page's rows -- confirm this is intended.
                new_page = Page(
                    magazine=mag,
                    page_number=p_num,
                    sub_code=sub_code,
                    control_bits=control_bits,
                    language=language,
                )

                # Clone every buffered row so the snapshot owns its own
                # editable ``data`` bytearray.
                slots = {}
                for r_num in sorted(buffer):
                    buffered = buffer[r_num]
                    clone = Packet(buffered.original_data)
                    clone.data = bytearray(buffered.data)
                    slots.setdefault(clone.row, len(new_page.packets))
                    new_page.packets.append(clone)

                service.pages.append(new_page)
                active_pages[mag] = new_page
                active_slots[mag] = slots

            elif 1 <= row <= 31:
                # NOTE(review): rows are keyed by row number alone, so if
                # several packets share a row number only the last is kept --
                # confirm that is acceptable for the enhancement rows.
                row_buffers.setdefault(mag, {})[row] = packet

                # Mirror the update into the page that is currently active
                # for this magazine; packets seen before any header stay only
                # in the buffer and in all_packets.
                target_page = active_pages.get(mag)
                if target_page is not None:
                    slots = active_slots[mag]
                    index = slots.get(row)
                    if index is None:
                        slots[row] = len(target_page.packets)
                        target_page.packets.append(packet)
                    else:
                        target_page.packets[index] = packet

    return service
@@ -182,52 +195,51 @@ def decode_hamming_8_4(byte_val):
(((byte_val >> 7) & 1) << 3) (((byte_val >> 7) & 1) << 3)
def parse_header(data: bytearray):
    """Decode the Hamming 8/4 protected fields of a page-header (row 0) packet.

    ``data`` is the 40-byte packet payload that follows the MRAG bytes; only
    its first eight bytes are read:

        byte 0  page units
        byte 1  page tens
        byte 2  S1
        byte 3  S2 (low three bits) and C4 (top bit)
        byte 4  S3
        byte 5  S4 (low two bits), C5 and C6
        byte 6  C7..C10
        byte 7  C11..C14

    Returns:
        A tuple ``(page_num, sub_code, control_bits, language)`` where

        * ``page_num`` holds tens in the high nibble and units in the low
          nibble (0x00-0xFF), so hex pages A-F are preserved;
        * ``sub_code`` is nibble-aligned -- S1 in bits 0-3, S2 in bits 4-6,
          S3 in bits 8-11, S4 in bits 12-13 -- giving the range 0x0000-0x3F7F;
        * ``control_bits`` is a bitmask with C4 at bit 0 up to C14 at bit 10;
        * ``language`` is the 3-bit value formed by C12 (bit 0), C13 (bit 1)
          and C14 (bit 2).
    """
    pu = decode_hamming_8_4(data[0])
    pt = decode_hamming_8_4(data[1])
    page_num = ((pt & 0xF) << 4) | (pu & 0xF)

    s1 = decode_hamming_8_4(data[2])
    s2 = decode_hamming_8_4(data[3])
    s3 = decode_hamming_8_4(data[4])
    s4 = decode_hamming_8_4(data[5])

    # Keep each subcode digit in its own nibble so the value reads as the
    # four hex digits S4 S3 S2 S1; the control bits that share the S2 and S4
    # nibbles are masked out here and reported through control_bits instead.
    sub_code = (
        (s1 & 0xF)
        | ((s2 & 0x7) << 4)
        | ((s3 & 0xF) << 8)
        | ((s4 & 0x3) << 12)
    )

    # Control bits carried in the subcode nibbles.
    c4 = (s2 >> 3) & 1
    c5 = (s4 >> 2) & 1
    c6 = (s4 >> 3) & 1

    c_7_10 = decode_hamming_8_4(data[6])
    c_11_14 = decode_hamming_8_4(data[7])

    # Bitmask indexed from C4: C4 -> bit 0, C5 -> bit 1, ... C14 -> bit 10.
    control_bits = (
        c4
        | (c5 << 1)
        | (c6 << 2)
        | ((c_7_10 & 0xF) << 3)
        | ((c_11_14 & 0xF) << 7)
    )

    # c_11_14 holds C11 in bit 0, so dropping that bit leaves C12..C14.
    language = (c_11_14 >> 1) & 0x7

    return page_num, sub_code, control_bits, language
def save_tti(file_path: str, page: Page): def save_tti(file_path: str, page: Page):
""" """

View File

@@ -65,9 +65,13 @@ class Page:
Can have multiple subpages. Can have multiple subpages.
""" """
magazine: int magazine: int
page_number: int # 00-99 page_number: int # 00-99 (Hex storage: 0x00-0xFF)
sub_code: int = 0 # Subpage code (0000 to 3F7F hex usually, simplest is 0-99 equivalent) sub_code: int = 0 # 13-bit subcode (0000 to 3F7F hex)
language: int = 0 # National Option (0-7)
# Control bits C4-C14
control_bits: int = 0
language: int = 0 # National Option (0-7, from C12-C14)
packets: List[Packet] = field(default_factory=list) packets: List[Packet] = field(default_factory=list)
@property @property
@@ -75,6 +79,20 @@ class Page:
# Format as Hex to support A-F pages # Format as Hex to support A-F pages
return f"{self.magazine}{self.page_number:02X}" return f"{self.magazine}{self.page_number:02X}"
def get_control_bit(self, n: int) -> bool:
    """Report whether header control bit Cn is set.

    Cn is stored at bit ``n - 4`` of ``self.control_bits``.  Any ``n``
    outside 4..14 yields False.
    """
    if not (4 <= n <= 14):
        return False
    mask = 1 << (n - 4)
    return (self.control_bits & mask) != 0
def set_control_bit(self, n: int, value: bool):
    """Set or clear header control bit Cn in ``self.control_bits``.

    Cn lives at bit ``n - 4``.  Requests for ``n`` outside 4..14 are
    ignored and leave ``control_bits`` untouched.
    """
    if not (4 <= n <= 14):
        return
    mask = 1 << (n - 4)
    if value:
        self.control_bits = self.control_bits | mask
    else:
        self.control_bits = self.control_bits & ~mask
def calculate_crc(self) -> int: def calculate_crc(self) -> int:
""" """
Calculates the CRC-16 checksum for the page. Calculates the CRC-16 checksum for the page.

View File

@@ -214,10 +214,18 @@ class TeletextCanvas(QWidget):
painter.end() painter.end()
return return
# Draw each packet # Check Control Bits for "Inhibit Display" (C10)
# Initialize a grid of empty chars # In our bitmask (from parse_header):
# C4:0, C5:1, C6:2, C7:3, C8:4, C9:5, C10:6, C11:7, C12:8, C13:9, C14:10
inhibit_display = bool((self.page.control_bits >> 6) & 1)
if inhibit_display:
painter.setPen(Qt.GlobalColor.gray)
painter.drawText(10, 20, f"Page {self.page.full_page_number} - INHIBIT DISPLAY (C10 set)")
painter.end()
return
# Organize each packet by row
grid = [None] * 26 # 0-25 grid = [None] * 26 # 0-25
for p in self.page.packets: for p in self.page.packets:
if 0 <= p.row <= 25: if 0 <= p.row <= 25:
grid[p.row] = p grid[p.row] = p
@@ -243,6 +251,10 @@ class TeletextCanvas(QWidget):
# Output mask for the next row # Output mask for the next row
next_occlusion_mask = [False] * 40 next_occlusion_mask = [False] * 40
# Check for Suppress Header (C7)
# C7:3, so bit 3 of control_bits
suppress_header = bool((self.page.control_bits >> 3) & 1)
# Default State at start of row # Default State at start of row
fg = COLORS[7] # White fg = COLORS[7] # White
bg = COLORS[0] # Black bg = COLORS[0] # Black
@@ -272,29 +284,18 @@ class TeletextCanvas(QWidget):
for c in range(40): for c in range(40):
x = c * self.cell_w x = c * self.cell_w
# If this cell is occluded by the row above, skip drawing and attribute processing?
# Spec says "The characters in the row below are ignored."
# Ideally we shouldn't even process attributes, but for simple renderer we just skip draw.
# However, if we skip attribute processing, state (fg/bg) won't update.
# Teletext attributes are serial.
# BUT, if the row above covers it, the viewer sees the row above.
# Does the hidden content affect the *rest* of the row?
# Likely yes, attributes usually propagate.
# But the spec says "ignored". Let's assume we skip *everything* for this cell visually,
# but maybe we should technically maintain state?
# For "Double Height" visual correctness, skipping drawing is the key.
# We will Process attributes (to keep state consistent) but Skip Drawing if occluded.
# Wait, if we process attributes, we might set double_height=True for the NEXT row?
# If this cell is occluded, it shouldn't trigger DH for the next row.
is_occluded = occlusion_mask[c] is_occluded = occlusion_mask[c]
# Decide byte value # Decide byte value
if row == 0 and c < 8: if row == 0:
# Use generated header prefix if c < 8:
byte_val = ord(header_prefix[c]) # Column 0-7: Header prefix
byte_val = ord(header_prefix[c])
elif suppress_header and c < 32:
# Column 8-31: Hide header if C7 set
byte_val = 0x20
else:
byte_val = data[c] if c < len(data) else 0x20
else: else:
byte_val = data[c] if c < len(data) else 0x20 byte_val = data[c] if c < len(data) else 0x20

View File

@@ -524,8 +524,9 @@ class MainWindow(QMainWindow):
self.language_overrides[key] = idx self.language_overrides[key] = idx
# Patch Row 0 packet data to persist language selection to file # Patch Row 0 packet data to persist language selection to file
# Language bits are in Byte 8 (Control Bits 2): C12, C13, C14 # Language bits are in Byte 7 (Control Bits C11-C14)
# We need to preserve C11 (bit 3 of encoded 4-bit val) which is "Inhibit Display" usually 0 # Byte 7 encoded structure: bit 0:C11, bit 1:C12, bit 2:C13, bit 3:C14
# National Option index corresponds to (C14 C13 C12)
# Find Row 0 packet # Find Row 0 packet
header_packet = None header_packet = None
@@ -534,36 +535,23 @@ class MainWindow(QMainWindow):
header_packet = p header_packet = p
break break
if header_packet and len(header_packet.data) > 8: if header_packet and len(header_packet.data) >= 8:
try: try:
old_val = decode_hamming_8_4(header_packet.data[8]) # Byte 7 contains C11, C12, C13, C14
# Encoded nibble structure: D1(b0), D2(b1), D3(b2), D4(b3) old_val = decode_hamming_8_4(header_packet.data[7])
# D1 maps to C12
# D2 maps to C13
# D3 maps to C14
# D4 maps to C11
# io.py logic for reading: l0 = (idx >> 0) & 1 # C12
# language = ((c_bits_2 & 1) << 1) | ((c_bits_2 & 2) >> 1) | (c_bits_2 & 4) l1 = (idx >> 1) & 1 # C13
# i.e. Lang Bit 0 comes from D2, Lang Bit 1 comes from D1, Lang Bit 2 comes from D3 l2 = (idx >> 2) & 1 # C14
# So for writing: d1 = (old_val >> 0) & 1 # Preserve C11
# D1 = Lang Bit 1
# D2 = Lang Bit 0
# D3 = Lang Bit 2
l0 = (idx >> 0) & 1
l1 = (idx >> 1) & 1
l2 = (idx >> 2) & 1
d1 = l1
d2 = l0 d2 = l0
d3 = l2 d3 = l1
d4 = (old_val >> 3) & 1 # Preserve C11 d4 = l2
new_val = d1 | (d2 << 1) | (d3 << 2) | (d4 << 3) new_val = d1 | (d2 << 1) | (d3 << 2) | (d4 << 3)
header_packet.data[8] = encode_hamming_8_4(new_val) header_packet.data[7] = encode_hamming_8_4(new_val)
self.set_modified(True) self.set_modified(True)
self.status_label.setText(f"Language set to {self.language_names[idx]} (saved to header).") self.status_label.setText(f"Language set to {self.language_names[idx]} (saved to header).")
except Exception as e: except Exception as e:
@@ -884,9 +872,21 @@ class MainWindow(QMainWindow):
self.subpage_combo.clear() self.subpage_combo.clear()
for i, p in enumerate(pages): for i, p in enumerate(pages):
# Display format: Index or Subcode? # Try to find the clock in Row 0 (last 8 characters)
# Subcode is often 0000. Index 1/N is clearer for editing. clock_str = ""
label = f"{i+1}/{len(pages)} (Sub {p.sub_code:04X})" for pkt in p.packets:
if pkt.row == 0:
# Bytes 32-39 of the 40-byte data are the clock
raw_clock = pkt.data[32:40].decode('latin-1', errors='replace')
# Strip parity from each char and filter non-printables
clock_str = "".join([chr(ord(c) & 0x7F) if 32 <= (ord(c) & 0x7F) <= 126 else " " for c in raw_clock])
break
label = f"{i+1}/{len(pages)} "
if clock_str.strip():
label += f"[{clock_str.strip()}] "
label += f"(Sub {p.sub_code:04X})"
self.subpage_combo.addItem(label, p) self.subpage_combo.addItem(label, p)
self.subpage_combo.blockSignals(False) self.subpage_combo.blockSignals(False)