selective mapping check when user specified schema #2386

Open · wants to merge 2 commits into base: main
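
For orientation, a minimal sketch of the intended usage from the Spark SQL side (the session setup, index name, and field names are illustrative, not part of this change): when the caller supplies an explicit schema, only the named top-level columns need to be checked against the Elasticsearch mapping.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    val spark = SparkSession.builder().appName("es-read").getOrCreate()

    // User-specified schema: only "name" and "age" should be validated against the index mapping
    val userSchema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", LongType)))

    val df = spark.read
      .format("org.elasticsearch.spark.sql")
      .schema(userSchema)   // with this change, triggers the selective mapping check below
      .load("my-index")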
12 changes: 10 additions & 2 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java
@@ -311,14 +311,22 @@ public List<List<Map<String, Object>>> targetShards(String index, String routing
     }
 
     public MappingSet getMappings(Resource indexResource) {
+        return getMappings(indexResource, Collections.emptyList());
+    }
+
+    public MappingSet getMappings(Resource indexResource, Collection<String> includeFields) {
         if (indexResource.isTyped()) {
-            return getMappings(indexResource.index() + "/_mapping/" + indexResource.type(), true);
+            return getMappings(indexResource.index() + "/_mapping/" + indexResource.type(), true, includeFields);
         } else {
-            return getMappings(indexResource.index() + "/_mapping" + (indexReadMissingAsEmpty ? "?ignore_unavailable=true" : ""), false);
+            return getMappings(indexResource.index() + "/_mapping" + (indexReadMissingAsEmpty ? "?ignore_unavailable=true" : ""), false, includeFields);
         }
     }
 
     public MappingSet getMappings(String query, boolean includeTypeName) {
+        return getMappings(query, includeTypeName, Collections.emptyList());
+    }
+
+    public MappingSet getMappings(String query, boolean includeTypeName, Collection<String> includeFields) {
         // If the version is not at least 7, then the property isn't guaranteed to exist. If it is, then defer to the flag.
         boolean requestTypeNameInResponse = clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_7_X) && includeTypeName;
         // Response will always have the type name in it if node version is before 7, and if it is not, defer to the flag.
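
A quick sketch of how the new overloads are meant to be called (it assumes an already-configured RestClient named client and a read Resource named resource; the variable and field names are illustrative): the single-argument form keeps the old behaviour by delegating with an empty list, while the new overload threads the field list through to the mapping parser.

    import java.util.Arrays

    val full     = client.getMappings(resource)                               // unchanged: merge every mapped field
    val filtered = client.getMappings(resource, Arrays.asList("name", "age")) // merge only the listed fields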
@@ -54,11 +54,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
 
 import static org.elasticsearch.hadoop.rest.Request.Method.POST;
@@ -300,6 +296,10 @@ public MappingSet getMappings() {
         return client.getMappings(resources.getResourceRead());
     }
 
+    public MappingSet getMappings(Collection<String> includeFields) {
+        return client.getMappings(resources.getResourceRead(), includeFields);
+    }
+
     public Map<String, GeoField> sampleGeoFields(Mapping mapping) {
         Map<String, GeoType> fields = MappingUtils.geoFields(mapping);
         Map<String, Object> geoMapping = client.sampleForFields(resources.getResourceRead(), fields.keySet());
@@ -19,10 +19,7 @@

 package org.elasticsearch.hadoop.serialization.dto.mapping;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
 import org.elasticsearch.hadoop.serialization.FieldType;
@@ -52,13 +49,25 @@ public static MappingSet parseTypelessMappings(Map<String, Object> content) {
      * @return MappingSet for that response.
      */
     public static MappingSet parseMappings(Map<String, Object> content, boolean includeTypeName) {
+        return parseMappings(content, includeTypeName, Collections.emptyList());
+    }
+
+    /**
+     * Convert the deserialized mapping request body into an object
+     * @param content entire mapping request body for all indices and types
+     * @param includeTypeName true if the given content to be parsed includes type names within the structure,
+     *                        or false if it is in the typeless format
+     * @param includeFields list of fields whose mappings should be checked
+     * @return MappingSet for that response.
+     */
+    public static MappingSet parseMappings(Map<String, Object> content, boolean includeTypeName, Collection<String> includeFields) {
         Iterator<Map.Entry<String, Object>> indices = content.entrySet().iterator();
         List<Mapping> indexMappings = new ArrayList<Mapping>();
         while(indices.hasNext()) {
             // These mappings are ordered by index, then optionally type.
             parseIndexMappings(indices.next(), indexMappings, includeTypeName);
         }
-        return new MappingSet(indexMappings);
+        return new MappingSet(indexMappings, includeFields);
     }
 
     private static void parseIndexMappings(Map.Entry<String, Object> indexToMappings, List<Mapping> collector, boolean includeTypeName) {
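
Sketch of the new parser entry point (this assumes content is the already-deserialized body of a GET <index>/_mapping response, and that the static methods above live on FieldParser, as in the es-hadoop source tree):

    import java.util.Arrays

    val all      = FieldParser.parseMappings(content, false)                        // old behaviour: empty include list
    val filtered = FieldParser.parseMappings(content, false, Arrays.asList("name")) // resolved schema keeps only "name"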
@@ -20,12 +20,7 @@
 package org.elasticsearch.hadoop.serialization.dto.mapping;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
 import org.elasticsearch.hadoop.serialization.FieldType;
@@ -46,7 +41,7 @@ public class MappingSet implements Serializable {
     private final Map<String, Map<String, Mapping>> indexTypeMap = new HashMap<String, Map<String, Mapping>>();
     private final Mapping resolvedSchema;
 
-    public MappingSet(List<Mapping> mappings) {
+    public MappingSet(List<Mapping> mappings, Collection<String> includeFields) {
         if (mappings.isEmpty()) {
             this.empty = true;
             this.resolvedSchema = new Mapping(RESOLVED_INDEX_NAME, RESOLVED_MAPPING_NAME, Field.NO_FIELDS);
@@ -78,34 +73,37 @@ public MappingSet(List<Mapping> mappings) {

                 mappingsToSchema.put(typeName, mapping);
             }
-            this.resolvedSchema = mergeMappings(mappings);
+            this.resolvedSchema = mergeMappings(mappings, includeFields);
         }
     }
 
-    private static Mapping mergeMappings(List<Mapping> mappings) {
+    private static Mapping mergeMappings(List<Mapping> mappings, Collection<String> includeFields) {
         Map<String, Object[]> fieldMap = new LinkedHashMap<String, Object[]>();
         for (Mapping mapping: mappings) {
             for (Field field : mapping.getFields()) {
-                addToFieldTable(field, "", fieldMap);
+                addToFieldTable(field, "", fieldMap, includeFields);
             }
         }
         Field[] collapsed = collapseFields(fieldMap);
         return new Mapping(RESOLVED_INDEX_NAME, RESOLVED_MAPPING_NAME, collapsed);
     }
 
     @SuppressWarnings("unchecked")
-    private static void addToFieldTable(Field field, String parent, Map<String, Object[]> fieldTable) {
+    private static void addToFieldTable(Field field, String parent, Map<String, Object[]> fieldTable, Collection<String> includeFields) {
         String fullName = parent + field.name();
         Object[] entry = fieldTable.get(fullName);
-        if (entry == null) {
+        if (!includeFields.isEmpty() && !includeFields.contains(fullName)) {
+            return;
+        }
+        else if (entry == null) {
             // Haven't seen field yet.
             if (FieldType.isCompound(field.type())) {
                 // visit its children
                 Map<String, Object[]> subTable = new LinkedHashMap<String, Object[]>();
                 entry = new Object[]{field, subTable};
                 String prefix = fullName + ".";
                 for (Field subField : field.properties()) {
-                    addToFieldTable(subField, prefix, subTable);
+                    addToFieldTable(subField, prefix, subTable, includeFields);
                 }
             } else {
                 // note that we saw it
@@ -130,7 +128,7 @@ private static void addToFieldTable(Field field, String parent, Map<String, Obje
             Map<String, Object[]> subTable = (Map<String, Object[]>)entry[1];
             String prefix = fullName + ".";
             for (Field subField : field.properties()) {
-                addToFieldTable(subField, prefix, subTable);
+                addToFieldTable(subField, prefix, subTable, includeFields);
             }
         }
     }
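
The include filter above matches on the full dotted field name, which is also why the TODO below about flattening nested fields exists. A small sketch of the matching rule (the field names are hypothetical):

    val includeFields = Set("user.name")
    def keep(fullName: String): Boolean = includeFields.isEmpty || includeFields.contains(fullName)

    keep("user")      // false: the parent object field is skipped before its children are ever visited
    keep("user.name") // true, but never reached once "user" has been filtered out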
@@ -80,6 +80,7 @@ import org.elasticsearch.hadoop.util.StringUtils
 import org.elasticsearch.hadoop.util.Version
 import org.elasticsearch.spark.cfg.SparkSettingsManager
 import org.elasticsearch.spark.serialization.ScalaValueWriter
+import org.elasticsearch.spark.sql.SchemaUtils.{Schema, discoverMapping}
 import org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink
 import org.elasticsearch.spark.sql.streaming.SparkSqlStreamingConfigs
 import org.elasticsearch.spark.sql.streaming.StructuredStreamingVersionLock
@@ -235,11 +236,15 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
     conf
   }
 
-  @transient lazy val lazySchema = { SchemaUtils.discoverMapping(cfg) }
+  @transient lazy val lazySchema = userSchema match {
+    case None => SchemaUtils.discoverMapping(cfg)
+    // TODO: properly flatten the schema so we can selectively check the mapping of nested fields as well
+    case Some(s) => SchemaUtils.discoverMapping(cfg, s.names) // or just take the user-specified schema as is: Schema(s)
+  }
 
   @transient lazy val valueWriter = { new ScalaValueWriter }
 
-  override def schema = userSchema.getOrElse(lazySchema.struct)
+  override def schema: StructType = lazySchema.struct
 
   // TableScan
   def buildScan(): RDD[Row] = buildScan(Array.empty)
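
Note on the s.names call above: StructType.names only exposes top-level column names, so a nested user schema currently contributes just the parent object name to the mapping check. A small illustration (the schema is hypothetical):

    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    val s = StructType(Seq(
      StructField("id", LongType),
      StructField("user", StructType(Seq(StructField("name", StringType))))))

    s.names   // Array("id", "user") -- "user.name" is not listed, hence the TODO above about flattening the schema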
@@ -23,7 +23,6 @@ import java.util.{LinkedHashSet => JHashSet}
 import java.util.{List => JList}
 import java.util.{Map => JMap}
 import java.util.Properties
-
 import scala.collection.JavaConverters.asScalaBufferConverter
 import scala.collection.JavaConverters.propertiesAsScalaMapConverter
 import scala.collection.mutable.ArrayBuffer
@@ -70,12 +69,7 @@ import org.elasticsearch.hadoop.serialization.FieldType.SHORT
 import org.elasticsearch.hadoop.serialization.FieldType.STRING
 import org.elasticsearch.hadoop.serialization.FieldType.TEXT
 import org.elasticsearch.hadoop.serialization.FieldType.WILDCARD
-import org.elasticsearch.hadoop.serialization.dto.mapping.Field
-import org.elasticsearch.hadoop.serialization.dto.mapping.GeoField
-import org.elasticsearch.hadoop.serialization.dto.mapping.GeoPointType
-import org.elasticsearch.hadoop.serialization.dto.mapping.GeoShapeType
-import org.elasticsearch.hadoop.serialization.dto.mapping.Mapping
-import org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils
+import org.elasticsearch.hadoop.serialization.dto.mapping.{Field, GeoField, GeoPointType, GeoShapeType, Mapping, MappingSet, MappingUtils}
 import org.elasticsearch.hadoop.serialization.field.FieldFilter
 import org.elasticsearch.hadoop.serialization.field.FieldFilter.NumberedInclude
 import org.elasticsearch.hadoop.util.Assert
@@ -86,23 +80,25 @@ import org.elasticsearch.spark.sql.Utils.ROOT_LEVEL_NAME
 import org.elasticsearch.spark.sql.Utils.ROW_INFO_ARRAY_PROPERTY
 import org.elasticsearch.spark.sql.Utils.ROW_INFO_ORDER_PROPERTY
 
+import scala.jdk.CollectionConverters.SeqHasAsJava
+
 private[sql] object SchemaUtils {
-  case class Schema(mapping: Mapping, struct: StructType)
+  case class Schema(struct: StructType)
 
-  def discoverMapping(cfg: Settings): Schema = {
-    val (mapping, geoInfo) = discoverMappingAndGeoFields(cfg)
+  def discoverMapping(cfg: Settings, includeFields: Seq[String] = Seq.empty[String]): Schema = {
+    val (mapping, geoInfo) = discoverMappingAndGeoFields(cfg, includeFields)
     val struct = convertToStruct(mapping, geoInfo, cfg)
-    Schema(mapping, struct)
+    Schema(struct)
   }
 
-  def discoverMappingAndGeoFields(cfg: Settings): (Mapping, JMap[String, GeoField]) = {
+  def discoverMappingAndGeoFields(cfg: Settings, includeFields: Seq[String]): (Mapping, JMap[String, GeoField]) = {
     InitializationUtils.validateSettings(cfg)
     InitializationUtils.discoverClusterInfo(cfg, Utils.LOGGER)
 
     val repo = new RestRepository(cfg)
     try {
       if (repo.resourceExists(true)) {
-        var mappingSet = repo.getMappings
+        val mappingSet = repo.getMappings(includeFields.asJava)
         if (mappingSet == null || mappingSet.isEmpty) {
           throw new EsHadoopIllegalArgumentException(s"Cannot find mapping for ${cfg.getResourceRead} - one is required before using Spark SQL")
         }
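
And the corresponding Scala-side entry point (a sketch; it assumes an es-hadoop Settings instance named cfg that already points at the read resource):

    val fullSchema   = SchemaUtils.discoverMapping(cfg)                      // default empty include list: old behaviour
    val narrowSchema = SchemaUtils.discoverMapping(cfg, Seq("name", "age"))  // mapping merge restricted to the listed fields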