Skip to content

Commit

Permalink
[feature](api) add BE HTTP /api/load_channels (apache#40645)
Browse files Browse the repository at this point in the history
## Proposed changes

Add BE HTTP API `/api/load_channels` to get all load_id for current
running load_channels.

Example:

```console
$ curl http://localhost:8040/api/load_channels
{"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_channels":[]},"count":0}
$ curl http://localhost:8040/api/load_channels
{"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_channels":[{"load_id":"159a1b47169d4a67-9136dd6ba9b72011"}]},"count":1}
$ curl http://localhost:8040/api/load_channels
{"msg":"OK","code":0,"data":{"host":"10.16.10.7","load_channels":[]},"count":0}
```
  • Loading branch information
kaijchen authored Sep 13, 2024
1 parent f5a2037 commit b49e025
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 0 deletions.
66 changes: 66 additions & 0 deletions be/src/http/action/load_channel_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "http/action/load_channel_action.h"

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <limits>
#include <string>
#include <vector>

#include "cloud/config.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "olap/olap_common.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/exec_env.h"
#include "runtime/load_channel_mgr.h"
#include "service/backend_options.h"

namespace doris {

const static std::string HEADER_JSON = "application/json";

void LoadChannelAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
HttpChannel::send_reply(req, HttpStatus::OK, _get_load_channels().ToString());
}

EasyJson LoadChannelAction::_get_load_channels() {
EasyJson response;

auto load_channels = ExecEnv::GetInstance()->load_channel_mgr()->get_all_load_channel_ids();

response["msg"] = "OK";
response["code"] = 0;
EasyJson data = response.Set("data", EasyJson::kObject);
data["host"] = BackendOptions::get_localhost();
EasyJson tablets = data.Set("load_channels", EasyJson::kArray);
for (auto& load_id : load_channels) {
EasyJson tablet = tablets.PushBack(EasyJson::kObject);
tablet["load_id"] = load_id;
}
response["count"] = load_channels.size();
return response;
}

} // namespace doris
43 changes: 43 additions & 0 deletions be/src/http/action/load_channel_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <string>

#include "http/http_handler.h"
#include "http/http_handler_with_auth.h"
#include "util/easy_json.h"

namespace doris {
class HttpRequest;

class ExecEnv;

// Get BE load stream info from http API.
class LoadChannelAction final : public HttpHandlerWithAuth {
public:
LoadChannelAction(ExecEnv* exec_env) : HttpHandlerWithAuth(exec_env) {}

~LoadChannelAction() override = default;

void handle(HttpRequest* req) override;

private:
static EasyJson _get_load_channels();
};
} // namespace doris
10 changes: 10 additions & 0 deletions be/src/runtime/load_channel_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ class LoadChannelMgr {

void stop();

std::vector<std::string> get_all_load_channel_ids() {
std::vector<std::string> result;
std::lock_guard<std::mutex> lock(_lock);

for (auto& [id, _] : _load_channels) {
result.push_back(id.to_string());
}
return result;
}

private:
Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof,
const UniqueId& load_id, const PTabletWriterAddBlockRequest& request);
Expand Down
5 changes: 5 additions & 0 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "http/action/health_action.h"
#include "http/action/http_stream.h"
#include "http/action/jeprofile_actions.h"
#include "http/action/load_channel_action.h"
#include "http/action/load_stream_action.h"
#include "http/action/meta_action.h"
#include "http/action/metrics_action.h"
Expand Down Expand Up @@ -188,6 +189,10 @@ Status HttpService::start() {
LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction(_env));
_ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams", load_stream_action);

// Register BE LoadChannel action
LoadChannelAction* load_channel_action = _pool.add(new LoadChannelAction(_env));
_ev_http_server->register_handler(HttpMethod::GET, "/api/load_channels", load_channel_action);

// Register Tablets Info action
TabletsInfoAction* tablets_info_action =
_pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
Expand Down
11 changes: 11 additions & 0 deletions regression-test/suites/check_before_quit/check_before_quit.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ suite("check_before_quit", "nonConcurrent,p0") {

def command_metrics = "curl http://${beHost}:${bePort}/metrics"
def command_vars = "curl http://${beHost}:${beBrpcPort}/vars"
def command_load_channels = "curl http://${beHost}:${bePort}/api/load_channels"
def command_load_streams = "curl http://${beHost}:${bePort}/api/load_streams"
while ((System.currentTimeMillis() - beginTime) < timeoutMs) {
clear = true
Expand Down Expand Up @@ -214,6 +215,16 @@ suite("check_before_quit", "nonConcurrent,p0") {
break
}

logger.info("executing command: ${command_load_channels}")
def process_load_channels = command_load_channels.execute()
def outputStream_load_channels = new StringBuffer()
def errorStream_load_channels = new StringBuffer()
process_load_channels.consumeProcessOutput(outputStream_load_channels, errorStream_load_channels)
def code_load_channels = process_load_channels.waitFor()
def load_channels = outputStream_load_channels.toString()
logger.info("Request BE load_channels: code=" + code_load_channels + ", err=" + errorStream_load_channels.toString())
logger.info("load_channels: " + load_channels);

logger.info("executing command: ${command_load_streams}")
def process_load_streams = command_load_streams.execute()
def outputStream_load_streams = new StringBuffer()
Expand Down

0 comments on commit b49e025

Please sign in to comment.