From b49e0256492eafa288c740592acca7850af728af Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 13 Sep 2024 23:44:41 +0800 Subject: [PATCH] [feature](api) add BE HTTP /api/load_channels (#40645) ## Proposed changes Add BE HTTP API `/api/load_channels` to get all load_id for current running load_channels. Example: ```console $ curl http://localhost:8040/api/load_channels {"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_channels":[]},"count":0} $ curl http://localhost:8040/api/load_channels {"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_channels":[{"load_id":"159a1b47169d4a67-9136dd6ba9b72011"}]},"count":1} $ curl http://localhost:8040/api/load_channels {"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_channels":[]},"count":0} ``` --- be/src/http/action/load_channel_action.cpp | 66 +++++++++++++++++++ be/src/http/action/load_channel_action.h | 43 ++++++++++++ be/src/runtime/load_channel_mgr.h | 10 +++ be/src/service/http_service.cpp | 5 ++ .../check_before_quit.groovy | 11 ++++ 5 files changed, 135 insertions(+) create mode 100644 be/src/http/action/load_channel_action.cpp create mode 100644 be/src/http/action/load_channel_action.h diff --git a/be/src/http/action/load_channel_action.cpp b/be/src/http/action/load_channel_action.cpp new file mode 100644 index 00000000000000..35efe56ecdea2d --- /dev/null +++ b/be/src/http/action/load_channel_action.cpp @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "http/action/load_channel_action.h" + +#include +#include +#include +#include +#include +#include + +#include "cloud/config.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "olap/olap_common.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "runtime/exec_env.h" +#include "runtime/load_channel_mgr.h" +#include "service/backend_options.h" + +namespace doris { + +const static std::string HEADER_JSON = "application/json"; + +void LoadChannelAction::handle(HttpRequest* req) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str()); + HttpChannel::send_reply(req, HttpStatus::OK, _get_load_channels().ToString()); +} + +EasyJson LoadChannelAction::_get_load_channels() { + EasyJson response; + + auto load_channels = ExecEnv::GetInstance()->load_channel_mgr()->get_all_load_channel_ids(); + + response["msg"] = "OK"; + response["code"] = 0; + EasyJson data = response.Set("data", EasyJson::kObject); + data["host"] = BackendOptions::get_localhost(); + EasyJson tablets = data.Set("load_channels", EasyJson::kArray); + for (auto& load_id : load_channels) { + EasyJson tablet = tablets.PushBack(EasyJson::kObject); + tablet["load_id"] = load_id; + } + response["count"] = load_channels.size(); + return response; +} + +} // namespace doris diff --git a/be/src/http/action/load_channel_action.h b/be/src/http/action/load_channel_action.h new file mode 100644 index 00000000000000..2a9ec3dbf492b5 --- /dev/null +++ b/be/src/http/action/load_channel_action.h @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "http/http_handler.h" +#include "http/http_handler_with_auth.h" +#include "util/easy_json.h" + +namespace doris { +class HttpRequest; + +class ExecEnv; + +// Get BE load stream info from http API. +class LoadChannelAction final : public HttpHandlerWithAuth { +public: + LoadChannelAction(ExecEnv* exec_env) : HttpHandlerWithAuth(exec_env) {} + + ~LoadChannelAction() override = default; + + void handle(HttpRequest* req) override; + +private: + static EasyJson _get_load_channels(); +}; +} // namespace doris diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index c9c8f4c2a0f3cc..6d17b6f275f21d 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -64,6 +64,16 @@ class LoadChannelMgr { void stop(); + std::vector get_all_load_channel_ids() { + std::vector result; + std::lock_guard lock(_lock); + + for (auto& [id, _] : _load_channels) { + result.push_back(id.to_string()); + } + return result; + } + private: Status _get_load_channel(std::shared_ptr& channel, bool& is_eof, const UniqueId& load_id, const PTabletWriterAddBlockRequest& request); diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index f2c325bebc7806..46ee9569992876 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -47,6 +47,7 @@ #include "http/action/health_action.h" #include "http/action/http_stream.h" #include "http/action/jeprofile_actions.h" +#include "http/action/load_channel_action.h" #include "http/action/load_stream_action.h" #include "http/action/meta_action.h" #include "http/action/metrics_action.h" @@ -188,6 +189,10 @@ Status HttpService::start() { LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction(_env)); _ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams", load_stream_action); + // Register BE LoadChannel action + LoadChannelAction* load_channel_action = _pool.add(new LoadChannelAction(_env)); + _ev_http_server->register_handler(HttpMethod::GET, "/api/load_channels", load_channel_action); + // Register Tablets Info action TabletsInfoAction* tablets_info_action = _pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); diff --git a/regression-test/suites/check_before_quit/check_before_quit.groovy b/regression-test/suites/check_before_quit/check_before_quit.groovy index 23850a7c8a59f8..7b097b58cd23c6 100644 --- a/regression-test/suites/check_before_quit/check_before_quit.groovy +++ b/regression-test/suites/check_before_quit/check_before_quit.groovy @@ -126,6 +126,7 @@ suite("check_before_quit", "nonConcurrent,p0") { def command_metrics = "curl http://${beHost}:${bePort}/metrics" def command_vars = "curl http://${beHost}:${beBrpcPort}/vars" + def command_load_channels = "curl http://${beHost}:${bePort}/api/load_channels" def command_load_streams = "curl http://${beHost}:${bePort}/api/load_streams" while ((System.currentTimeMillis() - beginTime) < timeoutMs) { clear = true @@ -214,6 +215,16 @@ suite("check_before_quit", "nonConcurrent,p0") { break } + logger.info("executing command: ${command_load_channels}") + def process_load_channels = command_load_channels.execute() + def outputStream_load_channels = new StringBuffer() + def errorStream_load_channels = new StringBuffer() + process_load_channels.consumeProcessOutput(outputStream_load_channels, errorStream_load_channels) + def code_load_channels = process_load_channels.waitFor() + def load_channels = outputStream_load_channels.toString() + logger.info("Request BE load_channels: code=" + code_load_channels + ", err=" + errorStream_load_channels.toString()) + logger.info("load_channels: " + load_channels); + logger.info("executing command: ${command_load_streams}") def process_load_streams = command_load_streams.execute() def outputStream_load_streams = new StringBuffer()