Skip to content

Commit

Permalink
[FLINK-32676][doc] Add doc for catalog modification listener
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs authored and PatrickRen committed Aug 2, 2023
1 parent fc3035b commit 7a5500e
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 0 deletions.
69 changes: 69 additions & 0 deletions docs/content.zh/docs/dev/table/catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -734,3 +734,72 @@ Flink SQL> show tables;
```
{{< /tab >}}
{{< /tabs >}}

## Catalog Modification Listener

Flink supports registering customized listener for catalog modification, such as database and table ddl. Flink will create
a `CatalogModificationEvent` event for ddl and notify `CatalogModificationListener`. You can implement a listener
and do some customized operations when receiving the event, such as report the information to some external meta-data systems.

### Implement Catalog Listener

There are two interfaces for the catalog modification listener: `CatalogModificationListenerFactory` to create the listener and `CatalogModificationListener`
to receive and process the event. You need to implement these interfaces and below is an example.

```
/** Factory used to create a {@link CatalogModificationListener} instance. */
public class YourCatalogListenerFactory implements CatalogModificationListenerFactory {
/** The identifier for the customized listener factory, you can named it yourself. */
private static final String IDENTIFIER = "your_factory";
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public CatalogModificationListener createListener(Context context) {
return new YourCatalogListener(Create http client from context);
}
}
/** Customized catalog modification listener. */
public class YourCatalogListener implements CatalogModificationListener {
private final HttpClient client;
YourCatalogListener(HttpClient client) {
this.client = client;
}
@Override
public void onEvent(CatalogModificationEvent event) {
// Report the database and table information via http client.
}
}
```

### Register Catalog Listener

After implemented above catalog modification factory and listener, you can register it to the table environment.

```
Configuration configuration = new Configuration();
// Add the factory identifier, you can set multiple listeners in the configuraiton.
configuration.set(TableConfigOptions.TABLE_CATALOG_MODIFICATION_LISTENERS, Arrays.asList("your_factory"));
TableEnvironment env = TableEnvironment.create(
EnvironmentSettings.newInstance()
.withConfiguration(configuration)
.build());
// Create/Alter/Drop database and table.
env.executeSql("CREATE TABLE ...").wait();
```

For sql-gateway, you can add the option `table.catalog-modification.listeners` in the `flink-conf.yaml` and start
the gateway, or you can also use `SET` to specify the listener for ddl, for example, in sql-client or jdbc-driver.

```
Flink SQL> SET 'table.catalog-modification.listeners' = 'your_factory';
Flink SQL> CREATE TABLE test_table(...);
```
69 changes: 69 additions & 0 deletions docs/content/docs/dev/table/catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -738,3 +738,72 @@ Flink SQL> show tables;
```
{{< /tab >}}
{{< /tabs >}}

## Catalog Modification Listener

Flink supports registering customized listener for catalog modification, such as database and table ddl. Flink will create
a `CatalogModificationEvent` event for ddl and notify `CatalogModificationListener`. You can implement a listener
and do some customized operations when receiving the event, such as report the information to some external meta-data systems.

### Implement Catalog Listener

There are two interfaces for the catalog modification listener: `CatalogModificationListenerFactory` to create the listener and `CatalogModificationListener`
to receive and process the event. You need to implement these interfaces and below is an example.

```
/** Factory used to create a {@link CatalogModificationListener} instance. */
public class YourCatalogListenerFactory implements CatalogModificationListenerFactory {
/** The identifier for the customized listener factory, you can named it yourself. */
private static final String IDENTIFIER = "your_factory";
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public CatalogModificationListener createListener(Context context) {
return new YourCatalogListener(Create http client from context);
}
}
/** Customized catalog modification listener. */
public class YourCatalogListener implements CatalogModificationListener {
private final HttpClient client;
YourCatalogListener(HttpClient client) {
this.client = client;
}
@Override
public void onEvent(CatalogModificationEvent event) {
// Report the database and table information via http client.
}
}
```

### Register Catalog Listener

After implemented above catalog modification factory and listener, you can register it to the table environment.

```
Configuration configuration = new Configuration();
// Add the factory identifier, you can set multiple listeners in the configuraiton.
configuration.set(TableConfigOptions.TABLE_CATALOG_MODIFICATION_LISTENERS, Arrays.asList("your_factory"));
TableEnvironment env = TableEnvironment.create(
EnvironmentSettings.newInstance()
.withConfiguration(configuration)
.build());
// Create/Alter/Drop database and table.
env.executeSql("CREATE TABLE ...").wait();
```

For sql-gateway, you can add the option `table.catalog-modification.listeners` in the `flink-conf.yaml` and start
the gateway, or you can also use `SET` to specify the listener for ddl, for example, in sql-client or jdbc-driver.

```
Flink SQL> SET 'table.catalog-modification.listeners' = 'your_factory';
Flink SQL> CREATE TABLE test_table(...);
```

0 comments on commit 7a5500e

Please sign in to comment.