Skip to content

Reindex support build source from fields result #127553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import org.elasticsearch.client.internal.support.AbstractClient;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.reindex.ClientScrollableHitSource;
import org.elasticsearch.index.reindex.ScrollableHitSource;
Expand All @@ -38,7 +41,9 @@
import org.junit.After;
import org.junit.Before;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -159,6 +164,129 @@ public void testScrollKeepAlive() {
client.validateRequest(TransportSearchScrollAction.TYPE, (SearchScrollRequest r) -> assertEquals(r.scroll().seconds(), 110));
}

/**
 * A hit that has both a source and document fields: fields missing from the source are added,
 * while a field whose name already exists in the source leaves the source value untouched.
 */
public void testGenerateSourceWithFields() {
    SearchHit searchHit = SearchHit.unpooled(0, "id").sourceRef(new BytesArray("{\"existing_field\":\"existing_value\"}"));

    Map<String, DocumentField> docFields = new HashMap<>();
    docFields.put("single_value_field", new DocumentField("single_value_field", List.of("single_value")));
    docFields.put("multi_value_field", new DocumentField("multi_value_field", List.of("value1", "value2")));
    // Collides with a key that is already present in the source.
    docFields.put("existing_field", new DocumentField("existing_field", List.of("new_value")));
    Map<String, DocumentField> metaFields = new HashMap<>();
    searchHit.addDocumentFields(docFields, metaFields);

    BytesReference generated = ClientScrollableHitSource.generateSource(searchHit);
    Map<String, Object> generatedMap = XContentHelper.convertToMap(generated, false, XContentHelper.xContentType(generated)).v2();

    // The colliding field keeps the value that came from the source.
    assertEquals("existing_value", generatedMap.get("existing_field"));
    // A single-valued field is written as a scalar, a multi-valued field as a list.
    assertEquals("single_value", generatedMap.get("single_value_field"));
    assertEquals(List.of("value1", "value2"), generatedMap.get("multi_value_field"));
}

/**
 * A hit with a source but no document fields: the generated source still holds the original values.
 */
public void testGenerateSourceWithEmptyFields() {
    SearchHit searchHit = SearchHit.unpooled(0, "id").sourceRef(new BytesArray("{\"field1\":\"value1\",\"field2\":\"value2\"}"));
    Map<String, DocumentField> noDocFields = new HashMap<>();
    Map<String, DocumentField> noMetaFields = new HashMap<>();
    searchHit.addDocumentFields(noDocFields, noMetaFields);

    BytesReference generated = ClientScrollableHitSource.generateSource(searchHit);
    Map<String, Object> generatedMap = XContentHelper.convertToMap(generated, false, XContentHelper.xContentType(generated)).v2();

    // Nothing was merged in, so both source values must still be readable.
    assertEquals("value1", generatedMap.get("field1"));
    assertEquals("value2", generatedMap.get("field2"));
}

/**
 * A hit whose source is the empty object {@code {}} but which carries document fields:
 * the generated source is built from the fields alone.
 */
public void testGenerateSourceWithEmptySource() {
    SearchHit searchHit = SearchHit.unpooled(0, "id").sourceRef(new BytesArray("{}"));

    Map<String, DocumentField> docFields = new HashMap<>();
    docFields.put("field1", new DocumentField("field1", List.of("value1")));
    docFields.put("field2", new DocumentField("field2", List.of("value2", "value3")));
    Map<String, DocumentField> metaFields = new HashMap<>();
    searchHit.addDocumentFields(docFields, metaFields);

    BytesReference generated = ClientScrollableHitSource.generateSource(searchHit);
    Map<String, Object> generatedMap = XContentHelper.convertToMap(generated, false, XContentHelper.xContentType(generated)).v2();

    // One value becomes a scalar; several values stay a list.
    assertEquals("value1", generatedMap.get("field1"));
    assertEquals(List.of("value2", "value3"), generatedMap.get("field2"));
}

/**
 * A hit with an empty-object source and no document fields: the generated source parses to an empty map.
 */
