
Commit

[SPARK] Add more logging for geo types
costin committed Jan 27, 2016
1 parent f195694 commit c7c398a
Showing 4 changed files with 32 additions and 16 deletions.
@@ -103,5 +103,9 @@ class JavaEsRowValueReader extends JdkValueReader with RowValueReader with Value
def endTrailMetadata() {}

def endDoc() {}

+ def beginGeoField() {}

+ def endGeoField() {}

}
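
The two new methods are no-op lifecycle hooks: the value-parsing side can bracket a geo value with begin/end notifications, and readers that do not need them simply keep the empty bodies. A rough, self-contained Scala sketch of that callback shape — the trait, the driver function and all names below are illustrative, not the es-hadoop API:

trait GeoAwareReader {
  // No-op defaults, mirroring the empty-body methods added in this commit
  def beginGeoField(): Unit = {}
  def endGeoField(): Unit = {}
  def readValue(raw: String): Any = raw
}

object GeoCallbackDemo extends App {
  val reader = new GeoAwareReader {
    override def beginGeoField(): Unit = println("entering geo field")
    override def endGeoField(): Unit = println("leaving geo field")
  }

  // Hypothetical driver: wraps the read of a geo value with the lifecycle hooks
  def readGeo(r: GeoAwareReader, raw: String): Any = {
    r.beginGeoField()
    try r.readValue(raw) finally r.endGeoField()
  }

  println(readGeo(reader, "41.12,-71.34"))
}
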
@@ -130,19 +130,19 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

if (filters != null && filters.size > 0) {
if (Utils.isPushDown(cfg)) {
- if (logger.isDebugEnabled()) {
- logger.debug(s"Pushing down filters ${filters.mkString("[", ",", "]")}")
+ if (Utils.LOGGER.isDebugEnabled()) {
+ Utils.LOGGER.debug(s"Pushing down filters ${filters.mkString("[", ",", "]")}")
}
val filterString = createDSLFromFilters(filters, Utils.isPushDownStrict(cfg))

- if (logger.isTraceEnabled()) {
- logger.trace(s"Transformed filters into DSL ${filterString.mkString("[", ",", "]")}")
+ if (Utils.LOGGER.isTraceEnabled()) {
+ Utils.LOGGER.trace(s"Transformed filters into DSL ${filterString.mkString("[", ",", "]")}")
}
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS -> IOUtils.serializeToBase64(filterString))
}
else {
- if (logger.isTraceEnabled()) {
- logger.trace("Push-down is disabled; ignoring Spark filters...")
+ if (Utils.LOGGER.isTraceEnabled()) {
+ Utils.LOGGER.trace("Push-down is disabled; ignoring Spark filters...")
}
}
}
@@ -184,8 +184,8 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
}

val filtered = filters.filter(unhandled)
- if (logger.isTraceEnabled()) {
- logger.trace(s"Unhandled filters from ${filters.mkString("[", ",", "]")} to ${filtered.mkString("[", ",", "]")}")
+ if (Utils.LOGGER.isTraceEnabled()) {
+ Utils.LOGGER.trace(s"Unhandled filters from ${filters.mkString("[", ",", "]")} to ${filtered.mkString("[", ",", "]")}")
}
filtered
}
@@ -230,8 +230,8 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
}

if (!strictPushDown && isStrictType) {
- if (logger.isDebugEnabled()) {
- logger.debug(s"Attribute $attribute type $attrType not suitable for match query; using terms (strict) instead")
+ if (Utils.LOGGER.isDebugEnabled()) {
+ Utils.LOGGER.debug(s"Attribute $attribute type $attrType not suitable for match query; using terms (strict) instead")
}
}

@@ -371,7 +371,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

def insert(data: DataFrame, overwrite: Boolean) {
if (overwrite) {
logger.info(s"Overwriting data for ${cfg.getResourceWrite}")
Utils.LOGGER.info(s"Overwriting data for ${cfg.getResourceWrite}")

// perform a scan-scroll delete
val cfgCopy = cfg.copy()
@@ -392,6 +392,4 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
rr.close()
empty
}

- private def logger = LogFactory.getLog("org.elasticsearch.spark.sql.DataSource")
}
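
Every converted call site above keeps its isDebugEnabled()/isTraceEnabled() guard. Because Scala's s"..." interpolation builds the message eagerly at the call site, the guard is what keeps disabled levels cheap. A small standalone illustration of that cost, assuming only commons-logging on the classpath; the object and method names here are made up:

import org.apache.commons.logging.LogFactory

object GuardedLoggingDemo extends App {
  val log = LogFactory.getLog("org.elasticsearch.spark.sql.DataSource")

  var rendered = 0
  def expensiveDetail(): String = { rendered += 1; "lots of detail" }

  // Unguarded: the interpolated string (and expensiveDetail) is built even if debug is off
  log.debug(s"state = ${expensiveDetail()}")

  // Guarded, as in this commit: nothing is built when debug is disabled
  if (log.isDebugEnabled()) {
    log.debug(s"state = ${expensiveDetail()}")
  }

  println(s"expensiveDetail() was evaluated $rendered time(s)")
}

Under most default configurations debug is disabled, so only the guarded variant avoids building the string.
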
@@ -73,7 +73,7 @@ private[sql] object SchemaUtils {

def discoverMappingAsField(cfg: Settings): (Field, JMap[String, GeoField]) = {
InitializationUtils.validateSettings(cfg);
- InitializationUtils.discoverEsVersion(cfg, LogFactory.getLog("org.elasticsearch.spark.sql.DataSource"));
+ InitializationUtils.discoverEsVersion(cfg, Utils.LOGGER);

val repo = new RestRepository(cfg)
try {
@@ -143,7 +143,7 @@ private[sql] object SchemaUtils {

// GEO
case GEO_POINT => {
- geoInfo.get(absoluteName) match {
+ val geoPoint = geoInfo.get(absoluteName) match {
case GeoPointType.LAT_LON_ARRAY => DataTypes.createArrayType(DoubleType)
case GeoPointType.GEOHASH => StringType
case GeoPointType.LAT_LON_STRING => StringType
@@ -153,6 +153,11 @@ private[sql] object SchemaUtils {
DataTypes.createStructType(Array(lon,lat))
}
}

+ if (Utils.LOGGER.isDebugEnabled()) {
+ Utils.LOGGER.debug(s"Detected field [${absoluteName}] as a GeoPoint with format ${geoPoint.simpleString}")
+ }
+ geoPoint
}
case GEO_SHAPE => {
val fields = new ArrayList[StructField]()
@@ -175,7 +180,12 @@ private[sql] object SchemaUtils {
fields.add(DataTypes.createStructField("radius", StringType, true))
}
}
- DataTypes.createStructType(fields)
+ val geoShape = DataTypes.createStructType(fields)
+
+ if (Utils.LOGGER.isDebugEnabled()) {
+ Utils.LOGGER.debug(s"Detected field [${absoluteName}] as a GeoShape with format ${geoShape.simpleString}")
+ }
+ geoShape
}
// fall back to String
case _ => StringType //throw new EsHadoopIllegalStateException("Unknown field type " + field);
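
The change above binds the detected Catalyst type to geoPoint (and geoShape below it) so it can be logged through DataType.simpleString before being returned. A rough standalone sketch of the geo-point branch using Spark SQL's public type API — GeoPointFormat, the field name and println are stand-ins for es-hadoop's GeoPointType, absoluteName and Utils.LOGGER:

import org.apache.spark.sql.types._

object GeoPointSchemaDemo extends App {
  // Stand-in for es-hadoop's GeoPointType detection
  sealed trait GeoPointFormat
  case object LatLonArray  extends GeoPointFormat
  case object GeoHash      extends GeoPointFormat
  case object LatLonString extends GeoPointFormat
  case object LatLonObject extends GeoPointFormat

  def geoPointType(format: GeoPointFormat): DataType = format match {
    case LatLonArray            => DataTypes.createArrayType(DoubleType)
    case GeoHash | LatLonString => StringType
    case LatLonObject =>
      val lon = DataTypes.createStructField("lon", DoubleType, true)
      val lat = DataTypes.createStructField("lat", DoubleType, true)
      DataTypes.createStructType(Array(lon, lat))
  }

  val geoPoint = geoPointType(LatLonObject)
  // simpleString renders e.g. struct<lon:double,lat:double> or array<double>
  println(s"Detected field [location] as a GeoPoint with format ${geoPoint.simpleString}")
}

The GeoShape branch works the same way, assembling a StructType from an ArrayList[StructField] and logging its simpleString.
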
@@ -1,12 +1,16 @@
package org.elasticsearch.spark.sql;

+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.FieldType;
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
import org.elasticsearch.hadoop.util.unit.Booleans;

abstract class Utils {

+ static final Log LOGGER = LogFactory.getLog("org.elasticsearch.spark.sql.DataSource");

// required since type has a special meaning in Scala
// and thus the method cannot be called
static FieldType extractType(Field field) {
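
Utils now owns a single static commons-logging Log under the logical category "org.elasticsearch.spark.sql.DataSource"; the Scala sources reference it as Utils.LOGGER instead of each creating their own (the private def logger in ElasticsearchRelation and the inline LogFactory.getLog call in SchemaUtils are dropped above). A minimal Scala sketch of the same holder pattern, with SqlDataSourceLog as a hypothetical stand-in for the Utils class:

import org.apache.commons.logging.{Log, LogFactory}

// One shared, eagerly-initialized logger for the whole data source,
// instead of each class calling LogFactory.getLog on its own.
object SqlDataSourceLog {
  val LOGGER: Log = LogFactory.getLog("org.elasticsearch.spark.sql.DataSource")
}

object SharedLoggerDemo extends App {
  val resource = "spark/docs" // hypothetical write resource
  SqlDataSourceLog.LOGGER.info(s"Overwriting data for $resource")
}

Keeping one category name also means a single logging configuration entry controls every message emitted by the Spark SQL integration.
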

0 comments on commit c7c398a
