
Commit 86919ff

update
allwefantasy committed May 24, 2021
1 parent bf3914f commit 86919ff
Showing 4 changed files with 65 additions and 53 deletions.
@@ -12,7 +12,7 @@ import org.apache.spark.sql.types.{BinaryType, LongType, StructField, StructType
import org.apache.spark.sql.{functions => f, _}
import org.kamranzafar.jtar.{TarEntry, TarInputStream}
import tech.mlsql.common.utils.path.PathFun
import tech.mlsql.tool.{HDFSOperatorV2, TarfileUtil, YieldByteArrayOutputStream}
import tech.mlsql.tool.{HDFSOperatorV2, SparkTarfileUtil, TarfileUtil, YieldByteArrayOutputStream}

import scala.collection.mutable.ArrayBuffer

@@ -62,35 +62,12 @@ class MLSQLUnStructured(override val uid: String) extends MLSQLBaseFileSource wi
val context = ScriptSQLExec.contextGetOrForTest()
val targetPath = resourceRealPath(context.execListener, Option(context.owner), config.path)
val rdd = config.df.get.repartition(1).sortWithinPartitions(f.col("start").asc).rdd
assert(rdd.partitions.length == 1, "rdd partition num should be 1")
rdd.foreachPartition { iter =>
if (!iter.hasNext) Seq[Row]().toIterator
else {
val fs = FileSystem.get(HDFSOperatorV2.hadoopConfiguration)
var currentBlockRow = iter.next()
var currentBuf = currentBlockRow.getAs[Array[Byte]]("value")
var currentBufPos = 0
val inputStream = new InputStream {
override def read(): Int = {
if (currentBufPos == currentBuf.length) {
val hasNext = iter.hasNext
if (hasNext) {
currentBlockRow = iter.next()
currentBuf = currentBlockRow.getAs[Array[Byte]]("value")
currentBufPos = 0
} else {
return -1
}
}
val b = currentBuf(currentBufPos)
currentBufPos += 1
b & 0xFF
}
}

// val fio = new FileOutputStream(new File("/tmp/wowowowow.tar"))
// IOUtils.copy(inputStream,fio)
// fio.close()
// inputStream.close()
val inputStream = SparkTarfileUtil.buildInputStreamFromIterator(iter)

val fileNames = new ArrayBuffer[String]()
val tarInputStream = new TarInputStream(inputStream)
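The rest of this hunk is collapsed above. For orientation, here is a minimal sketch of the consuming pattern the surrounding code sets up — draining the jtar TarInputStream entry by entry and copying the bytes out through the Hadoop FileSystem. The helper name and target directory are illustrative assumptions, not the commit's hidden lines:

import java.io.InputStream

import org.apache.hadoop.fs.{FileSystem, Path}
import org.kamranzafar.jtar.{TarEntry, TarInputStream}

// Sketch: unpack a tar stream to HDFS, returning the entry names seen.
def unpackTarTo(in: InputStream, fs: FileSystem, targetDir: String): Seq[String] = {
  val tarIn = new TarInputStream(in)
  val names = scala.collection.mutable.ArrayBuffer[String]()
  var entry: TarEntry = tarIn.getNextEntry
  while (entry != null) {
    if (!entry.isDirectory) {
      val out = fs.create(new Path(targetDir + "/" + entry.getName))
      val buf = new Array[Byte](4096)
      var n = tarIn.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = tarIn.read(buf)
      }
      out.close()
    }
    names += entry.getName
    entry = tarIn.getNextEntry
  }
  tarIn.close()
  names.toSeq
}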
38 changes: 28 additions & 10 deletions streamingpro-mlsql/src/main/java/tech/mlsql/ets/ShowFileTable.scala
@@ -1,26 +1,44 @@
package tech.mlsql.ets

import java.io.ByteArrayInputStream
import java.io.InputStream

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.MLSQLSparkUtils
import org.apache.spark.sql.{functions => f}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, functions => f}
import org.kamranzafar.jtar.{TarEntry, TarInputStream}
import tech.mlsql.app.{ResultRender, ResultResp}
import tech.mlsql.tool.TarfileUtil
import tech.mlsql.tool.{HDFSOperatorV2, SparkTarfileUtil}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

/**
* 24/5/2021 WilliamZhu([email protected])
*/
class ShowFileTable extends ResultRender {
override def call(d: ResultResp): ResultResp = {
if (MLSQLSparkUtils.isFileTypeTable(d.df)) {
val newdf = d.df.repartition(1).sortWithinPartitions(f.col("start").asc)
val bytesArray = newdf.collect().map(_.getAs[Array[Byte]]("value")).reduce((a, b) => a ++ b)
val fileNames = TarfileUtil.extractTarFile(new ByteArrayInputStream(bytesArray))
import d.df.sparkSession.implicits._
val ds = d.df.sparkSession.createDataset[String](fileNames.asScala.toSeq)
ResultResp(ds.toDF("content"), d.name)
val rdd = d.df.repartition(1).sortWithinPartitions(f.col("start").asc).rdd
val newRdd = rdd.mapPartitions { iter =>
if (!iter.hasNext) Seq[Seq[String]]().toIterator
else {
val inputStream = SparkTarfileUtil.buildInputStreamFromIterator(iter)

val fileNames = new ArrayBuffer[String]()
val tarInputStream = new TarInputStream(inputStream)

var entry: TarEntry = tarInputStream.getNextEntry
while (entry != null) {
fileNames += entry.getName
entry = tarInputStream.getNextEntry
}
tarInputStream.close()
inputStream.close()
Seq(fileNames.toSeq).toIterator
}
}.flatMap(item => item).map(item => Row.fromSeq(Seq(item)))
val ds = d.df.sparkSession.createDataFrame(newRdd, StructType(Array(StructField("files", StringType))))
ResultResp(ds, d.name)
} else d
}
}
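The net effect of this rewrite: the old path collected every binary "value" chunk to the driver and concatenated them in memory, while the new path streams the bytes inside a single executor partition and only ships entry names back. A small driver-side usage sketch, assuming tarDf is a file-type table (binary "value" plus "start" columns, as checked by MLSQLSparkUtils.isFileTypeTable) and constructing ResultResp the same way the call sites above do:

import tech.mlsql.app.ResultResp
import tech.mlsql.ets.ShowFileTable

// Sketch: list the entries of a tar-backed file table.
val resp = new ShowFileTable().call(ResultResp(tarDf, "showFiles"))
resp.df.show() // one "files" row per tar entry name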
17 changes: 0 additions & 17 deletions streamingpro-mlsql/src/main/java/tech/mlsql/tool/ObjectCount.scala

This file was deleted.

34 changes: 34 additions & 0 deletions streamingpro-mlsql/src/main/java/tech/mlsql/tool/SparkTarfileUtil.scala
@@ -0,0 +1,34 @@
package tech.mlsql.tool

import java.io.InputStream

import org.apache.spark.sql.Row

/**
* 2019-05-20 WilliamZhu([email protected])
*/
object SparkTarfileUtil {
// Concatenate the binary "value" column of successive rows into one stream.
def buildInputStreamFromIterator(iter: Iterator[Row]): InputStream = {
var currentBlockRow = iter.next()
var currentBuf = currentBlockRow.getAs[Array[Byte]]("value")
var currentBufPos = 0
val inputStream = new InputStream {
override def read(): Int = {
if (currentBufPos == currentBuf.length) {
val hasNext = iter.hasNext
if (hasNext) {
currentBlockRow = iter.next()
currentBuf = currentBlockRow.getAs[Array[Byte]]("value")
currentBufPos = 0
} else {
return -1
}
}
val b = currentBuf(currentBufPos)
currentBufPos += 1
b & 0xFF
}
}
inputStream
}
}
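The returned stream advances to the next Row only once the current buffer is exhausted, so arbitrarily many binary rows read back as one contiguous stream. Note that it calls iter.next() up front, which is why both call sites above guard with iter.hasNext first. A minimal local sketch, using GenericRowWithSchema so that getAs("value") can resolve the field by name:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
import tech.mlsql.tool.SparkTarfileUtil

// Sketch: two byte chunks come back as one continuous stream.
val schema = StructType(Array(StructField("value", BinaryType)))
val rows: Iterator[Row] = Iterator("hello, ".getBytes, "world".getBytes)
  .map(b => new GenericRowWithSchema(Array[Any](b), schema))
val in = SparkTarfileUtil.buildInputStreamFromIterator(rows)
println(scala.io.Source.fromInputStream(in).mkString) // prints "hello, world"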
