Skip to content

Commit

Permalink
What type of PR is this?
Browse files Browse the repository at this point in the history
 feature
What this PR does / why we need it:
 get slot by dataInfoId from session
  • Loading branch information
yuzhi.lyz committed Dec 24, 2020
1 parent a8067c6 commit 4fe6a8c
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
<lookout.version>1.5.2</lookout.version>
<mockito.version>1.10.19</mockito.version>
<powermock.version>1.6.6</powermock.version>
<jraft.version>1.3.5.Alpha1</jraft.version>
<jraft.version>1.3.5</jraft.version>
<metrics.version>4.0.2</metrics.version>
<commons-io.version>2.4</commons-io.version>
<jetty.version>[9.4.17.v20190418,9.4.19.v20190610]</jetty.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
*/
package com.alipay.sofa.registry.common.model.slot;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.io.Serializable;
import java.util.*;

/**
* @author yuzhi.lyz
* @version v 0.1 2020-10-30 10:12 yuzhi.lyz Exp $
*/
public final class Slot implements Serializable {
public final class Slot implements Serializable, Cloneable {
public enum Role {
Leader, Follower,
}
Expand All @@ -33,15 +36,18 @@ public enum Role {
private final long leaderEpoch;
private final Set<String> followers;

public Slot(int id, String leader, long leaderEpoch, Collection<String> followers) {
@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public Slot(@JsonProperty("id") int id, @JsonProperty("leader") String leader,
@JsonProperty("leaderEpoch") long leaderEpoch,
@JsonProperty("followers") Collection<String> followers) {
this.id = id;
this.leader = leader;
this.leaderEpoch = leaderEpoch;
this.followers = Collections.unmodifiableSet(new HashSet<>(followers));
}

@Override
protected Slot clone() throws CloneNotSupportedException {
protected Slot clone() {
return new Slot(this.id, this.leader, this.leaderEpoch, this.followers);
}

Expand Down Expand Up @@ -85,7 +91,7 @@ public boolean equals(Object o) {
return false;
Slot slot = (Slot) o;
return id == slot.id && leaderEpoch == slot.leaderEpoch
&& Objects.equals(leader, slot.leader) && Objects.equals(followers, slot.followers);
&& Objects.equals(leader, slot.leader) && Objects.equals(followers, slot.followers);
}

@Override
Expand All @@ -96,6 +102,6 @@ public int hashCode() {
@Override
public String toString() {
return "Slot{" + "id=" + id + ", leader='" + leader + '\'' + ", leaderEpoch=" + leaderEpoch
+ ", followers=" + followers + '}';
+ ", followers=" + followers + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package com.alipay.sofa.registry.server.session.resource;

import com.alipay.sofa.registry.common.model.slot.Slot;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.slot.SlotTableCache;
import com.alipay.sofa.registry.server.shared.meta.MetaServerService;
import com.alipay.sofa.registry.util.ParaCheckUtil;
import com.google.common.base.Joiner;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -27,7 +30,9 @@
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
Expand All @@ -42,7 +47,10 @@ public class SessionOpenResource {
private SessionServerConfig sessionServerConfig;

@Autowired
private MetaServerService mataNodeService;
private MetaServerService mataNodeService;

@Autowired
private SlotTableCache slotTableCache;

@GET
@Path("query.json")
Expand Down Expand Up @@ -91,4 +99,13 @@ private List<String> getSessionServers(String zone) {
.collect(Collectors.toList());
return serverList;
}

@GET
@Path("slot")
@Produces(MediaType.APPLICATION_JSON)
public Slot getSlot(@QueryParam("dataInfoId") String dataInfoId) {
ParaCheckUtil.checkNotBlank(dataInfoId,"dataInfoId");
return slotTableCache.getSlot(dataInfoId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ private void fireUserDataElementPushTask(Datum datum, Subscriber subscriber,

taskLogger.info(
"send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}",
taskEvent.getTaskType(), subscriber.getSourceAddress(), dataInfoId, dataCenter, size,
subscribers.size(), taskEvent.getTaskId());
taskEvent.getTaskType(), subscriber.getSourceAddress(), dataInfoId, dataCenter, size,
subscribers.size(), taskEvent.getTaskId());
taskListenerManager.sendTaskEvent(taskEvent);
}

Expand All @@ -315,8 +315,8 @@ private void fireUserDataElementMultiPushTask(Datum datum, Subscriber subscriber

taskLogger.info(
"send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}",
taskEvent.getTaskType(), subscriber.getSourceAddress(), dataInfoId, dataCenter, size,
subscribers.size(), taskEvent.getTaskId());
taskEvent.getTaskType(), subscriber.getSourceAddress(), dataInfoId, dataCenter, size,
subscribers.size(), taskEvent.getTaskId());
taskListenerManager.sendTaskEvent(taskEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,27 @@
public class AppPublisherConverterTest {

@Test
public void testConvert() throws Exception{
public void testConvert() throws Exception {

String box = "{\"url\":\"127.0.0.1:8080\",\"revision\":\"faf447f9a7990b4be937f0e06664ee41\",\"baseParams\":{\"a\":[\"2\"]},"
+ "\"interfaceParams\":{\"com.alipay.test.Simple4#@#DEFAULT_INSTANCE_ID#@#DEFAULT_GROUP\":{},\"com.alipay.test"
+ ".Simple5#@#DEFAULT_INSTANCE_ID#@#DEFAULT_GROUP\":{\"b\":[\"3\",\"4\"]}}}";

ObjectMapper mapper = JsonUtils.getJacksonObjectMapper();
Map<String,Object> jsonObject = mapper.readValue(box, HashMap.class);
Map<String, Object> jsonObject = mapper.readValue(box, HashMap.class);
AppRegisterServerDataBox serverDataBox = new AppRegisterServerDataBox();
serverDataBox.setUrl((String) jsonObject.get(AppRegisterConstant.URL_KEY));
serverDataBox.setRevision((String) jsonObject.get(AppRegisterConstant.REVISION_KEY));
serverDataBox.setBaseParams((Map)jsonObject.get(AppRegisterConstant.BASE_PARAMS_KEY));
serverDataBox.setInterfaceParams((Map) jsonObject.get(AppRegisterConstant.INTERFACE_PARAMS_KEY));
serverDataBox.setBaseParams((Map) jsonObject.get(AppRegisterConstant.BASE_PARAMS_KEY));
serverDataBox.setInterfaceParams((Map) jsonObject
.get(AppRegisterConstant.INTERFACE_PARAMS_KEY));
Assert.assertEquals(serverDataBox.getBaseParams().get("a").size(), 1);
Assert.assertEquals(
serverDataBox.getInterfaceParams()
.get("com.alipay.test.Simple5#@#DEFAULT_INSTANCE_ID#@#DEFAULT_GROUP").get("b")
.size(), 2);

AppRegisterServerDataBox dataBox = mapper.readValue(box,
AppRegisterServerDataBox.class);
AppRegisterServerDataBox dataBox = mapper.readValue(box, AppRegisterServerDataBox.class);
Assert.assertEquals(
dataBox.getInterfaceParams()
.get("com.alipay.test.Simple5#@#DEFAULT_INSTANCE_ID#@#DEFAULT_GROUP").get("b")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,11 @@ protected ClientSideExchanger(String serverType) {
}

@PostConstruct
public void init(){
public void init() {
ConcurrentUtils.createDaemonThread(serverType + "-async-connector", connector).start();
LOGGER.info("init connector");
}


@Override
public Response request(Request request) throws RequestException {
if (LOGGER.isDebugEnabled()) {
Expand Down

0 comments on commit 4fe6a8c

Please sign in to comment.