Skip to content

Commit 6492a5f

Browse files
author
Mykyta Yarovyi
committed
Fix case when cancellation token was requested but query was not canceled
1 parent 30718ef commit 6492a5f

File tree

1 file changed

+60
-45
lines changed

1 file changed

+60
-45
lines changed

trino-csharp/Trino.Client/StatementClientV1.cs

+60-45
Original file line numberDiff line numberDiff line change
@@ -284,65 +284,80 @@ private async Task<bool> Cancel(QueryCancellationReason reason = QueryCancellati
284284
/// </summary>
285285
internal async Task<ResponseQueueStatement> Advance()
286286
{
287-
if (this.Statement.nextUri.Contains("/executing"))
287+
try
288288
{
289-
if (this.Statement.nextUri.Contains("?"))
289+
if (this.Statement.nextUri.Contains("/executing"))
290290
{
291-
this.Statement.nextUri += $"&targetResultSize={Constants.MaxTargetResultSizeMB}MB";
291+
if (this.Statement.nextUri.Contains("?"))
292+
{
293+
this.Statement.nextUri += $"&targetResultSize={Constants.MaxTargetResultSizeMB}MB";
294+
}
295+
else
296+
{
297+
this.Statement.nextUri += $"?targetResultSize={Constants.MaxTargetResultSizeMB}MB";
298+
}
292299
}
293-
else
300+
301+
logger?.LogDebug("Trino: request: {1}", this.Statement.nextUri);
302+
303+
string responseStr = await this.GetAsync(new Uri(this.Statement.nextUri), OK).ConfigureAwait(false);
304+
logger?.LogDebug("Trino: response: {1}", responseStr);
305+
QueryResultPage response = JsonConvert.DeserializeObject<QueryResultPage>(responseStr);
306+
logger?.LogDebug("Trino: response at {0} msec with state {1}", stopwatch.ElapsedMilliseconds,
307+
response.stats.state);
308+
309+
// Note, the size is estimated based on the response string size which is not the actual deserialized size.
310+
ResponseQueueStatement responseQueueItem = new ResponseQueueStatement(response, responseStr.Length);
311+
if (responseQueueItem.Response.error != null)
294312
{
295-
this.Statement.nextUri += $"?targetResultSize={Constants.MaxTargetResultSizeMB}MB";
313+
State.StateTransition(TrinoQueryStates.CLIENT_ERROR, TrinoQueryStates.RUNNING);
314+
throw new TrinoException(responseQueueItem.Response.error.message,
315+
responseQueueItem.Response.error);
296316
}
297-
}
298-
logger?.LogDebug("Trino: request: {1}", this.Statement.nextUri);
299317

300-
string responseStr = await this.GetAsync(new Uri(this.Statement.nextUri), OK).ConfigureAwait(false);
301-
logger?.LogDebug("Trino: response: {1}", responseStr);
302-
QueryResultPage response = JsonConvert.DeserializeObject<QueryResultPage>(responseStr);
303-
logger?.LogDebug("Trino: response at {0} msec with state {1}", stopwatch.ElapsedMilliseconds, response.stats.state);
318+
// Make status available
319+
this.Statement = responseQueueItem.Response;
304320

305-
// Note, the size is estimated based on the response string size which is not the actual deserialized size.
306-
ResponseQueueStatement responseQueueItem = new ResponseQueueStatement(response, responseStr.Length);
307-
if (responseQueueItem.Response.error != null)
308-
{
309-
State.StateTransition(TrinoQueryStates.CLIENT_ERROR, TrinoQueryStates.RUNNING);
310-
throw new TrinoException(responseQueueItem.Response.error.message, responseQueueItem.Response.error);
311-
}
312-
313-
if (cancellationToken.IsCancellationRequested)
314-
{
315-
await this.Cancel(QueryCancellationReason.USER_CANCEL).ConfigureAwait(false);
316-
throw new OperationCanceledException("Cancellation requested");
317-
}
321+
// If no next URI, the query is completed.
322+
if (this.Statement.IsLastPage)
323+
{
324+
this.Finish();
325+
}
326+
else if (this.IsTimeout)
327+
{
328+
logger?.LogInformation("Trino: Query timed out queryId:{0}, run time: {1} s, timeout {2} s.",
329+
Statement?.id, this.stopwatch.Elapsed.TotalSeconds,
330+
Session.Properties.ClientRequestTimeout.Value.TotalSeconds);
331+
await this.Cancel(QueryCancellationReason.TIMEOUT).ConfigureAwait(false);
332+
throw new TimeoutException(
333+
$"Trino query ran for {this.stopwatch.Elapsed.TotalSeconds} s, exceeding the timeout of {Session.Properties.Timeout.Value.TotalSeconds} s.");
334+
}
318335

319-
// Make status available
320-
this.Statement = responseQueueItem.Response;
336+
// Do not wait if the query had data - the next page may be ready immediately.
337+
if (!responseQueueItem.Response.HasData && !State.IsFinished && readCount > 4)
338+
{
339+
logger?.LogDebug("Trino: No data yet, backoff wait queryId:{0}, delay {1} msec", Statement?.id,
340+
readDelay);
341+
await Task.Delay((int)readDelay).ConfigureAwait(false);
342+
if (readDelay < MAX_READ_DELAY_MSEC)
343+
{
344+
readDelay *= BACKOFF_AMOUNT;
345+
}
346+
}
321347

322-
// If no next URI, the query is completed.
323-
if (this.Statement.IsLastPage)
324-
{
325-
this.Finish();
348+
readCount++;
349+
return responseQueueItem;
326350
}
327-
else if (this.IsTimeout)
351+
catch (Exception)
328352
{
329-
logger?.LogInformation("Trino: Query timed out queryId:{0}, run time: {1} s, timeout {2} s.", Statement?.id, this.stopwatch.Elapsed.TotalSeconds, Session.Properties.ClientRequestTimeout.Value.TotalSeconds);
330-
await this.Cancel(QueryCancellationReason.TIMEOUT).ConfigureAwait(false);
331-
throw new TimeoutException($"Trino query ran for {this.stopwatch.Elapsed.TotalSeconds} s, exceeding the timeout of {Session.Properties.Timeout.Value.TotalSeconds} s.");
332-
}
333-
334-
// Do not wait if the query had data - the next page may be ready immediately.
335-
if (!responseQueueItem.Response.HasData && !State.IsFinished && readCount > 4)
336-
{
337-
logger?.LogDebug("Trino: No data yet, backoff wait queryId:{0}, delay {1} msec", Statement?.id, readDelay);
338-
await Task.Delay((int)readDelay).ConfigureAwait(false);
339-
if (readDelay < MAX_READ_DELAY_MSEC)
353+
if (cancellationToken.IsCancellationRequested)
340354
{
341-
readDelay *= BACKOFF_AMOUNT;
355+
await this.Cancel(QueryCancellationReason.USER_CANCEL).ConfigureAwait(false);
356+
throw new OperationCanceledException("Cancellation requested");
342357
}
358+
359+
throw;
343360
}
344-
readCount++;
345-
return responseQueueItem;
346361
}
347362

348363
/// <summary>

0 commit comments

Comments
 (0)