Skip to content

Commit 6b7c53d

Browse files
authored
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (apache#7786)
1 parent a8bdea2 commit 6b7c53d

File tree

78 files changed

+830
-124
lines changed
  • docs
  • seatunnel-api/src/main/java/org/apache/seatunnel/api/sink
  • seatunnel-connectors-v2
    • connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink
    • connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink
    • connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink
    • connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink
    • connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink
    • connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file
    • connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink
    • connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink
    • connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink
    • connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink
    • connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink
    • connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink
    • connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink
    • connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink
    • connector-file
      • connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink
      • connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink
      • connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink
      • connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink
      • connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink
      • connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink
      • connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink
      • connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink
      • connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink
    • connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink
    • connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink
    • connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink
    • connector-http
    • connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink
    • connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink
    • connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink
    • connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink
    • connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink
    • connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink
    • connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink
    • connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink
    • connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink
    • connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink
    • connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink
    • connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink
    • connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink
    • connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink
    • connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink
    • connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink
    • connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink
    • connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink
    • connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink
    • connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink
    • connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink
    • connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink
    • connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink
    • connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/sink
  • seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector
  • seatunnel-e2e
    • seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory
    • seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e
  • seatunnel-engine
  • seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+830
-124
lines changed

docs/en/seatunnel-engine/rest-api-v1.md

+36-7
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,18 @@ network:
161161
"jobStatus": "",
162162
"createTime": "",
163163
"jobDag": {
164-
"vertices": [
164+
"jobId": "",
165+
"vertexInfoMap": [
166+
{
167+
"vertexId": 1,
168+
"type": "",
169+
"vertexName": "",
170+
"tablePaths": [
171+
""
172+
]
173+
}
165174
],
166-
"edges": [
167-
]
175+
"pipelineEdges": {}
168176
},
169177
"metrics": {
170178
"sourceReceivedCount": "",
@@ -218,10 +226,18 @@ This API has been deprecated, please use /hazelcast/rest/maps/job-info/:jobId in
218226
"jobStatus": "",
219227
"createTime": "",
220228
"jobDag": {
221-
"vertices": [
229+
"jobId": "",
230+
"vertexInfoMap": [
231+
{
232+
"vertexId": 1,
233+
"type": "",
234+
"vertexName": "",
235+
"tablePaths": [
236+
""
237+
]
238+
}
222239
],
223-
"edges": [
224-
]
240+
"pipelineEdges": {}
225241
},
226242
"metrics": {
227243
"SourceReceivedCount": "",
@@ -289,7 +305,20 @@ When we can't get the job info, the response will be:
289305
"errorMsg": null,
290306
"createTime": "",
291307
"finishTime": "",
292-
"jobDag": "",
308+
"jobDag": {
309+
"jobId": "",
310+
"vertexInfoMap": [
311+
{
312+
"vertexId": 1,
313+
"type": "",
314+
"vertexName": "",
315+
"tablePaths": [
316+
""
317+
]
318+
}
319+
],
320+
"pipelineEdges": {}
321+
},
293322
"metrics": ""
294323
}
295324
]

docs/en/seatunnel-engine/rest-api-v2.md

+36-7
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,18 @@ seatunnel:
128128
"jobStatus": "",
129129
"createTime": "",
130130
"jobDag": {
131-
"vertices": [
131+
"jobId": "",
132+
"vertexInfoMap": [
133+
{
134+
"vertexId": 1,
135+
"type": "",
136+
"vertexName": "",
137+
"tablePaths": [
138+
""
139+
]
140+
}
132141
],
133-
"edges": [
134-
]
142+
"pipelineEdges": {}
135143
},
136144
"metrics": {
137145
"sourceReceivedCount": "",
@@ -185,10 +193,18 @@ This API has been deprecated, please use /job-info/:jobId instead
185193
"jobStatus": "",
186194
"createTime": "",
187195
"jobDag": {
188-
"vertices": [
196+
"jobId": "",
197+
"vertexInfoMap": [
198+
{
199+
"vertexId": 1,
200+
"type": "",
201+
"vertexName": "",
202+
"tablePaths": [
203+
""
204+
]
205+
}
189206
],
190-
"edges": [
191-
]
207+
"pipelineEdges": {}
192208
},
193209
"metrics": {
194210
"SourceReceivedCount": "",
@@ -256,7 +272,20 @@ When we can't get the job info, the response will be:
256272
"errorMsg": null,
257273
"createTime": "",
258274
"finishTime": "",
259-
"jobDag": "",
275+
"jobDag": {
276+
"jobId": "",
277+
"vertexInfoMap": [
278+
{
279+
"vertexId": 1,
280+
"type": "",
281+
"vertexName": "",
282+
"tablePaths": [
283+
""
284+
]
285+
}
286+
],
287+
"pipelineEdges": {}
288+
},
260289
"metrics": ""
261290
}
262291
]

docs/zh/seatunnel-engine/rest-api-v1.md

+36-8
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,18 @@ network:
159159
"jobStatus": "",
160160
"createTime": "",
161161
"jobDag": {
162-
"vertices": [
162+
"jobId": "",
163+
"vertexInfoMap": [
164+
{
165+
"vertexId": 1,
166+
"type": "",
167+
"vertexName": "",
168+
"tablePaths": [
169+
""
170+
]
171+
}
163172
],
164-
"edges": [
165-
]
173+
"pipelineEdges": {}
166174
},
167175
"metrics": {
168176
"SourceReceivedCount": "",
@@ -230,10 +238,18 @@ network:
230238
"jobStatus": "",
231239
"createTime": "",
232240
"jobDag": {
233-
"vertices": [
241+
"jobId": "",
242+
"vertexInfoMap": [
243+
{
244+
"vertexId": 1,
245+
"type": "",
246+
"vertexName": "",
247+
"tablePaths": [
248+
""
249+
]
250+
}
234251
],
235-
"edges": [
236-
]
252+
"pipelineEdges": {}
237253
},
238254
"metrics": {
239255
"sourceReceivedCount": "",
@@ -287,8 +303,20 @@ network:
287303
"errorMsg": null,
288304
"createTime": "",
289305
"finishTime": "",
290-
"jobDag": "",
291-
"metrics": ""
306+
"jobDag": {
307+
"jobId": "",
308+
"vertexInfoMap": [
309+
{
310+
"vertexId": 1,
311+
"type": "",
312+
"vertexName": "",
313+
"tablePaths": [
314+
""
315+
]
316+
}
317+
],
318+
"pipelineEdges": {}
319+
}, "metrics": ""
292320
}
293321
]
294322
```

docs/zh/seatunnel-engine/rest-api-v2.md

+36-7
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,18 @@ seatunnel:
124124
"jobStatus": "",
125125
"createTime": "",
126126
"jobDag": {
127-
"vertices": [
127+
"jobId": "",
128+
"vertexInfoMap": [
129+
{
130+
"vertexId": 1,
131+
"type": "",
132+
"vertexName": "",
133+
"tablePaths": [
134+
""
135+
]
136+
}
128137
],
129-
"edges": [
130-
]
138+
"pipelineEdges": {}
131139
},
132140
"metrics": {
133141
"SourceReceivedCount": "",
@@ -195,10 +203,18 @@ seatunnel:
195203
"jobStatus": "",
196204
"createTime": "",
197205
"jobDag": {
198-
"vertices": [
206+
"jobId": "",
207+
"vertexInfoMap": [
208+
{
209+
"vertexId": 1,
210+
"type": "",
211+
"vertexName": "",
212+
"tablePaths": [
213+
""
214+
]
215+
}
199216
],
200-
"edges": [
201-
]
217+
"pipelineEdges": {}
202218
},
203219
"metrics": {
204220
"sourceReceivedCount": "",
@@ -252,7 +268,20 @@ seatunnel:
252268
"errorMsg": null,
253269
"createTime": "",
254270
"finishTime": "",
255-
"jobDag": "",
271+
"jobDag": {
272+
"jobId": "",
273+
"vertexInfoMap": [
274+
{
275+
"vertexId": 1,
276+
"type": "",
277+
"vertexName": "",
278+
"tablePaths": [
279+
""
280+
]
281+
}
282+
],
283+
"pipelineEdges": {}
284+
},
256285
"metrics": ""
257286
}
258287
]

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java

+10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
2222
import org.apache.seatunnel.api.serialization.Serializer;
2323
import org.apache.seatunnel.api.source.SeaTunnelJobAware;
24+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2425
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2526
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2627

@@ -135,4 +136,13 @@ default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
135136
default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
136137
return Optional.empty();
137138
}
139+
140+
/**
141+
* Get the catalog table of the sink.
142+
*
143+
* @return Optional of catalog table.
144+
*/
145+
default Optional<CatalogTable> getWriteCatalogTable() {
146+
return Optional.empty();
147+
}
138148
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@
2525
import org.apache.seatunnel.api.sink.SinkCommitter;
2626
import org.apache.seatunnel.api.sink.SinkCommonOptions;
2727
import org.apache.seatunnel.api.sink.SinkWriter;
28+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2829
import org.apache.seatunnel.api.table.catalog.TablePath;
2930
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
3031
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3132

3233
import lombok.Getter;
3334

3435
import java.io.IOException;
36+
import java.util.ArrayList;
3537
import java.util.Collection;
3638
import java.util.HashMap;
3739
import java.util.List;
@@ -157,7 +159,18 @@ public Optional<Serializer<MultiTableCommitInfo>> getCommitInfoSerializer() {
157159
}
158160

159161
public List<TablePath> getSinkTables() {
160-
return sinks.keySet().stream().map(TablePath::of).collect(Collectors.toList());
162+
163+
List<TablePath> tablePaths = new ArrayList<>();
164+
List<SeaTunnelSink> values = new ArrayList<>(sinks.values());
165+
for (int i = 0; i < values.size(); i++) {
166+
if (values.get(i).getWriteCatalogTable().isPresent()) {
167+
tablePaths.add(
168+
((CatalogTable) values.get(i).getWriteCatalogTable().get()).getTablePath());
169+
} else {
170+
tablePaths.add(TablePath.of(sinks.keySet().toArray(new String[0])[i]));
171+
}
172+
}
173+
return tablePaths;
161174
}
162175

163176
@Override
@@ -170,4 +183,9 @@ public List<TablePath> getSinkTables() {
170183
public void setJobContext(JobContext jobContext) {
171184
sinks.values().forEach(sink -> sink.setJobContext(jobContext));
172185
}
186+
187+
@Override
188+
public Optional<CatalogTable> getWriteCatalogTable() {
189+
return SeaTunnelSink.super.getWriteCatalogTable();
190+
}
173191
}

seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSink.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,39 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.sink.SinkWriter;
22+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2223
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2324
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2425
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
2526
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
2627

2728
import java.io.IOException;
29+
import java.util.Optional;
2830

2931
public class ActivemqSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
3032
private final SeaTunnelRowType seaTunnelRowType;
3133
private final ReadonlyConfig pluginConfig;
34+
private final CatalogTable catalogTable;
3235

3336
@Override
3437
public String getPluginName() {
3538
return "ActiveMQ";
3639
}
3740

38-
public ActivemqSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
41+
public ActivemqSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
3942
this.pluginConfig = pluginConfig;
40-
this.seaTunnelRowType = rowType;
43+
this.catalogTable = catalogTable;
44+
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
4145
}
4246

4347
@Override
4448
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
4549
throws IOException {
4650
return new ActivemqSinkWriter(pluginConfig, seaTunnelRowType);
4751
}
52+
53+
@Override
54+
public Optional<CatalogTable> getWriteCatalogTable() {
55+
return Optional.of(catalogTable);
56+
}
4857
}

seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,6 @@ public OptionRule optionRule() {
7575

7676
@Override
7777
public TableSink createSink(TableSinkFactoryContext context) {
78-
return () ->
79-
new ActivemqSink(
80-
context.getOptions(),
81-
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
78+
return () -> new ActivemqSink(context.getOptions(), context.getCatalogTable());
8279
}
8380
}

0 commit comments

Comments
 (0)