diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index f48fc9b27e0427..20061ad29c49f3 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -240,7 +240,7 @@ void *pluginsd_main(void *ptr) // disable some plugins by default config_get_boolean(CONFIG_SECTION_PLUGINS, "slabinfo", CONFIG_BOOLEAN_NO); config_get_boolean(CONFIG_SECTION_PLUGINS, "logs-management", -#if defined(LOGS_MANAGEMENT_STRESS_TEST) +#if defined(LOGS_MANAGEMENT_DEV_MODE) CONFIG_BOOLEAN_YES #else CONFIG_BOOLEAN_NO diff --git a/logsmanagement/Makefile.am b/logsmanagement/Makefile.am index 609c9c66ae78ea..33f08d556a419d 100644 --- a/logsmanagement/Makefile.am +++ b/logsmanagement/Makefile.am @@ -25,8 +25,4 @@ dist_logsmanagconfig_DATA = \ dist_noinst_DATA = \ README.md \ - stress_test/logrotate.conf \ - stress_test/logs_query.html \ - stress_test/run_stress_test.sh \ - stress_test/stress_test.c \ $(NULL) diff --git a/logsmanagement/defaults.h b/logsmanagement/defaults.h index 8c8773ade2b30d..2309f781090292 100644 --- a/logsmanagement/defaults.h +++ b/logsmanagement/defaults.h @@ -23,7 +23,7 @@ #define UPDATE_TIMEOUT_DEFAULT 10 /**< Default timeout to use to update charts if they haven't been updated in the meantime. **/ -#if !defined(LOGS_MANAGEMENT_STRESS_TEST) +#if !defined(LOGS_MANAGEMENT_DEV_MODE) #define ENABLE_COLLECTED_LOGS_TOTAL_DEFAULT CONFIG_BOOLEAN_NO /**< Default value to enable (or not) metrics of total collected log records **/ #else #define ENABLE_COLLECTED_LOGS_TOTAL_DEFAULT CONFIG_BOOLEAN_YES /**< Default value to enable (or not) metrics of total collected log records, if stress tests are enabled **/ @@ -57,7 +57,7 @@ typedef enum { #define DISK_SPACE_LIMIT_DEFAULT 500 /**< Global default configuration maximum database disk space limit per log source **/ -#if !defined(LOGS_MANAGEMENT_STRESS_TEST) +#if !defined(LOGS_MANAGEMENT_DEV_MODE) #define GLOBAL_DB_MODE_DEFAULT_STR "none" /**< db mode string to be used as global default in configuration **/ #define GLOBAL_DB_MODE_DEFAULT LOGS_MANAG_DB_MODE_NONE /**< db mode to be used as global default, matching GLOBAL_DB_MODE_DEFAULT_STR **/ #else diff --git a/logsmanagement/helper.h b/logsmanagement/helper.h index 55c0716acc006c..6d1d51f76dac7c 100644 --- a/logsmanagement/helper.h +++ b/logsmanagement/helper.h @@ -16,11 +16,11 @@ #define LOGS_MANAG_STR(x) LOGS_MANAG_STR_HELPER(x) #ifndef m_assert -#if defined(LOGS_MANAGEMENT_STRESS_TEST) +#if defined(LOGS_MANAGEMENT_DEV_MODE) #define m_assert(expr, msg) assert(((void)(msg), (expr))) #else #define m_assert(expr, msg) do{} while(0) -#endif // LOGS_MANAGEMENT_STRESS_TEST +#endif // LOGS_MANAGEMENT_DEV_MODE #endif // m_assert /* Test if a timestamp is within a valid range @@ -45,7 +45,7 @@ #define COMPILE_TIME_ASSERT(X) COMPILE_TIME_ASSERT2(X,__LINE__) #endif // COMPILE_TIME_ASSERT -#if defined(NETDATA_INTERNAL_CHECKS) && defined(LOGS_MANAGEMENT_STRESS_TEST) +#if defined(NETDATA_INTERNAL_CHECKS) && defined(LOGS_MANAGEMENT_DEV_MODE) #define debug_log(args...) netdata_logger(NDLS_COLLECTORS, NDLP_DEBUG, __FILE__, __FUNCTION__, __LINE__, ##args) #else #define debug_log(fmt, args...) do {} while(0) diff --git a/logsmanagement/logsmanagement.c b/logsmanagement/logsmanagement.c index f2849426cf19c5..7ab0023ad60abd 100644 --- a/logsmanagement/logsmanagement.c +++ b/logsmanagement/logsmanagement.c @@ -23,10 +23,6 @@ #include "logsmanagement/unit_test/unit_test.h" #endif -#if defined(LOGS_MANAGEMENT_STRESS_TEST) && LOGS_MANAGEMENT_STRESS_TEST == 1 -#include "query_test.h" -#endif // defined(LOGS_MANAGEMENT_STRESS_TEST) - netdata_mutex_t stdout_mut = NETDATA_MUTEX_INITIALIZER; bool logsmanagement_should_exit = false; @@ -35,8 +31,6 @@ struct File_infos_arr *p_file_infos_arr = NULL; static uv_loop_t *main_loop; -static uv_thread_t stats_charts_thread_id; - static struct { uv_signal_t sig; const int signum; @@ -193,7 +187,15 @@ int main(int argc, char **argv) { exit(1); } - fatal_assert(0 == uv_thread_create(&stats_charts_thread_id, stats_charts_init, &stdout_mut)); + uv_thread_t *p_stats_charts_thread_id = NULL; + const char *const netdata_internals_monitoring = getenv("NETDATA_INTERNALS_MONITORING"); + if( netdata_internals_monitoring && + *netdata_internals_monitoring && + strcmp(netdata_internals_monitoring, "YES") == 0){ + + p_stats_charts_thread_id = mallocz(sizeof(uv_thread_t)); + fatal_assert(0 == uv_thread_create(p_stats_charts_thread_id, stats_charts_init, &stdout_mut)); + } #if defined(__STDC_VERSION__) debug_log( "__STDC_VERSION__: %ld", __STDC_VERSION__); @@ -204,12 +206,6 @@ int main(int argc, char **argv) { debug_log( "LZ4 version: %s", LZ4_versionString()); debug_log( "SQLITE version: " SQLITE_VERSION); -#if defined(LOGS_MANAGEMENT_STRESS_TEST) && LOGS_MANAGEMENT_STRESS_TEST == 1 - debug_log( "Running Netdata with logs_management stress test enabled!"); - static uv_thread_t run_stress_test_queries_thread_id; - uv_thread_create(&run_stress_test_queries_thread_id, run_stress_test_queries_thread, NULL); -#endif // LOGS_MANAGEMENT_STRESS_TEST - for(int i = 0; i < (int) (sizeof(signals) / sizeof(signals[0])); i++){ uv_signal_init(main_loop, &signals[i].sig); uv_signal_start(&signals[i].sig, signal_handler, signals[i].signum); @@ -229,7 +225,10 @@ int main(int argc, char **argv) { nd_log_limits_unlimited(); // TODO: Clean up stats charts memory - uv_thread_join(&stats_charts_thread_id); + if(p_stats_charts_thread_id){ + uv_thread_join(p_stats_charts_thread_id); + freez(p_stats_charts_thread_id); + } uv_stop(main_loop); diff --git a/logsmanagement/stress_test/logrotate.conf b/logsmanagement/stress_test/logrotate.conf deleted file mode 100644 index 8073c30744117c..00000000000000 --- a/logsmanagement/stress_test/logrotate.conf +++ /dev/null @@ -1,5 +0,0 @@ -/tmp/netdata_log_management_stress_test_data/0.log /tmp/netdata_log_management_stress_test_data/1.log { - rotate 1 - nocompress - create -} \ No newline at end of file diff --git a/logsmanagement/stress_test/logs_query.html b/logsmanagement/stress_test/logs_query.html deleted file mode 100644 index fdf0861a55a7ef..00000000000000 --- a/logsmanagement/stress_test/logs_query.html +++ /dev/null @@ -1,186 +0,0 @@ - - - - - - - -

