From d003bd85b65b307e77867781986cc7d58c3ba80a Mon Sep 17 00:00:00 2001 From: zhangdonghao <39961809+hawk9821@users.noreply.github.com> Date: Fri, 26 Jul 2024 10:44:09 +0800 Subject: [PATCH] [Feature][Zeta] Added the metrics information of table statistics in multi-table mode (#7212) --- .../sink/multitablesink/MultiTableSink.java | 5 + .../engine/e2e/MultiTableMetricsIT.java | 125 ++++++++++++++++++ .../batch_fake_multi_table_to_console.conf | 64 +++++++++ .../engine/client/SeaTunnelClientTest.java | 114 ++++++++++++++++ .../batch_fake_multi_table_to_console.conf | 66 +++++++++ .../rest/RestHttpGetCommandProcessor.java | 74 ++++++++++- .../server/task/SeaTunnelSourceCollector.java | 53 ++++++-- .../server/task/SourceSeaTunnelTask.java | 13 +- .../server/task/flow/SinkFlowLifeCycle.java | 38 ++++++ 9 files changed, 537 insertions(+), 15 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf create mode 100644 seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java index bb04283ca68..923ecff8b88 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -149,6 +150,10 @@ public Optional> getCommitInfoSerializer() { return Optional.of(new MultiTableSinkAggregatedCommitter(aggCommitters)); } + public List getSinkTables() { + return sinks.keySet().stream().map(TablePath::of).collect(Collectors.toList()); + } + @Override public Optional> getAggregatedCommitInfoSerializer() { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java new file mode 100644 index 00000000000..59942eb4cc8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; +import org.apache.seatunnel.engine.client.job.ClientJobProxy; +import org.apache.seatunnel.engine.common.config.ConfigProvider; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; +import org.apache.seatunnel.engine.server.rest.RestConstant; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.instance.impl.HazelcastInstanceImpl; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static io.restassured.RestAssured.given; +import static org.hamcrest.Matchers.equalTo; + +public class MultiTableMetricsIT { + + private static final String HOST = "http://localhost:"; + + private static ClientJobProxy batchJobProxy; + + private static HazelcastInstanceImpl node1; + + private static SeaTunnelClient engineClient; + + @BeforeEach + void beforeClass() throws Exception { + String testClusterName = TestUtils.getClusterName("RestApiIT"); + SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName); + node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(testClusterName); + engineClient = new SeaTunnelClient(clientConfig); + + String batchFilePath = TestUtils.getResource("batch_fake_multi_table_to_console.conf"); + JobConfig batchConf = new JobConfig(); + batchConf.setName("batch_fake_multi_table_to_console"); + ClientJobExecutionEnvironment batchJobExecutionEnv = + engineClient.createExecutionContext(batchFilePath, batchConf, seaTunnelConfig); + batchJobProxy = batchJobExecutionEnv.execute(); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.FINISHED, batchJobProxy.getJobStatus())); + } + + @Test + public void multiTableMetrics() { + Collections.singletonList(node1) + .forEach( + instance -> { + given().get( + HOST + + instance.getCluster() + .getLocalMember() + .getAddress() + .getPort() + + RestConstant.JOB_INFO_URL + + "/" + + batchJobProxy.getJobId()) + .then() + .statusCode(200) + .body("jobName", equalTo("batch_fake_multi_table_to_console")) + .body("jobStatus", equalTo("FINISHED")) + .body("metrics.SourceReceivedCount", equalTo("50")) + .body("metrics.SinkWriteCount", equalTo("50")) + .body( + "metrics.TableSourceReceivedCount.'fake.table1'", + equalTo("20")) + .body( + "metrics.TableSourceReceivedCount.'fake.public.table2'", + equalTo("30")) + .body( + "metrics.TableSinkWriteCount.'fake.table1'", + equalTo("20")) + .body( + "metrics.TableSinkWriteCount.'fake.public.table2'", + equalTo("30")); + }); + } + + @AfterEach + void afterClass() { + if (engineClient != null) { + engineClient.close(); + } + + if (node1 != null) { + node1.shutdown(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf new file mode 100644 index 00000000000..c51929a0edb --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake1" + row.num = 20 + schema = { + table = "fake.table1" + fields { + name = "string" + age = "int" + } + } + } + + FakeSource { + result_table_name = "fake2" + row.num = 30 + schema = { + table = "fake.public.table2" + fields { + name = "string" + age = "int" + sex = "int" + } + } + } +} + +transform { +} + +sink { + console { + source_table_name = "fake1" + } + console { + source_table_name = "fake2" + } +} diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index d7e55db4ec2..100aa0b3203 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.engine.client; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.common.config.Common; @@ -51,10 +53,14 @@ import java.net.URL; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Spliterators; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS; @@ -548,6 +554,114 @@ public void testSavePointAndRestoreWithSavePoint() throws Exception { } } + @Test + public void testGetMultiTableJobMetrics() { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("/batch_fake_multi_table_to_console.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("testGetMultiTableJobMetrics"); + + SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + JobClient jobClient = seaTunnelClient.getJobClient(); + + try { + ClientJobExecutionEnvironment jobExecutionEnv = + seaTunnelClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG); + + final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + CompletableFuture objectCompletableFuture = + CompletableFuture.supplyAsync( + () -> { + return clientJobProxy.waitForJobComplete(); + }); + long jobId = clientJobProxy.getJobId(); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertTrue( + jobClient.getJobDetailStatus(jobId).contains("FINISHED") + && jobClient + .listJobStatus(true) + .contains("FINISHED"))); + + String jobMetrics = jobClient.getJobMetrics(jobId); + + Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.table1")); + Assertions.assertTrue( + jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.public.table2")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.table1")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.public.table2")); + + log.info("jobMetrics : {}", jobMetrics); + JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics); + List metricNameList = + StreamSupport.stream( + Spliterators.spliteratorUnknownSize( + jobMetricsStr.fieldNames(), 0), + false) + .filter( + metricName -> + metricName.startsWith(SOURCE_RECEIVED_COUNT) + || metricName.startsWith(SINK_WRITE_COUNT)) + .collect(Collectors.toList()); + + Map totalCount = + metricNameList.stream() + .filter(metrics -> !metrics.contains("#")) + .collect( + Collectors.toMap( + metrics -> metrics, + metrics -> + StreamSupport.stream( + jobMetricsStr + .get(metrics) + .spliterator(), + false) + .mapToLong( + value -> + value.get("value") + .asLong()) + .sum())); + + Map tableCount = + metricNameList.stream() + .filter(metrics -> metrics.contains("#")) + .collect( + Collectors.toMap( + metrics -> metrics, + metrics -> + StreamSupport.stream( + jobMetricsStr + .get(metrics) + .spliterator(), + false) + .mapToLong( + value -> + value.get("value") + .asLong()) + .sum())); + + Assertions.assertEquals( + totalCount.get(SOURCE_RECEIVED_COUNT), + tableCount.entrySet().stream() + .filter(e -> e.getKey().startsWith(SOURCE_RECEIVED_COUNT)) + .mapToLong(Map.Entry::getValue) + .sum()); + Assertions.assertEquals( + totalCount.get(SINK_WRITE_COUNT), + tableCount.entrySet().stream() + .filter(e -> e.getKey().startsWith(SINK_WRITE_COUNT)) + .mapToLong(Map.Entry::getValue) + .sum()); + + } catch (ExecutionException | InterruptedException | JsonProcessingException e) { + throw new RuntimeException(e); + } finally { + seaTunnelClient.close(); + } + } + @AfterAll public static void after() { INSTANCE.shutdown(); diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf new file mode 100644 index 00000000000..df7ae51fe6e --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake1" + row.num = 20 + schema = { + table = "fake.table1" + fields { + name = "string" + age = "int" + } + } + parallelism = 1 + } + + FakeSource { + result_table_name = "fake2" + row.num = 30 + schema = { + table = "fake.public.table2" + fields { + name = "string" + age = "int" + sex = "int" + } + } + parallelism = 1 + } +} + +transform { +} + +sink { + console { + source_table_name = "fake1" + } + console { + source_table_name = "fake2" + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index 6081b0f2eaf..d5d60b7cbb4 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.api.common.metrics.JobMetrics; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.engine.common.Constant; @@ -64,8 +65,10 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.Spliterators; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500; import static org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO; @@ -79,7 +82,9 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor { private static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount"; + private static final String TABLE_SOURCE_RECEIVED_COUNT = "TableSourceReceivedCount"; private static final String SINK_WRITE_COUNT = "SinkWriteCount"; + private static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount"; private final Log4j2HttpGetCommandProcessor original; private NodeEngine nodeEngine; @@ -362,12 +367,31 @@ private void getRunningThread(HttpGetCommand command) { .collect(JsonArray::new, JsonArray::add, JsonArray::add)); } - private Map getJobMetrics(String jobMetrics) { - Map metricsMap = new HashMap<>(); + private Map getJobMetrics(String jobMetrics) { + Map metricsMap = new HashMap<>(); long sourceReadCount = 0L; long sinkWriteCount = 0L; + Map tableSourceReceivedCountMap = new HashMap<>(); + Map tableSinkWriteCountMap = new HashMap<>(); try { JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics); + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(jobMetricsStr.fieldNames(), 0), + false) + .filter(metricName -> metricName.contains("#")) + .forEach( + metricName -> { + String tableName = + TablePath.of(metricName.split("#")[1]).getFullName(); + if (metricName.startsWith(SOURCE_RECEIVED_COUNT)) { + tableSourceReceivedCountMap.put( + tableName, jobMetricsStr.get(metricName)); + } + if (metricName.startsWith(SOURCE_RECEIVED_COUNT)) { + tableSinkWriteCountMap.put( + tableName, jobMetricsStr.get(metricName)); + } + }); JsonNode sourceReceivedCountJson = jobMetricsStr.get(SOURCE_RECEIVED_COUNT); JsonNode sinkWriteCountJson = jobMetricsStr.get(SINK_WRITE_COUNT); for (int i = 0; i < jobMetricsStr.get(SOURCE_RECEIVED_COUNT).size(); i++) { @@ -379,9 +403,36 @@ private Map getJobMetrics(String jobMetrics) { } catch (JsonProcessingException | NullPointerException e) { return metricsMap; } + + Map tableSourceReceivedCount = + tableSourceReceivedCountMap.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + StreamSupport.stream( + entry.getValue().spliterator(), + false) + .mapToLong( + node -> node.get("value").asLong()) + .sum())); + Map tableSinkWriteCount = + tableSinkWriteCountMap.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + StreamSupport.stream( + entry.getValue().spliterator(), + false) + .mapToLong( + node -> node.get("value").asLong()) + .sum())); + metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount); metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount); - + metricsMap.put(TABLE_SOURCE_RECEIVED_COUNT, tableSourceReceivedCount); + metricsMap.put(TABLE_SINK_WRITE_COUNT, tableSinkWriteCount); return metricsMap; } @@ -475,11 +526,24 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) { .add( RestConstant.IS_START_WITH_SAVE_POINT, jobImmutableInformation.isStartWithSavePoint()) - .add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics))); + .add(RestConstant.METRICS, toJsonObject(getJobMetrics(jobMetrics))); return jobInfoJson; } + private JsonObject toJsonObject(Map jobMetrics) { + JsonObject members = new JsonObject(); + jobMetrics.forEach( + (key, value) -> { + if (value instanceof Map) { + members.add(key, toJsonObject((Map) value)); + } else { + members.add(key, value.toString()); + } + }); + return members; + } + private JsonObject getJobInfoJson(JobState jobState, String jobMetrics, JobDAGInfo jobDAGInfo) { return new JsonObject() .add(RestConstant.JOB_ID, String.valueOf(jobState.getJobId())) @@ -498,6 +562,6 @@ private JsonObject getJobInfoJson(JobState jobState, String jobMetrics, JobDAGIn DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)) .add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo)) .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray()) - .add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics))); + .add(RestConstant.METRICS, toJsonObject(getJobMetrics(jobMetrics))); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index f5d4aed1ab4..62612d0617a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.metrics.Meter; import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler; @@ -34,12 +35,17 @@ import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES; @@ -54,12 +60,16 @@ public class SeaTunnelSourceCollector implements Collector { private final List>> outputs; + private final MetricsContext metricsContext; + private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new AtomicBoolean(false); private final AtomicBoolean schemaChangeAfterCheckpointSignal = new AtomicBoolean(false); private final Counter sourceReceivedCount; + private final Map sourceReceivedCountPerTable = new ConcurrentHashMap<>(); + private final Meter sourceReceivedQPS; private final Counter sourceReceivedBytes; @@ -77,17 +87,24 @@ public SeaTunnelSourceCollector( List>> outputs, MetricsContext metricsContext, FlowControlStrategy flowControlStrategy, - SeaTunnelDataType rowType) { + SeaTunnelDataType rowType, + List tablePaths) { this.checkpointLock = checkpointLock; this.outputs = outputs; this.rowType = rowType; + this.metricsContext = metricsContext; if (rowType instanceof MultipleRowType) { ((MultipleRowType) rowType) .iterator() - .forEachRemaining( - type -> { - this.rowTypeMap.put(type.getKey(), type.getValue()); - }); + .forEachRemaining(type -> this.rowTypeMap.put(type.getKey(), type.getValue())); + } + if (CollectionUtils.isNotEmpty(tablePaths)) { + tablePaths.forEach( + tablePath -> + sourceReceivedCountPerTable.put( + getFullName(tablePath), + metricsContext.counter( + SOURCE_RECEIVED_COUNT + "#" + getFullName(tablePath)))); } sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT); sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS); @@ -100,14 +117,12 @@ public SeaTunnelSourceCollector( public void collect(T row) { try { if (row instanceof SeaTunnelRow) { + String tableId = ((SeaTunnelRow) row).getTableId(); int size; if (rowType instanceof SeaTunnelRowType) { size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType); } else if (rowType instanceof MultipleRowType) { - size = - ((SeaTunnelRow) row) - .getBytesSize( - rowTypeMap.get(((SeaTunnelRow) row).getTableId())); + size = ((SeaTunnelRow) row).getBytesSize(rowTypeMap.get(tableId)); } else { throw new SeaTunnelEngineException( "Unsupported row type: " + rowType.getClass().getName()); @@ -115,6 +130,18 @@ public void collect(T row) { sourceReceivedBytes.inc(size); sourceReceivedBytesPerSeconds.markEvent(size); flowControlGate.audit((SeaTunnelRow) row); + if (StringUtils.isNotEmpty(tableId)) { + String tableName = getFullName(TablePath.of(tableId)); + Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName); + if (Objects.nonNull(sourceTableCounter)) { + sourceTableCounter.inc(); + } else { + Counter counter = + metricsContext.counter(SOURCE_RECEIVED_COUNT + "#" + tableName); + counter.inc(); + sourceReceivedCountPerTable.put(tableName, counter); + } + } } sendRecordToNext(new Record<>(row)); emptyThisPollNext = false; @@ -205,4 +232,12 @@ public void sendRecordToNext(Record record) throws IOException { } } } + + private String getFullName(TablePath tablePath) { + if (StringUtils.isBlank(tablePath.getTableName())) { + tablePath = + TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); + } + return tablePath.getFullName(); + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java index 53171d40315..dbcde3e9d6e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java @@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; @@ -37,9 +39,11 @@ import lombok.Getter; import lombok.NonNull; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; public class SourceSeaTunnelTask extends SeaTunnelTask { @@ -76,10 +80,16 @@ public void init() throws Exception { + startFlowLifeCycle.getClass().getName()); } else { SeaTunnelDataType sourceProducedType; + List tablePaths = new ArrayList<>(); try { List producedCatalogTables = sourceFlow.getAction().getSource().getProducedCatalogTables(); sourceProducedType = CatalogTableUtil.convertToDataType(producedCatalogTables); + tablePaths = + producedCatalogTables.stream() + .map(CatalogTable::getTableId) + .map(TableIdentifier::toTablePath) + .collect(Collectors.toList()); } catch (UnsupportedOperationException e) { // TODO remove it when all connector use `getProducedCatalogTables` sourceProducedType = sourceFlow.getAction().getSource().getProducedType(); @@ -90,7 +100,8 @@ public void init() throws Exception { outputs, this.getMetricsContext(), FlowControlStrategy.fromMap(envOption), - sourceProducedType); + sourceProducedType, + tablePaths); ((SourceFlowLifeCycle) startFlowLifeCycle).setCollector(collector); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 48c530a0c36..516e1c97c41 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -26,6 +26,8 @@ import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportResourceShare; +import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -43,6 +45,8 @@ import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation; import org.apache.seatunnel.engine.server.task.record.Barrier; +import org.apache.commons.lang3.StringUtils; + import com.hazelcast.cluster.Address; import lombok.extern.slf4j.Slf4j; @@ -52,9 +56,11 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -92,6 +98,8 @@ public class SinkFlowLifeCycle sinkWriteCountPerTable = new ConcurrentHashMap<>(); + private Meter sinkWriteQPS; private Counter sinkWriteBytes; @@ -125,6 +133,15 @@ public SinkFlowLifeCycle( sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS); sinkWriteBytes = metricsContext.counter(SINK_WRITE_BYTES); sinkWriteBytesPerSeconds = metricsContext.meter(SINK_WRITE_BYTES_PER_SECONDS); + if (sinkAction.getSink() instanceof MultiTableSink) { + List sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables(); + sinkTables.forEach( + tablePath -> + sinkWriteCountPerTable.put( + getFullName(tablePath), + metricsContext.counter( + SINK_WRITE_COUNT + "#" + getFullName(tablePath)))); + } } @Override @@ -256,6 +273,19 @@ public void received(Record record) { long size = ((SeaTunnelRow) record.getData()).getBytesSize(); sinkWriteBytes.inc(size); sinkWriteBytesPerSeconds.markEvent(size); + String tableId = ((SeaTunnelRow) record.getData()).getTableId(); + if (StringUtils.isNotBlank(tableId)) { + String tableName = getFullName(TablePath.of(tableId)); + Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName); + if (Objects.nonNull(sinkTableCounter)) { + sinkTableCounter.inc(); + } else { + Counter counter = + metricsContext.counter(SINK_WRITE_COUNT + "#" + tableName); + counter.inc(); + sinkWriteCountPerTable.put(tableName, counter); + } + } } } } catch (Exception e) { @@ -315,4 +345,12 @@ public void restoreState(List actionStateList) throws Except ((SupportResourceShare) this.writer).setMultiTableResourceManager(resourceManager, 0); } } + + private String getFullName(TablePath tablePath) { + if (StringUtils.isBlank(tablePath.getTableName())) { + tablePath = + TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); + } + return tablePath.getFullName(); + } }