Skip to content

Commit

Permalink
JDBCHelper Session粒度初步分析
Browse files Browse the repository at this point in the history
  • Loading branch information
zlren committed Apr 27, 2017
1 parent e6a1ae6 commit c89658b
Show file tree
Hide file tree
Showing 15 changed files with 1,030 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package lab.zlren.sparkproject.conf;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
* 配置管理组件
* 从properties文件中读取配置项,提供外接读取某个配置key对应的方法
* Created by zlren on 2017/4/26.
*/
public class ConfigurationManager {

private static Properties properties = new Properties();

/**
* 在静态代码块中读取配置文件
*/
static {
try {
InputStream inputStream = ConfigurationManager.class.getClassLoader().getResourceAsStream(
"conf.properties");
properties.load(inputStream);
} catch (IOException e) {
e.printStackTrace();
}
}

public static String getProperty(String key) {
return properties.getProperty(key);
}

public static Integer getInteger(String key) {
return Integer.valueOf(properties.getProperty(key));
}

public static Boolean getBoolean(String key) {
return Boolean.valueOf(properties.getProperty(key));
}
}
46 changes: 46 additions & 0 deletions src/main/java/lab/zlren/sparkproject/constant/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package lab.zlren.sparkproject.constant;

/**
* 常量接口
* Created by zlren on 2017/4/26.
*/
public interface Constants {

/**
* 数据库相关的常量
*/
String JDBC_DRIVER = "jdbc.driver";
String JDBC_DATASOURCE_SIZE = "jdbc.datasource.size";
String JDBC_URL = "jdbc.url";
String JDBC_USER = "jdbc.user";
String JDBC_PASSWORD = "jdbc.password";
String SPARK_LOCAL = "spark.local";

/**
* Spark作业相关的常量
*/
String SPARK_APP_NAME_SESSION = "UserVisitSessionAnalyzeSpark";

String FIELD_SESSION_ID = "sessionid";
String FIELD_SEARCH_KEYWORDS = "searchKeywords";
String FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds";
String FIELD_AGE = "age";
String FIELD_PROFESSIONAL = "professional";
String FIELD_CITY = "city";
String FIELD_SEX = "sex";
String FIELD_VISIT_LENGTH = "visitLength";
String FIELD_STEP_LENGTH = "stepLength";
String FIELD_START_TIME = "startTime";
String FIELD_CLICK_COUNT = "clickCount";
String FIELD_ORDER_COUNT = "orderCount";
String FIELD_PAY_COUNT = "payCount";
String FIELD_CATEGORY_ID = "categoryid";

/**
* 任务相关常量
*/
String PARAM_START_DATE = "startDate";
String PARAM_END_DATE = "endDate";


}
11 changes: 11 additions & 0 deletions src/main/java/lab/zlren/sparkproject/dao/DAOFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package lab.zlren.sparkproject.dao;

/**
* dao工厂
* Created by zlren on 2017/4/26.
*/
public class DAOFactory {
public static TaskDAO getTaskDAO() {
return new TaskDAO();
}
}
48 changes: 48 additions & 0 deletions src/main/java/lab/zlren/sparkproject/dao/TaskDAO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package lab.zlren.sparkproject.dao;

import lab.zlren.sparkproject.domain.Task;
import lab.zlren.sparkproject.jdbc.JDBCHelper;

import java.sql.ResultSet;

/**
* Created by zlren on 2017/4/26.
*/
public class TaskDAO {
public Task findById(long taskid) {
final Task task = new Task();

String sql = "select * from task where task_id = ?";
Object[] params = new Object[]{taskid};

JDBCHelper jdbcHelper = JDBCHelper.getInstance();
jdbcHelper.executeQuery(sql, params, new JDBCHelper.QueryCallback() {

@Override
public void process(ResultSet rs) throws Exception {
if (rs.next()) {
long taskid = rs.getLong(1);
String taskName = rs.getString(2);
String createTime = rs.getString(3);
String startTime = rs.getString(4);
String finishTime = rs.getString(5);
String taskType = rs.getString(6);
String taskStatus = rs.getString(7);
String taskParam = rs.getString(8);

task.setTaskid(taskid);
task.setTaskName(taskName);
task.setCreateTime(createTime);
task.setStartTime(startTime);
task.setFinishTime(finishTime);
task.setTaskType(taskType);
task.setTaskStatus(taskStatus);
task.setTaskParam(taskParam);
}
}

});

return task;
}
}
146 changes: 146 additions & 0 deletions src/main/java/lab/zlren/sparkproject/data/MockData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package lab.zlren.sparkproject.data;

import lab.zlren.sparkproject.util.DateUtils;
import lab.zlren.sparkproject.util.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.*;

