From 7ada4032180b516548fc0263f42da6a7a917f92b Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Tue, 9 Mar 2021 16:57:34 +0100 Subject: [PATCH] SOLR-14749: Make sure the plugin config is reloaded on Overseer. --- .../solr/api/ContainerPluginsRegistry.java | 20 ++ .../solr/handler/TestContainerPlugin.java | 313 ++++++++++-------- 2 files changed, 193 insertions(+), 140 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java b/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java index 8f1d3868058..e92f715ff9a 100644 --- a/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java +++ b/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java @@ -26,11 +26,13 @@ import java.lang.reflect.Type; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Phaser; import java.util.function.Supplier; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import org.apache.lucene.util.ResourceLoaderAware; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.request.beans.PluginMeta; @@ -79,12 +81,30 @@ public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapW private final Map currentPlugins = new HashMap<>(); + private Phaser phaser; + @Override public boolean onChange(Map properties) { refresh(); + Phaser localPhaser = phaser; // volatile read + if (localPhaser != null) { + assert localPhaser.getRegisteredParties() == 1; + localPhaser.arrive(); // we should be the only ones registered, so this will advance phase each time + } return false; } + /** + * A phaser that will advance phases every time {@link #onChange(Map)} is called. + * Useful for allowing tests to know when a new configuration is finished getting set. + */ + + @VisibleForTesting + public void setPhaser(Phaser phaser) { + phaser.register(); + this.phaser = phaser; + } + public void registerListener(PluginRegistryListener listener) { listeners.add(listener); } diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java index 224caf72d45..fb63e0ead88 100644 --- a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java +++ b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableMap; import org.apache.commons.io.IOUtils; @@ -66,10 +68,12 @@ import static org.apache.solr.filestore.TestDistribPackageStore.uploadKey; public class TestContainerPlugin extends SolrCloudTestCase { + private Phaser phaser; @Before public void setup() { System.setProperty("enable.packages", "true"); + phaser = new Phaser(); } @After @@ -80,9 +84,14 @@ public void teardown() { @Test public void testApi() throws Exception { MiniSolrCloudCluster cluster = - configureCluster(4) - .withJettyConfig(jetty -> jetty.enableV2(true)) - .configure(); + configureCluster(4) + .withJettyConfig(jetty -> jetty.enableV2(true)) + .configure(); + ContainerPluginsRegistry pluginsRegistry = cluster.getOpenOverseer().getCoreContainer().getContainerPluginsRegistry(); + pluginsRegistry.setPhaser(phaser); + + int version = phaser.getPhase(); + String errPath = "/error/details[0]/errorMessages[0]"; try { PluginMeta plugin = new PluginMeta(); @@ -90,39 +99,43 @@ public void testApi() throws Exception { plugin.klass = C2.class.getName(); //test with an invalid class V2Request req = new V2Request.Builder("/cluster/plugin") - .forceV2(true) - .POST() - .withPayload(singletonMap("add", plugin)) - .build(); + .forceV2(true) + .POST() + .withPayload(singletonMap("add", plugin)) + .build(); expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class"); //test with a valid class. This should succeed now plugin.klass = C3.class.getName(); req.process(cluster.getSolrClient()); + version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS); + //just check if the plugin is indeed registered V2Request readPluginState = new V2Request.Builder("/cluster/plugin") - .forceV2(true) - .GET() - .build(); + .forceV2(true) + .GET() + .build(); V2Response rsp = readPluginState.process(cluster.getSolrClient()); assertEquals(C3.class.getName(), rsp._getStr("/plugin/testplugin/class", null)); //let's test the plugin TestDistribPackageStore.assertResponseValues(10, - () -> new V2Request.Builder("/plugin/my/plugin") - .forceV2(true) - .GET() - .build().process(cluster.getSolrClient()), - ImmutableMap.of("/testkey", "testval")); + () -> new V2Request.Builder("/plugin/my/plugin") + .forceV2(true) + .GET() + .build().process(cluster.getSolrClient()), + ImmutableMap.of("/testkey", "testval")); //now remove the plugin new V2Request.Builder("/cluster/plugin") - .POST() - .forceV2(true) - .withPayload("{remove : testplugin}") - .build() - .process(cluster.getSolrClient()); + .POST() + .forceV2(true) + .withPayload("{remove : testplugin}") + .build() + .process(cluster.getSolrClient()); + + version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS); //verify it is removed rsp = readPluginState.process(cluster.getSolrClient()); @@ -138,50 +151,55 @@ public void testApi() throws Exception { plugin.pathPrefix = "my-random-prefix"; req.process(cluster.getSolrClient()); + version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS); //let's test the plugin TestDistribPackageStore.assertResponseValues(10, - () -> new V2Request.Builder("/my-random-name/my/plugin") + () -> new V2Request.Builder("/my-random-name/my/plugin") + .forceV2(true) + .GET() + .build().process(cluster.getSolrClient()), + ImmutableMap.of("/method.name", "m1")); + + TestDistribPackageStore.assertResponseValues(10, + () -> new V2Request.Builder("/my-random-prefix/their/plugin") + .forceV2(true) + .GET() + .build().process(cluster.getSolrClient()), + ImmutableMap.of("/method.name", "m2")); + //now remove the plugin + new V2Request.Builder("/cluster/plugin") + .POST() .forceV2(true) - .GET() - .build().process(cluster.getSolrClient()), - ImmutableMap.of("/method.name", "m1")); + .withPayload("{remove : my-random-name}") + .build() + .process(cluster.getSolrClient()); + + version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS); - TestDistribPackageStore.assertResponseValues(10, - () -> new V2Request.Builder("/my-random-prefix/their/plugin") + expectFail(() -> new V2Request.Builder("/my-random-prefix/their/plugin") .forceV2(true) .GET() - .build().process(cluster.getSolrClient()), - ImmutableMap.of("/method.name", "m2")); - //now remove the plugin - new V2Request.Builder("/cluster/plugin") - .POST() - .forceV2(true) - .withPayload("{remove : my-random-name}") - .build() - .process(cluster.getSolrClient()); - - expectFail( () -> new V2Request.Builder("/my-random-prefix/their/plugin") - .forceV2(true) - .GET() - .build() - .process(cluster.getSolrClient())); + .build() + .process(cluster.getSolrClient())); expectFail(() -> new V2Request.Builder("/my-random-prefix/their/plugin") - .forceV2(true) - .GET() - .build() - .process(cluster.getSolrClient())); + .forceV2(true) + .GET() + .build() + .process(cluster.getSolrClient())); // test ClusterSingleton plugin plugin.name = "clusterSingleton"; plugin.klass = C6.class.getName(); req.process(cluster.getSolrClient()); + version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS); - //just check if the plugin is indeed registered + + //just check if the plugin is indeed registered readPluginState = new V2Request.Builder("/cluster/plugin") - .forceV2(true) - .GET() - .build(); + .forceV2(true) + .GET() + .build(); rsp = readPluginState.process(cluster.getSolrClient()); assertEquals(C6.class.getName(), rsp._getStr("/plugin/clusterSingleton/class", null)); @@ -189,9 +207,9 @@ public void testApi() throws Exception { assertTrue("startCalled", C6.startCalled); assertFalse("stopCalled", C6.stopCalled); - assertEquals( CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC())); - assertEquals( CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC1())); - assertEquals( CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC2())); + assertEquals(CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC())); + assertEquals(CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC1())); + assertEquals(CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC2())); CConfig cfg = new CConfig(); cfg.boolVal = Boolean.TRUE; @@ -203,34 +221,38 @@ public void testApi() throws Exception { p.config = cfg; new V2Request.Builder("/cluster/plugin") - .forceV2(true) - .POST() - .withPayload(singletonMap("add", p)) - .build() - .process(cluster.getSolrClient()); - TestDistribPackageStore.assertResponseValues(10, - () -> new V2Request.Builder("hello/plugin") .forceV2(true) - .GET() - .build().process(cluster.getSolrClient()), - ImmutableMap.of("/config/boolVal", "true", "/config/strVal", "Something","/config/longVal", "1234" )); + .POST() + .withPayload(singletonMap("add", p)) + .build() + .process(cluster.getSolrClient()); - cfg.strVal = "Something else"; - new V2Request.Builder("/cluster/plugin") - .forceV2(true) - .POST() - .withPayload(singletonMap("update", p)) - .build() - .process(cluster.getSolrClient()); + version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS); TestDistribPackageStore.assertResponseValues(10, - () -> new V2Request.Builder("hello/plugin") - .forceV2(true) - .GET() - .build().process(cluster.getSolrClient()), - ImmutableMap.of("/config/boolVal", "true", "/config/strVal", cfg.strVal,"/config/longVal", "1234" )); + () -> new V2Request.Builder("hello/plugin") + .forceV2(true) + .GET() + .build().process(cluster.getSolrClient()), + ImmutableMap.of("/config/boolVal", "true", "/config/strVal", "Something", "/config/longVal", "1234")); - // kill the Overseer leader + cfg.strVal = "Something else"; + new V2Request.Builder("/cluster/plugin") + .forceV2(true) + .POST() + .withPayload(singletonMap("update", p)) + .build() + .process(cluster.getSolrClient()); + version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS); + + TestDistribPackageStore.assertResponseValues(10, + () -> new V2Request.Builder("hello/plugin") + .forceV2(true) + .GET() + .build().process(cluster.getSolrClient()), + ImmutableMap.of("/config/boolVal", "true", "/config/strVal", cfg.strVal, "/config/longVal", "1234")); + + // kill the Overseer leader for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { if (!jetty.getCoreContainer().getZkController().getOverseer().isClosed()) { cluster.stopJettySolrRunner(jetty); @@ -244,7 +266,7 @@ public void testApi() throws Exception { } private void expectFail(ThrowingRunnable runnable) throws Exception { - for(int i=0;i< 20;i++) { + for (int i = 0; i < 20; i++) { try { runnable.run(); } catch (Throwable throwable) { @@ -254,34 +276,40 @@ private void expectFail(ThrowingRunnable runnable) throws Exception { } fail("should have failed with an exception"); } + @Test public void testApiFromPackage() throws Exception { MiniSolrCloudCluster cluster = - configureCluster(4) - .withJettyConfig(jetty -> jetty.enableV2(true)) - .configure(); + configureCluster(4) + .withJettyConfig(jetty -> jetty.enableV2(true)) + .configure(); String FILE1 = "/myplugin/v1.jar"; String FILE2 = "/myplugin/v2.jar"; + ContainerPluginsRegistry pluginsRegistry = cluster.getOpenOverseer().getCoreContainer().getContainerPluginsRegistry(); + pluginsRegistry.setPhaser(phaser); + + int version = phaser.getPhase(); + String errPath = "/error/details[0]/errorMessages[0]"; try { byte[] derFile = readFile("cryptokeys/pub_key512.der"); - uploadKey(derFile, PackageStoreAPI.KEYS_DIR+"/pub_key512.der", cluster); + uploadKey(derFile, PackageStoreAPI.KEYS_DIR + "/pub_key512.der", cluster); TestPackages.postFileAndWait(cluster, "runtimecode/containerplugin.v.1.jar.bin", FILE1, - "pmrmWCDafdNpYle2rueAGnU2J6NYlcAey9mkZYbqh+5RdYo2Ln+llLF9voyRj+DDivK9GV1XdtKvD9rgCxlD7Q=="); - TestPackages.postFileAndWait(cluster, "runtimecode/containerplugin.v.2.jar.bin", FILE2, - "StR3DmqaUSL7qjDOeVEiCqE+ouiZAkW99fsL48F9oWG047o7NGgwwZ36iGgzDC3S2tPaFjRAd9Zg4UK7OZLQzg=="); + "pmrmWCDafdNpYle2rueAGnU2J6NYlcAey9mkZYbqh+5RdYo2Ln+llLF9voyRj+DDivK9GV1XdtKvD9rgCxlD7Q=="); + TestPackages.postFileAndWait(cluster, "runtimecode/containerplugin.v.2.jar.bin", FILE2, + "StR3DmqaUSL7qjDOeVEiCqE+ouiZAkW99fsL48F9oWG047o7NGgwwZ36iGgzDC3S2tPaFjRAd9Zg4UK7OZLQzg=="); - // We have two versions of the plugin in 2 different jar files. they are already uploaded to the package store + // We have two versions of the plugin in 2 different jar files. they are already uploaded to the package store Package.AddVersion add = new Package.AddVersion(); add.version = "1.0"; add.pkg = "mypkg"; add.files = singletonList(FILE1); V2Request addPkgVersionReq = new V2Request.Builder("/cluster/package") - .forceV2(true) - .POST() - .withPayload(singletonMap("add", add)) - .build(); + .forceV2(true) + .POST() + .withPayload(singletonMap("add", add)) + .build(); addPkgVersionReq.process(cluster.getSolrClient()); waitForAllNodesToSync(cluster, "/cluster/package", Utils.makeMap( @@ -295,28 +323,30 @@ public void testApiFromPackage() throws Exception { plugin.klass = "mypkg:org.apache.solr.handler.MyPlugin"; plugin.version = add.version; final V2Request req1 = new V2Request.Builder("/cluster/plugin") - .forceV2(true) - .POST() - .withPayload(singletonMap("add", plugin)) - .build(); + .forceV2(true) + .POST() + .withPayload(singletonMap("add", plugin)) + .build(); req1.process(cluster.getSolrClient()); - //verify the plugin creation + version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS); + + //verify the plugin creation TestDistribPackageStore.assertResponseValues(10, - () -> new V2Request.Builder("/cluster/plugin"). - GET() - .build().process(cluster.getSolrClient()), - ImmutableMap.of( - "/plugin/myplugin/class", plugin.klass, - "/plugin/myplugin/version", plugin.version - )); + () -> new V2Request.Builder("/cluster/plugin"). + GET() + .build().process(cluster.getSolrClient()), + ImmutableMap.of( + "/plugin/myplugin/class", plugin.klass, + "/plugin/myplugin/version", plugin.version + )); //let's test this now Callable invokePlugin = () -> new V2Request.Builder("/plugin/my/path") - .forceV2(true) - .GET() - .build().process(cluster.getSolrClient()); + .forceV2(true) + .GET() + .build().process(cluster.getSolrClient()); TestDistribPackageStore.assertResponseValues(10, - invokePlugin, - ImmutableMap.of("/myplugin.version", "1.0")); + invokePlugin, + ImmutableMap.of("/myplugin.version", "1.0")); //now let's upload the jar file for version 2.0 of the plugin add.version = "2.0"; @@ -326,32 +356,34 @@ public void testApiFromPackage() throws Exception { //here the plugin version is updated plugin.version = add.version; new V2Request.Builder("/cluster/plugin") - .forceV2(true) - .POST() - .withPayload(singletonMap("update", plugin)) - .build() - .process(cluster.getSolrClient()); + .forceV2(true) + .POST() + .withPayload(singletonMap("update", plugin)) + .build() + .process(cluster.getSolrClient()); + version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS); //now verify if it is indeed updated TestDistribPackageStore.assertResponseValues(10, - () -> new V2Request.Builder("/cluster/plugin"). - GET() - .build().process(cluster.getSolrClient()), - ImmutableMap.of( - "/plugin/myplugin/class", plugin.klass, - "/plugin/myplugin/version", "2.0" - )); + () -> new V2Request.Builder("/cluster/plugin"). + GET() + .build().process(cluster.getSolrClient()), + ImmutableMap.of( + "/plugin/myplugin/class", plugin.klass, + "/plugin/myplugin/version", "2.0" + )); // invoke the plugin and test thye output TestDistribPackageStore.assertResponseValues(10, - invokePlugin, - ImmutableMap.of("/myplugin.version", "2.0")); + invokePlugin, + ImmutableMap.of("/myplugin.version", "2.0")); plugin.name = "plugin2"; - plugin.klass = "mypkg:"+ C5.class.getName(); + plugin.klass = "mypkg:" + C5.class.getName(); plugin.version = "2.0"; req1.process(cluster.getSolrClient()); + version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS); assertNotNull(C5.classData); - assertEquals( 1452, C5.classData.limit()); + assertEquals(1452, C5.classData.limit()); } finally { cluster.shutdown(); } @@ -360,14 +392,15 @@ public void testApiFromPackage() throws Exception { public static class CC1 extends CC { } + public static class CC2 extends CC1 { } + public static class CC implements ConfigurablePlugin { private CConfig cfg; - @Override public void configure(CConfig cfg) { this.cfg = cfg; @@ -375,8 +408,8 @@ public void configure(CConfig cfg) { } @EndPoint(method = GET, - path = "/hello/plugin", - permission = PermissionNameProvider.Name.READ_PERM) + path = "/hello/plugin", + permission = PermissionNameProvider.Name.READ_PERM) public void m2(SolrQueryRequest req, SolrQueryResponse rsp) { rsp.add("config", cfg); } @@ -436,24 +469,24 @@ public void stop() { public static class C5 implements ResourceLoaderAware { static ByteBuffer classData; - private SolrResourceLoader resourceLoader; + private SolrResourceLoader resourceLoader; @Override public void inform(ResourceLoader loader) throws IOException { this.resourceLoader = (SolrResourceLoader) loader; try { InputStream is = resourceLoader.openResource("org/apache/solr/handler/MyPlugin.class"); - byte[] buf = new byte[1024*5]; + byte[] buf = new byte[1024 * 5]; int sz = IOUtils.read(is, buf); - classData = ByteBuffer.wrap(buf, 0,sz); + classData = ByteBuffer.wrap(buf, 0, sz); } catch (IOException e) { //do not do anything } } @EndPoint(method = GET, - path = "/$plugin-name/m2", - permission = PermissionNameProvider.Name.COLL_READ_PERM) + path = "/$plugin-name/m2", + permission = PermissionNameProvider.Name.COLL_READ_PERM) public void m2() { @@ -466,18 +499,18 @@ public static class C1 { } @EndPoint( - method = GET, - path = "/plugin/my/plugin", - permission = PermissionNameProvider.Name.COLL_READ_PERM) + method = GET, + path = "/plugin/my/plugin", + permission = PermissionNameProvider.Name.COLL_READ_PERM) public class C2 { } @EndPoint( - method = GET, - path = "/plugin/my/plugin", - permission = PermissionNameProvider.Name.COLL_READ_PERM) + method = GET, + path = "/plugin/my/plugin", + permission = PermissionNameProvider.Name.COLL_READ_PERM) public static class C3 { @Command public void read(SolrQueryRequest req, SolrQueryResponse rsp) { @@ -489,15 +522,15 @@ public void read(SolrQueryRequest req, SolrQueryResponse rsp) { public static class C4 { @EndPoint(method = GET, - path = "$plugin-name/my/plugin", - permission = PermissionNameProvider.Name.READ_PERM) + path = "$plugin-name/my/plugin", + permission = PermissionNameProvider.Name.READ_PERM) public void m1(SolrQueryRequest req, SolrQueryResponse rsp) { rsp.add("method.name", "m1"); } @EndPoint(method = GET, - path = "$path-prefix/their/plugin", - permission = PermissionNameProvider.Name.READ_PERM) + path = "$path-prefix/their/plugin", + permission = PermissionNameProvider.Name.READ_PERM) public void m2(SolrQueryRequest req, SolrQueryResponse rsp) { rsp.add("method.name", "m2"); } @@ -505,7 +538,7 @@ public void m2(SolrQueryRequest req, SolrQueryResponse rsp) { } @SuppressWarnings("unchecked") - public static void waitForAllNodesToSync(MiniSolrCloudCluster cluster, String path, Map expected) throws Exception { + public static void waitForAllNodesToSync(MiniSolrCloudCluster cluster, String path, Map expected) throws Exception { for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) { String baseUrl = jettySolrRunner.getBaseUrl().toString().replace("/solr", "/api"); String url = baseUrl + path + "?wt=javabin";