Skip to content

Commit

Permalink
merge code and fixed issue alibaba#2755 alibaba#2756 alibaba#2735 ali…
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Aug 22, 2020
1 parent 90bf9e6 commit b79af46
Show file tree
Hide file tree
Showing 66 changed files with 456 additions and 474 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
language: java
sudo: false # faster builds

jdk:
- oraclejdk12
Expand Down
7 changes: 0 additions & 7 deletions admin/admin-ui/src/api/canalInstance.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,6 @@ export function instanceLog(id, nodeId) {
})
}

export function instanceMeta(id, nodeId) {
return request({
url: '/canal/instance/meta/' + id + '/' + nodeId,
method: 'get'
})
}

export function instanceStatus(id, option) {
return request({
url: '/canal/instance/status/' + id + '?option=' + option,
Expand Down
7 changes: 0 additions & 7 deletions admin/admin-ui/src/router/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,6 @@ export const constantRoutes = [
component: () => import('@/views/canalServer/CanalInstanceLogDetail'),
meta: { title: 'Instance 日志' },
hidden: true
},
{
path: 'canalInstance/meta',
name: 'Instance meta',
component: () => import('@/views/canalServer/CanalInstanceMetaDetail'),
meta: { title: 'Instance Meta' },
hidden: true
}
]
},
Expand Down
9 changes: 0 additions & 9 deletions admin/admin-ui/src/views/canalServer/CanalInstance.vue
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
<el-dropdown-item @click.native="handleStart(scope.row)">启动</el-dropdown-item>
<el-dropdown-item @click.native="handleStop(scope.row)">停止</el-dropdown-item>
<el-dropdown-item @click.native="handleLog(scope.row)">日志</el-dropdown-item>
<el-dropdown-item @click.native="handleMeta(scope.row)">meta</el-dropdown-item>
</el-dropdown-menu>
</el-dropdown>
</template>
Expand Down Expand Up @@ -118,7 +117,6 @@ export default {
}
},
created() {
this.listQuery.name = this.$route.query.name
getClustersAndServers().then((res) => {
this.options = res.data
})
Expand Down Expand Up @@ -224,13 +222,6 @@ export default {
return
}
this.$router.push('canalInstance/log?id=' + row.id + '&nodeId=' + row.nodeServer.id)
},
handleMeta(row) {
if (row.nodeId === null) {
this.$message({ message: '当前Instance不是启动状态,无法查看meta', type: 'warning' })
return
}
this.$router.push('canalInstance/meta?id=' + row.id + '&nodeId=' + row.nodeServer.id)
}
}
}
Expand Down
53 changes: 0 additions & 53 deletions admin/admin-ui/src/views/canalServer/CanalInstanceMetaDetail.vue

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,4 @@ public interface AdminConnector {
*/
String instanceLog(String destination, String fileName, int lines);

/**
* meta
* @param destination
* @param fileName
* @return
*/
String instanceMeta(String destination, String fileName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,6 @@ public String instanceLog(String destination, String fileName, int lines) {
return doLogAdmin("instance", "file", destination, fileName, lines);
}

@Override
public String instanceMeta(final String destination, final String fileName) {
return doLogAdmin("meta", "file", destination, fileName,100);
}

// ==================== helper method ====================

private String doServerAdmin(String action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,6 @@ public BaseModel<Map<String, String>> instanceLog(@PathVariable Long id, @PathVa
return BaseModel.getInstance(canalInstanceConfigService.remoteInstanceLog(id, nodeId));
}

/**
* 获取实例meta信息
*
* @param id
* @param nodeId
* @param env
* @return
*/
@GetMapping(value = "/instance/meta/{id}/{nodeId}")
public BaseModel<Map<String, String>> meta(@PathVariable Long id, @PathVariable Long nodeId,
@PathVariable String env) {
return BaseModel.getInstance(canalInstanceConfigService.remoteInstanceMeta(id, nodeId));
}

/**
* 通过Server id获取所有活动的Instance
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ public interface CanalInstanceService {

Map<String, String> remoteInstanceLog(Long id, Long nodeId);

Map<String, String> remoteInstanceMeta(Long id, Long nodeId);

boolean remoteOperation(Long id, Long nodeId, String option);

boolean instanceOperation(Long id, String option);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package com.alibaba.otter.canal.admin.service.impl;

import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
import io.ebean.Query;

import java.security.NoSuchAlgorithmException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -258,33 +256,6 @@ public Map<String, String> remoteInstanceLog(Long id, Long nodeId) {
return result;
}

@Override
public Map<String, String> remoteInstanceMeta(final Long id, final Long nodeId) {
Map<String, String> result = new HashMap<>();

NodeServer nodeServer = NodeServer.find.byId(nodeId);
if (nodeServer == null) {
return result;
}
CanalInstanceConfig canalInstanceConfig = CanalInstanceConfig.find.byId(id);
if (canalInstanceConfig == null) {
return result;
}
String meta;
if (nodeServer.getCanalCluster() != null) {
ZkClientx zkClientx = ZkClientx.getZkClient(nodeServer.getCanalCluster().getZkHosts());
String zkPath = MessageFormat.format("/{0}/{1}/{2}/{3}/{4}/{5}", "otter", "canal", "destinations", canalInstanceConfig.getName(), "1001", "cursor");
meta = new String((byte[]) zkClientx.readData(zkPath));
} else {
meta = SimpleAdminConnectors.execute(nodeServer.getIp(),
nodeServer.getAdminPort(),
adminConnector -> adminConnector.instanceMeta(canalInstanceConfig.getName(), "meta.dat"));
}
result.put("instance", canalInstanceConfig.getName());
result.put("meta", meta);
return result;
}

public boolean remoteOperation(Long id, Long nodeId, String option) {
NodeServer nodeServer = null;
if ("start".equals(option)) {
Expand Down
77 changes: 50 additions & 27 deletions admin/admin-web/src/main/resources/canal-template.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
canal.admin.manager = 127.0.0.1:8089
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
Expand All @@ -21,7 +21,7 @@ canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
Expand Down Expand Up @@ -86,14 +86,10 @@ canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### destinations #############
#################################################
canal.destinations =
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
Expand All @@ -111,29 +107,56 @@ canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### MQ #############
######### MQ Properties #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:6667
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false

##################################################
######### Kafka Kerberos Info #############
######### RabbitMQ #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
2 changes: 1 addition & 1 deletion admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.17</version>
<version>5.1.48</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ private Set<String> getNames(Iterable<String> prefixes) {
PropertyDescriptor[] descriptors = BeanUtils.getPropertyDescriptors(this.target.getClass());
for (PropertyDescriptor descriptor : descriptors) {
String name = descriptor.getName();
if (!"class".equals(name)) {
if (!name.equals("class")) {
RelaxedNames relaxedNames = RelaxedNames.forCamelCase(name);
if (prefixes == null) {
for (String relaxedName : relaxedNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private static boolean isText(String columnType) {

public static Object typeConvert(String tableName ,String columnName, String value, int sqlType, String mysqlType) {
if (value == null
|| ("".equals(value) && !(isText(mysqlType) || sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR))) {
|| (value.equals("") && !(isText(mysqlType) || sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR))) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public static List<Dml> parse4Dml(String destination, String groupId, Message me
CanalEntry.EventType eventType = rowChange.getEventType();

final Dml dml = new Dml();
dml.setIsDdl(rowChange.getIsDdl());
dml.setDestination(destination);
dml.setGroupId(groupId);
dml.setDatabase(entry.getHeader().getSchemaName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ public void sync(ESSyncConfig config, Dml dml) {
long begin = System.currentTimeMillis();

String type = dml.getType();
if (type != null && "INSERT".equalsIgnoreCase(type)) {
if (type != null && type.equalsIgnoreCase("INSERT")) {
insert(config, dml);
} else if (type != null && "UPDATE".equalsIgnoreCase(type)) {
} else if (type != null && type.equalsIgnoreCase("UPDATE")) {
update(config, dml);
} else if (type != null && "DELETE".equalsIgnoreCase(type)) {
} else if (type != null && type.equalsIgnoreCase("DELETE")) {
delete(config, dml);
} else {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public void setColumns(Map<String, String> columns) {
columnItem.setRowKey(true);
rowKeyColumn = columnItem;
} else {
if (field == null || "".equals(field)) {
if (field == null || field.equals("")) {
columnItem.setFamily(family);
columnItem.setQualifier(columnField.getKey());
} else {
Expand Down
Loading

0 comments on commit b79af46

Please sign in to comment.