Skip to content

Commit

Permalink
addItemPatchSupportInKafkaConnector (Azure#39558)
Browse files Browse the repository at this point in the history
* add itemPatch support

---------

Co-authored-by: annie-mac <[email protected]>
  • Loading branch information
xinlian12 and annie-mac authored Apr 11, 2024
1 parent 75a8e49 commit b148f27
Show file tree
Hide file tree
Showing 29 changed files with 950 additions and 118 deletions.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Added Sink connector. See [PR 39434](https://github.com/Azure/azure-sdk-for-java/pull/39434)
* Added throughput control support. See [PR 39218](https://github.com/Azure/azure-sdk-for-java/pull/39218)
* Added `ServicePrincipal` support - See [PR 39490](https://github.com/Azure/azure-sdk-for-java/pull/39490)
* Added `ItemPatch support` in sink connector - See [PR 39558](https://github.com/Azure/azure-sdk-for-java/pull/39558)

#### Breaking Changes

Expand Down

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Licensed under the MIT License.
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink=ALL-UNNAMED,com.fasterxml.jackson.databind
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink.idStrategy=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.source=com.fasterxml.jackson.databind,ALL-UNNAMED

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateCosmosAccountAuthConfig;
import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;
import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateWriteConfig;

/**
* A Sink connector that publishes topic messages to CosmosDB.
Expand Down Expand Up @@ -84,6 +85,7 @@ public Config validate(Map<String, String> connectorConfigs) {

validateCosmosAccountAuthConfig(configValues);
validateThroughputControlConfig(configValues);
validateWriteConfig(configValues);
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ public class CosmosAadAuthConfig implements CosmosAuthConfig {
private final String clientId;
private final String clientSecret;
private final String tenantId;
private final CosmosAzureEnvironments azureEnvironment;
private final CosmosAzureEnvironment azureEnvironment;

public CosmosAadAuthConfig(String clientId, String clientSecret, String tenantId, CosmosAzureEnvironments azureEnvironment) {
public CosmosAadAuthConfig(String clientId, String clientSecret, String tenantId, CosmosAzureEnvironment azureEnvironment) {
checkArgument(StringUtils.isNotEmpty(clientId), "Argument 'clientId' should not be null");
checkArgument(StringUtils.isNotEmpty(clientSecret), "Argument 'clientSecret' should not be null");
checkArgument(StringUtils.isNotEmpty(tenantId), "Argument 'tenantId' should not be null");
Expand All @@ -36,7 +36,7 @@ public String getTenantId() {
return tenantId;
}

public CosmosAzureEnvironments getAzureEnvironment() {
public CosmosAzureEnvironment getAzureEnvironment() {
return azureEnvironment;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@

package com.azure.cosmos.kafka.connect.implementation;

public enum CosmosAuthTypes {
public enum CosmosAuthType {
MASTER_KEY("MasterKey"),
SERVICE_PRINCIPAL("ServicePrincipal");

private final String name;

CosmosAuthTypes(String name) {
CosmosAuthType(String name) {
this.name = name;
}

public String getName() {
return name;
}

public static CosmosAuthTypes fromName(String name) {
for (CosmosAuthTypes authTypes : CosmosAuthTypes.values()) {
public static CosmosAuthType fromName(String name) {
for (CosmosAuthType authTypes : CosmosAuthType.values()) {
if (authTypes.getName().equalsIgnoreCase(name)) {
return authTypes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@

package com.azure.cosmos.kafka.connect.implementation;

public enum CosmosAzureEnvironments {
public enum CosmosAzureEnvironment {
AZURE("azure"),
AZURE_CHINA("AzureChina"),
AZURE_US_GOVERNMENT("AzureUsGovernment"),
AZURE_GERMANY("AzureGermany");

private final String name;

CosmosAzureEnvironments(String name) {
CosmosAzureEnvironment(String name) {
this.name = name;
}

public String getName() {
return name;
}

public static CosmosAzureEnvironments fromName(String name) {
for (CosmosAzureEnvironments azureEnvironment : CosmosAzureEnvironments.values()) {
public static CosmosAzureEnvironment fromName(String name) {
for (CosmosAzureEnvironment azureEnvironment : CosmosAzureEnvironment.values()) {
if (azureEnvironment.getName().equalsIgnoreCase(name)) {
return azureEnvironment;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

public class CosmosClientStore {
// TODO[Public Preview]: revalidate how to get the active directory endpoint map. It suppose to come from management SDK.
private static final Map<CosmosAzureEnvironments, String> ACTIVE_DIRECTORY_ENDPOINT_MAP;
private static final Map<CosmosAzureEnvironment, String> ACTIVE_DIRECTORY_ENDPOINT_MAP;
static {
ACTIVE_DIRECTORY_ENDPOINT_MAP = new HashMap<>();
ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironments.AZURE, "https://login.microsoftonline.com/");
ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironments.AZURE_CHINA, "https://login.chinacloudapi.cn/");
ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironments.AZURE_US_GOVERNMENT, "https://login.microsoftonline.us/");
ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironments.AZURE_GERMANY, "https://login.microsoftonline.de/");
ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironment.AZURE, "https://login.microsoftonline.com/");
ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironment.AZURE_CHINA, "https://login.chinacloudapi.cn/");
ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironment.AZURE_US_GOVERNMENT, "https://login.microsoftonline.us/");
ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironment.AZURE_GERMANY, "https://login.microsoftonline.de/");
}

public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.kafka.connect.implementation.sink.ItemWriteStrategy;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceContainersConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand All @@ -18,8 +19,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.stream.Collectors;

import static com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig.PATCH_PROPERTY_CONFIGS;
import static com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig.PATCH_PROPERTY_CONFIG_PATTERN;
import static com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig.WRITE_STRATEGY;

/**
* Common Configuration for Cosmos DB Kafka source connector and sink connector.
*/
Expand All @@ -35,7 +41,7 @@ public class KafkaCosmosConfig extends AbstractConfig {
private static final String ACCOUNT_AZURE_ENVIRONMENT = CONFIG_PREFIX + "account.azureEnvironment";
private static final String ACCOUNT_AZURE_ENVIRONMENT_DOC = "The azure environment of the CosmosDB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`.";
private static final String ACCOUNT_AZURE_ENVIRONMENT_DISPLAY = "The azure environment of the CosmosDB account.";
private static final String DEFAULT_ACCOUNT_AZURE_ENVIRONMENT = CosmosAzureEnvironments.AZURE.getName();
private static final String DEFAULT_ACCOUNT_AZURE_ENVIRONMENT = CosmosAzureEnvironment.AZURE.getName();

private static final String ACCOUNT_TENANT_ID = CONFIG_PREFIX + "account.tenantId";
private static final String ACCOUNT_TENANT_ID_DOC = "The tenantId of the CosmosDB account. Required for `ServicePrincipal` authentication.";
Expand All @@ -46,7 +52,7 @@ public class KafkaCosmosConfig extends AbstractConfig {
private static final String AUTH_TYPE_DOC = "There are two auth types are supported currently: "
+ "`MasterKey`(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), `ServicePrincipal`";
private static final String AUTH_TYPE_DISPLAY = "Cosmos Auth type.";
private static final String DEFAULT_AUTH_TYPE = CosmosAuthTypes.MASTER_KEY.getName();
private static final String DEFAULT_AUTH_TYPE = CosmosAuthType.MASTER_KEY.getName();

private static final String ACCOUNT_KEY = CONFIG_PREFIX + "accountKey";
private static final String ACCOUNT_KEY_DOC = "Cosmos DB Account Key (only required in case of `auth.type` as `MasterKey`)";
Expand Down Expand Up @@ -93,7 +99,7 @@ public class KafkaCosmosConfig extends AbstractConfig {
private static final String THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT = CONFIG_PREFIX + "throughputControl.account.azureEnvironment";
private static final String THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT_DOC = "The azure environment of the CosmosDB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`.";
private static final String THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT_DISPLAY = "The azure environment of the CosmosDB account.";
private static final String DEFAULT_THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT = CosmosAzureEnvironments.AZURE.getName();
private static final String DEFAULT_THROUGHPUT_CONTROL_ACCOUNT_AZURE_ENVIRONMENT = CosmosAzureEnvironment.AZURE.getName();

private static final String THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID = CONFIG_PREFIX + "throughputControl.account.tenantId";
private static final String THROUGHPUT_CONTROL_ACCOUNT_TENANT_ID_DOC = "The tenantId of the CosmosDB account. Required for `ServicePrincipal` authentication.";
Expand All @@ -104,7 +110,7 @@ public class KafkaCosmosConfig extends AbstractConfig {
private static final String THROUGHPUT_CONTROL_AUTH_TYPE_DOC = "There are two auth types are supported currently: "
+ "`MasterKey`(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), `ServicePrincipal`";
private static final String THROUGHPUT_CONTROL_AUTH_TYPE_DISPLAY = "Cosmos Auth type.";
private static final String DEFAULT_THROUGHPUT_CONTROL_AUTH_TYPE = CosmosAuthTypes.MASTER_KEY.getName();
private static final String DEFAULT_THROUGHPUT_CONTROL_AUTH_TYPE = CosmosAuthType.MASTER_KEY.getName();

private static final String THROUGHPUT_CONTROL_ACCOUNT_KEY = CONFIG_PREFIX + "throughputControl.accountKey";
private static final String THROUGHPUT_CONTROL_ACCOUNT_KEY_DOC = "Cosmos DB Throughput Control Account Key (only required in case of `throughputControl.auth.type` as `MasterKey`)";
Expand Down Expand Up @@ -219,9 +225,9 @@ private CosmosAccountConfig parseAccountConfigCore(
String preferredRegionListConfig) {

String endpoint = this.getString(accountEndpointConfig);
CosmosAzureEnvironments azureEnvironment = this.parseAzureEnvironment(accountAzureEnvironmentConfig);
CosmosAzureEnvironment azureEnvironment = this.parseAzureEnvironment(accountAzureEnvironmentConfig);
String tenantId = this.getString(accountTenantIdConfig);
CosmosAuthTypes authType = this.parseCosmosAuthType(authTypeConfig);
CosmosAuthType authType = this.parseCosmosAuthType(authTypeConfig);
String masterKey = this.getPassword(accountKeyConfig).value();
String clientId = this.getString(clientIdConfig);
String clientSecret = this.getPassword(clientSecretConfig).value();
Expand All @@ -240,9 +246,9 @@ private CosmosAccountConfig parseAccountConfigCore(
}

private CosmosAuthConfig getAuthConfig(
CosmosAzureEnvironments azureEnvironment,
CosmosAzureEnvironment azureEnvironment,
String tenantId,
CosmosAuthTypes authType,
CosmosAuthType authType,
String masterKey,
String clientId,
String clientSecret) {
Expand All @@ -257,14 +263,14 @@ private CosmosAuthConfig getAuthConfig(
}
}

private CosmosAzureEnvironments parseAzureEnvironment(String configName) {
private CosmosAzureEnvironment parseAzureEnvironment(String configName) {
String authType = this.getString(configName);
return CosmosAzureEnvironments.fromName(authType);
return CosmosAzureEnvironment.fromName(authType);
}

private CosmosAuthTypes parseCosmosAuthType(String configName) {
private CosmosAuthType parseCosmosAuthType(String configName) {
String authType = this.getString(configName);
return CosmosAuthTypes.fromName(authType);
return CosmosAuthType.fromName(authType);
}

private CosmosThroughputControlConfig parseThroughputControlConfig() {
Expand Down Expand Up @@ -740,8 +746,8 @@ public static void validateThroughputControlConfig(Map<String, ConfigValue> conf
String throughputControlAuthTypeString =
StringUtils.isNotEmpty(throughputControlAccountEndpoint)
? configValueMap.get(THROUGHPUT_CONTROL_AUTH_TYPE).value().toString() : configValueMap.get(AUTH_TYPE).value().toString();
CosmosAuthTypes throughputControlAuthType = CosmosAuthTypes.fromName(throughputControlAuthTypeString);
if (throughputControlAuthType == CosmosAuthTypes.SERVICE_PRINCIPAL) {
CosmosAuthType throughputControlAuthType = CosmosAuthType.fromName(throughputControlAuthTypeString);
if (throughputControlAuthType == CosmosAuthType.SERVICE_PRINCIPAL) {
if (targetThroughputThreshold > 0) {
configValueMap
.get(THROUGHPUT_CONTROL_TARGET_THROUGHPUT_THRESHOLD)
Expand Down Expand Up @@ -769,7 +775,7 @@ public static void validateAccountAuthConfigCore(
String clientIdConfig,
String clientSecretConfig) {

CosmosAuthTypes authType = CosmosAuthTypes.fromName(configValueMap.get(authTypeConfig).value().toString());
CosmosAuthType authType = CosmosAuthType.fromName(configValueMap.get(authTypeConfig).value().toString());
switch (authType) {
case MASTER_KEY:
String masterKey = ((Password) configValueMap.get(accountKeyConfig).value()).value();
Expand Down Expand Up @@ -806,6 +812,35 @@ public static void validateAccountAuthConfigCore(
}
}

public static void validateWriteConfig(Map<String, ConfigValue> configValueMap) {
ItemWriteStrategy itemWriteStrategy =
ItemWriteStrategy.fromName(configValueMap.get(WRITE_STRATEGY).value().toString());

if (itemWriteStrategy == ItemWriteStrategy.ITEM_PATCH) {
validatePatchPropertyConfig(configValueMap);
}
}

private static void validatePatchPropertyConfig(Map<String, ConfigValue> configValueMap) {
List<String> patchPropertyConfigs =
convertToList(configValueMap.get(PATCH_PROPERTY_CONFIGS).value().toString());

if (patchPropertyConfigs.size() == 0) {
return;
}

for (String propertyConfig : patchPropertyConfigs) {
Matcher matcher = PATCH_PROPERTY_CONFIG_PATTERN.matcher(propertyConfig);
if (!matcher.matches()) {
configValueMap
.get(PATCH_PROPERTY_CONFIGS)
.addErrorMessage("Patch property config is in valid format."
+ " Only allow property(jsonProperty).op(operationType) or property(jsonProperty).path(patchInCosmosdb).op(operationType)");
return;
}
}
}

public static class AccountEndpointValidator implements ConfigDef.Validator {
@Override
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -873,15 +908,15 @@ public void ensureValid(String name, Object o) {
throw new ConfigException(name, o, "AuthType can not be empty or null");
}

CosmosAuthTypes authType = CosmosAuthTypes.fromName(authTypeString);
CosmosAuthType authType = CosmosAuthType.fromName(authTypeString);
if (authType == null) {
throw new ConfigException(name, o, "Invalid AuthType, only allow MasterKey or ServicePrincipal");
}
}

@Override
public String toString() {
return "AuthType. Only allow " + CosmosAuthTypes.values();
return "AuthType. Only allow " + CosmosAuthType.values();
}
}

Expand All @@ -894,15 +929,15 @@ public void ensureValid(String name, Object o) {
throw new ConfigException(name, o, "AzureEnvironment can not be empty or null");
}

CosmosAzureEnvironments azureEnvironment = CosmosAzureEnvironments.fromName(azureEnvironmentString);
CosmosAzureEnvironment azureEnvironment = CosmosAzureEnvironment.fromName(azureEnvironmentString);
if (azureEnvironment == null) {
throw new ConfigException(name, o, "Invalid AzureEnvironment, only allow `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`");
}
}

@Override
public String toString() {
return "AzureEnvironment. Only allow " + CosmosAzureEnvironments.values();
return "AzureEnvironment. Only allow " + CosmosAzureEnvironment.values();
}
}
}
Loading

0 comments on commit b148f27

Please sign in to comment.