Logs management queries

-
- - -
-

Query parameters

-
-
-

Set query parameters:

- -

- -

- -

- -

- -

- -

- -

- -
- -
- - - - - diff --git a/logsmanagement/stress_test/run_stress_test.sh b/logsmanagement/stress_test/run_stress_test.sh deleted file mode 100755 index 09a805bdc41dc9..00000000000000 --- a/logsmanagement/stress_test/run_stress_test.sh +++ /dev/null @@ -1,145 +0,0 @@ -#!/bin/bash - -# Default configuration options -DEFAULT_BUILD_CLEAN_NETDATA=0 -DEFAULT_BUILD_FOR_RELEASE=1 -DEFAULT_NUM_LOG_SOURCES=0 -DEFAULT_DELAY_BETWEEN_MSG_WRITE=1000000 -DEFAULT_TOTAL_MSGS_PER_SOURCE=1000000 -DEFAULT_QUERIES_DELAY=3600 -DEFAULT_LOG_ROTATE_AFTER_SEC=3600 -DEFAULT_DELAY_OPEN_TO_WRITE_SEC=6 -DEFAULT_RUN_LOGS_MANAGEMENT_TESTS_ONLY=0 - -if [ "$1" == "-h" ] || [ "$1" == "--help" ]; then - echo "Usage: $(basename "$0") [ARGS]..." - echo "Example: $(basename "$0") 0 1 2 1000 1000000 10 6 6 0" - echo "Build, install and run netdata with logs management " - echo "functionality enabled and (optional) stress tests." - echo "" - echo "arg[1]: [build_clean_netdata] Default: $DEFAULT_BUILD_CLEAN_NETDATA" - echo "arg[2]: [build_for_release] Default: $DEFAULT_BUILD_FOR_RELEASE" - echo "arg[3]: [num_log_sources] Default: $DEFAULT_NUM_LOG_SOURCES" - echo "arg[4]: [delay_between_msg_write] Default: $DEFAULT_DELAY_BETWEEN_MSG_WRITE us" - echo "arg[5]: [total_msgs_per_source] Default: $DEFAULT_TOTAL_MSGS_PER_SOURCE" - echo "arg[6]: [queries_delay] Default: $DEFAULT_QUERIES_DELAY s" - echo "arg[7]: [log_rotate_after_sec] Default: $DEFAULT_LOG_ROTATE_AFTER_SEC s" - echo "arg[8]: [delay_open_to_write_sec] Default: $DEFAULT_DELAY_OPEN_TO_WRITE_SEC s" - echo "arg[9]: [run_logs_management_tests_only] Default: $DEFAULT_RUN_LOGS_MANAGEMENT_TESTS_ONLY" - exit 0 -fi - -build_clean_netdata="${1:-$DEFAULT_BUILD_CLEAN_NETDATA}" -build_for_release="${2:-$DEFAULT_BUILD_FOR_RELEASE}" -num_log_sources="${3:-$DEFAULT_NUM_LOG_SOURCES}" -delay_between_msg_write="${4:-$DEFAULT_DELAY_BETWEEN_MSG_WRITE}" -total_msgs_per_source="${5:-$DEFAULT_TOTAL_MSGS_PER_SOURCE}" -queries_delay="${6:-$DEFAULT_QUERIES_DELAY}" -log_rotate_after_sec="${7:-$DEFAULT_LOG_ROTATE_AFTER_SEC}" -delay_open_to_write_sec="${8:-$DEFAULT_DELAY_OPEN_TO_WRITE_SEC}" -run_logs_management_tests_only="${9:-$DEFAULT_RUN_LOGS_MANAGEMENT_TESTS_ONLY}" - -if [ "$num_log_sources" -le 0 ] -then - enable_stress_tests=0 -else - enable_stress_tests=1 -fi - -INSTALL_PATH="/tmp" - -# Terminate running processes -sudo killall -s KILL netdata -sudo killall -s KILL stress_test -sudo killall -s KILL -u netdata - -# Remove potentially persistent directories and files -sudo rm -f $INSTALL_PATH/netdata/var/log/netdata/error.log -sudo rm -rf $INSTALL_PATH/netdata/var/cache/netdata/logs_management_db -sudo rm -rf $INSTALL_PATH/netdata_log_management_stress_test_data - -CPU_CORES=$(grep ^cpu\\scores /proc/cpuinfo | uniq | awk '{print $4}') - -# Build or rebuild Netdata -if [ "$build_clean_netdata" -eq 1 ] -then - cd ../.. - sudo $INSTALL_PATH/netdata/usr/libexec/netdata/netdata-uninstaller.sh -y -f -e $INSTALL_PATH/netdata/etc/netdata/.environment - sudo rm -rf $INSTALL_PATH/netdata/etc/netdata # Remove /etc/netdata if it persists for some reason - sudo git clean -dxff && git submodule update --init --recursive --force - - if [ "$build_for_release" -eq 0 ] - then - c_flags="-O1 -ggdb -Wall -Wextra " - c_flags+="-fno-omit-frame-pointer -Wformat-signedness -fstack-protector-all -Wformat-truncation=2 -Wunused-result " - c_flags+="-DNETDATA_INTERNAL_CHECKS=1 -DNETDATA_DEV_MODE=1 -DLOGS_MANAGEMENT_STRESS_TEST=$enable_stress_tests " - # c_flags+="-Wl,--no-as-needed -ldl " - sudo CFLAGS="$c_flags" ./netdata-installer.sh \ - --dont-start-it \ - --dont-wait \ - --disable-lto \ - --disable-telemetry \ - --disable-go \ - --disable-ebpf \ - --disable-ml \ - --enable-logsmanagement-tests \ - --install-prefix $INSTALL_PATH - else - c_flags="-DLOGS_MANAGEMENT_STRESS_TEST=$enable_stress_tests " - # c_flags+="-Wl,--no-as-needed -ldl " - sudo CFLAGS="$c_flags" ./netdata-installer.sh \ - --dont-start-it \ - --dont-wait \ - --disable-telemetry \ - --install-prefix $INSTALL_PATH - fi - - sudo cp logsmanagement/stress_test/logs_query.html "$INSTALL_PATH/netdata/usr/share/netdata/web" - sudo chown -R netdata:netdata "$INSTALL_PATH/netdata/usr/share/netdata/web/logs_query.html" - -else - cd ../.. && sudo make -j"$CPU_CORES" || exit 1 && sudo make install - sudo chown -R netdata:netdata "$INSTALL_PATH/netdata/usr/share/netdata/web" -fi - -cd logsmanagement/stress_test || exit - -if [ "$run_logs_management_tests_only" -eq 0 ] -then - # Rebuild and run stress test - if [ "$num_log_sources" -gt 0 ] - then - sudo -u netdata -g netdata mkdir $INSTALL_PATH/netdata_log_management_stress_test_data - gcc stress_test.c -DNUM_LOG_SOURCES="$num_log_sources" \ - -DDELAY_BETWEEN_MSG_WRITE="$delay_between_msg_write" \ - -DTOTAL_MSGS_PER_SOURCE="$total_msgs_per_source" \ - -DQUERIES_DELAY="$queries_delay" \ - -DLOG_ROTATE_AFTER_SEC="$log_rotate_after_sec" \ - -DDELAY_OPEN_TO_WRITE_SEC="$delay_open_to_write_sec" \ - -luv -Og -g -o stress_test - sudo -u netdata -g netdata ./stress_test & - sleep 1 - fi - - # Run Netdata - if [ "$build_for_release" -eq 0 ] - then - sudo -u netdata -g netdata -s gdb -ex="set confirm off" -ex=run --args $INSTALL_PATH/netdata/usr/sbin/netdata -D - elif [ "$build_for_release" -eq 2 ] - then - sudo -u netdata -g netdata -s gdb -ex="set confirm off" -ex=run --args $INSTALL_PATH/netdata/usr/libexec/netdata/plugins.d/logs-management.plugin - else - sudo -u netdata -g netdata ASAN_OPTIONS=log_path=stdout $INSTALL_PATH/netdata/usr/sbin/netdata -D - fi -else - if [[ $($INSTALL_PATH/netdata/usr/sbin/netdata -W buildinfo | grep -Fc DLOGS_MANAGEMENT_STRESS_TEST) -eq 1 ]] - then - sudo -u netdata -g netdata ASAN_OPTIONS=log_path=/dev/null $INSTALL_PATH/netdata/usr/libexec/netdata/plugins.d/logs-management.plugin --unittest - else - echo "=======================================================================" - echo "run_logs_management_tests_only=1 but logs management tests cannot run." - echo "Netdata must be configured with --enable-logsmanagement-tests." - echo "Please rerun script with build_clean_netdata=1 and build_for_release=0." - echo "=======================================================================" - fi -fi diff --git a/logsmanagement/stress_test/stress_test.c b/logsmanagement/stress_test/stress_test.c deleted file mode 100644 index ac6e52a3c7df5c..00000000000000 --- a/logsmanagement/stress_test/stress_test.c +++ /dev/null @@ -1,386 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -/** @file stress_test.c - * @brief Black-box stress testing of Netdata Logs Management - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "../defaults.h" - -#include "stress_test.h" - -#define SIMULATED_LOGS_DIR "/tmp/netdata_log_management_stress_test_data" -#define LOG_ROTATION_CMD "logrotate --force logrotate.conf -s /tmp/netdata_log_management_stress_test_data/logrotate_status" -#define CSV_DELIMITER " " -#define USE_LTSV_FORMAT 0 - -#define MS_IN_S 1000 -#define US_IN_S 1000000 - -#define NO_OF_FIELDS 10 - -#ifdef _WIN32 -# define PIPENAME "\\\\?\\pipe\\netdata-logs-stress-test" -#else -# define PIPENAME "/tmp/netdata-logs-stress-test" -#endif // _WIN32 - -uv_process_t child_req; -uv_process_options_t options; -size_t max_msg_len; -static int log_files_no; -static volatile int log_rotated = 0; - -static char **all_fields_arr[NO_OF_FIELDS]; -static int all_fields_arr_sizes[NO_OF_FIELDS]; - -static char *vhosts_ports[] = { - "testhost.host:17", - "invalidhost&%$:80", - "testhost12.host:80", - "testhost57.host:19999", - "testhost111.host:77777", - NULL -}; - -static char *vhosts[] = { - "testhost.host", - "invalidhost&%$", - "testhost12.host", - "testhost57.host", - "testhost111.host", - NULL -}; - -static char *ports[] = { - "17", - "80", - "123", - "8080", - "19999", - "77777", - NULL -}; - -static char *req_clients[] = { - "192.168.15.14", - "192.168.2.1", - "188.133.132.15", - "156.134.132.15", - "2001:0db8:85a3:0000:0000:8a2e:0370:7334", - "8501:0ab8:85a3:0000:0000:4a5d:0370:5213", - "garbageAddress", - NULL -}; - -static char *req_methods[] = { - "GET", - "POST", - "UPDATE", - "DELETE", - "PATCH", - "PUT", - "INVALIDMETHOD", - NULL -}; - -static char *resp_codes[] = { - "5", - "200", - "202", - "404", - "410", - "1027", - NULL -}; - -static char *req_protos[] = { - "HTTP/1", - "HTTP/1.0", - "HTTP/2", - "HTTP/3", - NULL -}; - -static char *req_sizes[] = { - "236", - "635", - "954", - "-", - NULL -}; - -static char *resp_sizes[] = { - "128", - "452", - "1056", - "-", - NULL -}; - -static char *ssl_protos[] = { - "TLSv1", - "TLSv1.1", - "TLSv1.2", - "TLSv1.3", - "SSLv3", - "-", - NULL -}; - -static char *ssl_ciphers[] = { - "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", - "TLS_PSK_WITH_AES_128_CCM_8", - "ECDHE-RSA-AES128-GCM-SHA256", - "TLS_RSA_WITH_DES_CBC_SHA", - "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256", - "invalid_SSL_cipher_suite", - "invalidSSLCipher", - NULL -}; - - - -// "host:testhost.host\tport:80\treq_client:192.168.15.14\treq_method:\"GET\"\tresp_code:202\treq_proto:HTTP/1\treq_size:635\tresp_size:-\tssl_proto:TLSv1\tssl_cipher:TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", - - -// TODO: Include query.h instead of copy-pasting -typedef struct db_query_params { - msec_t start_timestamp; - msec_t end_timestamp; - char *filename; - char *keyword; - char *results; - size_t results_size; -} logs_query_params_t; - -size_t get_local_time(char *buf, size_t max_buf_size){ - time_t rawtime; - struct tm *info; - time( &rawtime ); -#if USE_LTSV_FORMAT - return strftime (buf, max_buf_size, "time:[%d/%b/%Y:%H:%M:%S %z]",localtime( &rawtime )); -#else - return strftime (buf, max_buf_size, "[%d/%b/%Y:%H:%M:%S %z]",localtime( &rawtime )); -#endif -} - -static void produce_logs(void *arg) { - msec_t runtime; - msec_t start_time = now_realtime_msec(); - int log_no = *((int *)arg); - int rc = 0; - long int msgs_written = 0; - uv_file file_handle; - uv_buf_t uv_buf; - char *buf = malloc(max_msg_len + 100); - - size_t buf_size; - uv_fs_t write_req; - - uv_loop_t loop; - uv_loop_init(&loop); - - char log_filename[100]; - sprintf(log_filename, "%s/%d.log", SIMULATED_LOGS_DIR, log_no); - - uv_fs_t open_req; - rc = uv_fs_open(&loop, &open_req, log_filename, O_WRONLY | O_CREAT | O_TRUNC, 0777, NULL); - if (rc < 0) { - fprintf(stderr, "[STRESS_TEST] file_open() error: %s (%d) %s\n", log_filename, rc, uv_strerror(rc)); - } else { - fprintf(stderr, "[STRESS_TEST] Opened file: %s\n", log_filename); - file_handle = open_req.result; // open_req->result of a uv_fs_t is the file descriptor in case of the uv_fs_open - } - uv_fs_req_cleanup(&open_req); - - sleep(DELAY_OPEN_TO_WRITE_SEC); - - fprintf(stderr, "[STRESS_TEST] Start logging: %s\n", log_filename); - - int applied_close_open = 0; - while (msgs_written < TOTAL_MSGS_PER_SOURCE) { - - size_t msg_timestamp_len = 50; - msg_timestamp_len = get_local_time(buf, msg_timestamp_len); - buf_size = msg_timestamp_len; - - for(int i = 0; i < NO_OF_FIELDS; i++){ - strcpy(&buf[buf_size++], CSV_DELIMITER); - int arr_item_off = rand() % all_fields_arr_sizes[i]; - size_t arr_item_len = strlen(all_fields_arr[i][arr_item_off]); - memcpy(&buf[buf_size], all_fields_arr[i][arr_item_off], arr_item_len); - buf_size += arr_item_len; - } - - buf[buf_size] = '\n'; - - uv_buf = uv_buf_init(buf, buf_size + 1); - uv_fs_write(&loop, &write_req, file_handle, &uv_buf, 1, -1, NULL); - msgs_written++; - if(!(msgs_written % 1000000)) fprintf(stderr, "[STRESS_TEST] Wrote %" PRId64 " messages to %s\n", msgs_written, log_filename); - if(log_rotated && !applied_close_open) { - uv_fs_t close_req; - rc = uv_fs_close(&loop, &close_req, file_handle, NULL); - if(rc) { - fprintf(stderr, "[STRESS_TEST] file_close() error: %s (%d) %s\n", log_filename, rc, uv_strerror(rc)); - assert(0); - } - uv_fs_req_cleanup(&close_req); - - rc = uv_fs_open(&loop, &open_req, log_filename, O_WRONLY | O_CREAT | O_TRUNC , 0777, NULL); - if (rc < 0) { - fprintf(stderr, "[STRESS_TEST] file_open() error: %s (%d) %s\n", log_filename, rc, uv_strerror(rc)); - assert(0); - } else { - fprintf(stderr, "[STRESS_TEST] Rotated file: %s\n", log_filename); - file_handle = open_req.result; // open_req->result of a uv_fs_t is the file descriptor in case of the uv_fs_open - } - uv_fs_req_cleanup(&open_req); - - applied_close_open = 1; - fflush(stderr); - } -#if DELAY_BETWEEN_MSG_WRITE /**< Sleep delay (in us) in between consequent messages writes to a file **/ - usleep(DELAY_BETWEEN_MSG_WRITE); -#endif - } - - runtime = now_realtime_msec() - start_time - DELAY_OPEN_TO_WRITE_SEC * MS_IN_S; - fprintf(stderr, "[STRESS_TEST] It took %" PRIu64 "ms to write %" PRId64 " log records in %s (%" PRId64 "k msgs/s))\n. ", - runtime, msgs_written, log_filename, msgs_written / runtime); -} - -static void log_rotate(void *arg){ - uv_sleep((DELAY_OPEN_TO_WRITE_SEC + LOG_ROTATE_AFTER_SEC) * MS_IN_S); - assert(system(LOG_ROTATION_CMD) != -1); - log_rotated = 1; - fprintf(stderr, "[STRESS_TEST] Rotate log sources\n"); - fflush(stderr); -} - -static void connect_cb(uv_connect_t* req, int status){ - int rc = 0; - if(status < 0){ - fprintf(stderr, "[STRESS_TEST] Failed to connect to pipe!\n"); - exit(-1); - } - else - fprintf(stderr, "[STRESS_TEST] Connection to pipe successful!\n"); - - uv_write_t write_req; - write_req.data = req->handle; - - // Serialise logs_query_params_t - char *buf = calloc(100 * log_files_no, sizeof(char)); - sprintf(buf, "%d", log_files_no); - for(int i = 0; i < log_files_no ; i++){ - sprintf(&buf[strlen(buf)], ",0,2147483646000," SIMULATED_LOGS_DIR "/%d.log,%s,%zu", i, " ", (size_t) MAX_LOG_MSG_SIZE); - } - fprintf(stderr, "[STRESS_TEST] Serialised DB query params: %s\n", buf); - - // Write to pipe - uv_buf_t uv_buf = uv_buf_init(buf, strlen(buf)); - rc = uv_write(&write_req, (uv_stream_t *) req->handle, &uv_buf, 1, NULL); - if (rc) { - fprintf(stderr, "[STRESS_TEST] uv_write() error: %s\n", uv_strerror(rc)); - uv_close((uv_handle_t *) req->handle, NULL); - exit(-1); - } - -#if 1 - uv_shutdown_t shutdown_req; - rc = uv_shutdown(&shutdown_req, (uv_stream_t *) req->handle, NULL); - if (rc) { - fprintf(stderr, "[STRESS_TEST] uv_shutdown() error: %s\n", uv_strerror(rc)); - uv_close((uv_handle_t *) req->handle, NULL); - exit(-1); - } -#endif - -} - -int main(int argc, const char *argv[]) { - fprintf(stdout, "*****************************************************************************\n" - "%-15s %40s\n", - "* [STRESS_TEST] Starting stress_test", "*"); - - srand(time(NULL)); - - all_fields_arr[0] = vhosts; - all_fields_arr[1] = ports; - all_fields_arr[2] = req_clients; - all_fields_arr[3] = req_methods; - all_fields_arr[4] = resp_codes; - all_fields_arr[5] = req_protos; - all_fields_arr[6] = req_sizes; - all_fields_arr[7] = resp_sizes; - all_fields_arr[8] = ssl_protos; - all_fields_arr[9] = ssl_ciphers; - - for (int i = 0; i < NO_OF_FIELDS; i++){ - char **arr = all_fields_arr[i]; - int arr_size = 0; - size_t max_item_len = 0; - while(arr[arr_size] != NULL){ - size_t item_len = strlen(arr[arr_size]); - if(item_len > max_item_len) max_item_len = item_len; - arr_size++; - } - max_msg_len += max_item_len; - all_fields_arr_sizes[i] = arr_size; - } - - - char *ptr; - log_files_no = NUM_LOG_SOURCES; - fprintf(stdout, "*****************************************************************************\n" - "%-15s%42s %-10u%9s\n" - "%-15s%42s %-10u%9s\n" - "%-15s%42s %-10u%9s\n" - "%-15s%42s %-10u%9s\n" - "%-15s%42s %-10u%9s\n" - "%-15s%42s %-10u%9s\n" - "*****************************************************************************\n", - "* [STRESS_TEST]", "Number of log sources to simulate:", log_files_no, "file *", - "* [STRESS_TEST]", "Total log records to produce per source:", TOTAL_MSGS_PER_SOURCE, "records *", - "* [STRESS_TEST]", "Delay between log record write to file:", DELAY_BETWEEN_MSG_WRITE, "us *", - "* [STRESS_TEST]", "Log sources to rotate via create after:", LOG_ROTATE_AFTER_SEC, "s *", - "* [STRESS_TEST]", "Queries to be executed after:", QUERIES_DELAY, "s *", - "* [STRESS_TEST]", "Delay until start writing logs:", DELAY_OPEN_TO_WRITE_SEC, "s *"); - - /* Start threads that produce log messages */ - uv_thread_t *log_producer_threads = malloc(log_files_no * sizeof(uv_thread_t)); - int *log_producer_thread_no = malloc(log_files_no * sizeof(int)); - for (int i = 0; i < log_files_no; i++) { - fprintf(stderr, "[STRESS_TEST] Starting up log producer for %d.log\n", i); - log_producer_thread_no[i] = i; - assert(!uv_thread_create(&log_producer_threads[i], produce_logs, &log_producer_thread_no[i])); - } - - uv_thread_t *log_rotate_thread = malloc(sizeof(uv_thread_t)); - assert(!uv_thread_create(log_rotate_thread, log_rotate, NULL)); - - for (int j = 0; j < log_files_no; j++) { - uv_thread_join(&log_producer_threads[j]); - } - - sleep(QUERIES_DELAY); // Give netdata-logs more than LOG_FILE_READ_INTERVAL to ensure the entire log file has been read. - - uv_pipe_t query_data_pipe; - uv_pipe_init(uv_default_loop(), &query_data_pipe, 1); - uv_connect_t connect_req; - uv_pipe_connect(&connect_req, &query_data_pipe, PIPENAME, connect_cb); - - uv_run(uv_default_loop(), UV_RUN_DEFAULT); - - uv_close((uv_handle_t *) &query_data_pipe, NULL); -}