Skip to content

Commit

Permalink
数据清洗和迁移
Browse files Browse the repository at this point in the history
  • Loading branch information
thedestiny committed Mar 26, 2024
1 parent ba71e03 commit 4f72126
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 0 deletions.
166 changes: 166 additions & 0 deletions data-cleaning/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
<relativePath/>
</parent>

<groupId>com.platform</groupId>
<artifactId>data-cleaning</artifactId>
<version>1.0</version>
<name>data-cleaning</name>
<description>数据清洗和迁移服务</description>
<properties>

<java.version>1.8</java.version>
<hutool-version>5.8.9</hutool-version>

</properties>
<dependencies>

<!-- flink cdc 相关 start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.14.0</version>
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.14.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_2.12</artifactId>
<version>1.14.0</version>
</dependency>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>

<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>


<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<!-- <optional>true</optional>-->
<version>1.18.24</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool-version}</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.12</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.1-jre</version>
</dependency>

<dependency>
<groupId>com.mybatis-flex</groupId>
<artifactId>mybatis-flex-spring-boot-starter</artifactId>
<version>1.7.3</version>
</dependency>

<dependency>
<groupId>com.mybatis-flex</groupId>
<artifactId>mybatis-flex-processor</artifactId>
<version>1.7.3</version>
<scope>provided</scope>
</dependency>

<!-- 代码生成 -->
<!--<dependency>-->
<!--<groupId>com.mybatis-flex</groupId>-->
<!--<artifactId>mybatis-flex-codegen</artifactId>-->
<!--<version>1.7.3</version>-->
<!--</dependency>-->


<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.12.4</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.0</version>
</dependency>


</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.platform.migrate;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* marathon
*/

@Slf4j
@SpringBootApplication
public class DataCleaningApplication {

public static void main(String[] args) {
log.info("start server ");
SpringApplication.run(DataCleaningApplication.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.platform.migrate.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class CustomSink extends RichSinkFunction<String> {

private static final long serialVersionUID = 6358342353769937189L;

@Override
public void invoke(String value, Context context) {
log.info("收到变更原始数据:{}", value);
JSONObject json = JSON.parseObject(value);
// 数据操作类型 c 新增数据 u 更新数据 d 删除数据
String op = json.getString("op");
JSONObject before = json.getJSONObject("before");
JSONObject after = json.getJSONObject("after");
// ts_ms
JSONObject source = json.getJSONObject("source");
log.info("information time {} db {} table {}", source.getString("ts_ms"), source.getString("db"), source.getString("table"));
if ("c".equals(op)) {
log.info("insert data \n{} \n{}", before, after);
}
if ("u".equals(op)) {
log.info("update data \n{} \n{}", before, after);
}
if ("d".equals(op)) {
log.info("delete data \n{} \n{}", before, after);
}
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.platform.migrate.flink;


import lombok.extern.slf4j.Slf4j;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class MySqlEventListener implements CommandLineRunner {


@Autowired
private CustomSink customSink;

@Override
public void run(String... args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("127.0.0.1")
.port(3306)
// 设置捕获的数据库,如果需要同步整个数据库,请将 tableList 设置为 ".*".
.databaseList("student")
// 设置捕获的表
.tableList("student.student_userinfo")
.username("root")
.password("123456")
// 将 SourceRecord 转换为 JSON 字符串
.deserializer(new JsonDebeziumDeserializationSchema())
// latest:只进行增量导入(不读取历史变化) (默认)initial初始化快照,即全量导入后增量导入(检测更新数据写入)
.startupOptions(StartupOptions.latest())
// 设置时区
.serverTimeZone("Asia/Shanghai")
.build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设置 3s 的 checkpoint 间隔
env.enableCheckpointing(3000);
DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// 设置 source 节点的并行度为 1
.setParallelism(1);

// 设置 sink 节点并行度为 1
streamSource.addSink(customSink).setParallelism(1);

env.execute("Print MySQL Snapshot + Binlog");
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.platform.migrate.web;

public class IndexController {
}
12 changes: 12 additions & 0 deletions data-cleaning/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
server:
port: 9098
spring:
profiles:
active: dev

logging:
level:
com.platform.migrate: info
org.apache.flink.runtime.checkpoint: warn
org.apache.flink.runtime.source.coordinator.SourceCoordinator: warn
com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader: warn

0 comments on commit 4f72126

Please sign in to comment.