Skip to content

Commit

Permalink
more debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
bhearsum committed Dec 10, 2024
1 parent 772cbc8 commit a00d42d
Showing 1 changed file with 76 additions and 3 deletions.
79 changes: 76 additions & 3 deletions taskcluster/fxci_config_taskgraph/util/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,90 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import copy
from functools import cache
from typing import Any
from typing import Any, Dict, List, Union

import requests
import taskcluster
from taskgraph.util.taskcluster import get_ancestors
from taskgraph.util.taskcluster import logging
from taskgraph.util.taskcluster import logging, get_task_url, get_session
from taskgraph.util.memoize import memoize

FIREFOXCI_ROOT_URL = "https://firefox-ci-tc.services.mozilla.com"
STAGING_ROOT_URL = "https://stage.taskcluster.nonprod.cloudops.mozgcp.net"

def _do_request(url, method=None, **kwargs):
if method is None:
method = "post" if kwargs else "get"

session = get_session()
if method == "get":
kwargs["stream"] = True

logging.info("making request")
logging.info(f"url: {url}")
logging.info(f"kwargs: {kwargs}")
response = getattr(session, method)(url, **kwargs)
logging.info(f"response code: {response.status_code}")

if response.status_code >= 400:
# Consume content before raise_for_status, so that the connection can be
# reused.
response.content
response.raise_for_status()
return response


@memoize
def get_task_definition(task_id, use_proxy=False):
url = get_task_url(task_id, use_proxy)
logging.info(f"fetching url: {url}")
response = _do_request(url)
logging.info(f"got url: {url}")
logging.info(f"response is: {response}")
return response.json()


@memoize
def _get_deps(task_ids, use_proxy):
upstream_tasks = {}
for task_id in task_ids:
logging.info(f"fetching dep: {task_id}")
task_def = get_task_definition(task_id, use_proxy)
logging.info(f"got task def: {task_def}")
upstream_tasks[task_def["metadata"]["name"]] = task_id

upstream_tasks.update(_get_deps(tuple(task_def["dependencies"]), use_proxy))

return upstream_tasks


def get_ancestors(
task_ids: Union[List[str], str], use_proxy: bool = False
) -> Dict[str, str]:
"""Gets the ancestor tasks of the given task_ids as a dictionary of label -> taskid.
Args:
task_ids (str or [str]): A single task id or a list of task ids to find the ancestors of.
use_proxy (bool): See get_root_url.
Returns:
dict: A dict whose keys are task labels and values are task ids.
"""
upstream_tasks: Dict[str, str] = {}
logging.info("A")

if isinstance(task_ids, str):
task_ids = [task_ids]

for task_id in task_ids:
logging.info(f"getting task_id: {task_id}")
task_def = get_task_definition(task_id, use_proxy)
logging.info(f"got task_id: {task_id}")

upstream_tasks.update(_get_deps(tuple(task_def["dependencies"]), use_proxy))

return copy.deepcopy(upstream_tasks)

@cache
def get_taskcluster_client(service: str):
Expand Down

0 comments on commit a00d42d

Please sign in to comment.