Skip to content

Commit

Permalink
[SPARK] Take into account rich data when generating the schema
Browse files Browse the repository at this point in the history
relates elastic#672
  • Loading branch information
costin committed Jan 26, 2016
1 parent 0f38774 commit 16f42ac
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private[sql] object SchemaUtils {
val arrayIncludes = SettingsUtils.getFieldArrayFilterInclude(cfg)
val arrayExcludes = StringUtils.tokenize(cfg.getReadFieldAsArrayExclude)

var fields = for (fl <- rootField.properties()) yield convertField(fl, geoInfo, null, arrayIncludes, arrayExcludes)
var fields = for (fl <- rootField.properties()) yield convertField(fl, geoInfo, null, arrayIncludes, arrayExcludes, cfg)
if (cfg.getReadMetadata) {
// enrich structure
val metadataMap = DataTypes.createStructField(cfg.getReadMetadataField, DataTypes.createMapType(StringType, StringType, true), true)
Expand All @@ -115,11 +115,13 @@ private[sql] object SchemaUtils {
DataTypes.createStructType(fields)
}

private def convertToStruct(field: Field, geoInfo: JMap[String, GeoField], parentName: String, arrayIncludes: JList[NumberedInclude], arrayExcludes: JList[String]): StructType = {
DataTypes.createStructType(for (fl <- field.properties()) yield convertField(fl, geoInfo, parentName, arrayIncludes, arrayExcludes))
private def convertToStruct(field: Field, geoInfo: JMap[String, GeoField], parentName: String,
arrayIncludes: JList[NumberedInclude], arrayExcludes: JList[String], cfg:Settings): StructType = {
DataTypes.createStructType(for (fl <- field.properties()) yield convertField(fl, geoInfo, parentName, arrayIncludes, arrayExcludes, cfg))
}

private def convertField(field: Field, geoInfo: JMap[String, GeoField], parentName: String, arrayIncludes: JList[NumberedInclude], arrayExcludes: JList[String]): StructField = {
private def convertField(field: Field, geoInfo: JMap[String, GeoField], parentName: String,
arrayIncludes: JList[NumberedInclude], arrayExcludes: JList[String], cfg:Settings): StructField = {
val absoluteName = if (parentName != null) parentName + "." + field.name() else field.name()
val matched = FieldFilter.filter(absoluteName, arrayIncludes, arrayExcludes, false)
val createArray = !arrayIncludes.isEmpty() && matched.matched
Expand All @@ -135,9 +137,9 @@ private[sql] object SchemaUtils {
case FLOAT => FloatType
case DOUBLE => DoubleType
case STRING => StringType
case DATE => TimestampType
case OBJECT => convertToStruct(field, geoInfo, absoluteName, arrayIncludes, arrayExcludes)
case NESTED => DataTypes.createArrayType(convertToStruct(field, geoInfo, absoluteName, arrayIncludes, arrayExcludes))
case DATE => if (cfg.getMappingDateRich) TimestampType else StringType
case OBJECT => convertToStruct(field, geoInfo, absoluteName, arrayIncludes, arrayExcludes, cfg)
case NESTED => DataTypes.createArrayType(convertToStruct(field, geoInfo, absoluteName, arrayIncludes, arrayExcludes, cfg))

// GEO
case GEO_POINT => {
Expand Down

0 comments on commit 16f42ac

Please sign in to comment.