Skip to content

Commit

Permalink
IGNITE-18494 Fix ignoring near cache configuration while starting cac…
Browse files Browse the repository at this point in the history
…he not from affinity node (apache#10476)
  • Loading branch information
ivandasch authored Jan 12, 2023
1 parent 06a1d23 commit 1845649
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ public CacheConfiguration startCacheConfiguration() {
*/
public void startCacheConfiguration(CacheConfiguration startCfg) {
this.startCfg = startCfg;

if (startCfg.getNearConfiguration() != null)
nearCacheCfg = startCfg.getNearConfiguration();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public PlatformCacheManager(PlatformCallbackGateway gate) {
GridCacheContext ctx = cctx;

if (ctx != null) {
gate.onCacheStopped(cctx.cacheId());
gate.onCacheStopped(cctx.cacheId(), cancel, destroy);
cctx = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1264,14 +1264,17 @@ public void platformCacheUpdateFromThreadLocal(long cacheIdAndPartition, long ve
* Notifies about cache stop.
*
* @param cacheId Cache id.
* @param cancel Cancel flag.
* @param destroy Cache destroy flag.
*/
public void onCacheStopped(int cacheId) {
public void onCacheStopped(int cacheId, boolean cancel, boolean destroy) {
// Ignore cache stop during grid stop.
if (!tryEnter())
return;

try {
PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.OnCacheStopped, cacheId);
PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, PlatformCallbackOp.OnCacheStopped,
cacheId, cancel ? 1L : 0L, destroy ? 1L : 0L, null);
}
finally {
leave();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class IgniteCacheP2pUnmarshallingNearErrorTest extends IgniteCacheP2pUnma
@Test
@Override public void testResponseMessageOnUnmarshallingFailed() throws InterruptedException {
//GridCacheEvictionRequest unmarshalling failed test.
readCnt.set(5); //2 for each put.
readCnt.set(9); //4 for each put (near cache on client works!).

jcache(0).put(new TestKey(String.valueOf(++key)), "");
jcache(0).put(new TestKey(String.valueOf(++key)), "");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.distributed.near;

import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
* Tests that dynamically started caches with near configurations actually start with near caches on all nodes:
* affinity, non-affinity and clients.
*/
@RunWith(Parameterized.class)
public class GridCacheNearDynamicStartTest extends GridCommonAbstractTest {
/** */
private static final int SRV_CNT = 3;

/** */
private static final String CLIENT_ID = "client";

/** */
private static final int NUM_ENTRIES = 1000;

/** */
@Parameterized.Parameters(name = "nodeCacheStart = {0}, nodeNearCheck = {1}")
public static Iterable<Object[]> testParameters() {
List<Object[]> params = new ArrayList<>();

for (NODE_TYPE nodeStart: NODE_TYPE.values()) {
for (NODE_TYPE nodeNearCheck: NODE_TYPE.values())
params.add(new Object[]{ nodeStart, nodeNearCheck});
}

return params;
}

/** */
@Parameterized.Parameter(0)
public NODE_TYPE nodeStart;

/** */
@Parameterized.Parameter(1)
public NODE_TYPE nodeCheck;

/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGrids(SRV_CNT);
startClientGrid(CLIENT_ID);
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
IgniteEx ign = grid(0);

ign.cacheNames().forEach(ign::destroyCache);
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName).setConsistentId(igniteInstanceName);
}

/** */
@Test
public void test() throws Exception {
startCache();

IgniteEx ign = testNode(nodeCheck);

IgniteCache<Integer, Integer> cache = ign.cache(DEFAULT_CACHE_NAME);

for (int i = 0; i < NUM_ENTRIES; ++i) {
assertEquals((Integer)i, cache.get(i));

if (ign.affinity(DEFAULT_CACHE_NAME).isPrimary(ign.localNode(), i))
return;

assertEquals((Integer)i, cache.localPeek(i, CachePeekMode.NEAR));
}
}

/** */
private void startCache() {
Ignite ign = testNode(nodeStart);

ign.createCache(
new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
.setNodeFilter(n -> {
if (n.consistentId() == null)
return false;

// Start cache on nodes with indices [0, 1].
return !n.consistentId().toString().contains(String.valueOf(SRV_CNT - 1));
})
.setNearConfiguration(new NearCacheConfiguration<>())
);

try (IgniteDataStreamer<Integer, Integer> streamer = ign.dataStreamer(DEFAULT_CACHE_NAME)) {
for (int i = 0; i < NUM_ENTRIES; ++i)
streamer.addData(i, i);

streamer.flush();
}

assertEquals(ign.cache(DEFAULT_CACHE_NAME).size(CachePeekMode.PRIMARY), 1000);
}

/** */
private IgniteEx testNode(NODE_TYPE type) {
switch (type) {
case AFFINITY:
return grid(SRV_CNT - 2);
case NON_AFFINITY:
return grid(SRV_CNT - 1);
case CLIENT:
default:
return grid(CLIENT_ID);
}
}

/** */
private enum NODE_TYPE {
/** Affinity node. */
AFFINITY,

/** Non affinity node. */
NON_AFFINITY,

/** Client node. */
CLIENT,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.ignite.internal.processors.cache.eviction.paged;

import org.junit.Ignore;
import org.junit.Test;

/**
*
*/
Expand All @@ -25,4 +28,11 @@ public class Random2LruNearEnabledPageEvictionMultinodeTest extends Random2LruPa
@Override protected boolean nearEnabled() {
return true;
}

/** {@inheritDoc} */
@Ignore("https://issues.apache.org/jira/browse/IGNITE-18544")
@Test
@Override public void testPageEviction() {
// Ignored.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.ignite.internal.processors.cache.eviction.paged;

import org.junit.Ignore;
import org.junit.Test;

/**
*
*/
Expand All @@ -25,4 +28,11 @@ public class RandomLruNearEnabledPageEvictionMultinodeTest extends RandomLruPage
@Override protected boolean nearEnabled() {
return true;
}

/** {@inheritDoc} */
@Ignore("https://issues.apache.org/jira/browse/IGNITE-18544")
@Test
@Override public void testPageEviction() {
// Ignored.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearMultiNodeSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearReadersSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearClientHitTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearDynamicStartTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearEvictionEventSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearJobExecutionSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearMultiGetSelfTest;
Expand Down Expand Up @@ -266,6 +267,7 @@ public static List<Class<?>> suite(Collection<Class> ignoredTests) {

GridTestUtils.addTestIfNeeded(suite, GridCacheOffheapUpdateSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheNearClientHitTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheNearDynamicStartTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheNearPrimarySyncSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheColocatedPrimarySyncSelfTest.class, ignoredTests);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,10 @@ public void TestContinuousTopologyChangeMaintainsCorrectPlatformCacheData([Value
}

/// <summary>
/// Tests that client reconnect to a restarted cluster stops platform cache.
/// Tests that client reconnected to a restarted cluster still has platform cache.
/// </summary>
[Test]
public void TestClientNodeReconnectWithClusterRestartStopsPlatformCache()
public void TestClientNodeReconnectWithClusterRestartKeepsPlatformCache()
{
InitNodes(1);
var clientCache = InitClientAndCache();
Expand Down Expand Up @@ -372,24 +372,15 @@ public void TestClientNodeReconnectWithClusterRestartStopsPlatformCache()
Assert.IsEmpty(clientCache.GetLocalEntries(CachePeekMode.Platform));
Assert.Throws<KeyNotFoundException>(() => clientCache.LocalPeek(1, CachePeekMode.Platform));

// Cache still works for new entries, platform cache is being bypassed.
// Cache still works for new entries after restart.
var serverCache = _cache[0];

serverCache[1] = new Foo(11);
Assert.AreEqual(11, clientCache[1].Bar);

serverCache[1] = new Foo(22);

TestUtils.WaitForTrueCondition(() => clientCache[1] != null);

var foo = clientCache[1];
Assert.AreEqual(22, foo.Bar);
Assert.AreNotSame(foo, clientCache[1]);

// This is a full cluster restart, so client platform cache is stopped.
Assert.IsNull(clientCache.GetConfiguration().NearConfiguration);

var ex = Assert.Throws<CacheException>(() =>
client.GetOrCreateNearCache<int, Foo>(clientCache.Name, new NearCacheConfiguration()));
StringAssert.Contains("cache with the same name without near cache is already started", ex.Message);
Assert.AreEqual(foo, clientCache.LocalPeek(1, CachePeekMode.Platform));
}

/// <summary>
Expand Down Expand Up @@ -518,7 +509,7 @@ private static ICache<int, Foo> InitClientAndCache()
{
var client = InitClient();

return client.CreateNearCache<int, Foo>(CacheName, new NearCacheConfiguration());
return client.GetCache<int, Foo>(CacheName);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,23 @@ public void UpdateFromThreadLocal(int cacheId, int partition, AffinityTopologyVe
/// <summary>
/// Stops platform cache.
/// </summary>
public void Stop(int cacheId)
public void Stop(int cacheId, bool destroy)
{
IPlatformCache cache;
if (_caches.Remove(cacheId, out cache))

if (destroy)
{
cache.Stop();
if (_caches.Remove(cacheId, out cache))
{
cache.Stop();
}
}
else
{
if (_caches.TryGetValue(cacheId, out cache))
{
cache.Clear();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,12 @@ private long PlatformCacheUpdateFromThreadLocal(long cacheIdAndPartition, long v
/// Called on cache stop.
/// </summary>
/// <param name="cacheId">Cache id.</param>
private long OnCacheStopped(long cacheId)
/// <param name="cancel">Cancel flag.</param>
/// <param name="destroy">Destroy flag.</param>
/// <param name="arg">Ignored.</param>
private long OnCacheStopped(long cacheId, long cancel, long destroy, void* arg)
{
_ignite.PlatformCacheManager.Stop((int) cacheId);
_ignite.PlatformCacheManager.Stop((int) cacheId, destroy == 1L);

return 0;
}
Expand Down

0 comments on commit 1845649

Please sign in to comment.