Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
Expand All @@ -37,6 +38,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.arrow.memory.BufferAllocator;
Expand Down Expand Up @@ -68,6 +70,11 @@ class BatchProcessor implements AutoCloseable {
private final Schema arrowSchema;
private final VectorSchemaRoot root;

// Drop accounting so hosts can programmatically detect lost analytics rows.
private final AtomicLong droppedQueueFull = new AtomicLong();
private final AtomicLong droppedAppendError = new AtomicLong();
private final AtomicLong droppedSerializationError = new AtomicLong();

public BatchProcessor(
StreamWriter writer,
int batchSize,
Expand Down Expand Up @@ -105,6 +112,7 @@ public void start() {

public void append(Map<String, Object> row) {
if (!queue.offer(row)) {
droppedQueueFull.incrementAndGet();
logger.warning("BigQuery event queue is full, dropping event.");
return;
}
Expand Down Expand Up @@ -139,6 +147,7 @@ public void flush() {
try (ArrowRecordBatch recordBatch = new VectorUnloader(root).getRecordBatch()) {
AppendRowsResponse result = writer.append(recordBatch).get();
if (result.hasError()) {
droppedAppendError.addAndGet(batch.size());
logger.severe("BigQuery append error: " + result.getError().getMessage());
for (var error : result.getRowErrorsList()) {
logger.severe(
Expand All @@ -153,6 +162,7 @@ public void flush() {
Thread.currentThread().interrupt();
}
if (e.getCause() instanceof AppendSerializationError ase) {
droppedSerializationError.addAndGet(batch.size());
logger.log(
Level.SEVERE, "Failed to write batch to BigQuery due to serialization error", ase);
Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
Expand All @@ -167,6 +177,7 @@ public void flush() {
"AppendSerializationError occurred, but no row-specific errors were provided.");
}
} else {
droppedAppendError.addAndGet(batch.size());
logger.log(Level.SEVERE, "Failed to write batch to BigQuery", e);
}
} finally {
Expand Down Expand Up @@ -247,6 +258,17 @@ private void populateVector(FieldVector vector, int index, Object value) {
}
}

/**
* Returns a snapshot of dropped-row counters keyed by reason ({@code queue_full}, {@code
* append_error}, {@code serialization_error}). Non-zero values indicate lost analytics rows.
*/
ImmutableMap<String, Long> getDropStats() {
return ImmutableMap.of(
"queue_full", droppedQueueFull.get(),
"append_error", droppedAppendError.get(),
"serialization_error", droppedSerializationError.get());
}

@Override
public void close() {
while (this.queue != null && !this.queue.isEmpty()) {
Expand Down
Loading