Skip to content

Commit

Permalink
Hash Reduction (uber#201)
Browse files Browse the repository at this point in the history
* only Cuda part in this diff, processor part coming later
* only supports device mode; the host concurrent hash map implementation will be done in a future diff
* the concurrent hash map needs C++14; we may consider rewriting it ourselves.
* hash vector and index vector no longer needed
* include a device smart pointer implementation in this diff, may consider migrating other device memory allocation in future diffs
  • Loading branch information
lucafuji authored Jun 11, 2019
1 parent 2593f6e commit 92c409e
Show file tree
Hide file tree
Showing 31 changed files with 988 additions and 415 deletions.
47 changes: 36 additions & 11 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,19 @@ endif ()
################################
# Compiler and linker flags
################################
set(CMAKE_CUDA_STANDARD 11)
set(CXX_STANDARD 11)
message("Detecting whether compiler support C++14")
foreach (FEATURE ${CMAKE_CXX_COMPILE_FEATURES})
if (FEATURE STREQUAL "cxx_std_14")
message("C++14 is supported")
set(CXX_STANDARD 14)
endif ()
endforeach ()

set(CMAKE_CUDA_STANDARD ${CXX_STANDARD})
set(CMAKE_CXX_STANDARD ${CXX_STANDARD})

set(CMAKE_CUDA_STANDARD_REQUIRED ON)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
include_directories(${PROJECT_SOURCE_DIR} ${CMAKE_BINARY_DIR}/include)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY lib)
Expand All @@ -45,12 +55,18 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY bin)
set(CMAKE_C_FLAGS -fPIC)
set(CMAKE_SHARED_LIBRARY_SUFFIX_C .so)
set(CMAKE_SHARED_LIBRARY_SUFFIX_CXX .so)
set(CMAKE_CUDA_FLAGS "-lineinfo -I. $ENV{NVCCFLAGS} ${CMAKE_CUDA_FLAGS}")
set(CMAKE_CUDA_FLAGS " -lineinfo -I. $ENV{NVCCFLAGS} ${CMAKE_CUDA_FLAGS} --expt-extended-lambda --expt-relaxed-constexpr")
set(GENCODE_FLAGS "-gencode arch=compute_60,code=sm_60 -gencode arch=compute_60,code=compute_60")
set(CMAKE_CUDA_SEPARABLE_COMPILATION ON)

link_directories(${CMAKE_BINARY_DIR}/lib)

# HASH_REDUCTION is currently only supported in device mode and when the CXX standard is C++14.
if ((${CXX_STANDARD} EQUAL 14) AND (QUERY_MODE STREQUAL "DEVICE"))
list(APPEND MARCROS "SUPPORT_HASH_REDUCTION=1")
set(SUPPORT_HASH_REDUCTION 1)
endif()

################################
# GTest
################################
Expand Down Expand Up @@ -89,15 +105,21 @@ endif ()
# CUDF
# Need cpp/src/hash/concurrent_unordered_map.cuh only
######################################################
set(CUDF_SRC_DIR ${CMAKE_CURRENT_LIST_DIR}/thirdparty/cudf/cpp/src)
set(CUDF_INCLUDE_DIR ${CMAKE_CURRENT_LIST_DIR}/thirdparty/cudf/cpp/include)
if (QUERY_MODE STREQUAL "DEVICE")
include_directories(${CUDF_INCLUDE_DIR} ${CUDF_SRC_DIR})
if (SUPPORT_HASH_REDUCTION)
set(CUDF_PROJECT_DIR ${CMAKE_CURRENT_LIST_DIR}/thirdparty/cudf)
include_directories(
${CUDF_PROJECT_DIR}/cpp/src
${CUDF_PROJECT_DIR}/cpp/include
${CUDF_PROJECT_DIR}/thirdparty/cub
# cudf needs to find rmm/thrust_rmm_allocator.h even though it's not actually linked
# TODO(lucafuji) remove this include directory after RMM is default memory management system
${RMM_SRC_DIR}/include
${RMM_SRC_DIR}/thirdparty/cnmem/include
)
endif ()
################################
# Mem
################################

