Skip to content

Commit

Permalink
Add support for performing pre-checks for TableRebalance (#15029)
Browse files Browse the repository at this point in the history
* Add support for performing pre-checks for TableRebalance
  • Loading branch information
somandal authored Feb 15, 2025
1 parent c0d4532 commit 5f220b3
Show file tree
Hide file tree
Showing 17 changed files with 529 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,11 @@ public void init(PinotConfiguration pinotConfiguration)
// queries)
FunctionRegistry.init();
_adminApp = createControllerAdminApp();
// This executor service is used to do async tasks from multiget util or table rebalancing.
_executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d");
// Do not use this before the invocation of {@link PinotHelixResourceManager::start()}, which happens in {@link
// ControllerStarter::start()}
_helixResourceManager = createHelixResourceManager();
// This executor service is used to do async tasks from multiget util or table rebalancing.
_executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d");
_tenantRebalanceExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
"tenant-rebalance-thread-%d");
_tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService);
Expand Down Expand Up @@ -324,7 +324,7 @@ private void setupHelixClusterConstraints() {
* @return A new instance of PinotHelixResourceManager.
*/
protected PinotHelixResourceManager createHelixResourceManager() {
return new PinotHelixResourceManager(_config);
return new PinotHelixResourceManager(_config, _executorService);
}

public PinotHelixResourceManager getHelixResourceManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ private static long getRandomInitialDelayInSeconds() {
public static final String ACCESS_CONTROL_USERNAME = "access.control.init.username";
public static final String ACCESS_CONTROL_PASSWORD = "access.control.init.password";
public static final String LINEAGE_MANAGER_CLASS = "controller.lineage.manager.class";
public static final String REBALANCE_PRE_CHECKER_CLASS = "controller.rebalance.pre.checker.class";
// Amount of the time the segment can take from the beginning of upload to the end of upload. Used when parallel push
// protection is enabled. If the upload does not finish within the timeout, next upload can override the previous one.
private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis";
Expand All @@ -316,6 +317,8 @@ private static long getRandomInitialDelayInSeconds() {
private static final String DEFAULT_ACCESS_CONTROL_PASSWORD = "admin";
private static final String DEFAULT_LINEAGE_MANAGER =
"org.apache.pinot.controller.helix.core.lineage.DefaultLineageManager";
private static final String DEFAULT_REBALANCE_PRE_CHECKER =
"org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker";
private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 600_000L; // 10 minutes
private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION = -1;
private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
Expand Down Expand Up @@ -952,6 +955,14 @@ public void setLineageManagerClass(String lineageModifierClass) {
setProperty(LINEAGE_MANAGER_CLASS, lineageModifierClass);
}

public String getRebalancePreCheckerClass() {
return getProperty(REBALANCE_PRE_CHECKER_CLASS, DEFAULT_REBALANCE_PRE_CHECKER);
}

public void setRebalancePreCheckerClass(String rebalancePreCheckerClass) {
setProperty(REBALANCE_PRE_CHECKER_CLASS, rebalancePreCheckerClass);
}

public long getSegmentUploadTimeoutInMillis() {
return getProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS, DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ public String getTableReloadMetadata(
new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager);
Map<String, JsonNode> needReloadMetadata =
tableMetadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000).getServerReloadJsonResponses();
boolean needReload =
needReloadMetadata.values().stream().anyMatch(value -> value.get("needReload").booleanValue());
Map<String, ServerSegmentsReloadCheckResponse> serverResponses = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ public RebalanceResult rebalance(
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
@ApiParam(value = "Whether to rebalance table in dry-run mode") @DefaultValue("false") @QueryParam("dryRun")
boolean dryRun,
@ApiParam(value = "Whether to enable pre-checks for table, must be in dry-run mode to enable")
@DefaultValue("false") @QueryParam("preChecks") boolean preChecks,
@ApiParam(value = "Whether to reassign instances before reassigning segments") @DefaultValue("false")
@QueryParam("reassignInstances") boolean reassignInstances,
@ApiParam(value = "Whether to reassign CONSUMING segments for real-time table") @DefaultValue("false")
Expand Down Expand Up @@ -644,6 +646,7 @@ public RebalanceResult rebalance(
String tableNameWithType = constructTableNameWithType(tableName, tableTypeStr);
RebalanceConfig rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setDryRun(dryRun);
rebalanceConfig.setPreChecks(preChecks);
rebalanceConfig.setReassignInstances(reassignInstances);
rebalanceConfig.setIncludeConsuming(includeConsuming);
rebalanceConfig.setBootstrap(bootstrap);
Expand All @@ -663,8 +666,9 @@ public RebalanceResult rebalance(
String rebalanceJobId = TableRebalancer.createUniqueRebalanceJobIdentifier();

try {
if (dryRun || downtime) {
// For dry-run or rebalance with downtime, directly return the rebalance result as it should return immediately
if (dryRun || preChecks || downtime) {
// For dry-run, preChecks or rebalance with downtime, directly return the rebalance result as it should return
// immediately
return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false);
} else {
// Make a dry-run first to get the target assignment
Expand All @@ -682,7 +686,8 @@ public RebalanceResult rebalance(
} catch (Throwable t) {
String errorMsg = String.format("Caught exception/error while rebalancing table: %s", tableNameWithType);
LOGGER.error(errorMsg, t);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null,
null);
}
});
boolean isJobIdPersisted = waitForRebalanceToPersist(
Expand All @@ -702,7 +707,8 @@ public RebalanceResult rebalance(

return new RebalanceResult(dryRunResult.getJobId(), RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller logs for updates", dryRunResult.getInstanceAssignment(),
dryRunResult.getTierInstanceAssignment(), dryRunResult.getSegmentAssignment());
dryRunResult.getTierInstanceAssignment(), dryRunResult.getSegmentAssignment(),
dryRunResult.getPreChecksResult());
} else {
// If dry-run failed or is no-op, return the dry-run result
return dryRunResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -151,6 +152,8 @@
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker;
import org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
Expand Down Expand Up @@ -237,10 +240,12 @@ private enum LineageUpdateType {
private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
private TableCache _tableCache;
private final LineageManager _lineageManager;
private final RebalancePreChecker _rebalancePreChecker;

public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir,
boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays,
boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
boolean enableTieredSegmentAssignment, LineageManager lineageManager, RebalancePreChecker rebalancePreChecker,
@Nullable ExecutorService executorService) {
_helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
_helixClusterName = helixClusterName;
_dataDir = dataDir;
Expand All @@ -263,13 +268,24 @@ public String load(String instanceId) {
_lineageUpdaterLocks[i] = new Object();
}
_lineageManager = lineageManager;
_rebalancePreChecker = rebalancePreChecker;
_rebalancePreChecker.init(this, executorService);
}

public PinotHelixResourceManager(ControllerConf controllerConf, @Nullable ExecutorService executorService) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(),
controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(),
LineageManagerFactory.create(controllerConf),
RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()), executorService);
}

public PinotHelixResourceManager(ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(),
controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(),
LineageManagerFactory.create(controllerConf));
LineageManagerFactory.create(controllerConf),
RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()), null);
}