public void testGenerateSourceWithBothEmpty() {
    SearchHit searchHit = SearchHit.unpooled(0, "id").sourceRef(new BytesArray("{}"));
    Map<String, DocumentField> noDocFields = new HashMap<>();
    Map<String, DocumentField> noMetaFields = new HashMap<>();
    searchHit.addDocumentFields(noDocFields, noMetaFields);

    BytesReference generated = ClientScrollableHitSource.generateSource(searchHit);
    Map<String, Object> generatedMap = XContentHelper.convertToMap(generated, false, XContentHelper.xContentType(generated)).v2();

    assertTrue(generatedMap.isEmpty());
}

/**
 * A hit whose source contains a nested object: the nested structure survives the merge
 * and the extra document field is added at the top level.
 */
public void testGenerateSourceWithComplexSource() {
    SearchHit searchHit = SearchHit.unpooled(0, "id").sourceRef(new BytesArray("{\"nested\":{\"field1\":\"value1\"}}"));

    Map<String, DocumentField> docFields = new HashMap<>();
    docFields.put("new_field", new DocumentField("new_field", List.of("value2")));
    Map<String, DocumentField> metaFields = new HashMap<>();
    searchHit.addDocumentFields(docFields, metaFields);

    BytesReference generated = ClientScrollableHitSource.generateSource(searchHit);
    Map<String, Object> generatedMap = XContentHelper.convertToMap(generated, false, XContentHelper.xContentType(generated)).v2();

    // The nested object is parsed back as a map, hence the unchecked cast.
    @SuppressWarnings("unchecked")
    Map<String, Object> nestedObject = (Map<String, Object>) generatedMap.get("nested");
    assertEquals("value1", nestedObject.get("field1"));
    assertEquals("value2", generatedMap.get("new_field"));
}

/**
 * A hit without any source (null source reference) that still carries a document field:
 * the generated source is expected to contain that field.
 */
public void testGenerateSourceWithNoSource() {
    SearchHit searchHit = SearchHit.unpooled(0, "id").sourceRef(null);

    Map<String, DocumentField> docFields = new HashMap<>();
    docFields.put("field1", new DocumentField("field1", List.of("value1")));
    Map<String, DocumentField> metaFields = new HashMap<>();
    searchHit.addDocumentFields(docFields, metaFields);

    BytesReference generated = ClientScrollableHitSource.generateSource(searchHit);
    Map<String, Object> generatedMap = XContentHelper.convertToMap(generated, false, XContentHelper.xContentType(generated)).v2();

    // There is no source to draw from, so the value must come from the document field.
    assertEquals("value1", generatedMap.get("field1"));
}

/**
 * A hit with neither a source nor document fields: there is nothing to generate, so the result is null.
 */
public void testGenerateSourceWithNoSourceAndNoFields() {
    SearchHit searchHit = SearchHit.unpooled(0, "id").sourceRef(null);
    Map<String, DocumentField> noDocFields = new HashMap<>();
    Map<String, DocumentField> noMetaFields = new HashMap<>();
    searchHit.addDocumentFields(noDocFields, noMetaFields);

    BytesReference generated = ClientScrollableHitSource.generateSource(searchHit);

    assertNull(generated);
}

/**
 * The only document field has the same name as a key already present in the source:
 * the value from the source takes precedence over the field value.
 */
public void testGenerateSourceWithExistingFieldInSource() {
    SearchHit searchHit = SearchHit.unpooled(0, "id").sourceRef(new BytesArray("{\"field1\":\"source_value\"}"));

    Map<String, DocumentField> docFields = new HashMap<>();
    docFields.put("field1", new DocumentField("field1", List.of("field_value")));
    Map<String, DocumentField> metaFields = new HashMap<>();
    searchHit.addDocumentFields(docFields, metaFields);

    BytesReference generated = ClientScrollableHitSource.generateSource(searchHit);
    Map<String, Object> generatedMap = XContentHelper.convertToMap(generated, false, XContentHelper.xContentType(generated)).v2();

    // "field_value" must not replace what the source already held.
    assertEquals("source_value", generatedMap.get("field1"));
}

