
Merge disk space aware take 2 #127613


Draft: wants to merge 17 commits into main

Conversation

albertzaharovits (Contributor):

No description provided.

TimeValue.MINUS_ONE,
Setting.Property.NodeScope
);
public static final Setting<RelativeByteSizeValue> INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING = new Setting<>(
albertzaharovits (Contributor Author):

Here I've opted to define the disk space threshold the same way as the cluster.routing.allocation.disk.* ones, i.e. as a watermark, which can be a ratio/percentage plus an optional max headroom. This makes the disk limits configuration coherent and allows for finer control, but maybe we don't need this much control and a simple absolute disk space limit, e.g. 1, 5, or 10 GB, is OK?
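[Editor's note: to make the watermark semantics concrete, here is a minimal sketch (hypothetical method name, not code from this PR) of how a ratio watermark plus an optional max headroom translate into a required free-space floor, mirroring the cluster.routing.allocation.disk.* behavior:]

    static long freeBytesFloor(long totalBytes, double watermarkUsedRatio, long maxHeadroomBytes) {
        // the watermark caps used space, so the required free space is the remainder
        long floorFromRatio = totalBytes - (long) (totalBytes * watermarkUsedRatio);
        if (maxHeadroomBytes < 0) { // -1 means "no headroom cap", as with the allocation settings
            return floorFromRatio;
        }
        // the max headroom caps how much free space a ratio watermark can demand on large disks
        return Math.min(floorFromRatio, maxHeadroomBytes);
    }

For example, with a 96% watermark on a 10 TB disk the ratio alone would require 400 GB free, but a 100 GB max headroom caps the requirement at 100 GB.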

albertzaharovits (Contributor Author):

Although I think a percentage default threshold is a better option.

henningandersen (Contributor):

I also like that they are similar - in fact I want it to default to the flood stage, which will require it to be defined in the same way.

Comment on lines +372 to +375
FsInfo.Path fsInfo = getFSInfo(dataPath); // uncached
if (leastAvailablePath == null || leastAvailablePath.getAvailable().getBytes() > fsInfo.getAvailable().getBytes()) {
leastAvailablePath = fsInfo;
}
albertzaharovits (Contributor Author):

I think shards can still be distributed across multiple data paths...

Here, I've only considered the path with the least disk space available. If the smallest merge cannot run on that path, all merging will stall, even though there may be merges for shards on other data paths that do have enough disk space to run.

Properly accounting for multiple data paths is harder, similar to the "max merge threads per shard" complication. It needs to figure out the data path that the shard resides on (given the merge task), and then backlog that merge task if there's currently not enough disk space on that path. There would then be a re-enqueue priority queue per data path, for when disk space becomes available (see the sketch below).

If the least-available data path I've proposed here is not satisfactory, another option would be to not support this new feature ("prevent merges from filling up the disk") when multiple data paths are defined, in the first release, and leave multiple-data-path (MDP) support as follow-up work.

WDYT @henningandersen ?
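[Editor's note: for illustration, a rough sketch of the per-data-path backlog described above — all names are hypothetical, this is not code from the PR:]

    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.PriorityBlockingQueue;

    class PerPathMergeBacklog {

        interface MergeTask {
            long estimatedMergeSize();
            Path dataPath(); // assumes the shard's data path can be resolved from the merge task
        }

        // one backlog per data path, smallest estimated merge first,
        // so freeing a little space unblocks something as soon as possible
        private final Map<Path, PriorityBlockingQueue<MergeTask>> backlogs = new ConcurrentHashMap<>();

        void backlog(MergeTask task) {
            backlogs.computeIfAbsent(
                task.dataPath(),
                p -> new PriorityBlockingQueue<>(16, Comparator.comparingLong(MergeTask::estimatedMergeSize))
            ).add(task);
        }

        // called by the disk monitor whenever a path's available bytes are refreshed
        void onDiskSpaceUpdate(Path dataPath, long availableBytes) {
            PriorityBlockingQueue<MergeTask> queue = backlogs.get(dataPath);
            if (queue == null) {
                return;
            }
            MergeTask head;
            while ((head = queue.peek()) != null && head.estimatedMergeSize() <= availableBytes) {
                if (queue.remove(head)) {
                    availableBytes -= head.estimatedMergeSize(); // conservatively reserve the space
                    reEnqueueForExecution(head);
                }
            }
        }

        void reEnqueueForExecution(MergeTask task) {
            // hand the task back to the merge executor (elided)
        }
    }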

henningandersen (Contributor):

I am inclined to use the most available path instead. That is sort of similar to not supporting MDP, except it is still active and could by luck be doing good things sometimes.

The trouble with least available is that it can block merges that should be allowed and this can go on forever.
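[Editor's note: a sketch of the suggested change, reusing the identifiers from the quoted snippet, would just flip the comparison:]

    FsInfo.Path fsInfo = getFSInfo(dataPath); // uncached
    if (mostAvailablePath == null || mostAvailablePath.getAvailable().getBytes() < fsInfo.getAvailable().getBytes()) {
        mostAvailablePath = fsInfo;
    }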

/** How frequently we check disk usage (default: 5 seconds). */
public static final Setting<TimeValue> INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.timeSetting(
"indices.merge.disk.check_interval",
TimeValue.timeValueSeconds(5),
albertzaharovits (Contributor Author):

This is the 5-second interval at which we check the available disk space. The interval is fixed, whether 10% or 90% of the disk is available, and the check runs even when there isn't any merging going on.
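[Editor's note: a minimal sketch of such a fixed-interval check, using plain java.util.concurrent for illustration — the PR itself would presumably schedule this on the node's ThreadPool, and refreshAvailableDiskSpace is a hypothetical method:]

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    void startDiskSpaceMonitor() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // runs every 5 seconds regardless of disk fill level or merge activity
        scheduler.scheduleWithFixedDelay(this::refreshAvailableDiskSpace, 0, 5, TimeUnit.SECONDS);
    }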

henningandersen (Contributor):

Sounds good to me. I wondered about simply checking this every time, but then again, we need some scheduling once we hit the roof, so this looks good.

Comparator.comparingLong(MergeTask::estimatedMergeSize)
private final PriorityBlockingQueueWithMaxLimit<MergeTask> queuedMergeTasks = new PriorityBlockingQueueWithMaxLimit<>(
MergeTask::estimatedMergeSize,
Long.MAX_VALUE
albertzaharovits (Contributor Author):

If we cannot read the available disk space, all merging will be allowed (and there is some logging complaining about it).

albertzaharovits (Contributor Author):

Similarly, merging starts without waiting to read the fs stats.

Generally, my thinking was to let merging execute as before, and only stop new merges from starting once the filesystem stats are available and show that the remaining disk space is low.

Alternatively, we could allow merging to happen only after the filesystem stats are available and the remaining disk space is sufficient.

WDYT?
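[Editor's note: a sketch of the fail-open behavior described above. The queue's limit starts unbounded (Long.MAX_VALUE, as in the quoted initializer), and only a successful stats read tightens it; updateMaxPriorityLimit is the updater named later in this review, and the floor computation is an assumption:]

    // called by the disk monitor after a successful filesystem stats read
    void onFsStatsRead(long availableBytes, long requiredFloorBytes) {
        // only merges whose estimated size fits in the space above the floor may start
        long mergeBudgetBytes = Math.max(0L, availableBytes - requiredFloorBytes);
        queuedMergeTasks.updateMaxPriorityLimit(mergeBudgetBytes);
    }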

}
if (leastAvailablePath == null) {
LOGGER.error("Cannot read filesystem info");
return;
albertzaharovits (Contributor Author):

If we cannot read the available disk space (because of some errors), merging is allowed to go on, rather than be stopped.

henningandersen (Contributor):

Would we not need to call updateMaxPriorityLimit(Long.MAX_VALUE) to get that effect - in case we could read it and then cannot? We can also assert that the limit is not set.
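[Editor's note: a sketch of that suggestion, applied to the quoted snippet — fail open when the stats read fails, so a previously tightened limit cannot stall merging indefinitely:]

    if (leastAvailablePath == null) {
        LOGGER.error("Cannot read filesystem info");
        queuedMergeTasks.updateMaxPriorityLimit(Long.MAX_VALUE); // reset: allow all merges again
        return;
    }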

lock.lockInterruptibly();
E peek;
try {
while ((peek = priorityQueue.peek()) == null || priorityFunction.applyAsLong(peek) > maxPriorityLimit)
albertzaharovits (Contributor Author):

The take method is special here.
In addition to blocking if there's no element, it will also block if the smallest element in the heap is larger than a limit.
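[Editor's note: a fleshed-out sketch of this queue. Field names follow the quoted fragment; the condition and signalling details are assumptions, not the PR's exact code:]

    import java.util.Comparator;
    import java.util.PriorityQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.ToLongFunction;

    static class PriorityBlockingQueueWithMaxLimit<E> {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition headMayBeRunnable = lock.newCondition();
        private final PriorityQueue<E> priorityQueue;
        private final ToLongFunction<? super E> priorityFunction;
        private long maxPriorityLimit;

        PriorityBlockingQueueWithMaxLimit(ToLongFunction<? super E> priorityFunction, long maxPriorityLimit) {
            this.priorityFunction = priorityFunction;
            this.maxPriorityLimit = maxPriorityLimit;
            this.priorityQueue = new PriorityQueue<>(Comparator.comparingLong(priorityFunction));
        }

        void put(E e) {
            lock.lock();
            try {
                priorityQueue.add(e);
                headMayBeRunnable.signalAll();
            } finally {
                lock.unlock();
            }
        }

        // blocks while the queue is empty OR the smallest element exceeds the limit
        E take() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                E peek;
                while ((peek = priorityQueue.peek()) == null || priorityFunction.applyAsLong(peek) > maxPriorityLimit) {
                    headMayBeRunnable.await();
                }
                return priorityQueue.poll();
            } finally {
                lock.unlock();
            }
        }

        void updateMaxPriorityLimit(long newLimit) {
            lock.lock();
            try {
                maxPriorityLimit = newLimit;
                headMayBeRunnable.signalAll(); // wake takers: the head may now be under the limit
            } finally {
                lock.unlock();
            }
        }
    }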

henningandersen (Contributor) left a review:

Doing this at the queue seems like a good direction. Left a number of comments.

);
public static final Setting<RelativeByteSizeValue> INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING = new Setting<>(
"indices.merge.disk.watermark.high",
"96%",
henningandersen (Contributor):

I wonder if this should default to the flood stage level rather than introduce yet another limit? I think having the setting is fine in order to explicitly override it, but it seems nice that it follows flood stage if configured rather than having to configure both.


private long maxPriorityLimit;

PriorityBlockingQueueWithMaxLimit(ToLongFunction<? super E> priorityFunction, long maxPriorityLimit) {
this.priorityFunction = priorityFunction;
henningandersen (Contributor):

This function and its name seem slightly confusing. I think it would be simpler to have it be a predicate that says whether it can run or not? We'd still need the wakeup when disk usage monitoring returns, but that could just be a wakeup function.

That way it can also handle all criteria, like heap, disk etc. that we come up with in the future.

In fact, it should maybe reserve the capacity too, such that we conservatively ensure we do not run out. We can add that later, no need for it initially (and maybe it is good enough without it).
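[Editor's note: for illustration, such a predicate-based variant might look like the following — a sketch under the same assumptions as the earlier one, with hypothetical names; the predicate would consult the monitors' latest view of disk, heap, etc.:]

    import java.util.Comparator;
    import java.util.PriorityQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Predicate;
    import java.util.function.ToLongFunction;

    static class PredicateBlockingPriorityQueue<E> {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition headMayBeRunnable = lock.newCondition();
        private final PriorityQueue<E> priorityQueue;
        private final Predicate<? super E> canRun; // e.g. task -> task.estimatedMergeSize() <= diskBudget()

        PredicateBlockingPriorityQueue(ToLongFunction<? super E> priorityFunction, Predicate<? super E> canRun) {
            this.priorityQueue = new PriorityQueue<>(Comparator.comparingLong(priorityFunction));
            this.canRun = canRun;
        }

        void put(E e) {
            lock.lock();
            try {
                priorityQueue.add(e);
                headMayBeRunnable.signalAll();
            } finally {
                lock.unlock();
            }
        }

        // blocks while the queue is empty OR the predicate rejects the smallest element
        E take() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                E peek;
                while ((peek = priorityQueue.peek()) == null || canRun.test(peek) == false) {
                    headMayBeRunnable.await();
                }
                return priorityQueue.poll();
            } finally {
                lock.unlock();
            }
        }

        // any monitor (disk, heap, ...) calls this after its view of the world changes
        void wakeUp() {
            lock.lock();
            try {
                headMayBeRunnable.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }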

}
}

static class PriorityBlockingQueueWithMaxLimit<E> {
henningandersen (Contributor):

Perhaps we can add a comment on why we have this special queue. I think the primary benefit is that incoming merges that can be executed still get processed, whereas if we did the blocking in runMergeTask, we'd not be able to do so once all threads are occupied.
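[Editor's note: one possible wording for such a comment, capturing the rationale given here — a suggestion, not the PR's text:]

    /**
     * A priority queue whose take() blocks not only while the queue is empty, but
     * also while the smallest element exceeds a limit (here, the merge disk budget).
     * Enforcing the limit at the queue means newly enqueued merges that fit under
     * the limit can still be picked up while larger merges wait; blocking inside
     * runMergeTask instead would stall everything once all merge threads are
     * occupied by over-budget merges.
     */
    static class PriorityBlockingQueueWithMaxLimit<E> {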


if (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.exists(settings)) {
return "-1";
} else {
return "40GB";
henningandersen (Contributor):

Here we should return the flood stage value.
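[Editor's note: a sketch of that change; FLOOD_STAGE_MAX_HEADROOM_SETTING stands in for the real cluster.routing.allocation.disk.watermark.flood_stage.max_headroom setting object, and getStringRep() as the string accessor is an assumption:]

    if (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.exists(settings)) {
        return "-1"; // an explicitly set merge watermark disables the derived default
    } else {
        // follow the allocation flood-stage headroom instead of a hard-coded value
        return FLOOD_STAGE_MAX_HEADROOM_SETTING.get(settings).getStringRep();
    }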