/**
Expand Down Expand Up @@ -423,6 +439,15 @@ public LineageManager getLineageManager() {
return _lineageManager;
}

/**
* Get the rebalance pre-checker
*
* @return rebalance pre-checker
*/
public RebalancePreChecker getRebalancePreChecker() {
return _rebalancePreChecker;
}

/**
* Instance related APIs
*/
Expand Down Expand Up @@ -3587,7 +3612,7 @@ public RebalanceResult rebalanceTable(String tableNameWithType, TableConfig tabl
tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
}
TableRebalancer tableRebalancer =
new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, _controllerMetrics);
new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, _controllerMetrics, _rebalancePreChecker);
return tableRebalancer.rebalance(tableConfig, rebalanceConfig, rebalanceJobId, tierToSegmentsMap);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.rebalance;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.TableMetadataReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class DefaultRebalancePreChecker implements RebalancePreChecker {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRebalancePreChecker.class);

public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus";
public static final String IS_MINIMIZE_DATA_MOVEMENT = "isMinimizeDataMovement";

private PinotHelixResourceManager _pinotHelixResourceManager;
private ExecutorService _executorService;

@Override
public void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_executorService = executorService;
}

@Override
public Map<String, String> check(String rebalanceJobId, String tableNameWithType,
TableConfig tableConfig) {
LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId);