private SearchResponse createSearchResponse() {
// create a simulated response.
SearchHit hit = SearchHit.unpooled(0, "id").sourceRef(new BytesArray("{}"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Shared fixture for the tests in this file: one source index whose mapping drops
# "excluded_field" from _source, two documents indexed into it, and two target indices.
setup:
# Skip the whole file unless the cluster advertises the reindex-from-fields feature.
- requires:
cluster_features: [ "reindex.support_from_fields" ]
reason: "reindexing support fields introduced"

# Source index: "excluded_field" is mapped as a keyword but excluded from the stored _source.
- do:
indices.create:
index: source_docvalue
body:
mappings:
_source:
excludes: ["excluded_field"]
properties:
included_field:
type: keyword
excluded_field:
type: keyword
date_field:
type: date
long_field:
type: long

# Index two documents; every field, including the excluded one, is supplied at index time.
- do:
bulk:
refresh: true
index: source_docvalue
body:
- '{"index": {}}'
- '{"included_field": "value1", "excluded_field": "excluded1", "date_field": "2024-01-01", "long_field": 123}'
- '{"index": {}}'
- '{"included_field": "value2", "excluded_field": "excluded2", "date_field": "2024-01-02", "long_field": 456}'

# First target index: same properties as the source, but without the _source excludes.
- do:
indices.create:
index: target_docvalue
body:
mappings:
properties:
included_field:
type: keyword
excluded_field:
type: keyword
date_field:
type: date
long_field:
type: long

# Second target index with the same properties, used by the partial-source test.
- do:
indices.create:
index: target_docvalue2
body:
mappings:
properties:
included_field:
type: keyword
excluded_field:
type: keyword
date_field:
type: date
long_field:
type: long

---
# Reindex while requesting "excluded_field" through docvalue_fields, then check that the
# target documents' _source contains that field alongside the fields kept in the source _source.
from docvalue fields:
- do:
reindex:
refresh: true
body:
source:
index: source_docvalue
docvalue_fields: ["excluded_field"]
dest:
index: target_docvalue

# Both documents are created in a single batch with no failures or conflicts.
- match: {created: 2}
- match: {updated: 0}
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- is_false: deleted

# Sort by included_field so hit 0 is the "value1" document and hit 1 the "value2" document.
- do:
search:
index: target_docvalue
body:
sort:
- included_field: asc
- match: { hits.total.value: 2 }
# excluded_field is expected in the target _source even though the source index excluded it.
- match:
hits.hits.0._source:
included_field: value1
excluded_field: excluded1
date_field: "2024-01-01"
long_field: 123
- match:
hits.hits.1._source:
included_field: value2
excluded_field: excluded2
date_field: "2024-01-02"
long_field: 456

---
# Reindex with _source filtered down to "date_field" and with "excluded_field" and
# "long_field" requested through docvalue_fields, then check the target documents' _source.
from docvalue fields with partial source:
- do:
reindex:
refresh: true
body:
source:
index: source_docvalue
_source:
includes: ["date_field"]
docvalue_fields: ["excluded_field", "long_field"]
dest:
index: target_docvalue2

# Both documents are created in a single batch with no failures or conflicts.
- match: {created: 2}
- match: {updated: 0}
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- is_false: deleted

# Sort by long_field so hit 0 is the document with 123 and hit 1 the one with 456.
- do:
search:
index: target_docvalue2
body:
sort:
- long_field: asc
- match: { hits.total.value: 2 }
# The expected _source lists date_field plus the two docvalue fields; included_field is not listed.
- match:
hits.hits.0._source:
excluded_field: excluded1
date_field: "2024-01-01"
long_field: 123
- match:
hits.hits.1._source:
excluded_field: excluded2
date_field: "2024-01-02"
long_field: 456

# Explicitly remove the indices created in setup.
- do:
indices.delete:
index: [source_docvalue, target_docvalue, target_docvalue2]
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import java.util.Set;

import static org.elasticsearch.index.reindex.ClientScrollableHitSource.REINDEX_SUPPORT_FROM_FIELDS;

public class IndexFeatures implements FeatureSpecification {

@Override
Expand All @@ -39,7 +41,8 @@ public Set<NodeFeature> getTestFeatures() {
LOGSDB_NO_HOST_NAME_FIELD,
SYNONYMS_SET_LENIENT_ON_NON_EXISTING,
THROW_EXCEPTION_FOR_UNKNOWN_TOKEN_IN_REST_INDEX_PUT_ALIAS_ACTION,
THROW_EXCEPTION_ON_INDEX_CREATION_IF_UNSUPPORTED_VALUE_TYPE_IN_ALIAS
THROW_EXCEPTION_ON_INDEX_CREATION_IF_UNSUPPORTED_VALUE_TYPE_IN_ALIAS,
REINDEX_SUPPORT_FROM_FIELDS
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentType;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import static java.util.Collections.emptyList;
Expand All @@ -47,6 +51,8 @@ public class ClientScrollableHitSource extends ScrollableHitSource {
private final ParentTaskAssigningClient client;
private final SearchRequest firstSearchRequest;

/**
 * Node feature identified by the name {@code reindex.support_from_fields}.
 * It is public so that code outside this class can reference the feature.
 */
public static final NodeFeature REINDEX_SUPPORT_FROM_FIELDS = new NodeFeature("reindex.support_from_fields");

public ClientScrollableHitSource(
Logger logger,
BackoffPolicy backoffPolicy,
Expand Down Expand Up @@ -153,13 +159,44 @@ private static Response wrapSearchResponse(SearchResponse response) {
return new Response(response.isTimedOut(), failures, total, hits, response.getScrollId());
}

/**
 * Produces the source bytes for a search hit, adding the hit's document fields to its source
 * when the source does not already contain a key with the same name.
 * <p>
 * Rules applied to each document field:
 * <ul>
 *   <li>a field whose name is already a top-level key of the source is skipped, so the source value wins;</li>
 *   <li>a field with a null or empty value list is skipped;</li>
 *   <li>a field with exactly one value is written as that single value, otherwise as the list of values.</li>
 * </ul>
 * When nothing is added, the hit's own source reference is returned unchanged.
 *
 * @param hit the search hit to read the source and document fields from
 * @return the merged source, the hit's original source reference when no field was added,
 *         or {@code null} when no field was added and the hit has no source
 */
public static BytesReference generateSource(SearchHit hit) {
    final Map<String, DocumentField> documentFields = hit.getDocumentFields();
    if (documentFields.isEmpty()) {
        // Nothing to merge: hand back the source exactly as the hit carries it.
        return hit.hasSource() ? hit.getSourceRef() : null;
    }

    final Source parsedSource = Source.fromBytes(hit.getSourceRef());
    // Copy into a LinkedHashMap so existing keys keep their order and new keys are appended.
    final Map<String, Object> combined = new LinkedHashMap<>(parsedSource.source());
    boolean addedAny = false;

    for (DocumentField candidate : documentFields.values()) {
        final String name = candidate.getName();
        if (combined.containsKey(name)) {
            // The source already has this key; its value takes precedence.
            continue;
        }
        final List<Object> values = candidate.getValues();
        if (values == null || values.isEmpty()) {
            continue;
        }
        if (values.size() == 1) {
            combined.put(name, candidate.getValue());
        } else {
            combined.put(name, values);
        }
        addedAny = true;
    }

    if (addedAny == false) {
        return hit.hasSource() ? hit.getSourceRef() : null;
    }
    // Re-serialize using the content type of the parsed source.
    return Source.fromMap(combined, parsedSource.sourceContentType()).internalSourceRef();
}

private static class ClientHit implements Hit {
private final SearchHit delegate;
private final BytesReference source;

ClientHit(SearchHit delegate) {
this.delegate = delegate.asUnpooled(); // TODO: use pooled version here
source = this.delegate.hasSource() ? this.delegate.getSourceRef() : null;
this.source = generateSource(delegate);
}

@Override
Expand Down