diff --git a/trino-csharp/Trino.Client/StatementClientV1.cs b/trino-csharp/Trino.Client/StatementClientV1.cs index 996ab98..c2b6d9f 100644 --- a/trino-csharp/Trino.Client/StatementClientV1.cs +++ b/trino-csharp/Trino.Client/StatementClientV1.cs @@ -284,65 +284,80 @@ private async Task Cancel(QueryCancellationReason reason = QueryCancellati /// internal async Task Advance() { - if (this.Statement.nextUri.Contains("/executing")) + try { - if (this.Statement.nextUri.Contains("?")) + if (this.Statement.nextUri.Contains("/executing")) { - this.Statement.nextUri += $"&targetResultSize={Constants.MaxTargetResultSizeMB}MB"; + if (this.Statement.nextUri.Contains("?")) + { + this.Statement.nextUri += $"&targetResultSize={Constants.MaxTargetResultSizeMB}MB"; + } + else + { + this.Statement.nextUri += $"?targetResultSize={Constants.MaxTargetResultSizeMB}MB"; + } } - else + + logger?.LogDebug("Trino: request: {1}", this.Statement.nextUri); + + string responseStr = await this.GetAsync(new Uri(this.Statement.nextUri), OK).ConfigureAwait(false); + logger?.LogDebug("Trino: response: {1}", responseStr); + QueryResultPage response = JsonConvert.DeserializeObject(responseStr); + logger?.LogDebug("Trino: response at {0} msec with state {1}", stopwatch.ElapsedMilliseconds, + response.stats.state); + + // Note, the size is estimated based on the response string size which is not the actual deserialized size. + ResponseQueueStatement responseQueueItem = new ResponseQueueStatement(response, responseStr.Length); + if (responseQueueItem.Response.error != null) { - this.Statement.nextUri += $"?targetResultSize={Constants.MaxTargetResultSizeMB}MB"; + State.StateTransition(TrinoQueryStates.CLIENT_ERROR, TrinoQueryStates.RUNNING); + throw new TrinoException(responseQueueItem.Response.error.message, + responseQueueItem.Response.error); } - } - logger?.LogDebug("Trino: request: {1}", this.Statement.nextUri); - string responseStr = await this.GetAsync(new Uri(this.Statement.nextUri), OK).ConfigureAwait(false); - logger?.LogDebug("Trino: response: {1}", responseStr); - QueryResultPage response = JsonConvert.DeserializeObject(responseStr); - logger?.LogDebug("Trino: response at {0} msec with state {1}", stopwatch.ElapsedMilliseconds, response.stats.state); + // Make status available + this.Statement = responseQueueItem.Response; - // Note, the size is estimated based on the response string size which is not the actual deserialized size. - ResponseQueueStatement responseQueueItem = new ResponseQueueStatement(response, responseStr.Length); - if (responseQueueItem.Response.error != null) - { - State.StateTransition(TrinoQueryStates.CLIENT_ERROR, TrinoQueryStates.RUNNING); - throw new TrinoException(responseQueueItem.Response.error.message, responseQueueItem.Response.error); - } - - if (cancellationToken.IsCancellationRequested) - { - await this.Cancel(QueryCancellationReason.USER_CANCEL).ConfigureAwait(false); - throw new OperationCanceledException("Cancellation requested"); - } + // If no next URI, the query is completed. + if (this.Statement.IsLastPage) + { + this.Finish(); + } + else if (this.IsTimeout) + { + logger?.LogInformation("Trino: Query timed out queryId:{0}, run time: {1} s, timeout {2} s.", + Statement?.id, this.stopwatch.Elapsed.TotalSeconds, + Session.Properties.ClientRequestTimeout.Value.TotalSeconds); + await this.Cancel(QueryCancellationReason.TIMEOUT).ConfigureAwait(false); + throw new TimeoutException( + $"Trino query ran for {this.stopwatch.Elapsed.TotalSeconds} s, exceeding the timeout of {Session.Properties.Timeout.Value.TotalSeconds} s."); + } - // Make status available - this.Statement = responseQueueItem.Response; + // Do not wait if the query had data - the next page may be ready immediately. + if (!responseQueueItem.Response.HasData && !State.IsFinished && readCount > 4) + { + logger?.LogDebug("Trino: No data yet, backoff wait queryId:{0}, delay {1} msec", Statement?.id, + readDelay); + await Task.Delay((int)readDelay).ConfigureAwait(false); + if (readDelay < MAX_READ_DELAY_MSEC) + { + readDelay *= BACKOFF_AMOUNT; + } + } - // If no next URI, the query is completed. - if (this.Statement.IsLastPage) - { - this.Finish(); + readCount++; + return responseQueueItem; } - else if (this.IsTimeout) + catch (Exception) { - logger?.LogInformation("Trino: Query timed out queryId:{0}, run time: {1} s, timeout {2} s.", Statement?.id, this.stopwatch.Elapsed.TotalSeconds, Session.Properties.ClientRequestTimeout.Value.TotalSeconds); - await this.Cancel(QueryCancellationReason.TIMEOUT).ConfigureAwait(false); - throw new TimeoutException($"Trino query ran for {this.stopwatch.Elapsed.TotalSeconds} s, exceeding the timeout of {Session.Properties.Timeout.Value.TotalSeconds} s."); - } - - // Do not wait if the query had data - the next page may be ready immediately. - if (!responseQueueItem.Response.HasData && !State.IsFinished && readCount > 4) - { - logger?.LogDebug("Trino: No data yet, backoff wait queryId:{0}, delay {1} msec", Statement?.id, readDelay); - await Task.Delay((int)readDelay).ConfigureAwait(false); - if (readDelay < MAX_READ_DELAY_MSEC) + if (cancellationToken.IsCancellationRequested) { - readDelay *= BACKOFF_AMOUNT; + await this.Cancel(QueryCancellationReason.USER_CANCEL).ConfigureAwait(false); + throw new OperationCanceledException("Cancellation requested"); } + + throw; } - readCount++; - return responseQueueItem; } ///