Map<String, String> preCheckResult = new HashMap<>();
// Check for reload status
Boolean needsReload = checkReloadNeededOnServers(rebalanceJobId, tableNameWithType);
preCheckResult.put(NEEDS_RELOAD_STATUS, needsReload == null ? "error" : String.valueOf(needsReload));
// Check whether minimizeDataMovement is set in TableConfig
boolean isMinimizeDataMovement = checkIsMinimizeDataMovement(rebalanceJobId, tableNameWithType, tableConfig);
preCheckResult.put(IS_MINIMIZE_DATA_MOVEMENT, String.valueOf(isMinimizeDataMovement));

LOGGER.info("End pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId);
return preCheckResult;
}

/**
* Checks if the current segments on any servers needs a reload (table config or schema change that hasn't been
* applied yet). This check does not guarantee that the segments in deep store are up to date.
* TODO: Add an API to check for whether segments in deep store are up to date with the table configs and schema
* and add a pre-check here to call that API.
*/
private Boolean checkReloadNeededOnServers(String rebalanceJobId, String tableNameWithType) {
LOGGER.info("Fetching whether reload is needed for table: {} with rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
Boolean needsReload = null;
if (_executorService == null) {
LOGGER.warn("Executor service is null, skipping needsReload check for table: {} rebalanceJobId: {}",
tableNameWithType, rebalanceJobId);
return needsReload;
}
try (PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager()) {
TableMetadataReader metadataReader = new TableMetadataReader(_executorService, connectionManager,
_pinotHelixResourceManager);
TableMetadataReader.TableReloadJsonResponse needsReloadMetadataPair =
metadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType, 30_000);
Map<String, JsonNode> needsReloadMetadata = needsReloadMetadataPair.getServerReloadJsonResponses();
int failedResponses = needsReloadMetadataPair.getNumFailedResponses();
LOGGER.info("Received {} needs reload responses and {} failed responses from servers for table: {} with "
+ "rebalanceJobId: {}", needsReloadMetadata.size(), failedResponses, tableNameWithType, rebalanceJobId);
needsReload = needsReloadMetadata.values().stream().anyMatch(value -> value.get("needReload").booleanValue());
if (needsReload) {
return needsReload;
}
if (failedResponses > 0) {
LOGGER.warn("Received {} failed responses from servers and needsReload is false from returned responses, "
+ "check needsReload status manually", failedResponses);
needsReload = null;
}
} catch (InvalidConfigException | IOException e) {
LOGGER.warn("Caught exception while trying to fetch reload status from servers", e);
}

return needsReload;
}

/**
* Checks if minimize data movement is set for the given table in the TableConfig
*/
private boolean checkIsMinimizeDataMovement(String rebalanceJobId, String tableNameWithType,
TableConfig tableConfig) {
LOGGER.info("Checking whether minimizeDataMovement is set for table: {} with rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
try {
if (tableConfig.getTableType() == TableType.OFFLINE) {
InstanceAssignmentConfig instanceAssignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.OFFLINE);
return instanceAssignmentConfig.isMinimizeDataMovement();
} else {
InstanceAssignmentConfig instanceAssignmentConfigConsuming =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.CONSUMING);
// For REALTIME tables need to check for both CONSUMING and COMPLETED segments if relocation is enabled
if (!InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
return instanceAssignmentConfigConsuming.isMinimizeDataMovement();
}

InstanceAssignmentConfig instanceAssignmentConfigCompleted =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.COMPLETED);
return instanceAssignmentConfigConsuming.isMinimizeDataMovement()
&& instanceAssignmentConfigCompleted.isMinimizeDataMovement();
}
} catch (IllegalStateException e) {
LOGGER.warn("Error while trying to fetch instance assignment config, assuming minimizeDataMovement is false", e);
return false;
}
}
}
Loading

0 comments on commit 5f220b3

Please sign in to comment.