From 29bd179dfebf19b454923fa3f146f27a02031768 Mon Sep 17 00:00:00 2001
From: Nathan G
Date: Fri, 27 Sep 2024 16:34:11 -0700
Subject: [PATCH 1/3] - Fixed GPU benchmark tests with ffmpeg erroring.

- Added more comprehensive and in-depth ffmpeg error logging
- Fixed MaxStreams for NVIDIA GPUs always being 1 after a test was finished
- Added better NVIDIA driver limit checks with a now persistent known limit of the GPU driver across tests.
- Fixed some weird CUDA stuff
- Changed: Uses ffprobe now for checking the files

Possible issues: On my machine the CPU likes to jump from 1 worker to 9 workers; no clue if it's just really excited to be benchmarking or if it's just bugged.
---
 pytab/core.py   | 187 +++++++++++++++++++++++++++++++-----------------
 pytab/worker.py |  74 ++++++++++++++-----
 2 files changed, 181 insertions(+), 80 deletions(-)

diff --git a/pytab/core.py b/pytab/core.py
index 7abfa0c..d14b880 100644
--- a/pytab/core.py
+++ b/pytab/core.py
@@ -124,87 +124,131 @@ def unpackArchive(archive_path, target_path):
     click.echo(" success!")
 
 
-def benchmark(ffmpeg_cmd: str, debug_flag: bool, prog_bar) -> tuple:
+def benchmark(ffmpeg_cmd: str, debug_flag: bool, prog_bar, is_nvidia_gpu: bool, gpu_idx: int) -> tuple:
     runs = []
     total_workers = 1
     run = True
-    last_speed = -0.5  # to Assure first worker always has the required difference
+    last_speed = -0.5  # To ensure that the first worker always has the required difference
     formatted_last_speed = "00.00"
     failure_reason = []
     if debug_flag:
         click.echo(f"> > > > Workers: {total_workers}, Last Speed: {last_speed}")
     while run:
-        if not debug_flag:
-            prog_bar.label = f"Testing | Workers: {total_workers:02d} | Last Speed: {formatted_last_speed}"
-            prog_bar.render_progress()
-        output = worker.workMan(total_workers, ffmpeg_cmd)
-        # First check if we continue Running:
-        # Stop when first run failed
-        if output[0] and total_workers == 1:
-            run = False
-            failure_reason.append(output[1])
-        # When run after scaleback succeded:
-        elif (last_speed < 1 and not output[0]) and last_speed != -0.5:
-            limited = False
-            if last_speed == -1:
-                limited = True
-            last_speed = output[1]["speed"]
-            formatted_last_speed = f"{last_speed:05.2f}"
-            if debug_flag:
-                click.echo(
-                    f"> > > > Scaleback success! Limit: {limited}, Total Workers: {total_workers}, Speed: {last_speed}"
-                )
-            run = False
+        if is_nvidia_gpu:
+            # For NVIDIA GPUs, use known session limits to prevent exceeding NVENC limits. Does this work properly? Unknown; I don't have a machine to test it on without these limits in place.
+            if total_workers == 1:
+                gpu_worker_counts = [1, 2, 3, 4, 8]  # Nvidia driver limits can be 1, 2, 3, 4, or 8
+            else:
+                gpu_worker_counts = [total_workers]
+
+            for gpu_worker_count in gpu_worker_counts:
+                if not debug_flag:
+                    prog_bar.label = f"Testing | Workers: {gpu_worker_count:02d} | Last Speed: {formatted_last_speed}"
+                    prog_bar.render_progress()
+                # Use gpu_worker_count instead of total_workers to correctly test each worker count
+                output = worker.workMan(gpu_worker_count, ffmpeg_cmd, gpu_idx)
+
+                if not output[0]:  # If no failure occurred
+                    runs.append(output[1])
+                    last_speed = output[1]["speed"]
+                    formatted_last_speed = f"{last_speed:05.2f}"
+                    if debug_flag:
+                        click.echo(f"> > > > Workers: {gpu_worker_count}, Last Speed: {last_speed}")
+                    if last_speed < 1:
+                        failure_reason.append("performance")
+                        run = False
+                        break
+                else:
+                    failure_reason.extend(output[1])
+                    if "nvenc_limit_reached" in output[1]:
+                        failure_reason.append("limited")
+                        if debug_flag:
+                            click.echo("Warning: NVIDIA GPU encoding limit reached. This is a known limitation based on the driver version.")
+                        run = False
+                        break
+                    else:
+                        if debug_flag:
+                            click.echo(f"Error during benchmark: {output[1]}")
+                        run = False
+                        break
 
