Skip to content

Commit

Permalink
Add congestion state generation to session, refactor several items.
Browse files Browse the repository at this point in the history
  • Loading branch information
pmoerenhout committed Sep 29, 2021
1 parent 19d4f20 commit 698d8b2
Show file tree
Hide file tree
Showing 24 changed files with 983 additions and 911 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.jsmpp.bean.QuerySm;
import org.jsmpp.bean.ReplaceSm;
import org.jsmpp.bean.SubmitMulti;
import org.jsmpp.bean.SubmitMultiResult;
import org.jsmpp.session.SubmitMultiResult;
import org.jsmpp.bean.SubmitSm;
import org.jsmpp.bean.UnsuccessDelivery;
import org.jsmpp.extra.ProcessRequestException;
Expand All @@ -53,7 +53,7 @@
*/
public class ReceiveSubmittedMessageExample {

private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveSubmittedMessageExample.class);
private static final Logger log = LoggerFactory.getLogger(ReceiveSubmittedMessageExample.class);

public static void main(String[] args) {
try {
Expand All @@ -67,7 +67,7 @@ public static void main(String[] args) {
@Override
public SubmitSmResult onAcceptSubmitSm(SubmitSm submitSm, SMPPServerSession source)
throws ProcessRequestException {
LOGGER.info("Receiving message : {}", new String(submitSm.getShortMessage()));
log.info("Receiving message : {}", new String(submitSm.getShortMessage()));
// need message_id to response submit_sm, optional parameters add in SMPP 5.0
return new SubmitSmResult(messageIdGenerator.newMessageId(), new OptionalParameter[0]);
}
Expand All @@ -83,7 +83,7 @@ public QuerySmResult onAcceptQuerySm(QuerySm querySm,
public SubmitMultiResult onAcceptSubmitMulti(
SubmitMulti submitMulti, SMPPServerSession source)
throws ProcessRequestException {
return new SubmitMultiResult(messageIdGenerator.newMessageId().getValue(), new UnsuccessDelivery[]{});
return new SubmitMultiResult(messageIdGenerator.newMessageId().getValue(), new UnsuccessDelivery[]{}, new OptionalParameter[]{});
}

@Override
Expand Down Expand Up @@ -124,25 +124,25 @@ public QueryBroadcastSmResult onAcceptQueryBroadcastSm(final QueryBroadcastSm qu
}
};

LOGGER.info("Listening ...");
log.info("Listening ...");
SMPPServerSessionListener sessionListener = new SMPPServerSessionListener(8056);
// set all default ServerMessageReceiverListener for all accepted SMPPServerSessionListener
sessionListener.setMessageReceiverListener(messageReceiverListener);

// accepting connection, session still in OPEN state
SMPPServerSession session = sessionListener.accept();
// or we can set for each accepted session session.setMessageReceiverListener(messageReceiverListener)
LOGGER.info("Accept connection");
log.info("Accept connection");

try {
BindRequest request = session.waitForBind(5000);
LOGGER.info("Receive bind request for system id {} and password {}", request.getSystemId(), request.getPassword());
log.info("Receive bind request for system id {} and password {}", request.getSystemId(), request.getPassword());

if ("test".equals(request.getSystemId()) &&
"test".equals(request.getPassword())) {

// accepting request and send bind response immediately
LOGGER.info("Accepting bind request");
log.info("Accepting bind request");
request.accept("sys");

try {
Expand All @@ -152,21 +152,21 @@ public QueryBroadcastSmResult onAcceptQueryBroadcastSm(final QueryBroadcastSm qu
Thread.currentThread().interrupt();
}
} else {
LOGGER.info("Rejecting bind request");
log.info("Rejecting bind request");
request.reject(SMPPConstant.STAT_ESME_RINVPASWD);
}
} catch (TimeoutException e) {
LOGGER.error("No binding request made after 5000 millisecond", e);
log.error("No binding request made after 5000 millisecond", e);
}

LOGGER.info("Closing session");
log.info("Closing session");
session.unbindAndClose();
LOGGER.info("Closing session listener");
log.info("Closing session listener");
sessionListener.close();
} catch (PDUStringException e) {
LOGGER.error("PDUString exception", e);
log.error("PDUString exception", e);
} catch (IOException e) {
LOGGER.error("I/O exception", e);
log.error("I/O exception", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.jsmpp.bean.ReplaceSm;
import org.jsmpp.bean.SMSCDeliveryReceipt;
import org.jsmpp.bean.SubmitMulti;
import org.jsmpp.bean.SubmitMultiResult;
import org.jsmpp.session.SubmitMultiResult;
import org.jsmpp.bean.SubmitSm;
import org.jsmpp.bean.TypeOfNumber;
import org.jsmpp.bean.UnsuccessDelivery;
Expand Down Expand Up @@ -95,10 +95,10 @@ public class SMPPServerSimulator extends ServerResponseDeliveryAdapter implement
private final ExecutorService execService = Executors.newFixedThreadPool(5);
private final ExecutorService execServiceDelReceipt = Executors.newFixedThreadPool(100);
private final MessageIDGenerator messageIDGenerator = new RandomMessageIDGenerator();
private boolean useSsl;
private int port;
private String systemId;
private String password;
private final boolean useSsl;
private final int port;
private final String systemId;
private final String password;

public SMPPServerSimulator(boolean useSsl, int port, String systemId, String password) {
this.useSsl = useSsl;
Expand All @@ -109,6 +109,7 @@ public SMPPServerSimulator(boolean useSsl, int port, String systemId, String pas

@Override
public void run() {
boolean running = true;
/*
* for SSL use the SSLServerSocketConnectionFactory() or DefaultSSLServerSocketConnectionFactory()
*/
Expand All @@ -119,7 +120,7 @@ public void run() {
* for SSL use the SSLServerSocketConnectionFactory() or DefaultSSLServerSocketConnectionFactory()
*/
log.info("Listening on port {}{}", port, useSsl ? " (SSL)" : "");
while (true) {
while (running) {
SMPPServerSession serverSession = sessionListener.accept();
log.info("Accepted connection with session {}", serverSession.getSessionId());
serverSession.setMessageReceiverListener(this);
Expand All @@ -140,9 +141,10 @@ public void run() {
} catch (InterruptedException e){
log.info("Interrupted WaitBind task: {}", e.getMessage());
Thread.currentThread().interrupt();
throw new RuntimeException(e);
running = false;
} catch (ExecutionException e){
log.info("Exception on execute WaitBind task: {}", e.getMessage());
running = false;
} catch (NegativeResponseException | ResponseTimeoutException | PDUException | InvalidResponseException e){
log.info("Could not send deliver_sm: {}", e.getMessage());
}
Expand All @@ -169,7 +171,13 @@ public SubmitSmResult onAcceptSubmitSm(SubmitSm submitSm,
/*
* SMPP 5.0 allows the following optional parameters (SMPP 5.0 paragraph 4.2.5):
* additional_status_info_text, delivery_failure_reason, dpf_result, network_error_code
* Add the congestionState for SMPP 5.0 connections.
*/
if (source.getInterfaceVersion().value() >= InterfaceVersion.IF_50.value()) {
final int congestionRatio = source.getCongestionRatio();
OptionalParameter.Congestion_state congestionState = new OptionalParameter.Congestion_state((byte) congestionRatio);
return new SubmitSmResult(messageId, new OptionalParameter[]{ congestionState });
}
return new SubmitSmResult(messageId, new OptionalParameter[0]);
}

Expand All @@ -189,7 +197,17 @@ public SubmitMultiResult onAcceptSubmitMulti(SubmitMulti submitMulti, SMPPServer
|| SMSCDeliveryReceipt.SUCCESS_FAILURE.containedIn(submitMulti.getRegisteredDelivery())) {
execServiceDelReceipt.execute(new DeliveryReceiptTask(source, submitMulti, messageId));
}
return new SubmitMultiResult(messageId.getValue(), new UnsuccessDelivery[0]);
/*
* SMPP 5.0 allows the following optional parameters (SMPP 5.0 paragraph 4.2.5):
* additional_status_info_text, delivery_failure_reason, dpf_result, network_error_code
* Add the congestionState for SMPP 5.0 connections.
*/
if (source.getInterfaceVersion().value() >= InterfaceVersion.IF_50.value()) {
final int congestionRatio = source.getCongestionRatio();
OptionalParameter.Congestion_state congestionState = new OptionalParameter.Congestion_state((byte) congestionRatio);
return new SubmitMultiResult(messageId.getValue(), new UnsuccessDelivery[0], new OptionalParameter[]{ congestionState });
}
return new SubmitMultiResult(messageId.getValue(), new UnsuccessDelivery[0], new OptionalParameter[0]);
}

@Override
Expand Down Expand Up @@ -239,8 +257,8 @@ public QueryBroadcastSmResult onAcceptQueryBroadcastSm(final QueryBroadcastSm qu
private static class WaitBindTask implements Callable<Boolean> {
private final SMPPServerSession serverSession;
private final long timeout;
private String systemId;
private String password;
private final String systemId;
private final String password;

public WaitBindTask(SMPPServerSession serverSession, long timeout, String systemId, String password) {
this.serverSession = serverSession;
Expand Down
26 changes: 20 additions & 6 deletions jsmpp-examples/src/main/java/org/jsmpp/examples/StressClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@
import org.jsmpp.bean.BindType;
import org.jsmpp.bean.DataCodings;
import org.jsmpp.bean.ESMClass;
import org.jsmpp.bean.InterfaceVersion;
import org.jsmpp.bean.NumberingPlanIndicator;
import org.jsmpp.bean.OptionalParameter;
import org.jsmpp.bean.OptionalParameters;
import org.jsmpp.bean.RegisteredDelivery;
import org.jsmpp.bean.TypeOfNumber;
import org.jsmpp.extra.NegativeResponseException;
import org.jsmpp.extra.ResponseTimeoutException;
import org.jsmpp.session.BindParameter;
import org.jsmpp.session.SMPPSession;
import org.jsmpp.session.SubmitSmResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -65,8 +70,8 @@ public class StressClient implements Runnable {
private static final Integer DEFAULT_PORT = 8056;
private static final Long DEFAULT_TRANSACTIONTIMER = 2000L;
private static final Integer DEFAULT_BULK_SIZE = 100000;
private static final Integer DEFAULT_PROCESSOR_DEGREE = 3;
private static final Integer DEFAULT_MAX_OUTSTANDING = 10;
private static final Integer DEFAULT_PROCESSOR_DEGREE = 10;
private static final Integer DEFAULT_MAX_OUTSTANDING = 100;

private AtomicInteger requestCounter = new AtomicInteger();
private AtomicInteger totalRequestCounter = new AtomicInteger();
Expand Down Expand Up @@ -172,9 +177,13 @@ public void run() {
new TrafficWatcherThread().start();

try {
smppSession.connectAndBind(host, port, BindType.BIND_TRX, systemId,
password, "cln", TypeOfNumber.UNKNOWN,
NumberingPlanIndicator.UNKNOWN, null);
final BindParameter bindParameter = new BindParameter(BindType.BIND_TRX, systemId,
password, "CLN", TypeOfNumber.UNKNOWN,
NumberingPlanIndicator.UNKNOWN, null, InterfaceVersion.IF_50);
smppSession.connectAndBind(host, port, bindParameter);
// smppSession.connectAndBind(host, port, BindType.BIND_TRX, systemId,
// password, "cln", TypeOfNumber.UNKNOWN,
// NumberingPlanIndicator.UNKNOWN, null);
log.info("Bound to {}:{}", host, port);

log.info("Starting to send {} bulk messages", bulkSize);
Expand Down Expand Up @@ -202,7 +211,7 @@ public void run() {
try {
requestCounter.incrementAndGet();
long startTime = System.currentTimeMillis();
smppSession.submitShortMessage(null, TypeOfNumber.UNKNOWN, NumberingPlanIndicator.UNKNOWN, sourceAddr,
SubmitSmResult submitSmResult = smppSession.submitShortMessage(null, TypeOfNumber.UNKNOWN, NumberingPlanIndicator.UNKNOWN, sourceAddr,
TypeOfNumber.UNKNOWN, NumberingPlanIndicator.UNKNOWN, destinationAddr,
new ESMClass(), (byte) 0, (byte) 0,
null, null, new RegisteredDelivery(0),
Expand All @@ -211,6 +220,11 @@ null, null, new RegisteredDelivery(0),
(byte) 0, message.getBytes());
log.info("There are {} unacknowledged requests", smppSession.getUnacknowledgedRequests());

OptionalParameter.Congestion_state congestionState = OptionalParameters.get(OptionalParameter.Congestion_state.class, submitSmResult.getOptionalParameters());
if (congestionState != null) {
log.info("Remote congestion state: {}", (congestionState.getValue() & 0xff));
}

long delay = System.currentTimeMillis() - startTime;
responseCounter.incrementAndGet();
if (maxDelay.get() < delay) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
import org.jsmpp.bean.ReplaceSm;
import org.jsmpp.bean.SMSCDeliveryReceipt;
import org.jsmpp.bean.SubmitMulti;
import org.jsmpp.bean.SubmitMultiResult;
import org.jsmpp.bean.SubmitSm;
import org.jsmpp.bean.TypeOfNumber;
import org.jsmpp.bean.UnsuccessDelivery;
import org.jsmpp.extra.ProcessRequestException;
import org.jsmpp.extra.SessionState;
import org.jsmpp.session.BindRequest;
Expand All @@ -60,6 +60,7 @@
import org.jsmpp.session.ServerMessageReceiverListener;
import org.jsmpp.session.Session;
import org.jsmpp.session.SessionStateListener;
import org.jsmpp.session.SubmitMultiResult;
import org.jsmpp.session.SubmitSmResult;
import org.jsmpp.util.AbsoluteTimeFormatter;
import org.jsmpp.util.DeliveryReceiptState;
Expand All @@ -79,7 +80,7 @@ public class StressServer implements Runnable, ServerMessageReceiverListener {
private static final int DEFAULT_MAX_WAIT_BIND = 5;
private static final int DEFAULT_MAX_DELIVERIES = 5;
private static final int DEFAULT_PORT = 8056;
private static final int DEFAULT_PROCESSOR_DEGREE = 3;
private static final int DEFAULT_PROCESSOR_DEGREE = 10;
private static final String CANCELSM_NOT_IMPLEMENTED = "cancel_sm not implemented";
private static final String REPLACESM_NOT_IMPLEMENTED = "replace_sm not implemented";
private final ExecutorService waitBindExecService = Executors.newFixedThreadPool(DEFAULT_MAX_WAIT_BIND);
Expand Down Expand Up @@ -153,7 +154,7 @@ public SubmitMultiResult onAcceptSubmitMulti(SubmitMulti submitMulti,
MessageId messageId = messageIDGenerator.newMessageId();
log.info("Receiving submit_multi {}, and return message id {}", new String(submitMulti.getShortMessage()), messageId.getValue());
requestCounter.incrementAndGet();
return new SubmitMultiResult(messageId.getValue());
return new SubmitMultiResult(messageId.getValue(), new UnsuccessDelivery[0], new OptionalParameter[0]);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.jsmpp.bean.RegisteredDelivery;
import org.jsmpp.bean.ReplaceIfPresentFlag;
import org.jsmpp.bean.SMSCDeliveryReceipt;
import org.jsmpp.bean.SubmitMultiResult;
import org.jsmpp.session.SubmitMultiResult;
import org.jsmpp.bean.TypeOfNumber;
import org.jsmpp.bean.UnsuccessDelivery;
import org.jsmpp.extra.NegativeResponseException;
Expand Down
2 changes: 1 addition & 1 deletion jsmpp/src/main/java/org/jsmpp/bean/MessageRequest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
Expand Down
12 changes: 5 additions & 7 deletions jsmpp/src/main/java/org/jsmpp/bean/OptionalParameters.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package org.jsmpp.bean;

Expand Down Expand Up @@ -286,7 +286,6 @@ public static <U extends OptionalParameter> U get(Class<U> tagClass, OptionalPar
}
}
}
log.trace("Optional Parameter Tag {} not found", tagClass);
return null;
}

Expand All @@ -299,7 +298,6 @@ public static OptionalParameter get(short tag, OptionalParameter[] parameters)
}
}
}
log.trace("Optional Parameter Tag {} not found", tag);
return null;
}
}
Loading

0 comments on commit 698d8b2

Please sign in to comment.