Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 122 additions & 1 deletion core/src/main/java/com/google/adk/telemetry/Instrumentation.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,68 @@ public static final class TelemetryContext {
private final Context otelContext;
private @Nullable Event functionResponseEvent;

/**
* Constructs a new {@code TelemetryContext} that holds the given OpenTelemetry context.
*
* <p>Only the context is stored here; the function response event is not assigned by this
* constructor and therefore stays {@code null} until {@code setFunctionResponseEvent} is called.
*
* @param otelContext The OpenTelemetry context to store.
*/
public TelemetryContext(Context otelContext) {
this.otelContext = otelContext;
}

/**
 * Returns the OpenTelemetry context this instance was constructed with.
 *
 * @return The stored OpenTelemetry {@link Context}.
 */
public Context otelContext() {
  return this.otelContext;
}

/**
 * Returns the function response event recorded for this execution.
 *
 * @return The function response {@link Event}, or {@code null} when none has been set.
 */
public @Nullable Event functionResponseEvent() {
  return this.functionResponseEvent;
}

/**
* Stores the function response event associated with the execution, replacing any previously
* stored value.
*
* <p>NOTE(review): the backing field is a plain mutable field; if this context is written and
* read on different threads, visibility is not established here. Confirm against callers.
*
* @param functionResponseEvent The function response {@link Event} to store; {@code null} clears
*     the stored event.
*/
public void setFunctionResponseEvent(@Nullable Event functionResponseEvent) {
this.functionResponseEvent = functionResponseEvent;
}
}

