Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions tests/integrations/ray/test_ray.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import os
import shutil
import time
import uuid

import pytest
Expand Down Expand Up @@ -89,7 +90,7 @@ def read_error_from_log(job_id, ray_temp_dir):
return None


def read_spans_from_log(job_id, ray_temp_dir):
def _parse_spans_from_log(job_id, ray_temp_dir):
# Find the actual session directory that Ray created
session_dirs = [d for d in os.listdir(ray_temp_dir) if d.startswith("session_")]
if not session_dirs:
Expand All @@ -101,15 +102,17 @@ def read_spans_from_log(job_id, ray_temp_dir):
if not os.path.exists(log_dir):
raise FileNotFoundError(f"No logs directory found at {log_dir}")

log_file = [
log_files = [
f
for f in os.listdir(log_dir)
if "worker" in f and job_id in f and f.endswith(".out")
][0]
]
if not log_files:
return []

spans = []
next_line_is_span_payload = False
with open(os.path.join(log_dir, log_file), "r") as file:
with open(os.path.join(log_dir, log_files[0]), "r") as file:
for line in file:
try:
payload = json.loads(line)
Expand All @@ -125,6 +128,19 @@ def read_spans_from_log(job_id, ray_temp_dir):
return spans


def read_spans_from_log(job_id, ray_temp_dir, min_spans=1, timeout=10):
deadline = time.monotonic() + timeout
spans = []
while True:
try:
spans = _parse_spans_from_log(job_id, ray_temp_dir)
except FileNotFoundError:
spans = []
if len(spans) >= min_spans or time.monotonic() >= deadline:
return spans
time.sleep(0.1)
Comment thread
sentry[bot] marked this conversation as resolved.
Comment thread
sl0thentr0py marked this conversation as resolved.


def example_task(span_streaming: bool):
if span_streaming:
with sentry_sdk.traces.start_span(
Expand Down Expand Up @@ -205,7 +221,7 @@ def test_tracing_in_ray_tasks(task_options, task, span_streaming):
ray.get(future)

job_id = future.job_id().hex()
worker_spans = read_spans_from_log(job_id, ray_temp_dir)
worker_spans = read_spans_from_log(job_id, ray_temp_dir, min_spans=2)
finally:
if os.path.exists(ray_temp_dir):
shutil.rmtree(ray_temp_dir, ignore_errors=True)
Expand Down
Loading