Skip to content

Commit

Permalink
Each test has its own test_run_id
Browse files Browse the repository at this point in the history
  • Loading branch information
Bernard Szabo committed Dec 18, 2024
1 parent 87c646e commit f71d8ae
Showing 1 changed file with 20 additions and 21 deletions.
41 changes: 20 additions & 21 deletions tubular/scripts/dd_synthetic_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class DatadogClient:
def __init__(self, api_key, app_key):
self.api_key = api_key
self.app_key = app_key
self.test_run_id = None
self.test_batch_id = None
self.trigger_time = None

Expand All @@ -28,9 +27,6 @@ def trigger_synthetic_tests(self, test_requests: [dict]) -> str:
:param test_requests: List of tests to be run
:return: None, but saves opaque test run ID as object attribute
'''
if self.test_run_id:
raise Exception("Datadog error: tests already triggered")

url = f"{self.DATADOG_SYNTHETIC_TESTS_API_URL}/trigger/ci"
headers = {
"Content-Type": "application/json",
Expand All @@ -39,26 +35,29 @@ def trigger_synthetic_tests(self, test_requests: [dict]) -> str:
}
json_request_body = {"tests": [{"public_id": t['id']} for t in test_requests]}
response = requests.post(url, headers=headers, json=json_request_body)
self.trigger_time = time.time() # Key timeouts off of this
logging.info(f'Tests triggered at time {self.trigger_time}')

if response.status_code != 200:
raise Exception(f"Datadog API error. Status = {response.status_code}")

try:
response_body = response.json()
batch_id = response_body['batch_id']
logging.info(f"Datadog test run launched: {batch_id=}")
self.batch_id = response_body['batch_id']
logging.info(f"Datadog test run launched: {self.batch_id=}")
test_run_ids = []

for result in response_body['results']:
aggregate_test_run_id = result['result_id']
logging.info(f"Datadog test run launched: {aggregate_test_run_id=} test id = {result['public_id']}")
self.test_run_id = aggregate_test_run_id
self.test_batch_id = batch_id # used when triggering with /ci option
self.trigger_time = time.time() # Key timeouts off of this
logging.info(f'Tests triggered at time {self.trigger_time}')
result_id = result['result_id']
test_run_ids.append(result_id)
logging.info(f"Datadog test run launched: {result_id=} {result['test_public_id']}")

except Exception as e:
raise Exception("Datadog error on triggering tests: " + str(e))

def get_failed_tests(self, test_requests):
return test_run_ids

def get_failed_tests(self, test_requests, test_run_ids):
'''
Poll for completion of all tests triggered as part of the "test_run_id" test run
:param test_run_id: Opaque identifier for the aggregate test run
Expand All @@ -72,16 +71,16 @@ def get_failed_tests(self, test_requests):
# logging.info(f"Done getting batch result at time {time.time()}")
# logging.info(f"Batch result: {batch_result}")

for test in test_requests:
test_result = self._poll_for_test_result(test['id'])
for i, test in test_requests:
test_result = self._poll_for_test_result(test['id'], test_run_ids[i])
if test_result == False:
failed_tests.append(test)

return failed_tests

# ***************** Private methods **********************

def _poll_for_test_result(self, test_id):
def _poll_for_test_result(self, test_id, test_run_id):
"""
Poll for test run completion within the maximum allowable time for the specified test.
Returns None if still running; otherwise, returns True on test success and False on test failure.
Expand All @@ -91,7 +90,7 @@ def _poll_for_test_result(self, test_id):
while test_result is None and (time.time() - self.trigger_time) < (self.MAX_ALLOWABLE_TIME_SECS):
time.sleep(5) # Poll every 5 seconds
logging.info(f"Polling for test {test_id=}")
test_result = self._get_test_result(test_id)
test_result = self._get_test_result(test_id, test_run_id)
logging.info(f"{test_result=}")

if test_result is None:
Expand Down Expand Up @@ -119,12 +118,12 @@ def _get_batch_result(self):
return response_json


def _get_test_result(self, test_id):
def _get_test_result(self, test_id, test_run_id):
"""
returns JSON structure with test results for all tests in the test run
if the test run has completed; returns None otherwise
"""
url = f"{self.DATADOG_SYNTHETIC_TESTS_API_URL}/{test_id}/results/{self.test_run_id}"
url = f"{self.DATADOG_SYNTHETIC_TESTS_API_URL}/{test_id}/results/{test_run_id}"
headers = {
"DD-API-KEY": self.api_key,
"DD-APPLICATION-KEY": self.app_key
Expand Down Expand Up @@ -228,9 +227,9 @@ def run_synthetic_tests(enable_automated_rollbacks, slack_notification_channel):
]
logging.info(f"Running the following tests: {str(tests_to_run)}")

dd_client.trigger_synthetic_tests(tests_to_run)
test_run_ids = dd_client.trigger_synthetic_tests(tests_to_run)

failed_tests = dd_client.get_failed_tests(tests_to_run)
failed_tests = dd_client.get_failed_tests(tests_to_run, test_run_ids)
for failed_test in failed_tests:
logging.warning(f'Test {failed_test["name"]}({failed_test["id"]}) failed')

Expand Down

0 comments on commit f71d8ae

Please sign in to comment.