Skip to content

Commit 04952bc

Browse files
authoredMar 24, 2025
Merge pull request #6256 from zeusoo001/event-limit-speed
feat(event): limit block loading speed
2 parents 80b9cad + f22c18a commit 04952bc

File tree

6 files changed

+91
-2
lines changed

6 files changed

+91
-2
lines changed
 

‎framework/src/main/java/org/tron/common/logsfilter/EventPluginLoader.java

+20
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import java.util.Objects;
1010
import java.util.Set;
1111
import java.util.stream.Collectors;
12+
13+
import lombok.Getter;
1214
import lombok.extern.slf4j.Slf4j;
1315
import org.bouncycastle.util.encoders.Hex;
1416
import org.pf4j.CompoundPluginDescriptorFinder;
@@ -31,6 +33,8 @@ public class EventPluginLoader {
3133

3234
private static EventPluginLoader instance;
3335

36+
private long MAX_PENDING_SIZE = 50000;
37+
3438
private PluginManager pluginManager = null;
3539

3640
private List<IPluginEventListener> eventListeners;
@@ -73,6 +77,7 @@ public class EventPluginLoader {
7377

7478
private FilterQuery filterQuery;
7579

80+
@Getter
7681
private boolean useNativeQueue = false;
7782

7883
public static EventPluginLoader getInstance() {
@@ -537,6 +542,21 @@ public void postContractEventTrigger(ContractEventTrigger trigger) {
537542
}
538543
}
539544

545+
public boolean isBusy() {
546+
if (useNativeQueue) {
547+
return false;
548+
}
549+
int queueSize = 0;
550+
for (IPluginEventListener listener : eventListeners) {
551+
try {
552+
queueSize += listener.getPendingSize();
553+
} catch (AbstractMethodError error) {
554+
break;
555+
}
556+
}
557+
return queueSize >= MAX_PENDING_SIZE;
558+
}
559+
540560
private String toJsonString(Object data) {
541561
String jsonData = "";
542562

‎framework/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public interface IPluginEventListener extends ExtensionPoint {
1313
// start should be called after setServerAddress, setTopic, setDBConfig
1414
void start();
1515

16+
int getPendingSize();
17+
1618
void handleBlockEvent(Object trigger);
1719

1820
void handleTransactionTrigger(Object trigger);

‎framework/src/main/java/org/tron/core/services/event/BlockEventLoad.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.springframework.beans.factory.annotation.Autowired;
1010
import org.springframework.stereotype.Service;
1111
import org.tron.common.es.ExecutorServiceManager;
12+
import org.tron.common.logsfilter.EventPluginLoader;
1213
import org.tron.core.db.Manager;
1314
import org.tron.core.services.event.bo.BlockEvent;
1415
import org.tron.core.services.event.bo.Event;
@@ -26,13 +27,19 @@ public class BlockEventLoad {
2627
@Autowired
2728
private BlockEventGet blockEventGet;
2829

30+
private EventPluginLoader instance = EventPluginLoader.getInstance();
31+
2932
private final ScheduledExecutorService executor = ExecutorServiceManager
3033
.newSingleThreadScheduledExecutor("event-load");
3134

35+
private long MAX_LOAD_NUM = 100;
36+
3237
public void init() {
3338
executor.scheduleWithFixedDelay(() -> {
3439
try {
35-
load();
40+
if (!instance.isBusy()) {
41+
load();
42+
}
3643
} catch (Exception e) {
3744
close();
3845
logger.error("Event load service fail.", e);
@@ -62,6 +69,9 @@ public synchronized void load() throws Exception {
6269
if (cacheHeadNum >= tmpNum) {
6370
return;
6471
}
72+
if (tmpNum > cacheHeadNum + MAX_LOAD_NUM) {
73+
tmpNum = cacheHeadNum + MAX_LOAD_NUM;
74+
}
6575
List<BlockEvent> l1 = new ArrayList<>();
6676
List<BlockEvent> l2 = new ArrayList<>();
6777
BlockEvent tmp = BlockEventCache.getHead();

‎framework/src/main/java/org/tron/core/services/event/HistoryEventService.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,17 @@ private void syncEvent() {
5555
long tmp = instance.getStartSyncBlockNum();
5656
long endNum = manager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum();
5757
while (tmp <= endNum) {
58+
if (instance.isUseNativeQueue()) {
59+
Thread.sleep(20);
60+
} else if (instance.isBusy()) {
61+
Thread.sleep(100);
62+
continue;
63+
}
5864
BlockEvent blockEvent = blockEventGet.getBlockEvent(tmp);
5965
realtimeEventService.flush(blockEvent, false);
6066
solidEventService.flush(blockEvent);
6167
tmp++;
6268
endNum = manager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum();
63-
Thread.sleep(30);
6469
}
6570
initEventService(manager.getChainBaseManager().getBlockIdByNum(endNum));
6671
} catch (InterruptedException e1) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.tron.core.event;
2+
3+
import static org.mockito.Mockito.mock;
4+
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import org.junit.Assert;
8+
import org.junit.Test;
9+
import org.mockito.Mockito;
10+
import org.tron.common.logsfilter.EventPluginLoader;
11+
import org.tron.common.logsfilter.IPluginEventListener;
12+
import org.tron.common.utils.ReflectUtils;
13+
14+
public class EventPluginLoaderTest {
15+
16+
@Test
17+
public void testIsBusy() {
18+
19+
EventPluginLoader eventPluginLoader = EventPluginLoader.getInstance();
20+
ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", true);
21+
boolean flag = eventPluginLoader.isBusy();
22+
Assert.assertFalse(flag);
23+
24+
ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", false);
25+
26+
IPluginEventListener p1 = mock(IPluginEventListener.class);
27+
List<IPluginEventListener> list = new ArrayList<>();
28+
list.add(p1);
29+
ReflectUtils.setFieldValue(eventPluginLoader, "eventListeners", list);
30+
31+
Mockito.when(p1.getPendingSize()).thenReturn(100);
32+
flag = eventPluginLoader.isBusy();
33+
Assert.assertFalse(flag);
34+
35+
Mockito.when(p1.getPendingSize()).thenReturn(60000);
36+
flag = eventPluginLoader.isBusy();
37+
Assert.assertTrue(flag);
38+
39+
Mockito.when(p1.getPendingSize()).thenThrow(new AbstractMethodError());
40+
flag = eventPluginLoader.isBusy();
41+
Assert.assertFalse(flag);
42+
}
43+
}

‎framework/src/test/java/org/tron/core/event/HistoryEventServiceTest.java

+9
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ public class HistoryEventServiceTest {
2626
@Test
2727
public void test() throws Exception {
2828
EventPluginLoader instance = mock(EventPluginLoader.class);
29+
Mockito.when(instance.isUseNativeQueue()).thenReturn(true);
30+
Mockito.when(instance.isUseNativeQueue()).thenReturn(false);
31+
2932
ReflectUtils.setFieldValue(historyEventService, "instance", instance);
3033

3134
DynamicPropertiesStore dynamicPropertiesStore = mock(DynamicPropertiesStore.class);
@@ -39,6 +42,7 @@ public void test() throws Exception {
3942
SolidEventService solidEventService = new SolidEventService();
4043
RealtimeEventService realtimeEventService = new RealtimeEventService();
4144
BlockEventLoad blockEventLoad = new BlockEventLoad();
45+
ReflectUtils.setFieldValue(blockEventLoad, "instance", instance);
4246

4347
ReflectUtils.setFieldValue(historyEventService, "solidEventService", solidEventService);
4448
ReflectUtils.setFieldValue(historyEventService, "realtimeEventService", realtimeEventService);
@@ -77,11 +81,16 @@ public void test() throws Exception {
7781
Mockito.when(chainBaseManager.getBlockIdByNum(1L))
7882
.thenReturn(new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 1));
7983

84+
Mockito.when(instance.isUseNativeQueue()).thenReturn(true);
85+
8086
Method method1 = historyEventService.getClass().getDeclaredMethod("syncEvent");
8187
method1.setAccessible(true);
8288
method1.invoke(historyEventService);
8389

90+
Mockito.when(instance.isUseNativeQueue()).thenReturn(false);
91+
Mockito.when(instance.isBusy()).thenReturn(true);
8492
historyEventService.init();
93+
Thread.sleep(1000);
8594
historyEventService.close();
8695
}
8796
}

0 commit comments

Comments
 (0)