Skip to content

Commit

Permalink
add df script compositor to spark streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
allwefantasy committed Jul 15, 2017
1 parent 8e84822 commit 31d12ab
Show file tree
Hide file tree
Showing 8 changed files with 459 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ class DefaultShortNameMapping extends ShortNameMapping {
"stream.output.csv" -> "streaming.core.compositor.spark.streaming.output.SQLCSVOutputCompositor",
"stream.output.parquet" -> "streaming.core.compositor.spark.streaming.output.SQLParquetOutputCompositor",
"stream.output.es" -> "streaming.core.compositor.spark.streaming.output.SQLESOutputCompositor",
"stream.script" -> "streaming.core.compositor.spark.transformation.ScriptCompositor",
"stream.script.df" -> "streaming.core.compositor.spark.transformation.DFScriptCompositor",
"stream.script.df" -> "streaming.core.compositor.spark.streaming.transformation.DFScriptCompositor",
"stream.output.carbondata" -> "streaming.core.compositor.spark.streaming.output.CarbonDataOutputCompositor",
"stream.output" -> "streaming.core.compositor.spark.streaming.output.MultiSQLOutputCompositor",
"stream.output.console" -> "streaming.core.compositor.spark.streaming.output.ConsoleOutputCompositor",
"stream.output.unittest" -> "streaming.core.compositor.spark.streaming.output.SQLUnitTestCompositor",
"stream.output.print" -> "streaming.core.compositor.spark.streaming.output.SQLPrintOutputCompositor",
"stream.script.df" -> "streaming.core.compositor.spark.transformation.DFScriptCompositor",

"ss.sources" -> "streaming.core.compositor.spark.ss.source.MultiSQLSourceCompositor",
"ss.sql" -> "streaming.core.compositor.spark.ss.transformation.SQLCompositor",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ object ManagerConfiguration {
config.getParam("yarnUrl")
}

def sparkSqlServer = {
config.getParam("sparkSqlServer", "")
}

def env = {
//eg. export SPARK_HOME=/opt/spark-2.1.1;export HADOOP_CONF_DIR=/etc/hadoop/conf;cd $SPARK_HOME;
//eg. source /etc/profile ;cd $SPARK_HOME ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ class RestController extends ApplicationController {

DB

@At(path = Array("/query.html"), types = Array(GET))
def query_index = {
renderHtml(200, "/rest/query.vm", pv(Map("sparkSqlServer" -> ManagerConfiguration.sparkSqlServer)))
}

@At(path = Array("/query"), types = Array(GET))
def query = {
}

@At(path = Array("/spark_monitor"), types = Array(GET, POST))
def spark_monitor = {
Expand Down Expand Up @@ -227,6 +235,8 @@ class RestController extends ApplicationController {
navBuffer += HtmlHelper.link(url = "/submit_job.html", name = "提交任务", style = active("/submit_job.html"))
navBuffer += HtmlHelper.link(url = "/jobs.html", name = "任务管理", style = active("/jobs.html"))
navBuffer += HtmlHelper.link(url = "/upload.html", name = "Jar包上传", style = active("/upload.html"))
navBuffer += HtmlHelper.link(url = "/query.html", name = "Spark SQL Server", style = active("/query.html"))
navBuffer
}

def view(obj: AnyRef) = {
Expand Down
155 changes: 155 additions & 0 deletions streamingpro-manager/src/main/resources-local/rest/query.vm
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
<!DOCTYPE html>
<html lang="UTF-8">
<head>
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"/>
<meta charset="utf-8"/>
<title>Spark-Sql查询工具</title>

<meta name="description" content="Dynamic tables and grids using jqGrid plugin"/>
<meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0"/>
#parse("/rest/header.vm")
</head>

<body class="no-skin">

<div class="container">
#parse("/rest/menu.vm")
</div>

<div class="container">
<!-- /section:basics/sidebar -->
<div class="main-content">
<!-- /section:basics/content.breadcrumbs -->
<div class="page-content">

<!-- /section:settings.box -->
<div class="page-content-area">
<div class="row">
<div class="col-xs-12">
<!-- PAGE CONTENT BEGINS -->
<div class="widget-box ">
<div class="widget-body">
<div class="widget-main">
<!-- PAGE CONTENT BEGINS -->
<form class="form-inline" action="/query" method="post" target="_blank">
<label class="inline">
<span class="lbl">查询语句:</span>
<textarea class="form-control" name="sql" placeholder="SQL"
rows="8" cols="130"></textarea>
</label>
<label class="inline">
<span class="lbl">结果类型:</span>
<select class="form-control" name="resultType">
<option value="html">html</option>
<option value="json">json</option>
<option value="csv">csv</option>
</select>
</label>
<button type="button" class="btn btn-info btn-sm">
<i class="ace-icon fa fa-key bigger-110"></i>提交
</button>
</form>

<form class="form-inline">
<label class="inline">
<span class="lbl">设定表名:</span>
<input type="text" class="form-control" name="tablename"/>
</label>
<label class="inline">
<span class="lbl">文件路径:</span>
<input type="text" class="form-control" name="tablepath"/>
</label>
<label class="inline">
<span class="lbl">驱动:</span>
<select class="form-control" name="loader_clzz">
<option value="org.elasticsearch.spark.sql">elasticsearch</option>
<option value="parquet">parquet</option>
<option value="org.apache.spark.sql.execution.datasources.hdfs">hdfs
</option>
</select>
</label>
<label class="inline">
<span class="lbl">驱动参数:</span>
<textarea class="form-control" name="loader_param" cols="50"
rows="5"/></textarea>
</label>
<button type="button" class="btn btn-info btn-sm">
添加表
</button>
<button type="button" class="btn btn-info btn-sm">
删除表
</button>
</form>

</div>
</div>
</div>

</div>
</div>
</div><!-- /.page-content-area -->
</div><!-- /.page-content -->
</div><!-- /.main-content -->

<div class="footer">
<div class="footer-inner">

</div>
</div>

<a href="#" id="btn-scroll-up" class="btn-scroll-up btn btn-sm btn-inverse">
<i class="ace-icon fa fa-angle-double-up icon-only bigger-110"></i>
</a>
</div><!-- /.main-container -->


<!-- inline scripts related to this page -->
<script type="text/javascript">
jQuery(function($) {
var eTemplate=$(".widget-main");
var templateFormContainer=$(".form-inline+.form-inline");
var eBtns=templateFormContainer.find("button");
$(eBtns[0]).click(function(){
eTemplate.append(templateFormContainer.clone(true));
});
$(eBtns[1]).click(function(){
if(eTemplate.find(".form-inline").length>2) $(this).closest(".form-inline").remove();
});

var submitForm=$(".form-inline")[0];
$("button",submitForm).click(function(){
var jform=$(submitForm);
$(":input[type='hidden']",submitForm).each(function(k,v){v.remove()});
$(".form-inline+.form-inline").each(function(k,v){
var tablename=$(":input[name='tablename']",v).val();
var tablepath=$(":input[name='tablepath']",v).val();
var loader_clzz=$(":input[name='loader_clzz']",v).val();
var loader_param=$(":input[name='loader_param']",v).val();

if(loader_param){
var params= loader_param.split(";");
if(params.length>0)
$.each(params,function(k1,v1){
var param=v1.split("=");
if(param && param.length==2){
jform.append($("<input type=\"hidden\" name=\"loader_param."+tablename+"."+param[0]+"\" value=\""+param[1]+"\"/>"));
}
})
}
jform.append($("<input type=\"hidden\" name=\"tableName."+tablename+"\" value=\""+tablepath+"\"/>"));
jform.append($("<input type=\"hidden\" name=\"loader_clzz."+tablename+"\" value=\""+loader_clzz+"\"/>"));
});
submitForm.submit();
});

});







</script>
</body>
</html>
155 changes: 155 additions & 0 deletions streamingpro-manager/src/main/resources-online/rest/query.vm
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
<!DOCTYPE html>
<html lang="UTF-8">
<head>
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"/>
<meta charset="utf-8"/>
<title>Spark-Sql查询工具</title>

<meta name="description" content="Dynamic tables and grids using jqGrid plugin"/>
<meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0"/>
#parse("/rest/header.vm")
</head>

<body class="no-skin">

<div class="container">
#parse("/rest/menu.vm")
</div>

<div class="container">
<!-- /section:basics/sidebar -->
<div class="main-content">
<!-- /section:basics/content.breadcrumbs -->
<div class="page-content">

<!-- /section:settings.box -->
<div class="page-content-area">
<div class="row">
<div class="col-xs-12">
<!-- PAGE CONTENT BEGINS -->
<div class="widget-box ">
<div class="widget-body">
<div class="widget-main">
<!-- PAGE CONTENT BEGINS -->
<form class="form-inline" action="/query" method="post" target="_blank">
<label class="inline">
<span class="lbl">查询语句:</span>
<textarea class="form-control" name="sql" placeholder="SQL"
rows="8" cols="130"></textarea>
</label>
<label class="inline">
<span class="lbl">结果类型:</span>
<select class="form-control" name="resultType">
<option value="html">html</option>
<option value="json">json</option>
<option value="csv">csv</option>
</select>
</label>
<button type="button" class="btn btn-info btn-sm">
<i class="ace-icon fa fa-key bigger-110"></i>提交
</button>
</form>

<form class="form-inline">
<label class="inline">
<span class="lbl">设定表名:</span>
<input type="text" class="form-control" name="tablename"/>
</label>
<label class="inline">
<span class="lbl">文件路径:</span>
<input type="text" class="form-control" name="tablepath"/>
</label>
<label class="inline">
<span class="lbl">驱动:</span>
<select class="form-control" name="loader_clzz">
<option value="org.elasticsearch.spark.sql">elasticsearch</option>
<option value="parquet">parquet</option>
<option value="org.apache.spark.sql.execution.datasources.hdfs">hdfs
</option>
</select>
</label>
<label class="inline">
<span class="lbl">驱动参数:</span>
<textarea class="form-control" name="loader_param" cols="50"
rows="5"/></textarea>
</label>
<button type="button" class="btn btn-info btn-sm">
添加表
</button>
<button type="button" class="btn btn-info btn-sm">
删除表
</button>
</form>

</div>
</div>
</div>

</div>
</div>
</div><!-- /.page-content-area -->
</div><!-- /.page-content -->
</div><!-- /.main-content -->

<div class="footer">
<div class="footer-inner">

</div>
</div>

<a href="#" id="btn-scroll-up" class="btn-scroll-up btn btn-sm btn-inverse">
<i class="ace-icon fa fa-angle-double-up icon-only bigger-110"></i>
</a>
</div><!-- /.main-container -->


<!-- inline scripts related to this page -->
<script type="text/javascript">
jQuery(function($) {
var eTemplate=$(".widget-main");
var templateFormContainer=$(".form-inline+.form-inline");
var eBtns=templateFormContainer.find("button");
$(eBtns[0]).click(function(){
eTemplate.append(templateFormContainer.clone(true));
});
$(eBtns[1]).click(function(){
if(eTemplate.find(".form-inline").length>2) $(this).closest(".form-inline").remove();
});

var submitForm=$(".form-inline")[0];
$("button",submitForm).click(function(){
var jform=$(submitForm);
$(":input[type='hidden']",submitForm).each(function(k,v){v.remove()});
$(".form-inline+.form-inline").each(function(k,v){
var tablename=$(":input[name='tablename']",v).val();
var tablepath=$(":input[name='tablepath']",v).val();
var loader_clzz=$(":input[name='loader_clzz']",v).val();
var loader_param=$(":input[name='loader_param']",v).val();

if(loader_param){
var params= loader_param.split(";");
if(params.length>0)
$.each(params,function(k1,v1){
var param=v1.split("=");
if(param && param.length==2){
jform.append($("<input type=\"hidden\" name=\"loader_param."+tablename+"."+param[0]+"\" value=\""+param[1]+"\"/>"));
}
})
}
jform.append($("<input type=\"hidden\" name=\"tableName."+tablename+"\" value=\""+tablepath+"\"/>"));
jform.append($("<input type=\"hidden\" name=\"loader_clzz."+tablename+"\" value=\""+loader_clzz+"\"/>"));
});
submitForm.submit();
});

});







</script>
</body>
</html>
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object LocalSparkStreamingApp {
"-streaming.rest", "false",
"-streaming.driver.port", "9003",
"-streaming.platform", "spark_streaming",
"-streaming.job.file.path", "classpath:///test/spark-streaming.json"
"-streaming.job.file.path", "classpath:///test/streaming-example.json"
))
}
}
Loading

0 comments on commit 31d12ab

Please sign in to comment.