Skip to content

Commit

Permalink
Added support for ZSTD compression (apache#3159)
Browse files Browse the repository at this point in the history
* Added support for ZSTD compression

* Fixed C++ formatting

* Added warning in javadoc

* Fixed comment format

* Fixed exception include

* Fixed exception mistake

* Added zstd to presto license file
  • Loading branch information
merlimat authored Jan 24, 2019
1 parent 0ec897f commit 9a1bd81
Show file tree
Hide file tree
Showing 29 changed files with 353 additions and 13 deletions.
2 changes: 1 addition & 1 deletion build/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ RUN apt-get update && \
liblog4cxx-dev libprotobuf-dev libboost-all-dev google-mock libgtest-dev \
libjsoncpp-dev libxml2-utils protobuf-compiler wget \
curl doxygen openjdk-8-jdk-headless clang-format-5.0 \
gnupg2 golang-1.10-go zip unzip
gnupg2 golang-1.10-go zip unzip libzstd-dev

# Compile and install gtest
RUN cd /usr/src/gtest && cmake . && make && cp libgtest.a /usr/lib
Expand Down
26 changes: 26 additions & 0 deletions distribution/server/licenses/LICENSE-zstd-jni.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
Zstd-jni: JNI bindings to Zstd Library

Copyright (c) 2015-present, Luben Karavelov/ All rights reserved.

BSD License

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
1 change: 1 addition & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ BSD 3-clause "New" or "Revised" License

BSD 2-Clause License
* HdrHistogram -- org.hdrhistogram-HdrHistogram-2.1.9.jar -- licenses/LICENSE-HdrHistogram.txt
* Zstandard JNI -- com.github.luben-zstd-jni-1.3.7-3.jar -- licenses/LICENSE-zstd-jni.txt

MIT License
* Java SemVer -- com.github.zafarkhaja-java-semver-0.9.0.jar -- licenses/LICENSE-SemVer.txt
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ flexible messaging model and an intuitive client API.</description>
<debezium.version>0.8.2</debezium.version>
<jsonwebtoken.version>0.10.5</jsonwebtoken.version>
<opencensus.version>0.12.3</opencensus.version>
<zstd.version>1.3.7-3</zstd.version>

<!-- test dependencies -->
<arquillian-cube.version>1.15.1</arquillian-cube.version>
Expand Down Expand Up @@ -456,6 +457,12 @@ flexible messaging model and an intuitive client API.</description>
<version>1.5.0</version>
</dependency>

<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@ public enum CompressionType {
LZ4,

/** Compress with ZLib */
ZLIB
ZLIB,

/** Compress with Zstandard codec */
ZSTD,
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public interface ProducerBuilder<T> extends Cloneable {
* <p>
* If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
* Setting the timeout to zero, for example <code>setTimeout(0, TimeUnit.SECONDS)</code> will set the timeout
* to infinity, which can be useful when using Pulsar's message deduplication feature.
* to infinity, which can be useful when using Pulsar's message deduplication feature.
*
* @param sendTimeout
* the send timeout
Expand Down Expand Up @@ -201,8 +201,10 @@ public interface ProducerBuilder<T> extends Cloneable {
* <p>
* By default, message payloads are not compressed. Supported compression types are:
* <ul>
* <li><code>CompressionType.LZ4</code></li>
* <li><code>CompressionType.ZLIB</code></li>
* <li>{@link CompressionType#LZ4}</li>
* <li>{@link CompressionType#ZLIB}</li>
* <li>{@link CompressionType#ZSTD} (Since Pulsar 2.3. Zstd
* cannot be used if consumer applications are not in version &gt;= 2.3 as well)</li>
* </ul>
*
* @param compressionType
Expand Down
16 changes: 16 additions & 0 deletions pulsar-client-cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ if (LINK_STATIC)
find_library(PROTOBUF_LIBRARIES NAMES libprotobuf.a)
find_library(CURL_LIBRARY_PATH NAMES libcurl.a curl)
find_library(LIB_JSON NAMES libjsoncpp.a libjsoncpp_static.a)
find_library(LIB_ZSTD NAMES libzstd.a)

if (USE_LOG4CXX)
find_library(LOG4CXX_LIBRARY_PATH NAMES liblog4cxx.a)
Expand All @@ -97,6 +98,7 @@ else()
endif (NOT PROTOBUF_LIBRARIES)

find_library(LIB_JSON jsoncpp)
find_library(LIB_ZSTD zstd)
find_library(CURL_LIBRARY_PATH curl)
if (USE_LOG4CXX)
find_library(LOG4CXX_LIBRARY_PATH log4cxx)
Expand Down Expand Up @@ -161,6 +163,7 @@ if (NOT LIB_JSON)
find_library(LIB_JSON json_cpp)
endif (NOT LIB_JSON)


if (NOT JSON_INCLUDE_PATH)
find_path(JSON_INCLUDE_PATH json)
else (NOT JSON_INCLUDE_PATH)
Expand All @@ -174,6 +177,13 @@ if (NOT APPLE)
set(CMAKE_CXX_FLAGS " -fvisibility=hidden -Wl,--exclude-libs,ALL ${CMAKE_CXX_FLAGS}")
endif ()

if (LIB_ZSTD)
set(HAS_ZSTD 1)
else ()
set(HAS_ZSTD 0)
endif ()
MESSAGE(STATUS "HAS_ZSTD: ${HAS_ZSTD}")


set(ADDITIONAL_LIBRARIES $ENV{PULSAR_ADDITIONAL_LIBRARIES})
link_directories( $ENV{PULSAR_ADDITIONAL_LIBRARY_PATH} )
Expand Down Expand Up @@ -222,6 +232,12 @@ if (USE_LOG4CXX)
)
endif ()

if (HAS_ZSTD)
set(COMMON_LIBS ${COMMON_LIBS} ${LIB_ZSTD} )
endif ()

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DHAS_ZSTD=${HAS_ZSTD}")

if(NOT APPLE)
set(COMMON_LIBS ${COMMON_LIBS} rt)
endif ()
Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-cpp/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ RUN curl -O -L https://github.com/curl/curl/releases/download/curl-7_61_0/curl-
make && make install && \
rm -rf /curl-7.61.0.tar.gz /curl-7.61.0

# Zstandard
RUN curl -O -L https://github.com/facebook/zstd/releases/download/v1.3.7/zstd-1.3.7.tar.gz && \
tar xvfz zstd-1.3.7.tar.gz && \
cd zstd-1.3.7 && \
CFLAGS="-fPIC -O3" make -j8 && \
make install && \
rm -rf /zstd-1.3.7 /zstd-1.3.7.tar.gz

RUN pip install twine


Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/include/pulsar/CompressionType.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ enum CompressionType
{
CompressionNone = 0,
CompressionLZ4 = 1,
CompressionZLib = 2
CompressionZLib = 2,
CompressionZSTD = 3,
};
}

Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ class ProducerConfiguration {
ProducerConfiguration& setInitialSequenceId(int64_t initialSequenceId);
int64_t getInitialSequenceId() const;

/**
* Set the compression type for the producer.
* <p>
* By default, message payloads are not compressed. Supported compression types are:
* <ul>
*
* <li>{@link CompressionNone}: No compression</li>
* <li>{@link CompressionLZ4}: LZ4 Compression https://lz4.github.io/lz4/</li>
* <li>{@link CompressionZLib}: ZLib Compression http://zlib.net/</li>
* <li>{@link CompressionZSTD}: Zstandard Compression https://facebook.github.io/zstd/ (Since Pulsar 2.3.
* Zstd cannot be used if consumer applications are not in version >= 2.3 as well)</li>
* </ul>
*/
ProducerConfiguration& setCompressionType(CompressionType compressionType);
CompressionType getCompressionType() const;

Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-cpp/lib/CompressionCodec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "CompressionCodec.h"
#include "CompressionCodecLZ4.h"
#include "CompressionCodecZLib.h"
#include "CompressionCodecZstd.h"

#include <cassert>

Expand All @@ -28,13 +29,16 @@ namespace pulsar {
CompressionCodecNone CompressionCodecProvider::compressionCodecNone_;
CompressionCodecLZ4 CompressionCodecProvider::compressionCodecLZ4_;
CompressionCodecZLib CompressionCodecProvider::compressionCodecZLib_;
CompressionCodecZstd CompressionCodecProvider::compressionCodecZstd_;

CompressionCodec& CompressionCodecProvider::getCodec(CompressionType compressionType) {
switch (compressionType) {
case CompressionLZ4:
return compressionCodecLZ4_;
case CompressionZLib:
return compressionCodecZLib_;
case CompressionZSTD:
return compressionCodecZstd_;
default:
return compressionCodecNone_;
}
Expand All @@ -48,6 +52,8 @@ CompressionType CompressionCodecProvider::convertType(proto::CompressionType typ
return CompressionLZ4;
case proto::ZLIB:
return CompressionZLib;
case proto::ZSTD:
return CompressionZSTD;
}
}

Expand All @@ -59,6 +65,8 @@ proto::CompressionType CompressionCodecProvider::convertType(CompressionType typ
return proto::LZ4;
case CompressionZLib:
return proto::ZLIB;
case CompressionZSTD:
return proto::ZSTD;
}
}

Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/CompressionCodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class CompressionCodec;
class CompressionCodecNone;
class CompressionCodecLZ4;
class CompressionCodecZLib;
class CompressionCodecZstd;

class CompressionCodecProvider {
public:
Expand All @@ -50,6 +51,7 @@ class CompressionCodecProvider {
static CompressionCodecNone compressionCodecNone_;
static CompressionCodecLZ4 compressionCodecLZ4_;
static CompressionCodecZLib compressionCodecZLib_;
static CompressionCodecZstd compressionCodecZstd_;
};

class CompressionCodec {
Expand Down
69 changes: 69 additions & 0 deletions pulsar-client-cpp/lib/CompressionCodecZstd.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "CompressionCodecZstd.h"

#if HAS_ZSTD
#include <zstd.h>

namespace pulsar {

// Compression level passed to ZSTD_compress() by CompressionCodecZstd::encode()
static const int COMPRESSION_LEVEL = 3;

/**
 * Compress the readable bytes of `raw` with Zstandard.
 *
 * @param raw buffer whose readable bytes are compressed
 * @return a newly allocated buffer holding the compressed data
 * @throws const char* (the zstd error name) if ZSTD_compress() reports a failure
 */
SharedBuffer CompressionCodecZstd::encode(const SharedBuffer& raw) {
    // Size the destination for the worst case so the compressed output always fits
    const size_t maxCompressedSize = ZSTD_compressBound(raw.readableBytes());
    SharedBuffer compressed = SharedBuffer::allocate(maxCompressedSize);

    // ZSTD_compress() returns a size_t that is either the number of bytes written or an
    // error code. It must stay a size_t and be tested with ZSTD_isError(); narrowing it to
    // an int would turn an error code into a bogus length.
    const size_t compressedSize = ZSTD_compress(compressed.mutableData(), maxCompressedSize, raw.data(),
                                                raw.readableBytes(), COMPRESSION_LEVEL);
    if (ZSTD_isError(compressedSize)) {
        // Same exception style as the build without zstd support in this file: a C string
        throw ZSTD_getErrorName(compressedSize);
    }
    compressed.bytesWritten(compressedSize);

    return compressed;
}

bool CompressionCodecZstd::decode(const SharedBuffer& encoded, uint32_t uncompressedSize,
SharedBuffer& decoded) {
SharedBuffer decompressed = SharedBuffer::allocate(uncompressedSize);

size_t result = ZSTD_decompress(decompressed.mutableData(), uncompressedSize, encoded.data(),
encoded.readableBytes());
if (result == uncompressedSize) {
decompressed.bytesWritten(uncompressedSize);
decoded = decompressed;
return true;
} else {
// Decompression failed
return false;
}
}
} // namespace pulsar

#else // No ZSTD

namespace pulsar {

/**
 * Variant compiled when HAS_ZSTD is 0: no compression is performed and the call always
 * throws a C string.
 */
SharedBuffer CompressionCodecZstd::encode(const SharedBuffer& raw) {
    throw "ZStd compression not supported";
}

/**
 * Variant compiled when HAS_ZSTD is 0: no decompression is performed, `decoded` is never
 * touched and the call always throws a C string.
 */
bool CompressionCodecZstd::decode(const SharedBuffer& encoded,
                                  uint32_t uncompressedSize,
                                  SharedBuffer& decoded) {
    throw "ZStd compression not supported";
}
} // namespace pulsar

#endif // HAS_ZSTD
31 changes: 31 additions & 0 deletions pulsar-client-cpp/lib/CompressionCodecZstd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include "CompressionCodec.h"

namespace pulsar {

/**
 * CompressionCodec implementation for the Zstandard (zstd) format.
 *
 * When the library is compiled with HAS_ZSTD set to 0, both methods throw a C string
 * ("ZStd compression not supported") instead of doing any work.
 */
class CompressionCodecZstd : public CompressionCodec {
public:
/**
 * Compress the readable bytes of `raw`.
 *
 * @param raw buffer holding the uncompressed payload
 * @return a buffer holding the compressed payload
 */
SharedBuffer encode(const SharedBuffer& raw);

/**
 * Decompress `encoded` into `decoded`.
 *
 * @param encoded buffer holding the compressed payload
 * @param uncompressedSize expected size in bytes of the decompressed payload
 * @param decoded output buffer, assigned only when decompression succeeds
 * @return true on success, false if the decompressed size does not match `uncompressedSize`
 */
bool decode(const SharedBuffer& encoded, uint32_t uncompressedSize, SharedBuffer& decoded);
};
} // namespace pulsar
4 changes: 3 additions & 1 deletion pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,9 @@ def create_producer(self, topic,
* `compression_type`:
Set the compression type for the producer. By default, message
payloads are not compressed. Supported compression types are
`CompressionType.LZ4` and `CompressionType.ZLib`.
`CompressionType.LZ4`, `CompressionType.ZLib` and `CompressionType.ZSTD`.
ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
release in order to be able to receive messages compressed with ZSTD.
* `max_pending_messages`:
Set the max size of the queue holding the messages pending to receive
an acknowledgment from the broker.
Expand Down
Loading

0 comments on commit 9a1bd81

Please sign in to comment.