diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py
index 9165a63261..9b5f21f0fa 100644
--- a/tests/integrations/ray/test_ray.py
+++ b/tests/integrations/ray/test_ray.py
@@ -1,6 +1,7 @@
 import json
 import os
 import shutil
+import time
 import uuid
 
 import pytest
@@ -89,7 +90,7 @@ def read_error_from_log(job_id, ray_temp_dir):
     return None
 
 
-def read_spans_from_log(job_id, ray_temp_dir):
+def _parse_spans_from_log(job_id, ray_temp_dir):
     # Find the actual session directory that Ray created
     session_dirs = [d for d in os.listdir(ray_temp_dir) if d.startswith("session_")]
     if not session_dirs:
@@ -101,15 +102,17 @@ def read_spans_from_log(job_id, ray_temp_dir):
     if not os.path.exists(log_dir):
         raise FileNotFoundError(f"No logs directory found at {log_dir}")
 
-    log_file = [
+    log_files = [
         f
         for f in os.listdir(log_dir)
         if "worker" in f and job_id in f and f.endswith(".out")
-    ][0]
+    ]
+    if not log_files:
+        return []
 
     spans = []
     next_line_is_span_payload = False
-    with open(os.path.join(log_dir, log_file), "r") as file:
+    with open(os.path.join(log_dir, log_files[0]), "r") as file:
         for line in file:
             try:
                 payload = json.loads(line)
@@ -125,6 +128,19 @@ def read_spans_from_log(job_id, ray_temp_dir):
     return spans
 
 
+def read_spans_from_log(job_id, ray_temp_dir, min_spans=1, timeout=10):
+    deadline = time.monotonic() + timeout
+    spans = []
+    while True:
+        try:
+            spans = _parse_spans_from_log(job_id, ray_temp_dir)
+        except FileNotFoundError:
+            spans = []
+        if len(spans) >= min_spans or time.monotonic() >= deadline:
+            return spans
+        time.sleep(0.1)
+
+
 def example_task(span_streaming: bool):
     if span_streaming:
         with sentry_sdk.traces.start_span(
@@ -205,7 +221,7 @@ def test_tracing_in_ray_tasks(task_options, task, span_streaming):
             ray.get(future)
 
         job_id = future.job_id().hex()
-        worker_spans = read_spans_from_log(job_id, ray_temp_dir)
+        worker_spans = read_spans_from_log(job_id, ray_temp_dir, min_spans=2)
     finally:
         if os.path.exists(ray_temp_dir):
             shutil.rmtree(ray_temp_dir, ignore_errors=True)