From f688096656de425bc8b860929bc43f3b4e934b07 Mon Sep 17 00:00:00 2001 From: weizijun Date: Wed, 30 Apr 2025 17:30:39 +0800 Subject: [PATCH] reindex support build source from fields result --- .../ClientScrollableHitSourceTests.java | 128 +++++++++++++++ .../test/reindex/120_from_docvalue_fields.yml | 149 ++++++++++++++++++ .../elasticsearch/index/IndexFeatures.java | 5 +- .../reindex/ClientScrollableHitSource.java | 39 ++++- 4 files changed, 319 insertions(+), 2 deletions(-) create mode 100644 modules/reindex/src/yamlRestTest/resources/rest-api-spec/test/reindex/120_from_docvalue_fields.yml diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ClientScrollableHitSourceTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ClientScrollableHitSourceTests.java index 26922c62d3931..12aa6c57abc95 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ClientScrollableHitSourceTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ClientScrollableHitSourceTests.java @@ -23,8 +23,11 @@ import org.elasticsearch.client.internal.support.AbstractClient; import org.elasticsearch.common.BackoffPolicy; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.reindex.ClientScrollableHitSource; import org.elasticsearch.index.reindex.ScrollableHitSource; @@ -38,7 +41,9 @@ import org.junit.After; import org.junit.Before; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -159,6 +164,129 @@ public void testScrollKeepAlive() { client.validateRequest(TransportSearchScrollAction.TYPE, (SearchScrollRequest r) -> assertEquals(r.scroll().seconds(), 110)); } + public void testGenerateSourceWithFields() { + // Test case: source exists and fields should be merged + SearchHit hit = SearchHit.unpooled(0, "id"); + hit.sourceRef(new BytesArray("{\"existing_field\":\"existing_value\"}")); + + Map fields = new HashMap<>(); + fields.put("single_value_field", new DocumentField("single_value_field", List.of("single_value"))); + fields.put("multi_value_field", new DocumentField("multi_value_field", List.of("value1", "value2"))); + fields.put("existing_field", new DocumentField("existing_field", List.of("new_value"))); + hit.addDocumentFields(fields, new HashMap<>()); + + BytesReference result = ClientScrollableHitSource.generateSource(hit); + Map resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2(); + + // Verify source contains all expected fields + assertEquals("existing_value", resultSource.get("existing_field")); // existing field should not be overwritten + assertEquals("single_value", resultSource.get("single_value_field")); + assertEquals(List.of("value1", "value2"), resultSource.get("multi_value_field")); + } + + public void testGenerateSourceWithEmptyFields() { + // Test case: fields is empty + SearchHit hit = SearchHit.unpooled(0, "id"); + hit.sourceRef(new BytesArray("{\"field1\":\"value1\",\"field2\":\"value2\"}")); + hit.addDocumentFields(new HashMap<>(), new HashMap<>()); + + BytesReference result = ClientScrollableHitSource.generateSource(hit); + Map resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2(); + + // Source should remain unchanged + assertEquals("value1", resultSource.get("field1")); + assertEquals("value2", resultSource.get("field2")); + } + + public void testGenerateSourceWithEmptySource() { + // Test case: source is empty, fields exist + SearchHit hit = SearchHit.unpooled(0, "id"); + hit.sourceRef(new BytesArray("{}")); + + Map fields = new HashMap<>(); + fields.put("field1", new DocumentField("field1", List.of("value1"))); + fields.put("field2", new DocumentField("field2", List.of("value2", "value3"))); + hit.addDocumentFields(fields, new HashMap<>()); + + BytesReference result = ClientScrollableHitSource.generateSource(hit); + Map resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2(); + + assertEquals("value1", resultSource.get("field1")); + assertEquals(List.of("value2", "value3"), resultSource.get("field2")); + } + + public void testGenerateSourceWithBothEmpty() { + // Test case: both source and fields are empty + SearchHit hit = SearchHit.unpooled(0, "id"); + hit.sourceRef(new BytesArray("{}")); + hit.addDocumentFields(new HashMap<>(), new HashMap<>()); + + BytesReference result = ClientScrollableHitSource.generateSource(hit); + Map resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2(); + + assertTrue(resultSource.isEmpty()); + } + + public void testGenerateSourceWithComplexSource() { + // Test case: complex source with nested objects + SearchHit hit = SearchHit.unpooled(0, "id"); + hit.sourceRef(new BytesArray("{\"nested\":{\"field1\":\"value1\"}}")); + + Map fields = new HashMap<>(); + fields.put("new_field", new DocumentField("new_field", List.of("value2"))); + hit.addDocumentFields(fields, new HashMap<>()); + + BytesReference result = ClientScrollableHitSource.generateSource(hit); + Map resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2(); + + @SuppressWarnings("unchecked") + Map nested = (Map) resultSource.get("nested"); + assertEquals("value1", nested.get("field1")); + assertEquals("value2", resultSource.get("new_field")); + } + + public void testGenerateSourceWithNoSource() { + // Test case: no source, only fields + SearchHit hit = SearchHit.unpooled(0, "id"); + hit.sourceRef(null); + + Map fields = new HashMap<>(); + fields.put("field1", new DocumentField("field1", List.of("value1"))); + hit.addDocumentFields(fields, new HashMap<>()); + + BytesReference result = ClientScrollableHitSource.generateSource(hit); + Map resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2(); + + // Source value should be preserved + assertEquals("value1", resultSource.get("field1")); + } + + public void testGenerateSourceWithNoSourceAndNoFields() { + // Test case: no source and no fields + SearchHit hit = SearchHit.unpooled(0, "id"); + hit.sourceRef(null); + hit.addDocumentFields(new HashMap<>(), new HashMap<>()); + + BytesReference result = ClientScrollableHitSource.generateSource(hit); + assertNull(result); + } + + public void testGenerateSourceWithExistingFieldInSource() { + // Test case: field already exists in source + SearchHit hit = SearchHit.unpooled(0, "id"); + hit.sourceRef(new BytesArray("{\"field1\":\"source_value\"}")); + + Map fields = new HashMap<>(); + fields.put("field1", new DocumentField("field1", List.of("field_value"))); + hit.addDocumentFields(fields, new HashMap<>()); + + BytesReference result = ClientScrollableHitSource.generateSource(hit); + Map resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2(); + + // Source value should be preserved + assertEquals("source_value", resultSource.get("field1")); + } + private SearchResponse createSearchResponse() { // create a simulated response. SearchHit hit = SearchHit.unpooled(0, "id").sourceRef(new BytesArray("{}")); diff --git a/modules/reindex/src/yamlRestTest/resources/rest-api-spec/test/reindex/120_from_docvalue_fields.yml b/modules/reindex/src/yamlRestTest/resources/rest-api-spec/test/reindex/120_from_docvalue_fields.yml new file mode 100644 index 0000000000000..cdc456d9ce4ee --- /dev/null +++ b/modules/reindex/src/yamlRestTest/resources/rest-api-spec/test/reindex/120_from_docvalue_fields.yml @@ -0,0 +1,149 @@ +setup: + - requires: + cluster_features: [ "reindex.support_from_fields" ] + reason: "reindexing support fields introduced" + + - do: + indices.create: + index: source_docvalue + body: + mappings: + _source: + excludes: ["excluded_field"] + properties: + included_field: + type: keyword + excluded_field: + type: keyword + date_field: + type: date + long_field: + type: long + + - do: + bulk: + refresh: true + index: source_docvalue + body: + - '{"index": {}}' + - '{"included_field": "value1", "excluded_field": "excluded1", "date_field": "2024-01-01", "long_field": 123}' + - '{"index": {}}' + - '{"included_field": "value2", "excluded_field": "excluded2", "date_field": "2024-01-02", "long_field": 456}' + + - do: + indices.create: + index: target_docvalue + body: + mappings: + properties: + included_field: + type: keyword + excluded_field: + type: keyword + date_field: + type: date + long_field: + type: long + + - do: + indices.create: + index: target_docvalue2 + body: + mappings: + properties: + included_field: + type: keyword + excluded_field: + type: keyword + date_field: + type: date + long_field: + type: long + +--- +from docvalue fields: + - do: + reindex: + refresh: true + body: + source: + index: source_docvalue + docvalue_fields: ["excluded_field"] + dest: + index: target_docvalue + + - match: {created: 2} + - match: {updated: 0} + - match: {version_conflicts: 0} + - match: {batches: 1} + - match: {failures: []} + - match: {throttled_millis: 0} + - gte: { took: 0 } + - is_false: task + - is_false: deleted + + - do: + search: + index: target_docvalue + body: + sort: + - included_field: asc + - match: { hits.total.value: 2 } + - match: + hits.hits.0._source: + included_field: value1 + excluded_field: excluded1 + date_field: "2024-01-01" + long_field: 123 + - match: + hits.hits.1._source: + included_field: value2 + excluded_field: excluded2 + date_field: "2024-01-02" + long_field: 456 + +--- +from docvalue fields with partial source: + - do: + reindex: + refresh: true + body: + source: + index: source_docvalue + _source: + includes: ["date_field"] + docvalue_fields: ["excluded_field", "long_field"] + dest: + index: target_docvalue2 + + - match: {created: 2} + - match: {updated: 0} + - match: {version_conflicts: 0} + - match: {batches: 1} + - match: {failures: []} + - match: {throttled_millis: 0} + - gte: { took: 0 } + - is_false: task + - is_false: deleted + + - do: + search: + index: target_docvalue2 + body: + sort: + - long_field: asc + - match: { hits.total.value: 2 } + - match: + hits.hits.0._source: + excluded_field: excluded1 + date_field: "2024-01-01" + long_field: 123 + - match: + hits.hits.1._source: + excluded_field: excluded2 + date_field: "2024-01-02" + long_field: 456 + + - do: + indices.delete: + index: [source_docvalue, target_docvalue, target_docvalue2] diff --git a/server/src/main/java/org/elasticsearch/index/IndexFeatures.java b/server/src/main/java/org/elasticsearch/index/IndexFeatures.java index 051e746af00ee..bbe2b79bd9527 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexFeatures.java +++ b/server/src/main/java/org/elasticsearch/index/IndexFeatures.java @@ -14,6 +14,8 @@ import java.util.Set; +import static org.elasticsearch.index.reindex.ClientScrollableHitSource.REINDEX_SUPPORT_FROM_FIELDS; + public class IndexFeatures implements FeatureSpecification { @Override @@ -39,7 +41,8 @@ public Set getTestFeatures() { LOGSDB_NO_HOST_NAME_FIELD, SYNONYMS_SET_LENIENT_ON_NON_EXISTING, THROW_EXCEPTION_FOR_UNKNOWN_TOKEN_IN_REST_INDEX_PUT_ALIAS_ACTION, - THROW_EXCEPTION_ON_INDEX_CREATION_IF_UNSUPPORTED_VALUE_TYPE_IN_ALIAS + THROW_EXCEPTION_ON_INDEX_CREATION_IF_UNSUPPORTED_VALUE_TYPE_IN_ALIAS, + REINDEX_SUPPORT_FROM_FIELDS ); } } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java index 68582138d3f61..bc24ba9b5af19 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java @@ -26,13 +26,17 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.lookup.Source; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentType; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.function.Consumer; import static java.util.Collections.emptyList; @@ -47,6 +51,8 @@ public class ClientScrollableHitSource extends ScrollableHitSource { private final ParentTaskAssigningClient client; private final SearchRequest firstSearchRequest; + public static final NodeFeature REINDEX_SUPPORT_FROM_FIELDS = new NodeFeature("reindex.support_from_fields"); + public ClientScrollableHitSource( Logger logger, BackoffPolicy backoffPolicy, @@ -153,13 +159,44 @@ private static Response wrapSearchResponse(SearchResponse response) { return new Response(response.isTimedOut(), failures, total, hits, response.getScrollId()); } + public static BytesReference generateSource(SearchHit hit) { + Map fields = hit.getDocumentFields(); + if (fields.isEmpty()) { + return hit.hasSource() ? hit.getSourceRef() : null; + } + + Source sourceObj = Source.fromBytes(hit.getSourceRef()); + Map sourceAsMap = new LinkedHashMap<>(sourceObj.source()); + boolean changeSource = false; + for (DocumentField field : fields.values()) { + if (false == sourceAsMap.containsKey(field.getName())) { + if (field.getValues() == null || field.getValues().isEmpty()) { + continue; + } + + if (field.getValues().size() == 1) { + sourceAsMap.put(field.getName(), field.getValue()); + changeSource = true; + } else { + sourceAsMap.put(field.getName(), field.getValues()); + changeSource = true; + } + } + } + if (changeSource) { + return Source.fromMap(sourceAsMap, sourceObj.sourceContentType()).internalSourceRef(); + } else { + return hit.hasSource() ? hit.getSourceRef() : null; + } + } + private static class ClientHit implements Hit { private final SearchHit delegate; private final BytesReference source; ClientHit(SearchHit delegate) { this.delegate = delegate.asUnpooled(); // TODO: use pooled version here - source = this.delegate.hasSource() ? this.delegate.getSourceRef() : null; + this.source = generateSource(delegate); } @Override