[WIP] Expose spatial partitioning from SpatialRDD #1751

Closed
implement partitioned
paleolimbot committed Jan 22, 2025
commit c971f5fa19179edf244d68909964d75e3474a07b
17 changes: 17 additions & 0 deletions python/sedona/utils/adapter.py
@@ -210,3 +210,20 @@ def toDf(
             return Adapter.toDf(srdd, fieldNames, spark)
         else:
             return Adapter.toDf(srdd, spark)
+
+    @classmethod
+    def toDfPartitioned(cls, spatialRDD: SpatialRDD, sparkSession: SparkSession) -> DataFrame:
+        """Convert a spatial RDD to a DataFrame, preserving its spatial partitioning.
+
+        :param spatialRDD: Spatial RDD to convert; it should already be spatially partitioned.
+        :param sparkSession: Active SparkSession.
+        :return: DataFrame whose partitions follow the RDD's spatial partitioning.
+        """
+        sc = spatialRDD._sc
+        jvm = sc._jvm
+
+        jdf = jvm.Adapter.toDfPartitioned(spatialRDD._srdd, sparkSession._jsparkSession)
+
+        df = Adapter._create_dataframe(jdf, sparkSession)
+
+        return df
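
As a usage sketch (not part of the diff): assuming a DataFrame df with a geometry column and a Sedona-enabled SparkSession named sedona, the new classmethod could be exercised roughly as follows. The variable names and the KDB-tree grid choice are illustrative, not prescribed by this change.

from sedona.core.enums import GridType
from sedona.utils.adapter import Adapter

# Build a SpatialRDD from an existing DataFrame with a geometry column.
spatial_rdd = Adapter.toSpatialRdd(df, "geometry")

# Spatial partitioning must be applied first; the new method reads
# spatialPartitionedRDD on the JVM side.
spatial_rdd.analyze()
spatial_rdd.spatialPartitioning(GridType.KDBTREE)

# Convert back to a DataFrame whose partitions follow the spatial partitioning.
partitioned_df = Adapter.toDfPartitioned(spatial_rdd, sedona)
print(partitioned_df.rdd.getNumPartitions())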
@@ -162,14 +162,10 @@ object Adapter {
       spatialRDD: SpatialRDD[T],
       schema: StructType,
       sparkSession: SparkSession): DataFrame = {
-    val rdd = spatialRDD.rawSpatialRDD.rdd.mapPartitions(
-      iter => {
-        iter.map[Row](geom => {
-          val stringRow = extractUserData(geom)
-          castRowToSchema(stringRow = stringRow, schema = schema)
-        })
-      },
-      true)
+    val rdd = spatialRDD.rawSpatialRDD.rdd.map[Row](geom => {
+      val stringRow = extractUserData(geom)
+      castRowToSchema(stringRow = stringRow, schema = schema)
+    })

     sparkSession.sqlContext.createDataFrame(rdd, schema)
   }
@@ -239,6 +235,46 @@ object Adapter {
     sparkSession.sqlContext.createDataFrame(rdd, schema)
   }

+  /**
+   * Convert a spatial RDD to a DataFrame with a given schema, keeping its spatial partitioning.
+   *
+   * Note that spatial partitioning methods that introduce duplicates will result in an output
+   * DataFrame with duplicate features. This property is essential for implementing correct
+   * joins; however, it may produce surprising results.
+   *
+   * @param spatialRDD
+   *   Spatial RDD
+   * @param fieldNames
+   *   Desired field names
+   * @param sparkSession
+   *   Spark Session
+   * @tparam T
+   *   Geometry
+   * @return
+   *   DataFrame with the specified field names and with spatial partitioning preserved
+   */
+  def toDfPartitioned[T <: Geometry](
+      spatialRDD: SpatialRDD[T],
+      fieldNames: Seq[String],
+      sparkSession: SparkSession): DataFrame = {
+    val rowRdd = spatialRDD.spatialPartitionedRDD.map[Row](geom => {
+      val stringRow = extractUserData(geom)
+      Row.fromSeq(stringRow)
+    })
+    var cols: Seq[StructField] = Seq(StructField("geometry", GeometryUDT))
+    if (fieldNames != null && fieldNames.nonEmpty) {
+      cols = cols ++ fieldNames.map(f => StructField(f, StringType))
+    }
+    val schema = StructType(cols)
+    sparkSession.createDataFrame(rowRdd, schema)
+  }
+
+  def toDfPartitioned[T <: Geometry](
+      spatialRDD: SpatialRDD[T],
+      sparkSession: SparkSession): DataFrame = {
+    toDfPartitioned(spatialRDD, null, sparkSession)
+  }

   /**
    * Extract user data from a geometry.
    *
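
To make the duplicate-features caveat from the Scaladoc concrete, a short Python sketch reusing the illustrative df, spatial_rdd, and partitioned_df from the example above: partitioners that copy a geometry into every partition it overlaps can make the partitioned DataFrame larger than its source, and deduplicating afterwards trades the spatial partitioning back for one row per feature.

# Row counts can diverge when the partitioner duplicated boundary-crossing
# geometries, which is the behavior the Scaladoc above warns about.
print(df.count(), partitioned_df.count())

# dropDuplicates() restores one row per feature, but the shuffle it may
# trigger means the spatial partitioning is no longer guaranteed afterwards.
deduplicated_df = partitioned_df.dropDuplicates()
print(deduplicated_df.count())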