Skip to content

Commit

Permalink
[fix][broker] Expose timestamp field for SchemaData&SchemaInfo (apach…
Browse files Browse the repository at this point in the history
…e#16380)

Fixes apache#16379

### Motivation

Miss the timestamp field when constructing SchemaData from `SchemaRegistryFormat.SchemaInfo`, so timestamp value 0 gets the scheme.
See: https://github.com/apache/pulsar/blob/95b984694bd3c4f1eadf4e6198de033f9b517128/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java#L621-L629

### Modifications

* Expose timestamp field for SchemaData
* Add timestamp field for SchemaInfo
  • Loading branch information
coderzc authored Jul 12, 2022
1 parent 59658cb commit bd150c0
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ static SchemaData schemaInfoToSchema(SchemaRegistryFormat.SchemaInfo info) {
.user(info.getUser())
.type(convertToDomainType(info.getType()))
.data(info.getSchema().toByteArray())
.timestamp(info.getTimestamp())
.isDeleted(info.getDeleted())
.props(toMap(info.getPropsList()))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -41,6 +42,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
Expand Down Expand Up @@ -172,11 +174,13 @@ private <T> void testSchemaInfoApi(Schema<T> schema,
SchemaInfo readSi = admin.schemas().getSchemaInfo(topicName);
log.info("Read schema of topic {} : {}", topicName, readSi);

((SchemaInfoImpl)readSi).setTimestamp(0);
assertEquals(readSi, si);

readSi = admin.schemas().getSchemaInfo(topicName + "-partition-0");
log.info("Read schema of topic {} : {}", topicName, readSi);

((SchemaInfoImpl)readSi).setTimestamp(0);
assertEquals(readSi, si);

}
Expand Down Expand Up @@ -225,12 +229,14 @@ private <T> void testSchemaInfoWithVersionApi(Schema<T> schema,
SchemaInfoWithVersion readSi = admin.schemas().getSchemaInfoWithVersion(topicName);
log.info("Read schema of topic {} : {}", topicName, readSi);

((SchemaInfoImpl)readSi.getSchemaInfo()).setTimestamp(0);
assertEquals(readSi.getSchemaInfo(), si);
assertEquals(readSi.getVersion(), 0);

readSi = admin.schemas().getSchemaInfoWithVersion(topicName + "-partition-0");
log.info("Read schema of topic {} : {}", topicName, readSi);

((SchemaInfoImpl)readSi.getSchemaInfo()).setTimestamp(0);
assertEquals(readSi.getSchemaInfo(), si);
assertEquals(readSi.getVersion(), 0);

Expand All @@ -242,11 +248,19 @@ public void createKeyValueSchema(ApiVersion version) throws Exception {
"test");
String topicName = "persistent://"+namespace + "/test-key-value-schema";
Schema keyValueSchema = Schema.KeyValue(Schema.AVRO(Foo.class), Schema.AVRO(Foo.class));
admin.schemas().createSchema(topicName,
keyValueSchema.getSchemaInfo());
admin.schemas().createSchema(topicName, keyValueSchema.getSchemaInfo());
SchemaInfo schemaInfo = admin.schemas().getSchemaInfo(topicName);

long timestamp = schemaInfo.getTimestamp();
assertNotEquals(keyValueSchema.getSchemaInfo().getTimestamp(), timestamp);
assertNotEquals(0, timestamp);

((SchemaInfoImpl)keyValueSchema.getSchemaInfo()).setTimestamp(schemaInfo.getTimestamp());
assertEquals(keyValueSchema.getSchemaInfo(), schemaInfo);

admin.schemas().createSchema(topicName, keyValueSchema.getSchemaInfo());
SchemaInfo schemaInfo2 = admin.schemas().getSchemaInfo(topicName);
assertEquals(timestamp, schemaInfo2.getTimestamp());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,18 @@
*/
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import java.util.Base64;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.crypto.SecretKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
Expand All @@ -30,6 +39,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
Expand All @@ -38,15 +48,6 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.crypto.SecretKey;
import java.util.Base64;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
/**
* Unit tests for schema admin api.
*/
Expand Down Expand Up @@ -117,10 +118,12 @@ public void testGetCreateDeleteSchema() throws Exception {

assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName));
SchemaInfo readSi = adminWithConsumePermission.schemas().getSchemaInfo(topicName);
((SchemaInfoImpl)readSi).setTimestamp(0);
assertEquals(readSi, si);

assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName, 0));
readSi = adminWithConsumePermission.schemas().getSchemaInfo(topicName, 0);
((SchemaInfoImpl)readSi).setTimestamp(0);
assertEquals(readSi, si);
List<SchemaInfo> allSchemas = adminWithConsumePermission.schemas().getAllSchemas(topicName);
assertEquals(allSchemas.size(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,46 @@
*/
package org.apache.pulsar.client.api;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Cleanup;
import lombok.Data;
import lombok.NoArgsConstructor;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Cleanup;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.Schema.Parser;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.HttpLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.reader.AvroReader;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
Expand All @@ -60,15 +67,6 @@
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@Test(groups = "broker-api")
@Slf4j
public class SimpleSchemaTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -455,6 +453,9 @@ public void newProducerForMessageSchemaOnTopicInitialWithNoSchema() throws Excep
}

List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
allSchemas.forEach(schemaInfo -> {
((SchemaInfoImpl)schemaInfo).setTimestamp(0);
});
Assert.assertEquals(allSchemas, Arrays.asList(v1Schema.getSchemaInfo(),
v2Schema.getSchemaInfo()));
}
Expand Down Expand Up @@ -493,6 +494,9 @@ public void newNativeAvroProducerForMessageSchemaOnTopicInitialWithNoSchema() th
}