-            if limited:
-                failure_reason.append("limited")
+            if runs:
+                total_workers = max(run["workers"] for run in runs)
             else:
+                total_workers = 1  # Default to 1 if no runs succeeded
+            run = False  # Exit the while loop after testing known NVENC limits
+        else:
+            # Non-NVIDIA GPUs (HAVE NOT TESTED) or CPU
+            if not debug_flag:
+                prog_bar.label = f"Testing | Workers: {total_workers:02d} | Last Speed: {formatted_last_speed}"
+                prog_bar.render_progress()
+            output = worker.workMan(total_workers, ffmpeg_cmd, gpu_idx)
+            # First check if we continue running:
+            # Stop when first run failed
+            if output[0] and total_workers == 1:
+                run = False
+                failure_reason.append(output[1])
+            # When run after scaleback succeeded:
+            elif (last_speed < 1 and not output[0]) and last_speed != -0.5:
+                limited = False
+                if last_speed == -1:
+                    limited = True
+                last_speed = output[1]["speed"]
+                formatted_last_speed = f"{last_speed:05.2f}"
+                if debug_flag:
+                    click.echo(
+                        f"> > > > Scaleback success! Limit: {limited}, Total Workers: {total_workers}, Speed: {last_speed}"
+                    )
+                run = False
+
+                if limited:
+                    failure_reason.append("limited")
+                else:
+                    failure_reason.append("performance")
+            # Scaleback when fail on workers >1 (e.g., NVENC limit) or on speed <1 with last added workers or on last_speed = scaleback
+            elif (
+                (total_workers > 1 and output[0])
+                or (output[1]["speed"] < 1 and last_speed >= 2)
+                or (last_speed == -1)
+            ):
+                if output[0]:  # Assign variables depending on scaleback reason
+                    last_speed = -1
+                    formatted_last_speed = "sclbk"
+                else:
+                    last_speed = output[1]["speed"]
+                    formatted_last_speed = f"{last_speed:05.2f}"
+                total_workers -= 1
+                if debug_flag:
+                    click.echo(
+                        f"> > > > Scaling back to: {total_workers}, Last Speed: {last_speed}"
+                    )
+            elif output[0] and total_workers == 0:  # Fail when infinite scaleback
+                run = False
+                failure_reason.append(output[1])
+                failure_reason.append("infinity_scaleback")
+            elif output[1]["speed"] < 1:
+                run = False
                 failure_reason.append("performance")
-        # Scaleback when fail on 1 < workers or on speed < 1 with last added workers or on last_speed = scaleback
-        elif (
-            (total_workers > 1 and output[0])
-            or (output[1]["speed"] < 1 and last_speed >= 2)
-            or (last_speed == -1)
-        ):
-            if output[0]:  # Assign variables depending on Scaleback reason
-                last_speed = -1
-                formatted_last_speed = "sclbk"
-            else:
+            # elif output[1]["speed"] - last_speed < 0.5:
+            #     run = False
+            #     failure_reason.append("failed_inconclusive")
+            else:  # When no failure happened
+                runs.append(output[1])
                 last_speed = output[1]["speed"]
+                total_workers += int(last_speed)
                 formatted_last_speed = f"{last_speed:05.2f}"
