NvEnc Limit Stop Testing for device -- Various Fixes #28

Open · wants to merge 6 commits into main
196 changes: 130 additions & 66 deletions pytab/core.py
@@ -124,87 +124,136 @@ def unpackArchive(archive_path, target_path):
click.echo(" success!")


def benchmark(ffmpeg_cmd: str, debug_flag: bool, prog_bar) -> tuple:
def benchmark(ffmpeg_cmd: str, debug_flag: bool, prog_bar, is_nvidia_gpu: bool, gpu_idx: int) -> tuple:
runs = []
total_workers = 1
run = True
last_speed = -0.5 # to Assure first worker always has the required difference
last_speed = -0.5 # To ensure that the first worker always has the required difference
formatted_last_speed = "00.00"
failure_reason = []
if debug_flag:
click.echo(f"> > > > Workers: {total_workers}, Last Speed: {last_speed}")
while run:
if not debug_flag:
prog_bar.label = f"Testing | Workers: {total_workers:02d} | Last Speed: {formatted_last_speed}"
prog_bar.render_progress()
output = worker.workMan(total_workers, ffmpeg_cmd)
# First check if we continue Running:
# Stop when first run failed
if output[0] and total_workers == 1:
run = False
failure_reason.append(output[1])
# When run after scaleback succeded:
elif (last_speed < 1 and not output[0]) and last_speed != -0.5:
limited = False
if last_speed == -1:
limited = True
last_speed = output[1]["speed"]
formatted_last_speed = f"{last_speed:05.2f}"
if debug_flag:
click.echo(
f"> > > > Scaleback success! Limit: {limited}, Total Workers: {total_workers}, Speed: {last_speed}"
)
run = False
if is_nvidia_gpu:
# For NVIDIA GPUs, test against known session limits to avoid exceeding the NVENC cap. Unverified: the author has no machine without these limits in place to test on.
if total_workers == 1:
gpu_worker_counts = [1, 2, 3, 4, 8] # Nvidia driver limits can be 1, 2, 3, 4, or 8
else:
gpu_worker_counts = [total_workers]

for gpu_worker_count in gpu_worker_counts:
if not debug_flag:
prog_bar.label = f"Testing | Workers: {gpu_worker_count:02d} | Last Speed: {formatted_last_speed}"
prog_bar.render_progress()
# Use gpu_worker_count instead of total_workers to correctly test each worker count
output = worker.workMan(gpu_worker_count, ffmpeg_cmd, gpu_idx)

if not output[0]: # If no failure occurred
runs.append(output[1])
last_speed = output[1]["speed"]
formatted_last_speed = f"{last_speed:05.2f}"
if debug_flag:
click.echo(f"> > > > Workers: {gpu_worker_count}, Last Speed: {last_speed}")
if last_speed < 1:
failure_reason.append("performance")
run = False
break
else:
failure_reason.extend(output[1])
if "nvenc_limit_reached" in output[1]:
failure_reason.append("limited")
if debug_flag:
click.echo("Warning: NVIDIA GPU encoding limit reached. This is a known limitation based on the driver version.")
run = False
break
else:
if debug_flag:
click.echo(f"Error during benchmark: {output[1]}")
run = False
break

if limited:
failure_reason.append("limited")
if runs:
total_workers = max(run["workers"] for run in runs)
else:
total_workers = 1 # Default to 1 if no runs succeeded
run = False # Exit the while loop after testing known NVENC limits
else:
# Non-NVIDIA GPUs (untested) or CPU
if not debug_flag:
prog_bar.label = f"Testing | Workers: {total_workers:02d} | Last Speed: {formatted_last_speed}"
prog_bar.render_progress()
output = worker.workMan(total_workers, ffmpeg_cmd, gpu_idx)
# First check if we continue running:
# Stop when first run failed
if output[0] and total_workers == 1:
run = False
failure_reason.append(output[1])
# When run after scaleback succeeded:
elif (last_speed < 1 and not output[0]) and last_speed != -0.5:
limited = False
if last_speed == -1:
limited = True
last_speed = output[1]["speed"]
formatted_last_speed = f"{last_speed:05.2f}"
if debug_flag:
click.echo(
f"> > > > Scaleback success! Limit: {limited}, Total Workers: {total_workers}, Speed: {last_speed}"
)
run = False

if limited:
failure_reason.append("limited")
else:
failure_reason.append("performance")
# Scale back when a run fails with workers > 1 (e.g., NVENC limit), when speed < 1 after the last added workers, or when last_speed marks a scaleback
elif (
(total_workers > 1 and output[0])
or (output[1]["speed"] < 1 and last_speed >= 2)
or (last_speed == -1)
):
if output[0]: # Assign variables depending on scaleback reason
last_speed = -1
formatted_last_speed = "sclbk"
else:
last_speed = output[1]["speed"]
formatted_last_speed = f"{last_speed:05.2f}"
total_workers -= 1
if debug_flag:
click.echo(
f"> > > > Scaling back to: {total_workers}, Last Speed: {last_speed}"
)
elif output[0] and total_workers == 0: # Fail when infinite scaleback
run = False
failure_reason.append(output[1])
failure_reason.append("infinity_scaleback")
elif output[1]["speed"] < 1:
run = False
failure_reason.append("performance")
# Scaleback when fail on 1<workers (NvEnc Limit) or on Speed<1 with 1<last added workers or on last_Speed = Scaleback
elif (
(total_workers > 1 and output[0])
or (output[1]["speed"] < 1 and last_speed >= 2)
or (last_speed == -1)
):
if output[0]: # Assign variables depending on Scaleback reason
last_speed = -1
formatted_last_speed = "sclbk"
else:
# elif output[1]["speed"] - last_speed < 0.5:
# run = False
# failure_reason.append("failed_inconclusive")
else: # When no failure happened
runs.append(output[1])
last_speed = output[1]["speed"]
total_workers += int(last_speed)
formatted_last_speed = f"{last_speed:05.2f}"
total_workers -= 1
if debug_flag:
click.echo(
f"> > > > Scaling back to: {total_workers}, Last Speed: {last_speed}"
)
elif output[0] and total_workers == 0: # Fail when infinite scaleback
run = False
failure_reason.append(output[1])
failure_reason.append("infinity_scaleback")
elif output[1]["speed"] < 1:
run = False
failure_reason.append("performance")
# elif output[1]["speed"]-last_speed < 0.5:
# run = False
# failure_reason.append("failed_inconclusive")
else: # When no failure happened
runs.append(output[1])
last_speed = output[1]["speed"]
total_workers += int(last_speed)
formatted_last_speed = f"{last_speed:05.2f}"
if debug_flag:
click.echo(
f"> > > > Workers: {total_workers}, Last Speed: {last_speed}"
)
if debug_flag:
click.echo(f"> > > > Workers: {total_workers}, Last Speed: {last_speed}")

if debug_flag:
click.echo(f"> > > > Failed: {failure_reason}")

# Add 'no_failure' if no failure reasons were recorded, since the server requires a failure reason to be present.
if not failure_reason:
failure_reason.append('no_failure')

if len(runs) > 0:
max_streams = runs[(len(runs)) - 1]["workers"]
max_streams = max(run["workers"] for run in runs)
result = {
"max_streams": max_streams,
"failure_reasons": failure_reason,
"single_worker_speed": runs[(len(runs)) - 1]["speed"],
"single_worker_rss_kb": runs[(len(runs)) - 1]["rss_kb"],
"single_worker_speed": runs[0]["speed"],
"single_worker_rss_kb": runs[0]["rss_kb"],
}
prog_bar.label = (
f"Done | Workers: {max_streams} | Last Speed: {formatted_last_speed}"
@@ -214,7 +263,6 @@ def benchmark(ffmpeg_cmd: str, debug_flag: bool, prog_bar) -> tuple:
prog_bar.label = "Skipped | Workers: 00 | Last Speed: 00.00"
return False, runs, {}


def output_json(data, file_path):
# Create the directory if it doesn't exist
os.makedirs(os.path.dirname(file_path), exist_ok=True)
@@ -397,6 +445,10 @@ def cli(
click.pause("Press any key to exit")
exit()

# Adjust gpu_idx for CUDA devices
if gpus[gpu_idx]["vendor"].lower() == "nvidia":
gpu_idx = 0 # The CUDA device index is 0

# Stop Hardware Selection logic

valid, server_data = api.getTestData(platform_id, platforms, server_url)
@@ -494,23 +546,35 @@
)
test_cmd = f"{ffmpeg_binary} {arguments}"

valid, runs, result = benchmark(test_cmd, debug_flag, prog_bar)
is_nvidia_gpu = command["type"] == "nvidia"
valid, runs, result = benchmark(test_cmd, debug_flag, prog_bar, is_nvidia_gpu, gpu_idx)
if not debug_flag:
prog_bar.update(1)

test_data["id"] = test["id"]
test_data["type"] = command["type"]
if command["type"] != "cpu":
test_data["selected_gpu"] = gpu_idx
test_data["selected_cpu"] = None
test_data["selected_cpu"] = -1
else:
test_data["selected_gpu"] = None
test_data["selected_gpu"] = -1
test_data["selected_cpu"] = 0
test_data["runs"] = runs
test_data["results"] = result

if len(runs) >= 1:
benchmark_data.append(test_data)

if debug_flag:
click.echo(f"FFmpeg command: {test_cmd}")
click.echo(f"Test result: {'Failed' if not valid else 'Success'}")
if not valid:
click.echo(f"Failure reasons: {result}")
click.echo(f"FFmpeg stderr: {worker.run_ffmpeg(0, test_cmd.split(), gpu_idx)[0]}")
else:
click.echo(f"Max streams: {result['max_streams']}")
click.echo(f"Single worker speed: {result['single_worker_speed']}")

click.echo("") # Displaying Prompt, before attempting to output / build final dict
click.echo("Benchmark Done. Writing file to Output.")
result_data = {
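The NVENC branch added to benchmark() above walks a fixed list of candidate worker counts instead of growing the count one step at a time. Below is a minimal, self-contained sketch of that sweep; `run_workers` stands in for worker.workMan, and the failure strings mirror the ones parse_ffmpeg_error in pytab/worker.py returns. The names and the (failed, payload) contract are assumptions taken from this diff, not a separate API.

```python
# Sketch of the NVENC session-limit sweep from the new is_nvidia_gpu branch.
# `run_workers` stands in for worker.workMan and is assumed to return
# (failed: bool, payload) where payload is a stats dict on success or a list
# of failure-reason strings on failure, as in this PR.
from typing import Callable, Tuple, Union

KNOWN_NVENC_LIMITS = [1, 2, 3, 4, 8]  # candidate concurrent-session counts


def sweep_nvenc_limits(
    run_workers: Callable[[int], Tuple[bool, Union[dict, list]]],
) -> Tuple[int, list, list]:
    """Try each candidate worker count in order; stop on the first failure
    or once throughput drops below realtime (speed < 1)."""
    runs, failure_reasons = [], []
    for count in KNOWN_NVENC_LIMITS:
        failed, payload = run_workers(count)
        if failed:
            failure_reasons.extend(payload)
            if "failed_nvenc_limit" in payload:
                failure_reasons.append("limited")
            break
        runs.append(payload)
        if payload["speed"] < 1:
            failure_reasons.append("performance")
            break
    max_streams = max(r["workers"] for r in runs) if runs else 1
    return max_streams, runs, failure_reasons
```

As in the PR, a failed count ends the sweep outright rather than triggering the gradual scale-back path used for other hardware.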
74 changes: 58 additions & 16 deletions pytab/worker.py
@@ -21,15 +21,40 @@
import concurrent.futures
import re
import subprocess
import json

import click


def run_ffmpeg(pid: int, ffmpeg_cmd: list) -> tuple: # Process ID,
def run_ffmpeg(pid: int, ffmpeg_cmd: list, gpu_idx: int) -> tuple:
# click.echo(f"{pid} |> Running FFMPEG Process: {pid}")
timeout = 120  # Stop any process that runs for more than 120 seconds
failure_reason = None
try:
# First, probe the input file to determine the correct codec
input_file = ffmpeg_cmd[ffmpeg_cmd.index('-i') + 1]
probe_cmd = ['ffmpeg/ffmpeg_files/ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_streams', '-select_streams', 'v:0', input_file]
probe_output = subprocess.run(probe_cmd, capture_output=True, text=True)
probe_data = json.loads(probe_output.stdout)
codec = probe_data['streams'][0]['codec_name']

# Modify the ffmpeg command to use the correct decoder and CUDA device
if '-c:v' in ffmpeg_cmd:
decoder_index = ffmpeg_cmd.index('-c:v') + 1
ffmpeg_cmd[decoder_index] = f"{codec}_cuvid"
else:
# Insert '-c:v' option with the correct decoder
ffmpeg_cmd.insert(ffmpeg_cmd.index('-i'), '-c:v')
ffmpeg_cmd.insert(ffmpeg_cmd.index('-i') + 1, f"{codec}_cuvid")

# Update CUDA device index
if '-init_hw_device' in ffmpeg_cmd:
cuda_init_index = ffmpeg_cmd.index('-init_hw_device')
ffmpeg_cmd[cuda_init_index + 1] = f'cuda=cu:{gpu_idx}'
else:
# Insert '-init_hw_device' option
ffmpeg_cmd.insert(1, '-init_hw_device')
ffmpeg_cmd.insert(2, f'cuda=cu:{gpu_idx}')

process_output = subprocess.run(
ffmpeg_cmd,
stdin=subprocess.PIPE,
@@ -43,28 +68,46 @@ def run_ffmpeg(pid: int, ffmpeg_cmd: list) -> tuple:  # Process ID,

if retcode > 0:
# click.echo(f"ERROR: {ffmpeg_stderr}") <- Silencing Output
failure_reason = "generic_ffmpeg_failure" # <-- HELP WANTED!
failure_reason = parse_ffmpeg_error(ffmpeg_stderr)
else:
ffmpeg_stderr = process_output.stderr

except subprocess.TimeoutExpired:
ffmpeg_stderr = 1
failure_reason = "failed_timeout"
ffmpeg_stderr = "Timeout occurred"
failure_reason = ["failed_timeout"]

except json.JSONDecodeError as e:
ffmpeg_stderr = f"Failed to parse ffprobe output: {str(e)}"
failure_reason = ["ffprobe_error", str(e)]

except Exception as e:
click.echo(e)
exit(1)
ffmpeg_stderr = str(e)
failure_reason = ["unexpected_error", str(e)]

# click.echo(f"{pid} >| Ended FFMPEG Run: {pid}")
return ffmpeg_stderr, failure_reason

def parse_ffmpeg_error(stderr):
stderr_lower = stderr.lower()
if "no free encoding sessions" in stderr_lower or "cannot open encoder" in stderr_lower or "resource temporarily unavailable" in stderr_lower:
return ["failed_nvenc_limit"]
elif "initialization failed" in stderr_lower:
return ["failed_nvenc_limit"]
elif "no such device" in stderr_lower:
return ["device_not_found"]
elif "invalid device ordinal" in stderr_lower:
return ["invalid_device"]
else:
return ["unknown_ffmpeg_error", stderr]

def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
def workMan(worker_count: int, ffmpeg_cmd: str, gpu_idx: int) -> tuple:
ffmpeg_cmd_list = ffmpeg_cmd.split()
raw_worker_data = {}
failure_reason = None
failure_reasons = []
# click.echo(f"> Run with {worker_count} Processes")
with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
futures = {
executor.submit(run_ffmpeg, nr, ffmpeg_cmd_list): nr
executor.submit(run_ffmpeg, nr, ffmpeg_cmd_list, gpu_idx): nr
for nr in range(worker_count)
}
for future in concurrent.futures.as_completed(futures):
@@ -73,13 +116,12 @@ def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
raw_worker_data[pid] = future.result()
# click.echo(f"> > > Finished Worker Process: {pid}")
if raw_worker_data[pid][1]:
failure_reason = raw_worker_data[pid][1]
failure_reasons.extend(raw_worker_data[pid][1])
except Exception as e:
print(f"Worker {pid} generated an exception: {e}")
failure_reasons.append(f"Worker {pid} exception: {str(e)}")

if failure_reason:
raw_worker_data = None
# Deleting all the Raw Data, since run with failed Worker is not counted
if failure_reasons:
return True, failure_reasons

run_data_raw = []
if raw_worker_data: # If no run Failed
@@ -133,7 +175,7 @@ def workMan(worker_count: int, ffmpeg_cmd: str) -> tuple:
run_data_raw.append(worker_data)
return False, evaluateRunData(run_data_raw)
else:
return True, failure_reason
return True, failure_reasons


def evaluateRunData(run_data_raw: list) -> dict:
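As a quick sanity check of the stderr classification added in parse_ffmpeg_error(), the snippet below feeds it a few representative messages. The sample strings are paraphrased stand-ins containing the substrings the function matches on, not verbatim FFmpeg output, and the import assumes the pytab package is importable from the working directory.

```python
# Illustrative check of parse_ffmpeg_error(); the sample messages are
# paraphrased stand-ins, not verbatim FFmpeg stderr.
from pytab.worker import parse_ffmpeg_error

samples = {
    "OpenEncodeSessionEx failed: no free encoding sessions": ["failed_nvenc_limit"],
    "Cannot open encoder": ["failed_nvenc_limit"],
    "Device creation failed: no such device": ["device_not_found"],
    "CUDA error: invalid device ordinal": ["invalid_device"],
    "some other error text": ["unknown_ffmpeg_error", "some other error text"],
}

for stderr_text, expected in samples.items():
    got = parse_ffmpeg_error(stderr_text)
    assert got == expected, (stderr_text, got)
print("all sample stderr messages classified as expected")
```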