[improve][io] Remove kafka-connect-avro-converter-shaded (apache#19468)
Signed-off-by: tison <[email protected]>
tisonkun authored Feb 9, 2023
1 parent 329c8c0 commit b969fe5
Showing 9 changed files with 16 additions and 154 deletions.
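In short, the adaptor now depends on the Confluent Avro converter and Apache Avro directly, so imports drop the relocated org.apache.pulsar.kafka.shade. prefix. Below is a minimal sketch of wiring the unshaded converter against an in-memory schema registry, mirroring the imports this commit switches to; the class name and the "mock://test" URL are illustrative and not part of the commit.

import io.confluent.connect.avro.AvroConverter;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import java.util.Collections;

public class UnshadedAvroConverterSketch {
    public static void main(String[] args) {
        // Build the converter against an in-memory schema registry; no shaded
        // org.apache.pulsar.kafka.shade.* prefix is needed any more.
        AvroConverter converter = new AvroConverter(new MockSchemaRegistryClient());
        // The registry URL config is still required, even with a mock client.
        converter.configure(
                Collections.singletonMap(
                        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://test"),
                false); // false = configure as a value converter
        System.out.println("Configured " + converter.getClass().getName());
    }
}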
build/run_unit_group.sh (1 change: 0 additions & 1 deletion)
@@ -179,7 +179,6 @@ function test_group_other() {
 }
 
 function test_group_pulsar_io() {
-  $MVN_TEST_OPTIONS -pl kafka-connect-avro-converter-shaded clean install
   echo "::group::Running pulsar-io tests"
   mvn_test --install -Ppulsar-io-tests,-main
   echo "::endgroup::"
kafka-connect-avro-converter-shaded/pom.xml (118 changes: 0 additions & 118 deletions)

This file was deleted.

pom.xml (3 changes: 0 additions & 3 deletions)
@@ -2149,9 +2149,6 @@ flexible messaging model and an intuitive client API.</description>
     <!-- connector-related modules -->
     <module>pulsar-io</module>
 
-    <!-- kafka connect avro converter shaded, because version mismatch -->
-    <module>kafka-connect-avro-converter-shaded</module>
-
     <!-- Bouncy Castle Provider loaders-->
     <module>bouncy-castle</module>
 
pulsar-io/kafka-connect-adaptor/pom.xml (20 changes: 4 additions & 16 deletions)
@@ -104,25 +104,13 @@
       <artifactId>commons-lang3</artifactId>
     </dependency>
 
     <!-- confluent connect avro converter -->
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>kafka-connect-avro-converter-shaded</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>io.confluent</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-avro-converter</artifactId>
+      <version>${confluent.version}</version>
     </dependency>
 
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-broker</artifactId>
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.io.kafka.connect;
 
+import io.confluent.connect.avro.AvroConverter;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -43,9 +46,6 @@
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
-import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter;
-import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
 
 /**
  * A pulsar source that runs.
@@ -20,6 +20,7 @@
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import io.confluent.connect.avro.AvroData;
 import java.util.Base64;
 import java.util.Map;
 import java.util.Optional;
@@ -34,7 +35,6 @@
 import org.apache.pulsar.functions.api.KVRecord;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
-import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
 
 /**
  * A pulsar source that runs.
@@ -36,10 +36,9 @@
 @Slf4j
 public class KafkaSchemaWrappedSchema implements Schema<byte[]>, Serializable {
 
-    private SchemaInfo schemaInfo = null;
+    private final SchemaInfo schemaInfo;
 
-    public KafkaSchemaWrappedSchema(org.apache.pulsar.kafka.shade.avro.Schema schema,
-                                    Converter converter) {
+    public KafkaSchemaWrappedSchema(org.apache.avro.Schema schema, Converter converter) {
         Map<String, String> props = new HashMap<>();
         boolean isJsonConverter = converter instanceof JsonConverter;
         props.put(GenericAvroSchema.OFFSET_PROP, isJsonConverter ? "0" : "5");
@@ -24,6 +24,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ExecutionError;
 import com.google.common.util.concurrent.UncheckedExecutionException;
+import io.confluent.connect.avro.AvroData;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +38,6 @@
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
 
 @Slf4j
 public class PulsarSchemaToKafkaSchema {
@@ -74,9 +74,8 @@ public static boolean matchesToKafkaLogicalSchema(Schema kafkaSchema) {
     }
 
     // Parse json to shaded schema
-    private static org.apache.pulsar.kafka.shade.avro.Schema parseAvroSchema(String schemaJson) {
-        final org.apache.pulsar.kafka.shade.avro.Schema.Parser parser =
-                new org.apache.pulsar.kafka.shade.avro.Schema.Parser();
+    private static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
+        final org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
         parser.setValidateDefaults(false);
         return parser.parse(schemaJson);
     }
@@ -126,9 +125,8 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema p
                             getKafkaConnectSchema(kvSchema.getValueSchema()))
                             .build();
                 }
-                org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
-                        parseAvroSchema(new String(pulsarSchema.getSchemaInfo().getSchema(),
-                                StandardCharsets.UTF_8));
+                org.apache.avro.Schema avroSchema = parseAvroSchema(
+                        new String(pulsarSchema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8));
                 return avroData.toConnectSchema(avroSchema);
             });
         } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
Expand Up @@ -60,8 +60,7 @@ public void testDebeziumMySqlSourceJsonWithClientBuilder() throws Exception {

@Test(groups = "source")
public void testDebeziumMySqlSourceAvro() throws Exception {
testDebeziumMySqlConnect(
"org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter", false, false);
testDebeziumMySqlConnect("io.confluent.connect.avro.AvroConverter", false, false);
}

@Test(groups = "source")