Skip to content

Commit

Permalink
Limit the maximum hints size per host
Browse files Browse the repository at this point in the history
patch by Yifan Cai; reviewed by Dinesh Joshi, Francisco Guerrero for CASSANDRA-17142
  • Loading branch information
yifan-c committed Feb 11, 2022
1 parent ce7502a commit 3a6f690
Show file tree
Hide file tree
Showing 18 changed files with 308 additions and 57 deletions.
3 changes: 3 additions & 0 deletions NEWS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ using the provided 'sstableupgrade' tool.

New features
------------
- New configuration max_hints_size_per_host to limit the size of local hints files per host in megabytes. Setting to
non-positive value disables the limit, which is the default behavior. Setting to a positive value to ensure
the total size of the hints files per host does not exceed the limit.
- Added ability to configure auth caches through corresponding `nodetool` commands.
- CDC data flushing now can be configured to be non-blocking with the configuration cdc_block_writes. Setting to true,
any writes to the CDC-enabled tables will be blocked when reaching to the limit for CDC data on disk, which is the
Expand Down
5 changes: 5 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ hints_flush_period: 10000ms
# Maximum size for a single hints file, in megabytes.
max_hints_file_size: 128MiB

# The file size limit to store hints for an unreachable host, in megabytes.
# Once the local hints files have reached the limit, no more new hints will be created.
# Set a non-positive value will disable the size limit.
# max_hints_size_per_host: 0MiB

# Enable / disable automatic cleanup for the expired and orphaned hints file.
# Disable the option in order to preserve those hints on the disk.
auto_hints_cleanup_enabled: false
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ public class Config
public SmallestDurationMilliseconds hints_flush_period = new SmallestDurationMilliseconds("10s");
@Replaces(oldName = "max_hints_file_size_in_mb", converter = Converters.MEBIBYTES_DATA_STORAGE, deprecated = true)
public SmallestDataStorageMebibytes max_hints_file_size = new SmallestDataStorageMebibytes("128MiB");
public volatile DataStorageSpec max_hints_size_per_host = new DataStorageSpec("0B"); // 0 means disabled

public ParameterizedClass hints_compression;
public volatile boolean auto_hints_cleanup_enabled = false;
Expand Down
15 changes: 15 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2643,6 +2643,21 @@ public static int getMaxHintWindow()
return conf.max_hint_window.toMillisecondsAsInt();
}

public static void setMaxHintsSizePerHostInMb(int value)
{
conf.max_hints_size_per_host = DataStorageSpec.inMebibytes(value);
}

public static int getMaxHintsSizePerHostInMb()
{
return conf.max_hints_size_per_host.toMebibytesAsInt();
}

public static long getMaxHintsSizePerHost()
{
return conf.max_hints_size_per_host.toBytes();
}

public static File getHintsDirectory()
{
return new File(conf.hints_directory);
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/hints/HintsDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,16 @@ String checksumFileName()
return String.format("%s-%s-%s.crc32", hostId, timestamp, version);
}

File file(File hintsDirectory)
{
return new File(hintsDirectory, fileName());
}

File checksumFile(File hintsDirectory)
{
return new File(hintsDirectory, checksumFileName());
}

