Skip to content

Commit

Permalink
eventmesh-security-auth-token
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyuyu333333 committed Feb 22, 2023
1 parent b4ff427 commit 4bfb20c
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class CommonConfiguration {
@ConfigFiled(field = "connector.plugin.type", notEmpty = true)
private String eventMeshConnectorPluginType = "rocketmq";

@ConfigFiled(field = "security.validation.type.token", notEmpty = true)
private boolean eventMeshSecurityValidateTypeToken = false;

@ConfigFiled(field = "registry.plugin.username")
private String eventMeshRegistryPluginUsername = "";
Expand All @@ -94,6 +96,8 @@ public class CommonConfiguration {
@ConfigFiled(field = "server.registry.enabled")
private boolean eventMeshServerRegistryEnable = false;

@ConfigFiled(field = "security.publickey")
private String eventMeshSecurityPublickey = "";

@ConfigFiled(field = "server.provide.protocols", reload = true)
private List<String> eventMeshProvideServerProtocols;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public static class ClientInstanceKey {
public static final String UNIQUEID = "uniqueid";
public static final String PRODUCERGROUP = "producergroup";
public static final String CONSUMERGROUP = "consumergroup";

public static final String TOKEN = "token";
}


Expand Down
2 changes: 2 additions & 0 deletions eventmesh-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ dependencies {
implementation project(":eventmesh-security-plugin:eventmesh-security-api")
implementation project(":eventmesh-security-plugin:eventmesh-security-acl")
implementation project(":eventmesh-security-plugin:eventmesh-security-auth-http-basic")
implementation project(":eventmesh-security-plugin:eventmesh-security-auth-token")

implementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
implementation project(":eventmesh-admin:eventmesh-admin-rocketmq")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
import org.apache.eventmesh.api.acl.AclProperties;
import org.apache.eventmesh.api.acl.AclService;
import org.apache.eventmesh.api.exception.AclException;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;

import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

import io.cloudevents.CloudEvent;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -128,6 +131,10 @@ public void doAclCheckInHttpSend(String remoteAddr, String user, String pass, St
aclService.doAclCheckInSend(buildHttpAclProperties(remoteAddr, user, pass, subsystem, topic, requestURI));
}

public void doAclCheckInHttpSend(String remoteAddr, String requestURI, CloudEvent event) throws AclException {
aclService.doAclCheckInSend(buildHttpAclProperties(remoteAddr, requestURI, event));
}

public void doAclCheckInHttpReceive(String remoteAddr, String user, String pass, String subsystem, String topic,
int requestCode) throws AclException {
aclService.doAclCheckInReceive(buildHttpAclProperties(remoteAddr, user, pass, subsystem, topic, requestCode));
Expand Down Expand Up @@ -168,4 +175,29 @@ private AclProperties buildHttpAclProperties(String remoteAddr, String user, Str
}
return aclProperties;
}

private static AclProperties buildHttpAclProperties(String remoteAddr, String requestURI, CloudEvent event) {
AclProperties aclProperties = new AclProperties();
final String user = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.USERNAME)).toString();
final String pass = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PASSWD)).toString();
final String subsystem = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString();
final String token = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.TOKEN)).toString();
final String topic = event.getSubject();
aclProperties.setClientIp(remoteAddr);
if (StringUtils.isNotBlank(token)) {
aclProperties.setToken(token);
}
if (StringUtils.isNotBlank(user)) {
aclProperties.setUser(user);
}
if (StringUtils.isNotBlank(pass)) {
aclProperties.setPwd(pass);
}
aclProperties.setSubsystem(subsystem);
aclProperties.setRequestURI(requestURI);
if (StringUtils.isNotBlank(topic)) {
aclProperties.setTopic(topic);
}
return aclProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,17 @@
public class ProducerGroupConf {
private String groupName;

private String token;

public ProducerGroupConf(String groupName) {
this.groupName = groupName;
}

public ProducerGroupConf(String groupName, String token) {
this.groupName = groupName;
this.token = token;
}

public String getGroupName() {
return groupName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,18 +167,20 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
return;
}

String tokenTmp = null;
//do acl check
if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
final String user = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.USERNAME)).toString();
final String pass = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PASSWD)).toString();
final String subsystem = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString();
final String requestURI = requestWrapper.getRequestURI();
try {
this.acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestURI);
if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshSecurityValidateTypeToken()) {
final String token = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.TOKEN)).toString();
tokenTmp = token;
}
this.acl.doAclCheckInHttpSend(remoteAddr, requestURI, event);
} catch (Exception e) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_ACL_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
if (log.isWarnEnabled()) {
log.warn("CLIENT HAS NO PERMISSION,SendAsyncMessageProcessor send failed", e);
}
Expand All @@ -188,13 +190,18 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final

// control flow rate limit
if (!eventMeshHTTPServer.getMsgRateLimiter()
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
return;
}

final EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
final EventMeshProducer eventMeshProducer;
if (StringUtils.isNotBlank(tokenTmp)) {
eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup, tokenTmp);
} else {
eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
}

