
Commit

begin
sulei committed Mar 30, 2022
0 parents commit fcbab63
Showing 10 changed files with 604 additions and 0 deletions.
26 changes: 26 additions & 0 deletions .gitignore
@@ -0,0 +1,26 @@
# Compiled class file
*.class

# Log file
*.log

# BlueJ files
*.ctxt

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

*.iml
.idea
2 changes: 2 additions & 0 deletions README.md
@@ -0,0 +1,2 @@
# hudi-application
Some code development work targeting Hudi.
19 changes: 19 additions & 0 deletions hudi-application-common/pom.xml
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hudi-application</artifactId>
        <groupId>com.shursulei</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hudi-application-common</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>
2 changes: 2 additions & 0 deletions hudi-application-spark/hudi-application-spark.md
@@ -0,0 +1,2 @@

# Hudi code development based on Spark
78 changes: 78 additions & 0 deletions hudi-application-spark/pom.xml
@@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hudi-application</artifactId>
        <groupId>com.shursulei</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hudi-application-spark</artifactId>

    <properties>
        <spark.version>2.3.0</spark.version>
        <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>cupid-sdk</artifactId>
            <version>${cupid.sdk.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>hadoop-fs-oss</artifactId>
            <version>${cupid.sdk.version}</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
            <version>${cupid.sdk.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-actors</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.12.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
7 changes: 7 additions & 0 deletions hudi-application-spark/spark_on_oss.md
@@ -0,0 +1,7 @@
Spark on MaxCompute and Hudi on OSS
Versions
Hudi 0.10.1
Spark 2.4.5 (spark-2.4.5-odps0.33.0)
1. Spark installation (the versions above follow the official Hudi recommendations)
Refer to Alibaba Cloud's guide for installing this Spark build.
1) Use Alibaba Cloud MaxCompute -> Spark.
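A minimal sketch of what the target workload could look like: writing and then reading back a Hudi copy-on-write table on an OSS path from a Java Spark job. The class name, OSS bucket/path, table name, and field names are placeholders, and it assumes a matching hudi-spark bundle is on the classpath and that OSS credentials are configured (see config.properties).

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HudiOnOssSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("hudi-on-oss-sketch")
                // Hudi requires Kryo serialization for Spark jobs.
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate();

        // A tiny one-row dataset standing in for real input data.
        Dataset<Row> df = spark.sql("SELECT 'id-1' AS id, 'name-1' AS name, 1L AS ts, '2022' AS dt");

        // Write a copy-on-write Hudi table to OSS; the bucket and path are placeholders.
        df.write().format("hudi")
                .option("hoodie.table.name", "hudi_demo")
                .option("hoodie.datasource.write.recordkey.field", "id")
                .option("hoodie.datasource.write.precombine.field", "ts")
                .option("hoodie.datasource.write.partitionpath.field", "dt")
                .mode(SaveMode.Overwrite)
                .save("oss://your-bucket/hudi/hudi_demo");

        // Read the table back to verify the write.
        spark.read().format("hudi").load("oss://your-bucket/hudi/hudi_demo").show();

        spark.stop();
    }
}
```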
@@ -0,0 +1,111 @@
package com.shursulei;

import com.aliyun.odps.Odps;
import com.aliyun.odps.cupid.CupidSession;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

import java.util.ArrayList;
import java.util.List;

/**
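 * Demonstrates creating, writing, and reading non-partitioned and partitioned MaxCompute (ODPS)
 * tables from a Spark job, then inspecting the partitions through the ODPS SDK.
 *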
 * @author hanfeng
 * @version 1.0
 * @date 2022/3/30 12:11
 */
public class SparkOnOssAndOdps {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSQL-on-MaxCompute")
                .config("spark.sql.broadcastTimeout", 20 * 60)
                .config("spark.sql.crossJoin.enabled", true)
                .config("odps.exec.dynamic.partition.mode", "nonstrict")
                .getOrCreate();
        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());

        String tableName = "mc_test_table";
        String tableNameCopy = "mc_test_table_copy";
        String ptTableName = "mc_test_pt_table";

        spark.sql("DROP TABLE IF EXISTS " + tableName);
        spark.sql("DROP TABLE IF EXISTS " + tableNameCopy);
        spark.sql("DROP TABLE IF EXISTS " + ptTableName);

        spark.sql("CREATE TABLE " + tableName + " (name STRING, num BIGINT)");
        spark.sql("CREATE TABLE " + ptTableName + " (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING)");

        List<Integer> data = new ArrayList<Integer>();
        for (int i = 0; i < 100; i++) {
            data.add(i);
        }

        JavaRDD<Row> dfRDD = sparkContext.parallelize(data, 2).map(new Function<Integer, Row>() {
            public Row call(Integer i) {
                return RowFactory.create(
                        "name-" + i.toString(),
                        Long.valueOf(i));
            }
        });

        JavaRDD<Row> ptDfRDD = sparkContext.parallelize(data, 2).map(new Function<Integer, Row>() {
            public Row call(Integer i) {
                return RowFactory.create(
                        "name-" + i.toString(),
                        Long.valueOf(i),
                        "2018",
                        "0601");
            }
        });

        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("num", DataTypes.LongType, true));
        Dataset<Row> df = spark.createDataFrame(dfRDD, DataTypes.createStructType(structFields));

        structFields.add(DataTypes.createStructField("pt1", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("pt2", DataTypes.StringType, true));
        Dataset<Row> ptDf = spark.createDataFrame(ptDfRDD, DataTypes.createStructType(structFields));

        // Write to the non-partitioned table
        df.write().insertInto(tableName); // insertInto semantics
        df.write().mode("overwrite").insertInto(tableName); // insertOverwrite semantics

        // Read from the non-partitioned table
        Dataset<Row> rdf = spark.sql("select name, num from " + tableName);
        System.out.println("rdf count: " + rdf.count());
        rdf.printSchema();

        // Create table as select
        spark.sql("CREATE TABLE " + tableNameCopy + " AS SELECT name, num FROM " + tableName);
        spark.sql("SELECT * FROM " + tableNameCopy).show();

        // Write to the partitioned table.
        // DataFrameWriter cannot target a specific partition, so register a temporary view
        // and write into the target partition with SQL.
        df.createOrReplaceTempView(ptTableName + "_tmp_view");
        spark.sql("insert into table " + ptTableName + " partition (pt1='2018', pt2='0601') select * from " + ptTableName + "_tmp_view");
        spark.sql("insert overwrite table " + ptTableName + " partition (pt1='2018', pt2='0601') select * from " + ptTableName + "_tmp_view");

        ptDf.write().insertInto(ptTableName); // dynamic partitions, insertInto semantics
        ptDf.write().mode("overwrite").insertInto(ptTableName); // dynamic partitions, insertOverwrite semantics

        // Read from the partitioned table
        Dataset<Row> rptdf = spark.sql("select name, num, pt1, pt2 from " + ptTableName + " where pt1 = '2018' and pt2 = '0601'");
        System.out.println("rptdf count: " + rptdf.count());
        rptdf.printSchema();

        // Inspect the partitions through the ODPS SDK.
        Odps odps = CupidSession.get().odps();
        System.out.println(odps.tables().get(ptTableName).getPartitions().size());
        System.out.println(odps.tables().get(ptTableName).getPartitions().get(0).getPartitionSpec());
    }
}
@@ -0,0 +1,60 @@
package com.shursulei.util;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.ConsoleAppender;
import org.apache.logging.log4j.core.config.AppenderRef;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.logging.log4j.core.layout.PatternLayout;

/**
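 * Intended helper for programmatically attaching a console appender to a package-level logger
 * through the Log4j 2 core API; the implementation is currently commented out.
 *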
 * @author hanfeng
 * @version 1.0
 * @date 2022/3/30 12:15
 */
public class ConfigLog4j {
// private static final LoggerContext CONTEXT;
// public static final String DEFAULT_APPENDER = "MY_STDOUT";
// public static final String
// DEFAULT_PATTERN =
// "%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger - %msg %ex %n";
//
// static {
// CONTEXT = (LoggerContext) LogManager.getContext(false);
// }
//
// /**
// * @Description: add specific logger for specific package
// * @Param: packageName, such as com.xx.yy
// * @return: void
// * @Author: [email protected]
// * @Date: 2020/12/29
// */
// public static void initPackageLogger(String packageName) {
// LoggerContext loggerContext = CONTEXT;
// Configuration config = loggerContext.getConfiguration();
//
// ConsoleAppender.Builder builder = ConsoleAppender.newBuilder();
// builder.setName(DEFAULT_APPENDER);
// builder.setLayout(PatternLayout.newBuilder().withPattern(DEFAULT_PATTERN).build());
// Appender stdoutAppender = builder.setTarget(ConsoleAppender.Target.SYSTEM_OUT).build();
// stdoutAppender.start();
//
// config.addAppender(stdoutAppender);
//
// AppenderRef ref = AppenderRef.createAppenderRef(DEFAULT_APPENDER, null, null);
// AppenderRef[] refs = new AppenderRef[]{ref};
//
// LoggerConfig
// loggerConfig =
// LoggerConfig.createLogger(false, Level.INFO, packageName,
// "true", refs, null, config, null);
// loggerConfig.addAppender(stdoutAppender, null, null);
// config.addLogger(packageName, loggerConfig);
//
// loggerContext.updateLoggers();
// }
}
14 changes: 14 additions & 0 deletions hudi-application-spark/src/main/resources/config.properties
@@ -0,0 +1,14 @@
spark.hadoop.odps.project.name=
spark.hadoop.odps.access.id=
spark.hadoop.odps.access.key=
spark.hadoop.odps.end.point=
spark.sql.catalogImplementation=

spark.hadoop.fs.oss.accessKeyId = xxxxxx
spark.hadoop.fs.oss.accessKeySecret = xxxxxx
spark.hadoop.fs.oss.endpoint = oss-xxxxxx-internal.aliyuncs.com
# This setting tells Spark to access OSS resources via an STS token.
spark.hadoop.fs.oss.credentials.provider=org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider

# This roleArn is generated by the one-click authorization process.
spark.hadoop.fs.oss.ststoken.roleArn=acs:ram::xxxxxxxxxxxxxxx:role/aliyunodpsdefaultrole
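These keys are normally supplied to a job via spark-defaults.conf or spark-submit --properties-file. As a rough sketch (the class name and lookup key below are illustrative), they can also be loaded from the classpath and applied while building the SparkSession:

```java
import java.io.InputStream;
import java.util.Properties;

import org.apache.spark.sql.SparkSession;

public class ConfigPropertiesLoader {
    public static void main(String[] args) throws Exception {
        // Read config.properties from the classpath (src/main/resources).
        Properties props = new Properties();
        try (InputStream in = ConfigPropertiesLoader.class
                .getClassLoader().getResourceAsStream("config.properties")) {
            props.load(in);
        }

        // Apply every entry as a Spark configuration setting.
        SparkSession.Builder builder = SparkSession.builder().appName("config-loader-sketch");
        props.forEach((key, value) -> builder.config(key.toString(), value.toString()));

        SparkSession spark = builder.getOrCreate();
        System.out.println(spark.conf().get("spark.hadoop.fs.oss.endpoint", "<unset>"));
        spark.stop();
    }
}
```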