From 6576384aebe79dab43840f855bf4e1f455809b64 Mon Sep 17 00:00:00 2001 From: agrawal-siddharth Date: Fri, 1 May 2026 01:53:45 +0000 Subject: [PATCH] feat: add periodic WARNING metrics to assist in debugging --- .../bigquery/storage/v1/ConnectionWorker.java | 336 ++++++++++++++++- .../bigquery/storage/v1/TelemetryMetrics.java | 1 + .../storage/v1/ConnectionWorkerTest.java | 345 ++++++++++++++++++ 3 files changed, 673 insertions(+), 9 deletions(-) diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 6e353617e36d..68fee2c6580f 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -30,12 +30,14 @@ import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ArrowData; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; +import com.google.cloud.bigquery.storage.v1.ConnectionWorker.HealthCheckMetrics.HealthCheckFields; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; +import com.google.gson.Gson; import com.google.protobuf.Int64Value; import io.grpc.Status; import io.grpc.Status.Code; @@ -82,6 +84,7 @@ class ConnectionWorker implements AutoCloseable { */ static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(5); + private static final Gson GSON = new Gson(); private Lock lock; private Condition hasMessageInWaitingQueue; private Condition inflightReduced; @@ -224,6 +227,11 @@ class ConnectionWorker implements AutoCloseable { */ private final AtomicLong inflightWaitSec = new AtomicLong(0); + /* + * Tracks current inflight retries. + */ + private final AtomicLong queuedRetryCount = new AtomicLong(0); + /* * A String that uniquely identifies this writer. */ @@ -250,6 +258,9 @@ class ConnectionWorker implements AutoCloseable { private final RequestProfiler.RequestProfilerHook requestProfilerHook; private final TelemetryMetrics telemetryMetrics; + @GuardedBy("lock") + private final HealthCheckMetrics healthCheckMetrics; + /** Indicate whether this connection is created during multiplexing mode. */ private final Boolean isMultiplexing; @@ -262,7 +273,225 @@ class ConnectionWorker implements AutoCloseable { private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/"; private static Pattern streamPatternTable = Pattern.compile(tableMatching); - // Latency buckets are based on a list of 1.5 ^ n + class HealthCheckMetrics { + // Interval between health checks. + private Duration HEALTH_CHECK_INTERVAL = Duration.ofSeconds(15); + // At least one of these thresholds must be met to trigger health check warning. 
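+ // Thresholds apply to values observed within a single health check window;
+ // tests can override both the interval and the thresholds via
+ // setTestOnlyHealthCheckInterval() and setTestOnlyHealthCheckThresholds().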
+ private Duration responseWaitTimeThreshold = Duration.ofSeconds(5);
+ private Duration latencyThreshold = Duration.ofSeconds(5);
+ private int percentErrorResponsesThreshold = 10;
+ private long queuedRequestsThreshold = 100;
+ private int percentRetriesThreshold = 25;
+ private long queuedBytesThreshold = 50 * 1024 * 1024;
+ private long connectionAttemptThreshold = 1;
+ private long connectionCloseThreshold = 1;
+
+ /*
+ * Start of the current metrics window, i.e. when the last health check ran.
+ */
+ @GuardedBy("lock")
+ private Instant healthCheckTimeStamp = Instant.now();
+
+ @GuardedBy("lock")
+ private long windowedRequestsSent;
+
+ @GuardedBy("lock")
+ private long windowedRequestsSentBytes;
+
+ @GuardedBy("lock")
+ private long windowedResponsesAcked;
+
+ @GuardedBy("lock")
+ private long windowedResponsesAckedBytes;
+
+ @GuardedBy("lock")
+ private long windowedMilliLatencyMax;
+
+ @GuardedBy("lock")
+ private long windowedMilliLatencySum;
+
+ @GuardedBy("lock")
+ private long windowedMilliResponseWaitTimeMax;
+
+ @GuardedBy("lock")
+ private Map<Integer, Integer> windowedResponseCodes = new ConcurrentHashMap<>();
+
+ @GuardedBy("lock")
+ private long windowedQueuedRequestsMax;
+
+ @GuardedBy("lock")
+ private long windowedQueuedRetriesMax;
+
+ private long windowedConnectionAttemptCount;
+ private long windowedConnectionClosedCount;
+
+ void updateWindowedQueuedRequestsMax(long currentQueueLength, long currentRetryCount) {
+ if (currentQueueLength > windowedQueuedRequestsMax) {
+ windowedQueuedRequestsMax = currentQueueLength;
+ }
+ if (currentRetryCount > windowedQueuedRetriesMax) {
+ windowedQueuedRetriesMax = currentRetryCount;
+ }
+ }
+
+ void updateResponseWait(Instant sendInstant) {
+ long currentWaitTime = Duration.between(sendInstant, Instant.now()).toMillis();
+ if (currentWaitTime > windowedMilliResponseWaitTimeMax) {
+ windowedMilliResponseWaitTimeMax = currentWaitTime;
+ }
+ }
+
+ void updateRequestsSent(long bytes) {
+ windowedRequestsSent++;
+ windowedRequestsSentBytes += bytes;
+ }
+
+ void updateResponsesAcked(long bytes, long latencyMilli, int code) {
+ windowedResponsesAcked++;
+ windowedResponsesAckedBytes += bytes;
+ if (latencyMilli > windowedMilliLatencyMax) {
+ windowedMilliLatencyMax = latencyMilli;
+ }
+ windowedMilliLatencySum += latencyMilli;
+ windowedResponseCodes.put(code, windowedResponseCodes.getOrDefault(code, 0) + 1);
+ }
+
+ synchronized void updateConnectionAttempt() {
+ windowedConnectionAttemptCount++;
+ }
+
+ synchronized void updateConnectionClosed() {
+ windowedConnectionClosedCount++;
+ }
+
+ class HealthCheckFields {
+ // All metrics are windowed unless otherwise specified
+ long msecLongestResponseWaitTime;
+ long msecMaxLatency;
+ long msecAvgLatency;
+ long sendBps;
+ long receiveBps;
+ Map<Integer, Integer> responseCodes;
+ long requestsSentCount;
+ long responseCount;
+ long queuedRequestCountMax;
+ long queuedRetryCountMax; // How many active waiting or inflight requests are retries
+ long inflightBytes;
+ long connectionAttemptCount;
+ long connectionClosedCount;
+ boolean isConnected; // snapshot at instant metrics are gathered
+ String streamName;
+ String writerId;
+ String windowStartTime;
+ }
+
+ /*
+ * Compute current values of all health check metrics.
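+ * Throughput fields are derived from the windowed byte counters, e.g.
+ * sendBps = (windowedRequestsSentBytes * 1000) / HEALTH_CHECK_INTERVAL.toMillis(),
+ * i.e. average bytes per second with the configured interval standing in for
+ * the actual window length.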
+ */ + private void gatherHealthCheckMetrics(HealthCheckFields healthCheckFields) { + healthCheckFields.streamName = streamName; + healthCheckFields.writerId = writerId; + healthCheckFields.queuedRequestCountMax = windowedQueuedRequestsMax; + healthCheckFields.queuedRetryCountMax = windowedQueuedRetriesMax; + healthCheckFields.msecLongestResponseWaitTime = windowedMilliResponseWaitTimeMax; + healthCheckFields.inflightBytes = inflightBytes; + healthCheckFields.requestsSentCount = windowedRequestsSent; + healthCheckFields.responseCount = windowedResponsesAcked; + if (HEALTH_CHECK_INTERVAL.toMillis() > 0) { + healthCheckFields.sendBps = + (windowedRequestsSentBytes * 1000) / HEALTH_CHECK_INTERVAL.toMillis(); + healthCheckFields.receiveBps = + (windowedResponsesAckedBytes * 1000) / HEALTH_CHECK_INTERVAL.toMillis(); + } + healthCheckFields.msecMaxLatency = windowedMilliLatencyMax; + healthCheckFields.msecAvgLatency = + windowedResponsesAcked > 0 ? windowedMilliLatencySum / windowedResponsesAcked : 0; + healthCheckFields.responseCodes.clear(); + healthCheckFields.responseCodes.putAll(windowedResponseCodes); + healthCheckFields.connectionAttemptCount = windowedConnectionAttemptCount; + healthCheckFields.connectionClosedCount = windowedConnectionClosedCount; + healthCheckFields.isConnected = streamConnectionIsConnected; + healthCheckFields.windowStartTime = healthCheckTimeStamp.toString(); + } + + /* + * Determine if health check thresholds have been met. + */ + private boolean checkThresholds(HealthCheckFields healthCheckFields) { + if ((healthCheckFields.queuedRequestCountMax >= queuedRequestsThreshold) + || (healthCheckFields.inflightBytes >= queuedBytesThreshold) + || (healthCheckFields.msecLongestResponseWaitTime >= responseWaitTimeThreshold.toMillis()) + || (healthCheckFields.msecMaxLatency >= latencyThreshold.toMillis()) + || (healthCheckFields.connectionAttemptCount >= connectionAttemptThreshold) + || (healthCheckFields.connectionClosedCount >= connectionCloseThreshold)) { + return true; + } + if (healthCheckFields.queuedRequestCountMax > 0) { + if (((healthCheckFields.queuedRetryCountMax * 100) + / (healthCheckFields.queuedRequestCountMax)) + >= percentRetriesThreshold) { + return true; + } + } + if (!healthCheckFields.responseCodes.isEmpty()) { + int successResponses = 0; + if (healthCheckFields.responseCodes.containsKey(Status.Code.OK.value())) { + successResponses = healthCheckFields.responseCodes.get(Status.Code.OK.value()); + } + int allResponses = + healthCheckFields.responseCodes.values().stream().mapToInt(Integer::intValue).sum(); + if (allResponses > 0) { + int errorResponses = allResponses - successResponses; + if (((errorResponses * 100) / allResponses) >= percentErrorResponsesThreshold) { + return true; + } + } + } + return false; + } + + /* + * Dump given health check metrics as WARNING log. + */ + private void emitHealthCheckMetrics(HealthCheckFields healthCheckFields) { + log.warning(GSON.toJson(healthCheckFields)); + } + + /* + * Reset per-interval health check metrics in preparation for next window. 
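+ * All windowed counters and maxima, including the connection attempt/close
+ * counts, return to zero, so each emitted warning covers a single window.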
+ */ + private void resetWindowedMetrics() { + windowedRequestsSent = 0; + windowedRequestsSentBytes = 0; + windowedResponsesAcked = 0; + windowedResponsesAckedBytes = 0; + windowedMilliLatencyMax = 0; + windowedMilliLatencySum = 0; + windowedMilliResponseWaitTimeMax = 0; + windowedResponseCodes.clear(); + windowedConnectionAttemptCount = 0; + windowedConnectionClosedCount = 0; + windowedQueuedRequestsMax = 0; + windowedQueuedRetriesMax = 0; + } + + /* + * Periodically run health checks. In case of issues emit a warning log message. + */ + private void periodicHealthCheck() { + Duration timeSinceLastHealthCheck = Duration.between(healthCheckTimeStamp, Instant.now()); + if (timeSinceLastHealthCheck.compareTo(HEALTH_CHECK_INTERVAL) >= 0) { + HealthCheckFields healthCheckFields = new HealthCheckFields(); + healthCheckFields.responseCodes = new ConcurrentHashMap<>(); + gatherHealthCheckMetrics(healthCheckFields); + if (checkThresholds(healthCheckFields)) { + emitHealthCheckMetrics(healthCheckFields); + } + resetWindowedMetrics(); + healthCheckTimeStamp = Instant.now(); + } + } + } public static Boolean isDefaultStreamName(String streamName) { Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName); @@ -429,6 +658,7 @@ public ConnectionWorker( this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler); this.telemetryMetrics = new TelemetryMetrics(this, enableOpenTelemetry, getTableName(), writerId, traceId); + this.healthCheckMetrics = new HealthCheckMetrics(); this.isMultiplexing = isMultiplexing; // Always recreate a client for connection worker. @@ -480,6 +710,7 @@ public void run() { private void resetConnection() { log.info("Start connecting stream: " + streamName + " id: " + writerId); telemetryMetrics.recordConnectionStart(); + healthCheckMetrics.updateConnectionAttempt(); if (this.streamConnection != null) { // It's safe to directly close the previous connection as the in flight messages // will be picked up by the next connection. @@ -543,11 +774,6 @@ private void addMessageToFrontOfWaitingQueue(AppendRequestAndResponse requestWra addMessageToWaitingQueue(requestWrapper, /* addToFront= */ true); } - @GuardedBy("lock") - private void addMessageToBackOfWaitingQueue(AppendRequestAndResponse requestWrapper) { - addMessageToWaitingQueue(requestWrapper, /* addToFront= */ false); - } - @GuardedBy("lock") private void addMessageToWaitingQueue( AppendRequestAndResponse requestWrapper, boolean addToFront) { @@ -561,6 +787,8 @@ private void addMessageToWaitingQueue( } else { waitingRequestQueue.add(requestWrapper); } + healthCheckMetrics.updateWindowedQueuedRequestsMax( + waitingRequestQueue.size() + inflightRequestQueue.size(), queuedRetryCount.get()); } /** Schedules the writing of rows at given offset. 
*/ @@ -699,6 +927,8 @@ private ApiFuture appendInternal( ++this.inflightRequests; this.inflightBytes += requestWrapper.messageSize; waitingRequestQueue.addLast(requestWrapper); + healthCheckMetrics.updateWindowedQueuedRequestsMax( + waitingRequestQueue.size() + inflightRequestQueue.size(), queuedRetryCount.get()); hasMessageInWaitingQueue.signal(); requestProfilerHook.startOperation( RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId); @@ -765,6 +995,66 @@ void setTestOnlyRunTimeExceptionInAppendLoop( this.testOnlyRunTimeExceptionInAppendLoop = testOnlyRunTimeExceptionInAppendLoop; } + @VisibleForTesting() + HealthCheckMetrics.HealthCheckFields gatherTestOnlyHealthCheckMetrics() { + this.lock.lock(); + try { + HealthCheckFields healthCheckFields = healthCheckMetrics.new HealthCheckFields(); + healthCheckFields.responseCodes = new ConcurrentHashMap<>(); + healthCheckMetrics.gatherHealthCheckMetrics(healthCheckFields); + healthCheckMetrics.emitHealthCheckMetrics(healthCheckFields); + return healthCheckFields; + } finally { + this.lock.unlock(); + } + } + + @VisibleForTesting + void setTestOnlyHealthCheckInterval(Duration interval) { + this.lock.lock(); + try { + healthCheckMetrics.HEALTH_CHECK_INTERVAL = interval; + } finally { + this.lock.unlock(); + } + } + + @VisibleForTesting + void setTestOnlyHealthCheckThresholds( + long queuedRequestsThreshold, + long queuedBytesThreshold, + Duration responseWaitTimeThreshold, + Duration latencyThreshold, + int percentRetriesThreshold, + int percentErrorResponsesThreshold, + long connectionAttemptThreshold, + long connectionCloseThreshold) { + this.lock.lock(); + try { + healthCheckMetrics.queuedRequestsThreshold = queuedRequestsThreshold; + healthCheckMetrics.queuedBytesThreshold = queuedBytesThreshold; + healthCheckMetrics.responseWaitTimeThreshold = responseWaitTimeThreshold; + healthCheckMetrics.latencyThreshold = latencyThreshold; + healthCheckMetrics.percentRetriesThreshold = percentRetriesThreshold; + healthCheckMetrics.percentErrorResponsesThreshold = percentErrorResponsesThreshold; + healthCheckMetrics.connectionAttemptThreshold = connectionAttemptThreshold; + healthCheckMetrics.connectionCloseThreshold = connectionCloseThreshold; + } finally { + this.lock.unlock(); + } + } + + @VisibleForTesting + boolean checkTestOnlyHealthCheckThresholds( + HealthCheckMetrics.HealthCheckFields healthCheckFields) { + this.lock.lock(); + try { + return healthCheckMetrics.checkThresholds(healthCheckFields); + } finally { + this.lock.unlock(); + } + } + public long getInflightWaitSeconds() { return inflightWaitSec.longValue(); } @@ -861,9 +1151,11 @@ private void appendLoop() { if (inflightRequestQueue.size() > 0) { Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp; if (sendInstant != null) { + healthCheckMetrics.updateResponseWait(sendInstant); throwIfWaitCallbackTooLong(sendInstant); } } + healthCheckMetrics.periodicHealthCheck(); // Copy the streamConnectionIsConnected guarded by lock to a local variable. // In addition, only reconnect if there is a retriable error. 
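A sketch of how the test-only hooks above compose (illustrative only: the `worker` handle and every value here are invented, and `Duration` is `java.time.Duration` as in ConnectionWorker):

```java
// Hypothetical test wiring: shrink the window and thresholds so a single
// slow, queued, or failed append deterministically trips the warning path.
worker.setTestOnlyHealthCheckInterval(Duration.ofMillis(100));
worker.setTestOnlyHealthCheckThresholds(
    /* queuedRequestsThreshold= */ 1,
    /* queuedBytesThreshold= */ 1,
    /* responseWaitTimeThreshold= */ Duration.ofMillis(1),
    /* latencyThreshold= */ Duration.ofMillis(1),
    /* percentRetriesThreshold= */ 1,
    /* percentErrorResponsesThreshold= */ 1,
    /* connectionAttemptThreshold= */ 1,
    /* connectionCloseThreshold= */ 1);
// appendLoop() calls periodicHealthCheck() once per iteration; after the
// 100 ms window elapses, any breached threshold emits the JSON WARNING and
// the windowed counters reset.
```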
@@ -895,6 +1187,7 @@ private void appendLoop() { waitForBackoffIfNecessary(requestWrapper); this.inflightRequestQueue.add(requestWrapper); localQueue.addLast(requestWrapper); + healthCheckMetrics.updateRequestsSent(requestWrapper.messageSize); } } catch (InterruptedException e) { log.warning( @@ -1113,6 +1406,7 @@ private void cleanupInflightRequests() { Deque localQueue = new LinkedList(); this.lock.lock(); try { + queuedRetryCount.set(0L); if (this.connectionFinalStatus != null) { finalStatus = this.connectionFinalStatus; } @@ -1191,6 +1485,9 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r Long offset = requestWrapper.message.hasOffset() ? requestWrapper.message.getOffset().getValue() : -1; + if (requestWrapper.retryCount == 1) { + queuedRetryCount.incrementAndGet(); // this is the first retry attempt + } if (isDefaultStreamName(streamName) || offset == -1) { log.info( String.format( @@ -1244,6 +1541,20 @@ private void requestCallback(AppendRowsResponse response) { AppendRequestAndResponse requestWrapper; this.lock.lock(); try { + Duration durationLatency = Duration.ZERO; + long latencyMilli = 0; + long responseMessageSize = 0; + if (!this.inflightRequestQueue.isEmpty()) { + responseMessageSize = this.inflightRequestQueue.getFirst().messageSize; + Instant sendInstant = this.inflightRequestQueue.getFirst().requestSendTimeStamp; + if (sendInstant != null) { + durationLatency = Duration.between(sendInstant, Instant.now()); + latencyMilli = durationLatency.toMillis(); + } + } + int statusCode = response.hasError() ? response.getError().getCode() : Status.Code.OK.value(); + healthCheckMetrics.updateResponsesAcked(responseMessageSize, latencyMilli, statusCode); + // Ignored response has arrived if (responsesToIgnore > 0) { if (response.hasError()) { @@ -1275,9 +1586,7 @@ private void requestCallback(AppendRowsResponse response) { connectionRetryStartTime = 0; } if (!this.inflightRequestQueue.isEmpty()) { - Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp; - if (sendInstant != null) { - Duration durationLatency = Duration.between(sendInstant, Instant.now()); + if (durationLatency.compareTo(Duration.ZERO) > 0) { telemetryMetrics.recordNetworkLatency(durationLatency); } @@ -1321,6 +1630,14 @@ private void requestCallback(AppendRowsResponse response) { return; } } + if (requestWrapper.retryCount > 0) { + this.lock.lock(); + try { + queuedRetryCount.decrementAndGet(); + } finally { + this.lock.unlock(); + } + } // We need a separate thread pool to unblock the next request callback. 
// Otherwise user may call append inside request callback, which may be blocked on waiting @@ -1401,6 +1718,7 @@ private void doneCallback(Throwable finalStatus) { this.streamConnectionIsConnected = false; this.telemetryMetrics.recordConnectionEnd( Code.values()[Status.fromThrowable(finalStatus).getCode().ordinal()].toString()); + this.healthCheckMetrics.updateConnectionClosed(); if (connectionFinalStatus == null) { if (!closedIdleConnection && connectionRetryStartTime == 0) { connectionRetryStartTime = System.currentTimeMillis(); diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/TelemetryMetrics.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/TelemetryMetrics.java index e94a45f45034..a06ccaf01c99 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/TelemetryMetrics.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/TelemetryMetrics.java @@ -53,6 +53,7 @@ private static final class OpenTelemetryMetrics { } private OpenTelemetryMetrics openTelemetryMetrics = new OpenTelemetryMetrics(); + // Latency buckets are based on a list of 1.5 ^ n private static final List METRICS_MILLISECONDS_LATENCY_BUCKETS = ImmutableList.of( 0L, 17L, 38L, 86L, 195L, 438L, 985L, 2217L, 4988L, 11223L, 25251L, 56815L, 127834L, diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 5017a52aa6b8..cb1d1be0dd13 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -35,6 +36,7 @@ import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Int64Value; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import io.opentelemetry.api.common.Attributes; import java.io.ByteArrayInputStream; @@ -48,6 +50,7 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.logging.Logger; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VarCharVector; @@ -716,6 +719,12 @@ private AppendRowsResponse createAppendResponse(long offset) { .build(); } + private AppendRowsResponse createAppendResponseWithError(Status.Code code, String message) { + return AppendRowsResponse.newBuilder() + .setError(com.google.rpc.Status.newBuilder().setCode(code.value()).setMessage(message)) + .build(); + } + private ConnectionWorker createMultiplexedConnectionWorker() throws IOException { // By default use only the first table as table reference. 
return createMultiplexedConnectionWorker( @@ -1150,9 +1159,345 @@ void testDoubleDisconnectWithShorterRetryDuration() throws Exception { assertEquals(0, futures.get(0).get().getAppendResult().getOffset().getValue()); } + @Test + void testHealthCheck() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + "us", + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + null, + client.getSettings(), + retrySettings, + /* enableRequestProfiler= */ false, + /* enableOpenTelemetry= */ false, + /* isMultiplexing= */ true); + int msecResponseDelay = 500; + testBigQueryWrite.setResponseSleep(java.time.Duration.ofMillis(msecResponseDelay)); + + int appendCount = 3; + long sizePerRequest = 27; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add( + sendTestMessage( + connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); + } + // Sleep for quarter second to ensure requests are queued + Thread.sleep(250); + ConnectionWorker.HealthCheckMetrics.HealthCheckFields healthCheckFields = + connectionWorker.gatherTestOnlyHealthCheckMetrics(); + assertTrue( + healthCheckFields.msecLongestResponseWaitTime > 1 + && healthCheckFields.msecLongestResponseWaitTime < msecResponseDelay); + assertEquals(appendCount, healthCheckFields.queuedRequestCountMax); + assertEquals(appendCount * sizePerRequest, healthCheckFields.inflightBytes); + assertEquals("projects/p1/datasets/d1/tables/t1/streams/s1", healthCheckFields.streamName); + assertEquals(connectionWorker.getWriterId(), healthCheckFields.writerId); + + // Wait for responses to arrive + for (ApiFuture future : futures) { + future.get(); + } + + // Ensure connection is closed + connectionWorker.close(); + + healthCheckFields = connectionWorker.gatherTestOnlyHealthCheckMetrics(); + assertTrue(healthCheckFields.msecMaxLatency >= msecResponseDelay); + assertTrue(healthCheckFields.msecAvgLatency >= msecResponseDelay); + assertTrue(healthCheckFields.msecMaxLatency >= healthCheckFields.msecAvgLatency); + assertTrue(healthCheckFields.sendBps > 0L); + assertTrue(healthCheckFields.receiveBps > 0L); + assertTrue(healthCheckFields.responseCodes.containsKey(Status.Code.OK.value())); + assertEquals(appendCount, healthCheckFields.responseCodes.get(Status.Code.OK.value())); + assertEquals(appendCount, healthCheckFields.requestsSentCount); + assertEquals(appendCount, healthCheckFields.responseCount); + assertEquals(appendCount, healthCheckFields.queuedRequestCountMax); + assertEquals(0, healthCheckFields.queuedRetryCountMax); + assertEquals(false, healthCheckFields.isConnected); + assertTrue(healthCheckFields.connectionAttemptCount > 0); + assertTrue(healthCheckFields.connectionClosedCount > 0); + } + @Test void testLocationName() throws Exception { assertEquals( "projects/p1/locations/us", ConnectionWorker.getRoutingHeader(TEST_STREAM_1, "us")); } + + @Test + void testHealthCheckThresholds() throws Exception { + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + "us", + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + 
FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + null, + client.getSettings(), + retrySettings, + /* enableRequestProfiler= */ false, + /* enableOpenTelemetry= */ false, + /* isMultiplexing= */ true); + ConnectionWorker.HealthCheckMetrics.HealthCheckFields fields = + connectionWorker.gatherTestOnlyHealthCheckMetrics(); + + // Default everything to zero/healthy + fields.msecLongestResponseWaitTime = 0; + fields.msecMaxLatency = 0; + fields.msecAvgLatency = 0; + fields.sendBps = 0; + fields.receiveBps = 0; + fields.responseCodes.clear(); + fields.requestsSentCount = 0; + fields.responseCount = 0; + fields.queuedRequestCountMax = 0; + fields.queuedRetryCountMax = 0; + fields.inflightBytes = 0; + fields.connectionAttemptCount = 0; + fields.connectionClosedCount = 0; + fields.isConnected = false; + + // Should be healthy with default values + assertEquals(false, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + + // msecLongestResponseWaitTime >= responseWaitTimeThreshold (5000) + fields.msecLongestResponseWaitTime = 5000; + assertEquals(true, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + fields.msecLongestResponseWaitTime = 0; + + // msecMaxLatency >= latencyThreshold (5000) + fields.msecMaxLatency = 5000; + assertEquals(true, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + fields.msecMaxLatency = 0; + + // Division by zero check (allResponses == 0) -> should not crash and be false + fields.responseCodes.clear(); + assertEquals(false, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + + // Trigger percentErrorResponsesThreshold (10%) + fields.responseCodes.put(Status.Code.OK.value(), 8); + fields.responseCodes.put(Status.Code.INTERNAL.value(), 2); // 2/10 = 20% + assertEquals(true, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + fields.responseCodes.clear(); + + // Healthy response codes + fields.responseCodes.put(Status.Code.OK.value(), 10); + assertEquals(false, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + fields.responseCodes.clear(); + + // queuedRequestCountMax >= queuedRequestsThreshold (100) + fields.queuedRequestCountMax = 100; + assertEquals(true, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + fields.queuedRequestCountMax = 0; + + // Division by zero check (queuedRequestCountMax == 0) -> should not crash + // and be false + fields.queuedRequestCountMax = 0; + fields.queuedRetryCountMax = 10; + assertEquals(false, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + + // Trigger percentRetriesThreshold (25%) + fields.queuedRequestCountMax = 10; + fields.queuedRetryCountMax = 3; // 3/10 = 30% + assertEquals(true, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + fields.queuedRequestCountMax = 0; + fields.queuedRetryCountMax = 0; + + // inflightBytes >= queuedBytesThreshold (52428800) + fields.inflightBytes = 50 * 1024 * 1024; + assertEquals(true, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + fields.inflightBytes = 0; + + // connectionAttemptCount >= connectionAttemptThreshold (1) + fields.connectionAttemptCount = 1; + assertEquals(true, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + fields.connectionAttemptCount = 0; + + // connectionClosedCount >= connectionCloseThreshold (1) + fields.connectionClosedCount = 1; + assertEquals(true, connectionWorker.checkTestOnlyHealthCheckThresholds(fields)); + } + + @Test + void testInflightRetryCountHealthMetric() throws Exception { + ProtoSchema schema1 = 
createProtoSchema("foo");
+ StreamWriter sw1 =
+ StreamWriter.newBuilder(TEST_STREAM_1, client)
+ .setLocation("us")
+ .setWriterSchema(schema1)
+ .build();
+ ConnectionWorker connectionWorker =
+ new ConnectionWorker(
+ TEST_STREAM_1,
+ "us",
+ createProtoSchema("foo"),
+ 100000,
+ 100000,
+ Duration.ofSeconds(100),
+ FlowController.LimitExceededBehavior.Block,
+ TEST_TRACE_ID,
+ null,
+ client.getSettings(),
+ retrySettings,
+ /* enableRequestProfiler= */ false,
+ /* enableOpenTelemetry= */ false,
+ /* isMultiplexing= */ true);
+
+ // Simulate a retriable error
+ int msecResponseDelay = 1000;
+ testBigQueryWrite.setResponseSleep(java.time.Duration.ofMillis(msecResponseDelay));
+
+ // Add responses for the retries and the final success
+ testBigQueryWrite.addResponse(
+ createAppendResponseWithError(Status.INTERNAL.getCode(), "force the first retry"));
+ testBigQueryWrite.addResponse(
+ createAppendResponseWithError(Status.INTERNAL.getCode(), "force the second retry"));
+ testBigQueryWrite.addResponse(createAppendResponse(-1));
+
+ ApiFuture<AppendRowsResponse> future =
+ sendTestMessage(
+ connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(0)}), -1);
+
+ // Wait for the initial failure and retry to be enqueued
+ Thread.sleep(1500);
+
+ ConnectionWorker.HealthCheckMetrics.HealthCheckFields healthCheckFields =
+ connectionWorker.gatherTestOnlyHealthCheckMetrics();
+ assertEquals(1, healthCheckFields.queuedRequestCountMax);
+ assertEquals(1, healthCheckFields.queuedRetryCountMax);
+ assertTrue(healthCheckFields.responseCodes.containsKey(Code.INTERNAL.value()));
+ assertEquals(1, healthCheckFields.responseCodes.get(Code.INTERNAL.value()));
+ assertFalse(healthCheckFields.responseCodes.containsKey(Status.Code.OK.value()));
+
+ // Allow the retries to complete successfully
+ future.get();
+
+ healthCheckFields = connectionWorker.gatherTestOnlyHealthCheckMetrics();
+ assertEquals(1, healthCheckFields.queuedRequestCountMax);
+ assertEquals(1, healthCheckFields.queuedRetryCountMax);
+ assertTrue(healthCheckFields.responseCodes.containsKey(Code.INTERNAL.value()));
+ assertEquals(2, healthCheckFields.responseCodes.get(Code.INTERNAL.value()));
+ assertTrue(healthCheckFields.responseCodes.containsKey(Status.Code.OK.value()));
+ assertEquals(1, healthCheckFields.responseCodes.get(Status.Code.OK.value()));
+ }
+
+ private static class DummyResponseSupplierWillFailThenSucceed
+ implements Supplier<FakeBigQueryWriteImpl.Response> {
+
+ private final int totalFailCount;
+ private int failCount;
+ private final com.google.rpc.Status failStatus;
+ private final FakeBigQueryWriteImpl.Response response;
+
+ DummyResponseSupplierWillFailThenSucceed(
+ FakeBigQueryWriteImpl.Response response,
+ int totalFailCount,
+ com.google.rpc.Status failStatus) {
+ this.totalFailCount = totalFailCount;
+ this.response = response;
+ this.failStatus = failStatus;
+ this.failCount = 0;
+ }
+
+ @Override
+ public FakeBigQueryWriteImpl.Response get() {
+ if (failCount >= totalFailCount) {
+ return response;
+ }
+ failCount++;
+ return new FakeBigQueryWriteImpl.Response(
+ AppendRowsResponse.newBuilder().setError(this.failStatus).build());
+ }
+ }
+
+ @Test
+ void testInflightRetryCountHealthMetricExactlyOnce() throws Exception {
+ ProtoSchema schema1 = createProtoSchema("foo");
+ StreamWriter sw1 =
+ StreamWriter.newBuilder(TEST_STREAM_1, client)
+ .setLocation("us")
+ .setWriterSchema(schema1)
+ .build();
+ ConnectionWorker connectionWorker =
+ new ConnectionWorker(
+ TEST_STREAM_1,
+ "us",
+ createProtoSchema("foo"),
+ 100000,
+ 100000,
+ Duration.ofSeconds(100),
+ FlowController.LimitExceededBehavior.Block,
+ TEST_TRACE_ID,
+ null,
+ client.getSettings(),
+ retrySettings,
+ /* enableRequestProfiler= */ false,
+ /* enableOpenTelemetry= */ false,
+ /* isMultiplexing= */ false);
+
+ // Simulate a retriable error
+ int msecResponseDelay = 1000;
+ testBigQueryWrite.setResponseSleep(java.time.Duration.ofMillis(msecResponseDelay));
+
+ // Add responses for the retries and the final success
+ testBigQueryWrite.addResponse(
+ new DummyResponseSupplierWillFailThenSucceed(
+ new FakeBigQueryWriteImpl.Response(createAppendResponse(0)),
+ /* totalFailCount= */ 1,
+ com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.value()).build()));
+ testBigQueryWrite.addResponse(createAppendResponse(1));
+
+ ApiFuture<AppendRowsResponse> future1 =
+ sendTestMessage(
+ connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(0)}), 0);
+ ApiFuture<AppendRowsResponse> future2 =
+ sendTestMessage(
+ connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(1)}), 1);
+
+ // Wait for the initial failure and retry to be enqueued
+ Thread.sleep(1500);
+
+ ConnectionWorker.HealthCheckMetrics.HealthCheckFields healthCheckFields =
+ connectionWorker.gatherTestOnlyHealthCheckMetrics();
+ assertEquals(2, healthCheckFields.queuedRequestCountMax);
+ assertEquals(
+ 1,
+ healthCheckFields
+ .queuedRetryCountMax); // Only the failed request is included in the retry count, even
+ // though all inflight requests are resent.
+ assertTrue(healthCheckFields.responseCodes.containsKey(Code.INTERNAL.value()));
+ assertEquals(1, healthCheckFields.responseCodes.get(Code.INTERNAL.value()));
+ assertFalse(healthCheckFields.responseCodes.containsKey(Status.Code.OK.value()));
+
+ // Allow the retries to complete successfully
+ future2.get();
+
+ healthCheckFields = connectionWorker.gatherTestOnlyHealthCheckMetrics();
+ assertEquals(2, healthCheckFields.queuedRequestCountMax);
+ assertEquals(1, healthCheckFields.queuedRetryCountMax);
+ assertTrue(healthCheckFields.responseCodes.containsKey(Code.INTERNAL.value()));
+ assertEquals(1, healthCheckFields.responseCodes.get(Code.INTERNAL.value()));
+ assertTrue(healthCheckFields.responseCodes.containsKey(Status.Code.OK.value()));
+ assertEquals(3, healthCheckFields.responseCodes.get(Status.Code.OK.value()));
+ }
 }
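For anyone debugging from logs: the WARNING payload is the Gson serialization of HealthCheckFields. An illustrative line (all values invented, wrapped here for readability) looks roughly like:

```json
{"msecLongestResponseWaitTime":5210,"msecMaxLatency":5210,"msecAvgLatency":4875,
 "sendBps":1048576,"receiveBps":2048,"responseCodes":{"0":42,"13":3},
 "requestsSentCount":45,"responseCount":45,"queuedRequestCountMax":120,
 "queuedRetryCountMax":3,"inflightBytes":52428800,"connectionAttemptCount":1,
 "connectionClosedCount":1,"isConnected":true,
 "streamName":"projects/p/datasets/d/tables/t/streams/s1","writerId":"worker-1",
 "windowStartTime":"2026-05-01T01:53:30Z"}
```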