-            total_workers -= 1
-            if debug_flag:
-                click.echo(
-                    f"> > > > Scaling back to: {total_workers}, Last Speed: {last_speed}"
-                )
-        elif output[0] and total_workers == 0:  # Fail when infinite scaleback
-            run = False
-            failure_reason.append(output[1])
-            failure_reason.append("infinity_scaleback")
-        elif output[1]["speed"] < 1:
-            run = False
-            failure_reason.append("performance")
-        # elif output[1]["speed"]-last_speed < 0.5:
-        #     run = False
-        #     failure_reason.append("failed_inconclusive")
-        else:  # When no failure happened
-            runs.append(output[1])
-            last_speed = output[1]["speed"]
-            total_workers += int(last_speed)
-            formatted_last_speed = f"{last_speed:05.2f}"
-            if debug_flag:
-                click.echo(
-                    f"> > > > Workers: {total_workers}, Last Speed: {last_speed}"
-                )
+                if debug_flag:
+                    click.echo(f"> > > > Workers: {total_workers}, Last Speed: {last_speed}")
+
     if debug_flag:
         click.echo(f"> > > > Failed: {failure_reason}")
     if len(runs) > 0:
-        max_streams = runs[(len(runs)) - 1]["workers"]
+        max_streams = max(run["workers"] for run in runs)
         result = {
             "max_streams": max_streams,
             "failure_reasons": failure_reason,
-            "single_worker_speed": runs[(len(runs)) - 1]["speed"],
-            "single_worker_rss_kb": runs[(len(runs)) - 1]["rss_kb"],
+            "single_worker_speed": runs[0]["speed"],
+            "single_worker_rss_kb": runs[0]["rss_kb"],
         }
         prog_bar.label = (
             f"Done | Workers: {max_streams} | Last Speed: {formatted_last_speed}"
@@ -214,7 +258,6 @@ def benchmark(ffmpeg_cmd: str, debug_flag: bool, prog_bar) -> tuple:
         prog_bar.label = "Skipped | Workers: 00 | Last Speed: 00.00"
     return False, runs, {}
 
-
 def output_json(data, file_path):
     # Create the directory if it doesn't exist
     os.makedirs(os.path.dirname(file_path), exist_ok=True)
@@ -397,6 +440,10 @@ def cli(
         click.pause("Press any key to exit")
         exit()
 
+    # Adjust gpu_idx for CUDA devices
+    if gpus[gpu_idx]["vendor"].lower() == "nvidia":
+        gpu_idx = 0  # The CUDA device index is 0
+
     # Stop Hardware Selection logic
     valid, server_data = api.getTestData(platform_id, platforms, server_url)
@@ -494,7 +541,8 @@ def cli(
             )
             test_cmd = f"{ffmpeg_binary} {arguments}"
 
-            valid, runs, result = benchmark(test_cmd, debug_flag, prog_bar)
+            is_nvidia_gpu = command["type"] == "nvidia"
+            valid, runs, result = benchmark(test_cmd, debug_flag, prog_bar, is_nvidia_gpu, gpu_idx)
 
             if not debug_flag:
                 prog_bar.update(1)
@@ -511,6 +559,17 @@ def cli(
 
         if len(runs) >= 1:
             benchmark_data.append(test_data)
+
+        if debug_flag:
+            click.echo(f"FFmpeg command: {test_cmd}")
+            click.echo(f"Test result: {'Failed' if not valid else 'Success'}")
+            if not valid:
+                click.echo(f"Failure reasons: {result}")
+                click.echo(f"FFmpeg stderr: {worker.run_ffmpeg(0, test_cmd.split(), gpu_idx)[0]}")
+            else:
+                click.echo(f"Max streams: {result['max_streams']}")
+                click.echo(f"Single worker speed: {result['single_worker_speed']}")
+            click.echo("")
     # Displaying Prompt, before attempting to output / build final dict
     click.echo("Benchmark Done. Writing file to Output.")
     result_data = {

diff --git a/pytab/worker.py b/pytab/worker.py
index 9895239..428dce8 100644
--- a/pytab/worker.py
+++ b/pytab/worker.py
@@ -21,15 +21,40 @@
 import concurrent.futures
 import re
 import subprocess
+import json
 
 import click
 
-
-def run_ffmpeg(pid: int, ffmpeg_cmd: list) -> tuple:  # Process ID,
+def run_ffmpeg(pid: int, ffmpeg_cmd: list, gpu_idx: int) -> tuple:
     # click.echo(f"{pid} |> Running FFMPEG Process: {pid}")
     timeout = 120  # Stop any process that runs for more then 120sec
     failure_reason = None
     try:
+        # First, probe the input file to determine the correct codec
+        input_file = ffmpeg_cmd[ffmpeg_cmd.index('-i') + 1]
+        probe_cmd = ['ffmpeg/ffmpeg_files/ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_streams', '-select_streams', 'v:0', input_file]
+        probe_output = subprocess.run(probe_cmd, capture_output=True, text=True)
+        probe_data = json.loads(probe_output.stdout)
+        codec = probe_data['streams'][0]['codec_name']
+
+        # Modify the ffmpeg command to use the correct decoder and CUDA device
+        if '-c:v' in ffmpeg_cmd:
+            decoder_index = ffmpeg_cmd.index('-c:v') + 1
+            ffmpeg_cmd[decoder_index] = f"{codec}_cuvid"
+        else:
+            # Insert '-c:v' option with the correct decoder
+            ffmpeg_cmd.insert(ffmpeg_cmd.index('-i'), '-c:v')
+            ffmpeg_cmd.insert(ffmpeg_cmd.index('-i') + 1, f"{codec}_cuvid")
+
+        # Update CUDA device index
+        if '-init_hw_device' in ffmpeg_cmd:
+            cuda_init_index = ffmpeg_cmd.index('-init_hw_device')
+            ffmpeg_cmd[cuda_init_index + 1] = f'cuda=cu:{gpu_idx}'
+        else:
+            # Insert '-init_hw_device' option
+            ffmpeg_cmd.insert(1, '-init_hw_device')
+            ffmpeg_cmd.insert(2, f'cuda=cu:{gpu_idx}')
+
         process_output = subprocess.run(
             ffmpeg_cmd,
             stdin=subprocess.PIPE,
@@ -43,28 +68,46 @@ def run_ffmpeg(pid: int, ffmpeg_cmd: list) -> tuple:  # Process ID,
 
         if retcode > 0:
             # click.echo(f"ERROR: {ffmpeg_stderr}") <- Silencing Output
-            failure_reason = "generic_ffmpeg_failure"  # <-- HELP WANTED!
+            failure_reason = parse_ffmpeg_error(ffmpeg_stderr)
+        else:
+            ffmpeg_stderr = process_output.stderr
 
     except subprocess.TimeoutExpired:
-        ffmpeg_stderr = 1
-        failure_reason = "failed_timeout"
+        ffmpeg_stderr = "Timeout occurred"
+        failure_reason = ["failed_timeout"]
+
+    except json.JSONDecodeError as e:
+        ffmpeg_stderr = f"Failed to parse ffprobe output: {str(e)}"
+        failure_reason = ["ffprobe_error", str(e)]
 
     except Exception as e:
-        click.echo(e)
-        exit(1)
+        ffmpeg_stderr = str(e)
+        failure_reason = ["unexpected_error", str(e)]
 
     # click.echo(f"{pid} >| Ended FFMPEG Run: {pid}")
     return ffmpeg_stderr, failure_reason
 
 
+def parse_ffmpeg_error(stderr):
+    stderr_lower = stderr.lower()
+    if "no free encoding sessions" in stderr_lower or "cannot open encoder" in stderr_lower or "resource temporarily unavailable" in stderr_lower:
+        return ["nvenc_limit_reached"]
+    elif "initialization failed" in stderr_lower:
+        return ["nvenc_limit_reached"]
+    elif "no such device" in stderr_lower:
+        return ["device_not_found"]
+    elif "invalid device ordinal" in stderr_lower:
+        return ["invalid_device"]
+    else:
+        return ["unknown_ffmpeg_error", stderr]
+
 
-def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
+def workMan(worker_count: int, ffmpeg_cmd: str, gpu_idx: int) -> tuple:
     ffmpeg_cmd_list = ffmpeg_cmd.split()
     raw_worker_data = {}
-    failure_reason = None
+    failure_reasons = []
     # click.echo(f"> Run with {worker_count} Processes")
     with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
         futures = {
-            executor.submit(run_ffmpeg, nr, ffmpeg_cmd_list): nr
+            executor.submit(run_ffmpeg, nr, ffmpeg_cmd_list, gpu_idx): nr
             for nr in range(worker_count)
         }
         for future in concurrent.futures.as_completed(futures):
@@ -73,13 +116,12 @@ def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
                 raw_worker_data[pid] = future.result()
                 # click.echo(f"> > > Finished Worker Process: {pid}")
                 if raw_worker_data[pid][1]:
-                    failure_reason = raw_worker_data[pid][1]
+                    failure_reasons.extend(raw_worker_data[pid][1])
             except Exception as e:
-                print(f"Worker {pid} generated an exception: {e}")
+                failure_reasons.append(f"Worker {pid} exception: {str(e)}")
 
-    if failure_reason:
-        raw_worker_data = None
-        # Deleting all the Raw Data, since run with failed Worker is not counted
+    if failure_reasons:
+        return True, failure_reasons
 
     run_data_raw = []
     if raw_worker_data:  # If no run Failed
@@ -133,7 +175,7 @@ def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
             run_data_raw.append(worker_data)
         return False, evaluateRunData(run_data_raw)
     else:
-        return True, failure_reason
+        return True, failure_reasons
 
 
 def evaluateRunData(run_data_raw: list) -> dict:

From a8b22a0aa557857827c981a83a5a2f2f5a6b9b98 Mon Sep 17 00:00:00 2001
From: Nathan G
Date: Mon, 30 Sep 2024 10:54:29 -0700
Subject: [PATCH 2/3] fix: fix failure reason naming to follow server expectations

---
 pytab/worker.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pytab/worker.py b/pytab/worker.py
index 428dce8..3e16ad6 100644
--- a/pytab/worker.py
+++ b/pytab/worker.py
@@ -90,9 +90,9 @@ def run_ffmpeg(pid: int, ffmpeg_cmd: list, gpu_idx: int) -> tuple:
 def parse_ffmpeg_error(stderr):
     stderr_lower = stderr.lower()
     if "no free encoding sessions" in stderr_lower or "cannot open encoder" in stderr_lower or "resource temporarily unavailable" in stderr_lower:
-        return ["nvenc_limit_reached"]
+        return ["failed_nvenc_limit"]
     elif "initialization failed" in stderr_lower:
-        return ["nvenc_limit_reached"]
+        return ["failed_nvenc_limit"]
device" in stderr_lower: return ["device_not_found"] elif "invalid device ordinal" in stderr_lower: From cb0188e8619a3f0b6e046cebf3cb02fd4bd419b2 Mon Sep 17 00:00:00 2001 From: Nathan G Date: Tue, 1 Oct 2024 20:30:47 -0700 Subject: [PATCH 3/3] Added 'no_failure' if no failure reasons were recorded as the server requires a failure to be present. Fix CPU being "None" when the server expects it to be an intenger. --- pytab/core.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pytab/core.py b/pytab/core.py index d14b880..fb9a096 100644 --- a/pytab/core.py +++ b/pytab/core.py @@ -242,6 +242,11 @@ def benchmark(ffmpeg_cmd: str, debug_flag: bool, prog_bar, is_nvidia_gpu: bool, if debug_flag: click.echo(f"> > > > Failed: {failure_reason}") + + # Add 'no_failure' if no failure reasons were recorded since the server requires a failure reason to be present. + if not failure_reason: + failure_reason.append('no_failure') + if len(runs) > 0: max_streams = max(run["workers"] for run in runs) result = { @@ -550,9 +555,9 @@ def cli( test_data["type"] = command["type"] if command["type"] != "cpu": test_data["selected_gpu"] = gpu_idx - test_data["selected_cpu"] = None + test_data["selected_cpu"] = -1 else: - test_data["selected_gpu"] = None + test_data["selected_gpu"] = -1 test_data["selected_cpu"] = 0 test_data["runs"] = runs test_data["results"] = result