Skip to content

Commit 793933b

Browse files
[Improve][Connector-V2] Http source support user-defined schema (apache#2439)
* [Improve][Connector-V2] Http source support user-defined schema
1 parent 84f6b17 commit 793933b

File tree

5 files changed

+116
-26
lines changed

5 files changed

+116
-26
lines changed

docs/en/connector-v2/source/Http.md

+76-20
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,19 @@ Used to read data from Http. Both support streaming and batch mode.
88

99
## Options
1010

11-
| name | type | required | default value |
12-
| --- |--------| --- | --- |
13-
| url | String | Yes | - |
14-
| method | String | No | GET |
15-
| headers | Map | No | - |
16-
| params | Map | No | - |
17-
| body | String | No | - |
11+
| name | type | required | default value |
12+
|---------------|--------|----------|---------------|
13+
| url | String | Yes | - |
14+
| schema | config | No | - |
15+
| schema.fields | config | No | - |
16+
| format | string | No | json |
17+
| method | String | No | get |
18+
| headers | Map | No | - |
19+
| params | Map | No | - |
20+
| body | String | No | - |
1821

1922
### url [string]
23+
2024
http request url
2125

2226
### method [string]
@@ -35,25 +39,77 @@ http params
3539

3640
http body
3741

42+
### format [String]
43+
44+
the format of upstream data, now only support `json` `text`, default `json`.
45+
46+
when you assign format is `json`, you should also assign schema option, for example:
47+
48+
upstream data is the following:
49+
50+
```json
51+
52+
{"code": 200, "data": "get success", "success": true}
53+
54+
```
55+
56+
you should assign schema as the following:
57+
58+
```hocon
59+
60+
schema {
61+
fields {
62+
code = int
63+
data = string
64+
success = boolean
65+
}
66+
}
67+
68+
```
69+
70+
connector will generate data as the following:
71+
72+
| code | data | success |
73+
|------|-------------|---------|
74+
| 200 | get success | true |
75+
76+
when you assign format is `text`, connector will do nothing for upstream data, for example:
77+
78+
upstream data is the following:
79+
80+
```json
81+
82+
{"code": 200, "data": "get success", "success": true}
83+
84+
```
85+
86+
connector will generate data as the following:
87+
88+
| content |
89+
|---------|
90+
| {"code": 200, "data": "get success", "success": true} |
91+
92+
### schema [Config]
93+
94+
#### fields [Config]
95+
96+
the schema fields of upstream data
97+
3898
## Example
3999

40100
simple:
41101

42102
```hocon
43103
Http {
44-
url = "http://localhost/test/query"
45-
method = "GET"
46-
headers {
47-
token = "9e32e859ef044462a257e1fc76730066"
48-
}
49-
params {
50-
id = "1"
51-
type = "TEST"
52-
}
53-
body = "{
54-
\"code\": 5945141259552,
55-
\"name\": \"test\"
56-
}"
104+
url = "https://tyrantlucifer.com/api/getDemoData"
105+
schema {
106+
fields {
107+
code = int
108+
message = string
109+
data = string
110+
ok = boolean
111+
}
57112
}
113+
}
58114
```
59115

seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java

+5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
public class SeatunnelSchema {
3838
private static final String FIELD_KEY = "fields";
39+
private static final String SIMPLE_SCHEMA_FILED = "content";
3940
private final SeaTunnelRowType seaTunnelRowType;
4041

4142
private SeatunnelSchema(SeaTunnelRowType seaTunnelRowType) {
@@ -211,6 +212,10 @@ public static SeatunnelSchema buildWithConfig(Config schemaConfig) {
211212
return new SeatunnelSchema(seaTunnelRowType);
212213
}
213214

215+
public static SeaTunnelRowType buildSimpleTextSchema() {
216+
return new SeaTunnelRowType(new String[]{SIMPLE_SCHEMA_FILED}, new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
217+
}
218+
214219
public SeaTunnelRowType getSeaTunnelRowType() {
215220
return seaTunnelRowType;
216221
}

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java

+3
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,7 @@ public class HttpConfig {
2424
public static final String HEADERS = "headers";
2525
public static final String PARAMS = "params";
2626
public static final String BODY = "body";
27+
public static final String SCHEMA = "schema";
28+
public static final String FORMAT = "format";
29+
public static final String DEFAULT_FORMAT = "json";
2730
}

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java

+21-4
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,23 @@
1919

2020
import org.apache.seatunnel.api.common.PrepareFailException;
2121
import org.apache.seatunnel.api.common.SeaTunnelContext;
22+
import org.apache.seatunnel.api.serialization.DeserializationSchema;
2223
import org.apache.seatunnel.api.source.Boundedness;
2324
import org.apache.seatunnel.api.source.SeaTunnelSource;
24-
import org.apache.seatunnel.api.table.type.BasicType;
2525
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2626
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2727
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2828
import org.apache.seatunnel.common.config.CheckConfigUtil;
2929
import org.apache.seatunnel.common.config.CheckResult;
3030
import org.apache.seatunnel.common.constants.JobMode;
3131
import org.apache.seatunnel.common.constants.PluginType;
32+
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
3233
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
3334
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
3435
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
3536
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
3637
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
38+
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
3739

3840
import org.apache.seatunnel.shade.com.typesafe.config.Config;
3941

@@ -44,6 +46,7 @@ public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
4446
protected final HttpParameter httpParameter = new HttpParameter();
4547
protected SeaTunnelRowType rowType;
4648
protected SeaTunnelContext seaTunnelContext;
49+
protected DeserializationSchema<SeaTunnelRow> deserializationSchema;
4750

4851
@Override
4952
public String getPluginName() {
@@ -62,8 +65,22 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
6265
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
6366
}
6467
this.httpParameter.buildWithConfig(pluginConfig);
65-
// TODO support user custom row type
66-
this.rowType = new SeaTunnelRowType(new String[]{"content"}, new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
68+
if (pluginConfig.hasPath(HttpConfig.SCHEMA)) {
69+
Config schema = pluginConfig.getConfig(HttpConfig.SCHEMA);
70+
this.rowType = SeatunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
71+
} else {
72+
this.rowType = SeatunnelSchema.buildSimpleTextSchema();
73+
}
74+
// TODO: use format SPI
75+
// default use json format
76+
String format;
77+
if (pluginConfig.hasPath(HttpConfig.FORMAT)) {
78+
format = pluginConfig.getString(HttpConfig.FORMAT);
79+
this.deserializationSchema = null;
80+
} else {
81+
format = HttpConfig.DEFAULT_FORMAT;
82+
this.deserializationSchema = new JsonDeserializationSchema(false, false, rowType);
83+
}
6784
}
6885

6986
@Override
@@ -78,6 +95,6 @@ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
7895

7996
@Override
8097
public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
81-
return new HttpSourceReader(this.httpParameter, readerContext);
98+
return new HttpSourceReader(this.httpParameter, readerContext, this.deserializationSchema);
8299
}
83100
}

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.http.source;
1919

20+
import org.apache.seatunnel.api.serialization.DeserializationSchema;
2021
import org.apache.seatunnel.api.source.Boundedness;
2122
import org.apache.seatunnel.api.source.Collector;
2223
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -37,10 +38,12 @@ public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
3738
protected final SingleSplitReaderContext context;
3839
protected final HttpParameter httpParameter;
3940
protected HttpClientProvider httpClient;
41+
protected final DeserializationSchema<SeaTunnelRow> deserializationSchema;
4042

41-
public HttpSourceReader(HttpParameter httpParameter, SingleSplitReaderContext context) {
43+
public HttpSourceReader(HttpParameter httpParameter, SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
4244
this.context = context;
4345
this.httpParameter = httpParameter;
46+
this.deserializationSchema = deserializationSchema;
4447
}
4548

4649
@Override
@@ -60,7 +63,13 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
6063
try {
6164
HttpResponse response = httpClient.execute(this.httpParameter.getUrl(), this.httpParameter.getMethod(), this.httpParameter.getHeaders(), this.httpParameter.getParams());
6265
if (HttpResponse.STATUS_OK == response.getCode()) {
63-
output.collect(new SeaTunnelRow(new Object[] {response.getContent()}));
66+
String content = response.getContent();
67+
if (deserializationSchema != null) {
68+
deserializationSchema.deserialize(content.getBytes(), output);
69+
} else {
70+
// TODO: use seatunnel-text-format
71+
output.collect(new SeaTunnelRow(new Object[]{content}));
72+
}
6473
return;
6574
}
6675
LOGGER.error("http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent());

0 commit comments

Comments
 (0)