Merge disk space aware take 2 #127613
Conversation
    TimeValue.MINUS_ONE,
    Setting.Property.NodeScope
);
public static final Setting<RelativeByteSizeValue> INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING = new Setting<>(
Here I've opted to define the disk space threshold the same way as the cluster.routing.allocation.disk.* settings, i.e. as a watermark, which can be a ratio/percentage plus an optional max headroom. This keeps the disk limits configuration coherent and allows finer control, but maybe we don't need this much control and a simple absolute disk space limit, e.g. 1, 5, or 10 GB, is OK?
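For illustration, a minimal sketch of how such a watermark could resolve to an absolute free-bytes floor (the helper name and exact semantics below are my assumption, not the PR's actual code):

```java
// Hypothetical helper, not the PR's code: resolve a "used ratio + optional max
// headroom" watermark into the minimum free bytes the merge scheduler requires.
static long freeBytesFloor(long totalBytes, double usedRatioLimit, ByteSizeValue maxHeadroom) {
    // a 96% used-space watermark demands 4% of the disk to stay free
    long fromRatio = (long) (totalBytes * (1.0 - usedRatioLimit));
    if (maxHeadroom == null) {
        return fromRatio;
    }
    // on very large disks the headroom caps how much free space the ratio demands
    return Math.min(fromRatio, maxHeadroom.getBytes());
}
```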
Although I think a percentage default threshold is a better option.
I also like that they are similar - in fact I want it to default to the flood stage, which will require it to be defined in the same way.
FsInfo.Path fsInfo = getFSInfo(dataPath); // uncached
if (leastAvailablePath == null || leastAvailablePath.getAvailable().getBytes() > fsInfo.getAvailable().getBytes()) {
    leastAvailablePath = fsInfo;
}
I think shards can still be distributed across multiple data paths...
Here, I've only considered the path with the least available disk space. If the smallest merge cannot run on that path, all merging will stall (even though there may be merges for shards on other data paths that have enough disk space to run).
The proper solution for multiple data paths is more difficult, similar to the "max merge threads per shard" complication. It would need to figure out which data path a shard resides on (given the merge task), and backlog the merge task if that path currently lacks disk space. There would then be a re-enqueue priority queue per data path, for when disk space becomes available; a rough sketch follows below.
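To make the idea concrete (all helper names here are hypothetical, and the real thing would need synchronization around each per-path queue):

```java
// Hypothetical sketch: one backlog per data path, so a full path only stalls
// the merges of shards that actually live on it.
private final Map<Path, PriorityQueue<MergeTask>> backloggedByPath = new ConcurrentHashMap<>();

void enqueue(MergeTask task) {
    Path dataPath = dataPathOf(task); // would need to resolve the shard's data path
    if (availableBytes(dataPath) < task.estimatedMergeSize()) {
        // not enough disk space on this path right now; re-enqueue when space frees up
        backloggedByPath
            .computeIfAbsent(dataPath, p -> new PriorityQueue<>(Comparator.comparingLong(MergeTask::estimatedMergeSize)))
            .add(task);
    } else {
        submitForExecution(task);
    }
}
```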
If the least available data path I've proposed here is not satisfactory, another option would be to not support this new feature ("prevent merges from filling up disk") when multiple data paths are configured, in the first release, and leave the MDP support as homework.
WDYT @henningandersen ?
I am inclined to use the most available path instead. That is sort of similar to not supporting MDP, except it is still active and could by luck be doing good things sometimes.
The trouble with least available is that it can block merges that should be allowed and this can go on forever.
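Sketched against the excerpt above, the suggestion amounts to flipping the comparison (the surrounding loop is assumed):

```java
// Track the path with the MOST available bytes instead of the least.
FsInfo.Path mostAvailablePath = null;
for (Path dataPath : dataPaths) {
    FsInfo.Path fsInfo = getFSInfo(dataPath); // uncached
    if (mostAvailablePath == null || mostAvailablePath.getAvailable().getBytes() < fsInfo.getAvailable().getBytes()) {
        mostAvailablePath = fsInfo;
    }
}
```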
/** How frequently we check disk usage (default: 5 seconds). */
public static final Setting<TimeValue> INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.timeSetting(
    "indices.merge.disk.check_interval",
    TimeValue.timeValueSeconds(5),
This is the 5 sec interval at which we check the available disk space. The interval is fixed, whether there is 10% or 90% disk space available, and the disk space is checked even when there isn't any merging going on.
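For illustration, the fixed-interval check could look roughly like this (a plain JDK scheduler here; the PR itself presumably uses Elasticsearch's ThreadPool, and checkAvailableDiskSpace is a hypothetical name):

```java
// Illustrative only: check disk usage at a fixed interval, regardless of how much
// space is free and regardless of whether any merge is running.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
TimeValue interval = INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.get(settings);
scheduler.scheduleWithFixedDelay(
    this::checkAvailableDiskSpace, // re-reads fs stats and updates the merge queue's limit
    0,
    interval.millis(),
    TimeUnit.MILLISECONDS
);
```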
Sounds good to me. I wondered about simply checking this every time, but then again, we need some scheduling once we hit the roof, so this looks good.
Comparator.comparingLong(MergeTask::estimatedMergeSize)
private final PriorityBlockingQueueWithMaxLimit<MergeTask> queuedMergeTasks = new PriorityBlockingQueueWithMaxLimit<>(
    MergeTask::estimatedMergeSize,
    Long.MAX_VALUE
If we cannot read the available disk space, all merging will be allowed (and there is some logging complaining about it).
Similarly, merging starts without waiting to read the fs stats.
Generally, my thinking was to let merging execute as before, and only stop new merges from starting once the filesystem stats are available and they show that the remaining disk space is low.
Alternatively, we could allow merging to happen only after filesystem stats are available and the remaining disk space is sufficient.
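That alternative, sketched (field and method names assumed): start the queue with a limit of 0 so nothing runs until the first successful disk check.

```java
// Start "closed": no merge may run until we have read fs stats at least once.
private final PriorityBlockingQueueWithMaxLimit<MergeTask> queuedMergeTasks =
    new PriorityBlockingQueueWithMaxLimit<>(MergeTask::estimatedMergeSize, 0L);

void onDiskCheck(long availableBytes) {
    // opens the queue up to whatever the current disk space allows
    queuedMergeTasks.updateMaxPriorityLimit(availableBytes);
}
```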
WDYT?
}
if (leastAvailablePath == null) {
    LOGGER.error("Cannot read filesystem info");
    return;
If we cannot read the available disk space (because of some errors), merging is allowed to go on, rather than be stopped.
Would we not need to call updateMaxPriorityLimit(Long.MAX_VALUE) to get that effect - in case we could read it and then cannot? We can also assert that the limit is not set.
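Concretely, a sketch of that suggestion in the error branch above:

```java
if (leastAvailablePath == null) {
    LOGGER.error("Cannot read filesystem info");
    // fall back to "allow all merging" rather than keeping a stale limit around
    queuedMergeTasks.updateMaxPriorityLimit(Long.MAX_VALUE);
    return;
}
```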
lock.lockInterruptibly();
E peek;
try {
    while ((peek = priorityQueue.peek()) == null || priorityFunction.applyAsLong(peek) > maxPriorityLimit)
The take method is special here. In addition to blocking if there's no element, it will also block if the smallest element in the heap is larger than a limit.
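Fleshed out from the excerpt above (the condition variable name and the surrounding structure are assumed), the take might look like:

```java
public E take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
        E peek;
        // block while the queue is empty OR the smallest element exceeds the limit
        while ((peek = priorityQueue.peek()) == null || priorityFunction.applyAsLong(peek) > maxPriorityLimit) {
            notEmpty.await(); // signalled by add() and by updateMaxPriorityLimit()
        }
        return priorityQueue.poll();
    } finally {
        lock.unlock();
    }
}
```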
Doing this at the queue seems like a good direction. Left a number of comments.
);
public static final Setting<RelativeByteSizeValue> INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING = new Setting<>(
    "indices.merge.disk.watermark.high",
    "96%",
I wonder if this should default to the flood stage level rather than introduce yet another limit? I think having the setting is fine in order to explicitly override it, but it seems nice that it follows flood stage if configured rather than having to configure both.
private long maxPriorityLimit;

PriorityBlockingQueueWithMaxLimit(ToLongFunction<? super E> priorityFunction, long maxPriorityLimit) {
    this.priorityFunction = priorityFunction;
This function and its name seem slightly confusing. I think it would be simpler to have it be a predicate that says whether a task can run or not? We'd still need the wakeup when disk usage monitoring returns, but that could just be a wakeup function.
That way it can also handle all criteria, like heap, disk, etc., that we come up with in the future.
In fact, it should maybe reserve the capacity too, such that we conservatively ensure we do not run out. We can add that later, no need for it initially (and maybe it is good enough without it).
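One way to read this suggestion (the field and the loop fragment below are mine, purely illustrative):

```java
// Hypothetical: a predicate decides admission; the disk monitor only wakes us up.
private volatile Predicate<MergeTask> canRun = task -> true;

// take() would then block on the predicate instead of on a numeric limit:
while ((peek = priorityQueue.peek()) == null || canRun.test(peek) == false) {
    notEmpty.await(); // the disk-usage monitor calls a wakeup function after each check
}
```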
    }
}

static class PriorityBlockingQueueWithMaxLimit<E> {
Perhaps we can add a comment on why we have this special queue. I think the primary benefit is that we can still process incoming merges that can be executed, whereas if we did the blocking in runMergeTask, we'd not be able to do so once all threads are occupied.
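The suggested comment could be along these lines:

```java
/**
 * A priority queue whose take() also blocks while the head element exceeds a limit.
 * Doing the blocking here (rather than in runMergeTask) means newly enqueued merges
 * that fit under the current disk limit can still be picked up and executed, even
 * while larger merges are held back.
 */
static class PriorityBlockingQueueWithMaxLimit<E> {
```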
if (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.exists(settings)) {
    return "-1";
} else {
    return "40GB";
Here we should return the flood stage value.
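A sketch of that change (the settings key is the real allocation flood-stage setting; the raw-string lookup style is my assumption):

```java
if (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.exists(settings)) {
    return "-1";
} else {
    // inherit the allocation flood stage watermark (defaults to 95%) instead of "40GB"
    return settings.get("cluster.routing.allocation.disk.watermark.flood_stage", "95%");
}
```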