Files
decode-gui/vhs_decode_gui.py
Daniel Dybing 9f15ad3348
Some checks failed
Build Binaries / build-linux (push) Successful in 1m36s
Build Binaries / build-windows (push) Has been cancelled
Add build workflow and requirements
2026-01-13 16:35:58 +01:00

507 lines
22 KiB
Python

import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import subprocess
import threading
import queue
import os
class VHSDecodeGUI:
def __init__(self, master):
self.master = master
master.title("VHS Decode GUI")
master.geometry("800x700")
self.process = None
self.log_queue = queue.Queue()
self.job_queue = []
self.is_queue_running = False
self.stop_queue_requested = False
self.create_widgets()
self.master.after(100, self.poll_log_queue) # Start polling the log queue
def create_widgets(self):
# Configure grid for responsiveness
self.master.columnconfigure(0, weight=1)
self.master.rowconfigure(5, weight=1)
# Input File Selection
input_frame = ttk.LabelFrame(self.master, text="Input File")
input_frame.grid(row=0, column=0, sticky="ew", padx=10, pady=5)
input_frame.columnconfigure(0, weight=1)
self.input_file_path = tk.StringVar()
self.input_entry = ttk.Entry(input_frame, textvariable=self.input_file_path)
self.input_entry.grid(row=0, column=0, sticky="ew", padx=5, pady=5)
self.input_button = ttk.Button(input_frame, text="Browse", command=self.browse_input_file)
self.input_button.grid(row=0, column=1, sticky="e", padx=5, pady=5)
# Output File Selection
output_frame = ttk.LabelFrame(self.master, text="Output File (Base Name)")
output_frame.grid(row=1, column=0, sticky="ew", padx=10, pady=5)
output_frame.columnconfigure(0, weight=1)
self.output_file_path = tk.StringVar()
self.output_entry = ttk.Entry(output_frame, textvariable=self.output_file_path)
self.output_entry.grid(row=0, column=0, sticky="ew", padx=5, pady=5)
self.output_button = ttk.Button(output_frame, text="Browse", command=self.browse_output_file)
self.output_button.grid(row=0, column=1, sticky="e", padx=5, pady=5)
# Core Settings
settings_frame = ttk.LabelFrame(self.master, text="Core Settings")
settings_frame.grid(row=2, column=0, sticky="ew", padx=10, pady=5)
settings_frame.columnconfigure(1, weight=1)
# System
ttk.Label(settings_frame, text="Video System:").grid(row=0, column=0, sticky="w", padx=5, pady=2)
self.system_var = tk.StringVar(value="PAL")
system_options = ["PAL", "NTSC", "PAL-M", "NTSC-J"]
self.system_menu = ttk.OptionMenu(settings_frame, self.system_var, "PAL", *system_options)
self.system_menu.grid(row=0, column=1, sticky="ew", padx=5, pady=2)
# Tape Format
ttk.Label(settings_frame, text="Tape Format:").grid(row=1, column=0, sticky="w", padx=5, pady=2)
self.tape_format_var = tk.StringVar(value="VHS")
tape_formats = ["VHS", "SVHS", "HI8", "BETAMAX", "U-MATIC", "VIDEO2000", "UMATIC_HI", "VCR_LP", "VCR", "EIAJ", "VHSHQ", "VIDEO8", "BETAMAX_HIFI", "SVHS_ET", "QUADRUPLEX", "TYPEB", "SUPERBETA", "TYPEC"]
self.tape_format_menu = ttk.OptionMenu(settings_frame, self.tape_format_var, *tape_formats)
self.tape_format_menu.grid(row=1, column=1, sticky="ew", padx=5, pady=2)
# Tape Speed
ttk.Label(settings_frame, text="Tape Speed:").grid(row=2, column=0, sticky="w", padx=5, pady=2)
self.tape_speed_var = tk.StringVar(value="SP")
tape_speeds = ["SP", "LP", "SLP", "EP", "VP"]
self.tape_speed_menu = ttk.OptionMenu(settings_frame, self.tape_speed_var, *tape_speeds)
self.tape_speed_menu.grid(row=2, column=1, sticky="ew", padx=5, pady=2)
# Frequency
ttk.Label(settings_frame, text="RF Frequency (MHz):").grid(row=3, column=0, sticky="w", padx=5, pady=2)
self.frequency_var = tk.StringVar(value="40")
self.frequency_entry = ttk.Entry(settings_frame, textvariable=self.frequency_var)
self.frequency_entry.grid(row=3, column=1, sticky="ew", padx=5, pady=2)
self.cxadc_var = tk.BooleanVar(value=False)
self.cxadc_check = ttk.Checkbutton(settings_frame, text="Use CXADC frequency (~28.63 MHz)", variable=self.cxadc_var)
self.cxadc_check.grid(row=4, column=0, columnspan=2, sticky="w", padx=5, pady=2)
# Advanced Settings
advanced_frame = ttk.LabelFrame(self.master, text="Advanced Settings")
advanced_frame.grid(row=3, column=0, sticky="ew", padx=10, pady=5)
advanced_frame.columnconfigure(0, weight=1)
ttk.Label(advanced_frame, text="Threads:").grid(row=0, column=0, sticky="w", padx=5, pady=2)
self.threads_var = tk.IntVar(value=os.cpu_count() or 1)
self.threads_spinbox = ttk.Spinbox(advanced_frame, from_=1, to=os.cpu_count() * 2 or 16, textvariable=self.threads_var, width=5)
self.threads_spinbox.grid(row=0, column=1, sticky="ew", padx=5, pady=2)
ttk.Label(advanced_frame, text="Start Frame:").grid(row=1, column=0, sticky="w", padx=5, pady=2)
self.start_frame_var = tk.IntVar(value=0)
self.start_frame_entry = ttk.Entry(advanced_frame, textvariable=self.start_frame_var, width=10)
self.start_frame_entry.grid(row=1, column=1, sticky="ew", padx=5, pady=2)
ttk.Label(advanced_frame, text="Length (frames):").grid(row=2, column=0, sticky="w", padx=5, pady=2)
self.length_var = tk.IntVar(value=0)
self.length_entry = ttk.Entry(advanced_frame, textvariable=self.length_var, width=10)
self.length_entry.grid(row=2, column=1, sticky="ew", padx=5, pady=2)
self.overwrite_var = tk.BooleanVar(value=False)
self.overwrite_check = ttk.Checkbutton(advanced_frame, text="Overwrite existing files", variable=self.overwrite_var)
self.overwrite_check.grid(row=3, column=0, columnspan=2, sticky="w", padx=5, pady=2)
self.chroma_trap_var = tk.BooleanVar(value=False)
self.chroma_trap_check = ttk.Checkbutton(advanced_frame, text="Enable Chroma Trap", variable=self.chroma_trap_var)
self.chroma_trap_check.grid(row=4, column=0, columnspan=2, sticky="w", padx=5, pady=2)
self.debug_var = tk.BooleanVar(value=False)
self.debug_check = ttk.Checkbutton(advanced_frame, text="Enable Debug Logging", variable=self.debug_var)
self.debug_check.grid(row=5, column=0, columnspan=2, sticky="w", padx=5, pady=2)
self.ire0_adjust_var = tk.BooleanVar(value=False)
self.ire0_adjust_check = ttk.Checkbutton(advanced_frame, text="ire0_adjust", variable=self.ire0_adjust_var)
self.ire0_adjust_check.grid(row=6, column=0, columnspan=2, sticky="w", padx=5, pady=2)
self.recheck_phase_var = tk.BooleanVar(value=False)
self.recheck_phase_check = ttk.Checkbutton(advanced_frame, text="recheck_phase", variable=self.recheck_phase_var)
self.recheck_phase_check.grid(row=7, column=0, columnspan=2, sticky="w", padx=5, pady=2)
# Control Buttons
button_frame = ttk.Frame(self.master)
button_frame.grid(row=4, column=0, sticky="ew", padx=10, pady=5)
button_frame.columnconfigure(0, weight=1)
button_frame.columnconfigure(1, weight=1)
button_frame.columnconfigure(2, weight=1)
self.start_button = ttk.Button(button_frame, text="Start Decode Now", command=self.run_current_job)
self.start_button.grid(row=0, column=0, sticky="ew", padx=5, pady=5)
self.add_queue_button = ttk.Button(button_frame, text="Add to Queue", command=self.add_to_queue)
self.add_queue_button.grid(row=0, column=1, sticky="ew", padx=5, pady=5)
self.stop_button = ttk.Button(button_frame, text="Stop", command=self.stop_decode, state=tk.DISABLED)
self.stop_button.grid(row=0, column=2, sticky="ew", padx=5, pady=5)
# Bottom Area: Notebook for Queue and Log
self.notebook = ttk.Notebook(self.master)
self.notebook.grid(row=5, column=0, sticky="nsew", padx=10, pady=5)
# Queue Tab
queue_frame = ttk.Frame(self.notebook)
self.notebook.add(queue_frame, text="Queue")
queue_frame.columnconfigure(0, weight=1)
queue_frame.rowconfigure(0, weight=1)
# Queue Treeview
columns = ("status", "input", "output", "system", "format")
self.queue_tree = ttk.Treeview(queue_frame, columns=columns, show="headings", height=8)
self.queue_tree.heading("status", text="Status")
self.queue_tree.heading("input", text="Input File")
self.queue_tree.heading("output", text="Output Base")
self.queue_tree.heading("system", text="System")
self.queue_tree.heading("format", text="Format")
self.queue_tree.column("status", width=80)
self.queue_tree.column("input", width=200)
self.queue_tree.column("output", width=150)
self.queue_tree.column("system", width=60)
self.queue_tree.column("format", width=60)
self.queue_tree.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
queue_scrollbar = ttk.Scrollbar(queue_frame, orient="vertical", command=self.queue_tree.yview)
queue_scrollbar.grid(row=0, column=1, sticky="ns")
self.queue_tree.configure(yscrollcommand=queue_scrollbar.set)
# Queue Controls
queue_controls = ttk.Frame(queue_frame)
queue_controls.grid(row=1, column=0, columnspan=2, sticky="ew", padx=5, pady=5)
self.start_queue_button = ttk.Button(queue_controls, text="Start Queue", command=self.start_queue)
self.start_queue_button.pack(side=tk.LEFT, padx=5)
self.remove_queue_button = ttk.Button(queue_controls, text="Remove Selected", command=self.remove_from_queue)
self.remove_queue_button.pack(side=tk.LEFT, padx=5)
self.clear_queue_button = ttk.Button(queue_controls, text="Clear Queue", command=self.clear_queue)
self.clear_queue_button.pack(side=tk.LEFT, padx=5)
# Log Tab
log_frame = ttk.Frame(self.notebook)
self.notebook.add(log_frame, text="Log")
log_frame.columnconfigure(0, weight=1)
log_frame.rowconfigure(0, weight=1)
self.log_text = tk.Text(log_frame, state=tk.DISABLED, wrap=tk.WORD, height=10)
self.log_text.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
log_scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview)
log_scrollbar.grid(row=0, column=1, sticky="ns")
self.log_text['yscrollcommand'] = log_scrollbar.set
def _linux_file_dialog(self, title, filetypes):
# Construct zenity command
cmd = ["zenity", "--file-selection", f"--title={title}"]
# Add file filters
# filetypes is a list of tuples: [("Name", "*.ext *.ext2"), ...]
for name, pattern in filetypes:
# Zenity format: --file-filter=Name | *.ext *.ext2
filter_str = f"{name} | {pattern}"
cmd.append(f"--file-filter={filter_str}")
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
return result.stdout.strip()
except FileNotFoundError:
return None
return None
def _linux_dir_dialog(self, title, initialdir=None):
cmd = ["zenity", "--file-selection", "--directory", f"--title={title}"]
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
return result.stdout.strip()
except FileNotFoundError:
return None
return None
def browse_input_file(self):
if os.name != 'nt' and os.path.exists("/usr/bin/zenity"):
file_path = self._linux_file_dialog(
title="Select Input RF File",
filetypes=[("RF Files", "*.u8 *.u16"), ("All Files", "*")]
)
else:
file_path = filedialog.askopenfilename(
title="Select Input RF File",
filetypes=[("RF Files", "*.u8 *.u16"), ("All Files", "*.*")]
)
if file_path:
self.input_file_path.set(file_path)
def browse_output_file(self):
initial_dir = os.path.dirname(self.input_file_path.get()) if self.input_file_path.get() else ""
initial_file = os.path.splitext(os.path.basename(self.input_file_path.get()))[0] if self.input_file_path.get() else "output"
if os.name != 'nt' and os.path.exists("/usr/bin/zenity"):
dir_path = self._linux_dir_dialog(title="Select Output Directory", initialdir=initial_dir)
else:
dir_path = filedialog.askdirectory(title="Select Output Directory", initialdir=initial_dir)
if dir_path:
self.output_file_path.set(os.path.join(dir_path, initial_file))
def get_command_args(self):
input_file = self.input_file_path.get()
output_file = self.output_file_path.get()
if not input_file:
return None, "Please select an input file."
if not output_file:
return None, "Please specify an output file base name."
command = []
command.append(input_file)
command.append(output_file)
system_selection = self.system_var.get()
if system_selection == "NTSC":
command.append("-n")
elif system_selection == "PAL":
command.append("-p")
elif system_selection == "PAL-M":
command.append("--pm")
elif system_selection == "NTSC-J":
command.append("--NTSCJ")
command.extend(["--tf", self.tape_format_var.get()])
command.extend(["--ts", self.tape_speed_var.get()])
if self.cxadc_var.get():
command.append("--cxadc")
else:
try:
freq_val = float(self.frequency_var.get())
if freq_val > 0:
command.extend(["-f", self.frequency_var.get()])
except ValueError:
pass # Ignore invalid frequency, let default handle or it was already warned?
# Ideally validation happens before calling this.
threads = self.threads_var.get()
if threads > 0:
command.extend(["-t", str(threads)])
start_frame = self.start_frame_var.get()
if start_frame > 0:
command.extend(["-s", str(start_frame)])
length = self.length_var.get()
if length > 0:
command.extend(["-l", str(length)])
if self.overwrite_var.get():
command.append("--overwrite")
if self.chroma_trap_var.get():
command.append("--ct")
if self.debug_var.get():
command.append("--debug")
if self.ire0_adjust_var.get():
command.append("--ire0_adjust")
if self.recheck_phase_var.get():
command.append("--recheck_phase")
job_details = {
'input': input_file,
'output': output_file,
'system': system_selection,
'format': self.tape_format_var.get(),
'command': command
}
return job_details, None
def add_to_queue(self):
job_details, error = self.get_command_args()
if error:
messagebox.showerror("Error", error)
return
self.job_queue.append(job_details)
self.queue_tree.insert("", tk.END, values=("Pending", job_details['input'], job_details['output'], job_details['system'], job_details['format']))
self.notebook.select(0) # Switch to queue tab
def remove_from_queue(self):
selected_item = self.queue_tree.selection()
if not selected_item:
return
# We need to map treeview item to list index.
# Since we append to both, indices should match.
# However, if we remove from middle, we need to be careful.
# It's better to just rebuild the list or store the ID.
# Simple approach: Iterate backwards to avoid index shifting issues if multiple selected?
# Treeview selection returns tuple of IDs.
# Let's find the index of the selected item
for item in selected_item:
index = self.queue_tree.index(item)
del self.job_queue[index]
self.queue_tree.delete(item)
def clear_queue(self):
self.job_queue = []
self.queue_tree.delete(*self.queue_tree.get_children())
def run_current_job(self):
job_details, error = self.get_command_args()
if error:
messagebox.showerror("Error", error)
return
self.notebook.select(1) # Switch to Log tab
threading.Thread(target=self._run_single_job_thread, args=(job_details['command'],)).start()
def start_queue(self):
if self.is_queue_running:
return
if not self.job_queue:
messagebox.showinfo("Queue", "Queue is empty.")
return
self.is_queue_running = True
self.stop_queue_requested = False
self.start_queue_button.config(state=tk.DISABLED)
self.remove_queue_button.config(state=tk.DISABLED)
self.clear_queue_button.config(state=tk.DISABLED)
self.add_queue_button.config(state=tk.DISABLED)
self.start_button.config(state=tk.DISABLED)
self.notebook.select(1) # Switch to Log tab
threading.Thread(target=self._run_queue_thread).start()
def _run_subprocess(self, command):
if os.name == 'nt':
cli_command = "decode"
else:
cli_command = "vhs-decode"
full_command = [cli_command] + command
try:
self.process = subprocess.Popen(
full_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True
)
self.stop_button.config(state=tk.NORMAL)
for line in self.process.stdout:
self.log_queue.put(line)
for line in self.process.stderr:
self.log_queue.put(line)
self.process.wait()
return self.process.returncode
except FileNotFoundError:
self.log_queue.put(f"Error: '{cli_command}' command not found.\n")
return -1
except Exception as e:
self.log_queue.put(f"An unexpected error occurred: {e}\n")
return -1
finally:
self.process = None
self.master.after(0, lambda: self.stop_button.config(state=tk.DISABLED))
def _run_single_job_thread(self, command):
self.start_button.config(state=tk.DISABLED)
self.log_queue.put("--- Starting Decode Process ---\n")
self._run_subprocess(command)
self.log_queue.put("\n--- Process Finished ---\n")
self.master.after(0, lambda: self.start_button.config(state=tk.NORMAL))
def _run_queue_thread(self):
# Iterate through a copy or just index?
# We want to process items that are "Pending".
items = self.queue_tree.get_children()
for index, item_id in enumerate(items):
if self.stop_queue_requested:
self.log_queue.put("\n--- Queue Execution Stopped by User ---\n")
break
# Check status
values = self.queue_tree.item(item_id, "values")
if values[0] == "Completed":
continue
# Update status to Running
new_values = ("Running",) + values[1:]
self.master.after(0, lambda i=item_id, v=new_values: self.queue_tree.item(i, values=v))
# Get command
job = self.job_queue[index]
self.log_queue.put(f"\n--- Starting Job {index + 1}/{len(items)}: {job['input']} ---\n")
# Run
return_code = self._run_subprocess(job['command'])
# Update Status
status = "Completed" if return_code == 0 else "Failed"
final_values = (status,) + values[1:]
self.master.after(0, lambda i=item_id, v=final_values: self.queue_tree.item(i, values=v))
if return_code != 0:
self.log_queue.put(f"Job failed with code {return_code}. Continuing queue...\n")
# Optionally break? For now continue.
self.is_queue_running = False
self.master.after(0, lambda: self.start_queue_button.config(state=tk.NORMAL))
self.master.after(0, lambda: self.remove_queue_button.config(state=tk.NORMAL))
self.master.after(0, lambda: self.clear_queue_button.config(state=tk.NORMAL))
self.master.after(0, lambda: self.add_queue_button.config(state=tk.NORMAL))
self.master.after(0, lambda: self.start_button.config(state=tk.NORMAL))
def stop_decode(self):
if self.is_queue_running:
self.stop_queue_requested = True
if self.process and self.process.poll() is None:
self.log_queue.put("\n--- Attempting to terminate process... ---\n")
try:
os.killpg(os.getpgid(self.process.pid), subprocess.SIGTERM)
self.process.wait(timeout=5)
self.log_queue.put("--- Process terminated. ---\n")
except Exception as e:
if self.process:
self.process.kill()
self.log_queue.put(f"Error terminating process: {e}. Force killing...\n")
else:
self.log_queue.put("\n--- No active process to stop. ---\n")
def poll_log_queue(self):
while not self.log_queue.empty():
try:
line = self.log_queue.get_nowait()
self.log_text.config(state=tk.NORMAL)
self.log_text.insert(tk.END, line)
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
except queue.Empty:
pass
self.master.after(100, self.poll_log_queue)
def main():
root = tk.Tk()
app = VHSDecodeGUI(root)
root.mainloop()
if __name__ == "__main__":
main()