if (QUERY_MODE STREQUAL "DEVICE")
list(APPEND MARCROS "RUN_ON_DEVICE=1")
if (USE_RMM)
Expand Down Expand Up @@ -139,14 +161,15 @@ add_library(algorithm SHARED
query/functor.cu
query/geo_intersects.cu
query/hash_lookup.cu
query/hash_reduction.cu
query/hll.cu
query/iterator.hpp
query/memory.cu
query/memory.hpp
query/measure_transform.cu
query/scratch_space_transform.cu
query/sort_reduce.cu
query/thrust_rmm_allocator.hpp
query/time_series_aggregate.h
query/transform.cu
query/transform.hpp
query/utils.cu
Expand All @@ -161,6 +184,7 @@ include(rdkafka.cmake)
# Unit Tests
################################
file(GLOB QUERY_UNITTEST_FILES query/*_unittest.cu)
message(QUERY_UNITTEST_FILES is ${QUERY_UNITTEST_FILES})
add_executable(all_unittest ${QUERY_UNITTEST_FILES} query/unittest_utils.hpp)
add_dependencies(all_unittest googletest)
# Link test executable against gtest & gtest_main
Expand Down Expand Up @@ -269,9 +293,10 @@ set(ANTLR_BIN "${CMAKE_SOURCE_DIR}/.bin/antlr-4.7.1-complete.jar")
set(ANTLR_URL "http://www.antlr.org/download/antlr-4.7.1-complete.jar")
add_custom_target(antlr DEPENDS ANTLR_BIN
COMMAND java -jar ${ANTLR_BIN} -Dlanguage=Go -package antlrgen -Xexact-output-dir -o query/sql/antlrgen -visitor -no-listener query/sql/SqlBase.g4
)
)

add_custom_command(
OUTPUT ANTLR_BIN
COMMAND mkdir -p .bin && curl -L -o ${ANTLR_BIN} ${ANTLR_URL}
VERBATIM
)
)
20 changes: 0 additions & 20 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,6 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The derived work can be found in the files:
query/functor.cu

--------------------------------------------------
AresDB includes derived work from rapids rmm library (https://github.com/rapidsai/rmm) under the Apache License 2.0:

Copyright (c) 2018, NVIDIA CORPORATION.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

The derived work can be found in the files:
query/thrust_rmm_allocator.hpp

--------------------------------------------------
AresDB includes derived work from m3db (https://github.com/m3db/m3) under the Apache License 2.0:

Expand Down
7 changes: 0 additions & 7 deletions cgoutils/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ func IsPooledMemory() bool {
return (GetFlags() & C.POOLED_MEMORY_FLAG) != 0
}

// Init will initialize the memory management.
func Init() {
doCGoCall(func() C.CGoCallResHandle {
return C.Init()
})
}

// HostAlloc allocates memory in C.
func HostAlloc(bytes int) unsafe.Pointer {
return unsafe.Pointer(doCGoCall(func() C.CGoCallResHandle {
Expand Down
5 changes: 3 additions & 2 deletions cgoutils/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ typedef uint32_t DeviceMemoryFlags;
// Change to CGoCallResHandle when future it will throw any.
DeviceMemoryFlags GetFlags();

CGoCallResHandle Init();

CGoCallResHandle HostAlloc(size_t bytes);

CGoCallResHandle HostFree(void *p);
Expand Down Expand Up @@ -82,7 +80,10 @@ CGoCallResHandle deviceFree(void *devPtr);
CGoCallResHandle deviceMemset(void *devPtr, int value, size_t count);
CGoCallResHandle asyncCopyHostToDevice(void *dst, const void *src,
size_t count, void *stream);
CGoCallResHandle asyncCopyDeviceToHost(void *dst, const void *src,
size_t count, void *stream);

CGoCallResHandle waitForCudaStream(void *stream);
#ifdef __cplusplus
}
#endif
Expand Down
21 changes: 16 additions & 5 deletions cgoutils/memory/cuda_malloc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ DeviceMemoryFlags GetFlags() {
return DEVICE_MEMORY_IMPLEMENTATION_FLAG;
}

CGoCallResHandle Init() {
CGoCallResHandle resHandle = {NULL, NULL};
return resHandle;
}

CGoCallResHandle HostAlloc(size_t bytes) {
CGoCallResHandle resHandle = {NULL, NULL};
// cudaHostAllocPortable makes sure that the allocation is associated with all
Expand Down Expand Up @@ -204,3 +199,19 @@ CGoCallResHandle asyncCopyHostToDevice(void* dst, const void* src,
resHandle.pStrErr = checkCUDAError("asyncCopyHostToDevice");
return resHandle;
}

// Enqueues an asynchronous copy of `count` bytes from device memory `src`
// to host memory `dst` on the given CUDA stream. The copy may not have
// completed when this returns; any CUDA error raised so far is captured
// into the returned handle's pStrErr field via checkCUDAError.
CGoCallResHandle asyncCopyDeviceToHost(void* dst, const void* src,
                                       size_t count, void* stream) {
  CGoCallResHandle handle = {NULL, NULL};
  cudaStream_t cudaStream = (cudaStream_t) stream;
  cudaMemcpyAsync(dst, src, count, cudaMemcpyDeviceToHost, cudaStream);
  handle.pStrErr = checkCUDAError("asyncCopyDeviceToHost");
  return handle;
}

// Blocks the calling host thread until all work previously queued on
// `stream` has completed. Any CUDA error observed is reported through
// the returned handle's pStrErr field.
CGoCallResHandle waitForCudaStream(void *stream) {
  CGoCallResHandle handle = {NULL, NULL};
  cudaStreamSynchronize(reinterpret_cast<cudaStream_t>(stream));
  handle.pStrErr = checkCUDAError("waitForCudaStream");
  return handle;
}
18 changes: 13 additions & 5 deletions cgoutils/memory/malloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ DeviceMemoryFlags GetFlags(){
return 0x0;
}

CGoCallResHandle Init(){
CGoCallResHandle resHandle = {NULL, NULL};
return resHandle;
}

CGoCallResHandle HostAlloc(size_t bytes) {
CGoCallResHandle resHandle = {NULL, NULL};
resHandle.res = malloc(bytes);
Expand All @@ -45,6 +40,7 @@ CGoCallResHandle CreateCudaStream(int device) {
CGoCallResHandle resHandle = {NULL, NULL};
return resHandle;
}

CGoCallResHandle WaitForCudaStream(void *s, int device) {
CGoCallResHandle resHandle = {NULL, NULL};
return resHandle;
Expand Down Expand Up @@ -145,4 +141,16 @@ CGoCallResHandle asyncCopyHostToDevice(void *dst, const void *src,
CGoCallResHandle resHandle = {NULL, NULL};
memcpy(dst, src, count);
return resHandle;
}

// Host-mode implementation: no GPU is present, so the "device" pointer is
// ordinary host memory and the transfer degenerates to a synchronous
// memcpy. The stream argument exists only for interface parity with the
// CUDA build and is ignored.
CGoCallResHandle asyncCopyDeviceToHost(void *dst, const void *src,
                                       size_t count, void *stream){
  CGoCallResHandle handle = {NULL, NULL};
  memcpy(dst, src, count);
  return handle;
}

// Host-mode no-op: there is no CUDA stream to wait on, so this always
// succeeds and returns an empty (error-free) handle.
CGoCallResHandle waitForCudaStream(void *stream) {
  CGoCallResHandle handle = {NULL, NULL};
  return handle;
}
68 changes: 48 additions & 20 deletions cgoutils/memory/rmm_alloc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -49,30 +49,42 @@ char *checkRMMError(rmmError_t rmmError, const char* message) {
return NULL;
}

DeviceMemoryFlags GetFlags() {
return DEVICE_MEMORY_IMPLEMENTATION_FLAG | POOLED_MEMORY_FLAG;
}
// init_helper exists purely to run some initialization code before main
// starts. If this logic is required in multiple places, we may consider
// wrapping it in a macro.
struct init_helper {
static int init_flag;

CGoCallResHandle Init() {
CGoCallResHandle resHandle = GetDeviceCount();
if (resHandle.pStrErr != nullptr) {
return resHandle;
}

size_t deviceCount = reinterpret_cast<size_t>(resHandle.res);
for (size_t device = 0; device < deviceCount; device++) {
cudaSetDevice(device);
rmmOptions_t options = {
PoolAllocation,
0, // Default to half of total memory
false // Disable logging.
};
resHandle.pStrErr = checkRMMError(rmmInitialize(&options), "rmmInitialize");
// init rmm manager with rmmOptions.
static int init() {
CGoCallResHandle resHandle = GetDeviceCount();
if (resHandle.pStrErr != nullptr) {
return resHandle;
throw std::runtime_error(const_cast<char *>(resHandle.pStrErr));
}

size_t deviceCount = reinterpret_cast<size_t>(resHandle.res);
for (size_t device = 0; device < deviceCount; device++) {
cudaSetDevice(device);
rmmOptions_t options = {
CudaDefaultAllocation, // Use PoolAllocation when RMM has improved
// their sub allocator.
0, // Default to half of total memory
false // Disable logging.
};
resHandle.pStrErr =
checkRMMError(rmmInitialize(&options), "rmmInitialize");
if (resHandle.pStrErr != nullptr) {
throw std::runtime_error(const_cast<char *>(resHandle.pStrErr));
}
}
return 0;
}
return resHandle;
};

int init_helper::init_flag = init();

// Reports the capabilities of this memory implementation: it manages real
// device memory and that memory is served from the RMM pooled allocator.
DeviceMemoryFlags GetFlags() {
  DeviceMemoryFlags flags = DEVICE_MEMORY_IMPLEMENTATION_FLAG;
  flags |= POOLED_MEMORY_FLAG;
  return flags;
}

CGoCallResHandle DeviceAllocate(size_t bytes, int device) {
Expand Down Expand Up @@ -256,3 +268,19 @@ CGoCallResHandle asyncCopyHostToDevice(void* dst, const void* src,
resHandle.pStrErr = checkCUDAError("asyncCopyHostToDevice");
return resHandle;
}

// Schedules an asynchronous device-to-host copy of `count` bytes from
// `src` to `dst` on `stream`. The call returns before the copy is
// guaranteed complete; any CUDA error raised so far is surfaced through
// the returned handle's pStrErr field.
CGoCallResHandle asyncCopyDeviceToHost(void* dst, const void* src,
                                       size_t count, void* stream) {
  CGoCallResHandle result = {NULL, NULL};
  cudaMemcpyAsync(dst, src, count, cudaMemcpyDeviceToHost,
                  static_cast<cudaStream_t>(stream));
  result.pStrErr = checkCUDAError("asyncCopyDeviceToHost");
  return result;
}

// Synchronizes the host with `stream`: blocks until every operation
// previously enqueued on it has finished, reporting any CUDA error via
// the returned handle.
CGoCallResHandle waitForCudaStream(void *stream) {
  CGoCallResHandle result = {NULL, NULL};
  cudaStream_t cudaStream = (cudaStream_t) stream;
  cudaStreamSynchronize(cudaStream);
  result.pStrErr = checkCUDAError("waitForCudaStream");
  return result;
}
4 changes: 0 additions & 4 deletions cgoutils/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ var _ = ginkgo.Describe("memory utils", func() {
Ω(func() { IsPooledMemory() }).ShouldNot(Panic())
})

ginkgo.It("Init should work", func() {
Ω(func() { Init() }).ShouldNot(Panic())
})

ginkgo.It("CudaMemCopies should work", func() {
srcHost := [10]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
dstHost := [10]byte{}
Expand Down
22 changes: 15 additions & 7 deletions query/algorithm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,35 @@ int filter(TransformContext ctx, InputVector input,

// reduce binds aggregate function type and data type from
// aggFunc.
int reduce(DimensionColumnVector inputKeys, uint8_t *inputValues,
DimensionColumnVector outputKeys, uint8_t *outputValues,
int reduce(DimensionVector inputKeys, uint8_t *inputValues,
DimensionVector outputKeys, uint8_t *outputValues,
int valueBytes, int length, AggregateFunction aggFunc,
cudaStream_t cudaStream);

// sort binds KeyIter type from keys.
void sort(DimensionColumnVector keys, int length, cudaStream_t cudaStream);
void sort(DimensionVector keys, int length, cudaStream_t cudaStream);

// hash_reduction will reduce the measures according to the dim values and
// aggregate function. Its interface is the same as reduce except it will not
// use index vector and hash vector in DimensionVector.
int hash_reduction(DimensionVector inputKeys, uint8_t *inputValues,
DimensionVector outputKeys, uint8_t *outputValues,
int valueBytes, int length, AggregateFunction aggFunc,
cudaStream_t cudaStream);

// expand function is used to uncompress the compressed dimension keys and
// append to outputKeys.
int expand(DimensionColumnVector inputKeys,
DimensionColumnVector outputKeys,
int expand(DimensionVector inputKeys,
DimensionVector outputKeys,
uint32_t *baseCounts,
uint32_t *indexVector,
int indexVectorLen,
int outputOccupiedLen,
cudaStream_t cudaStream);

// hyperloglog.
int hyperloglog(DimensionColumnVector prevDimOut,
DimensionColumnVector curDimOut, uint32_t *prevValuesOut,
int hyperloglog(DimensionVector prevDimOut,
DimensionVector curDimOut, uint32_t *prevValuesOut,
uint32_t *curValuesOut, int prevResultSize, int curBatchSize,
bool isLastBatch, uint8_t **hllVectorPtr,
size_t *hllVectorSizePtr, uint16_t **hllDimRegIDCountPtr,
Expand Down
Loading

0 comments on commit 92c409e

Please sign in to comment.