Skip to content

Commit

Permalink
订阅端acl12
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyuyu333333 committed Mar 9, 2023
1 parent d0cc6f7 commit 30cf6c2
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class PulsarConsumerImpl implements Consumer {
private final AtomicBoolean started = new AtomicBoolean(false);
private Properties properties;
private PulsarClient pulsarClient;
private org.apache.pulsar.client.api.Consumer<byte[]> consumer;
private EventListener eventListener;

private ConcurrentHashMap<String, org.apache.pulsar.client.api.Consumer<byte[]>> consumerMap = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import org.apache.eventmesh.api.exception.RegistryException;
import org.apache.eventmesh.api.registry.RegistryService;
import org.apache.eventmesh.api.registry.bo.EventMeshServicePubTopicInfo;
import org.apache.eventmesh.api.registry.bo.EventMeshAppSubTopicInfo;
import org.apache.eventmesh.api.registry.bo.EventMeshServicePubTopicInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshDataInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshUnRegisterInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.eventmesh.registry.etcd.service;

import org.apache.eventmesh.api.exception.RegistryException;
import org.apache.eventmesh.api.registry.bo.EventMeshServicePubTopicInfo;
import org.apache.eventmesh.api.registry.bo.EventMeshAppSubTopicInfo;
import org.apache.eventmesh.api.registry.bo.EventMeshServicePubTopicInfo;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.registry.etcd.constant.EtcdConstant;
Expand Down Expand Up @@ -75,6 +75,7 @@ public List<EventMeshServicePubTopicInfo> findEventMeshServicePubTopicInfos() th
return null;
}

@Nullable
public EventMeshAppSubTopicInfo findEventMeshAppSubTopicInfoByGroup(String group) throws RegistryException {

Client client = getEtcdClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,6 @@ public void doAclCheckInTcpSend(String remoteAddr, UserAgent userAgent, String t
aclService.doAclCheckInSend(buildTcpAclProperties(remoteAddr, userAgent, topic, requestCode));
}

public void doAclCheckInTcpReceive(String remoteAddr, UserAgent userAgent, String topic, int requestCode) throws AclException {
aclService.doAclCheckInReceive(buildTcpAclProperties(remoteAddr, userAgent, topic, requestCode));
}

private static AclProperties buildTcpAclProperties(String remoteAddr, UserAgent userAgent, String topic, int requestCode) {
AclProperties aclProperties = new AclProperties();
aclProperties.setClientIp(remoteAddr);
aclProperties.setUser(userAgent.getUsername());
aclProperties.setPwd(userAgent.getPassword());
aclProperties.setSubsystem(userAgent.getSubsystem());
aclProperties.setRequestCode(requestCode);
if (StringUtils.isNotBlank(topic)) {
aclProperties.setTopic(topic);
}

return aclProperties;
}

public void doAclCheckInHttpSend(String remoteAddr, String user, String pass, String subsystem, String topic, int requestCode)
throws AclException {
aclService.doAclCheckInSend(buildHttpAclProperties(remoteAddr, user, pass, subsystem, topic, requestCode));
Expand Down Expand Up @@ -188,6 +170,25 @@ private AclProperties buildHttpAclProperties(String remoteAddr, String user, Str
return aclProperties;
}

private AclProperties buildHttpAclProperties(String remoteAddr, String token, String subsystem, String topic, String requestURI, Object obj) {
AclProperties aclProperties = new AclProperties();
aclProperties.setClientIp(remoteAddr);
aclProperties.setSubsystem(subsystem);
aclProperties.setRequestURI(requestURI);
if (StringUtils.isNotBlank(token)) {
aclProperties.setToken(token);
}
if (StringUtils.isNotBlank(topic)) {
aclProperties.setTopic(topic);
}

if (obj instanceof EventMeshServicePubTopicInfo) {
aclProperties.setExtendedField("group", ((EventMeshServicePubTopicInfo) obj).getService());
aclProperties.setExtendedField("topics", ((EventMeshServicePubTopicInfo) obj).getTopics());
}
return aclProperties;
}

private static AclProperties buildTcpAclProperties(String remoteAddr, UserAgent userAgent, String topic, int requestCode) {
AclProperties aclProperties = new AclProperties();
aclProperties.setClientIp(remoteAddr);
Expand Down Expand Up @@ -233,23 +234,4 @@ private AclProperties buildTcpAclProperties(String remoteAddr, String token, Str
return aclProperties;
}

private AclProperties buildHttpAclProperties(String remoteAddr, String token, String subsystem, String topic, String requestURI, Object obj) {
AclProperties aclProperties = new AclProperties();
aclProperties.setClientIp(remoteAddr);
aclProperties.setSubsystem(subsystem);
aclProperties.setRequestURI(requestURI);
if (StringUtils.isNotBlank(token)) {
aclProperties.setToken(token);
}
if (StringUtils.isNotBlank(topic)) {
aclProperties.setTopic(topic);
}

if (obj instanceof EventMeshServicePubTopicInfo) {
aclProperties.setExtendedField("group", ((EventMeshServicePubTopicInfo) obj).getService());
aclProperties.setExtendedField("topics", ((EventMeshServicePubTopicInfo) obj).getTopics());
}
return aclProperties;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public void shutdown() throws AclException {

@Override
public void doAclCheckInConnect(AclProperties aclProperties) throws AclException {
AuthTokenUtils.authTokenByPublicKey(aclProperties);
AuthTokenUtils.helloTaskAuthTokenByPublicKey(aclProperties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Set;

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jws;
import io.jsonwebtoken.Jwt;
import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.JwtParser;
Expand Down Expand Up @@ -72,7 +71,6 @@ public static void authTokenByPublicKey(AclProperties aclProperties) {
Key validationKey = kf.generatePublic(spec);
JwtParser signedParser = Jwts.parserBuilder().setSigningKey(validationKey).build();
Jwt<?, Claims> signJwt = signedParser.parseClaimsJws(token);
Jws<Claims> signJwt = signedParser.parseClaimsJws(token);
String sub = signJwt.getBody().get("sub", String.class);
if (!sub.contains(aclProperties.getExtendedField("group").toString()) && !sub.contains("pulsar-admin")) {
throw new AclException("group:" + aclProperties.getExtendedField("group ") + " has no auth to access eventMesh:"
Expand All @@ -89,7 +87,6 @@ public static void authTokenByPublicKey(AclProperties aclProperties) {
}

} else {
throw new AclException("invalid token!");
{
throw new AclException("invalid token!");
}
Expand Down

0 comments on commit 30cf6c2

Please sign in to comment.