diff --git a/build/docker/Dockerfile b/build/docker/Dockerfile
index dd357942ee891..bcb17c0b19abd 100644
--- a/build/docker/Dockerfile
+++ b/build/docker/Dockerfile
@@ -28,7 +28,7 @@ RUN apt-get update && \
liblog4cxx-dev libprotobuf-dev libboost-all-dev google-mock libgtest-dev \
libjsoncpp-dev libxml2-utils protobuf-compiler wget \
curl doxygen openjdk-8-jdk-headless clang-format-5.0 \
- gnupg2 golang-1.10-go zip unzip libzstd-dev
+ gnupg2 golang-1.10-go zip unzip libzstd-dev libsnappy-dev
# Compile and install gtest
RUN cd /usr/src/gtest && cmake . && make && cp libgtest.a /usr/lib
diff --git a/pom.xml b/pom.xml
index 5b28f3d611234..ca7f482027226 100644
--- a/pom.xml
+++ b/pom.xml
@@ -194,6 +194,7 @@ flexible messaging model and an intuitive client API.
0.10.5
0.18.0
1.3.7-3
+ 1.1.1.3
1.4.9
@@ -487,6 +488,12 @@ flexible messaging model and an intuitive client API.
${zstd.version}
+
+ org.xerial.snappy
+ snappy-java
+ ${snappy.version}
+
+
org.slf4j
slf4j-api
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
index 49eb925acdd3d..180fe62b0cfc0 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
@@ -33,4 +33,7 @@ public enum CompressionType {
/** Compress with Zstandard codec */
ZSTD,
+
+ /** Compress with Snappy codec */
+ SNAPPY
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 2224e51efcb20..a98036c475e20 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -260,6 +260,8 @@ public interface ProducerBuilder extends Cloneable {
* {@link CompressionType#ZLIB}: Standard ZLib compression
* {@link CompressionType#ZSTD} Compress with Zstandard codec. Since Pulsar 2.3. Zstd cannot be used if consumer
* applications are not in version >= 2.3 as well
+ * {@link CompressionType#SNAPPY} Compress with Snappy codec. Since Pulsar 2.4. Snappy cannot be used if consumer
+ * applications are not in version >= 2.4 as well
*
*
* @param compressionType
diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 34c58099de149..57df5f509e1d7 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -120,6 +120,8 @@ endif()
find_library(LIB_ZSTD zstd libzstd)
+find_library(LIB_SNAPPY snappy libsnappy)
+
if (BUILD_PYTHON_WRAPPER)
find_package(PythonLibs REQUIRED)
MESSAGE(STATUS "PYTHON: " ${PYTHONLIBS_VERSION_STRING})
@@ -183,6 +185,12 @@ else ()
endif ()
MESSAGE(STATUS "HAS_ZSTD: ${HAS_ZSTD}")
+if (LIB_SNAPPY)
+ set(HAS_SNAPPY 1)
+else ()
+ set(HAS_SNAPPY 0)
+endif ()
+MESSAGE(STATUS "HAS_SNAPPY: ${HAS_SNAPPY}")
set(ADDITIONAL_LIBRARIES $ENV{PULSAR_ADDITIONAL_LIBRARIES})
link_directories( $ENV{PULSAR_ADDITIONAL_LIBRARY_PATH} )
@@ -241,6 +249,12 @@ endif ()
add_definitions(-DHAS_ZSTD=${HAS_ZSTD})
+if (HAS_SNAPPY)
+ set(COMMON_LIBS ${COMMON_LIBS} ${LIB_SNAPPY} )
+endif ()
+
+add_definitions(-DHAS_SNAPPY=${HAS_SNAPPY})
+
if(NOT APPLE AND NOT MSVC)
set(COMMON_LIBS ${COMMON_LIBS} rt)
endif ()
diff --git a/pulsar-client-cpp/docker/Dockerfile b/pulsar-client-cpp/docker/Dockerfile
index 965d8ff77755e..043ebf0e65c41 100644
--- a/pulsar-client-cpp/docker/Dockerfile
+++ b/pulsar-client-cpp/docker/Dockerfile
@@ -149,6 +149,14 @@ RUN curl -O -L https://github.com/facebook/zstd/releases/download/v1.3.7/zstd-1.
make install && \
rm -rf /zstd-1.3.7 /zstd-1.3.7.tar.gz
+# Snappy
+RUN curl -O -L https://github.com/google/snappy/releases/download/1.1.3/snappy-1.1.3.tar.gz && \
+ tar xvfz snappy-1.1.3.tar.gz && \
+ cd snappy-1.1.3 && \
+ CFLAGS="-fPIC -O3" ./configure && \
+ make && make install && \
+ rm -rf /snappy-1.1.3 /snappy-1.1.3.tar.gz
+
RUN pip install twine
RUN pip install fastavro
RUN pip install six
diff --git a/pulsar-client-cpp/include/pulsar/CompressionType.h b/pulsar-client-cpp/include/pulsar/CompressionType.h
index 2306b149ea0c4..6fd663a5ba717 100644
--- a/pulsar-client-cpp/include/pulsar/CompressionType.h
+++ b/pulsar-client-cpp/include/pulsar/CompressionType.h
@@ -26,6 +26,7 @@ enum CompressionType
CompressionLZ4 = 1,
CompressionZLib = 2,
CompressionZSTD = 3,
+ CompressionSNAPPY = 4
};
}
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 0708086af999f..2de22d877b5ff 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -100,6 +100,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
* {@link CompressionZLib}: ZLib Compression http://zlib.net/
* {@link CompressionZSTD}: Zstandard Compression https://facebook.github.io/zstd/ (Since Pulsar 2.3.
* Zstd cannot be used if consumer applications are not in version >= 2.3 as well)
+ * {@link CompressionSNAPPY}: Snappy Compression https://google.github.io/snappy/ (Since Pulsar 2.4.
+ * Snappy cannot be used if consumer applications are not in version >= 2.4 as well)
*
*/
ProducerConfiguration& setCompressionType(CompressionType compressionType);
diff --git a/pulsar-client-cpp/lib/CompressionCodec.cc b/pulsar-client-cpp/lib/CompressionCodec.cc
index 183439f6063dc..b092cf92299f9 100644
--- a/pulsar-client-cpp/lib/CompressionCodec.cc
+++ b/pulsar-client-cpp/lib/CompressionCodec.cc
@@ -20,6 +20,7 @@
#include "CompressionCodecLZ4.h"
#include "CompressionCodecZLib.h"
#include "CompressionCodecZstd.h"
+#include "CompressionCodecSnappy.h"
#include
@@ -30,6 +31,7 @@ CompressionCodecNone CompressionCodecProvider::compressionCodecNone_;
CompressionCodecLZ4 CompressionCodecProvider::compressionCodecLZ4_;
CompressionCodecZLib CompressionCodecProvider::compressionCodecZLib_;
CompressionCodecZstd CompressionCodecProvider::compressionCodecZstd_;
+CompressionCodecSnappy CompressionCodecProvider::compressionCodecSnappy_;
CompressionCodec& CompressionCodecProvider::getCodec(CompressionType compressionType) {
switch (compressionType) {
@@ -39,6 +41,8 @@ CompressionCodec& CompressionCodecProvider::getCodec(CompressionType compression
return compressionCodecZLib_;
case CompressionZSTD:
return compressionCodecZstd_;
+ case CompressionSNAPPY:
+ return compressionCodecSnappy_;
default:
return compressionCodecNone_;
}
@@ -54,6 +58,8 @@ CompressionType CompressionCodecProvider::convertType(proto::CompressionType typ
return CompressionZLib;
case proto::ZSTD:
return CompressionZSTD;
+ case proto::SNAPPY:
+ return CompressionSNAPPY;
}
}
@@ -67,6 +73,8 @@ proto::CompressionType CompressionCodecProvider::convertType(CompressionType typ
return proto::ZLIB;
case CompressionZSTD:
return proto::ZSTD;
+ case CompressionSNAPPY:
+ return proto::SNAPPY;
}
}
diff --git a/pulsar-client-cpp/lib/CompressionCodec.h b/pulsar-client-cpp/lib/CompressionCodec.h
index 00dd01386ea15..fd65f9cdf6ecc 100644
--- a/pulsar-client-cpp/lib/CompressionCodec.h
+++ b/pulsar-client-cpp/lib/CompressionCodec.h
@@ -35,6 +35,7 @@ class CompressionCodecNone;
class CompressionCodecLZ4;
class CompressionCodecZLib;
class CompressionCodecZstd;
+class CompressionCodecSnappy;
class PULSAR_PUBLIC CompressionCodecProvider {
public:
@@ -48,6 +49,7 @@ class PULSAR_PUBLIC CompressionCodecProvider {
static CompressionCodecLZ4 compressionCodecLZ4_;
static CompressionCodecZLib compressionCodecZLib_;
static CompressionCodecZstd compressionCodecZstd_;
+ static CompressionCodecSnappy compressionCodecSnappy_;
};
class PULSAR_PUBLIC CompressionCodec {
diff --git a/pulsar-client-cpp/lib/CompressionCodecSnappy.cc b/pulsar-client-cpp/lib/CompressionCodecSnappy.cc
new file mode 100644
index 0000000000000..f7a53efae4421
--- /dev/null
+++ b/pulsar-client-cpp/lib/CompressionCodecSnappy.cc
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "CompressionCodecSnappy.h"
+
+#if HAS_SNAPPY
+#include
+#include "snappy-c.h"
+
+namespace pulsar {
+
+SharedBuffer CompressionCodecSnappy::encode(const SharedBuffer& raw) {
+ // Get the max size of the compressed data and allocate a buffer to hold it
+ int maxCompressedSize = snappy_max_compressed_length(raw.readableBytes());
+ SharedBuffer compressed = SharedBuffer::allocate(maxCompressedSize);
+
+ unsigned long bytesWritten = maxCompressedSize;
+
+ snappy_status status =
+ snappy_compress(raw.data(), raw.readableBytes(), compressed.mutableData(), &bytesWritten);
+
+ if (status != SNAPPY_OK) {
+ LOG_ERROR("Failed to compress to snappy. res=" << res);
+ abort();
+ }
+
+ compressed.bytesWritten(bytesWritten);
+
+ return compressed;
+}
+
+bool CompressionCodecSnappy::decode(const SharedBuffer& encoded, uint32_t uncompressedSize,
+ SharedBuffer& decoded) {
+ SharedBuffer decompressed = SharedBuffer::allocate(uncompressedSize);
+
+ snappy_status status = snappy_uncompress(encoded.data(), encoded.readableBytes(),
+ decompressed.mutableData(), uncompressedSize);
+
+ if (status == SNAPPY_OK) {
+ decoded = decompressed;
+ decoded.setWriterIndex(uncompressedSize);
+ return true;
+ } else {
+ // Decompression failed
+ return false;
+ }
+}
+} // namespace pulsar
+
+#else // No SNAPPY
+
+namespace pulsar {
+
+SharedBuffer CompressionCodecSnappy::encode(const SharedBuffer& raw) {
+ throw "Snappy compression not supported";
+}
+
+bool CompressionCodecSnappy::decode(const SharedBuffer& encoded, uint32_t uncompressedSize,
+ SharedBuffer& decoded) {
+ throw "Snappy compression not supported";
+}
+} // namespace pulsar
+
+#endif // HAS_SNAPPY
diff --git a/pulsar-client-cpp/lib/CompressionCodecSnappy.h b/pulsar-client-cpp/lib/CompressionCodecSnappy.h
new file mode 100644
index 0000000000000..47383761f095f
--- /dev/null
+++ b/pulsar-client-cpp/lib/CompressionCodecSnappy.h
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include "CompressionCodec.h"
+
+namespace pulsar {
+
+class CompressionCodecSnappy : public CompressionCodec {
+ public:
+ SharedBuffer encode(const SharedBuffer& raw);
+
+ bool decode(const SharedBuffer& encoded, uint32_t uncompressedSize, SharedBuffer& decoded);
+};
+} // namespace pulsar
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index abeb2a699ef47..dbf7d4051b737 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -83,6 +83,11 @@
com.github.luben
zstd-jni
+
+
+ org.xerial.snappy
+ snappy-java
+
org.bouncycastle
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 625752b4b99ad..d55f5f9f2730d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -14,12 +14,14 @@ public enum CompressionType
LZ4(1, 1),
ZLIB(2, 2),
ZSTD(3, 3),
+ SNAPPY(4, 4),
;
public static final int NONE_VALUE = 0;
public static final int LZ4_VALUE = 1;
public static final int ZLIB_VALUE = 2;
public static final int ZSTD_VALUE = 3;
+ public static final int SNAPPY_VALUE = 4;
public final int getNumber() { return value; }
@@ -30,6 +32,7 @@ public static CompressionType valueOf(int value) {
case 1: return LZ4;
case 2: return ZLIB;
case 3: return ZSTD;
+ case 4: return SNAPPY;
default: return null;
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
index bb8e3124aeec7..f108f2d309245 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
@@ -35,6 +35,7 @@ public class CompressionCodecProvider {
codecs.put(PulsarApi.CompressionType.LZ4, new CompressionCodecLZ4());
codecs.put(PulsarApi.CompressionType.ZLIB, new CompressionCodecZLib());
codecs.put(PulsarApi.CompressionType.ZSTD, new CompressionCodecZstd());
+ codecs.put(PulsarApi.CompressionType.SNAPPY, new CompressionCodecSnappy());
}
public static CompressionCodec getCompressionCodec(PulsarApi.CompressionType type) {
@@ -55,6 +56,8 @@ public static PulsarApi.CompressionType convertToWireProtocol(CompressionType co
return PulsarApi.CompressionType.ZLIB;
case ZSTD:
return PulsarApi.CompressionType.ZSTD;
+ case SNAPPY:
+ return PulsarApi.CompressionType.SNAPPY;
default:
throw new RuntimeException("Invalid compression type");
@@ -71,6 +74,8 @@ public static CompressionType convertFromWireProtocol(PulsarApi.CompressionType
return CompressionType.ZLIB;
case ZSTD:
return CompressionType.ZSTD;
+ case SNAPPY:
+ return CompressionType.SNAPPY;
default:
throw new RuntimeException("Invalid compression type");
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
new file mode 100644
index 0000000000000..1598d5ee24789
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.compression;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import lombok.extern.slf4j.Slf4j;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Snappy Compression
+ */
+@Slf4j
+public class CompressionCodecSnappy implements CompressionCodec {
+
+ @Override
+ public ByteBuf encode(ByteBuf source) {
+ int uncompressedLength = source.readableBytes();
+ int maxLength = Snappy.maxCompressedLength(uncompressedLength);
+
+ ByteBuffer sourceNio = source.nioBuffer(source.readerIndex(), source.readableBytes());
+
+ ByteBuf target = PooledByteBufAllocator.DEFAULT.buffer(maxLength, maxLength);
+ ByteBuffer targetNio = target.nioBuffer(0, maxLength);
+
+ int compressedLength = 0;
+ try {
+ compressedLength = Snappy.compress(sourceNio, targetNio);
+ } catch (IOException e) {
+ log.error("Failed to compress to Snappy: {}", e.getMessage());
+ }
+ target.writerIndex(compressedLength);
+ return target;
+ }
+
+ @Override
+ public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOException {
+ ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.buffer(uncompressedLength, uncompressedLength);
+ ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
+
+ ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
+ Snappy.uncompress(encodedNio, uncompressedNio);
+
+ uncompressed.writerIndex(uncompressedLength);
+ return uncompressed;
+ }
+}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 58238d7405656..3913b660ef319 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -77,6 +77,7 @@ enum CompressionType {
LZ4 = 1;
ZLIB = 2;
ZSTD = 3;
+ SNAPPY = 4;
}
message MessageMetadata {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
index 4dcc22d0f0b18..b7e5c85aa27cc 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
@@ -24,8 +24,6 @@
import java.io.IOException;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -39,7 +37,7 @@ public class CompressorCodecTest {
@DataProvider(name = "codec")
public Object[][] codecProvider() {
- return new Object[][] { { CompressionType.NONE }, { CompressionType.LZ4 }, { CompressionType.ZLIB }, { CompressionType.ZSTD }};
+ return new Object[][] { { CompressionType.NONE }, { CompressionType.LZ4 }, { CompressionType.ZLIB }, { CompressionType.ZSTD }, { CompressionType.SNAPPY }};
}
@Test(dataProvider = "codec")
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 5bda2e9205845..dcd079272d778 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -45,6 +45,7 @@ Messages published by producers can be compressed during transportation in order
* [LZ4](https://github.com/lz4/lz4)
* [ZLIB](https://zlib.net/)
* [ZSTD](https://facebook.github.io/zstd/)
+* [SNAPPY](https://google.github.io/snappy/)
### Batching
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index c2970871bedfb..7bad0cf44bd10 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -444,7 +444,7 @@ Options
|`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.||
|`--auth_plugin`|Authentication plugin class name||
|`-b`, `--batch-time-window`|Batch messages in a window of the specified number of milliseconds|1|
-|`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB or ZSTD.||
+|`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB, ZSTD or SNAPPY.||
|`--conf-file`|Configuration file||
|`-k`, `--encryption-key-name`|The public key name to encrypt payload||
|`-v`, `--encryption-key-value-file`|The file which contains the public key to encrypt payload||