if (!eventMeshProducer.getStarted().get()) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, responseHeaderMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,27 @@ public EventMeshProducer getEventMeshProducer(String producerGroup) throws Excep
return eventMeshProducer;
}

public EventMeshProducer getEventMeshProducer(String producerGroup, String token) throws Exception {
EventMeshProducer eventMeshProducer = null;
if (!producerTable.containsKey(producerGroup)) {
synchronized (producerTable) {
if (!producerTable.containsKey(producerGroup)) {
ProducerGroupConf producerGroupConfig = new ProducerGroupConf(producerGroup, token);
eventMeshProducer = createEventMeshProducer(producerGroupConfig);
eventMeshProducer.start();
}
}
}

eventMeshProducer = producerTable.get(producerGroup);

if (!eventMeshProducer.getStarted().get()) {
eventMeshProducer.start();
}

return eventMeshProducer;
}

public synchronized EventMeshProducer createEventMeshProducer(ProducerGroupConf producerGroupConfig) throws Exception {
if (producerTable.containsKey(producerGroupConfig.getGroupName())) {
return producerTable.get(producerGroupConfig.getGroupName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


def jwtVersion = '0.11.1'

dependencies {

implementation project(":eventmesh-common")
implementation "io.jsonwebtoken:jjwt-api:${jwtVersion}"
implementation "io.jsonwebtoken:jjwt-impl:${jwtVersion}"
implementation "io.jsonwebtoken:jjwt-jackson:${jwtVersion}"

implementation project(":eventmesh-security-plugin:eventmesh-security-api")

testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pluginType=security
pluginName=auth-token
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.apache.eventmesh.auth.token.impl;

import org.apache.eventmesh.api.acl.AclProperties;
import org.apache.eventmesh.api.acl.AclService;
import org.apache.eventmesh.api.exception.AclException;
import org.apache.eventmesh.auth.token.impl.auth.AuthTokenUtils;

public class AuthTokenServiceImpl implements AclService {

@Override
public void init() throws AclException {
}

@Override
public void start() throws AclException {

}

@Override
public void shutdown() throws AclException {

}

@Override
public void doAclCheckInConnect(AclProperties aclProperties) throws AclException {
AuthTokenUtils.authTokenByPublicKey(aclProperties);
}

@Override
public void doAclCheckInHeartbeat(AclProperties aclProperties) throws AclException {

}

@Override
public void doAclCheckInSend(AclProperties aclProperties) throws AclException {
AuthTokenUtils.authTokenByPublicKey(aclProperties);
}

@Override
public void doAclCheckInReceive(AclProperties aclProperties) throws AclException {
AuthTokenUtils.authTokenByPublicKey(aclProperties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.apache.eventmesh.auth.token.impl.auth;

import org.apache.eventmesh.api.acl.AclProperties;
import org.apache.eventmesh.api.exception.AclException;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;

import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.Key;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.X509EncodedKeySpec;

import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.JwtParser;
import io.jsonwebtoken.Jwts;

public class AuthTokenUtils {

public static void authTokenByPublicKey(AclProperties aclProperties) {
String publicKeyUrl = "";
for (String key : ConfigurationContextUtil.KEYS) {
CommonConfiguration commonConfiguration = ConfigurationContextUtil.get(key);
if (null == commonConfiguration) {
continue;
}
if (StringUtils.isBlank(commonConfiguration.getEventMeshSecurityPublickey())) {

throw new AclException("publicKeyUrl cannot be null");

}
publicKeyUrl = commonConfiguration.getEventMeshSecurityPublickey();
}
String token = aclProperties.getToken();
if (StringUtils.isNotBlank(token)) {
token = token.replace("Bearer ", "");
byte[] validationKeyBytes = new byte[0];
try {
validationKeyBytes = Files.readAllBytes(Paths.get(publicKeyUrl));
X509EncodedKeySpec spec = new X509EncodedKeySpec(validationKeyBytes);
KeyFactory kf = KeyFactory.getInstance("RSA");
Key validationKey = kf.generatePublic(spec);
JwtParser signedParser = Jwts.parserBuilder().setSigningKey(validationKey).build();
signedParser.parseClaimsJws(token);
} catch (IOException e) {
throw new AclException("public key read error!", e);
} catch (NoSuchAlgorithmException e) {
throw new AclException("no such RSA algorithm!", e);
} catch (InvalidKeySpecException e) {
throw new AclException("invalid public key spec!", e);
} catch (JwtException e) {
throw new AclException("invalid token!", e);
}
}
}

}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ include 'eventmesh-metrics-plugin:eventmesh-metrics-api'
include 'eventmesh-metrics-plugin:eventmesh-metrics-prometheus'

include 'eventmesh-security-plugin:eventmesh-security-auth-http-basic'
include 'eventmesh-security-plugin:eventmesh-security-auth-token'

include 'eventmesh-trace-plugin'
include 'eventmesh-trace-plugin:eventmesh-trace-api'
Expand Down

0 comments on commit 4bfb20c

Please sign in to comment.