Scala code formatting with Scalafmt
Scalafmt (https://scalameta.org/scalafmt/#Scalafmt) is a code formatting
tool; .scalafmt.conf is its main configuration file.
Scalafmt is added using the Gradle scalafmt plugin
(https://github.com/alenkacz/gradle-scalafmt) and the Maven scalafmt plugin
(https://github.com/SimonJPegg/mvn_scalafmt). The Gradle plugin is configured
to run on compile, or it can be invoked with 'gradle scalafmt' (or
'gradle testScalafmt' to format test code). The Maven plugin runs during the
verification stage of a Maven build.

Set the Gradle encoding globally because the 'testScalafmt' task would
otherwise choke on some special characters.

Change-Id: Iac96383d88394084e19712177d05f9fc63de766c
Reviewed-on: http://gerrit.cloudera.org:8080/11030
Tested-by: Kudu Jenkins
Reviewed-by: Tony Foerster <[email protected]>
Reviewed-by: Grant Henke <[email protected]>
afoerster authored and granthenke committed Jul 30, 2018
1 parent 349b810 commit 7916695
Showing 24 changed files with 1,117 additions and 767 deletions.
1 change: 1 addition & 0 deletions java/.gitignore
@@ -36,3 +36,4 @@ gradle/wrapper/*.jar
*.iml
.idea/
classes
**/out
10 changes: 10 additions & 0 deletions java/.scalafmt.conf
@@ -0,0 +1,10 @@
style = defaultWithAlign
rewrite.rules = [prefercurlyfors, AvoidInfix]
align = false
docstrings=JavaDoc
maxColumn=80
spaces.inImportCurlyBraces=false
unindentTopLevelOperators =true
newlines.alwaysBeforeTopLevelStatements=true
newlines.penalizeSingleSelectMultiArgList=false
lineEndings=unix
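
As a rough illustration of the settings above, here is a hedged, hypothetical
before/after sketch (not code from this commit): maxColumn=80 wraps the long
signature, the AvoidInfix rewrite turns "client lookup id" into
"client.lookup(id)", and prefercurlyfors converts a multi-generator
for-comprehension from parentheses to curly braces. Exact output may differ
by scalafmt version.

// Placeholder client type so the sketch is self-contained (hypothetical).
trait KuduClientLike {
  def lookup(id: Int): Seq[String]
}

// Before formatting (one long line, an infix call, a parenthesized for):
//   object Example { def names(ids: Seq[Int], client: KuduClientLike): Seq[String] = for (id <- ids; name <- client lookup id) yield name }

// Roughly what scalafmt produces with the configuration above:
object Example {

  def names(ids: Seq[Int], client: KuduClientLike): Seq[String] =
    for {
      id <- ids
      name <- client.lookup(id)
    } yield name
}
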
1 change: 1 addition & 0 deletions java/buildSrc/build.gradle
@@ -38,4 +38,5 @@ dependencies {
compile "net.ltgt.gradle:gradle-errorprone-plugin:0.0.14"
compile "ru.vyarus:gradle-animalsniffer-plugin:1.4.3"
compile 'com.google.code.gson:gson:2.8.5'
compile "cz.alenkacz:gradle-scalafmt:1.6.0"
}
2 changes: 2 additions & 0 deletions java/gradle.properties
@@ -58,3 +58,5 @@ org.gradle.daemon = true
# org.gradle.configureondemand = true
# org.gradle.parallel = true
# org.gradle.workers.max = 4

systemProp.file.encoding = UTF-8
12 changes: 12 additions & 0 deletions java/gradle/quality.gradle
@@ -22,6 +22,7 @@ apply plugin: "com.github.spotbugs" // Performs static code analysis to look f
apply plugin: "pmd" // Performs static code analysis to look for common code smells in Java code.
apply plugin: "com.github.ben-manes.versions" // Provides a task to determine which dependencies have updates.
apply plugin: "ru.vyarus.animalsniffer" // Ensures Java code uses APIs from a particular version of Java.
apply plugin: "scalafmt"

checkstyle {
configFile = file("$rootDir/kudu_style.xml")
@@ -62,6 +63,17 @@ pmd {
ignoreFailures = true
}

scalafmt {
configFilePath = "$rootDir/.scalafmt.conf"
}

// Run scalafmt on compile
tasks.withType(ScalaCompile) {
if (!propertyExists("skipFormat")) {
dependsOn("scalafmtAll")
}
}

// Create an aggregate pmd task.
// This simplifies running pmd on all the code by only needing one task instead of multiple in your command.
task pmd(dependsOn: [pmdMain, pmdTest, pmdIntegrationTest], group: "Verification") {
7 changes: 7 additions & 0 deletions java/kudu-backup/pom.xml
@@ -149,6 +149,9 @@
</dependencies>

<build>
<!-- set source dirs explicitly for the scalafmt plugin -->
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<extensions>
<!-- Used in the protobuf plugin to find the right protoc artifact
with the property os.detected.classifier -->
@@ -223,6 +226,10 @@
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.antipathy</groupId>
<artifactId>mvn-scalafmt</artifactId>
</plugin>
</plugins>

<!-- This big ol' block of nonsense tells the m2e Eclipse plugin what
@@ -35,7 +35,8 @@ object KuduBackup {
val log: Logger = LoggerFactory.getLogger(getClass)

def run(options: KuduBackupOptions, session: SparkSession): Unit = {
val context = new KuduContext(options.kuduMasterAddresses, session.sparkContext)
val context =
new KuduContext(options.kuduMasterAddresses, session.sparkContext)
val path = options.path
log.info(s"Backing up to path: $path")

@@ -45,7 +46,8 @@ object KuduBackup {
val tablePath = Paths.get(path).resolve(URLEncoder.encode(t, "UTF-8"))

val rdd = new KuduBackupRDD(table, options, context, session.sparkContext)
val df = session.sqlContext.createDataFrame(rdd, sparkSchema(table.getSchema))
val df =
session.sqlContext.createDataFrame(rdd, sparkSchema(table.getSchema))
// TODO: Prefix path with the time? Maybe a backup "name" parameter defaulted to something?
// TODO: Take parameter for the SaveMode.
val writer = df.write.mode(SaveMode.ErrorIfExists)
@@ -59,7 +61,10 @@ object KuduBackup {
}
}

private def writeTableMetadata(metadata: TableMetadataPB, path: Path, session: SparkSession): Unit = {
private def writeTableMetadata(
metadata: TableMetadataPB,
path: Path,
session: SparkSession): Unit = {
val conf = session.sparkContext.hadoopConfiguration
val hPath = new HPath(path.resolve(TableMetadata.MetadataFileName).toString)
val fs = hPath.getFileSystem(conf)
@@ -71,14 +76,16 @@ object KuduBackup {
}

def main(args: Array[String]): Unit = {
val options = KuduBackupOptions.parse(args)
.getOrElse(throw new IllegalArgumentException("could not parse the arguments"))
val options = KuduBackupOptions
.parse(args)
.getOrElse(
throw new IllegalArgumentException("could not parse the arguments"))

val session = SparkSession.builder()
val session = SparkSession
.builder()
.appName("Kudu Table Backup")
.getOrCreate()

run(options, session)
}
}
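
For context on how the run(...) entry point above can be used, here is a
minimal, hedged sketch of invoking the backup programmatically instead of
through main(); the imported package name, master address, path, and table
name are assumptions for illustration, not values from this commit. Note that
both classes are annotated @InterfaceAudience.Private in this commit, so this
is purely illustrative.

// Hedged sketch; the package and all values below are assumptions.
import org.apache.kudu.backup.{KuduBackup, KuduBackupOptions}
import org.apache.spark.sql.SparkSession

object KuduBackupExample {
  def main(args: Array[String]): Unit = {
    // Placeholder values; a real job would normally pass these on the command line.
    val options = KuduBackupOptions(
      tables = Seq("example_table"),
      path = "hdfs:///kudu-backups",
      kuduMasterAddresses = "kudu-master-1:7051")

    val session = SparkSession
      .builder()
      .appName("Kudu Table Backup")
      .getOrCreate()

    KuduBackup.run(options, session)
    session.stop()
  }
}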

@@ -24,76 +24,81 @@ import scopt.OptionParser

@InterfaceAudience.Private
@InterfaceStability.Unstable
case class KuduBackupOptions(tables: Seq[String],
path: String,
kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
timestampMs: Long = System.currentTimeMillis(),
format: String = KuduBackupOptions.DefaultFormat,
scanBatchSize: Int = KuduBackupOptions.DefaultScanBatchSize,
scanRequestTimeout: Long = KuduBackupOptions.DefaultScanRequestTimeout,
scanPrefetching: Boolean = KuduBackupOptions.DefaultScanPrefetching)
case class KuduBackupOptions(
tables: Seq[String],
path: String,
kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
timestampMs: Long = System.currentTimeMillis(),
format: String = KuduBackupOptions.DefaultFormat,
scanBatchSize: Int = KuduBackupOptions.DefaultScanBatchSize,
scanRequestTimeout: Long = KuduBackupOptions.DefaultScanRequestTimeout,
scanPrefetching: Boolean = KuduBackupOptions.DefaultScanPrefetching)

object KuduBackupOptions {
val DefaultFormat: String = "parquet"
val DefaultScanBatchSize: Int = 1024*1024*20 // 20 MiB
val DefaultScanRequestTimeout: Long = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
val DefaultScanPrefetching: Boolean = false // TODO: Add a test per KUDU-1260 and enable by default?
val DefaultScanBatchSize: Int = 1024 * 1024 * 20 // 20 MiB
val DefaultScanRequestTimeout: Long =
AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
val DefaultScanPrefetching
: Boolean = false // TODO: Add a test per KUDU-1260 and enable by default?

// TODO: clean up usage output.
// TODO: timeout configurations.
private val parser: OptionParser[KuduBackupOptions] = new OptionParser[KuduBackupOptions]("KuduBackup") {
opt[String]("path")
.action((v, o) => o.copy(path = v))
.text("The root path to output backup data. Accepts any Spark compatible path.")
.optional()
private val parser: OptionParser[KuduBackupOptions] =
new OptionParser[KuduBackupOptions]("KuduBackup") {
opt[String]("path")
.action((v, o) => o.copy(path = v))
.text("The root path to output backup data. Accepts any Spark compatible path.")
.optional()

opt[String]("kuduMasterAddresses")
.action((v, o) => o.copy(kuduMasterAddresses = v))
.text("Comma-separated addresses of Kudu masters.")
.optional()
opt[String]("kuduMasterAddresses")
.action((v, o) => o.copy(kuduMasterAddresses = v))
.text("Comma-separated addresses of Kudu masters.")
.optional()

opt[Long]("timestampMs")
.action((v, o) => o.copy(timestampMs = v))
// TODO: Document the limitations based on cluster configuration (ex: ancient history watermark).
.text("A UNIX timestamp in milliseconds since the epoch to execute scans at.")
.optional()
opt[Long]("timestampMs")
.action((v, o) => o.copy(timestampMs = v))
// TODO: Document the limitations based on cluster configuration (ex: ancient history watermark).
.text("A UNIX timestamp in milliseconds since the epoch to execute scans at.")
.optional()

opt[String]("format")
.action((v, o) => o.copy(format = v))
.text("The file format to use when writing the data.")
.optional()
opt[String]("format")
.action((v, o) => o.copy(format = v))
.text("The file format to use when writing the data.")
.optional()

opt[Int]("scanBatchSize")
.action((v, o) => o.copy(scanBatchSize = v))
.text("The maximum number of bytes returned by the scanner, on each batch.")
.optional()
opt[Int]("scanBatchSize")
.action((v, o) => o.copy(scanBatchSize = v))
.text(
"The maximum number of bytes returned by the scanner, on each batch.")
.optional()

opt[Int]("scanRequestTimeout")
.action((v, o) => o.copy(scanRequestTimeout = v))
.text("Sets how long each scan request to a server can last.")
.optional()
opt[Int]("scanRequestTimeout")
.action((v, o) => o.copy(scanRequestTimeout = v))
.text("Sets how long each scan request to a server can last.")
.optional()

opt[Unit]("scanPrefetching")
.action( (_, o) => o.copy(scanPrefetching = true) )
.text("An experimental flag to enable pre-fetching data.")
.optional()
opt[Unit]("scanPrefetching")
.action((_, o) => o.copy(scanPrefetching = true))
.text("An experimental flag to enable pre-fetching data.")
.optional()

arg[String]("<table>...")
.unbounded()
.action( (v, o) => o.copy(tables = o.tables :+ v) )
.text("A list of tables to be backed up.")
}
arg[String]("<table>...")
.unbounded()
.action((v, o) => o.copy(tables = o.tables :+ v))
.text("A list of tables to be backed up.")
}

/**
* Parses the passed arguments into Some[KuduBackupOptions].
*
* If the arguments are bad, an error message is displayed
* and None is returned.
*
* @param args The arguments to parse.
* @return Some[KuduBackupOptions] if parsing was successful, None if not.
*/
* Parses the passed arguments into Some[KuduBackupOptions].
*
* If the arguments are bad, an error message is displayed
* and None is returned.
*
* @param args The arguments to parse.
* @return Some[KuduBackupOptions] if parsing was successful, None if not.
*/
def parse(args: Seq[String]): Option[KuduBackupOptions] = {
parser.parse(args, KuduBackupOptions(Seq(), null))
}
}
}
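
The parse method above drives the command-line options; a small, hedged usage
sketch follows (the argument values are illustrative only, not from this
commit).

// Hedged sketch: illustrative arguments only; the package is an assumption.
import org.apache.kudu.backup.KuduBackupOptions

object ParseExample {
  def main(args: Array[String]): Unit = {
    val parsed = KuduBackupOptions.parse(
      Seq("--path", "hdfs:///kudu-backups", "--scanBatchSize", "1048576", "my_table"))

    parsed match {
      case Some(options) =>
        println(s"Backing up tables: ${options.tables.mkString(", ")}")
      case None =>
        sys.exit(1) // scopt has already printed an error and usage message
    }
  }
}
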
@@ -32,22 +32,25 @@ import scala.collection.JavaConverters._

@InterfaceAudience.Private
@InterfaceStability.Unstable
class KuduBackupRDD private[kudu](@transient val table: KuduTable,
@transient val options: KuduBackupOptions,
val kuduContext: KuduContext,
@transient val sc: SparkContext
) extends RDD[Row](sc, Nil) {
class KuduBackupRDD private[kudu] (
@transient val table: KuduTable,
@transient val options: KuduBackupOptions,
val kuduContext: KuduContext,
@transient val sc: SparkContext)
extends RDD[Row](sc, Nil) {

// TODO: Split large tablets into smaller scan tokens?
override protected def getPartitions: Array[Partition] = {
val client = kuduContext.syncClient

// Set a hybrid time for the scan to ensure application consistency.
val timestampMicros = TimeUnit.MILLISECONDS.toMicros(options.timestampMs)
val hybridTime = HybridTimeUtil.physicalAndLogicalToHTTimestamp(timestampMicros, 0)
val hybridTime =
HybridTimeUtil.physicalAndLogicalToHTTimestamp(timestampMicros, 0)

// Create the scan tokens for each partition.
val tokens = client.newScanTokenBuilder(table)
val tokens = client
.newScanTokenBuilder(table)
.cacheBlocks(false)
// TODO: Use fault tolerant scans to get mostly.
// ordered results when KUDU-2466 is fixed.
@@ -72,11 +75,14 @@ class KuduBackupRDD private[kudu](@transient val table: KuduTable,
// TODO: Do we need a custom spark partitioner for any guarantees?
// override val partitioner = None

override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
override def compute(
part: Partition,
taskContext: TaskContext): Iterator[Row] = {
val client: KuduClient = kuduContext.syncClient
val partition: KuduBackupPartition = part.asInstanceOf[KuduBackupPartition]
// TODO: Get deletes and updates for incremental backups.
val scanner = KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
val scanner =
KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
new RowIterator(scanner)
}

@@ -85,19 +91,22 @@ class KuduBackupRDD private[kudu](@transient val table: KuduTable,
}
}

private case class KuduBackupPartition(index: Int,
scanToken: Array[Byte],
locations: Array[String]) extends Partition
private case class KuduBackupPartition(
index: Int,
scanToken: Array[Byte],
locations: Array[String])
extends Partition

/**
* This iterator wraps a KuduScanner, converts the returned RowResults into a
* Spark Row, and allows iterating over those scanned results.
*
* The Spark RDD abstraction has an abstract compute method, implemented in KuduBackupRDD,
* that takes the job partitions and task context and expects to return an Iterator[Row].
* This implementation facilitates that.
*/
private class RowIterator(private val scanner: KuduScanner) extends Iterator[Row] {
* This iterator wraps a KuduScanner, converts the returned RowResults into a
* Spark Row, and allows iterating over those scanned results.
*
* The Spark RDD abstraction has an abstract compute method, implemented in KuduBackupRDD,
* that takes the job partitions and task context and expects to return an Iterator[Row].
* This implementation facilitates that.
*/
private class RowIterator(private val scanner: KuduScanner)
extends Iterator[Row] {

private var currentIterator: RowResultIterator = _

@@ -115,20 +124,23 @@ private class RowIterator(private val scanner: KuduScanner) extends Iterator[Row
// TODO: Use a more "raw" encoding for efficiency?
private def get(rowResult: RowResult, i: Int): Any = {
if (rowResult.isNull(i)) null
else rowResult.getColumnType(i) match {
case Type.BOOL => rowResult.getBoolean(i)
case Type.INT8 => rowResult.getByte(i)
case Type.INT16 => rowResult.getShort(i)
case Type.INT32 => rowResult.getInt(i)
case Type.INT64 => rowResult.getLong(i)
case Type.UNIXTIME_MICROS => rowResult.getTimestamp(i)
case Type.FLOAT => rowResult.getFloat(i)
case Type.DOUBLE => rowResult.getDouble(i)
case Type.STRING => rowResult.getString(i)
case Type.BINARY => rowResult.getBinaryCopy(i)
case Type.DECIMAL => rowResult.getDecimal(i)
case _ => throw new RuntimeException(s"Unsupported column type: ${rowResult.getColumnType(i)}")
}
else
rowResult.getColumnType(i) match {
case Type.BOOL => rowResult.getBoolean(i)
case Type.INT8 => rowResult.getByte(i)
case Type.INT16 => rowResult.getShort(i)
case Type.INT32 => rowResult.getInt(i)
case Type.INT64 => rowResult.getLong(i)
case Type.UNIXTIME_MICROS => rowResult.getTimestamp(i)
case Type.FLOAT => rowResult.getFloat(i)
case Type.DOUBLE => rowResult.getDouble(i)
case Type.STRING => rowResult.getString(i)
case Type.BINARY => rowResult.getBinaryCopy(i)
case Type.DECIMAL => rowResult.getDecimal(i)
case _ =>
throw new RuntimeException(
s"Unsupported column type: ${rowResult.getColumnType(i)}")
}
}

// TODO: There may be an old KuduRDD implementation where we did some