List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
allSchemas.forEach(schemaInfo -> {
((SchemaInfoImpl)schemaInfo).setTimestamp(0);
});
Assert.assertEquals(allSchemas, Arrays.asList(v1Schema.getSchemaInfo(),
v2Schema.getSchemaInfo()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@

import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;

import lombok.EqualsAndHashCode;
import org.apache.avro.Schema.Parser;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
Expand All @@ -45,7 +41,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema.Parser;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand Down Expand Up @@ -986,6 +984,9 @@ public void testProducerMultipleSchemaMessages() throws Exception {
producer.newMessage(Schema.NATIVE_AVRO(personThreeSchemaAvroNative)).value(content).send();

List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
allSchemas.forEach(schemaInfo -> {
((SchemaInfoImpl)schemaInfo).setTimestamp(0);
});
Assert.assertEquals(allSchemas.size(), 5);
Assert.assertEquals(allSchemas.get(0), Schema.STRING.getSchemaInfo());
Assert.assertEquals(allSchemas.get(1), Schema.JSON(Schemas.PersonThree.class).getSchemaInfo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
return SchemaInfo.builder()
.schema(schema)
.type(response.getType())
.timestamp(response.getTimestamp())
.properties(response.getProperties())
.name(tn.getLocalName())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,5 +252,6 @@ static byte[] getBytes(ByteBuffer byteBuffer) {
return array;
}

SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, Map<String, String> propertiesValue);
SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, long timestamp,
Map<String, String> propertiesValue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public interface SchemaInfo {
*/
Map<String, String> getProperties();

/**
* The created time of schema.
*/
long getTimestamp();

String getSchemaDefinition();

static SchemaInfoBuilder builder() {
Expand All @@ -60,6 +65,7 @@ class SchemaInfoBuilder {
private SchemaType type;
private Map<String, String> properties;
private boolean propertiesSet;
private long timestamp;

SchemaInfoBuilder() {
}
Expand All @@ -85,14 +91,19 @@ public SchemaInfoBuilder properties(Map<String, String> properties) {
return this;
}

public SchemaInfoBuilder timestamp(long timestamp) {
this.timestamp = timestamp;
return this;
}

public SchemaInfo build() {
Map<String, String> propertiesValue = this.properties;
if (!this.propertiesSet) {
propertiesValue = Collections.emptyMap();
}
return DefaultImplementation
.getDefaultImplementation()
.newSchemaInfoImpl(name, schema, type, propertiesValue);
.newSchemaInfoImpl(name, schema, type, timestamp, propertiesValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,8 @@ public MessagePayloadFactory newDefaultMessagePayloadFactory() {
return new MessagePayloadFactoryImpl();
}

public SchemaInfo newSchemaInfoImpl(
String name, byte[] schema, SchemaType type, Map<String, String> propertiesValue) {
return new SchemaInfoImpl(name, schema, type, propertiesValue);
public SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, long timestamp,
Map<String, String> propertiesValue) {
return new SchemaInfoImpl(name, schema, type, timestamp, propertiesValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public SchemaInfo build(SchemaType schemaType) {
name,
baseSchema.toString().getBytes(UTF_8),
schemaType,
System.currentTimeMillis(),
properties
);
}
Expand Down
Loading

0 comments on commit bd150c0

Please sign in to comment.