Skip to content

Commit 6673245

Browse files
feat(NODE-6540): Add c++ zstd compression API (#30)
1 parent 8c40b08 commit 6673245

14 files changed

+2120
-93
lines changed

.eslintrc.json

+7
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@
6464
"@typescript-eslint/no-require-imports": "off"
6565
},
6666
"overrides": [
67+
{
68+
"files": ["lib/*.js"],
69+
"parserOptions": {
70+
"ecmaVersion": 2019,
71+
"sourceType": "commonjs"
72+
}
73+
},
6774
{
6875
"files": ["test/**/*ts"],
6976
"rules": {

.github/workflows/lint.yml

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
name: Lint
2+
3+
on:
4+
push:
5+
branches: [ "main" ]
6+
pull_request:
7+
branches: [ "main" ]
8+
9+
jobs:
10+
build:
11+
runs-on: ubuntu-latest
12+
13+
name: ${{ matrix.lint-target }}
14+
strategy:
15+
matrix:
16+
lint-target: ["c++", "typescript"]
17+
18+
steps:
19+
- uses: actions/checkout@v4
20+
21+
- name: Use Node.js LTS
22+
uses: actions/setup-node@v4
23+
with:
24+
node-version: 'lts/*'
25+
cache: 'npm'
26+
27+
- name: Install dependencies
28+
shell: bash
29+
run: npm i --ignore-scripts
30+
31+
- if: matrix.lint-target == 'c++'
32+
shell: bash
33+
run: |
34+
npm run check:clang-format
35+
- if: matrix.lint-target == 'typescript'
36+
shell: bash
37+
run: |
38+
npm run check:eslint

.github/workflows/test.yml

+6-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ jobs:
2424
cache: 'npm'
2525
registry-url: 'https://registry.npmjs.org'
2626

27-
- name: Build with Node.js ${{ matrix.node }} on ${{ matrix.os }}
28-
run: npm install && npm run compile
27+
- name: Install zstd
28+
run: npm run install-zstd
29+
shell: bash
30+
31+
- name: install dependencies and compmile
32+
run: npm install --loglevel verbose
2933
shell: bash
3034

3135
- name: Test ${{ matrix.os }}

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ node_modules
1818
build
1919

2020
npm-debug.log
21+
deps

addon/compression.cpp

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
#include "compression.h"
2+
3+
std::vector<uint8_t> mongodb_zstd::compress(const std::vector<uint8_t>& data,
4+
size_t compression_level) {
5+
size_t output_buffer_size = ZSTD_compressBound(data.size());
6+
std::vector<uint8_t> output(output_buffer_size);
7+
8+
size_t result_code =
9+
ZSTD_compress(output.data(), output.size(), data.data(), data.size(), compression_level);
10+
11+
if (ZSTD_isError(result_code)) {
12+
throw std::runtime_error(ZSTD_getErrorName(result_code));
13+
}
14+
15+
output.resize(result_code);
16+
17+
return output;
18+
}
19+
20+
std::vector<uint8_t> mongodb_zstd::decompress(const std::vector<uint8_t>& compressed) {
21+
std::vector<uint8_t> decompressed;
22+
23+
using DCTX_Deleter = void (*)(ZSTD_DCtx*);
24+
25+
std::unique_ptr<ZSTD_DCtx, DCTX_Deleter> decompression_context(
26+
ZSTD_createDCtx(), [](ZSTD_DCtx* ctx) { ZSTD_freeDCtx(ctx); });
27+
28+
ZSTD_inBuffer input = {compressed.data(), compressed.size(), 0};
29+
std::vector<uint8_t> output_buffer(ZSTD_DStreamOutSize());
30+
ZSTD_outBuffer output = {output_buffer.data(), output_buffer.size(), 0};
31+
32+
// Source: https://facebook.github.io/zstd/zstd_manual.html#Chapter9
33+
//
34+
// Use ZSTD_decompressStream() repetitively to consume your input.
35+
// The function will update both `pos` fields.
36+
// If `input.pos < input.size`, some input has not been consumed.
37+
// It's up to the caller to present again remaining data.
38+
// The function tries to flush all data decoded immediately, respecting output buffer size.
39+
// If `output.pos < output.size`, decoder has flushed everything it could.
40+
// But if `output.pos == output.size`, there might be some data left within internal buffers.,
41+
// In which case, call ZSTD_decompressStream() again to flush whatever remains in the buffer.
42+
// Note : with no additional input provided, amount of data flushed is necessarily <=
43+
// ZSTD_BLOCKSIZE_MAX.
44+
// @return : 0 when a frame is completely decoded and fully flushed,
45+
// or an error code, which can be tested using ZSTD_isError(),
46+
// or any other value > 0, which means there is still some decoding or flushing to do to
47+
// complete current frame :
48+
// the return value is a suggested next input size (just a hint
49+
// for better latency) that will never request more than the
50+
// remaining frame size.
51+
auto inputRemains = [](const ZSTD_inBuffer& input) { return input.pos < input.size; };
52+
auto isOutputBufferFlushed = [](const ZSTD_outBuffer& output) {
53+
return output.pos < output.size;
54+
};
55+
56+
while (inputRemains(input) || !isOutputBufferFlushed(output)) {
57+
size_t const ret = ZSTD_decompressStream(decompression_context.get(), &output, &input);
58+
if (ZSTD_isError(ret)) {
59+
throw std::runtime_error(ZSTD_getErrorName(ret));
60+
}
61+
62+
size_t decompressed_size = decompressed.size();
63+
decompressed.resize(decompressed_size + output.pos);
64+
std::copy(output_buffer.data(),
65+
output_buffer.data() + output.pos,
66+
decompressed.data() + decompressed_size);
67+
68+
// move the position back go 0, to indicate that we are ready for more data
69+
output.pos = 0;
70+
}
71+
72+
return decompressed;
73+
}

addon/compression.h

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#ifndef MONGODB_ZSTD_COMPRESSION
2+
#define MONGODB_ZSTD_COMPRESSION
3+
4+
#include <exception>
5+
#include <vector>
6+
7+
#include "compression_worker.h"
8+
#include "zstd.h"
9+
10+
namespace mongodb_zstd {
11+
std::vector<uint8_t> compress(const std::vector<uint8_t>& data, size_t compression_level);
12+
std::vector<uint8_t> decompress(const std::vector<uint8_t>& data);
13+
} // namespace mongodb_zstd
14+
15+
#endif

addon/compression_worker.h

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#ifndef MONGODB_ZSTD_COMPRESSION_WORKER_H
2+
#define MONGODB_ZSTD_COMPRESSION_WORKER_H
3+
#include <napi.h>
4+
5+
#include <optional>
6+
#include <variant>
7+
8+
using namespace Napi;
9+
10+
namespace mongodb_zstd {
11+
/**
12+
* @brief An asynchronous Napi::Worker that can be with any function that produces
13+
* CompressionResults.
14+
* */
15+
class CompressionWorker final : public AsyncWorker {
16+
public:
17+
CompressionWorker(const Function& callback, std::function<std::vector<uint8_t>()> worker)
18+
: AsyncWorker{callback, "compression worker"}, m_worker(worker), m_result{} {}
19+
20+
protected:
21+
void Execute() final {
22+
m_result = m_worker();
23+
}
24+
25+
void OnOK() final {
26+
if (!m_result.has_value()) {
27+
Callback().Call({Error::New(Env(),
28+
"zstd runtime - async worker finished without "
29+
"a compression or decompression result.")
30+
.Value()});
31+
return;
32+
}
33+
34+
const std::vector<uint8_t>& data = m_result.value();
35+
Buffer result = Buffer<uint8_t>::Copy(Env(), data.data(), data.size());
36+
37+
Callback().Call({Env().Undefined(), result});
38+
}
39+
40+
private:
41+
std::function<std::vector<uint8_t>()> m_worker;
42+
std::optional<std::vector<uint8_t>> m_result;
43+
};
44+
45+
} // namespace mongodb_zstd
46+
#endif

addon/zstd.cpp

+49-9
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,60 @@
1+
#include "zstd.h"
2+
13
#include <napi.h>
24

5+
#include <vector>
6+
7+
#include "compression.h"
8+
#include "compression_worker.h"
9+
310
using namespace Napi;
411

5-
Napi::String Compress(const Napi::CallbackInfo& info) {
6-
auto string = Napi::String::New(info.Env(), "compress()");
7-
return string;
12+
namespace mongodb_zstd {
13+
void Compress(const CallbackInfo& info) {
14+
// Argument handling happens in JS
15+
if (info.Length() != 3) {
16+
const char* error_message = "Expected three arguments.";
17+
throw TypeError::New(info.Env(), error_message);
18+
}
19+
20+
Uint8Array to_compress = info[0].As<Uint8Array>();
21+
std::vector<uint8_t> data(to_compress.Data(), to_compress.Data() + to_compress.ElementLength());
22+
23+
size_t compression_level = static_cast<size_t>(info[1].ToNumber().Int32Value());
24+
Function callback = info[2].As<Function>();
25+
26+
CompressionWorker* worker =
27+
new CompressionWorker(callback, [data = std::move(data), compression_level] {
28+
return mongodb_zstd::compress(data, compression_level);
29+
});
30+
31+
worker->Queue();
832
}
9-
Napi::String Decompress(const Napi::CallbackInfo& info) {
10-
auto string = Napi::String::New(info.Env(), "decompress()");
11-
return string;
33+
34+
void Decompress(const CallbackInfo& info) {
35+
// Argument handling happens in JS
36+
if (info.Length() != 2) {
37+
const char* error_message = "Expected two argument.";
38+
throw TypeError::New(info.Env(), error_message);
39+
}
40+
41+
Uint8Array compressed_data = info[0].As<Uint8Array>();
42+
std::vector<uint8_t> data(compressed_data.Data(),
43+
compressed_data.Data() + compressed_data.ElementLength());
44+
Function callback = info[1].As<Function>();
45+
46+
CompressionWorker* worker = new CompressionWorker(
47+
callback, [data = std::move(data)] { return mongodb_zstd::decompress(data); });
48+
49+
worker->Queue();
1250
}
1351

14-
Napi::Object Init(Napi::Env env, Napi::Object exports) {
15-
exports.Set(Napi::String::New(env, "compress"), Napi::Function::New(env, Compress));
16-
exports.Set(Napi::String::New(env, "decompress"), Napi::Function::New(env, Decompress));
52+
Object Init(Env env, Object exports) {
53+
exports.Set(String::New(env, "compress"), Function::New(env, Compress));
54+
exports.Set(String::New(env, "decompress"), Function::New(env, Decompress));
1755
return exports;
1856
}
1957

2058
NODE_API_MODULE(zstd, Init)
59+
60+
} // namespace mongodb_zstd

binding.gyp

+14-4
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,35 @@
44
'type': 'loadable_module',
55
'defines': ['ZSTD_STATIC_LINKING_ONLY'],
66
'include_dirs': [
7-
"<!(node -p \"require('node-addon-api').include_dir\")"
7+
"<!(node -p \"require('node-addon-api').include_dir\")",
8+
"<(module_root_dir)/deps/zstd/lib",
89
],
910
'variables': {
1011
'ARCH': '<(host_arch)',
1112
'built_with_electron%': 0
1213
},
1314
'sources': [
14-
'addon/zstd.cpp'
15+
'addon/zstd.cpp',
16+
'addon/compression_worker.h',
17+
'addon/compression.h',
18+
'addon/compression.cpp'
1519
],
1620
'xcode_settings': {
1721
'GCC_ENABLE_CPP_EXCEPTIONS': 'YES',
1822
'CLANG_CXX_LIBRARY': 'libc++',
19-
'MACOSX_DEPLOYMENT_TARGET': '10.12',
23+
'MACOSX_DEPLOYMENT_TARGET': '11',
2024
'GCC_SYMBOLS_PRIVATE_EXTERN': 'YES', # -fvisibility=hidden
2125
},
2226
'cflags!': [ '-fno-exceptions' ],
2327
'cflags_cc!': [ '-fno-exceptions' ],
28+
'cflags_cc': ['-std=c++17'],
2429
'msvs_settings': {
2530
'VCCLCompilerTool': { 'ExceptionHandling': 1 },
26-
}
31+
},
32+
'link_settings': {
33+
'libraries': [
34+
'<(module_root_dir)/deps/zstd/build/cmake/lib/libzstd.a',
35+
]
36+
},
2737
}]
2838
}

etc/install-zstd.sh

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#!/bin/sh
2+
set -o xtrace
3+
4+
clean_deps() {
5+
rm -rf deps
6+
}
7+
8+
download_zstd() {
9+
rm -rf deps
10+
mkdir -p deps/zstd
11+
12+
curl -L "https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz" \
13+
| tar -zxf - -C deps/zstd --strip-components 1
14+
}
15+
16+
build_zstd() {
17+
export MACOSX_DEPLOYMENT_TARGET=11
18+
cd deps/zstd/build/cmake
19+
20+
cmake .
21+
make
22+
}
23+
24+
clean_deps
25+
download_zstd
26+
build_zstd

lib/index.js

+19-3
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,32 @@
1-
const { compress: _compress, decompress: _decompress } = require('bindings')('zstd');
1+
'use strict';
2+
const zstd = require('bindings')('zstd');
3+
const { promisify } = require('util');
4+
const { isUint8Array } = require('util/types');
25

6+
const _compress = promisify(zstd.compress);
7+
const _decompress = promisify(zstd.decompress);
38
// Error objects created via napi don't have JS stacks; wrap them so .stack is present
49
// https://github.com/nodejs/node/issues/25318#issuecomment-451068073
510

6-
exports.compress = async function compress(data) {
11+
exports.compress = async function compress(data, compressionLevel) {
12+
if (!isUint8Array(data)) {
13+
throw new TypeError(`parameter 'data' must be a Uint8Array.`);
14+
}
15+
16+
if (compressionLevel != null && typeof compressionLevel !== 'number') {
17+
throw new TypeError(`parameter 'compressionLevel' must be a number.`);
18+
}
19+
720
try {
8-
return await _compress(data);
21+
return await _compress(data, compressionLevel ?? 3);
922
} catch (e) {
1023
throw new Error(`zstd: ${e.message}`);
1124
}
1225
};
1326
exports.decompress = async function decompress(data) {
27+
if (!isUint8Array(data)) {
28+
throw new TypeError(`parameter 'data' must be a Uint8Array.`);
29+
}
1430
try {
1531
return await _decompress(data);
1632
} catch (e) {

0 commit comments

Comments
 (0)