Commit 395360f — Fix

jonathanmetzman committed Nov 14, 2024 · 1 parent f7db345
Showing 5 changed files with 45 additions and 39 deletions.
4 changes: 0 additions & 4 deletions configs/test/batch/batch.yaml
@@ -66,7 +66,3 @@ mapping:
preemptible: true
machine_type: n1-standard-1
project: 'test-clusterfuzz'
# TODO(metzman): Come up with a better system than this. I think a system where
# we have a list of zones and associated config (such as subnets) to pick where
# to launch tasks is ideal.
region: 'us-central1'
4 changes: 2 additions & 2 deletions infra/k8s/schedule-fuzz.yaml
@@ -17,11 +17,11 @@ kind: CronJob
metadata:
name: schedule-fuzz
spec:
schedule: "*/30 * * * *"
schedule: "*/10 * * * *"
concurrencyPolicy: Forbid
jobTemplate:
spec:
activeDeadlineSeconds: 1200 # 20 minutes.
activeDeadlineSeconds: 900 # 15 minutes.
template:
spec:
containers:
9 changes: 9 additions & 0 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
@@ -377,6 +377,15 @@ def __init__(self,
self.is_command_override = is_command_override
self.high_end = high_end
self.extra_info = extra_info

# is_from_queue is a temporary hack to keep track of which fuzz tasks came
# from the queue. Previously all fuzz tasks were picked by the bot when
# there was nothing on the queue. With the rearchitecture, we want fuzz
# tasks that were put on the queue by the schedule_fuzz cron job to be
# executed on batch. is_from_queue is used to do this.
# TODO(b/378684001): This code is very ugly, get rid of it when no more
# fuzz tasks are executed on the bots themselves (i.e. when the rearch
# is complete).
self.is_from_queue = is_from_queue

def __repr__(self):
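To illustrate what the new flag is for, here is a minimal, self-contained sketch of how an `is_from_queue`-style marker could be used to route scheduler-enqueued fuzz tasks to Batch while bot-picked fuzz tasks keep running on bots. The `Task` shape and the `route_task` helper are hypothetical, written only to mirror the intent of the comment above; they are not ClusterFuzz's actual API, and the routing rule is an assumption.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Task:
  """Simplified stand-in for a queued task (hypothetical, for illustration)."""
  command: str
  argument: str
  job: str
  extra_info: Dict[str, str] = field(default_factory=dict)
  is_from_queue: bool = False  # True when the schedule_fuzz cron enqueued it.


def route_task(task: Task) -> str:
  """Decides where a leased task should run (assumed routing rule).

  Fuzz tasks placed on the queue by the scheduler are meant for Cloud Batch;
  everything else stays on the bots until the rearchitecture is complete.
  """
  if task.command == 'fuzz' and task.is_from_queue:
    return 'batch'
  return 'bot'


# Example: a scheduler-created fuzz task goes to Batch, a bot-picked one does not.
scheduled = Task('fuzz', 'libFuzzer', 'libfuzzer_asan_zlib', is_from_queue=True)
bot_picked = Task('fuzz', 'libFuzzer', 'libfuzzer_asan_zlib')
assert route_task(scheduled) == 'batch'
assert route_task(bot_picked) == 'bot'
```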
66 changes: 34 additions & 32 deletions src/clusterfuzz/_internal/cron/schedule_fuzz.py
@@ -38,9 +38,11 @@ def _get_quotas(project, region):


def get_available_cpus(project: str, region: str) -> int:
"""Gets the number of available CPUs in the current GCE region."""
"""Returns the number of available CPUs in the current GCE region."""
quotas = _get_quotas(project, region)

# Sometimes, the preemptible quota is 0, which means the number of preemptible
# CPUs is actually limited by the CPU quota.
# If preemptible quota is not defined, we need to use CPU quota instead.
cpu_quota = None
preemptible_quota = None
@@ -58,6 +60,7 @@ def get_available_cpus(project: str, region: str) -> int:
assert preemptible_quota or cpu_quota

if not preemptible_quota['limit']:
# Preemptible quota is not set. Obey the CPU quota since that limits us.
quota = cpu_quota
else:
quota = preemptible_quota
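For readers following the quota logic above, here is a compact sketch of the fallback it implements: when the region reports no PREEMPTIBLE_CPUS limit, the plain CPUS quota is the effective cap on how many preemptible CPUs can be scheduled. The quota-entry shape (`metric`, `limit`, `usage`) is an assumption modeled on Compute Engine region quotas, not a verbatim copy of `_get_quotas`.

```python
from typing import Dict, List, Optional


def pick_cpu_quota(quotas: List[Dict]) -> Dict:
  """Chooses the quota that actually limits preemptible fuzzing CPUs.

  Assumes each entry looks like {'metric': 'PREEMPTIBLE_CPUS', 'limit': 0.0,
  'usage': 0.0}, roughly mirroring GCE region quota entries.
  """
  cpu_quota: Optional[Dict] = None
  preemptible_quota: Optional[Dict] = None
  for quota in quotas:
    if quota['metric'] == 'CPUS':
      cpu_quota = quota
    elif quota['metric'] == 'PREEMPTIBLE_CPUS':
      preemptible_quota = quota

  assert preemptible_quota or cpu_quota
  if not preemptible_quota or not preemptible_quota['limit']:
    # Preemptible quota is unset (limit of 0), so the CPU quota is the real cap.
    return cpu_quota
  return preemptible_quota


# Example region where the preemptible quota is 0 and CPUS wins.
quotas = [
    {'metric': 'CPUS', 'limit': 72000.0, 'usage': 120.0},
    {'metric': 'PREEMPTIBLE_CPUS', 'limit': 0.0, 'usage': 0.0},
]
quota = pick_cpu_quota(quotas)
available = int(quota['limit'] - quota['usage'])  # CPUs still free to schedule.
```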
@@ -81,7 +84,7 @@ def _get_cpus_per_fuzz_job(self, job_name):
return 2


class FuzzerJob:
class FuzzTaskCandidate:
"""Data class that holds more info about FuzzerJobs than the ndb.Models do.
Something like this would probably not be needed if we were using SQL and
could use joins."""
@@ -94,7 +97,7 @@ def __init__(self, job, project, queue, fuzzer=None, weight=None):
self.weight = weight

def copy(self):
return FuzzerJob(
return FuzzTaskCandidate(
job=self.job,
project=self.project,
queue=self.queue,
@@ -109,69 +112,68 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]:
# TODO(metzman): Handle high end.
# A job's weight is determined by its own weight and the weight of the
# project is a part of. First get project weights.
logs.info('Getting projects.')
projects = list(
ndb_utils.get_all_from_query(data_types.OssFuzzProject.query()))

logs.info(f'Got {len(projects)} projects.')
total_cpu_weight = sum(project.cpu_weight for project in projects)
project_weights = {}
for project in projects:
project_weight = project.cpu_weight / total_cpu_weight
project_weights[project.name] = project_weight

# Then get FuzzerJob weights.
# Then get FuzzTaskCandidate weights.
logs.info('Getting jobs.')
jobs = {}
# TODO(metzman): Handle cases where jobs are fuzzed by multiple fuzzers.
candidates_by_job = {}
for job in ndb_utils.get_all_from_query(data_types.Job.query()):
jobs[job.name] = FuzzerJob(
candidates_by_job[job.name] = FuzzTaskCandidate(
job=job.name,
project=job.project,
queue=tasks.queue_for_platform(job.platform))

fuzzer_job_weight_by_project = collections.defaultdict(int)
fuzzer_jobs = {}
fuzz_task_candidates = []
fuzzer_job_query = ndb_utils.get_all_from_query(
data_types.FuzzerJob.query())

def get_fuzzer_job_key(fuzzer, job):
return f'{job},{fuzzer}'

for fuzzer_job_db in fuzzer_job_query:
fuzzer_job = jobs[fuzzer_job_db.job].copy()
fuzzer_job.fuzzer = fuzzer_job_db.fuzzer
project_weight = project_weights.get(fuzzer_job.project, None)
for fuzzer_job in fuzzer_job_query:
fuzz_task_candidate = candidates_by_job[fuzzer_job.job].copy()
fuzz_task_candidate.fuzzer = fuzzer_job.fuzzer
project_weight = project_weights.get(fuzz_task_candidate.project, None)
if project_weight is None:
logs.info(f'No project weight for {fuzzer_job.project}')
logs.info(f'No project weight for {fuzz_task_candidate.project}')
continue

fuzzer_job.weight = fuzzer_job_db.actual_weight * project_weight
key = get_fuzzer_job_key(fuzzer_job_db.fuzzer, fuzzer_job_db.job)
fuzzer_jobs[key] = fuzzer_job
fuzz_task_candidate.weight = fuzzer_job.actual_weight * project_weight
fuzz_task_candidates.append(fuzz_task_candidate)

fuzzer_job_weight_by_project[fuzzer_job.project] += (
fuzzer_job_db.actual_weight)
fuzzer_job_weight_by_project[fuzz_task_candidate.project] += (
fuzzer_job.actual_weight)

for key, fuzzer_job in list(fuzzer_jobs.items()):
total_project_weight = fuzzer_job_weight_by_project[fuzzer_job.project]
fuzzer_job.weight /= total_project_weight
for fuzz_task_candidate in fuzz_task_candidates:
total_project_weight = fuzzer_job_weight_by_project[
fuzz_task_candidate.project]
fuzz_task_candidate.weight /= total_project_weight

# Prepare lists for random.choice
fuzzer_job_list = []
weights = []
for fuzzer_job in fuzzer_jobs.values():
weights.append(fuzzer_job.weight)
fuzzer_job_list.append(fuzzer_job)
for fuzz_task_candidate in fuzz_task_candidates:
weights.append(fuzz_task_candidate.weight)

# TODO(metzman): Handle high-end jobs correctly.
num_instances = int(self.num_cpus / self._get_cpus_per_fuzz_job(None))
logs.info(f'Scheduling {num_instances} fuzz tasks.')

choices = random.choices(fuzzer_job_list, weights=weights, k=num_instances)
choices = random.choices(
fuzz_task_candidates, weights=weights, k=num_instances)
queues_to_tasks = collections.defaultdict(list)
for fuzzer_job in choices:
queue_tasks = queues_to_tasks[fuzzer_job.queue]
for fuzz_task_candidate in choices:
queue_tasks = queues_to_tasks[fuzz_task_candidate.queue]

queue_tasks.append(tasks.Task('fuzz', fuzzer_job.fuzzer, fuzzer_job.job))
task = tasks.Task('fuzz', fuzz_task_candidate.fuzzer,
fuzz_task_candidate.job)
queue_tasks.append(task)
return queues_to_tasks


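The rewritten get_fuzz_tasks boils down to a weighted draw: each candidate's weight is its FuzzerJob weight scaled by its project's share and normalized within the project, and random.choices then picks one candidate per available fuzz-task slot, grouping the results by queue. Below is a self-contained sketch of that flow; the dict field names and the example projects are made up for illustration and are not the real FuzzerJob schema.

```python
import collections
import random


def schedule_fuzz_tasks(candidates, project_weights, num_instances):
  """Picks (fuzzer, job) pairs to fuzz, grouped by the queue they belong to.

  candidates: list of dicts like {'fuzzer', 'job', 'project', 'queue',
  'actual_weight'}; these field names are assumptions for this sketch.
  """
  # A candidate's raw weight is its own weight scaled by its project's share.
  raw_weight_by_project = collections.defaultdict(float)
  for candidate in candidates:
    candidate['weight'] = (
        candidate['actual_weight'] * project_weights[candidate['project']])
    raw_weight_by_project[candidate['project']] += candidate['actual_weight']

  # Normalize within each project so adding more jobs to a project does not
  # increase that project's overall share of CPUs.
  weights = []
  for candidate in candidates:
    candidate['weight'] /= raw_weight_by_project[candidate['project']]
    weights.append(candidate['weight'])

  chosen = random.choices(candidates, weights=weights, k=num_instances)

  # Group the resulting fuzz tasks by the queue they should be pushed to.
  queues_to_tasks = collections.defaultdict(list)
  for candidate in chosen:
    queues_to_tasks[candidate['queue']].append(
        ('fuzz', candidate['fuzzer'], candidate['job']))
  return queues_to_tasks


project_weights = {'zlib': 0.7, 'libpng': 0.3}  # Made-up example data.
candidates = [
    {'fuzzer': 'libFuzzer', 'job': 'libfuzzer_asan_zlib', 'project': 'zlib',
     'queue': 'jobs-linux', 'actual_weight': 1.0},
    {'fuzzer': 'libFuzzer', 'job': 'libfuzzer_asan_libpng', 'project': 'libpng',
     'queue': 'jobs-linux', 'actual_weight': 1.0},
]
print(schedule_fuzz_tasks(candidates, project_weights, num_instances=10))
```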
1 change: 0 additions & 1 deletion src/clusterfuzz/_internal/google_cloud_utils/batch.py
@@ -13,7 +13,6 @@
# limitations under the License.
"""Cloud Batch helpers."""
import collections
import itertools
import threading
from typing import List
import uuid
