
Publish queue latency metrics from tracked thread pools #120488

Merged
Commits (38)
9918299
Publish queue latency metrics from tracked thread pools
nicktindall Jan 21, 2025
b1d8df4
Merge remote-tracking branch 'origin/main' into ES-10531_add_thread_p…
nicktindall Jan 21, 2025
dfef676
Fix metric name
nicktindall Jan 21, 2025
2ceb965
Temporary hack to fix metric name
nicktindall Jan 21, 2025
dbed27f
Propose solution to composite thread-pool names
nicktindall Jan 22, 2025
c04f2ea
Merge remote-tracking branch 'origin/main' into ES-10531_add_thread_p…
nicktindall Jan 22, 2025
c95f625
Fix fixed thread pool names
nicktindall Jan 22, 2025
f850bc9
Merge remote-tracking branch 'origin/main' into ES-10531_add_thread_p…
nicktindall Jan 23, 2025
1b450f0
Merge branch 'main' into ES-10531_add_thread_pool_queue_latency_metric
nicktindall Mar 7, 2025
e7f5bb6
POC using HandlingTimeTracker to track queue latency
nicktindall Mar 11, 2025
d269195
Merge branch 'main' into ES-10531_add_thread_pool_queue_latency_metric
nicktindall Mar 11, 2025
80b8b3f
Tidy
nicktindall Mar 11, 2025
9811299
Merge branch 'main' into ES-10531_add_thread_pool_queue_latency_metric
nicktindall Mar 12, 2025
598d0a8
Fix metric name
nicktindall Mar 13, 2025
4153d27
Generalise HandlingTimeTracker
nicktindall Mar 25, 2025
d4b0818
Merge remote-tracking branch 'origin/main' into ES-10531_add_thread_p…
nicktindall Mar 25, 2025
3e3d9fc
Tidy and fix document/test
nicktindall Mar 25, 2025
be4b96c
Restore field name
nicktindall Mar 26, 2025
8a313ab
Update docs/changelog/120488.yaml
nicktindall Mar 26, 2025
8fa7dc6
Fix changelog area value
nicktindall Mar 26, 2025
2c36b97
Update server/src/main/java/org/elasticsearch/common/metrics/Exponent…
nicktindall Mar 27, 2025
420739e
Setup metrics separately to constructor
nicktindall Mar 28, 2025
3208423
Remove unnecessary change
nicktindall Mar 28, 2025
9dd0457
Merge remote-tracking branch 'origin/main' into ES-10531_add_thread_p…
nicktindall Mar 28, 2025
516a4a9
Merge branch 'main' into ES-10531_add_thread_pool_queue_latency_metric
nicktindall Apr 2, 2025
d6e44ed
Add 99th percentile
nicktindall Apr 6, 2025
dfbd9ac
Merge branch 'main' into ES-10531_add_thread_pool_queue_latency_metric
nicktindall Apr 7, 2025
2b294bd
Record queue latency metrics in beforeExecute
nicktindall Apr 7, 2025
9e227ed
Handle p100 percentile when last bucket is populated
nicktindall Apr 7, 2025
15acd80
Update server/src/main/java/org/elasticsearch/common/metrics/Exponent…
nicktindall May 1, 2025
368a66a
Fix variable naming
nicktindall May 1, 2025
bb84ed9
Update variable names where we use ExponentialBucketHistogram
nicktindall May 1, 2025
273fc23
Merge remote-tracking branch 'origin/main' into ES-10531_add_thread_p…
nicktindall May 1, 2025
778e8b5
Update server/src/main/java/org/elasticsearch/common/metrics/Exponent…
nicktindall May 1, 2025
876ae78
Assert that taskQueueLatency >= 0
nicktindall May 1, 2025
51191eb
Improve documentation of test
nicktindall May 1, 2025
5c6a4a7
Merge branch 'main' into ES-10531_add_thread_pool_queue_latency_metric
nicktindall May 5, 2025
6ebc331
Fix metric name
nicktindall May 5, 2025
5 changes: 5 additions & 0 deletions docs/changelog/120488.yaml
@@ -0,0 +1,5 @@
pr: 120488
summary: Publish queue latency metrics from tracked thread pools
area: "Infra/Metrics"
type: enhancement
issues: []
server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
@@ -10,9 +10,17 @@
package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
import org.elasticsearch.core.TimeValue;

import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
import org.elasticsearch.telemetry.metric.Instrument;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -22,11 +30,17 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE_TIME;
import static org.elasticsearch.threadpool.ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION;

/**
* An extension to thread pool executor, which tracks statistics for the task execution time.
*/
public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {

public static final int QUEUE_LATENCY_HISTOGRAM_BUCKETS = 18;
private static final int[] LATENCY_PERCENTILES_TO_REPORT = { 50, 90, 99 };

private final Function<Runnable, WrappedRunnable> runnableWrapper;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final LongAdder totalExecutionTime = new LongAdder();
@@ -35,6 +49,7 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
private volatile long lastPollTime = System.nanoTime();
private volatile long lastTotalExecutionTime = 0;
private final ExponentialBucketHistogram queueLatencyMillisHistogram = new ExponentialBucketHistogram(QUEUE_LATENCY_HISTOGRAM_BUCKETS);

TaskExecutionTimeTrackingEsThreadPoolExecutor(
String name,
@@ -55,6 +70,36 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {
this.trackOngoingTasks = trackingConfig.trackOngoingTasks();
}

public List<Instrument> setupMetrics(MeterRegistry meterRegistry, String threadPoolName) {
return List.of(
meterRegistry.registerLongsGauge(
ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_QUEUE_TIME,
"Time tasks spent in the queue for the " + threadPoolName + " thread pool",
"milliseconds",
() -> {
long[] snapshot = queueLatencyMillisHistogram.getSnapshot();
int[] bucketUpperBounds = queueLatencyMillisHistogram.calculateBucketUpperBounds();
List<LongWithAttributes> metricValues = Arrays.stream(LATENCY_PERCENTILES_TO_REPORT)
.mapToObj(
percentile -> new LongWithAttributes(
queueLatencyMillisHistogram.getPercentile(percentile / 100f, snapshot, bucketUpperBounds),
Map.of("percentile", String.valueOf(percentile))
)
)
.toList();
queueLatencyMillisHistogram.clear();
return metricValues;
}
),
meterRegistry.registerDoubleGauge(
ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + THREAD_POOL_METRIC_NAME_UTILIZATION,
"fraction of maximum thread time utilized for " + threadPoolName,
"fraction",
() -> new DoubleWithAttributes(pollUtilization(), Map.of())
)
);
nicktindall (PR author): I wonder if, rather than publishing a time series per percentile (using a "percentile" attribute), we should publish a metric per percentile. The metric makes no sense if you don't filter by a percentile label.

Member: Is it easier to plot different percentiles on the same graph with labels (and a group-by) than as two different time series?

nicktindall (PR author): I don't think that makes a difference, but I'm not sure.

nicktindall (PR author): Yes, having just had a look at Kibana, it would be much easier to plot as a single metric grouped by the percentiles. As separate metrics we'd need to add them as distinct time series.
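A minimal, hypothetical sketch (not part of this change) contrasting the two metric shapes discussed in the thread above. It reuses the MeterRegistry and LongWithAttributes types from this diff, assumes a registerLongGauge counterpart to the registerLongsGauge call in setupMetrics, and uses placeholder metric names and percentile suppliers.

```java
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;

// Illustrative only: metric names and percentile suppliers are placeholders.
final class QueueLatencyMetricShapes {

    // Shape A (what this PR implements): a single gauge reporting one value per
    // percentile, distinguished by a "percentile" attribute. Dashboards plot one
    // metric and group by the attribute.
    static void singleMetricWithPercentileAttribute(MeterRegistry registry, LongSupplier p50, LongSupplier p90, LongSupplier p99) {
        registry.registerLongsGauge(
            "es.example.queue.latency.histogram",
            "queue latency percentiles",
            "milliseconds",
            () -> List.of(
                new LongWithAttributes(p50.getAsLong(), Map.of("percentile", "50")),
                new LongWithAttributes(p90.getAsLong(), Map.of("percentile", "90")),
                new LongWithAttributes(p99.getAsLong(), Map.of("percentile", "99"))
            )
        );
    }

    // Shape B (the alternative raised above): one gauge per percentile, so there is
    // no attribute to filter on, but each percentile must be charted as its own series.
    static void metricPerPercentile(MeterRegistry registry, LongSupplier p50, LongSupplier p90, LongSupplier p99) {
        registry.registerLongGauge("es.example.queue.latency.p50", "50th percentile queue latency", "milliseconds",
            () -> new LongWithAttributes(p50.getAsLong(), Map.of()));
        registry.registerLongGauge("es.example.queue.latency.p90", "90th percentile queue latency", "milliseconds",
            () -> new LongWithAttributes(p90.getAsLong(), Map.of()));
        registry.registerLongGauge("es.example.queue.latency.p99", "99th percentile queue latency", "milliseconds",
            () -> new LongWithAttributes(p99.getAsLong(), Map.of()));
    }
}
```

Shape A matches the conclusion in the thread: the single metric can be grouped by the percentile attribute in Kibana, whereas Shape B requires adding each percentile as a distinct time series.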

}

@Override
protected Runnable wrapRunnable(Runnable command) {
return super.wrapRunnable(this.runnableWrapper.apply(command));
@@ -116,6 +161,12 @@ protected void beforeExecute(Thread t, Runnable r) {
if (trackOngoingTasks) {
ongoingTasks.put(r, System.nanoTime());
}
assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
timedRunnable.beforeExecute();
final long taskQueueLatency = timedRunnable.getQueueTimeNanos();
assert taskQueueLatency >= 0;
queueLatencyMillisHistogram.addObservation(TimeUnit.NANOSECONDS.toMillis(taskQueueLatency));
}

@Override
server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java
@@ -18,6 +18,7 @@
class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
private final Runnable original;
private final long creationTimeNanos;
private long beforeExecuteTime = -1;
private long startTimeNanos;
private long finishTimeNanos = -1;
private boolean failedOrRejected = false;
@@ -58,6 +59,19 @@ public boolean isForceExecution() {
return original instanceof AbstractRunnable && ((AbstractRunnable) original).isForceExecution();
}

/**
* Returns the time in nanoseconds between the creation time and the execution time
*
* @return The time in nanoseconds or -1 if the task was never de-queued
*/
long getQueueTimeNanos() {
if (beforeExecuteTime == -1) {
assert false : "beforeExecute must be called before getQueueTimeNanos";
return -1;
}
return beforeExecuteTime - creationTimeNanos;
}

/**
* Return the time this task spent being run.
* If the task is still running or has not yet been run, returns -1.
@@ -70,6 +84,13 @@ long getTotalExecutionNanos() {
return Math.max(finishTimeNanos - startTimeNanos, 1);
}

/**
* Called when the task has reached the front of the queue and is about to be executed
*/
public void beforeExecute() {
beforeExecuteTime = System.nanoTime();
}

/**
* If the task was failed or rejected, return true.
* Otherwise, false.
server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -31,7 +31,6 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
import org.elasticsearch.telemetry.metric.Instrument;
import org.elasticsearch.telemetry.metric.LongAsyncCounter;
import org.elasticsearch.telemetry.metric.LongGauge;
@@ -154,6 +153,7 @@ public static class Names {
public static final String THREAD_POOL_METRIC_NAME_UTILIZATION = ".threads.utilization.current";
public static final String THREAD_POOL_METRIC_NAME_LARGEST = ".threads.largest.current";
public static final String THREAD_POOL_METRIC_NAME_REJECTED = ".threads.rejected.total";
public static final String THREAD_POOL_METRIC_NAME_QUEUE_TIME = ".queue.latency.histogram";

public enum ThreadPoolType {
FIXED("fixed"),
@@ -379,14 +379,7 @@ private static ArrayList<Instrument> setupMetrics(MeterRegistry meterRegistry, S
}

if (threadPoolExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor timeTrackingExecutor) {
instruments.add(
meterRegistry.registerDoubleGauge(
prefix + THREAD_POOL_METRIC_NAME_UTILIZATION,
"fraction of maximum thread time utilized for " + name,
"fraction",
() -> new DoubleWithAttributes(timeTrackingExecutor.pollUtilization(), at)
)
);
instruments.addAll(timeTrackingExecutor.setupMetrics(meterRegistry, name));
}
}
return instruments;
server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java
@@ -9,18 +9,27 @@

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.metrics.ExponentialBucketHistogram;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EWMA_ALPHA;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;

/**
* Tests for the automatic queue resizing of the {@code QueueResizingEsThreadPoolExecutorTests}
@@ -147,6 +156,85 @@ public void testGetOngoingTasks() throws Exception {
executor.awaitTermination(10, TimeUnit.SECONDS);
}

public void testQueueLatencyMetrics() {
RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
final var threadPoolName = randomIdentifier();
var executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
threadPoolName,
1,
1,
1000,
TimeUnit.MILLISECONDS,
ConcurrentCollections.newBlockingQueue(),
TimedRunnable::new,
EsExecutors.daemonThreadFactory("queuetest"),
new EsAbortPolicy(),
new ThreadContext(Settings.EMPTY),
new TaskTrackingConfig(true, DEFAULT_EWMA_ALPHA)
);
executor.setupMetrics(meterRegistry, threadPoolName);

try {
final var barrier = new CyclicBarrier(2);
final ExponentialBucketHistogram expectedHistogram = new ExponentialBucketHistogram(
TaskExecutionTimeTrackingEsThreadPoolExecutor.QUEUE_LATENCY_HISTOGRAM_BUCKETS
);

/*
* The thread pool has a single thread, so we submit a task that will occupy that thread
* and cause subsequent tasks to be queued
*/
Future<?> runningTask = executor.submit(() -> {
safeAwait(barrier);
safeAwait(barrier);
});
safeAwait(barrier); // wait till the first task starts
expectedHistogram.addObservation(0L); // the first task should not be delayed

/*
* On each iteration we submit a task - which will be queued because of the
* currently running task, pause for some random interval, then unblock the
* new task by releasing the currently running task. This gives us a lower
* bound for the real delays (the real delays will be greater than or equal
* to the synthetic delays we add, i.e. each percentile should be >= our
* expected values)
*/
for (int i = 0; i < 10; i++) {
Future<?> waitingTask = executor.submit(() -> {
safeAwait(barrier);
safeAwait(barrier);
});
final long delayTimeMs = randomLongBetween(1, 50);
safeSleep(delayTimeMs);
safeAwait(barrier); // let the running task complete
safeAwait(barrier); // wait for the next task to start
safeGet(runningTask); // ensure previous task is complete
expectedHistogram.addObservation(delayTimeMs);
runningTask = waitingTask;
}
safeAwait(barrier); // let the last task finish
safeGet(runningTask);
meterRegistry.getRecorder().collect();

List<Measurement> measurements = meterRegistry.getRecorder()
.getMeasurements(
InstrumentType.LONG_GAUGE,
ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE_TIME
);
assertThat(measurements, hasSize(3));
// we have to use greater than or equal to because the actual delay might be higher than what we imposed
assertThat(getPercentile(measurements, "99"), greaterThanOrEqualTo(expectedHistogram.getPercentile(0.99f)));
assertThat(getPercentile(measurements, "90"), greaterThanOrEqualTo(expectedHistogram.getPercentile(0.9f)));
assertThat(getPercentile(measurements, "50"), greaterThanOrEqualTo(expectedHistogram.getPercentile(0.5f)));
} finally {
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}
}

private long getPercentile(List<Measurement> measurements, String percentile) {
return measurements.stream().filter(m -> m.attributes().get("percentile").equals(percentile)).findFirst().orElseThrow().getLong();
}

/**
* The returned function outputs a WrappedRunnable that simulates the case
* where {@link TimedRunnable#getTotalExecutionNanos()} always returns {@code timeTakenNanos}.