Skip to content

Commit

Permalink
1. Correct Setting's caching behavior
Browse files Browse the repository at this point in the history
2. Add a webhook based setting provider for demo
  • Loading branch information
popduke committed Apr 12, 2024
1 parent 0ac634a commit c387df1
Show file tree
Hide file tree
Showing 14 changed files with 580 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,20 @@

package com.baidu.bifromq.plugin.settingprovider;

import java.util.EnumMap;

class DevOnlySettingProvider implements ISettingProvider {
private final EnumMap<Setting, Object> initialValues = new EnumMap<>(Setting.class);

DevOnlySettingProvider() {
for (Setting setting : Setting.values()) {
initialValues.put(setting, setting.current("DevOnly"));
}
}

@SuppressWarnings("unchecked")
@Override
public <R> R provide(Setting setting, String tenantId) {
return setting.current(tenantId);
return (R) initialValues.get(setting);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/

package com.baidu.bifromq.plugin.settingprovider;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MonitoredSettingProvider implements ISettingProvider {
private final Timer provideCallTimer;
private final Counter provideCallErrorCounter;
private final ISettingProvider delegate;

public MonitoredSettingProvider(ISettingProvider delegate) {
this.delegate = delegate;
provideCallTimer = Timer.builder("call.exec.timer")
.tag("method", "SettingProvider/provide")
.tag("type", delegate.getClass().getName())
.register(Metrics.globalRegistry);
provideCallErrorCounter = Counter.builder("call.exec.fail.count")
.tag("method", "SettingProvider/provide")
.tag("type", delegate.getClass().getName())
.register(Metrics.globalRegistry);
}

@Override
public <R> R provide(Setting setting, String tenantId) {
try {
Timer.Sample sample = Timer.start();
R newVal = delegate.provide(setting, tenantId);
sample.stop(provideCallTimer);
if (setting.isValid(newVal)) {
return newVal;
} else {
log.warn("Invalid setting value: setting={}, value={}", setting.name(), newVal);
provideCallErrorCounter.increment();
return null;
}
} catch (Throwable e) {
log.error("Setting provider throws exception: setting={}", setting.name(), e);
provideCallErrorCounter.increment();
return null;
}
}

@Override
public void close() {
delegate.close();
Metrics.globalRegistry.remove(provideCallTimer);
Metrics.globalRegistry.remove(provideCallErrorCounter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
package com.baidu.bifromq.plugin.settingprovider;

import com.google.common.base.Preconditions;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand All @@ -27,56 +24,32 @@
public class SettingProviderManager implements ISettingProvider {
private final AtomicBoolean stopped = new AtomicBoolean();
private final ISettingProvider provider;
private final Timer provideCallTimer;
private final Counter provideCallErrorCounter;

public SettingProviderManager(String settingProviderFQN, PluginManager pluginMgr) {
Map<String, ISettingProvider> availSettingProviders = pluginMgr.getExtensions(ISettingProvider.class).stream()
.collect(Collectors.toMap(e -> e.getClass().getName(), e -> e));
if (availSettingProviders.isEmpty()) {
log.warn("No setting provider plugin available, use DEV ONLY one instead");
provider = new DevOnlySettingProvider();
provider = new MonitoredSettingProvider(new DevOnlySettingProvider());
} else {
if (settingProviderFQN == null) {
log.warn("Setting provider plugin type are not specified, use DEV ONLY one instead");
provider = new DevOnlySettingProvider();
provider = new MonitoredSettingProvider(new DevOnlySettingProvider());
} else {
Preconditions.checkArgument(availSettingProviders.containsKey(settingProviderFQN),
String.format("Setting provider Plugin '%s' not found", settingProviderFQN));
log.info("Setting provider plugin type: {}", settingProviderFQN);
provider = availSettingProviders.get(settingProviderFQN);
provider = new MonitoredSettingProvider(availSettingProviders.get(settingProviderFQN));
}
}
provideCallTimer = Timer.builder("call.exec.timer")
.tag("method", "SettingProvider/provide")
.tag("type", provider.getClass().getName())
.register(Metrics.globalRegistry);
provideCallErrorCounter = Counter.builder("call.exec.fail.count")
.tag("method", "SettingProvider/provide")
.tag("type", provider.getClass().getName())
.register(Metrics.globalRegistry);
for (Setting setting : Setting.values()) {
setting.setProvider(provider);
}
}

public <R> R provide(Setting setting, String tenantId) {
assert !stopped.get();
R current = setting.current(tenantId);
try {
Timer.Sample sample = Timer.start();
R newVal = provider.provide(setting, tenantId);
sample.stop(provideCallTimer);
if (setting.isValid(newVal)) {
// update the value
setting.current(tenantId, newVal);
} else {
log.warn("Invalid setting value: setting={}, value={}", setting.name(), newVal);
}
} catch (Throwable e) {
log.error("Setting provider throws exception: setting={}", setting.name(), e);
// keep current value in case provider throws
setting.current(tenantId, current);
provideCallErrorCounter.increment();
}
return current;
return setting.current(tenantId);
}

// for testing
Expand All @@ -89,8 +62,6 @@ public void close() {
if (stopped.compareAndSet(false, true)) {
log.info("Closing setting provider manager");
provider.close();
Metrics.globalRegistry.remove(provideCallTimer);
Metrics.globalRegistry.remove(provideCallErrorCounter);
log.info("Setting provider manager closed");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/

package com.baidu.bifromq.plugin.settingprovider;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import lombok.SneakyThrows;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class MonitoredSettingProviderTest {
@Mock
private ISettingProvider provider;

private SimpleMeterRegistry registry;
private AutoCloseable closeable;

@BeforeMethod
public void setup() {
registry = new SimpleMeterRegistry();
Metrics.addRegistry(registry);
closeable = MockitoAnnotations.openMocks(this);
}

@SneakyThrows
@AfterMethod
public void tearDown() {
Metrics.removeRegistry(registry);
closeable.close();
}

@Test
public void provideValidValue() {
MonitoredSettingProvider monitoredSettingProvider = new MonitoredSettingProvider(provider);
String tenantId = "tenantA";
when(provider.provide(Setting.MaxTopicLevels, tenantId)).thenReturn(64);
int levels = monitoredSettingProvider.provide(Setting.MaxTopicLevels, tenantId);
assertEquals(levels, 64);
verify(provider).provide(Setting.MaxTopicLevels, tenantId);
assertEquals(registry.get("call.exec.timer").timer().count(), 1);
}

@Test
public void provideInvalidValue() {
MonitoredSettingProvider monitoredSettingProvider = new MonitoredSettingProvider(provider);
String tenantId = "tenantA";
when(provider.provide(Setting.MaxTopicLevels, tenantId)).thenReturn(new Object());
assertNull(monitoredSettingProvider.provide(Setting.MaxTopicLevels, tenantId));
verify(provider).provide(Setting.MaxTopicLevels, tenantId);
assertEquals(registry.get("call.exec.timer").timer().count(), 1);
assertEquals(registry.get("call.exec.fail.count").counter().count(), 1);
}

@Test
public void provideNullValue() {
MonitoredSettingProvider monitoredSettingProvider = new MonitoredSettingProvider(provider);
String tenantId = "tenantA";
when(provider.provide(Setting.MaxTopicLevels, tenantId)).thenReturn(null);
assertNull(monitoredSettingProvider.provide(Setting.MaxTopicLevels, tenantId));
verify(provider).provide(Setting.MaxTopicLevels, tenantId);
assertEquals(registry.get("call.exec.timer").timer().count(), 1);
assertEquals(registry.get("call.exec.fail.count").counter().count(), 1);
}

@Test
public void provideThrows() {
MonitoredSettingProvider monitoredSettingProvider = new MonitoredSettingProvider(provider);
String tenantId = "tenantA";
when(provider.provide(Setting.MaxTopicLevels, tenantId)).thenThrow(new RuntimeException("Mocked exception"));
assertNull(monitoredSettingProvider.provide(Setting.MaxTopicLevels, tenantId));
verify(provider).provide(Setting.MaxTopicLevels, tenantId);
assertEquals(registry.get("call.exec.fail.count").counter().count(), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package com.baidu.bifromq.plugin.settingprovider;

import static org.awaitility.Awaitility.await;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand Down Expand Up @@ -51,9 +51,7 @@ public void devOnlyMode() {
@Test
public void authPluginSpecified() {
manager = new SettingProviderManager(SettingProviderTestStub.class.getName(), pluginManager);
SettingProviderTestStub stub = (SettingProviderTestStub) manager.get();
stub.setValue(Setting.MaxTopicLevels, 64);
await().until(() -> (int) manager.provide(Setting.MaxTopicLevels, tenantId) == 64);
assertEquals((int) manager.provide(Setting.MaxTopicLevels, tenantId), 64);
manager.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

@Extension
public class SettingProviderTestStub implements ISettingProvider {
private final Map<Setting, Object> settings = new HashMap<>();
private final Map<Setting, Object> settings = new HashMap<>() {{
put(Setting.MaxTopicLevels, 64);
}};

@SuppressWarnings("unchecked")
@Override
public <R> R provide(Setting setting, String tenantId) {
return settings.containsKey(setting) ? (R) settings.get(setting) : setting.current(tenantId);
}

public void setValue(Setting setting, Object newVal) {
settings.put(setting, newVal);
}
}
Loading

0 comments on commit c387df1

Please sign in to comment.