Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
… into fix-3323

� Conflicts:
�	eventmesh-connector-plugin/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/consumer/PulsarConsumerImpl.java
  • Loading branch information
yuyuyu333333 committed Mar 2, 2023
2 parents 269d755 + a0b13d4 commit f248190
Show file tree
Hide file tree
Showing 506 changed files with 6,320 additions and 6,041 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/greetings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ jobs:
Want to get closer to the community?
WeChat Group:
![wechat_qr](https://github.com/apache/incubator-eventmesh/blob/master/docs/images/contact/wechat-assistant.jpg?raw=true)
|WeChat Assistant|WeChat Public Account|Slack|
|-|-|-|
|<img src="https://github.com/apache/incubator-eventmesh/blob/master/docs/images/contact/wechat-assistant.jpg?raw=true" width="128"/>|<img src="https://github.com/apache/incubator-eventmesh/blob/master/docs/images/contact/wechat-official.jpg?raw=true" width="128"/>|[Join Slack Chat](https://join.slack.com/t/apacheeventmesh/shared_invite/zt-1ooav8slu-ijUI98MiEI501jhK1cabKQ)|
Mailing Lists:
| Name | Description |Subscribe |Unsubscribe|Archive
Expand All @@ -63,7 +64,7 @@ jobs:
|WeChat Assistant|WeChat Public Account|Slack|
|-|-|-|
|<img src="docs/images/contact/wechat-assistant.jpg" width="128"/>|<img src="docs/images/contact/wechat-official.jpg" width="128"/>|[Join Slack Chat](https://join.slack.com/t/apacheeventmesh/shared_invite/zt-1ooav8slu-ijUI98MiEI501jhK1cabKQ)|
|<img src="https://github.com/apache/incubator-eventmesh/blob/master/docs/images/contact/wechat-assistant.jpg?raw=true" width="128"/>|<img src="https://github.com/apache/incubator-eventmesh/blob/master/docs/images/contact/wechat-official.jpg?raw=true" width="128"/>|[Join Slack Chat](https://join.slack.com/t/apacheeventmesh/shared_invite/zt-1ooav8slu-ijUI98MiEI501jhK1cabKQ)|
Mailing Lists:
| Name | Description |Subscribe |Unsubscribe|Archive
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

### EventMesh Architecture

![EventMesh Architecture](docs/images/eventmesh-architecture-2.png)
![EventMesh Architecture](docs/images/eventmesh-architecture-3.png)

### EventMesh Dashboard
![EventMesh Dashboard](docs/images/dashboard.png)
Expand Down
Binary file added docs/images/eventmesh-architecture-3.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
87 changes: 87 additions & 0 deletions docs/zh/design-document/07-tracing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# 分布式追踪

## OpenTelemetry概述

OpenTelemetry是一组API和SDK的工具,您可以使用它来仪器化、生成、收集和导出遥测数据(指标、日志和追踪),以便进行分析,以了解您的软件性能和行为。

## 需求

- 设置追踪器
- 不同的导出器
- 在服务器中开始和结束跨度

## 设计细节

- 跨度处理器:BatchSpanProcessor

- 导出器:默认为日志,可以从属性中更改

```java
// Configure the batch spans processor. This span processor exports span in batches.
BatchSpanProcessor batchSpansProcessor =
BatchSpanProcessor.builder(exporter)
.setMaxExportBatchSize(512) // set the maximum batch size to use
.setMaxQueueSize(2048) // set the queue size. This must be >= the export batch size
.setExporterTimeout(
30, TimeUnit.SECONDS) // set the max amount of time an export can run before getting
// interrupted
.setScheduleDelay(5, TimeUnit.SECONDS) // set time between two different exports
.build();
OpenTelemetrySdk.builder()
.setTracerProvider(
SdkTracerProvider.builder().addSpanProcessor(batchSpansProcessor).build())
.build();
```

1. 当使用`EventMeshHTTPServer`类的`init()`方法时,类`AbstractHTTPServer`将获取跟踪器。

```java
super.openTelemetryTraceFactory = new OpenTelemetryTraceFactory(eventMeshHttpConfiguration);
super.tracer = openTelemetryTraceFactory.getTracer(this.getClass().toString());
super.textMapPropagator = openTelemetryTraceFactory.getTextMapPropagator();
```

2. 然后,在类`AbstractHTTPServer`中的跟踪将起作用。

## 问题

### 如何在类“OpenTelemetryTraceFactory”中设置不同的导出器?(已解决)

在从属性中获取导出器类型之后,如何处理它。

`logExporter`只需要创建新实例即可。

但是,“zipkinExporter”需要新建并使用“getZipkinExporter()”方法。

## 解决方案

### 不同导出器的解决方案

使用反射获取导出器。

首先,不同的导出器必须实现接口“EventMeshExporter”。

然后,我们从配置中获取导出器名称,并反射到该类。

```java
//different spanExporter
String exporterName = configuration.eventMeshTraceExporterType;
//use reflection to get spanExporter
String className = String.format("org.apache.eventmesh.runtime.exporter.%sExporter",exporterName);
EventMeshExporter eventMeshExporter = (EventMeshExporter) Class.forName(className).newInstance();
spanExporter = eventMeshExporter.getSpanExporter(configuration);
```

另外,这将包含try catch。如果无法成功获取指定的导出器,则将使用默认的日志导出器。

#### 不同导出器的改进

SPI(待完成)

## 附录

### 参考资料

- <https://github.com/open-telemetry/docs-cn/blob/main/QUICKSTART.md>

- <https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/netty>
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class UrlMappingPattern {
private static final String URL_PARAMETER_REGEX = "\\{(\\w*?)\\}";

private static final String URL_PARAMETER_MATCH_REGEX =
"\\([%\\\\w-.\\\\~!\\$&'\\\\(\\\\)\\\\*\\\\+,;=:\\\\[\\\\]@]+?\\)";
"\\([%\\\\w-.\\\\~!\\$&'\\\\(\\\\)\\\\*\\\\+,;=:\\\\[\\\\]@]+?\\)";

private static final Pattern URL_PARAMETER_PATTERN = Pattern.compile(URL_PARAMETER_REGEX);

Expand Down Expand Up @@ -75,7 +75,7 @@ public boolean matches(String url) {
public void compile() {
acquireParamNames();
String parsedPattern =
getUrlMappingPattern().replaceFirst(URL_FORMAT_REGEX, URL_FORMAT_MATCH_REGEX);
getUrlMappingPattern().replaceFirst(URL_FORMAT_REGEX, URL_FORMAT_MATCH_REGEX);
parsedPattern = parsedPattern.replaceAll(URL_PARAMETER_REGEX, URL_PARAMETER_MATCH_REGEX);
this.compiledUrlMappingPattern = Pattern.compile(parsedPattern + URL_QUERY_STRING_REGEX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,10 @@ public EventMeshThreadFactory(final String threadNamePrefix) {
}

/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
* Constructs a new {@code Thread}. Implementations may also initialize priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param runnable a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
* @return constructed thread, or {@code null} if the request to create a thread is rejected
*/
@Override
public Thread newThread(final Runnable runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
@NoArgsConstructor
@Config(prefix = "eventMesh")
public class CommonConfiguration {

@ConfigFiled(field = "sysid", beNumber = true, notEmpty = true)
private String sysID = "5477";

Expand Down Expand Up @@ -118,4 +119,4 @@ public void reload() {

meshGroup = String.join("-", this.eventMeshEnv, this.eventMeshCluster, this.sysID);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Configs {

Config[] value();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
@Data
public class ConvertInfo {

char hump;
String key;
Field field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ default Object processFieldValue(ConvertInfo convertInfo, String key, ConfigFile
String fieldName = configFiled.field();

if (StringUtils.isBlank(value) && !StringUtils.isBlank(fieldName) && findEnv) {
value = Optional.ofNullable(System.getProperty(fieldName)).orElseGet(() -> System.getenv(fieldName));
value = Optional.ofNullable(System.getProperty(fieldName)).orElse(System.getenv(fieldName));
}

if (StringUtils.isBlank(value) && configFiled.notEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ public List<Object> convert(ConvertInfo convertInfo, String separator) {
list = (List<Object>) convertInfo.getField().getType().newInstance();
}

Type parameterizedType = ((ParameterizedType) convertInfo.getField()
.getGenericType()).getActualTypeArguments()[0];
Type parameterizedType = ((ParameterizedType) convertInfo.getField().getGenericType()).getActualTypeArguments()[0];

ConvertValue<?> clazzConverter = ConverterMap.getClazzConverter((Class<?>) parameterizedType);

List<String> values = Splitter.on(separator).omitEmptyStrings().trimResults()
.splitToList((String) convertInfo.getValue());
List<String> values = Splitter.on(separator).omitEmptyStrings().trimResults().splitToList((String) convertInfo.getValue());
for (String value : values) {
convertInfo.setValue(value);
list.add(clazzConverter.convert(convertInfo));
Expand All @@ -83,6 +81,7 @@ public List<Object> convert(ConvertInfo convertInfo, String separator) {


public static class ListConverterSemi extends ListConverter {

public String separator = ";";

public String getSeparator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* Config field conversion class for String
*/
public class StringConverter implements ConvertValue<String> {
public class StringConverter implements ConvertValue<String> {

@Override
public String convert(ConvertInfo convertInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.common.exception;

public class EventMeshException extends RuntimeException {

public static final long serialVersionUID = 5648256502005456586L;

public EventMeshException(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
package org.apache.eventmesh.common.file;

/**
* Users can register {@link FileChangeListener} with WatchFileManager via the
* {@link WatchFileManager#registerFileChangeListener(java.lang.String, org.apache.eventmesh.common.file.FileChangeListener)} method.
* The {@link FileChangeListener#onChanged(org.apache.eventmesh.common.file.FileChangeContext)} method fires when a file changes,
* and the {@link FileChangeListener#support(org.apache.eventmesh.common.file.FileChangeContext)} method let the user customize
* which files are supported
* Users can register {@link FileChangeListener} with WatchFileManager via the {@link WatchFileManager#registerFileChangeListener(java.lang.String,
* org.apache.eventmesh.common.file.FileChangeListener)} method.
* The {@link FileChangeListener#onChanged(org.apache.eventmesh.common.file.FileChangeContext)}
* method fires when a file changes, and the {@link FileChangeListener#support(org.apache.eventmesh.common.file.FileChangeContext)} method let the
* user customize which files are supported
*/
public interface FileChangeListener {

/**
* triggered when a file change occurs
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public class WatchFileManager {
}));
}

public static void registerFileChangeListener(String directoryPath,
FileChangeListener listener) {
public static void registerFileChangeListener(String directoryPath, FileChangeListener listener) {
WatchFileTask task = WATCH_FILE_TASK_MAP.get(directoryPath);
if (task == null) {
task = new WatchFileTask(directoryPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public WatchFileTask(String directoryPath) {
try (WatchService watchService = FILE_SYSTEM.newWatchService()) {
this.watchService = watchService;
path.register(this.watchService, StandardWatchEventKinds.OVERFLOW, StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
} catch (Exception ex) {
throw new UnsupportedOperationException("WatchService registry fail", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
import lombok.extern.slf4j.Slf4j;

/**
* This selector use random strategy.
* Each selection will randomly give one from the given list
* This selector use random strategy. Each selection will randomly give one from the given list
*
* @param <T> Target type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public AtomicInteger getCurrentWeight() {
@Override
public String toString() {
return "Wight{"
+ "target=" + target
+ ", value=" + value
+ ", currentWeight=" + currentWeight
+ '}';
+ "target=" + target
+ ", value=" + value
+ ", currentWeight=" + currentWeight
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@
import java.util.concurrent.ThreadLocalRandom;

/**
* This selector use the weighted random strategy to select from list.
* If all the weights are same, it will randomly select one from list.
* If the weights are different, it will select one by using RandomUtils.nextInt(0, w0 + w1 ... + wn)
* This selector use the weighted random strategy to select from list. If all the weights are same, it will randomly select one from list. If the
* weights are different, it will select one by using RandomUtils.nextInt(0, w0 + w1 ... + wn)
*
* @param <T> Target type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
import lombok.extern.slf4j.Slf4j;

/**
* This selector use the weighted round robin strategy to select from list.
* If the weight is greater, the probability of being selected is larger.
* This selector use the weighted round robin strategy to select from list. If the weight is greater, the probability of being selected is larger.
*
* @param <T> Target type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@
* </ul>
*/
public interface ProtocolTransportObject extends Serializable {

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ public void setMode(SubscriptionMode mode) {
@Override
public String toString() {
return "SubscriptionItem{"
+ "topic=" + topic
+ ", mode=" + mode
+ ", type=" + type
+ '}';
+ "topic=" + topic
+ ", mode=" + mode
+ ", type=" + type
+ '}';
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.util.Converter;

public class SubscriptionModeConverter implements Converter<String, SubscriptionMode> {

@Override
public SubscriptionMode convert(String value) {
return SubscriptionMode.valueOf(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.util.Converter;

public class SubscriptionTypeConverter implements Converter<String, SubscriptionType> {

@Override
public SubscriptionType convert(String value) {
return SubscriptionType.valueOf(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage;

public class BatchMessageWrapper implements ProtocolTransportObject {

private static final long serialVersionUID = -3296467364340663768L;
private final BatchMessage batchMessage;

Expand Down
Loading

0 comments on commit f248190

Please sign in to comment.