/** Base class for AutoCloseable telemetry tracking scopes. */
public abstract static class ClosableTelemetryScope implements AutoCloseable {
/** The start time of the scope in nanoseconds. */
protected final long startTimeNanos;

/** The OpenTelemetry span associated with this scope. */
protected final Span span;

/** The OpenTelemetry scope associated with this span. */
protected final Scope scope;

/** The telemetry context for this scope. */
protected final TelemetryContext telemetryContext;

/** The error caught during execution, if any. */
protected @Nullable Throwable caughtError;

/** Whether this scope has been closed. */
protected final AtomicBoolean closed = new AtomicBoolean(false);

/**
* Constructs a new {@code ClosableTelemetryScope} with the given span.
*
* @param span The OpenTelemetry span to manage.
*/
@SuppressWarnings("MustBeClosedChecker")
ClosableTelemetryScope(Span span) {
this.startTimeNanos = System.nanoTime();
Expand All @@ -80,16 +116,27 @@ public abstract static class ClosableTelemetryScope implements AutoCloseable {
this.telemetryContext = new TelemetryContext(Context.current());
}

/**
 * Returns the telemetry context that was created for this scope at construction time.
 *
 * @return The {@link TelemetryContext} owned by this scope.
 */
public TelemetryContext context() {
  return this.telemetryContext;
}

/**
 * Remembers the given error for this scope, records it as an exception on the span, and marks
 * the span status as {@link StatusCode#ERROR}.
 *
 * <p>The status description is the throwable's message, which may be {@code null} for exceptions
 * created without one.
 *
 * @param caughtError The throwable caught during execution.
 */
public void setError(Throwable caughtError) {
  this.caughtError = caughtError;
  span.recordException(caughtError);
  // Read the message only after the exception has been recorded, matching the original order.
  String statusDescription = caughtError.getMessage();
  span.setStatus(StatusCode.ERROR, statusDescription);
}

/** Closes the scope and ends the underlying span, recording any applicable metrics. */
@Override
public final void close() {
if (closed.getAndSet(true)) {
Expand Down Expand Up @@ -125,6 +172,13 @@ public static final class AgentInvocation extends ClosableTelemetryScope {
private final InvocationContext ctx;
private final List<Event> events = Collections.synchronizedList(new ArrayList<>());

/**
* Constructs a new {@code AgentInvocation} telemetry scope.
*
* @param ctx The invocation context of the agent execution.
* @param agent The agent being invoked.
* @param parentContext The OpenTelemetry parent context.
*/
public AgentInvocation(InvocationContext ctx, BaseAgent agent, Context parentContext) {
super(
Tracing.getTracer()
Expand All @@ -136,14 +190,31 @@ public AgentInvocation(InvocationContext ctx, BaseAgent agent, Context parentCon
Tracing.traceAgentInvocation(span, agent.name(), agent.description(), ctx);
}

/**
 * Returns the invocation context this agent invocation scope was created for.
 *
 * @return The {@link InvocationContext} of the agent execution.
 */
public InvocationContext getCtx() {
  return this.ctx;
}

/**
 * Appends an event to the events tracked for this agent invocation.
 *
 * <p>The backing list is a synchronized list, so individual additions are safe to make from
 * multiple threads. The collected events are later passed to the workflow-steps metric.
 *
 * @param event The {@link Event} to track.
 */
public void addEvent(Event event) {
  this.events.add(event);
}

/**
* Records metrics for the agent invocation including duration, request size, response size, and
* workflow steps.
*
* @param elapsed The total execution duration.
* @param error The exception thrown during execution, if any.
*/
@Override
protected void recordMetrics(Duration elapsed, @Nullable Throwable error) {
Metrics.recordAgentInvocationDuration(agent.name(), elapsed, error);
Expand All @@ -152,6 +223,11 @@ protected void recordMetrics(Duration elapsed, @Nullable Throwable error) {
Metrics.recordAgentWorkflowSteps(agent.name(), events);
}

/**
* Handles errors that occur while recording metrics for the agent invocation.
*
* @param e The runtime exception encountered during metrics recording.
*/
@Override
protected void handleMetricsError(RuntimeException e) {
logger.error("Failed to record agent metrics for agent {}", agent.name(), e);
Expand All @@ -164,6 +240,14 @@ public static final class ToolExecution extends ClosableTelemetryScope {
private final BaseAgent agent;
private final Map<String, Object> functionArgs;

/**
* Constructs a new {@code ToolExecution} telemetry scope.
*
* @param tool The tool being executed.
* @param agent The agent invoking the tool.
* @param functionArgs The arguments passed to the tool.
* @param parentContext The OpenTelemetry parent context.
*/
public ToolExecution(
BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs, Context parentContext) {
super(
Expand All @@ -176,6 +260,7 @@ public ToolExecution(
this.functionArgs = functionArgs;
}

/** Traces the tool execution attributes on the span before it ends. */
@Override
protected void beforeSpanEnd() {
Event responseEvent = caughtError == null ? context().functionResponseEvent() : null;
Expand All @@ -189,6 +274,12 @@ protected void beforeSpanEnd() {
caughtError);
}

/**
* Records metrics for the tool execution including duration, request size, and response size.
*
* @param elapsed The total execution duration.
* @param error The exception thrown during execution, if any.
*/
@Override
protected void recordMetrics(Duration elapsed, @Nullable Throwable error) {
Metrics.recordToolExecutionDuration(tool.name(), agent.name(), elapsed, error);
Expand All @@ -197,17 +288,37 @@ protected void recordMetrics(Duration elapsed, @Nullable Throwable error) {
Metrics.recordToolResponseSize(tool.name(), agent.name(), responseEvent);
}

/**
 * Logs a failure that occurred while recording metrics for the tool execution.
 *
 * <p>The message refers to tool metrics in general rather than only the duration, because the
 * metrics recorded for a tool execution also include request and response sizes; this mirrors
 * the wording used for agent invocation metrics.
 *
 * @param e The runtime exception encountered during metrics recording.
 */
@Override
protected void handleMetricsError(RuntimeException e) {
  logger.error("Failed to record tool metrics for tool {}", tool.name(), e);
}
}

/**
 * Creates an {@link AgentInvocation} scope to record agent invocation telemetry, using {@link
 * Context#current()} as the parent context.
 *
 * @param ctx The invocation context of the agent execution.
 * @param agent The agent being invoked.
 * @return A new {@link AgentInvocation} scope.
 * @deprecated Use {@link #recordAgentInvocation(InvocationContext, BaseAgent, Context)} with an
 *     explicit parent context instead. This method will be removed once all callers are updated.
 */
@Deprecated
public static AgentInvocation recordAgentInvocation(InvocationContext ctx, BaseAgent agent) {
  return recordAgentInvocation(ctx, agent, Context.current());
}

/**
* Creates an {@link AgentInvocation} context to record agent invocation telemetry with an
* explicit parent context.
*
* @param ctx The invocation context of the agent execution.
* @param agent The agent being invoked.
* @param parentContext The OpenTelemetry parent context.
* @return A new {@link AgentInvocation} scope.
*/
public static AgentInvocation recordAgentInvocation(
InvocationContext ctx, BaseAgent agent, Context parentContext) {
return new AgentInvocation(ctx, agent, parentContext);
Expand All @@ -219,6 +330,16 @@ public static ToolExecution recordToolExecution(
return recordToolExecution(tool, agent, functionArgs, Context.current());
}

/**
* Creates a {@link ToolExecution} context to record tool execution telemetry with an explicit
* parent context.
*
* @param tool The tool being executed.
* @param agent The agent invoking the tool.
* @param functionArgs The arguments passed to the tool.
* @param parentContext The OpenTelemetry parent context.
* @return A new {@link ToolExecution} scope.
*/
public static ToolExecution recordToolExecution(
BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs, Context parentContext) {
return new ToolExecution(tool, agent, functionArgs, parentContext);
Expand Down
49 changes: 49 additions & 0 deletions core/src/main/java/com/google/adk/telemetry/Tracing.java
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ public static <T> TracerProvider<T> trace(String spanName) {
* @param <T> The type of the stream.
* @return A TracerProvider configured for agent invocation.
*/
@Deprecated // Use trace() instead and configure the span manually.
public static <T> TracerProvider<T> traceAgent(
String spanName,
String agentName,
Expand Down Expand Up @@ -555,6 +556,12 @@ void end() {
}
}

/**
* Applies tracing to a {@link Flowable} stream.
*
* @param upstream The upstream Flowable.
* @return A Publisher with tracing lifecycle management.
*/
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return Flowable.defer(
Expand All @@ -569,6 +576,12 @@ public Publisher<T> apply(Flowable<T> upstream) {
});
}

/**
* Applies tracing to a {@link Single} stream.
*
* @param upstream The upstream Single.
* @return A SingleSource with tracing lifecycle management.
*/
@Override
public SingleSource<T> apply(Single<T> upstream) {
return Single.defer(
Expand All @@ -583,6 +596,12 @@ public SingleSource<T> apply(Single<T> upstream) {
});
}

/**
* Applies tracing to a {@link Maybe} stream.
*
* @param upstream The upstream Maybe.
* @return A MaybeSource with tracing lifecycle management.
*/
@Override
public MaybeSource<T> apply(Maybe<T> upstream) {
return Maybe.defer(
Expand All @@ -597,6 +616,12 @@ public MaybeSource<T> apply(Maybe<T> upstream) {
});
}

/**
* Applies tracing to a {@link Completable} stream.
*
* @param upstream The upstream Completable.
* @return A CompletableSource with tracing lifecycle management.
*/
@Override
public CompletableSource apply(Completable upstream) {
return Completable.defer(
Expand Down Expand Up @@ -636,21 +661,45 @@ private ContextTransformer(Context context) {
this.context = context;
}

/**
 * Lifts the upstream {@link Flowable} so that every downstream subscriber is wrapped by {@code
 * TracingObserver.wrap} together with this transformer's context, providing context
 * re-activation for the stream.
 *
 * @param upstream The upstream Flowable.
 * @return A Publisher whose subscribers are wrapped with context re-activation.
 */
@Override
public Publisher<T> apply(Flowable<T> upstream) {
  return upstream.lift(downstream -> TracingObserver.wrap(this.context, downstream));
}

/**
 * Lifts the upstream {@link Single} so that every downstream observer is wrapped by {@code
 * TracingObserver.wrap} together with this transformer's context, providing context
 * re-activation for the stream.
 *
 * @param upstream The upstream Single.
 * @return A SingleSource whose observers are wrapped with context re-activation.
 */
@Override
public SingleSource<T> apply(Single<T> upstream) {
  return upstream.lift(downstream -> TracingObserver.wrap(this.context, downstream));
}

/**
 * Lifts the upstream {@link Maybe} so that every downstream observer is wrapped by {@code
 * TracingObserver.wrap} together with this transformer's context, providing context
 * re-activation for the stream.
 *
 * @param upstream The upstream Maybe.
 * @return A MaybeSource whose observers are wrapped with context re-activation.
 */
@Override
public MaybeSource<T> apply(Maybe<T> upstream) {
  return upstream.lift(downstream -> TracingObserver.wrap(this.context, downstream));
}

/**
* Applies context re-activation to a {@link Completable} stream.
*
* @param upstream The upstream Completable.
* @return A CompletableSource wrapped with context re-activation.
*/
@Override
public CompletableSource apply(Completable upstream) {
return upstream.lift(observer -> TracingObserver.wrap(context, observer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
Expand Down Expand Up @@ -117,7 +118,7 @@ public void tearDown() {
@Test
public void recordAgentInvocation_success() {
try (Instrumentation.AgentInvocation invocation =
Instrumentation.recordAgentInvocation(invocationContext, testAgent)) {
Instrumentation.recordAgentInvocation(invocationContext, testAgent, Context.current())) {
assertThat(invocation.context()).isNotNull();
assertThat(invocation.context().otelContext()).isNotNull();
}
Expand All @@ -144,7 +145,7 @@ public void recordAgentInvocation_success() {
public void recordAgentInvocation_withError() {
RuntimeException testException = new RuntimeException("test error");
try (Instrumentation.AgentInvocation invocation =
Instrumentation.recordAgentInvocation(invocationContext, testAgent)) {
Instrumentation.recordAgentInvocation(invocationContext, testAgent, Context.current())) {
invocation.setError(testException);
}

Expand All @@ -165,7 +166,7 @@ public void recordToolExecution_success() {

try (Instrumentation.ToolExecution execution =
Instrumentation.recordToolExecution(
testTool, testAgent, ImmutableMap.of("arg1", "value1"))) {
testTool, testAgent, ImmutableMap.of("arg1", "value1"), Context.current())) {
assertThat(execution.context()).isNotNull();
}

Expand Down
Loading