Skip to content

Commit

Permalink
[issue 6694][AVRO ENCODE] Reset cursor if message encode fails. (apac…
Browse files Browse the repository at this point in the history
…he#6695)

Fixes apache#6694 

### Motivation

If Avro encoding of a message fails after a few bytes have been written, the cursor in the stream is not reset: the **flush()** call that would normally reset the cursor is skipped when an exception is thrown.

### Modifications

Add **flush()** in the finally block.

### Test
Added a test to verify the fix.
  • Loading branch information
shiv4289 authored Apr 14, 2020
1 parent d8be7c5 commit 7cffe2a
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,20 @@ public AvroWriter(Schema schema, boolean jsr310ConversionEnabled) {

/**
 * Serializes {@code message} through the Avro encoder and returns the encoded bytes.
 *
 * <p>Whether or not encoding succeeds, the encoder is flushed and the backing byte stream
 * is reset before this method returns, so bytes left over from a failed encode are not
 * carried into the next call.
 *
 * @param message the object to encode
 * @return the bytes produced for {@code message}
 * @throws SchemaSerializationException if writing or flushing the message fails; the
 *         original failure is preserved as the cause
 */
@Override
public synchronized byte[] write(T message) {
    try {
        writer.write(message, this.encoder);
        this.encoder.flush();
        return this.byteArrayOutputStream.toByteArray();
    } catch (Exception e) {
        throw new SchemaSerializationException(e);
    } finally {
        try {
            // Drain whatever the encoder still holds after a failed write so that its
            // buffer position is back at zero. After a successful write the flush above
            // has already run, so this call should have nothing left to push out.
            this.encoder.flush();
        } catch (Exception ignored) {
            // Deliberately not rethrown: throwing from a finally block would replace the
            // exception (or return value) of the main path. The stream is reset below
            // regardless, which is the purpose of this cleanup.
        } finally {
            // Always discard the stream contents, even if the cleanup flush failed.
            this.byteArrayOutputStream.reset();
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.BufferedBinaryEncoder;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.avro.SchemaValidationException;
Expand All @@ -51,12 +53,14 @@
import org.apache.pulsar.client.avro.generated.NasaMission;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.json.JSONException;
import org.skyscreamer.jsonassert.JSONAssert;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -366,7 +370,26 @@ public void testDecodeByteBuf() {
Foo object1 = avroSchema.decode(byteBuf);
Assert.assertTrue(bytes1.length > 0);
assertEquals(object1, foo1);
}

/**
 * Verifies that a failed {@code AvroWriter.write} call leaves no bytes buffered in the
 * writer's encoder.
 */
@Test
public void discardBufferIfBadAvroData() {
    Schema missionSchema = ReflectData.AllowNull.get().getSchema(NasaMission.class);
    AvroWriter<NasaMission> writerUnderTest = new AvroWriter<>(missionSchema);

    // Build a record whose name is null. The setter accepts the value; the failure is
    // expected later, when the record is encoded.
    NasaMission invalidMission = new NasaMission();
    invalidMission.setId(1);
    invalidMission.setName(null);

    // Encoding the invalid record must surface as a SchemaSerializationException.
    Assert.assertThrows(SchemaSerializationException.class, () -> writerUnderTest.write(invalidMission));

    // Reach into the writer for its private encoder field.
    BinaryEncoder internalEncoder = Whitebox.getInternalState(writerUnderTest, "encoder");
    BufferedBinaryEncoder bufferedEncoder = (BufferedBinaryEncoder) internalEncoder;

    // Nothing may remain buffered after the failed write.
    int pendingBytes = bufferedEncoder.bytesBuffered();
    Assert.assertEquals(pendingBytes, 0);
}

}

0 comments on commit 7cffe2a

Please sign in to comment.