Skip to content

ESQL: Add optimization to purge join on null merge key #127583

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/127583.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 127583
summary: Add optimization to purge join on null merge key
area: ES|QL
type: enhancement
issues:
- 125577
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public String toString() {

@Override
public String nodeString() {
return child.nodeString() + " AS " + name();
return child.nodeString() + " AS " + name() + "#" + id();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly related, but not sure why we wouldn't include the id, it's easier to track which exactly reference points to an alias.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1650,3 +1650,22 @@ event_duration:long
2764889
3450233
;

nullifiedJoinKeyToPurgeTheJoin
required_capability: join_lookup_v12

FROM employees
| RENAME languages AS language_code
| SORT emp_no, language_code
| LIMIT 4
| EVAL language_code = TO_INTEGER(NULL)
| LOOKUP JOIN languages_lookup ON language_code
| KEEP emp_no, language_code, language_name
;

emp_no:integer | language_code:integer | language_name:keyword
10001 |null |null
10002 |null |null
10003 |null |null
10004 |null |null
;
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogateExpressions;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogatePlans;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateTimeSeriesAggregate;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.PruneJoinOnNullMatchingField;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.esql.rule.RuleExecutor;
Expand Down Expand Up @@ -201,7 +202,8 @@ protected static Batch<LogicalPlan> operators() {
new PushDownEnrich(),
new PushDownAndCombineOrderBy(),
new PruneRedundantOrderBy(),
new PruneRedundantSortClauses()
new PruneRedundantSortClauses(),
new PruneJoinOnNullMatchingField()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;

import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;

import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.xpack.esql.core.expression.Expressions.isGuaranteedNull;
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.INNER;
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;

/**
* The rule matches a plan pattern having a Join on top of a Project and/or Eval. It then checks if the join's performed on a field which
* is aliased to null (in type or value); if that's the case, it prunes the join, replacing it with an Eval - returning aliases to null
* for all the fields added in by the right side of the Join - plus a Project on top of it.
* The rule can apply on the coordinator already, but it's more likely to be effective on the data nodes, where null aliasing is inserted
* due to locally missing fields. This rule relies on that behavior -- see {@link ReplaceMissingFieldWithNull}.
*/
public class PruneJoinOnNullMatchingField extends OptimizerRules.OptimizerRule<Join> {

@Override
protected LogicalPlan rule(Join join) {
LogicalPlan plan = join;
var joinType = join.config().type();
if (joinType == INNER || joinType == LEFT) { // other types will have different replacement logic
AttributeMap.Builder<Expression> attributeMapBuilder = AttributeMap.builder();
loop: for (var child = join.left();; child = ((UnaryPlan) child).child()) { // cast is safe as both plans are UnaryPlans
switch (child) {
case Project project -> project.projections().forEach(projection -> {
if (projection instanceof Alias alias) {
attributeMapBuilder.put(alias.toAttribute(), alias.child());
}
});
case Eval eval -> eval.fields().forEach(alias -> attributeMapBuilder.put(alias.toAttribute(), alias.child()));
default -> {
break loop;
}
}
}
for (var attr : AttributeSet.of(join.config().matchFields())) {
var resolved = attributeMapBuilder.build().resolve(attr);
if (resolved != null && isGuaranteedNull(resolved)) {
plan = replaceJoin(join);
break;
}
}
}
return plan;
}

private static LogicalPlan replaceJoin(Join join) {
var joinRightOutput = join.rightOutputFields();
if (joinRightOutput.isEmpty()) { // can be empty when the join key is null and the other right side entries pruned (by an agg)
return join.left();
}
List<Alias> aliases = new ArrayList<>(joinRightOutput.size());
// TODO: cache aliases by type, à la ReplaceMissingFieldWithNull#missingToNull (tho lookup indices won't have Ks of fields)
joinRightOutput.forEach(a -> aliases.add(new Alias(a.source(), a.name(), Literal.of(a, null), a.id())));
var eval = new Eval(join.source(), join.left(), aliases);
return new Project(join.source(), eval, join.computeOutput(join.left().output(), Expressions.asAttributes(aliases)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
Expand Down Expand Up @@ -111,6 +112,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.unboundLogicalOptimizerContext;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore;
import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -204,7 +206,14 @@ private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichRes
IndexResolution getIndexResult = IndexResolution.valid(test);

return new Analyzer(
new AnalyzerContext(config, new EsqlFunctionRegistry(), getIndexResult, enrichResolution, emptyInferenceResolution()),
new AnalyzerContext(
config,
new EsqlFunctionRegistry(),
getIndexResult,
defaultLookupResolution(),
enrichResolution,
emptyInferenceResolution()
),
new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L))
);
}
Expand Down Expand Up @@ -1377,6 +1386,82 @@ public void testMissingFieldsDoNotGetExtracted() {
assertThat(Expressions.names(fields), contains("_meta_field", "gender", "hire_date", "job", "job.raw", "languages", "long_noidx"));
}

/*
* LimitExec[1000[INTEGER]]
* \_AggregateExec[[language_code{r}#6],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#11, language_code{r}#6],FINAL,[language_code{r}#6, $
* $c$count{r}#25, $$c$seen{r}#26],12]
* \_ExchangeExec[[language_code{r}#6, $$c$count{r}#25, $$c$seen{r}#26],true]
* \_AggregateExec[[languages{r}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#11, languages{r}#15 AS language_code#6],INITIAL,[langua
* ges{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12]
* \_FieldExtractExec[emp_no{f}#12]<[],[]>
* \_EvalExec[[null[INTEGER] AS languages#15]]
* \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#29], limit[], sort[] estimatedRowSize[12]
*/
public void testMissingFieldsPurgesTheJoinLocally() {
var stats = EsqlTestUtils.statsForMissingField("languages");

var plan = plannerOptimizer.plan("""
from test
| keep emp_no, languages
| rename languages AS language_code
| lookup join languages_lookup ON language_code
| stats c = count(emp_no) by language_code
""", stats);

var limit = as(plan, LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
assertThat(Expressions.names(agg.output()), contains("c", "language_code"));

var exchange = as(agg.child(), ExchangeExec.class);
agg = as(exchange.child(), AggregateExec.class);
var extract = as(agg.child(), FieldExtractExec.class);
var eval = as(extract.child(), EvalExec.class);
var source = as(eval.child(), EsQueryExec.class);
}

/*
* LimitExec[1000[INTEGER]]
* \_LookupJoinExec[[language_code{r}#6],[language_code{f}#23],[language_name{f}#24]]
* |_LimitExec[1000[INTEGER]]
* | \_AggregateExec[[languages{f}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{f}#15 AS language_code#6],FINAL,[language
* s{f}#15, $$c$count{r}#25, $$c$seen{r}#26],62]
* | \_ExchangeExec[[languages{f}#15, $$c$count{r}#25, $$c$seen{r}#26],true]
* | \_AggregateExec[[languages{r}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{r}#15 AS language_code#6],INITIAL,
* [languages{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12]
* | \_FieldExtractExec[emp_no{f}#12]<[],[]>
* | \_EvalExec[[null[INTEGER] AS languages#15]]
* | \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#29], limit[], sort[] estimatedRowSize[12]
* \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#30], limit[], sort[] estimatedRowSize[4]
*/
public void testMissingFieldsDoesNotPurgeTheJoinOnCoordinator() {
var stats = EsqlTestUtils.statsForMissingField("languages");

// same as the query above, but with the last two lines swapped, so that the join is no longer pushed to the data nodes
var plan = plannerOptimizer.plan("""
from test
| keep emp_no, languages
| rename languages AS language_code
| stats c = count(emp_no) by language_code
| lookup join languages_lookup ON language_code
""", stats);

var limit = as(plan, LimitExec.class);
var join = as(limit.child(), LookupJoinExec.class);
limit = as(join.left(), LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
var exchange = as(agg.child(), ExchangeExec.class);
agg = as(exchange.child(), AggregateExec.class);
var extract = as(agg.child(), FieldExtractExec.class);
var eval = as(extract.child(), EvalExec.class);
var source = as(eval.child(), EsQueryExec.class);
assertThat(source.indexPattern(), is("test"));
assertThat(source.indexMode(), is(IndexMode.STANDARD));

source = as(join.right(), EsQueryExec.class);
assertThat(source.indexPattern(), is("languages_lookup"));
assertThat(source.indexMode(), is(IndexMode.LOOKUP));
}

/*
Checks that match filters are pushed down to Lucene when using no casting, for example:
WHERE first_name:"Anna")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2782,6 +2782,50 @@ public void testDescendantLimitLookupJoin() {
var localRelation = as(limitBefore.child(), LocalRelation.class);
}

/*
* EsqlProject[[emp_no{f}#9, first_name{f}#10, languages{f}#12, language_code{r}#3, language_name{r}#22]]
* \_Eval[[null[INTEGER] AS language_code#3, null[KEYWORD] AS language_name#22]]
* \_Limit[1000[INTEGER],false]
* \_EsRelation[test][_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..]
*/
public void testPruneJoinOnNullMatchingField() {
var plan = optimizedPlan("""
from test
| eval language_code = null::integer
| keep emp_no, first_name, languages, language_code
| lookup join languages_lookup on language_code
""");

var project = as(plan, Project.class);
assertThat(Expressions.names(project.output()), contains("emp_no", "first_name", "languages", "language_code", "language_name"));
var eval = as(project.child(), Eval.class);
var limit = asLimit(eval.child(), 1000, false);
var source = as(limit.child(), EsRelation.class);
}

/*
* EsqlProject[[emp_no{f}#15, first_name{f}#16, my_null{r}#3 AS language_code#9, language_name{r}#27]]
* \_Eval[[null[INTEGER] AS my_null#3, null[KEYWORD] AS language_name#27]]
* \_Limit[1000[INTEGER],false]
* \_EsRelation[test][_meta_field{f}#21, emp_no{f}#15, first_name{f}#16, ..]
*/
public void testPruneJoinOnNullAssignedMatchingField() {
var plan = optimizedPlan("""
from test
| eval my_null = null::integer
| rename languages as language_code
| eval language_code = my_null
| lookup join languages_lookup on language_code
| keep emp_no, first_name, language_code, language_name
""");

var project = as(plan, EsqlProject.class);
assertThat(Expressions.names(project.output()), contains("emp_no", "first_name", "language_code", "language_name"));
var eval = as(project.child(), Eval.class);
var limit = asLimit(eval.child(), 1000, false);
var source = as(limit.child(), EsRelation.class);
}

private static List<String> orderNames(TopN topN) {
return topN.order().stream().map(o -> as(o.child(), NamedExpression.class).name()).toList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7666,7 +7666,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP
// The TopN needs an estimated row size for the planner to work
var plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(EstimatesRowSize.estimateRowSize(0, plan), config);
plan = useDataNodePlan ? plans.v2() : plans.v1();
plan = PlannerUtils.localPlan(List.of(), config, FoldContext.small(), plan);
plan = PlannerUtils.localPlan(config, FoldContext.small(), plan, TEST_SEARCH_STATS);
ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(null, 10, () -> 10);
LocalExecutionPlanner planner = new LocalExecutionPlanner(
"test",
Expand Down
Loading