/**
* 模拟数据程序
* 生成了user_visit_action表的很多数据
* 还有user_info表的很多数据
*
* @author Administrator
*/
public class MockData {

/**
* 模拟数据
*
* @param sc
* @param sqlContext
*/
public static void mock(JavaSparkContext sc,
SQLContext sqlContext) {
List<Row> rows = new ArrayList<>();

String[] searchKeywords = new String[]{"火锅", "蛋糕", "重庆辣子鸡", "重庆小面",
"呷哺呷哺", "新辣道鱼火锅", "国贸大厦", "太古商场", "日本料理", "温泉"};
String date = DateUtils.getTodayDate();
String[] actions = new String[]{"search", "click", "order", "pay"};
Random random = new Random();

for (int i = 0; i < 100; i++) {
long userid = random.nextInt(100);

for (int j = 0; j < 10; j++) {
String sessionid = UUID.randomUUID().toString().replace("-", "");
String baseActionTime = date + " " + random.nextInt(23);

for (int k = 0; k < random.nextInt(100); k++) {
long pageid = random.nextInt(10);
String actionTime = baseActionTime + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59)
)) + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59)));
String searchKeyword = null;
Long clickCategoryId = null;
Long clickProductId = null;
String orderCategoryIds = null;
String orderProductIds = null;
String payCategoryIds = null;
String payProductIds = null;

String action = actions[random.nextInt(4)];
if ("search".equals(action)) {
searchKeyword = searchKeywords[random.nextInt(10)];
} else if ("click".equals(action)) {
clickCategoryId = Long.valueOf(String.valueOf(random.nextInt(100)));
clickProductId = Long.valueOf(String.valueOf(random.nextInt(100)));
} else if ("order".equals(action)) {
orderCategoryIds = String.valueOf(random.nextInt(100));
orderProductIds = String.valueOf(random.nextInt(100));
} else if ("pay".equals(action)) {
payCategoryIds = String.valueOf(random.nextInt(100));
payProductIds = String.valueOf(random.nextInt(100));
}

Row row = RowFactory.create(date, userid, sessionid,
pageid, actionTime, searchKeyword,
clickCategoryId, clickProductId,
orderCategoryIds, orderProductIds,
payCategoryIds, payProductIds);
rows.add(row);
}
}
}

JavaRDD<Row> rowsRDD = sc.parallelize(rows);

StructType schema = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("date", DataTypes.StringType, true),
DataTypes.createStructField("user_id", DataTypes.LongType, true),
DataTypes.createStructField("session_id", DataTypes.StringType, true),
DataTypes.createStructField("page_id", DataTypes.LongType, true),
DataTypes.createStructField("action_time", DataTypes.StringType, true),
DataTypes.createStructField("search_keyword", DataTypes.StringType, true),
DataTypes.createStructField("click_category_id", DataTypes.LongType, true),
DataTypes.createStructField("click_product_id", DataTypes.LongType, true),
DataTypes.createStructField("order_category_ids", DataTypes.StringType, true),
DataTypes.createStructField("order_product_ids", DataTypes.StringType, true),
DataTypes.createStructField("pay_category_ids", DataTypes.StringType, true),
DataTypes.createStructField("pay_product_ids", DataTypes.StringType, true)));

DataFrame df = sqlContext.createDataFrame(rowsRDD, schema);

df.registerTempTable("user_visit_action");

for (Row _row : df.take(1)) {
System.out.println(_row);
}

/**
* ==================================================================
*/

rows.clear();
String[] sexes = new String[]{"male", "female"};
for (int i = 0; i < 100; i++) {
long userid = i;
String username = "user" + i;
String name = "name" + i;
int age = random.nextInt(60);
String professional = "professional" + random.nextInt(100);
String city = "city" + random.nextInt(100);
String sex = sexes[random.nextInt(2)];

Row row = RowFactory.create(userid, username, name, age,
professional, city, sex);
rows.add(row);
}

rowsRDD = sc.parallelize(rows);

StructType schema2 = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("user_id", DataTypes.LongType, true),
DataTypes.createStructField("username", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true),
DataTypes.createStructField("professional", DataTypes.StringType, true),
DataTypes.createStructField("city", DataTypes.StringType, true),
DataTypes.createStructField("sex", DataTypes.StringType, true)));

DataFrame df2 = sqlContext.createDataFrame(rowsRDD, schema2);
for (Row _row : df2.take(1)) {
System.out.println(_row);
}

df2.registerTempTable("user_info");
}
}
84 changes: 84 additions & 0 deletions src/main/java/lab/zlren/sparkproject/domain/Task.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package lab.zlren.sparkproject.domain;

import java.io.Serializable;

/**
* 任务
* <p>
* Created by zlren on 2017/4/26.
*/
public class Task implements Serializable {

private long taskid;
private String taskName;
private String createTime;
private String startTime;
private String finishTime;
private String taskType;
private String taskStatus;
private String taskParam;

public long getTaskid() {
return taskid;
}

public void setTaskid(long taskid) {
this.taskid = taskid;
}

public String getTaskName() {
return taskName;
}

public void setTaskName(String taskName) {
this.taskName = taskName;
}

public String getCreateTime() {
return createTime;
}

public void setCreateTime(String createTime) {
this.createTime = createTime;
}

public String getStartTime() {
return startTime;
}

public void setStartTime(String startTime) {
this.startTime = startTime;
}

public String getFinishTime() {
return finishTime;
}

public void setFinishTime(String finishTime) {
this.finishTime = finishTime;
}

public String getTaskType() {
return taskType;
}

public void setTaskType(String taskType) {
this.taskType = taskType;
}

public String getTaskStatus() {
return taskStatus;
}

public void setTaskStatus(String taskStatus) {
this.taskStatus = taskStatus;
}

public String getTaskParam() {
return taskParam;
}

public void setTaskParam(String taskParam) {
this.taskParam = taskParam;
}
}
Loading

0 comments on commit c89658b

Please sign in to comment.