Skip to content

Commit

Permalink
issue-92: Fixes bucket ID bug in update/delete operation on acid table (
Browse files Browse the repository at this point in the history
qubole#93)

There is a bug in getting the bucket ID from each InternalRow of the table. To fetch the bucket ID from the unsafe row, we were passing the table schema; instead, we should have passed the rowID schema (a struct type that contains bucketID, rowID and writeID). As a result, the unsafe row returned a wrong integer value for the rowID column.

Co-authored-by: Sourabh Goyal <[email protected]>
  • Loading branch information
sourabh912 and Sourabh Goyal authored Aug 6, 2020
1 parent e8915f3 commit 8195d0f
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private[hiveacid] class TableWriter(sparkSession: SparkSession,
//
// There is still a chance that rows from multiple buckets go to same partition as well, but this is expected to work!
case HiveAcidOperation.DELETE | HiveAcidOperation.UPDATE =>
df.repartition(MAX_NUMBER_OF_BUCKETS, functions.expr("shiftright(rowId.bucketId & 268369920, 16)"))
df.repartition(MAX_NUMBER_OF_BUCKETS, functions.expr("shiftRightUnsigned(rowId.bucketId & 268369920, 16)"))
.toDF.sortWithinPartitions("rowId.writeId", "rowId.bucketId", "rowId.rowId")
.toDF.queryExecution.executedPlan.execute()
case HiveAcidOperation.INSERT_OVERWRITE | HiveAcidOperation.INSERT_INTO =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.types.StructType
private[hiveacid] class WriterOptions(val currentWriteId: Long,
val operationType: HiveAcidOperation.OperationType,
val serializableHadoopConf: SerializableConfiguration,
val rowIDSchema: StructType,
val tableSchemaWithrowID: StructType,
val dataColumns: Seq[Attribute],
val partitionColumns: Seq[Attribute],
val allColumns: Seq[Attribute],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.qubole.shaded.hadoop.hive.serde2.{Deserializer, SerDeUtils}
import com.qubole.shaded.hadoop.hive.serde2.Serializer
import com.qubole.shaded.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, ObjectInspectorUtils, StructObjectInspector}
import com.qubole.shaded.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import com.qubole.spark.hiveacid.hive.HiveAcidMetadata
import com.qubole.spark.hiveacid.{HiveAcidErrors, HiveAcidOperation}
import com.qubole.spark.hiveacid.util.Util
import com.qubole.spark.hiveacid.writer.{Writer, WriterOptions}
Expand Down Expand Up @@ -269,7 +270,7 @@ private[writer] class HiveAcidFullAcidWriter(options: WriterOptions,
case HiveAcidOperation.INSERT_INTO | HiveAcidOperation.INSERT_OVERWRITE =>
taskToBucketId
case HiveAcidOperation.DELETE | HiveAcidOperation.UPDATE =>
val rowID = dataRow.get(rowIdColNum, options.rowIDSchema)
val rowID = dataRow.get(rowIdColNum, HiveAcidMetadata.rowIdSchema)
// FIXME: Currently hard coding codec as V1 and also bucket ordinal as 1.
BucketCodec.V1.decodeWriterId(rowID.asInstanceOf[UnsafeRow].getInt(1))
case x =>
Expand Down

0 comments on commit 8195d0f

Please sign in to comment.