Skip to content

Commit

Permalink
Fix DoubleByteBuf to send large size messages in TLS mode (apache#447)
Browse files Browse the repository at this point in the history
  • Loading branch information
nkurihar authored and merlimat committed Jun 5, 2017
1 parent 5c6c9c3 commit 049e428
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
* @throws Exception
*/
@Test
public void testAuthemticationFilterNegative() throws Exception {
public void testAuthenticationFilterNegative() throws Exception {
log.info("-- Starting {} test --", methodName);

Map<String, String> authParams = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.client.api;

import static org.mockito.Mockito.spy;

import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.common.policies.data.ClusterData;
import com.yahoo.pulsar.common.policies.data.PropertyAdmin;

public class TlsProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class);

private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";

@BeforeMethod
@Override
protected void setup() throws Exception {

conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

conf.setClusterName("use");

super.init();
}

protected final void internalSetupForTls() throws Exception {

com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
clientConf.setUseTls(true);

admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
String lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString();
pulsarClient = PulsarClient.create(lookupUrl, clientConf);
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

/**
* verifies that messages whose size is larger than 2^14 bytes (max size of single TLS chunk) can be
* produced/consumed
*
* @throws Exception
*/
@Test
public void testTlsLargeSizeMessage() throws Exception {
log.info("-- Starting {} test --", methodName);

final int MESSAGE_SIZE = 16 * 1024 + 1;
log.info("-- message size --", MESSAGE_SIZE);

internalSetupForTls();

admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
conf);

ProducerConfiguration producerConf = new ProducerConfiguration();

Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
for (int i = 0; i < 10; i++) {
byte[] message = new byte[MESSAGE_SIZE];
Arrays.fill(message, (byte) i);
producer.send(message);
}

Message msg = null;
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
byte[] expected = new byte[MESSAGE_SIZE];
Arrays.fill(expected, (byte) i);
Assert.assertArrayEquals(expected, msg.getData());
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,6 @@ public int capacity() {
return b1.capacity() + b2.capacity();
}

@Override
public int readableBytes() {
return b1.readableBytes() + b2.readableBytes();
}

@Override
public int writableBytes() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.common.api;

import static org.testng.Assert.assertEquals;
import org.testng.annotations.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class DoubleByteBufTest {

/**
* Verify that readableBytes() returns writerIndex - readerIndex. In this case writerIndex is the end of the buffer
* and readerIndex is increased by 64.
*
* @throws Exception
*/
@Test
public void testReadableBytes() throws Exception {

ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
b1.writerIndex(b1.capacity());
ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
b2.writerIndex(b2.capacity());
ByteBuf buf = DoubleByteBuf.get(b1, b2);

assertEquals(buf.readerIndex(), 0);
assertEquals(buf.writerIndex(), 256);
assertEquals(buf.readableBytes(), 256);

for (int i = 0; i < 4; ++i) {
buf.skipBytes(64);
assertEquals(buf.readableBytes(), 256 - 64 * (i + 1));
}
}
}

0 comments on commit 049e428

Please sign in to comment.