Skip to content

Commit

Permalink
Merge pull request snowflakedb#103 from binglihub/beta
Browse files Browse the repository at this point in the history
fix bug: creating temporary stage in data sink
  • Loading branch information
binglihub authored Feb 13, 2019
2 parents 237e843 + e222c13 commit 272889f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ class SnowflakeSink(

private implicit val conn = DefaultJDBCWrapper.getConnector(param)

private val stageName: String = param.streamingStage.get
private val stageName: String = {
val name = param.streamingStage.get
conn.createStage(name, overwrite = false, temporary = false)
name
}

private implicit val storage: CloudStorage =
CloudStorageOperations.createStorageClient(param, conn, false, Some(stageName))._1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object CloudStorageOperations {
private final val AZ_KEY = "EncryptedKey"
private final val AZ_MATDESC = "matdesc"

private val log = LoggerFactory.getLogger(getClass)
val log = LoggerFactory.getLogger(getClass)

private[io] final def getDecryptedStream(
stream: InputStream,
Expand Down Expand Up @@ -427,6 +427,11 @@ sealed trait CloudStorage {
outputStream.write('\n')
}
outputStream.close()

CloudStorageOperations.log.info(
s"upload file $directory/$fileName to stage"
)

new SingleElementIterator(s"$directory/$fileName")
}
}
Expand Down

0 comments on commit 272889f

Please sign in to comment.