Skip to content

Commit

Permalink
CASSSIDECAR-189: Fix SidecarSchema stuck at initialization due to ClusterLeaseTask scheduling (apache#175)
Browse files Browse the repository at this point in the history

Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSSIDECAR-189
  • Loading branch information
yifan-c authored Jan 22, 2025
1 parent 5a19e34 commit c4b5f40
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
1.0.0
-----
* Fix SidecarSchema stuck at initialization due to ClusterLeaseTask scheduling (CASSSIDECAR-189)
* Add RBAC Authorization support in Sidecar (CASSSIDECAR-161)
* Standardize configuration for duration units (CASSSIDECAR-186)
* Adds sidecar endpoint for node decommissioning operation (CASSANDRASC-151)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.cassandra.sidecar.config;

import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
import org.jetbrains.annotations.NotNull;

/**
* Encapsulates key or trust store option configurations
Expand All @@ -44,18 +45,20 @@ public interface KeyStoreConfiguration

/**
* Returns the interval in which the key store will be checked for filesystem changes. Setting
* this value to 0 or negative will disable reloading the store.
* this value to {@link SecondBoundConfiguration#ZERO} will disable reloading the store.
*
* @return the interval in which the key store will be checked for changes in the filesystem
*/
@NotNull
SecondBoundConfiguration checkInterval();

/**
* @return {@code true} if the key store will be reloaded if a change is detected, {@code false} otherwise
*/
default boolean reloadStore()
{
return checkInterval().compareTo(SecondBoundConfiguration.ZERO) > 0;
SecondBoundConfiguration interval = checkInterval();
return interval != SecondBoundConfiguration.ZERO && !interval.equals(SecondBoundConfiguration.ZERO);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.exceptions.CASWriteUnknownException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryConsistencyException;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
Expand Down Expand Up @@ -187,15 +187,14 @@ protected String executeLeaseAction(String actionName,
LOGGER.debug("Attempting to {} lease for sidecarHostId={}", actionName, sidecarHostId);
return actionFn.apply(sidecarHostId).currentOwner;
}
catch (CASWriteUnknownException | NoHostAvailableException e)
catch (QueryConsistencyException | NoHostAvailableException e)
{
LOGGER.debug("Unable to {} lease for sidecarHostId={}", actionName, sidecarHostId, e);
}
catch (Exception e)
{
LOGGER.error("Unable to {} lease for sidecarHostId={}", actionName, sidecarHostId, e);
}
LOGGER.debug("Unable to perform lease operation for sidecarHostId={}", sidecarHostId);
return null; // owner is unknown
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@ protected boolean initializeInternal(@NotNull Session session,
{
super.initializeInternal(session, shouldCreateSchema);

boolean initialized = true;
for (AbstractSchema schema : tableSchemas)
{
if (!schema.initialize(session, shouldCreateSchema))
return false;
// Attempts to initialize all schemas.
// Sets initialized to false if any of the schema initialization fails
initialized = schema.initialize(session, shouldCreateSchema) && initialized;
}

return true;
return initialized;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@

import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.API_V1_ALL_ROUTES;
import static org.apache.cassandra.sidecar.common.server.utils.ByteUtils.bytesToHumanReadableBinaryPrefix;
import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;

/**
* Provides main binding for more complex Guice dependencies
Expand Down Expand Up @@ -853,7 +853,7 @@ public PeriodicTaskExecutor periodicTaskExecutor(Vertx vertx,
ClusterLeaseClaimTask clusterLeaseClaimTask)
{
PeriodicTaskExecutor periodicTaskExecutor = new PeriodicTaskExecutor(executorPools, clusterLease);
vertx.eventBus().localConsumer(ON_SIDECAR_SCHEMA_INITIALIZED.address(),
vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address(),
ignored -> periodicTaskExecutor.schedule(clusterLeaseClaimTask));
return periodicTaskExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.StreamSupport;

import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -89,8 +90,9 @@
@Tag("heavy")
class ClusterLeaseClaimTaskIntegrationTest
{
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterLeaseClaimTaskIntegrationTest.class);
public static final int CONCURRENT_PROCESSES = 12;
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterLeaseClaimTaskIntegrationTest.class);
private static final int LEASE_SCHEMA_TTL_SECONDS = 10;
final List<CQLSessionProvider> sessionProviderList = new ArrayList<>();
final Vertx vertx = Vertx.vertx();
SchemaKeyspaceConfiguration mockSchemaConfig;
Expand All @@ -105,7 +107,7 @@ void setup()
when(mockSchemaConfig.keyspace()).thenReturn(SchemaKeyspaceConfigurationImpl.DEFAULT_KEYSPACE);
when(mockSchemaConfig.replicationStrategy()).thenReturn(SchemaKeyspaceConfigurationImpl.DEFAULT_REPLICATION_STRATEGY);
when(mockSchemaConfig.replicationFactor()).thenReturn(SchemaKeyspaceConfigurationImpl.DEFAULT_REPLICATION_FACTOR);
when(mockSchemaConfig.leaseSchemaTTL()).thenReturn(SecondBoundConfiguration.parse("5s"));
when(mockSchemaConfig.leaseSchemaTTL()).thenReturn(SecondBoundConfiguration.parse(LEASE_SCHEMA_TTL_SECONDS + "s"));
}

@ParameterizedTest(name = "{index} => version {0}")
Expand All @@ -117,10 +119,9 @@ void test(TestVersion version) throws IOException
Versions.Version requestedVersion = versions.getLatest(new Semver(version.version(), Semver.SemverType.LOOSE));

// Spin up a 3-node cluster
try (AbstractCluster<?> cluster = UpgradeableCluster.build(0)
try (AbstractCluster<?> cluster = UpgradeableCluster.build(3)
.withDynamicPortAllocation(true) // to allow parallel test runs
.withVersion(requestedVersion)
.withDC("dc0", 3)
.withConfig(config -> config.with(Feature.NATIVE_PROTOCOL))
.start())
{
Expand All @@ -146,6 +147,7 @@ private void simulate(AbstractCluster<?> cluster)
AtomicReference<Object[][]> currentLeaseholderQueryResult = new AtomicReference<>();
AtomicReference<TestInstanceWrapper> currentLeaseholder = new AtomicReference<>();
loopAssert(3, () -> {
cleanupDeltaGaugeMetrics(simulatedInstances);
runLeaseAcquireProcess(pool, simulatedInstances);
Object[][] resultSet = queryCurrentLeaseholders(cluster);
currentLeaseholderQueryResult.set(resultSet);
Expand Down Expand Up @@ -246,37 +248,18 @@ private void simulate(AbstractCluster<?> cluster)
// then disable binary
simulateDisableBinaryOfLeaseholder(simulatedInstances);

ScheduleDecision scheduleDecision = null;
for (int i = 0; i < 20; i++)
{
leaseholder.clusterLeaseClaimTask.runClaimProcess();
scheduleDecision = leaseholder.clusterLease.toScheduleDecision();
// Wait for lease to expire in both leaseholder and database
Uninterruptibles.sleepUninterruptibly(LEASE_SCHEMA_TTL_SECONDS, TimeUnit.SECONDS);

if (scheduleDecision != ScheduleDecision.RESCHEDULE)
{
int ttlSeconds = Math.max(1, maybeDetermineTTL(cluster));
LOGGER.info("TTL is {} seconds", ttlSeconds);
// wait for the leaseholder to give the lease
// query the TTL value and sleep for that amount of time
// before attempting again
sleepUninterruptibly(ttlSeconds, TimeUnit.SECONDS);
}
else break;
}
assertThat(scheduleDecision).as("The leaseholder should give up the lease")
.isEqualTo(ScheduleDecision.RESCHEDULE);
// ensure the data is TTL'd in the database
for (int i = 0; i < 20; i++)
{
loopAssert(3, 1000, () -> {
leaseholder.clusterLeaseClaimTask.runClaimProcess();
ScheduleDecision scheduleDecision = leaseholder.clusterLease.toScheduleDecision();
assertThat(scheduleDecision).as("The leaseholder should give up the lease")
.isEqualTo(ScheduleDecision.RESCHEDULE);
// ensure the data is TTL'd in the database
long rowCount = rowCountInLeaseTable(cluster);
if (rowCount == 0)
{
// data has been TTL'd
return;
}
sleepUninterruptibly(1, TimeUnit.SECONDS);
}
fail("Data was not TTL'd in the database");
assertThat(rowCount).describedAs("Lease should be TTL'd").isZero();
});
}

private void validateMetrics(List<TestInstanceWrapper> simulatedInstances, int expectedLeaseholderCount)
Expand All @@ -289,8 +272,18 @@ private void validateMetrics(List<TestInstanceWrapper> simulatedInstances, int e
.isEqualTo(expectedLeaseholderCount);
}

/**
 * Drains the coordination delta gauges without asserting on their values.
 * <p>
 * Counterpart of {@code validateMetrics} minus the validation: the participants and leaseholders
 * gauge values are read and discarded. Per the original note on this helper, reading the delta
 * gauge values out is what resets them.
 *
 * @param simulatedInstances the simulated Sidecar instances; only the first element is consulted
 */
private void cleanupDeltaGaugeMetrics(List<TestInstanceWrapper> simulatedInstances)
{
    // The metrics instance is shared, so reading through any instance is sufficient; use the first
    TestInstanceWrapper anyInstance = simulatedInstances.get(0);
    CoordinationMetrics sharedMetrics = anyInstance.metrics.server().coordination();
    // Return values are deliberately ignored; the read itself is the purpose of the call
    sharedMetrics.participants.metric.getValue();
    sharedMetrics.leaseholders.metric.getValue();
}

private void runLeaseAcquireProcess(ExecutorService pool, List<TestInstanceWrapper> simulatedInstances)
{
cleanupDeltaGaugeMetrics(simulatedInstances);
int electorateSize = simulatedInstances.size();
CountDownLatch latch = new CountDownLatch(electorateSize);
CountDownLatch completedLatch = new CountDownLatch(electorateSize);
Expand Down Expand Up @@ -438,16 +431,6 @@ static List<TestInstanceWrapper> getCurrentLeaseholderInstances(List<TestInstanc
return instances;
}

/**
 * Queries the remaining TTL of the {@code owner} column of the cluster lease row.
 *
 * @param cluster the cluster whose first running instance coordinates the query
 * @return the value of {@code ttl(owner)} from the first returned row, or {@code 0} when the query
 * returns no rows
 */
static int maybeDetermineTTL(AbstractCluster<?> cluster)
{
    String ttlQuery = "SELECT ttl(owner) FROM sidecar_internal.sidecar_lease_v1 WHERE name = 'cluster_lease_holder'";
    SimpleQueryResult ttlRows = cluster.getFirstRunningInstance()
                                       .coordinator()
                                       .executeWithResult(ttlQuery, ConsistencyLevel.LOCAL_QUORUM);
    if (!ttlRows.hasNext())
    {
        // no lease row was returned
        return 0;
    }
    return ttlRows.next().getInteger(0);
}

static long rowCountInLeaseTable(AbstractCluster<?> cluster)
{
SimpleQueryResult rows = cluster.getFirstRunningInstance()
Expand Down Expand Up @@ -476,7 +459,9 @@ void removeLeaseholderFromDatabase(AbstractCluster<?> cluster)
{
try
{
cluster.getFirstRunningInstance().coordinator().execute("DELETE FROM sidecar_internal.sidecar_lease_v1 WHERE name = 'cluster_lease_holder'",
cluster.getFirstRunningInstance().coordinator().execute("DELETE FROM sidecar_internal.sidecar_lease_v1 " +
"WHERE name = 'cluster_lease_holder' " +
"IF EXISTS",
ConsistencyLevel.QUORUM);
LOGGER.info("Successfully removed current leaseholder from database");
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sidecar.db;

import java.util.concurrent.TimeUnit;

import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.vertx.core.Vertx;
import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
import org.apache.cassandra.sidecar.config.CoordinationConfiguration;
import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.yaml.CoordinationConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.PeriodicTaskConfigurationImpl;
import org.apache.cassandra.sidecar.coordination.ClusterLease;
import org.apache.cassandra.sidecar.coordination.ClusterLeaseClaimTask;
import org.apache.cassandra.sidecar.coordination.ElectorateMembership;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
import org.apache.cassandra.testing.CassandraIntegrationTest;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Integration test checking that {@code SidecarSchema} reaches the initialized state when the
 * {@code ClusterLease} starts out as {@code INDETERMINATE}, so the lease has to be competed for
 * before the schema can be initialized.
 */
class SidecarSchemaIntTest extends IntegrationTestBase
{
    /**
     * Installs the test-specific Guice module before the test setup runs.
     */
    @Override
    protected void beforeSetup()
    {
        installTestSpecificModule(leaseCompetitionModule());
    }

    /**
     * Waits (up to 60 seconds) for the schema to become ready, then verifies that the sidecar schema
     * is initialized and that the cluster lease is claimed by the local sidecar.
     */
    @CassandraIntegrationTest
    void testSidecarSchemaInitializationFromBlank()
    {
        waitForSchemaReady(60, TimeUnit.SECONDS);

        boolean schemaInitialized = injector.getInstance(SidecarSchema.class).isInitialized();
        assertThat(schemaInitialized)
        .describedAs("SidecarSchema should be initialized")
        .isTrue();

        boolean leaseClaimedLocally = injector.getInstance(ClusterLease.class).isClaimedByLocalSidecar();
        assertThat(leaseClaimedLocally)
        .describedAs("ClusterLease should be claimed by the local sidecar")
        .isTrue();
    }

    /**
     * Builds the module that provides the lease-related bindings used by this test: an
     * {@code INDETERMINATE} cluster lease, a coordination configuration built from "1s" durations,
     * and a claim task whose delay is fixed at "1s".
     *
     * @return the module to install for this test
     */
    private static AbstractModule leaseCompetitionModule()
    {
        return new AbstractModule()
        {
            @Provides
            @Singleton
            public ClusterLease clusterLease()
            {
                // begin as INDETERMINATE so that a leaseholder is competed for first, and the schema is initialized afterwards
                return new ClusterLease(ClusterLease.Ownership.INDETERMINATE);
            }

            @Provides
            @Singleton
            public CoordinationConfiguration clusterLeaseClaimTaskConfiguration()
            {
                // claim more frequently
                // NOTE(review): the two "1s" arguments are presumably the task's initial delay and interval - confirm
                return new CoordinationConfigurationImpl(
                new PeriodicTaskConfigurationImpl(true,
                                                  MillisecondBoundConfiguration.parse("1s"),
                                                  MillisecondBoundConfiguration.parse("1s")));
            }

            @Provides
            @Singleton
            public ClusterLeaseClaimTask clusterLeaseClaimTask(Vertx vertx,
                                                               ServiceConfiguration serviceConfiguration,
                                                               ElectorateMembership electorateMembership,
                                                               SidecarLeaseDatabaseAccessor accessor,
                                                               ClusterLease clusterLease,
                                                               SidecarMetrics metrics)
            {
                return new ClusterLeaseClaimTask(vertx, serviceConfiguration, electorateMembership,
                                                 accessor, clusterLease, metrics)
                {
                    @Override
                    public DurationSpec delay()
                    {
                        // bypass the minimum delay check that is coded in ClusterLeaseClaimTask
                        DurationSpec fixedDelay = MillisecondBoundConfiguration.parse("1s");
                        return fixedDelay;
                    }
                };
            }
        };
    }
}
Loading

0 comments on commit c4b5f40

Please sign in to comment.