[BUGFIX] - Fix track filename mismatch issue #1530 (#1594)

* Fix off-by-one track filename bug when moving files after rip (bug #1530)

* updating file name matching logic

* reverting uneeded change

* using authoritative file naming

Authoritative Filename Propagation:

The filename variable is defined as the authoritative source of truth for the destination path (filepathname).
Before executing the HandBrake command, track.filename and track.orig_filename are updated with the authoritative filename.
Database Commit:

The updated values of track.filename are committed to the database before proceeding with the HandBrake command.
Error Handling:

If HandBrake fails, the authoritative name is already saved in the database, ensuring consistency for debugging and recovery.

* Update handbrake.py

* minimized changes

* Remove filename assignment in handbrake.py

Remove assignment of track.filename and track.orig_filename.

* Initial plan

* Fix filename path duplication bug in handbrake_mkv function

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Fix filename path duplication in handbrake_main_feature function

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Initial plan

* Fix MakeMKV track numbering to be 1-indexed for consistency with HandBrake

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Fix logging message - remove incorrect -1 from no_of_titles

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Initial plan

* Implement fuzzy file matching for MKV transcoding filename mismatches

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Update test file to be standalone and fix flake8 issues

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Initial plan

* Fix PEP8 compliance issues in test_ripper_makemkv_track_numbering.py

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Initial plan

* Clean up PR: remove track numbering changes, keep file matching fix

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Bump version from 2.20.4 to 2.20.5

* Initial plan

* Refactor handbrake.py to reduce code duplication

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Further cleanup: remove redundant error logging in handbrake_mkv

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Initial plan

* Fix SonarQube code duplication issues in test file

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Initial plan

* Fix useless self-assignment in handbrake.py line 134

Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>

* Resolving SonarQube issues and bumping version

* Removing unnecessary try catch

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: TJSeit <7307974+TJSeit@users.noreply.github.com>
Co-authored-by: 1337-server <sndspamfilter@gmail.com>
This commit is contained in:
TJSeit 2025-11-04 03:42:56 -07:00 committed by GitHub
parent 8e3af93a90
commit 5e9dfd7035
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 306 additions and 65 deletions

View file

@ -15,6 +15,72 @@ from arm.models.job import JobState
PROCESS_COMPLETE = "Handbrake processing complete"
def run_handbrake_command(cmd, track=None, track_number=None):
"""
Execute a HandBrake command and handle errors consistently.
:param cmd: The HandBrake command to execute
:param track: Optional track object to update status
:param track_number: Optional track number for error messages
:return: Output from HandBrake command
:raises subprocess.CalledProcessError: If HandBrake fails
"""
logging.debug(f"Sending command: {cmd}")
try:
hand_brake_output = subprocess.check_output(
cmd,
shell=True
).decode("utf-8")
logging.debug(f"Handbrake exit code: {hand_brake_output}")
if track:
track.status = "success"
return hand_brake_output
except subprocess.CalledProcessError as hb_error:
if track_number:
err = f"Handbrake encoding of title {track_number} failed with code: {hb_error.returncode}" \
f"({hb_error.output})"
else:
err = f"Call to handbrake failed with code: {hb_error.returncode}({hb_error.output})"
logging.error(err)
if track:
track.status = "fail"
track.error = err
raise subprocess.CalledProcessError(hb_error.returncode, cmd)
def build_handbrake_command(srcpath, filepathname, hb_preset, hb_args, logfile,
track_number=None, main_feature=False):
"""
Build a HandBrake command string with consistent formatting.
:param srcpath: Path to source for HB (dvd or files)
:param filepathname: Full output path including filename
:param hb_preset: HandBrake preset to use
:param hb_args: Additional HandBrake arguments
:param logfile: Logfile for HB to redirect output to
:param track_number: Optional track number to encode
:param main_feature: Whether to use --main-feature flag
:return: Formatted command string
"""
cmd = f"nice {cfg.arm_config['HANDBRAKE_CLI']} " \
f"-i {shlex.quote(srcpath)} " \
f"-o {shlex.quote(filepathname)} "
if main_feature:
cmd += "--main-feature "
cmd += f"--preset \"{hb_preset}\" "
if track_number is not None:
cmd += f"-t {track_number} "
cmd += f"{hb_args} " \
f">> {logfile} 2>&1"
return cmd
def handbrake_sleep_check(job):
"""Wait until there is a spot to transcode.
@ -42,7 +108,7 @@ def handbrake_main_feature(srcpath, basepath, logfile, job):
handbrake_sleep_check(job)
logging.info("Starting DVD Movie main_feature processing")
filename = os.path.join(basepath, job.title + "." + cfg.arm_config["DEST_EXT"])
filename = job.title + "." + cfg.arm_config["DEST_EXT"]
filepathname = os.path.join(basepath, filename)
logging.info(f"Ripping title main_feature to {shlex.quote(filepathname)}")
@ -58,28 +124,16 @@ def handbrake_main_feature(srcpath, basepath, logfile, job):
db.session.commit()
hb_args, hb_preset = correct_hb_settings(job)
cmd = f"nice {cfg.arm_config['HANDBRAKE_CLI']} " \
f"-i {shlex.quote(srcpath)} " \
f"-o {shlex.quote(filepathname)} " \
f"--main-feature " \
f"--preset \"{hb_preset}\" " \
f"{hb_args} " \
f">> {logfile} 2>&1"
logging.debug(f"Sending command: {cmd}")
cmd = build_handbrake_command(srcpath, filepathname, hb_preset, hb_args, logfile, main_feature=True)
try:
subprocess.check_output(cmd, shell=True).decode("utf-8")
run_handbrake_command(cmd, track)
logging.info("Handbrake call successful")
track.status = "success"
except subprocess.CalledProcessError as hb_error:
err = f"Call to handbrake failed with code: {hb_error.returncode}({hb_error.output})"
logging.error(err)
track.status = "fail"
track.error = job.errors = err
except subprocess.CalledProcessError:
job.errors = track.error
job.status = JobState.FAILURE.value
db.session.commit()
raise subprocess.CalledProcessError(hb_error.returncode, cmd)
raise
logging.info(PROCESS_COMPLETE)
logging.debug(f"\n\r{job.pretty_table()}")
@ -123,39 +177,21 @@ def handbrake_all(srcpath, basepath, logfile, job):
logging.info(f"Processing track #{track.track_number} of {job.no_of_titles}. "
f"Length is {track.length} seconds.")
filename = f"title_{track.track_number}.{cfg.arm_config['DEST_EXT']}"
filepathname = os.path.join(basepath, filename)
track.filename = track.orig_filename = f"title_{track.track_number}.{cfg.arm_config['DEST_EXT']}"
filepathname = os.path.join(basepath, track.filename)
logging.info(f"Transcoding title {track.track_number} to {shlex.quote(filepathname)}")
track.filename = track.orig_filename = filename
db.session.commit()
cmd = f"nice {cfg.arm_config['HANDBRAKE_CLI']} " \
f"-i {shlex.quote(srcpath)} " \
f"-o {shlex.quote(filepathname)} " \
f"--preset \"{hb_preset}\" " \
f"-t {track.track_number} " \
f"{hb_args} " \
f">> {logfile} 2>&1"
logging.debug(f"Sending command: {cmd}")
cmd = build_handbrake_command(srcpath, filepathname, hb_preset, hb_args, logfile,
track_number=track.track_number)
try:
hand_brake_output = subprocess.check_output(
cmd,
shell=True
).decode("utf-8")
logging.debug(f"Handbrake exit code: {hand_brake_output}")
track.status = "success"
except subprocess.CalledProcessError as hb_error:
err = f"Handbrake encoding of title {track.track_number} failed with code: {hb_error.returncode}" \
f"({hb_error.output})"
logging.error(err)
track.status = "fail"
track.error = err
run_handbrake_command(cmd, track, track.track_number)
except subprocess.CalledProcessError:
db.session.commit()
raise subprocess.CalledProcessError(hb_error.returncode, cmd)
raise
track.ripped = True
db.session.commit()
@ -208,30 +244,13 @@ def handbrake_mkv(srcpath, basepath, logfile, job):
track.filename = destfile + "." + cfg.arm_config["DEST_EXT"]
logging.debug("UPDATED filename: " + track.filename)
db.session.commit()
filename = os.path.join(basepath, destfile + "." + cfg.arm_config["DEST_EXT"])
filename = destfile + "." + cfg.arm_config["DEST_EXT"]
filepathname = os.path.join(basepath, filename)
logging.info(f"Transcoding file {shlex.quote(files)} to {shlex.quote(filepathname)}")
cmd = f'nice {cfg.arm_config["HANDBRAKE_CLI"]} ' \
f'-i {shlex.quote(srcpathname)} ' \
f'-o {shlex.quote(filepathname)} ' \
f'--preset "{hb_preset}" {hb_args} >> {logfile} 2>&1'
logging.debug(f"Sending command: {cmd}")
try:
hand_break_output = subprocess.check_output(
cmd,
shell=True
).decode("utf-8")
logging.debug(f"Handbrake exit code: {hand_break_output}")
except subprocess.CalledProcessError as hb_error:
err = f"Handbrake encoding of file {shlex.quote(files)} failed with code: {hb_error.returncode}" \
f"({hb_error.output})"
logging.error(err)
raise subprocess.CalledProcessError(hb_error.returncode, cmd)
# job.errors.append(f)
cmd = build_handbrake_command(srcpathname, filepathname, hb_preset, hb_args, logfile)
run_handbrake_command(cmd)
logging.info(PROCESS_COMPLETE)
logging.debug(f"\n\r{job.pretty_table()}")

View file

@ -226,6 +226,95 @@ def move_files(base_path, filename, job, is_main_feature=False):
return movie_path
def _calculate_filename_similarity(expected_base, actual_base):
"""
Calculate similarity score between two filenames.
:param str expected_base: Expected filename without extension
:param str actual_base: Actual filename without extension
:return int: Similarity score
"""
score = 0
min_len = min(len(expected_base), len(actual_base))
# Count matching characters from the start
for i in range(min_len):
if expected_base[i] == actual_base[i]:
score += 1
else:
break
# Count matching characters from the end
for i in range(1, min_len + 1):
if expected_base[-i] == actual_base[-i]:
score += 1
else:
break
# Bonus for similar length
length_diff = abs(len(expected_base) - len(actual_base))
if length_diff <= 2: # Within 2 characters difference
score += (3 - length_diff) * 2
return score
def find_matching_file(expected_file):
"""
Find a file that matches the expected filename, handling minor naming discrepancies.
This is particularly useful for MKV files transcoded by HandBrake where the output
filename may differ slightly from what's stored in the database.
:param str expected_file: The full path to the expected file
:return str: The actual file path if found, or the original expected_file if no match
"""
if os.path.isfile(expected_file):
return expected_file
directory = os.path.dirname(expected_file)
expected_filename = os.path.basename(expected_file)
if not os.path.isdir(directory):
return expected_file
expected_base, expected_ext = os.path.splitext(expected_filename)
# Get candidate files with same extension
try:
files_in_dir = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
except OSError:
return expected_file
candidate_files = []
for file in files_in_dir:
base, ext = os.path.splitext(file)
if ext.lower() == expected_ext.lower():
candidate_files.append((file, base))
if not candidate_files:
return expected_file
# Find best match
best_match = None
best_score = 0
for file, base in candidate_files:
score = _calculate_filename_similarity(expected_base, base)
if score > best_score:
best_score = score
best_match = file
# Use match if similar enough (at least 80% of expected length matched)
min_score = len(expected_base) * 0.8
if best_match and best_score >= min_score:
actual_file = os.path.join(directory, best_match)
if actual_file != expected_file:
logging.info(f"Found similar file '{best_match}' for expected '{expected_filename}' (score: {best_score})")
return actual_file
return expected_file
def move_files_main(old_file, new_file, base_path):
"""
The base function for moving files with logging\n
@ -235,10 +324,13 @@ def move_files_main(old_file, new_file, base_path):
:return: None
"""
if not os.path.isfile(new_file):
# Try to find the file, handling minor naming discrepancies
actual_old_file = find_matching_file(old_file)
try:
shutil.move(old_file, new_file)
shutil.move(actual_old_file, new_file)
except Exception as error:
logging.error(f"Unable to move '{old_file}' to '{base_path}' - Error: {error}")
logging.error(f"Unable to move '{actual_old_file}' to '{base_path}' - Error: {error}")
else:
logging.info(f"File: {new_file} already exists. Not moving.")

View file

@ -0,0 +1,130 @@
import sys
import unittest
import os
import tempfile
import shutil
sys.path.insert(0, '/opt/arm')
from arm.ripper.utils import find_matching_file # noqa: E402
class TestFileMatching(unittest.TestCase):
def setUp(self):
"""Create a temporary directory for testing"""
self.test_dir = tempfile.mkdtemp()
def tearDown(self):
"""Clean up the temporary directory"""
if os.path.exists(self.test_dir):
shutil.rmtree(self.test_dir)
def test_find_matching_file_exact_match(self):
"""Test that exact filename match is returned"""
# Create a test file
test_file = os.path.join(self.test_dir, "TestMovie.mp4")
with open(test_file, 'w') as f:
f.write("test")
result = find_matching_file(test_file)
self.assertEqual(result, test_file)
def test_find_matching_file_off_by_one(self):
"""Test finding file when name is off by one character (FiveArmies case)"""
# Expected file has one extra character
expected_file = os.path.join(self.test_dir, "FiveArmiess.mp4")
# Actual file is missing one 's'
actual_file = os.path.join(self.test_dir, "FiveArmies.mp4")
with open(actual_file, 'w') as f:
f.write("test")
result = find_matching_file(expected_file)
self.assertEqual(result, actual_file)
def test_find_matching_file_off_by_two(self):
"""Test finding file when name is off by two characters (SpiritedAway case)"""
# Expected file has two extra characters
expected_file = os.path.join(self.test_dir, "SpiritedAwayy.mp4")
# Actual file is missing two 'y's
actual_file = os.path.join(self.test_dir, "SpiritedAway.mp4")
with open(actual_file, 'w') as f:
f.write("test")
result = find_matching_file(expected_file)
self.assertEqual(result, actual_file)
def test_find_matching_file_no_match(self):
"""Test that original file is returned when no similar file exists"""
expected_file = os.path.join(self.test_dir, "NonExistent.mp4")
result = find_matching_file(expected_file)
self.assertEqual(result, expected_file)
def test_find_matching_file_multiple_candidates(self):
"""Test that the best match is selected when multiple similar files exist"""
expected_file = os.path.join(self.test_dir, "Movie123.mp4")
# Create multiple files
close_match = os.path.join(self.test_dir, "Movie12.mp4")
far_match = os.path.join(self.test_dir, "Movie1.mp4")
with open(close_match, 'w') as f:
f.write("test")
with open(far_match, 'w') as f:
f.write("test")
result = find_matching_file(expected_file)
# Should return the closer match
self.assertEqual(result, close_match)
def test_find_matching_file_different_extension(self):
"""Test that files with different extensions are not matched"""
expected_file = os.path.join(self.test_dir, "TestMovie.mp4")
wrong_ext_file = os.path.join(self.test_dir, "TestMovie.mkv")
with open(wrong_ext_file, 'w') as f:
f.write("test")
result = find_matching_file(expected_file)
# Should not match files with different extensions
self.assertEqual(result, expected_file)
def test_find_matching_file_nonexistent_directory(self):
"""Test handling of non-existent directory"""
expected_file = os.path.join(self.test_dir, "nonexistent_dir", "TestMovie.mp4")
result = find_matching_file(expected_file)
self.assertEqual(result, expected_file)
def test_fuzzy_match_with_file_move(self):
"""Test fuzzy matching in a simulated file move scenario"""
# Create source file with slightly different name
actual_source = os.path.join(self.test_dir, "SourceMovie.mp4")
expected_source = os.path.join(self.test_dir, "SourceMoviee.mp4")
destination = os.path.join(self.test_dir, "dest", "FinalMovie.mp4")
# Create the source file
with open(actual_source, 'w') as f:
f.write("test content")
# Create destination directory
os.makedirs(os.path.dirname(destination), exist_ok=True)
# Find the file with fuzzy matching
found_file = find_matching_file(expected_source)
# Verify it found the right file
self.assertEqual(found_file, actual_source)
# Simulate the move
shutil.move(found_file, destination)
# Verify the file was moved
self.assertTrue(os.path.isfile(destination))
self.assertFalse(os.path.isfile(actual_source))
if __name__ == '__main__':
unittest.main()