int messagingVersion()
{
return messagingVersion(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private boolean dispatch(HintsDescriptor descriptor)

private boolean deliver(HintsDescriptor descriptor, InetAddressAndPort address)
{
File file = new File(hintsDirectory, descriptor.fileName());
File file = descriptor.file(hintsDirectory);
InputPosition offset = store.getDispatchOffset(descriptor);

BooleanSupplier shouldAbort = () -> !isAlive.test(address) || isPaused.get();
Expand Down Expand Up @@ -321,7 +321,7 @@ private void handleDispatchFailure(HintsDispatcher dispatcher, HintsDescriptor d
// for each hint in the hints file for a node that isn't part of the ring anymore, write RF hints for each replica
private void convert(HintsDescriptor descriptor)
{
File file = new File(hintsDirectory, descriptor.fileName());
File file = descriptor.file(hintsDirectory);

try (HintsReader reader = HintsReader.open(file, rateLimiter))
{
Expand Down
13 changes: 13 additions & 0 deletions src/java/org/apache/cassandra/hints/HintsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,19 @@ public void resumeDispatch()
HintsServiceDiagnostics.dispatchingResumed(this);
}

/**
* Get the total size in bytes of all the hints files associating with the host on disk.
* @param hostId, belonging host
* @return total file size, in bytes
*/
public long getTotalHintsSize(UUID hostId)
{
HintsStore store = catalog.getNullable(hostId);
if (store == null)
return 0;
return store.getTotalFileSize();
}

/**
* Gracefully and blockingly shut down the service.
*
Expand Down
18 changes: 16 additions & 2 deletions src/java/org/apache/cassandra/hints/HintsStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private void deleteHints(Predicate<HintsDescriptor> predicate)

void delete(HintsDescriptor descriptor)
{
File hintsFile = new File(hintsDirectory, descriptor.fileName());
File hintsFile = descriptor.file(hintsDirectory);
if (hintsFile.tryDelete())
logger.info("Deleted hint file {}", descriptor.fileName());
else if (hintsFile.exists())
Expand All @@ -218,7 +218,7 @@ else if (hintsFile.exists())
logger.info("Already deleted hint file {}", descriptor.fileName());

//noinspection ResultOfMethodCallIgnored
new File(hintsDirectory, descriptor.checksumFileName()).tryDelete();
descriptor.checksumFile(hintsDirectory).tryDelete();
}

boolean hasFiles()
Expand All @@ -236,6 +236,20 @@ void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition)
dispatchPositions.put(descriptor, inputPosition);
}


/**
* @return the total size of all files belonging to the hints store, in bytes.
*/
long getTotalFileSize()
{
long total = 0;
for (HintsDescriptor descriptor : Iterables.concat(dispatchDequeue, corruptedFiles))
{
total += descriptor.file(hintsDirectory).length();
}
return total;
}

void cleanUp(HintsDescriptor descriptor)
{
dispatchPositions.remove(descriptor);
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/hints/HintsWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected HintsWriter(File directory, HintsDescriptor descriptor, File file, Fil
@SuppressWarnings("resource") // HintsWriter owns channel
static HintsWriter create(File directory, HintsDescriptor descriptor) throws IOException
{
File file = new File(directory, descriptor.fileName());
File file = descriptor.file(directory);

FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
int fd = NativeLibrary.getfd(channel);
Expand Down Expand Up @@ -102,7 +102,7 @@ HintsDescriptor descriptor()

private void writeChecksum()
{
File checksumFile = new File(directory, descriptor.checksumFileName());
File checksumFile = descriptor.checksumFile(directory);
try (OutputStream out = Files.newOutputStream(checksumFile.toPath()))
{
out.write(Integer.toHexString((int) globalCRC.getValue()).getBytes(StandardCharsets.UTF_8));
Expand Down
70 changes: 54 additions & 16 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -2275,16 +2275,40 @@ public void setMaxHintWindow(int ms)
DatabaseDescriptor.setMaxHintWindow(ms);
}

public int getMaxHintsSizePerHostInMb()
{
return DatabaseDescriptor.getMaxHintsSizePerHostInMb();
}

public void setMaxHintsSizePerHostInMb(int value)
{
DatabaseDescriptor.setMaxHintsSizePerHostInMb(value);
}

public static boolean shouldHint(Replica replica)
{
return shouldHint(replica, true);
}

/**
* Determines whether a hint should be stored or not.
* It rejects early if any of the condition is met:
* - Hints disabled entirely or for the belonging datacetner of the replica
* - The replica is transient or is the self node
* - The replica is no longer part of the ring
* - The hint window has expired
* - The hints have reached to the size limit for the node
* Otherwise, it permits.
*
* @param replica, the replica for the hint
* @param tryEnablePersistentWindow, true to consider hint_window_persistent_enabled; otherwise, ignores
* @return true to permit or false to reject hint
*/
public static boolean shouldHint(Replica replica, boolean tryEnablePersistentWindow)
{
if (!DatabaseDescriptor.hintedHandoffEnabled())
return false;
if (replica.isTransient() || replica.isSelf())
if (!DatabaseDescriptor.hintedHandoffEnabled()
|| replica.isTransient()
|| replica.isSelf())
return false;

Set<String> disabledDCs = DatabaseDescriptor.hintedHandoffDisabledDCs();
Expand All @@ -2303,26 +2327,40 @@ public static boolean shouldHint(Replica replica, boolean tryEnablePersistentWin
long endpointDowntime = Gossiper.instance.getEndpointDowntime(endpoint);
boolean hintWindowExpired = endpointDowntime > maxHintWindow;

if (tryEnablePersistentWindow && !hintWindowExpired && DatabaseDescriptor.hintWindowPersistentEnabled())
UUID hostIdForEndpoint = StorageService.instance.getHostIdForEndpoint(endpoint);
if (hostIdForEndpoint == null)
{
UUID hostIdForEndpoint = StorageService.instance.getHostIdForEndpoint(endpoint);
if (hostIdForEndpoint != null)
{
long earliestHint = HintsService.instance.getEarliestHintForHost(hostIdForEndpoint);
hintWindowExpired = Clock.Global.currentTimeMillis() - maxHintWindow > earliestHint;
if (hintWindowExpired)
Tracing.trace("Not hinting {} for which there is the earliest hint stored at {}", replica, earliestHint);
}
Tracing.trace("Discarding hint for endpoint not part of ring: {}", endpoint);
return false;
}
else if (hintWindowExpired)

// if persisting hints window, hintWindowExpired might be updated according to the timestamp of the earliest hint
if (tryEnablePersistentWindow && !hintWindowExpired && DatabaseDescriptor.hintWindowPersistentEnabled())
{
Tracing.trace("Not hinting {} which has been down {} ms", replica, endpointDowntime);
long earliestHint = HintsService.instance.getEarliestHintForHost(hostIdForEndpoint);
hintWindowExpired = Clock.Global.currentTimeMillis() - maxHintWindow > earliestHint;
if (hintWindowExpired)
Tracing.trace("Not hinting {} for which there is the earliest hint stored at {}", replica, earliestHint);
}

if (hintWindowExpired)
HintsService.instance.metrics.incrPastWindow(replica.endpoint());
{
HintsService.instance.metrics.incrPastWindow(endpoint);
Tracing.trace("Not hinting {} which has been down {} ms", endpoint, endpointDowntime);
return false;
}

long maxHintsSize = DatabaseDescriptor.getMaxHintsSizePerHost();
long actualTotalHintsSize = HintsService.instance.getTotalHintsSize(hostIdForEndpoint);
boolean hasHintsReachedMaxSize = maxHintsSize > 0 && actualTotalHintsSize > maxHintsSize;
if (hasHintsReachedMaxSize)
{
Tracing.trace("Not hinting {} which has reached to the max hints size {} bytes on disk. The actual hints size on disk: {}",
endpoint, maxHintsSize, actualTotalHintsSize);
return false;
}

return !hintWindowExpired;
return true;
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/service/StorageProxyMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public interface StorageProxyMBean
public Set<String> getHintedHandoffDisabledDCs();
public int getMaxHintWindow();
public void setMaxHintWindow(int ms);
public int getMaxHintsSizePerHostInMb();
public void setMaxHintsSizePerHostInMb(int value);
public int getMaxHintsInProgress();
public void setMaxHintsInProgress(int qs);
public int getHintsInProgress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ private void writeHints(File directory, HintsDescriptor descriptor) throws IOExc

private static void verifyChecksum(File directory, HintsDescriptor descriptor) throws IOException
{
File hintsFile = new File(directory, descriptor.fileName());
File checksumFile = new File(directory, descriptor.checksumFileName());
File hintsFile = descriptor.file(directory);
File checksumFile = descriptor.checksumFile(directory);

assertTrue(checksumFile.exists());

Expand All @@ -114,7 +114,7 @@ private void verifyHints(File directory, HintsDescriptor descriptor)
long baseTimestamp = descriptor.timestamp;
int index = 0;

try (HintsReader reader = HintsReader.open(new File(directory, descriptor.fileName())))
try (HintsReader reader = HintsReader.open(descriptor.file(directory)))
{
for (HintsReader.Page page : reader)
{
Expand Down
2 changes: 1 addition & 1 deletion test/unit/org/apache/cassandra/hints/AlteredHints.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void multiFlushAndDeserializeTest() throws Exception
}
}

try (HintsReader reader = HintsReader.open(new File(dir, descriptor.fileName())))
try (HintsReader reader = HintsReader.open(descriptor.file(dir)))
{
Assert.assertTrue(looksLegit(reader.getInput()));
List<Hint> deserialized = new ArrayList<>(hintNum);
Expand Down
Loading

0 comments on commit 3a6f690

Please sign in to comment.