2025-12-28 21:38:21 +01:00
|
|
|
import os
|
|
|
|
|
from typing import List
|
|
|
|
|
from .models import Packet, Page, TeletextService
|
|
|
|
|
|
|
|
|
|
def load_t42(file_path: str) -> TeletextService:
|
|
|
|
|
service = TeletextService()
|
|
|
|
|
|
|
|
|
|
with open(file_path, 'rb') as f:
|
|
|
|
|
while True:
|
|
|
|
|
chunk = f.read(42)
|
|
|
|
|
if not chunk:
|
|
|
|
|
break
|
|
|
|
|
if len(chunk) < 42:
|
|
|
|
|
# Should not happen in a valid T42 stream, or we just ignore incomplete tail
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
packet = Packet(chunk)
|
|
|
|
|
service.all_packets.append(packet)
|
|
|
|
|
|
|
|
|
|
# Logic to group into pages.
|
|
|
|
|
# This is non-trivial because packets for a page might be interleaved or sequential.
|
|
|
|
|
# Standard implementation: Packets arrive in order. Row 0 starts a new page/subpage.
|
|
|
|
|
|
|
|
|
|
if packet.row == 0:
|
|
|
|
|
# Start of a new page header.
|
|
|
|
|
# Byte 2-9 of header contain Page Number, Subcode, Control bits etc.
|
|
|
|
|
# We need to parse the header to identify the page.
|
|
|
|
|
|
|
|
|
|
# Header format (after Mag/Row):
|
|
|
|
|
# Bytes: P1 P2 S1 S2 S3 S4 C1 C2 ...
|
|
|
|
|
# All Hamming 8/4 encoded.
|
|
|
|
|
|
|
|
|
|
# For now, let's just create a new page entry for every Header we see,
|
|
|
|
|
# or find the existing one if we want to support updates (but T42 usually is a stream capture).
|
|
|
|
|
# If it's an editor file, it's likely sequential.
|
|
|
|
|
|
|
|
|
|
p_num, sub_code = parse_header(packet.data)
|
|
|
|
|
|
|
|
|
|
# Create new page
|
|
|
|
|
new_page = Page(magazine=packet.magazine, page_number=p_num, sub_code=sub_code)
|
|
|
|
|
new_page.packets.append(packet)
|
|
|
|
|
service.pages.append(new_page)
|
|
|
|
|
else:
|
|
|
|
|
# Add to the "current" page of this magazine.
|
|
|
|
|
# We need to track the current active page for each magazine.
|
|
|
|
|
# A simplistic approach: add to the last page added that matches the magazine ??
|
|
|
|
|
# Robust approach: Maintain a dict of current_pages_by_magazine.
|
|
|
|
|
|
|
|
|
|
# Let's find the last page in service that matches the packet's magazine
|
|
|
|
|
# This is O(N) but N (pages) is small.
|
|
|
|
|
target_page = None
|
|
|
|
|
for p in reversed(service.pages):
|
|
|
|
|
if p.magazine == packet.magazine:
|
|
|
|
|
target_page = p
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
if target_page:
|
|
|
|
|
target_page.packets.append(packet)
|
|
|
|
|
else:
|
|
|
|
|
# Packet without a header? Orphaned. Just keep in all_packets
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
return service
|
|
|
|
|
|
2025-12-28 21:57:44 +01:00
|
|
|
|
|
|
|
|
def encode_hamming_8_4(value):
|
|
|
|
|
# Value is 4 bits (0-15)
|
|
|
|
|
d1 = (value >> 0) & 1
|
|
|
|
|
d2 = (value >> 1) & 1
|
|
|
|
|
d3 = (value >> 2) & 1
|
|
|
|
|
d4 = (value >> 3) & 1
|
|
|
|
|
|
|
|
|
|
# Parity bits (Odd parity default? Or standard Hamming?)
|
|
|
|
|
# Teletext spec:
|
|
|
|
|
# P1 = 1 + D1 + D2 + D4 (mod 2) -> Inverse of even parity check?
|
|
|
|
|
# Actually, simpler to look up or calculate.
|
|
|
|
|
# Let's match typical implementation:
|
|
|
|
|
# P1 (b0) covers 1,3,7 (D1, D2, D4)
|
|
|
|
|
# P2 (b2) covers 1,5,7 (D1, D3, D4)
|
|
|
|
|
# P3 (b4) covers 3,5,7 (D2, D3, D4)
|
|
|
|
|
# P4 (b6) covers all.
|
|
|
|
|
# Teletext uses ODD parity for the hamming bits usually?
|
|
|
|
|
# "Hamming 8/4 with odd parity"
|
|
|
|
|
|
|
|
|
|
p1 = 1 ^ d1 ^ d2 ^ d4
|
|
|
|
|
p2 = 1 ^ d1 ^ d3 ^ d4
|
|
|
|
|
p3 = 1 ^ d2 ^ d3 ^ d4
|
|
|
|
|
|
|
|
|
|
res = (p1 << 0) | (d1 << 1) | \
|
|
|
|
|
(p2 << 2) | (d2 << 3) | \
|
|
|
|
|
(p3 << 4) | (d3 << 5) | \
|
|
|
|
|
(d4 << 7)
|
|
|
|
|
|
|
|
|
|
# P4 (bit 6) makes total bits odd
|
|
|
|
|
# Count set bits so far
|
|
|
|
|
set_bits = bin(res).count('1')
|
|
|
|
|
p4 = 1 if (set_bits % 2 == 0) else 0
|
|
|
|
|
|
|
|
|
|
res |= (p4 << 6)
|
|
|
|
|
|
|
|
|
|
return res
|
|
|
|
|
|
2025-12-30 20:41:47 +01:00
|
|
|
|
|
|
|
|
def decode_packet_header(b1, b2):
|
|
|
|
|
"""
|
|
|
|
|
Decodes the Magazine and Row from the first 2 bytes of a T42 packet.
|
|
|
|
|
"""
|
|
|
|
|
d1 = decode_hamming_8_4(b1)
|
|
|
|
|
d2 = decode_hamming_8_4(b2)
|
|
|
|
|
|
|
|
|
|
mag = (d1 & 0b0111)
|
|
|
|
|
if mag == 0: mag = 8
|
|
|
|
|
|
|
|
|
|
row = (d2 << 1) | ((d1 >> 3) & 1)
|
|
|
|
|
return mag, row
|
|
|
|
|
|
2025-12-28 21:38:21 +01:00
|
|
|
def save_t42(file_path: str, service: TeletextService):
|
|
|
|
|
with open(file_path, 'wb') as f:
|
|
|
|
|
for packet in service.all_packets:
|
2025-12-30 20:41:47 +01:00
|
|
|
# Check if we can reuse the original header (preserving parity/integrity)
|
|
|
|
|
use_original_header = False
|
2025-12-28 21:38:21 +01:00
|
|
|
|
2025-12-30 20:41:47 +01:00
|
|
|
if hasattr(packet, 'original_data') and len(packet.original_data) >= 2:
|
|
|
|
|
# Try to decode the original header
|
|
|
|
|
try:
|
|
|
|
|
orig_mag, orig_row = decode_packet_header(packet.original_data[0], packet.original_data[1])
|
|
|
|
|
if orig_mag == packet.magazine and orig_row == packet.row:
|
|
|
|
|
use_original_header = True
|
|
|
|
|
except:
|
|
|
|
|
pass
|
2025-12-28 21:38:21 +01:00
|
|
|
|
2025-12-30 20:41:47 +01:00
|
|
|
if use_original_header:
|
|
|
|
|
header = packet.original_data[:2]
|
|
|
|
|
else:
|
|
|
|
|
# Reconstruct header bytes
|
|
|
|
|
mag = packet.magazine
|
|
|
|
|
if mag == 8: mag = 0 # 0 encoded as 8
|
|
|
|
|
|
|
|
|
|
# Bits:
|
|
|
|
|
# B1 data: M1(0) M2(1) M3(2) R1(3)
|
|
|
|
|
m1 = (mag >> 0) & 1
|
|
|
|
|
m2 = (mag >> 1) & 1
|
|
|
|
|
m3 = (mag >> 2) & 1
|
|
|
|
|
r1 = (packet.row >> 0) & 1
|
|
|
|
|
|
|
|
|
|
b1_val = m1 | (m2 << 1) | (m3 << 2) | (r1 << 3)
|
|
|
|
|
b1_enc = encode_hamming_8_4(b1_val)
|
|
|
|
|
|
|
|
|
|
# B2 data: R2(0) R3(1) R4(2) R5(3)
|
|
|
|
|
r2 = (packet.row >> 1) & 1
|
|
|
|
|
r3 = (packet.row >> 2) & 1
|
|
|
|
|
r4 = (packet.row >> 3) & 1
|
|
|
|
|
r5 = (packet.row >> 4) & 1
|
|
|
|
|
|
|
|
|
|
b2_val = r2 | (r3 << 1) | (r4 << 2) | (r5 << 3)
|
|
|
|
|
b2_enc = encode_hamming_8_4(b2_val)
|
|
|
|
|
|
|
|
|
|
header = bytes([b1_enc, b2_enc])
|
|
|
|
|
|
2025-12-28 21:38:21 +01:00
|
|
|
f.write(header + packet.data)
|
|
|
|
|
|
|
|
|
|
def decode_hamming_8_4(byte_val):
|
|
|
|
|
return ((byte_val >> 1) & 1) | \
|
|
|
|
|
(((byte_val >> 3) & 1) << 1) | \
|
|
|
|
|
(((byte_val >> 5) & 1) << 2) | \
|
|
|
|
|
(((byte_val >> 7) & 1) << 3)
|
|
|
|
|
|
|
|
|
|
def parse_header(data: bytearray):
|
|
|
|
|
# Data is 40 bytes.
|
|
|
|
|
# Bytes 0-7 are Page Num (2), Subcode (4), Control (2) - ALL Hamming encoded.
|
|
|
|
|
|
|
|
|
|
# 0: Page Units (PU)
|
|
|
|
|
# 1: Page Tens (PT)
|
|
|
|
|
|
|
|
|
|
pu = decode_hamming_8_4(data[0])
|
|
|
|
|
pt = decode_hamming_8_4(data[1])
|
|
|
|
|
|
|
|
|
|
page_num = (pt & 0xF) * 10 + (pu & 0xF)
|
|
|
|
|
|
|
|
|
|
# Subcode: S1, S2, S3, S4
|
|
|
|
|
# S1 (low), S2, S3, S4 (high)
|
|
|
|
|
|
|
|
|
|
s1 = decode_hamming_8_4(data[2])
|
|
|
|
|
s2 = decode_hamming_8_4(data[3])
|
|
|
|
|
s3 = decode_hamming_8_4(data[4])
|
|
|
|
|
s4 = decode_hamming_8_4(data[5])
|
|
|
|
|
|
|
|
|
|
# Subcode logic is a bit complex with specific bit mapping for "Time" vs "Subcode"
|
|
|
|
|
# But usually just combining them gives the raw subcode value.
|
|
|
|
|
# S1: bits 0-3
|
|
|
|
|
# S2: bits 4-6 (bit 4 is C4) -> actually S2 has 3 bits of subcode + 1 control bit usually?
|
|
|
|
|
# Let's simplify and just concat them for a unique identifier.
|
|
|
|
|
|
|
|
|
|
sub_code = s1 | (s2 << 4) | (s3 << 8) | (s4 << 12)
|
|
|
|
|
|
|
|
|
|
return page_num, sub_code
|