diff --git a/jsmpp-examples/src/main/java/org/jsmpp/examples/ReceiveSubmittedMessageExample.java b/jsmpp-examples/src/main/java/org/jsmpp/examples/ReceiveSubmittedMessageExample.java index 960431e3..657cdcc0 100644 --- a/jsmpp-examples/src/main/java/org/jsmpp/examples/ReceiveSubmittedMessageExample.java +++ b/jsmpp-examples/src/main/java/org/jsmpp/examples/ReceiveSubmittedMessageExample.java @@ -28,7 +28,7 @@ import org.jsmpp.bean.QuerySm; import org.jsmpp.bean.ReplaceSm; import org.jsmpp.bean.SubmitMulti; -import org.jsmpp.bean.SubmitMultiResult; +import org.jsmpp.session.SubmitMultiResult; import org.jsmpp.bean.SubmitSm; import org.jsmpp.bean.UnsuccessDelivery; import org.jsmpp.extra.ProcessRequestException; @@ -53,7 +53,7 @@ */ public class ReceiveSubmittedMessageExample { - private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveSubmittedMessageExample.class); + private static final Logger log = LoggerFactory.getLogger(ReceiveSubmittedMessageExample.class); public static void main(String[] args) { try { @@ -67,7 +67,7 @@ public static void main(String[] args) { @Override public SubmitSmResult onAcceptSubmitSm(SubmitSm submitSm, SMPPServerSession source) throws ProcessRequestException { - LOGGER.info("Receiving message : {}", new String(submitSm.getShortMessage())); + log.info("Receiving message : {}", new String(submitSm.getShortMessage())); // need message_id to response submit_sm, optional parameters add in SMPP 5.0 return new SubmitSmResult(messageIdGenerator.newMessageId(), new OptionalParameter[0]); } @@ -83,7 +83,7 @@ public QuerySmResult onAcceptQuerySm(QuerySm querySm, public SubmitMultiResult onAcceptSubmitMulti( SubmitMulti submitMulti, SMPPServerSession source) throws ProcessRequestException { - return new SubmitMultiResult(messageIdGenerator.newMessageId().getValue(), new UnsuccessDelivery[]{}); + return new SubmitMultiResult(messageIdGenerator.newMessageId().getValue(), new UnsuccessDelivery[]{}, new OptionalParameter[]{}); } @Override @@ -124,7 +124,7 @@ public QueryBroadcastSmResult onAcceptQueryBroadcastSm(final QueryBroadcastSm qu } }; - LOGGER.info("Listening ..."); + log.info("Listening ..."); SMPPServerSessionListener sessionListener = new SMPPServerSessionListener(8056); // set all default ServerMessageReceiverListener for all accepted SMPPServerSessionListener sessionListener.setMessageReceiverListener(messageReceiverListener); @@ -132,17 +132,17 @@ public QueryBroadcastSmResult onAcceptQueryBroadcastSm(final QueryBroadcastSm qu // accepting connection, session still in OPEN state SMPPServerSession session = sessionListener.accept(); // or we can set for each accepted session session.setMessageReceiverListener(messageReceiverListener) - LOGGER.info("Accept connection"); + log.info("Accept connection"); try { BindRequest request = session.waitForBind(5000); - LOGGER.info("Receive bind request for system id {} and password {}", request.getSystemId(), request.getPassword()); + log.info("Receive bind request for system id {} and password {}", request.getSystemId(), request.getPassword()); if ("test".equals(request.getSystemId()) && "test".equals(request.getPassword())) { // accepting request and send bind response immediately - LOGGER.info("Accepting bind request"); + log.info("Accepting bind request"); request.accept("sys"); try { @@ -152,21 +152,21 @@ public QueryBroadcastSmResult onAcceptQueryBroadcastSm(final QueryBroadcastSm qu Thread.currentThread().interrupt(); } } else { - LOGGER.info("Rejecting bind request"); + log.info("Rejecting bind request"); request.reject(SMPPConstant.STAT_ESME_RINVPASWD); } } catch (TimeoutException e) { - LOGGER.error("No binding request made after 5000 millisecond", e); + log.error("No binding request made after 5000 millisecond", e); } - LOGGER.info("Closing session"); + log.info("Closing session"); session.unbindAndClose(); - LOGGER.info("Closing session listener"); + log.info("Closing session listener"); sessionListener.close(); } catch (PDUStringException e) { - LOGGER.error("PDUString exception", e); + log.error("PDUString exception", e); } catch (IOException e) { - LOGGER.error("I/O exception", e); + log.error("I/O exception", e); } } } diff --git a/jsmpp-examples/src/main/java/org/jsmpp/examples/SMPPServerSimulator.java b/jsmpp-examples/src/main/java/org/jsmpp/examples/SMPPServerSimulator.java index 1357d612..8e88a675 100644 --- a/jsmpp-examples/src/main/java/org/jsmpp/examples/SMPPServerSimulator.java +++ b/jsmpp-examples/src/main/java/org/jsmpp/examples/SMPPServerSimulator.java @@ -47,7 +47,7 @@ import org.jsmpp.bean.ReplaceSm; import org.jsmpp.bean.SMSCDeliveryReceipt; import org.jsmpp.bean.SubmitMulti; -import org.jsmpp.bean.SubmitMultiResult; +import org.jsmpp.session.SubmitMultiResult; import org.jsmpp.bean.SubmitSm; import org.jsmpp.bean.TypeOfNumber; import org.jsmpp.bean.UnsuccessDelivery; @@ -95,10 +95,10 @@ public class SMPPServerSimulator extends ServerResponseDeliveryAdapter implement private final ExecutorService execService = Executors.newFixedThreadPool(5); private final ExecutorService execServiceDelReceipt = Executors.newFixedThreadPool(100); private final MessageIDGenerator messageIDGenerator = new RandomMessageIDGenerator(); - private boolean useSsl; - private int port; - private String systemId; - private String password; + private final boolean useSsl; + private final int port; + private final String systemId; + private final String password; public SMPPServerSimulator(boolean useSsl, int port, String systemId, String password) { this.useSsl = useSsl; @@ -109,6 +109,7 @@ public SMPPServerSimulator(boolean useSsl, int port, String systemId, String pas @Override public void run() { + boolean running = true; /* * for SSL use the SSLServerSocketConnectionFactory() or DefaultSSLServerSocketConnectionFactory() */ @@ -119,7 +120,7 @@ public void run() { * for SSL use the SSLServerSocketConnectionFactory() or DefaultSSLServerSocketConnectionFactory() */ log.info("Listening on port {}{}", port, useSsl ? " (SSL)" : ""); - while (true) { + while (running) { SMPPServerSession serverSession = sessionListener.accept(); log.info("Accepted connection with session {}", serverSession.getSessionId()); serverSession.setMessageReceiverListener(this); @@ -140,9 +141,10 @@ public void run() { } catch (InterruptedException e){ log.info("Interrupted WaitBind task: {}", e.getMessage()); Thread.currentThread().interrupt(); - throw new RuntimeException(e); + running = false; } catch (ExecutionException e){ log.info("Exception on execute WaitBind task: {}", e.getMessage()); + running = false; } catch (NegativeResponseException | ResponseTimeoutException | PDUException | InvalidResponseException e){ log.info("Could not send deliver_sm: {}", e.getMessage()); } @@ -169,7 +171,13 @@ public SubmitSmResult onAcceptSubmitSm(SubmitSm submitSm, /* * SMPP 5.0 allows the following optional parameters (SMPP 5.0 paragraph 4.2.5): * additional_status_info_text, delivery_failure_reason, dpf_result, network_error_code + * Add the congestionState for SMPP 5.0 connections. */ + if (source.getInterfaceVersion().value() >= InterfaceVersion.IF_50.value()) { + final int congestionRatio = source.getCongestionRatio(); + OptionalParameter.Congestion_state congestionState = new OptionalParameter.Congestion_state((byte) congestionRatio); + return new SubmitSmResult(messageId, new OptionalParameter[]{ congestionState }); + } return new SubmitSmResult(messageId, new OptionalParameter[0]); } @@ -189,7 +197,17 @@ public SubmitMultiResult onAcceptSubmitMulti(SubmitMulti submitMulti, SMPPServer || SMSCDeliveryReceipt.SUCCESS_FAILURE.containedIn(submitMulti.getRegisteredDelivery())) { execServiceDelReceipt.execute(new DeliveryReceiptTask(source, submitMulti, messageId)); } - return new SubmitMultiResult(messageId.getValue(), new UnsuccessDelivery[0]); + /* + * SMPP 5.0 allows the following optional parameters (SMPP 5.0 paragraph 4.2.5): + * additional_status_info_text, delivery_failure_reason, dpf_result, network_error_code + * Add the congestionState for SMPP 5.0 connections. + */ + if (source.getInterfaceVersion().value() >= InterfaceVersion.IF_50.value()) { + final int congestionRatio = source.getCongestionRatio(); + OptionalParameter.Congestion_state congestionState = new OptionalParameter.Congestion_state((byte) congestionRatio); + return new SubmitMultiResult(messageId.getValue(), new UnsuccessDelivery[0], new OptionalParameter[]{ congestionState }); + } + return new SubmitMultiResult(messageId.getValue(), new UnsuccessDelivery[0], new OptionalParameter[0]); } @Override @@ -239,8 +257,8 @@ public QueryBroadcastSmResult onAcceptQueryBroadcastSm(final QueryBroadcastSm qu private static class WaitBindTask implements Callable { private final SMPPServerSession serverSession; private final long timeout; - private String systemId; - private String password; + private final String systemId; + private final String password; public WaitBindTask(SMPPServerSession serverSession, long timeout, String systemId, String password) { this.serverSession = serverSession; diff --git a/jsmpp-examples/src/main/java/org/jsmpp/examples/StressClient.java b/jsmpp-examples/src/main/java/org/jsmpp/examples/StressClient.java index 9783cc7b..2b57bf88 100644 --- a/jsmpp-examples/src/main/java/org/jsmpp/examples/StressClient.java +++ b/jsmpp-examples/src/main/java/org/jsmpp/examples/StressClient.java @@ -26,12 +26,17 @@ import org.jsmpp.bean.BindType; import org.jsmpp.bean.DataCodings; import org.jsmpp.bean.ESMClass; +import org.jsmpp.bean.InterfaceVersion; import org.jsmpp.bean.NumberingPlanIndicator; +import org.jsmpp.bean.OptionalParameter; +import org.jsmpp.bean.OptionalParameters; import org.jsmpp.bean.RegisteredDelivery; import org.jsmpp.bean.TypeOfNumber; import org.jsmpp.extra.NegativeResponseException; import org.jsmpp.extra.ResponseTimeoutException; +import org.jsmpp.session.BindParameter; import org.jsmpp.session.SMPPSession; +import org.jsmpp.session.SubmitSmResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,8 +70,8 @@ public class StressClient implements Runnable { private static final Integer DEFAULT_PORT = 8056; private static final Long DEFAULT_TRANSACTIONTIMER = 2000L; private static final Integer DEFAULT_BULK_SIZE = 100000; - private static final Integer DEFAULT_PROCESSOR_DEGREE = 3; - private static final Integer DEFAULT_MAX_OUTSTANDING = 10; + private static final Integer DEFAULT_PROCESSOR_DEGREE = 10; + private static final Integer DEFAULT_MAX_OUTSTANDING = 100; private AtomicInteger requestCounter = new AtomicInteger(); private AtomicInteger totalRequestCounter = new AtomicInteger(); @@ -172,9 +177,13 @@ public void run() { new TrafficWatcherThread().start(); try { - smppSession.connectAndBind(host, port, BindType.BIND_TRX, systemId, - password, "cln", TypeOfNumber.UNKNOWN, - NumberingPlanIndicator.UNKNOWN, null); + final BindParameter bindParameter = new BindParameter(BindType.BIND_TRX, systemId, + password, "CLN", TypeOfNumber.UNKNOWN, + NumberingPlanIndicator.UNKNOWN, null, InterfaceVersion.IF_50); + smppSession.connectAndBind(host, port, bindParameter); +// smppSession.connectAndBind(host, port, BindType.BIND_TRX, systemId, +// password, "cln", TypeOfNumber.UNKNOWN, +// NumberingPlanIndicator.UNKNOWN, null); log.info("Bound to {}:{}", host, port); log.info("Starting to send {} bulk messages", bulkSize); @@ -202,7 +211,7 @@ public void run() { try { requestCounter.incrementAndGet(); long startTime = System.currentTimeMillis(); - smppSession.submitShortMessage(null, TypeOfNumber.UNKNOWN, NumberingPlanIndicator.UNKNOWN, sourceAddr, + SubmitSmResult submitSmResult = smppSession.submitShortMessage(null, TypeOfNumber.UNKNOWN, NumberingPlanIndicator.UNKNOWN, sourceAddr, TypeOfNumber.UNKNOWN, NumberingPlanIndicator.UNKNOWN, destinationAddr, new ESMClass(), (byte) 0, (byte) 0, null, null, new RegisteredDelivery(0), @@ -211,6 +220,11 @@ null, null, new RegisteredDelivery(0), (byte) 0, message.getBytes()); log.info("There are {} unacknowledged requests", smppSession.getUnacknowledgedRequests()); + OptionalParameter.Congestion_state congestionState = OptionalParameters.get(OptionalParameter.Congestion_state.class, submitSmResult.getOptionalParameters()); + if (congestionState != null) { + log.info("Remote congestion state: {}", (congestionState.getValue() & 0xff)); + } + long delay = System.currentTimeMillis() - startTime; responseCounter.incrementAndGet(); if (maxDelay.get() < delay) { diff --git a/jsmpp-examples/src/main/java/org/jsmpp/examples/StressServer.java b/jsmpp-examples/src/main/java/org/jsmpp/examples/StressServer.java index 05c9d218..f18e2530 100644 --- a/jsmpp-examples/src/main/java/org/jsmpp/examples/StressServer.java +++ b/jsmpp-examples/src/main/java/org/jsmpp/examples/StressServer.java @@ -45,9 +45,9 @@ import org.jsmpp.bean.ReplaceSm; import org.jsmpp.bean.SMSCDeliveryReceipt; import org.jsmpp.bean.SubmitMulti; -import org.jsmpp.bean.SubmitMultiResult; import org.jsmpp.bean.SubmitSm; import org.jsmpp.bean.TypeOfNumber; +import org.jsmpp.bean.UnsuccessDelivery; import org.jsmpp.extra.ProcessRequestException; import org.jsmpp.extra.SessionState; import org.jsmpp.session.BindRequest; @@ -60,6 +60,7 @@ import org.jsmpp.session.ServerMessageReceiverListener; import org.jsmpp.session.Session; import org.jsmpp.session.SessionStateListener; +import org.jsmpp.session.SubmitMultiResult; import org.jsmpp.session.SubmitSmResult; import org.jsmpp.util.AbsoluteTimeFormatter; import org.jsmpp.util.DeliveryReceiptState; @@ -79,7 +80,7 @@ public class StressServer implements Runnable, ServerMessageReceiverListener { private static final int DEFAULT_MAX_WAIT_BIND = 5; private static final int DEFAULT_MAX_DELIVERIES = 5; private static final int DEFAULT_PORT = 8056; - private static final int DEFAULT_PROCESSOR_DEGREE = 3; + private static final int DEFAULT_PROCESSOR_DEGREE = 10; private static final String CANCELSM_NOT_IMPLEMENTED = "cancel_sm not implemented"; private static final String REPLACESM_NOT_IMPLEMENTED = "replace_sm not implemented"; private final ExecutorService waitBindExecService = Executors.newFixedThreadPool(DEFAULT_MAX_WAIT_BIND); @@ -153,7 +154,7 @@ public SubmitMultiResult onAcceptSubmitMulti(SubmitMulti submitMulti, MessageId messageId = messageIDGenerator.newMessageId(); log.info("Receiving submit_multi {}, and return message id {}", new String(submitMulti.getShortMessage()), messageId.getValue()); requestCounter.incrementAndGet(); - return new SubmitMultiResult(messageId.getValue()); + return new SubmitMultiResult(messageId.getValue(), new UnsuccessDelivery[0], new OptionalParameter[0]); } @Override diff --git a/jsmpp-examples/src/main/java/org/jsmpp/examples/SubmitMultiExample.java b/jsmpp-examples/src/main/java/org/jsmpp/examples/SubmitMultiExample.java index b0b0b021..ca5004b3 100644 --- a/jsmpp-examples/src/main/java/org/jsmpp/examples/SubmitMultiExample.java +++ b/jsmpp-examples/src/main/java/org/jsmpp/examples/SubmitMultiExample.java @@ -29,7 +29,7 @@ import org.jsmpp.bean.RegisteredDelivery; import org.jsmpp.bean.ReplaceIfPresentFlag; import org.jsmpp.bean.SMSCDeliveryReceipt; -import org.jsmpp.bean.SubmitMultiResult; +import org.jsmpp.session.SubmitMultiResult; import org.jsmpp.bean.TypeOfNumber; import org.jsmpp.bean.UnsuccessDelivery; import org.jsmpp.extra.NegativeResponseException; diff --git a/jsmpp/src/main/java/org/jsmpp/bean/MessageRequest.java b/jsmpp/src/main/java/org/jsmpp/bean/MessageRequest.java index 29ff6708..2ba71fd7 100644 --- a/jsmpp/src/main/java/org/jsmpp/bean/MessageRequest.java +++ b/jsmpp/src/main/java/org/jsmpp/bean/MessageRequest.java @@ -1,6 +1,6 @@ /* * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. + * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 diff --git a/jsmpp/src/main/java/org/jsmpp/bean/OptionalParameters.java b/jsmpp/src/main/java/org/jsmpp/bean/OptionalParameters.java index dd8edeee..97635d08 100644 --- a/jsmpp/src/main/java/org/jsmpp/bean/OptionalParameters.java +++ b/jsmpp/src/main/java/org/jsmpp/bean/OptionalParameters.java @@ -1,16 +1,16 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * */ package org.jsmpp.bean; @@ -286,7 +286,6 @@ public static U get(Class tagClass, OptionalPar } } } - log.trace("Optional Parameter Tag {} not found", tagClass); return null; } @@ -299,7 +298,6 @@ public static OptionalParameter get(short tag, OptionalParameter[] parameters) } } } - log.trace("Optional Parameter Tag {} not found", tag); return null; } } diff --git a/jsmpp/src/main/java/org/jsmpp/session/AbstractSession.java b/jsmpp/src/main/java/org/jsmpp/session/AbstractSession.java index 48a54265..afb47234 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/AbstractSession.java +++ b/jsmpp/src/main/java/org/jsmpp/session/AbstractSession.java @@ -53,7 +53,7 @@ public abstract class AbstractSession implements Session, Closeable { private static final Logger log = LoggerFactory.getLogger(AbstractSession.class); private static final Random random = new Random(); - private final Map> pendingResponse = new ConcurrentHashMap<>(); + private final Map> pendingResponses = new ConcurrentHashMap<>(); private final Sequence sequence = new Sequence(1); private final PDUSender pduSender; private int pduProcessorDegree = 3; @@ -82,7 +82,7 @@ protected Sequence sequence() { } protected PendingResponse removePendingResponse(int sequenceNumber) { - return pendingResponse.remove(sequenceNumber); + return pendingResponses.remove(sequenceNumber); } @Override @@ -129,7 +129,7 @@ public long getTransactionTimer() { @Override public int getUnacknowledgedRequests() { - return this.pendingResponse.size(); + return this.pendingResponses.size(); } @Override @@ -185,18 +185,19 @@ public int getPduProcessorDegree() { } /** - * Get the capacity of the working queue for PDU processing. The default is 100. + * Get the capacity of the receiving working queue for PDU processing. The default is 100. + * If the all threads (pduProcessorDegree) are busy, they are waiting in the work queue. * - * @return the ThreadPoolExecutor queue capacity. + * @return the ThreadPoolExecutor work queue capacity. */ public int getQueueCapacity() { return queueCapacity; } /** - * Set the capacity of the working queue for PDU processing. + * Set the capacity of the receiving work queue for PDU processing. * - * @param queueCapacity the capacity of the working queue for receive and transmit + * @param queueCapacity the capacity of the work queue for receive working threads */ public void setQueueCapacity(final int queueCapacity) { this.queueCapacity = queueCapacity; @@ -316,7 +317,7 @@ protected Command executeSendCommand(SendCommandTask task, long timeout) int seqNum = sequence.nextValue(); PendingResponse pendingResp = new PendingResponse<>(timeout); - pendingResponse.put(seqNum, pendingResp); + pendingResponses.put(seqNum, pendingResp); try { task.executeTask(connection().getOutputStream(), seqNum); } catch (IOException e) { @@ -325,7 +326,7 @@ protected Command executeSendCommand(SendCommandTask task, long timeout) if("enquire_link".equals(task.getCommandName())) { log.info("Ignore failure of sending enquire_link, wait to see if connection is restored"); } else { - pendingResponse.remove(seqNum); + pendingResponses.remove(seqNum); close(); throw e; } @@ -341,13 +342,13 @@ protected Command executeSendCommand(SendCommandTask task, long timeout) log.debug("{} response with sequence_number {} received for session {}", task.getCommandName(), seqNum, sessionId); } } catch (ResponseTimeoutException e) { - pendingResponse.remove(seqNum); + pendingResponses.remove(seqNum); throw new ResponseTimeoutException("No response after waiting for " + timeout + " millis when executing " + task.getCommandName() + " with session " + sessionId + " and sequence_number " + seqNum, e); } catch (InvalidResponseException e) { - pendingResponse.remove(seqNum); + pendingResponses.remove(seqNum); throw e; } diff --git a/jsmpp/src/main/java/org/jsmpp/session/ClientSession.java b/jsmpp/src/main/java/org/jsmpp/session/ClientSession.java index 751b796b..31d6b986 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/ClientSession.java +++ b/jsmpp/src/main/java/org/jsmpp/session/ClientSession.java @@ -26,7 +26,6 @@ import org.jsmpp.bean.OptionalParameter; import org.jsmpp.bean.RegisteredDelivery; import org.jsmpp.bean.ReplaceIfPresentFlag; -import org.jsmpp.bean.SubmitMultiResult; import org.jsmpp.bean.TypeOfNumber; import org.jsmpp.extra.NegativeResponseException; import org.jsmpp.extra.ResponseTimeoutException; @@ -75,14 +74,14 @@ public interface ClientSession extends Session, AutoCloseable { * @throws IOException if there is an I/O error found. */ SubmitSmResult submitShortMessage(String serviceType, TypeOfNumber sourceAddrTon, - NumberingPlanIndicator sourceAddrNpi, String sourceAddr, - TypeOfNumber destAddrTon, NumberingPlanIndicator destAddrNpi, - String destinationAddr, ESMClass esmClass, byte protocolId, - byte priorityFlag, String scheduleDeliveryTime, - String validityPeriod, RegisteredDelivery registeredDelivery, - byte replaceIfPresentFlag, DataCoding dataCoding, - byte smDefaultMsgId, byte[] shortMessage, - OptionalParameter... optionalParameters) throws PDUException, + NumberingPlanIndicator sourceAddrNpi, String sourceAddr, + TypeOfNumber destAddrTon, NumberingPlanIndicator destAddrNpi, + String destinationAddr, ESMClass esmClass, byte protocolId, + byte priorityFlag, String scheduleDeliveryTime, + String validityPeriod, RegisteredDelivery registeredDelivery, + byte replaceIfPresentFlag, DataCoding dataCoding, + byte smDefaultMsgId, byte[] shortMessage, + OptionalParameter... optionalParameters) throws PDUException, ResponseTimeoutException, InvalidResponseException, NegativeResponseException, IOException; diff --git a/jsmpp/src/main/java/org/jsmpp/session/DataSmResult.java b/jsmpp/src/main/java/org/jsmpp/session/DataSmResult.java index b408dde7..69a8c1a9 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/DataSmResult.java +++ b/jsmpp/src/main/java/org/jsmpp/session/DataSmResult.java @@ -1,6 +1,6 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 diff --git a/jsmpp/src/main/java/org/jsmpp/session/QueryBroadcastSmResult.java b/jsmpp/src/main/java/org/jsmpp/session/QueryBroadcastSmResult.java index bdc698c9..f8147c4c 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/QueryBroadcastSmResult.java +++ b/jsmpp/src/main/java/org/jsmpp/session/QueryBroadcastSmResult.java @@ -24,8 +24,8 @@ * Result of query broadcast short message. * * @author pmoerenhout - * @version 2.4.0 - * @since 2.4.0 + * @version 3.0.0 + * @since 3.0.0 */ public class QueryBroadcastSmResult { diff --git a/jsmpp/src/main/java/org/jsmpp/session/SMPPOutboundServerSession.java b/jsmpp/src/main/java/org/jsmpp/session/SMPPOutboundServerSession.java index e119f43c..751fda89 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/SMPPOutboundServerSession.java +++ b/jsmpp/src/main/java/org/jsmpp/session/SMPPOutboundServerSession.java @@ -163,7 +163,7 @@ private String sendBind(BindType bindType, String systemId, /** * Wait for outbind request. * - * @param timeout is the timeout. + * @param timeout is the timeout in milliseconds. * @return the {@link OutbindRequest}. * @throws IllegalStateException if this invocation of this method has been made or invoke when state is not OPEN. * @throws TimeoutException if the timeout has been reach and {@link SMPPOutboundServerSession} are no more valid because the connection will be close @@ -376,8 +376,10 @@ public void sendUnbindResp(int sequenceNumber) throws IOException { * @author uudashr */ private class PDUReaderWorker extends Thread { - // start with serial execution of pdu processing, when the session is bound the pool will be enlarge up to the PduProcessorDegree + // start with serial execution of pdu processing, when the session is bound the pool will be enlarged up to the PduProcessorDegree private ThreadPoolExecutor pduExecutor; + private LinkedBlockingQueue workQueue; + private int queueCapacity; private Runnable onIOExceptionTask = new Runnable() { @Override public void run() { @@ -387,9 +389,11 @@ public void run() { PDUReaderWorker(final int queueCapacity) { super("PDUReaderWorker-" + getSessionId()); + this.queueCapacity = queueCapacity; + workQueue = new LinkedBlockingQueue<>(queueCapacity); pduExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(queueCapacity), new RejectedExecutionHandler() { + workQueue, new RejectedExecutionHandler() { @Override public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor executor) { log.info("Receiving queue is full, please increasing queue capacity, and/or let other side obey the window size"); @@ -484,6 +488,14 @@ private void notifyNoActivity() { enquireLinkSender.enquireLink(); } } + + /* + * Return an integer between 0 (Idle) and 100 (Congested/Maximum Load). Only used for SMPP 5.0. + */ + public int getCongestionRatio() { + return ((80 * pduExecutor.getActiveCount()) / pduExecutor.getMaximumPoolSize()) + + ((20 * workQueue.size()) / queueCapacity); + } } private class BoundSessionStateListener implements SessionStateListener { @@ -500,10 +512,10 @@ public void onStateChange(SessionState newState, SessionState oldState, Session } catch (IOException e) { log.error("Failed setting so_timeout for session timer", e); } - - log.debug("Changing processor degree to {}", getPduProcessorDegree()); - pduReaderWorker.pduExecutor.setMaximumPoolSize(getPduProcessorDegree()); - pduReaderWorker.pduExecutor.setCorePoolSize(getPduProcessorDegree()); + int pduProcessorDegree = getPduProcessorDegree(); + log.debug("Changing processor degree to {}", pduProcessorDegree); + pduReaderWorker.pduExecutor.setMaximumPoolSize(pduProcessorDegree); + pduReaderWorker.pduExecutor.setCorePoolSize(pduProcessorDegree); } } } diff --git a/jsmpp/src/main/java/org/jsmpp/session/SMPPOutboundSession.java b/jsmpp/src/main/java/org/jsmpp/session/SMPPOutboundSession.java index c799bdec..cf997c18 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/SMPPOutboundSession.java +++ b/jsmpp/src/main/java/org/jsmpp/session/SMPPOutboundSession.java @@ -1,16 +1,16 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * */ package org.jsmpp.session; @@ -61,8 +61,7 @@ /** - * This is an object that used to communicate with an ESME. It hide - * all un-needed SMPP operation that might harm if the user code use it such as: + * This is an object that used to communicate with an ESME. It hide all un-needed SMPP operation that might harm if the user code use it such as: * *
    *
  • DELIVER_SM_RESP, should be called only as response to DELIVER_SM
  • @@ -71,7 +70,7 @@ *
  • ENQUIRE_LINK_RESP, should be called only as response to ENQUIRE_LINK
  • *
  • GENERIC_NACK, should be called only as response to GENERIC_NACK
  • *
- * + *

* All SMPP operations (request-response) are blocking, for an example: DELIVER_SM * will be blocked until DELIVER_SM_RESP received or timeout. This looks like * synchronous communication, but the {@link SMPPOutboundSession} implementation give @@ -82,7 +81,7 @@ * @author pmoerenhout */ public class SMPPOutboundSession extends AbstractSession implements OutboundClientSession { - private static final Logger logger = LoggerFactory.getLogger(SMPPOutboundSession.class); + private static final Logger log = LoggerFactory.getLogger(SMPPOutboundSession.class); /* Utility */ private final PDUReader pduReader; @@ -102,8 +101,7 @@ public class SMPPOutboundSession extends AbstractSession implements OutboundClie private BindRequestReceiver bindRequestReceiver = new BindRequestReceiver(responseHandler); /** - * Default constructor of {@link SMPPOutboundSession}. The next action might be - * connect and bind to a destination message center. + * Default constructor of {@link SMPPOutboundSession}. The next action might be connect and bind to a destination message center. * * @see #connectAndOutbind(String, int, String, String) */ @@ -178,13 +176,13 @@ public BindRequest connectAndOutbind(String host, int port, */ public BindRequest connectAndOutbind(String host, int port, OutbindParameter outbindParameter, long timeout) throws IOException { - logger.debug("Connect and bind to {} port {}", host, port); + log.debug("Connect and bind to {} port {}", host, port); if (getSessionState() != SessionState.CLOSED) { throw new IOException("Session state is not closed"); } conn = connFactory.createConnection(host, port); - logger.info("Connected to {}", conn.getInetAddress()); + log.info("Connected to {}", conn.getInetAddress()); conn.setSoTimeout(getEnquireLinkTimer()); @@ -196,9 +194,8 @@ public BindRequest connectAndOutbind(String host, int port, OutbindParameter out pduReaderWorker = new PDUReaderWorker(getPduProcessorDegree(), getQueueCapacity()); pduReaderWorker.start(); sendOutbind(outbindParameter.getSystemId(), outbindParameter.getPassword()); - } - catch (IOException e) { - logger.error("IO error occurred", e); + } catch (IOException e) { + log.error("I/O error occurred", e); close(); throw e; } @@ -210,16 +207,14 @@ public BindRequest connectAndOutbind(String host, int port, OutbindParameter out enquireLinkSender.start(); return bindRequest; - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { String message = "System error"; - logger.error(message, e); + log.error(message, e); close(); throw new IOException(message + ": " + e.getMessage(), e); - } - catch (TimeoutException e) { + } catch (TimeoutException e) { String message = "Wait for bind response timed out"; - logger.error(message, e); + log.error(message, e); throw new IOException(message + ": " + e.getMessage(), e); } } @@ -230,8 +225,8 @@ public BindRequest connectAndOutbind(String host, int port, OutbindParameter out * @param timeout is the timeout in milliseconds. * @return the {@link BindRequest}. * @throws IllegalStateException if this invocation of this method has been made or invoke when state is not OPEN. - * @throws TimeoutException if the timeout has been reach and {@link SMPPServerSession} are no more valid because - * the connection will be close automatically. + * @throws TimeoutException if the timeout has been reach and {@link SMPPServerSession} are no more valid because the connection will be close + * automatically. */ private BindRequest waitForBind(long timeout) throws IllegalStateException, TimeoutException { @@ -239,17 +234,14 @@ private BindRequest waitForBind(long timeout) if (currentSessionState.equals(SessionState.OPEN)) { try { return bindRequestReceiver.waitForRequest(timeout); - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { throw new IllegalStateException( "Invocation of waitForBind() has been made", e); - } - catch (TimeoutException e) { + } catch (TimeoutException e) { close(); throw e; } - } - else { + } else { throw new IllegalStateException( "waitForBind() should be invoked on OPEN state, actual state is " + currentSessionState); @@ -317,9 +309,8 @@ public void sendBindResp(String systemId, InterfaceVersion interfaceVersion, Bin sessionContext.bound(bindType, interfaceVersion); try { pduSender().sendBindResp(out, bindType.responseCommandId(), sequenceNumber, systemId, interfaceVersion); - } - catch (PDUStringException e) { - logger.error("Failed sending bind response", e); + } catch (PDUStringException e) { + log.error("Failed sending bind response", e); // TODO uudashr: we have double checking when accept the bind request } } @@ -328,13 +319,11 @@ public DataSmResult processDataSm(DataSm dataSm) throws ProcessRequestException { try { return fireAcceptDataSm(dataSm); - } - catch (ProcessRequestException e) { + } catch (ProcessRequestException e) { throw e; - } - catch (Exception e) { + } catch (Exception e) { String msg = "Invalid runtime exception thrown when processing data_sm"; - logger.error(msg, e); + log.error(msg, e); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RX_T_APPN); } } @@ -346,13 +335,12 @@ public void sendDataSmResp(DataSmResult dataSmResult, int sequenceNumber) pduSender().sendDataSmResp(out, sequenceNumber, dataSmResult.getMessageId(), dataSmResult.getOptionalParameters()); - } - catch (PDUStringException e) { + } catch (PDUStringException e) { /* * There should be no PDUStringException thrown since creation * of MessageId should be save. */ - logger.error("Failed sending data_sm_resp", e); + log.error("Failed sending data_sm_resp", e); } } @@ -369,7 +357,6 @@ public void notifyUnbonded() { @Override public void sendDeliverSmResp(int commandStatus, int sequenceNumber, String messageId) throws IOException { pduSender().sendDeliverSmResp(out, commandStatus, sequenceNumber, messageId); - logger.debug("deliver_sm_resp with sequence_number {} has been sent", sequenceNumber); } @Override @@ -404,8 +391,10 @@ public void processBind(Bind bind) { * @author uudashr */ private class PDUReaderWorker extends Thread { - // start with serial execution of pdu processing, when the session is bound the pool will be enlarge up to the PduProcessorDegree + // start with serial execution of pdu processing, when the session is bound the pool will be enlarged up to the PduProcessorDegree private ThreadPoolExecutor pduExecutor = null; + private LinkedBlockingQueue workQueue; + private int queueCapacity; private Runnable onIOExceptionTask = new Runnable() { @Override public void run() { @@ -415,12 +404,27 @@ public void run() { private PDUReaderWorker(final int pduProcessorDegree, final int queueCapacity) { super("PDUReaderWorker-" + getSessionId()); + this.queueCapacity = queueCapacity; + workQueue = new LinkedBlockingQueue<>(queueCapacity); pduExecutor = new ThreadPoolExecutor(pduProcessorDegree, pduProcessorDegree, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(queueCapacity), new RejectedExecutionHandler() { + 0L, TimeUnit.MILLISECONDS, workQueue, new RejectedExecutionHandler() { @Override - public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { - throw new QueueMaxException("Queue capacity " + queueCapacity + " exceeded"); + public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor executor) { + log.info("Receiving queue is full, please increasing receive queue capacity, and/or let other side obey the window size"); + Command pduHeader = ((PDUProcessTask) runnable).getPduHeader(); + if ((pduHeader.getCommandId() & SMPPConstant.MASK_CID_RESP) == SMPPConstant.MASK_CID_RESP) { + try { + boolean success = executor.getQueue().offer(runnable, 60000, TimeUnit.MILLISECONDS); + if (!success) { + log.warn("Offer to receive queue failed for {}", pduHeader); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } else { + throw new QueueMaxException("Receiving queue capacity " + queueCapacity + " exceeded"); + } } }); } @@ -434,12 +438,11 @@ public void run() { pduExecutor.shutdown(); try { pduExecutor.awaitTermination(getTransactionTimer(), TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) { - logger.warn("Interrupted while waiting for PDU executor pool to finish"); + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for PDU executor pool to finish"); Thread.currentThread().interrupt(); } - logger.debug("{} stopped", getName()); + log.debug("{} stopped", getName()); } private void readPDU() { @@ -447,43 +450,38 @@ private void readPDU() { try { pduHeader = pduReader.readPDUHeader(in); byte[] pdu = pduReader.readPDU(in, pduHeader); - /* - * When the processing PDU is need user interaction via event, - * the code on event might take non-short time, so we need to - * process it concurrently. - */ + /* + * When the processing PDU is need user interaction via event, + * the code on event might take non-short time, so we need to + * process it concurrently. + */ PDUProcessOutboundTask task = new PDUProcessOutboundTask(pduHeader, pdu, sessionContext, responseHandler, sessionContext, onIOExceptionTask); pduExecutor.execute(task); } catch (QueueMaxException e) { - logger.info("Notify other side to throttle: {} ({} threads active)", e.getMessage(), pduExecutor.getActiveCount()); + log.info("Notify other side to throttle: {} ({} threads active)", e.getMessage(), pduExecutor.getActiveCount()); try { responseHandler.sendNegativeResponse(pduHeader.getCommandId(), SMPPConstant.STAT_ESME_RTHROTTLED, pduHeader.getSequenceNumber()); } catch (IOException ioe) { - logger.warn("Failed sending negative response: {}", ioe.getMessage()); + log.warn("Failed sending negative response: {}", ioe.getMessage()); close(); } - } - catch (InvalidCommandLengthException e) { - logger.warn("Received invalid command length: {}", e.getMessage()); + } catch (InvalidCommandLengthException e) { + log.warn("Received invalid command length: {}", e.getMessage()); try { pduSender().sendGenericNack(out, SMPPConstant.STAT_ESME_RINVCMDLEN, 0); - } - catch (IOException ee) { - logger.warn("Failed sending generic nack", ee); + } catch (IOException ee) { + log.warn("Failed sending generic nack", ee); } unbindAndClose(); - } - catch (SocketTimeoutException e) { + } catch (SocketTimeoutException e) { notifyNoActivity(); - } - catch (IOException e) { - logger.info("Reading PDU session {} in state {}: {}", getSessionId(), getSessionState(), e.getMessage()); + } catch (IOException e) { + log.info("Reading PDU session {} in state {}: {}", getSessionId(), getSessionState(), e.getMessage()); close(); - } - catch (RuntimeException e) { - logger.warn("Runtime error while reading", e); + } catch (RuntimeException e) { + log.warn("Runtime error while reading", e); close(); } } @@ -495,10 +493,18 @@ private void notifyNoActivity() { SessionState sessionState = sessionContext().getSessionState(); if ((getInterfaceVersion().compareTo(InterfaceVersion.IF_34) > 0 && sessionState.isNotClosed()) || sessionState.isBound()) { - logger.trace("No activity notified, sending enquire_link"); + log.trace("No activity notified, sending enquire_link"); enquireLinkSender.enquireLink(); } } + + /* + * Return an integer between 0 (Idle) and 100 (Congested/Maximum Load). Only used for SMPP 5.0. + */ + public int getCongestionRatio() { + return ((80 * pduExecutor.getActiveCount()) / pduExecutor.getMaximumPoolSize()) + + ((20 * workQueue.size()) / queueCapacity); + } } /** @@ -519,14 +525,14 @@ public void onStateChange(SessionState newState, SessionState oldState, */ try { conn.setSoTimeout(getEnquireLinkTimer()); - } - catch (IOException e) { - logger.error("Failed setting so_timeout for enquire link timer", e); + } catch (IOException e) { + log.error("Failed setting so_timeout for enquire link timer", e); } } else if (newState.isBound()) { - logger.info("Changing processor degree to {}", getPduProcessorDegree()); - pduReaderWorker.pduExecutor.setMaximumPoolSize(getPduProcessorDegree()); - pduReaderWorker.pduExecutor.setCorePoolSize(getPduProcessorDegree()); + int pduProcessorDegree = getPduProcessorDegree(); + log.debug("Changing processor degree to {}", pduProcessorDegree); + pduReaderWorker.pduExecutor.setMaximumPoolSize(pduProcessorDegree); + pduReaderWorker.pduExecutor.setCorePoolSize(pduProcessorDegree); } } } diff --git a/jsmpp/src/main/java/org/jsmpp/session/SMPPServerSession.java b/jsmpp/src/main/java/org/jsmpp/session/SMPPServerSession.java index bbf57fb7..7c732b29 100755 --- a/jsmpp/src/main/java/org/jsmpp/session/SMPPServerSession.java +++ b/jsmpp/src/main/java/org/jsmpp/session/SMPPServerSession.java @@ -10,7 +10,6 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ package org.jsmpp.session; @@ -54,7 +53,6 @@ import org.jsmpp.bean.RegisteredDelivery; import org.jsmpp.bean.ReplaceSm; import org.jsmpp.bean.SubmitMulti; -import org.jsmpp.bean.SubmitMultiResult; import org.jsmpp.bean.SubmitSm; import org.jsmpp.bean.TypeOfNumber; import org.jsmpp.extra.NegativeResponseException; @@ -74,17 +72,18 @@ public class SMPPServerSession extends AbstractSession implements ServerSession private static final String MESSAGE_RECEIVER_LISTENER_IS_NULL = "Received {} but message receiver listener is null"; private static final String NO_MESSAGE_RECEIVER_LISTENER_REGISTERED = "No message receiver listener registered"; - private static final Logger logger = LoggerFactory.getLogger(SMPPServerSession.class); + private static final Logger log = LoggerFactory.getLogger(SMPPServerSession.class); private final Connection conn; private final DataInputStream in; private final OutputStream out; private final PDUReader pduReader; - + private SMPPServerSessionContext sessionContext = new SMPPServerSessionContext(this); private final ServerResponseHandler responseHandler = new ResponseHandlerImpl(); - + + private PDUReaderWorker pduReaderWorker; private ServerMessageReceiverListener messageReceiverListener; private ServerResponseDeliveryListener responseDeliveryListener; private BindRequestReceiver bindRequestReceiver = new BindRequestReceiver(responseHandler); @@ -142,7 +141,8 @@ public BindRequest waitForBind(long timeout) throws IllegalStateException, TimeoutException { SessionState currentSessionState = getSessionState(); if (currentSessionState.equals(SessionState.OPEN)) { - new PDUReaderWorker(getPduProcessorDegree(), getQueueCapacity()).start(); + pduReaderWorker = new PDUReaderWorker(getPduProcessorDegree(), getQueueCapacity()); + pduReaderWorker.start(); try { return bindRequestReceiver.waitForRequest(timeout); } catch (IllegalStateException e) { @@ -204,7 +204,7 @@ private SubmitSmResult fireAcceptSubmitSm(SubmitSm submitSm) throws ProcessReque if (messageReceiverListener != null) { return messageReceiverListener.onAcceptSubmitSm(submitSm, this); } - logger.warn("Received submit_sm but MessageReceiverListener is null, returning SMPP error"); + log.warn("Received submit_sm but MessageReceiverListener is null, returning SMPP error"); throw new ProcessRequestException(NO_MESSAGE_RECEIVER_LISTENER_REGISTERED, SMPPConstant.STAT_ESME_RX_R_APPN); } @@ -213,7 +213,7 @@ private SubmitMultiResult fireAcceptSubmitMulti(SubmitMulti submitMulti) throws if (messageReceiverListener != null) { return messageReceiverListener.onAcceptSubmitMulti(submitMulti, this); } - logger.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "submit_multi"); + log.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "submit_multi"); throw new ProcessRequestException(NO_MESSAGE_RECEIVER_LISTENER_REGISTERED, SMPPConstant.STAT_ESME_RX_R_APPN); } @@ -222,7 +222,7 @@ private QuerySmResult fireAcceptQuerySm(QuerySm querySm) throws ProcessRequestEx if (messageReceiverListener != null) { return messageReceiverListener.onAcceptQuerySm(querySm, this); } - logger.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "query_sm"); + log.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "query_sm"); throw new ProcessRequestException(NO_MESSAGE_RECEIVER_LISTENER_REGISTERED, SMPPConstant.STAT_ESME_RX_R_APPN); } @@ -231,7 +231,7 @@ private void fireAcceptReplaceSm(ReplaceSm replaceSm) throws ProcessRequestExcep if (messageReceiverListener != null) { messageReceiverListener.onAcceptReplaceSm(replaceSm, this); } else { - logger.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "replace_sm"); + log.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "replace_sm"); throw new ProcessRequestException(NO_MESSAGE_RECEIVER_LISTENER_REGISTERED, SMPPConstant.STAT_ESME_RX_R_APPN); } @@ -241,7 +241,7 @@ private void fireAcceptCancelSm(CancelSm cancelSm) throws ProcessRequestExceptio if (messageReceiverListener != null) { messageReceiverListener.onAcceptCancelSm(cancelSm, this); } else { - logger.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "cancel_sm"); + log.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "cancel_sm"); throw new ProcessRequestException(NO_MESSAGE_RECEIVER_LISTENER_REGISTERED, SMPPConstant.STAT_ESME_RX_R_APPN); } @@ -251,7 +251,7 @@ private BroadcastSmResult fireAcceptBroadcastSm(BroadcastSm broadcastSm) throws if (messageReceiverListener != null) { return messageReceiverListener.onAcceptBroadcastSm(broadcastSm, this); } - logger.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "broadcast_sm"); + log.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "broadcast_sm"); throw new ProcessRequestException(NO_MESSAGE_RECEIVER_LISTENER_REGISTERED, SMPPConstant.STAT_ESME_RX_R_APPN); } @@ -260,7 +260,7 @@ private void fireAcceptCancelBroadcastSm(CancelBroadcastSm cancelBroadcastSm) th if (messageReceiverListener != null) { messageReceiverListener.onAcceptCancelBroadcastSm(cancelBroadcastSm, this); } else { - logger.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "cancel_broadcast_sm"); + log.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "cancel_broadcast_sm"); throw new ProcessRequestException(NO_MESSAGE_RECEIVER_LISTENER_REGISTERED, SMPPConstant.STAT_ESME_RX_R_APPN); } @@ -270,7 +270,7 @@ private QueryBroadcastSmResult fireAcceptQueryBroadcastSm(QueryBroadcastSm query if (messageReceiverListener != null) { return messageReceiverListener.onAcceptQueryBroadcastSm(queryBroadcastSm, this); } - logger.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "query_broadcast_sm"); + log.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "query_broadcast_sm"); throw new ProcessRequestException(NO_MESSAGE_RECEIVER_LISTENER_REGISTERED, SMPPConstant.STAT_ESME_RX_R_APPN); } @@ -331,6 +331,14 @@ public void setResponseDeliveryListener( ServerResponseDeliveryListener responseDeliveryListener) { this.responseDeliveryListener = responseDeliveryListener; } + + /* + * Return an integer between 0 and 100. + * Only used for SMPP 5.0 + */ + public int getCongestionRatio() { + return pduReaderWorker.getCongestionRatio(); + } private class ResponseHandlerImpl implements ServerResponseHandler { @@ -372,7 +380,7 @@ public void sendBindResp(String systemId, InterfaceVersion interfaceVersion, Bin try { pduSender().sendBindResp(out, bindType.responseCommandId(), sequenceNumber, systemId, interfaceVersion); } catch (PDUStringException e) { - logger.error("Failed sending bind response", e); + log.error("Failed sending bind response", e); } } @@ -388,7 +396,7 @@ public SubmitSmResult processSubmitSm(SubmitSm submitSm) SubmitSmResult submitSmResult = fireAcceptSubmitSm(submitSm); if (submitSmResult == null) { String msg = "Invalid submitSmResult, shouldn't null value. " + ServerMessageReceiverListener.class + "#onAcceptSubmitSm(SubmitSm) return null value"; - logger.error(msg); + log.error(msg); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RX_R_APPN); } return submitSmResult; @@ -398,7 +406,7 @@ public SubmitSmResult processSubmitSm(SubmitSm submitSm) } catch(Exception e) { String msg = "Invalid runtime exception thrown when processing submit_sm"; - logger.error(msg, e); + log.error(msg, e); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RSYSERR); } } @@ -415,7 +423,7 @@ public void sendSubmitSmResponse(SubmitSmResult submitSmResult, int sequenceNumb * There should be no PDUStringException thrown since creation * of MessageId should be save. */ - logger.error("Failed sending submit_sm_resp", e); + log.error("Failed sending submit_sm_resp", e); fireSubmitSmRespFailed(submitSmResult, e); } catch (IOException | RuntimeException e) { fireSubmitSmRespFailed(submitSmResult, e); @@ -430,7 +438,7 @@ public SubmitMultiResult processSubmitMulti(SubmitMulti submitMulti) return fireAcceptSubmitMulti(submitMulti); } catch(Exception e) { String msg = "Invalid runtime exception thrown when processing SubmitMultiSm"; - logger.error(msg, e); + log.error(msg, e); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RSYSERR); } } @@ -449,7 +457,7 @@ public void sendSubmitMultiResponse( * There should be no PDUStringException thrown since creation * of the response parameter has been validated. */ - logger.error("Failed sending submit_multi_resp", e); + log.error("Failed sending submit_multi_resp", e); fireSubmitMultiRespSentError(submitMultiResult, e); } catch (IOException | RuntimeException e) { fireSubmitMultiRespSentError(submitMultiResult, e); @@ -464,7 +472,7 @@ public QuerySmResult processQuerySm(QuerySm querySm) return fireAcceptQuerySm(querySm); } catch(Exception e) { String msg = "Invalid runtime exception thrown when processing query_sm"; - logger.error(msg, e); + log.error(msg, e); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RSYSERR); } } @@ -480,7 +488,7 @@ public void sendQuerySmResp(String messageId, String finalDate, * There should be no PDUStringException thrown since creation * of parsed messageId has been validated. */ - logger.error("Failed sending query_sm_resp", e); + log.error("Failed sending query_sm_resp", e); } } @@ -491,7 +499,7 @@ public DataSmResult processDataSm(DataSm dataSm) return fireAcceptDataSm(dataSm); } catch(Exception e) { String msg = "Invalid runtime exception thrown when processing data_sm"; - logger.error(msg, e); + log.error(msg, e); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RSYSERR); } } @@ -508,7 +516,7 @@ public void sendDataSmResp(DataSmResult dataSmResult, int sequenceNumber) * There should be no PDUStringException thrown since creation * of MessageId should be save. */ - logger.error("Failed sending data_sm_resp", e); + log.error("Failed sending data_sm_resp", e); } } @@ -519,7 +527,7 @@ public void processCancelSm(CancelSm cancelSm) fireAcceptCancelSm(cancelSm); } catch(Exception e) { String msg = "Invalid runtime exception thrown when processing cancel_sm"; - logger.error(msg, e); + log.error(msg, e); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RSYSERR); } } @@ -536,7 +544,7 @@ public void processReplaceSm(ReplaceSm replaceSm) fireAcceptReplaceSm(replaceSm); } catch(Exception e) { String msg = "Invalid runtime exception thrown when processing replace_sm"; - logger.error(msg, e); + log.error(msg, e); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RSYSERR); } } @@ -552,7 +560,7 @@ public BroadcastSmResult processBroadcastSm(final BroadcastSm broadcastSm) throw BroadcastSmResult broadcastSmResult = fireAcceptBroadcastSm(broadcastSm); if (broadcastSmResult == null) { String msg = "Invalid broadcastSmResult, shouldn't null value. " + ServerMessageReceiverListener.class + "#onAcceptBroadcastSm(broadcastSm) return null value"; - logger.error(msg); + log.error(msg); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RX_R_APPN); } return broadcastSmResult; @@ -562,7 +570,7 @@ public BroadcastSmResult processBroadcastSm(final BroadcastSm broadcastSm) throw } catch (Exception e) { String msg = "Invalid runtime exception thrown when processing broadcast_sm"; - logger.error(msg, e); + log.error(msg, e); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RSYSERR); } } @@ -578,7 +586,7 @@ public void sendBroadcastSmResp(final BroadcastSmResult broadcastSmResult, final * There should be no PDUStringException thrown since creation * of MessageId should be save. */ - logger.error("Failed sending broadcast_sm_resp", e); + log.error("Failed sending broadcast_sm_resp", e); } } @@ -589,7 +597,7 @@ public void processCancelBroadcastSm(final CancelBroadcastSm cancelBroadcastSm) fireAcceptCancelBroadcastSm(cancelBroadcastSm); } catch(Exception e) { String msg = "Invalid runtime exception thrown when processing cancel_broadcast_sm"; - logger.error(msg, e); + log.error(msg, e); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RSYSERR); } } @@ -606,13 +614,13 @@ public QueryBroadcastSmResult processQueryBroadcastSm(final QueryBroadcastSm que QueryBroadcastSmResult queryBroadcastSmResult = fireAcceptQueryBroadcastSm(queryBroadcastSm); if (queryBroadcastSmResult == null) { String msg = "Invalid queryBroadcastSmResult, shouldn't null value. " + ServerMessageReceiverListener.class + "#onAcceptQueryBroadcastSm(broadcastSm) return null value"; - logger.error(msg); + log.error(msg); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RX_R_APPN); } return queryBroadcastSmResult; } catch(Exception e) { String msg = "Invalid runtime exception thrown when processing query_broadcast_sm"; - logger.error(msg, e); + log.error(msg, e); throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RSYSERR); } } @@ -628,13 +636,15 @@ public void sendQueryBroadcastSmResp(final QueryBroadcastSmResult queryBroadcast * There should be no PDUStringException thrown since creation * of MessageId should be save. */ - logger.error("Sending failed query_broadcast_sm_resp", e); + log.error("Sending failed query_broadcast_sm_resp", e); } } } private class PDUReaderWorker extends Thread { private ThreadPoolExecutor pduExecutor; + private LinkedBlockingQueue workQueue; + private int queueCapacity; private Runnable onIOExceptionTask = new Runnable() { @Override public void run() { @@ -644,18 +654,20 @@ public void run() { private PDUReaderWorker(final int pduProcessorDegree, final int queueCapacity) { super("PDUReaderWorker-" + getSessionId()); + this.queueCapacity = queueCapacity; + workQueue = new LinkedBlockingQueue<>(queueCapacity); pduExecutor = new ThreadPoolExecutor(pduProcessorDegree, pduProcessorDegree, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(queueCapacity), new RejectedExecutionHandler() { + workQueue, new RejectedExecutionHandler() { @Override public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor executor) { - logger.info("Receiving queue is full, please increasing queue capacity, and/or let other side obey the window size"); + log.info("Receiving queue is full, please increasing queue capacity, and/or let other side obey the window size"); Command pduHeader = ((PDUProcessServerTask)runnable).getPduHeader(); if ((pduHeader.getCommandId() & SMPPConstant.MASK_CID_RESP) == SMPPConstant.MASK_CID_RESP) { try { boolean success = executor.getQueue().offer(runnable, 60000, TimeUnit.MILLISECONDS); if (!success){ - logger.warn("Offer to queue failed for {}", pduHeader); + log.warn("Offer to queue failed for {}", pduHeader); } } catch (InterruptedException e){ @@ -679,10 +691,10 @@ public void run() { pduExecutor.awaitTermination(getTransactionTimer(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - logger.warn("Interrupted while waiting for PDU executor pool to finish"); + log.warn("Interrupted while waiting for PDU executor pool to finish"); Thread.currentThread().interrupt(); } - logger.debug("{} stopped", getName()); + log.debug("{} stopped", getName()); } private void readPDU() { @@ -690,41 +702,41 @@ private void readPDU() { try { pduHeader = pduReader.readPDUHeader(in); byte[] pdu = pduReader.readPDU(in, pduHeader); - + PDUProcessServerTask task = new PDUProcessServerTask(pduHeader, pdu, sessionContext.getStateProcessor(), sessionContext, responseHandler, onIOExceptionTask); pduExecutor.execute(task); } catch (QueueMaxException e) { - logger.info("Notify other side to throttle: {} ({} threads active)", e.getMessage(), pduExecutor.getActiveCount()); + log.info("Notify other side to throttle: {} ({} threads active)", e.getMessage(), pduExecutor.getActiveCount()); try { responseHandler.sendNegativeResponse(pduHeader.getCommandId(), SMPPConstant.STAT_ESME_RTHROTTLED, pduHeader.getSequenceNumber()); } catch (IOException ioe) { - logger.warn("Failed sending negative response: {}", ioe.getMessage()); + log.warn("Failed sending negative response: {}", ioe.getMessage()); close(); } } catch (InvalidCommandLengthException e) { - logger.warn("Received invalid command length: {}", e.getMessage()); + log.warn("Received invalid command length: {}", e.getMessage()); try { pduSender().sendGenericNack(out, SMPPConstant.STAT_ESME_RINVCMDLEN, 0); } catch (IOException ee) { - logger.warn("Failed sending generic_nack", ee); + log.warn("Failed sending generic_nack", ee); } unbindAndClose(); } catch (SocketTimeoutException e) { notifyNoActivity(); } catch (EOFException e) { if (sessionContext.getSessionState() == SessionState.UNBOUND){ - logger.info("Unbound session {} socket closed", getSessionId()); + log.info("Unbound session {} socket closed", getSessionId()); } else { - logger.warn("Session {} socket closed unexpected", getSessionId()); + log.warn("Session {} socket closed unexpected", getSessionId()); } close(); } catch (IOException e) { - logger.info("Reading PDU session {} in state {}: {}", getSessionId(), getSessionState(), e.getMessage()); + log.info("Reading PDU session {} in state {}: {}", getSessionId(), getSessionState(), e.getMessage()); close(); } catch (RuntimeException e) { - logger.warn("Runtime error while reading", e); + log.warn("Runtime error while reading", e); close(); } } @@ -733,9 +745,17 @@ private void readPDU() { * Notify for no activity. */ private void notifyNoActivity() { - logger.debug("No activity notified, sending enquire_link"); + log.debug("No activity notified, sending enquire_link"); enquireLinkSender.enquireLink(); } + + /* + * Return an integer between 0 (Idle) and 100 (Congested/Maximum Load). Only used for SMPP 5.0. + */ + public int getCongestionRatio() { + return ((80 * pduExecutor.getActiveCount()) / pduExecutor.getMaximumPoolSize()) + + ((20 * workQueue.size()) / queueCapacity); + } } private class BoundSessionStateListener implements SessionStateListener { @@ -750,7 +770,7 @@ public void onStateChange(SessionState newState, SessionState oldState, Session try { connection().setSoTimeout(source.getEnquireLinkTimer()); } catch (IOException e) { - logger.error("Failed setting so_timeout for session timer", e); + log.error("Failed setting so_timeout for session timer", e); } enquireLinkSender.start(); } diff --git a/jsmpp/src/main/java/org/jsmpp/session/SMPPSession.java b/jsmpp/src/main/java/org/jsmpp/session/SMPPSession.java index 6de85e59..4108c432 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/SMPPSession.java +++ b/jsmpp/src/main/java/org/jsmpp/session/SMPPSession.java @@ -52,7 +52,6 @@ import org.jsmpp.bean.RegisteredDelivery; import org.jsmpp.bean.ReplaceIfPresentFlag; import org.jsmpp.bean.SubmitMultiResp; -import org.jsmpp.bean.SubmitMultiResult; import org.jsmpp.bean.SubmitSmResp; import org.jsmpp.bean.TypeOfNumber; import org.jsmpp.extra.NegativeResponseException; @@ -71,8 +70,7 @@ /** - * This is an object that used to communicate with SMPP Server or SMSC. It hide - * all un-needed SMPP operation that might harm if the user code use it such as : + * This is an object that used to communicate with SMPP Server or SMSC. It hide all un-needed SMPP operation that might harm if the user code use it such as : *

    *
  • DELIVER_SM_RESP, should be called only as response to DELIVER_SM
  • *
  • UNBIND_RESP, should be called only as response to UNBIND_RESP
  • @@ -80,695 +78,700 @@ *
  • ENQUIRE_LINK_RESP, should be called only as response to ENQUIRE_LINK
  • *
  • GENERIC_NACK, should be called only as response to GENERIC_NACK
  • *
- * + *

* All SMPP operations (request-response) are blocking, for an example: SUBMIT_SM * will be blocked until SUBMIT_SM_RESP received or timeout. This looks like * synchronous communication, but the {@link SMPPSession} implementation give * ability to the asynchronous way by executing the SUBMIT_SM operation parallel * on a different thread. The very simple implementation by using Thread pool, * {@link ExecutorService} will do. - * + *

* To receive the incoming message such as DELIVER_SM or DATA_SM will be managed * by internal thread. User code only have to set listener * {@link MessageReceiverListener}. * * @author uudashr - * */ public class SMPPSession extends AbstractSession implements ClientSession { - private static final Logger log = LoggerFactory.getLogger(SMPPSession.class); - private static final String MESSAGE_RECEIVER_LISTENER_IS_NULL = "Received {} but message receiver listener is null"; + private static final Logger log = LoggerFactory.getLogger(SMPPSession.class); + private static final String MESSAGE_RECEIVER_LISTENER_IS_NULL = "Received {} but message receiver listener is null"; - /* Utility */ - private final PDUReader pduReader; + /* Utility */ + private final PDUReader pduReader; /* Connection */ - private final ConnectionFactory connFactory; - private Connection conn; - private DataInputStream in; - private OutputStream out; - - private PDUReaderWorker pduReaderWorker; - private final ResponseHandler responseHandler = new ResponseHandlerImpl(); - private MessageReceiverListener messageReceiverListener; + private final ConnectionFactory connFactory; + private final ResponseHandler responseHandler = new ResponseHandlerImpl(); + private Connection conn; + private DataInputStream in; + private OutputStream out; + private PDUReaderWorker pduReaderWorker; + private MessageReceiverListener messageReceiverListener; private BoundSessionStateListener sessionStateListener = new BoundSessionStateListener(); private SMPPSessionContext sessionContext = new SMPPSessionContext(this, sessionStateListener); - /** - * Default constructor of {@link SMPPSession}. The next action might be - * connect and bind to a destination message center. - * - * @see #connectAndBind(String, int, BindType, String, String, String, TypeOfNumber, NumberingPlanIndicator, String) - */ - public SMPPSession() { - this(new SynchronizedPDUSender(new DefaultPDUSender(new DefaultComposer())), - new DefaultPDUReader(), - SocketConnectionFactory.getInstance()); - } - - public SMPPSession(ConnectionFactory connFactory) { - this(new SynchronizedPDUSender(new DefaultPDUSender(new DefaultComposer())), - new DefaultPDUReader(), - connFactory); - } - - public SMPPSession(PDUSender pduSender, PDUReader pduReader, - ConnectionFactory connFactory) { - super(pduSender); - this.pduReader = pduReader; - this.connFactory = connFactory; - } - - public SMPPSession(String host, int port, BindParameter bindParam, - PDUSender pduSender, PDUReader pduReader, - ConnectionFactory connFactory) throws IOException { - this(pduSender, pduReader, connFactory); - connectAndBind(host, port, bindParam); - } - - public SMPPSession(String host, int port, BindParameter bindParam) throws IOException { - this(); - connectAndBind(host, port, bindParam); - } - - public int getLocalPort(){ - return conn.getLocalPort(); - } - - /** - * Open connection and bind immediately. - * - * @param host is the SMSC host address. - * @param port is the SMSC listen port. - * @param bindType is the bind type. - * @param systemId is the system id. - * @param password is the password. - * @param systemType is the system type. - * @param addrTon is the address TON. - * @param addrNpi is the address NPI. - * @param addressRange is the address range. - * @throws IOException if there is an IO error found. - */ - @Override - public void connectAndBind(String host, int port, BindType bindType, - String systemId, String password, String systemType, - TypeOfNumber addrTon, NumberingPlanIndicator addrNpi, - String addressRange) throws IOException { - connectAndBind(host, port, new BindParameter(bindType, systemId, - password, systemType, addrTon, addrNpi, addressRange), 60000); + /** + * Default constructor of {@link SMPPSession}. The next action might be connect and bind to a destination message center. + * + * @see #connectAndBind(String, int, BindType, String, String, String, TypeOfNumber, NumberingPlanIndicator, String) + */ + public SMPPSession() { + this(new SynchronizedPDUSender(new DefaultPDUSender(new DefaultComposer())), + new DefaultPDUReader(), + SocketConnectionFactory.getInstance()); + } + + public SMPPSession(ConnectionFactory connFactory) { + this(new SynchronizedPDUSender(new DefaultPDUSender(new DefaultComposer())), + new DefaultPDUReader(), + connFactory); + } + + public SMPPSession(PDUSender pduSender, PDUReader pduReader, + ConnectionFactory connFactory) { + super(pduSender); + this.pduReader = pduReader; + this.connFactory = connFactory; + } + + public SMPPSession(String host, int port, BindParameter bindParam, + PDUSender pduSender, PDUReader pduReader, + ConnectionFactory connFactory) throws IOException { + this(pduSender, pduReader, connFactory); + connectAndBind(host, port, bindParam); + } + + public SMPPSession(String host, int port, BindParameter bindParam) throws IOException { + this(); + connectAndBind(host, port, bindParam); + } + + public int getLocalPort() { + return conn.getLocalPort(); + } + + /** + * Open connection and bind immediately. + * + * @param host is the SMSC host address. + * @param port is the SMSC listen port. + * @param bindType is the bind type. + * @param systemId is the system id. + * @param password is the password. + * @param systemType is the system type. + * @param addrTon is the address TON. + * @param addrNpi is the address NPI. + * @param addressRange is the address range. + * @throws IOException if there is an IO error found. + */ + @Override + public void connectAndBind(String host, int port, BindType bindType, + String systemId, String password, String systemType, + TypeOfNumber addrTon, NumberingPlanIndicator addrNpi, + String addressRange) throws IOException { + connectAndBind(host, port, new BindParameter(bindType, systemId, + password, systemType, addrTon, addrNpi, addressRange), 60000); + } + + /** + * Open connection and bind immediately with specified timeout. + * + * @param host is the SMSC host address. + * @param port is the SMSC listen port. + * @param bindType is the bind type. + * @param systemId is the system id. + * @param password is the password. + * @param systemType is the system type. + * @param addrTon is the address TON. + * @param addrNpi is the address NPI. + * @param addressRange is the address range. + * @param timeout is the timeout. + * @throws IOException if there is an IO error found. + */ + @Override + public void connectAndBind(String host, int port, BindType bindType, + String systemId, String password, String systemType, + TypeOfNumber addrTon, NumberingPlanIndicator addrNpi, + String addressRange, long timeout) throws IOException { + connectAndBind(host, port, new BindParameter(bindType, systemId, + password, systemType, addrTon, addrNpi, addressRange), timeout); + } + + /** + * Open connection and bind immediately with timeout of 1 minute. + * + * @param host is the SMSC host address. + * @param port is the SMSC listen port. + * @param bindParam is the bind parameters. + * @return the SMSC system id. + * @throws IOException if there is an IO error found. + */ + @Override + public String connectAndBind(String host, int port, + BindParameter bindParam) + throws IOException { + return connectAndBind(host, port, bindParam, 60000); + } + + /** + * Open connection. + * + * @param host is the SMSC host address. + * @param port is the SMSC listen port. + * @throws IOException if there is an IO error found. + */ + public void connect(String host, int port) + throws IOException { + log.debug("Connect to {} port {}", host, port); + if (getSessionState().isNotClosed()) { + throw new IOException("Session state is not closed"); } - /** - * Open connection and bind immediately with specified timeout. - * - * @param host is the SMSC host address. - * @param port is the SMSC listen port. - * @param bindType is the bind type. - * @param systemId is the system id. - * @param password is the password. - * @param systemType is the system type. - * @param addrTon is the address TON. - * @param addrNpi is the address NPI. - * @param addressRange is the address range. - * @param timeout is the timeout. - * @throws IOException if there is an IO error found. - */ - @Override - public void connectAndBind(String host, int port, BindType bindType, - String systemId, String password, String systemType, - TypeOfNumber addrTon, NumberingPlanIndicator addrNpi, - String addressRange, long timeout) throws IOException { - connectAndBind(host, port, new BindParameter(bindType, systemId, - password, systemType, addrTon, addrNpi, addressRange), timeout); - } - - /** - * Open connection and bind immediately with timeout of 1 minute. - * - * @param host is the SMSC host address. - * @param port is the SMSC listen port. - * @param bindParam is the bind parameters. - * @return the SMSC system id. - * @throws IOException if there is an IO error found. - */ - @Override - public String connectAndBind(String host, int port, - BindParameter bindParam) - throws IOException { - return connectAndBind(host, port, bindParam, 60000); + conn = connFactory.createConnection(host, port); + log.info("Connected to {}", conn.getInetAddress()); + + conn.setSoTimeout(getEnquireLinkTimer()); + + sessionContext.open(); + in = new DataInputStream(conn.getInputStream()); + out = conn.getOutputStream(); + + pduReaderWorker = new PDUReaderWorker(getQueueCapacity()); + pduReaderWorker.start(); + + enquireLinkSender = new EnquireLinkSender(); + enquireLinkSender.start(); + } + + /** + * Open connection and bind immediately. + * + * @param host is the SMSC host address. + * @param port is the SMSC listen port. + * @param bindParam is the bind parameters. + * @param timeout is the timeout. + * @return the SMSC system id. + * @throws IOException if there is an IO error found. + */ + @Override + public String connectAndBind(String host, int port, + BindParameter bindParam, long timeout) + throws IOException { + log.debug("Connect and bind to {} port {}", host, port); + if (getSessionState() != SessionState.CLOSED) { + throw new IOException("Session state is not closed"); } - /** - * Open connection. - * - * @param host is the SMSC host address. - * @param port is the SMSC listen port. - * @throws IOException if there is an IO error found. - */ - public void connect(String host, int port) - throws IOException { - log.debug("Connect to {} port {}", host, port); - if (getSessionState().isNotClosed()) { - throw new IOException("Session state is not closed"); - } - - conn = connFactory.createConnection(host, port); - log.info("Connected to {}", conn.getInetAddress()); - - conn.setSoTimeout(getEnquireLinkTimer()); - - sessionContext.open(); - in = new DataInputStream(conn.getInputStream()); - out = conn.getOutputStream(); - - pduReaderWorker = new PDUReaderWorker(getQueueCapacity()); - pduReaderWorker.start(); - - enquireLinkSender = new EnquireLinkSender(); - enquireLinkSender.start(); - } - - /** - * Open connection and bind immediately. - * - * @param host is the SMSC host address. - * @param port is the SMSC listen port. - * @param bindParam is the bind parameters. - * @param timeout is the timeout. - * @return the SMSC system id. - * @throws IOException if there is an IO error found. - */ - @Override - public String connectAndBind(String host, int port, - BindParameter bindParam, long timeout) - throws IOException { - log.debug("Connect and bind to {} port {}", host, port); - if (getSessionState() != SessionState.CLOSED) { - throw new IOException("Session state is not closed"); - } - - conn = connFactory.createConnection(host, port); - log.info("Connected from port {} to {}:{}", conn.getLocalPort(), conn.getInetAddress(), conn.getPort()); - - conn.setSoTimeout(getEnquireLinkTimer()); - - sessionContext.open(); - try { - in = new DataInputStream(conn.getInputStream()); - out = conn.getOutputStream(); - - pduReaderWorker = new PDUReaderWorker(getQueueCapacity()); - pduReaderWorker.start(); - String smscSystemId = sendBind(bindParam.getBindType(), bindParam.getSystemId(), bindParam.getPassword(), bindParam.getSystemType(), - bindParam.getInterfaceVersion(), bindParam.getAddrTon(), bindParam.getAddrNpi(), bindParam.getAddressRange(), timeout); - sessionContext.bound(bindParam.getBindType(), bindParam.getInterfaceVersion()); - - enquireLinkSender = new EnquireLinkSender(); - enquireLinkSender.start(); - return smscSystemId; - } catch (PDUException e) { - log.error("Failed sending bind command", e); - throw new IOException("Failed sending bind since some string parameter area invalid: " + e.getMessage(), e); - } catch (NegativeResponseException e) { - String message = "Receive negative bind response"; - log.error(message, e); - close(); - throw new IOException(message + ": " + e.getMessage(), e); - } catch (InvalidResponseException e) { - String message = "Receive invalid bind response"; - log.error(message, e); - close(); - throw new IOException(message + ": " + e.getMessage(), e); - } catch (ResponseTimeoutException e) { - String message = "Time out waiting for bind response"; - log.error(message, e); - close(); - throw new IOException(message + ": " + e.getMessage(), e); - } catch (IOException e) { - log.error("I/O error occurred", e); - close(); - throw e; - } - } - - /** - * Sending bind. - * - * @param bindType is the bind type. - * @param systemId is the system id. - * @param password is the password. - * @param systemType is the system type. - * @param interfaceVersion is the interface version. - * @param addrTon is the address TON. - * @param addrNpi is the address NPI. - * @param addressRange is the address range. - * @param timeout is the max time waiting for bind response. - * @return SMSC system id. - * @throws PDUException if we enter invalid bind parameter(s). - * @throws ResponseTimeoutException if there is no valid response after defined millisecond. - * @throws InvalidResponseException if there is invalid response found. - * @throws NegativeResponseException if we receive negative response. - * @throws IOException if there is an IO error occur. - */ - private String sendBind(BindType bindType, String systemId, - String password, String systemType, - InterfaceVersion interfaceVersion, TypeOfNumber addrTon, - NumberingPlanIndicator addrNpi, String addressRange, long timeout) - throws PDUException, ResponseTimeoutException, - InvalidResponseException, NegativeResponseException, IOException { - - BindCommandTask task = new BindCommandTask(pduSender(), bindType, - systemId, password, systemType, interfaceVersion, addrTon, - addrNpi, addressRange); - - BindResp resp = (BindResp)executeSendCommand(task, timeout); - OptionalParameter.Sc_interface_version scVersion = resp.getOptionalParameter(Sc_interface_version.class); - if (scVersion != null) { - log.info("Other side reports SMPP interface version {}", scVersion); - } - InterfaceVersion commonInterfaceVersion = scVersion != null ? - InterfaceVersion.IF_50.min(InterfaceVersion.valueOf(scVersion.getValue())) : - interfaceVersion.IF_34; - - sessionContext.bound(bindType, commonInterfaceVersion); - - return resp.getSystemId(); - } - /* (non-Javadoc) - * @see org.jsmpp.session.ClientSession#submitShortMessage(java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String, org.jsmpp.bean.ESMClass, byte, byte, java.lang.String, java.lang.String, org.jsmpp.bean.RegisteredDelivery, byte, org.jsmpp.bean.DataCoding, byte, byte[], org.jsmpp.bean.OptionalParameter[]) - */ - @Override - public SubmitSmResult submitShortMessage(String serviceType, - TypeOfNumber sourceAddrTon, NumberingPlanIndicator sourceAddrNpi, - String sourceAddr, TypeOfNumber destAddrTon, - NumberingPlanIndicator destAddrNpi, String destinationAddr, - ESMClass esmClass, byte protocolId, byte priorityFlag, - String scheduleDeliveryTime, String validityPeriod, - RegisteredDelivery registeredDelivery, byte replaceIfPresentFlag, - DataCoding dataCoding, byte smDefaultMsgId, byte[] shortMessage, - OptionalParameter... optionalParameters) throws PDUException, - ResponseTimeoutException, InvalidResponseException, - NegativeResponseException, IOException { - - ensureTransmittable("submitShortMessage"); - - SubmitSmCommandTask submitSmTask = new SubmitSmCommandTask( - pduSender(), serviceType, sourceAddrTon, sourceAddrNpi, - sourceAddr, destAddrTon, destAddrNpi, destinationAddr, - esmClass, protocolId, priorityFlag, scheduleDeliveryTime, - validityPeriod, registeredDelivery, replaceIfPresentFlag, - dataCoding, smDefaultMsgId, shortMessage, optionalParameters); - - SubmitSmResp resp = (SubmitSmResp)executeSendCommand(submitSmTask, getTransactionTimer()); - - return new SubmitSmResult(resp.getMessageId(), resp.getOptionalParameters()); + conn = connFactory.createConnection(host, port); + log.info("Connected from port {} to {}:{}", conn.getLocalPort(), conn.getInetAddress(), conn.getPort()); + + conn.setSoTimeout(getEnquireLinkTimer()); + + sessionContext.open(); + try { + in = new DataInputStream(conn.getInputStream()); + out = conn.getOutputStream(); + + pduReaderWorker = new PDUReaderWorker(getQueueCapacity()); + pduReaderWorker.start(); + String smscSystemId = sendBind(bindParam.getBindType(), bindParam.getSystemId(), bindParam.getPassword(), bindParam.getSystemType(), + bindParam.getInterfaceVersion(), bindParam.getAddrTon(), bindParam.getAddrNpi(), bindParam.getAddressRange(), timeout); + sessionContext.bound(bindParam.getBindType(), bindParam.getInterfaceVersion()); + + enquireLinkSender = new EnquireLinkSender(); + enquireLinkSender.start(); + return smscSystemId; + } catch (PDUException e) { + log.error("Failed sending bind command", e); + throw new IOException("Failed sending bind since some string parameter area invalid: " + e.getMessage(), e); + } catch (NegativeResponseException e) { + String message = "Receive negative bind response"; + log.error(message, e); + close(); + throw new IOException(message + ": " + e.getMessage(), e); + } catch (InvalidResponseException e) { + String message = "Receive invalid bind response"; + log.error(message, e); + close(); + throw new IOException(message + ": " + e.getMessage(), e); + } catch (ResponseTimeoutException e) { + String message = "Time out waiting for bind response"; + log.error(message, e); + close(); + throw new IOException(message + ": " + e.getMessage(), e); + } catch (IOException e) { + log.error("I/O error occurred", e); + close(); + throw e; + } + } + + /** + * Sending bind. + * + * @param bindType is the bind type. + * @param systemId is the system id. + * @param password is the password. + * @param systemType is the system type. + * @param interfaceVersion is the interface version. + * @param addrTon is the address TON. + * @param addrNpi is the address NPI. + * @param addressRange is the address range. + * @param timeout is the max time waiting for bind response. + * @return SMSC system id. + * @throws PDUException if we enter invalid bind parameter(s). + * @throws ResponseTimeoutException if there is no valid response after defined millisecond. + * @throws InvalidResponseException if there is invalid response found. + * @throws NegativeResponseException if we receive negative response. + * @throws IOException if there is an IO error occur. + */ + private String sendBind(BindType bindType, String systemId, + String password, String systemType, + InterfaceVersion interfaceVersion, TypeOfNumber addrTon, + NumberingPlanIndicator addrNpi, String addressRange, long timeout) + throws PDUException, ResponseTimeoutException, + InvalidResponseException, NegativeResponseException, IOException { + + BindCommandTask task = new BindCommandTask(pduSender(), bindType, + systemId, password, systemType, interfaceVersion, addrTon, + addrNpi, addressRange); + + BindResp resp = (BindResp) executeSendCommand(task, timeout); + OptionalParameter.Sc_interface_version scVersion = resp.getOptionalParameter(Sc_interface_version.class); + if (scVersion != null) { + log.info("Other side reports SMPP interface version {}", scVersion); } + InterfaceVersion commonInterfaceVersion = scVersion != null ? + InterfaceVersion.IF_50.min(InterfaceVersion.valueOf(scVersion.getValue())) : + interfaceVersion.IF_34; + + sessionContext.bound(bindType, commonInterfaceVersion); + + return resp.getSystemId(); + } + + /* (non-Javadoc) + * @see org.jsmpp.session.ClientSession#submitShortMessage(java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String, org.jsmpp.bean.ESMClass, byte, byte, java.lang.String, java.lang.String, org.jsmpp.bean.RegisteredDelivery, byte, org.jsmpp.bean.DataCoding, byte, byte[], org.jsmpp.bean.OptionalParameter[]) + */ + @Override + public SubmitSmResult submitShortMessage(String serviceType, + TypeOfNumber sourceAddrTon, NumberingPlanIndicator sourceAddrNpi, + String sourceAddr, TypeOfNumber destAddrTon, + NumberingPlanIndicator destAddrNpi, String destinationAddr, + ESMClass esmClass, byte protocolId, byte priorityFlag, + String scheduleDeliveryTime, String validityPeriod, + RegisteredDelivery registeredDelivery, byte replaceIfPresentFlag, + DataCoding dataCoding, byte smDefaultMsgId, byte[] shortMessage, + OptionalParameter... optionalParameters) throws PDUException, + ResponseTimeoutException, InvalidResponseException, + NegativeResponseException, IOException { + + ensureTransmittable("submitShortMessage"); + + SubmitSmCommandTask submitSmTask = new SubmitSmCommandTask( + pduSender(), serviceType, sourceAddrTon, sourceAddrNpi, + sourceAddr, destAddrTon, destAddrNpi, destinationAddr, + esmClass, protocolId, priorityFlag, scheduleDeliveryTime, + validityPeriod, registeredDelivery, replaceIfPresentFlag, + dataCoding, smDefaultMsgId, shortMessage, optionalParameters); + + SubmitSmResp resp = (SubmitSmResp) executeSendCommand(submitSmTask, getTransactionTimer()); + + return new SubmitSmResult(resp.getMessageId(), resp.getOptionalParameters()); + } + + /* (non-Javadoc) + * @see org.jsmpp.session.ClientSession#submitMultiple(java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String, org.jsmpp.bean.Address[], org.jsmpp.bean.ESMClass, byte, byte, java.lang.String, java.lang.String, org.jsmpp.bean.RegisteredDelivery, org.jsmpp.bean.ReplaceIfPresentFlag, org.jsmpp.bean.DataCoding, byte, byte[], org.jsmpp.bean.OptionalParameter[]) + */ + @Override + public SubmitMultiResult submitMultiple(String serviceType, + TypeOfNumber sourceAddrTon, NumberingPlanIndicator sourceAddrNpi, + String sourceAddr, Address[] destinationAddresses, + ESMClass esmClass, byte protocolId, byte priorityFlag, + String scheduleDeliveryTime, String validityPeriod, + RegisteredDelivery registeredDelivery, + ReplaceIfPresentFlag replaceIfPresentFlag, DataCoding dataCoding, + byte smDefaultMsgId, byte[] shortMessage, + OptionalParameter... optionalParameters) throws PDUException, + ResponseTimeoutException, InvalidResponseException, + NegativeResponseException, IOException { + + ensureTransmittable("submitMultiple"); + + SubmitMultiCommandTask task = new SubmitMultiCommandTask(pduSender(), + serviceType, sourceAddrTon, sourceAddrNpi, sourceAddr, + destinationAddresses, esmClass, protocolId, priorityFlag, + scheduleDeliveryTime, validityPeriod, registeredDelivery, + replaceIfPresentFlag, dataCoding, smDefaultMsgId, shortMessage, + optionalParameters); + + SubmitMultiResp resp = (SubmitMultiResp) executeSendCommand(task, + getTransactionTimer()); + + return new SubmitMultiResult(resp.getMessageId(), resp.getUnsuccessSmes(), resp.getOptionalParameters()); + } + + /* (non-Javadoc) + * @see org.jsmpp.session.ClientSession#queryShortMessage(java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String) + */ + @Override + public QuerySmResult queryShortMessage(String messageId, + TypeOfNumber sourceAddrTon, NumberingPlanIndicator sourceAddrNpi, + String sourceAddr) throws PDUException, ResponseTimeoutException, + InvalidResponseException, NegativeResponseException, IOException { + + ensureTransmittable("queryShortMessage"); + + QuerySmCommandTask task = new QuerySmCommandTask(pduSender(), + messageId, sourceAddrTon, sourceAddrNpi, sourceAddr); + + QuerySmResp resp = (QuerySmResp) executeSendCommand(task, + getTransactionTimer()); + + if (resp.getMessageId().equals(messageId)) { + return new QuerySmResult(resp.getFinalDate(), resp + .getMessageState(), resp.getErrorCode()); + } else { + // message id requested not the same as the returned + throw new InvalidResponseException( + "Requested message_id doesn't match with the result"); + } + } + + /* (non-Javadoc) + * @see org.jsmpp.session.ClientSession#replaceShortMessage(java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String, java.lang.String, java.lang.String, org.jsmpp.bean.RegisteredDelivery, byte, byte[]) + */ + @Override + public void replaceShortMessage(String messageId, + TypeOfNumber sourceAddrTon, NumberingPlanIndicator sourceAddrNpi, + String sourceAddr, String scheduleDeliveryTime, + String validityPeriod, RegisteredDelivery registeredDelivery, + byte smDefaultMsgId, byte[] shortMessage) throws PDUException, + ResponseTimeoutException, InvalidResponseException, NegativeResponseException, IOException { + + ensureTransmittable("replaceShortMessage"); + + ReplaceSmCommandTask replaceSmTask = new ReplaceSmCommandTask( + pduSender(), messageId, sourceAddrTon, sourceAddrNpi, + sourceAddr, scheduleDeliveryTime, validityPeriod, + registeredDelivery, smDefaultMsgId, shortMessage); + + executeSendCommand(replaceSmTask, getTransactionTimer()); + } + + /* (non-Javadoc) + * @see org.jsmpp.session.ClientSession#cancelShortMessage(java.lang.String, java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String) + */ + @Override + public void cancelShortMessage(String serviceType, String messageId, + TypeOfNumber sourceAddrTon, NumberingPlanIndicator sourceAddrNpi, + String sourceAddr, TypeOfNumber destAddrTon, + NumberingPlanIndicator destAddrNpi, String destinationAddress) + throws PDUException, ResponseTimeoutException, + InvalidResponseException, NegativeResponseException, IOException { + + ensureTransmittable("cancelShortMessage"); + + CancelSmCommandTask task = new CancelSmCommandTask(pduSender(), + serviceType, messageId, sourceAddrTon, sourceAddrNpi, + sourceAddr, destAddrTon, destAddrNpi, destinationAddress); + + executeSendCommand(task, getTransactionTimer()); + } + + @Override + public MessageReceiverListener getMessageReceiverListener() { + return messageReceiverListener; + } + + @Override + public void setMessageReceiverListener( + MessageReceiverListener messageReceiverListener) { + this.messageReceiverListener = messageReceiverListener; + } + + @Override + protected Connection connection() { + return conn; + } + + @Override + protected AbstractSessionContext sessionContext() { + return sessionContext; + } + + @Override + protected GenericMessageReceiverListener messageReceiverListener() { + return messageReceiverListener; + } + + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } + + private void fireAcceptDeliverSm(DeliverSm deliverSm) throws ProcessRequestException { + if (messageReceiverListener != null) { + messageReceiverListener.onAcceptDeliverSm(deliverSm); + } else { + log.warn("Received deliver_sm but message receiver listener is null. Short message = {}", + new String(deliverSm.getShortMessage(), StandardCharsets.ISO_8859_1)); + throw new ProcessRequestException("No message receiver listener registered", SMPPConstant.STAT_ESME_RX_T_APPN); + } + } - /* (non-Javadoc) - * @see org.jsmpp.session.ClientSession#submitMultiple(java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String, org.jsmpp.bean.Address[], org.jsmpp.bean.ESMClass, byte, byte, java.lang.String, java.lang.String, org.jsmpp.bean.RegisteredDelivery, org.jsmpp.bean.ReplaceIfPresentFlag, org.jsmpp.bean.DataCoding, byte, byte[], org.jsmpp.bean.OptionalParameter[]) - */ - @Override - public SubmitMultiResult submitMultiple(String serviceType, - TypeOfNumber sourceAddrTon, NumberingPlanIndicator sourceAddrNpi, - String sourceAddr, Address[] destinationAddresses, - ESMClass esmClass, byte protocolId, byte priorityFlag, - String scheduleDeliveryTime, String validityPeriod, - RegisteredDelivery registeredDelivery, - ReplaceIfPresentFlag replaceIfPresentFlag, DataCoding dataCoding, - byte smDefaultMsgId, byte[] shortMessage, - OptionalParameter... optionalParameters) throws PDUException, - ResponseTimeoutException, InvalidResponseException, - NegativeResponseException, IOException { - - ensureTransmittable("submitMultiple"); - - SubmitMultiCommandTask task = new SubmitMultiCommandTask(pduSender(), - serviceType, sourceAddrTon, sourceAddrNpi, sourceAddr, - destinationAddresses, esmClass, protocolId, priorityFlag, - scheduleDeliveryTime, validityPeriod, registeredDelivery, - replaceIfPresentFlag, dataCoding, smDefaultMsgId, shortMessage, - optionalParameters); - - SubmitMultiResp resp = (SubmitMultiResp)executeSendCommand(task, - getTransactionTimer()); - - return new SubmitMultiResult(resp.getMessageId(), resp - .getUnsuccessSmes()); + private void fireAcceptAlertNotification(AlertNotification alertNotification) { + if (messageReceiverListener != null) { + messageReceiverListener.onAcceptAlertNotification(alertNotification); + } else { + log.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "alert_notification"); + } + } + + private class ResponseHandlerImpl implements ResponseHandler { + + @Override + public void processDeliverSm(DeliverSm deliverSm) throws ProcessRequestException { + try { + fireAcceptDeliverSm(deliverSm); + } catch (ProcessRequestException e) { + throw e; + } catch (Exception e) { + String msg = "Invalid runtime exception thrown when processing deliver_sm"; + log.error(msg, e); + throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RX_T_APPN); + } } - /* (non-Javadoc) - * @see org.jsmpp.session.ClientSession#queryShortMessage(java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String) - */ - @Override - public QuerySmResult queryShortMessage(String messageId, - TypeOfNumber sourceAddrTon, NumberingPlanIndicator sourceAddrNpi, - String sourceAddr) throws PDUException, ResponseTimeoutException, - InvalidResponseException, NegativeResponseException, IOException { - - ensureTransmittable("queryShortMessage"); - - QuerySmCommandTask task = new QuerySmCommandTask(pduSender(), - messageId, sourceAddrTon, sourceAddrNpi, sourceAddr); - - QuerySmResp resp = (QuerySmResp)executeSendCommand(task, - getTransactionTimer()); - - if (resp.getMessageId().equals(messageId)) { - return new QuerySmResult(resp.getFinalDate(), resp - .getMessageState(), resp.getErrorCode()); - } else { - // message id requested not the same as the returned - throw new InvalidResponseException( - "Requested message_id doesn't match with the result"); - } + @Override + public DataSmResult processDataSm(DataSm dataSm) + throws ProcessRequestException { + try { + return fireAcceptDataSm(dataSm); + } catch (ProcessRequestException e) { + throw e; + } catch (Exception e) { + String msg = "Invalid runtime exception thrown when processing data_sm"; + log.error(msg, e); + throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RX_T_APPN); + } } - /* (non-Javadoc) - * @see org.jsmpp.session.ClientSession#replaceShortMessage(java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String, java.lang.String, java.lang.String, org.jsmpp.bean.RegisteredDelivery, byte, byte[]) - */ - @Override - public void replaceShortMessage(String messageId, - TypeOfNumber sourceAddrTon, NumberingPlanIndicator sourceAddrNpi, - String sourceAddr, String scheduleDeliveryTime, - String validityPeriod, RegisteredDelivery registeredDelivery, - byte smDefaultMsgId, byte[] shortMessage) throws PDUException, - ResponseTimeoutException, InvalidResponseException, - NegativeResponseException, IOException { - - ensureTransmittable("replaceShortMessage"); - - ReplaceSmCommandTask replaceSmTask = new ReplaceSmCommandTask( - pduSender(), messageId, sourceAddrTon, sourceAddrNpi, - sourceAddr, scheduleDeliveryTime, validityPeriod, - registeredDelivery, smDefaultMsgId, shortMessage); - - executeSendCommand(replaceSmTask, getTransactionTimer()); + @Override + public void processAlertNotification(AlertNotification alertNotification) { + try { + fireAcceptAlertNotification(alertNotification); + } catch (Exception e) { + log.error("Invalid runtime exception thrown when processing alert_notification", e); + } } - /* (non-Javadoc) - * @see org.jsmpp.session.ClientSession#cancelShortMessage(java.lang.String, java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String, org.jsmpp.bean.TypeOfNumber, org.jsmpp.bean.NumberingPlanIndicator, java.lang.String) - */ - @Override - public void cancelShortMessage(String serviceType, String messageId, - TypeOfNumber sourceAddrTon, NumberingPlanIndicator sourceAddrNpi, - String sourceAddr, TypeOfNumber destAddrTon, - NumberingPlanIndicator destAddrNpi, String destinationAddress) - throws PDUException, ResponseTimeoutException, - InvalidResponseException, NegativeResponseException, IOException { + @Override + public void sendDataSmResp(DataSmResult dataSmResult, int sequenceNumber) + throws IOException { + try { + pduSender().sendDataSmResp(out, sequenceNumber, + dataSmResult.getMessageId(), + dataSmResult.getOptionalParameters()); + } catch (PDUStringException e) { + /* + * There should be no PDUStringException thrown since creation + * of MessageId should be safe. + */ + log.error("Failed sending data_sm_resp", e); + } + } - ensureTransmittable("cancelShortMessage"); + @Override + public PendingResponse removeSentItem(int sequenceNumber) { + return removePendingResponse(sequenceNumber); + } - CancelSmCommandTask task = new CancelSmCommandTask(pduSender(), - serviceType, messageId, sourceAddrTon, sourceAddrNpi, - sourceAddr, destAddrTon, destAddrNpi, destinationAddress); + @Override + public void notifyUnbonded() { + sessionContext.unbound(); + } - executeSendCommand(task, getTransactionTimer()); + @Override + public void sendDeliverSmResp(int commandStatus, int sequenceNumber, String messageId) throws IOException { + log.debug("Sending deliver_sm_resp with sequence_number {}", sequenceNumber); + pduSender().sendDeliverSmResp(out, commandStatus, sequenceNumber, messageId); } - @Override - public MessageReceiverListener getMessageReceiverListener() { - return messageReceiverListener; + @Override + public void sendEnquireLinkResp(int sequenceNumber) throws IOException { + pduSender().sendEnquireLinkResp(out, sequenceNumber); } - @Override - public void setMessageReceiverListener( - MessageReceiverListener messageReceiverListener) { - this.messageReceiverListener = messageReceiverListener; - } - - @Override - protected Connection connection() { - return conn; - } - - @Override - protected AbstractSessionContext sessionContext() { - return sessionContext; - } - - @Override - protected GenericMessageReceiverListener messageReceiverListener() { - return messageReceiverListener; - } - - @Override - protected void finalize() throws Throwable { - close(); - super.finalize(); - } - - private void fireAcceptDeliverSm(DeliverSm deliverSm) throws ProcessRequestException { - if (messageReceiverListener != null) { - messageReceiverListener.onAcceptDeliverSm(deliverSm); - } else { - log.warn("Received deliver_sm but message receiver listener is null. Short message = {}", - new String(deliverSm.getShortMessage(), StandardCharsets.ISO_8859_1)); - throw new ProcessRequestException("No message receiver listener registered", SMPPConstant.STAT_ESME_RX_T_APPN); - } - } - - private void fireAcceptAlertNotification(AlertNotification alertNotification) { - if (messageReceiverListener != null) { - messageReceiverListener.onAcceptAlertNotification(alertNotification); - } else { - log.warn(MESSAGE_RECEIVER_LISTENER_IS_NULL, "alert_notification"); - } - } - - private class ResponseHandlerImpl implements ResponseHandler { - - @Override - public void processDeliverSm(DeliverSm deliverSm) throws ProcessRequestException { - try { - fireAcceptDeliverSm(deliverSm); - } catch(ProcessRequestException e) { - throw e; - } catch(Exception e) { - String msg = "Invalid runtime exception thrown when processing deliver_sm"; - log.error(msg, e); - throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RX_T_APPN); - } - } - - @Override - public DataSmResult processDataSm(DataSm dataSm) - throws ProcessRequestException { - try { - return fireAcceptDataSm(dataSm); - } catch(ProcessRequestException e) { - throw e; - } catch(Exception e) { - String msg = "Invalid runtime exception thrown when processing data_sm"; - log.error(msg, e); - throw new ProcessRequestException(msg, SMPPConstant.STAT_ESME_RX_T_APPN); - } - } - - @Override - public void processAlertNotification(AlertNotification alertNotification) { - try { - fireAcceptAlertNotification(alertNotification); - } catch(Exception e) { - log.error("Invalid runtime exception thrown when processing alert_notification", e); - } - } - - @Override - public void sendDataSmResp(DataSmResult dataSmResult, int sequenceNumber) - throws IOException { - try { - pduSender().sendDataSmResp(out, sequenceNumber, - dataSmResult.getMessageId(), - dataSmResult.getOptionalParameters()); - } catch (PDUStringException e) { - /* - * There should be no PDUStringException thrown since creation - * of MessageId should be safe. - */ - log.error("Failed sending data_sm_resp", e); - } - } - - @Override - public PendingResponse removeSentItem(int sequenceNumber) { - return removePendingResponse(sequenceNumber); - } - - @Override - public void notifyUnbonded() { - sessionContext.unbound(); - } - - @Override - public void sendDeliverSmResp(int commandStatus, int sequenceNumber, String messageId) throws IOException { - log.debug("Sending deliver_sm_resp with sequence_number {}", sequenceNumber); - pduSender().sendDeliverSmResp(out, commandStatus, sequenceNumber, messageId); - } - - @Override - public void sendEnquireLinkResp(int sequenceNumber) throws IOException { - pduSender().sendEnquireLinkResp(out, sequenceNumber); - } - - @Override - public void sendGenerickNack(int commandStatus, int sequenceNumber) throws IOException { - pduSender().sendGenericNack(out, commandStatus, sequenceNumber); - } - - @Override - public void sendNegativeResponse(int originalCommandId, int commandStatus, int sequenceNumber) throws IOException { - pduSender().sendHeader(out, originalCommandId | SMPPConstant.MASK_CID_RESP, commandStatus, sequenceNumber); - } - - @Override - public void sendUnbindResp(int sequenceNumber) throws IOException { - pduSender().sendUnbindResp(out, SMPPConstant.STAT_ESME_ROK, sequenceNumber); - } - } - - /** - * Worker to read the PDU. - * - * @author uudashr - * - */ - private class PDUReaderWorker extends Thread { - // start with serial execution of pdu processing, when the session is bound the pool will be enlarged up to the PduProcessorDegree - private ThreadPoolExecutor pduExecutor; - private Runnable onIOExceptionTask = new Runnable() { - @Override - public void run() { - close(); - } - }; - - private PDUReaderWorker(final int queueCapacity) { - super("PDUReaderWorker-" + getSessionId()); - pduExecutor = new ThreadPoolExecutor(1, 1, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(queueCapacity), new RejectedExecutionHandler() { + @Override + public void sendGenerickNack(int commandStatus, int sequenceNumber) throws IOException { + pduSender().sendGenericNack(out, commandStatus, sequenceNumber); + } + + @Override + public void sendNegativeResponse(int originalCommandId, int commandStatus, int sequenceNumber) throws IOException { + pduSender().sendHeader(out, originalCommandId | SMPPConstant.MASK_CID_RESP, commandStatus, sequenceNumber); + } + + @Override + public void sendUnbindResp(int sequenceNumber) throws IOException { + pduSender().sendUnbindResp(out, SMPPConstant.STAT_ESME_ROK, sequenceNumber); + } + } + + /** + * Worker to read the PDU. + * + * @author uudashr + */ + private class PDUReaderWorker extends Thread { + // start with serial execution of pdu processing, when the session is bound the pool will be enlarged up to the PduProcessorDegree + private ThreadPoolExecutor pduExecutor; + private LinkedBlockingQueue workQueue; + private int queueCapacity; + private Runnable onIOExceptionTask = new Runnable() { + @Override + public void run() { + close(); + } + }; + + private PDUReaderWorker(final int queueCapacity) { + super("PDUReaderWorker-" + getSessionId()); + this.queueCapacity = queueCapacity; + workQueue = new LinkedBlockingQueue<>(queueCapacity); + pduExecutor = new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, workQueue, new RejectedExecutionHandler() { @Override public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor executor) { - log.info("Receiving queue is full, please increasing queue capacity, and/or let other side obey the window size"); - Command pduHeader = ((PDUProcessTask)runnable).getPduHeader(); - if ((pduHeader.getCommandId() & SMPPConstant.MASK_CID_RESP) == SMPPConstant.MASK_CID_RESP) { - try { - boolean success = executor.getQueue().offer(runnable, 60000, TimeUnit.MILLISECONDS); - if (!success){ - log.warn("Offer to queue failed for {}", pduHeader); - } - } - catch (InterruptedException e){ - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } else { - throw new QueueMaxException("Queue capacity " + queueCapacity + " exceeded"); - } + log.info("Receiving queue is full, please increasing receive queue capacity, and/or let other side obey the window size"); + Command pduHeader = ((PDUProcessTask) runnable).getPduHeader(); + if ((pduHeader.getCommandId() & SMPPConstant.MASK_CID_RESP) == SMPPConstant.MASK_CID_RESP) { + try { + boolean success = executor.getQueue().offer(runnable, 60000, TimeUnit.MILLISECONDS); + if (!success) { + log.warn("Offer to receive queue failed for {}", pduHeader); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } else { + throw new QueueMaxException("Receiving queue capacity " + queueCapacity + " exceeded"); + } } }); } - - @Override - public void run() { - log.info("Starting PDUReaderWorker"); - while (isReadPdu()) { - readPDU(); - } - close(); + + @Override + public void run() { + log.info("Starting PDUReaderWorker"); + while (isReadPdu()) { + readPDU(); + } + close(); pduExecutor.shutdown(); - try { + try { pduExecutor.awaitTermination(getTransactionTimer(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.warn("Interrupted while waiting for PDU executor pool to finish"); - Thread.currentThread().interrupt(); - } - log.debug("{} stopped", this.getName()); - } - - private void readPDU() { - Command pduHeader = null; - try { - pduHeader = pduReader.readPDUHeader(in); - byte[] pdu = pduReader.readPDU(in, pduHeader); - log.info("Received PDU in session {} in state {}: {}", getSessionId(), getSessionState(), HexUtil.convertBytesToHexString(pdu)); - - /* - * When the processing PDU is need user interaction via event, - * the code on event might take non-short time, so we need to - * process it concurrently. - */ - PDUProcessTask task = new PDUProcessTask(pduHeader, pdu, - sessionContext, responseHandler, - sessionContext, onIOExceptionTask); - pduExecutor.execute(task); - } catch (QueueMaxException e) { - log.info("Notify other side to throttle: {} ({} threads active)", e.getMessage(), pduExecutor.getActiveCount()); - try { - responseHandler.sendNegativeResponse(pduHeader.getCommandId(), SMPPConstant.STAT_ESME_RTHROTTLED, pduHeader.getSequenceNumber()); - } catch (IOException ioe) { - log.warn("Failed sending negative resp: {}", ioe.getMessage()); - close(); - } - } catch (InvalidCommandLengthException e) { - log.warn("Received invalid command length: {}", e.getMessage()); - try { - pduSender().sendGenericNack(out, SMPPConstant.STAT_ESME_RINVCMDLEN, 0); - } catch (IOException ee) { - log.warn("Failed sending generic_nack", ee); - } - unbindAndClose(); - } catch (SocketTimeoutException e) { - notifyNoActivity(); - } catch (IOException e) { - log.info("Reading PDU session {} in state {}: {}", getSessionId(), getSessionState(), e.getMessage()); - close(); - } catch (RuntimeException e) { - log.warn("Runtime error while reading PDU", e); - close(); - } - } - - /** - * Notify for no activity. - */ - private void notifyNoActivity() { - SessionState sessionState = sessionContext().getSessionState(); - if ((getInterfaceVersion().compareTo(InterfaceVersion.IF_34) > 0 && sessionState.isNotClosed()) || - sessionState.isBound()) { - log.trace("No activity notified, sending enquire_link"); - enquireLinkSender.enquireLink(); - } - } - } - - /** - * Session state listener for internal class use. - * - * @author uudashr - * - */ - private class BoundSessionStateListener implements SessionStateListener { - @Override - public void onStateChange(SessionState newState, SessionState oldState, - Session source) { - /* - * We need to set SO_TIMEOUT to sessionTimer so when timeout occur, - * a SocketTimeoutException will be raised. When Exception raised we - * can send an enquireLinkCommand. - */ - if (newState.equals(SessionState.OPEN)) { - try { - conn.setSoTimeout(getEnquireLinkTimer()); - } catch (IOException e) { - log.error("Failed setting so_timeout for session timer", e); - } - } - if (newState.isBound()) { - log.debug("Changing processor degree to {}", getPduProcessorDegree()); - pduReaderWorker.pduExecutor.setMaximumPoolSize(getPduProcessorDegree()); - pduReaderWorker.pduExecutor.setCorePoolSize(getPduProcessorDegree()); - } - } - } + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for PDU executor pool to finish"); + Thread.currentThread().interrupt(); + } + log.debug("{} stopped", this.getName()); + } + + private void readPDU() { + Command pduHeader = null; + try { + pduHeader = pduReader.readPDUHeader(in); + byte[] pdu = pduReader.readPDU(in, pduHeader); + log.debug("Received PDU in session {} in state {}: {}", getSessionId(), getSessionState(), HexUtil.convertBytesToHexString(pdu)); + + /* + * When the processing PDU is need user interaction via event, + * the code on event might take non-short time, so we need to + * process it concurrently. + */ + PDUProcessTask task = new PDUProcessTask(pduHeader, pdu, + sessionContext, responseHandler, + sessionContext, onIOExceptionTask); + pduExecutor.execute(task); + } catch (QueueMaxException e) { + log.info("Notify other side to throttle: {} ({} threads active)", e.getMessage(), pduExecutor.getActiveCount()); + try { + responseHandler.sendNegativeResponse(pduHeader.getCommandId(), SMPPConstant.STAT_ESME_RTHROTTLED, pduHeader.getSequenceNumber()); + } catch (IOException ioe) { + log.warn("Failed sending negative resp: {}", ioe.getMessage()); + close(); + } + } catch (InvalidCommandLengthException e) { + log.warn("Received invalid command length: {}", e.getMessage()); + try { + pduSender().sendGenericNack(out, SMPPConstant.STAT_ESME_RINVCMDLEN, 0); + } catch (IOException ee) { + log.warn("Failed sending generic_nack", ee); + } + unbindAndClose(); + } catch (SocketTimeoutException e) { + notifyNoActivity(); + } catch (IOException e) { + log.info("Reading PDU session {} in state {}: {}", getSessionId(), getSessionState(), e.getMessage()); + close(); + } catch (RuntimeException e) { + log.warn("Runtime error while reading PDU", e); + close(); + } + } + + /** + * Notify for no activity. + */ + private void notifyNoActivity() { + SessionState sessionState = sessionContext().getSessionState(); + if ((getInterfaceVersion().compareTo(InterfaceVersion.IF_34) > 0 && sessionState.isNotClosed()) || + sessionState.isBound()) { + log.trace("No activity notified, sending enquire_link"); + enquireLinkSender.enquireLink(); + } + } + + /* + * Return an integer between 0 (Idle) and 100 (Congested/Maximum Load). Only used for SMPP 5.0. + */ + public int getCongestionRatio() { + return ((80 * pduExecutor.getActiveCount()) / pduExecutor.getMaximumPoolSize()) + + ((20 * workQueue.size()) / queueCapacity); + } + } + + /** + * Session state listener for internal class use. + * + * @author uudashr + */ + private class BoundSessionStateListener implements SessionStateListener { + @Override + public void onStateChange(SessionState newState, SessionState oldState, + Session source) { + /* + * We need to set SO_TIMEOUT to sessionTimer so when timeout occur, + * a SocketTimeoutException will be raised. When Exception raised we + * can send an enquireLinkCommand. + */ + if (newState.equals(SessionState.OPEN)) { + try { + conn.setSoTimeout(getEnquireLinkTimer()); + } catch (IOException e) { + log.error("Failed setting so_timeout for session timer", e); + } + } + if (newState.isBound()) { + int pduProcessorDegree = getPduProcessorDegree(); + log.debug("Changing processor degree to {}", pduProcessorDegree); + pduReaderWorker.pduExecutor.setMaximumPoolSize(pduProcessorDegree); + pduReaderWorker.pduExecutor.setCorePoolSize(pduProcessorDegree); + } + } + } } diff --git a/jsmpp/src/main/java/org/jsmpp/session/ServerMessageReceiverListener.java b/jsmpp/src/main/java/org/jsmpp/session/ServerMessageReceiverListener.java index 5cbf92df..ba9aa652 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/ServerMessageReceiverListener.java +++ b/jsmpp/src/main/java/org/jsmpp/session/ServerMessageReceiverListener.java @@ -21,7 +21,6 @@ import org.jsmpp.bean.QuerySm; import org.jsmpp.bean.ReplaceSm; import org.jsmpp.bean.SubmitMulti; -import org.jsmpp.bean.SubmitMultiResult; import org.jsmpp.bean.SubmitSm; import org.jsmpp.extra.ProcessRequestException; diff --git a/jsmpp/src/main/java/org/jsmpp/session/ServerResponseDeliveryAdapter.java b/jsmpp/src/main/java/org/jsmpp/session/ServerResponseDeliveryAdapter.java index fc2c9c8c..a47a7cbf 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/ServerResponseDeliveryAdapter.java +++ b/jsmpp/src/main/java/org/jsmpp/session/ServerResponseDeliveryAdapter.java @@ -14,8 +14,6 @@ */ package org.jsmpp.session; -import org.jsmpp.bean.SubmitMultiResult; - /** * It's abstract adapter class that receive event of response delivery, an * implementation of {@link ServerResponseDeliveryListener}. @@ -33,7 +31,7 @@ public abstract class ServerResponseDeliveryAdapter implements * @see org.jsmpp.session.ServerResponseDeliveryListener#onSubmitSmResponseSent(org.jsmpp.util.MessageId, org.jsmpp.session.SMPPServerSession) */ public void onSubmitSmRespSent(SubmitSmResult submitSmResult, - SMPPServerSession source) {} + SMPPServerSession source) {} /* (non-Javadoc) * @see org.jsmpp.session.ServerResponseDeliveryListener#onSubmitSmResponseError(org.jsmpp.util.MessageId, java.lang.Exception, org.jsmpp.session.SMPPServerSession) @@ -42,13 +40,13 @@ public void onSubmitSmRespError(SubmitSmResult submitSmResult, Exception e, SMPPServerSession source) {} /* (non-Javadoc) - * @see org.jsmpp.session.ServerResponseDeliveryListener#onSubmitMultiResponseSent(org.jsmpp.bean.SubmitMultiResult, org.jsmpp.session.SMPPServerSession) + * @see org.jsmpp.session.ServerResponseDeliveryListener#onSubmitMultiResponseSent(org.jsmpp.session.SubmitMultiResult, org.jsmpp.session.SMPPServerSession) */ public void onSubmitMultiRespSent(SubmitMultiResult submitMultiResult, SMPPServerSession source) {} /* (non-Javadoc) - * @see org.jsmpp.session.ServerResponseDeliveryListener#onSubmitMultiResposnseError(org.jsmpp.bean.SubmitMultiResult, java.lang.Exception, org.jsmpp.session.SMPPServerSession) + * @see org.jsmpp.session.ServerResponseDeliveryListener#onSubmitMultiResposnseError(org.jsmpp.session.SubmitMultiResult, java.lang.Exception, org.jsmpp.session.SMPPServerSession) */ public void onSubmitMultiRespError( SubmitMultiResult submitMultiResult, Exception e, diff --git a/jsmpp/src/main/java/org/jsmpp/session/ServerResponseDeliveryListener.java b/jsmpp/src/main/java/org/jsmpp/session/ServerResponseDeliveryListener.java index 7c37f023..66e55707 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/ServerResponseDeliveryListener.java +++ b/jsmpp/src/main/java/org/jsmpp/session/ServerResponseDeliveryListener.java @@ -14,8 +14,6 @@ */ package org.jsmpp.session; -import org.jsmpp.bean.SubmitMultiResult; - /** * This is listener will be used by {@link SMPPServerSession} to notify a user * when a response has been sent. diff --git a/jsmpp/src/main/java/org/jsmpp/session/ServerResponseHandler.java b/jsmpp/src/main/java/org/jsmpp/session/ServerResponseHandler.java index 4f480b63..19fac971 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/ServerResponseHandler.java +++ b/jsmpp/src/main/java/org/jsmpp/session/ServerResponseHandler.java @@ -25,61 +25,59 @@ import org.jsmpp.bean.QuerySm; import org.jsmpp.bean.ReplaceSm; import org.jsmpp.bean.SubmitMulti; -import org.jsmpp.bean.SubmitMultiResult; import org.jsmpp.bean.SubmitSm; import org.jsmpp.extra.ProcessRequestException; /** * @author uudashr - * */ public interface ServerResponseHandler extends GenericServerResponseHandler { - void sendSubmitSmResponse(SubmitSmResult submitSmResult, int sequenceNumber) - throws IOException; + void sendSubmitSmResponse(SubmitSmResult submitSmResult, int sequenceNumber) + throws IOException; + + void processBind(Bind bind); + + SubmitSmResult processSubmitSm(SubmitSm submitSm) + throws ProcessRequestException; + + SubmitMultiResult processSubmitMulti(SubmitMulti submitMulti) + throws ProcessRequestException; + + void sendSubmitMultiResponse(SubmitMultiResult submitMultiResult, + int sequenceNumber) throws IOException; + + QuerySmResult processQuerySm(QuerySm querySm) + throws ProcessRequestException; + + void sendQuerySmResp(String messageId, String finalDate, + MessageState messageState, byte errorCode, int sequenceNumber) + throws IOException; - void processBind(Bind bind); + void processCancelSm(CancelSm cancelSm) throws ProcessRequestException; - SubmitSmResult processSubmitSm(SubmitSm submitSm) - throws ProcessRequestException; - - SubmitMultiResult processSubmitMulti(SubmitMulti submitMulti) - throws ProcessRequestException; - - void sendSubmitMultiResponse(SubmitMultiResult submitMultiResult, - int sequenceNumber) throws IOException; - - QuerySmResult processQuerySm(QuerySm querySm) - throws ProcessRequestException; + void sendCancelSmResp(int sequenceNumber) throws IOException; - void sendQuerySmResp(String messageId, String finalDate, - MessageState messageState, byte errorCode, int sequenceNumber) - throws IOException; + void processReplaceSm(ReplaceSm replaceSm) throws ProcessRequestException; - void processCancelSm(CancelSm cancelSm) throws ProcessRequestException; - - void sendCancelSmResp(int sequenceNumber) throws IOException; - - void processReplaceSm(ReplaceSm replaceSm) throws ProcessRequestException; - - void sendReplaceSmResp(int sequenceNumber) throws IOException; + void sendReplaceSmResp(int sequenceNumber) throws IOException; - BroadcastSmResult processBroadcastSm(BroadcastSm broadcastSm) - throws ProcessRequestException; + BroadcastSmResult processBroadcastSm(BroadcastSm broadcastSm) + throws ProcessRequestException; - void sendBroadcastSmResp(BroadcastSmResult broadcastSmResult, int sequenceNumber) - throws IOException; + void sendBroadcastSmResp(BroadcastSmResult broadcastSmResult, int sequenceNumber) + throws IOException; - void processCancelBroadcastSm(CancelBroadcastSm cancelBroadcastSm) - throws ProcessRequestException; + void processCancelBroadcastSm(CancelBroadcastSm cancelBroadcastSm) + throws ProcessRequestException; - void sendCancelBroadcastSmResp(int sequenceNumber) - throws IOException; + void sendCancelBroadcastSmResp(int sequenceNumber) + throws IOException; - QueryBroadcastSmResult processQueryBroadcastSm(QueryBroadcastSm queryBroadcastSm) - throws ProcessRequestException; + QueryBroadcastSmResult processQueryBroadcastSm(QueryBroadcastSm queryBroadcastSm) + throws ProcessRequestException; - void sendQueryBroadcastSmResp(QueryBroadcastSmResult queryBroadcastSmResult, int sequenceNumber) - throws IOException; + void sendQueryBroadcastSmResp(QueryBroadcastSmResult queryBroadcastSmResult, int sequenceNumber) + throws IOException; } diff --git a/jsmpp/src/main/java/org/jsmpp/bean/SubmitMultiResult.java b/jsmpp/src/main/java/org/jsmpp/session/SubmitMultiResult.java similarity index 60% rename from jsmpp/src/main/java/org/jsmpp/bean/SubmitMultiResult.java rename to jsmpp/src/main/java/org/jsmpp/session/SubmitMultiResult.java index 782365a3..7fb49f55 100644 --- a/jsmpp/src/main/java/org/jsmpp/bean/SubmitMultiResult.java +++ b/jsmpp/src/main/java/org/jsmpp/session/SubmitMultiResult.java @@ -12,24 +12,31 @@ * limitations under the License. * */ -package org.jsmpp.bean; +package org.jsmpp.session; import java.util.Arrays; import java.util.Objects; +import org.jsmpp.bean.OptionalParameter; +import org.jsmpp.bean.UnsuccessDelivery; + /** * @author uudashr * */ public class SubmitMultiResult { - private String messageId; - private UnsuccessDelivery[] unsuccessDeliveries; + private final String messageId; + private final UnsuccessDelivery[] unsuccessDeliveries; + /* OptionalParameters were added in SMPP 5.0 */ + private final OptionalParameter[] optionalParameters; public SubmitMultiResult(String messageId, - UnsuccessDelivery... unsuccessDeliveries) { + UnsuccessDelivery[] unsuccessDeliveries, + OptionalParameter[] optionalParameters ) { this.messageId = messageId; this.unsuccessDeliveries = unsuccessDeliveries; + this.optionalParameters = optionalParameters; } public String getMessageId() { @@ -40,24 +47,28 @@ public UnsuccessDelivery[] getUnsuccessDeliveries() { return unsuccessDeliveries; } + public OptionalParameter[] getOptionalParameters() { + return optionalParameters; + } + @Override public boolean equals(final Object o) { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { + if (!(o instanceof SubmitMultiResult)) { return false; } final SubmitMultiResult that = (SubmitMultiResult) o; - return Objects.equals(messageId, that.messageId) && - Arrays.equals(unsuccessDeliveries, that.unsuccessDeliveries); + return Objects.equals(messageId, that.messageId) && Arrays.equals(unsuccessDeliveries, + that.unsuccessDeliveries) && Arrays.equals(optionalParameters, that.optionalParameters); } @Override public int hashCode() { int result = Objects.hash(messageId); result = 31 * result + Arrays.hashCode(unsuccessDeliveries); + result = 31 * result + Arrays.hashCode(optionalParameters); return result; } - } diff --git a/jsmpp/src/main/java/org/jsmpp/session/state/SMPPServerSessionBoundTX.java b/jsmpp/src/main/java/org/jsmpp/session/state/SMPPServerSessionBoundTX.java index 60d17a16..336f2dc6 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/state/SMPPServerSessionBoundTX.java +++ b/jsmpp/src/main/java/org/jsmpp/session/state/SMPPServerSessionBoundTX.java @@ -27,7 +27,7 @@ import org.jsmpp.bean.QuerySm; import org.jsmpp.bean.ReplaceSm; import org.jsmpp.bean.SubmitMulti; -import org.jsmpp.bean.SubmitMultiResult; +import org.jsmpp.session.SubmitMultiResult; import org.jsmpp.bean.SubmitSm; import org.jsmpp.extra.ProcessRequestException; import org.jsmpp.extra.SessionState; diff --git a/jsmpp/src/main/java/org/jsmpp/session/state/SMPPSessionBoundTX.java b/jsmpp/src/main/java/org/jsmpp/session/state/SMPPSessionBoundTX.java index de31844f..47189198 100644 --- a/jsmpp/src/main/java/org/jsmpp/session/state/SMPPSessionBoundTX.java +++ b/jsmpp/src/main/java/org/jsmpp/session/state/SMPPSessionBoundTX.java @@ -23,7 +23,6 @@ import org.jsmpp.bean.CancelSmResp; import org.jsmpp.bean.Command; import org.jsmpp.bean.InterfaceVersion; -import org.jsmpp.bean.OptionalParameter; import org.jsmpp.bean.QueryBroadcastSmResp; import org.jsmpp.bean.QuerySmResp; import org.jsmpp.bean.ReplaceSmResp; @@ -63,8 +62,6 @@ public void processSubmitSmResp(Command pduHeader, byte[] pdu, if (pendingResp != null) { try { SubmitSmResp resp = pduDecomposer.submitSmResp(pdu); - OptionalParameter.Congestion_state c = resp.getOptionalParameter(OptionalParameter.Congestion_state.class); - int cs = c.getValue() & 0xff; pendingResp.done(resp); } catch (PDUStringException e) { log.error("Failed decomposing submit_sm_resp", e); diff --git a/jsmpp/src/test/java/org/jsmpp/session/DummyResponseHandler.java b/jsmpp/src/test/java/org/jsmpp/session/DummyResponseHandler.java index 5a8028b4..ecea3fec 100644 --- a/jsmpp/src/test/java/org/jsmpp/session/DummyResponseHandler.java +++ b/jsmpp/src/test/java/org/jsmpp/session/DummyResponseHandler.java @@ -29,7 +29,6 @@ import org.jsmpp.bean.QuerySm; import org.jsmpp.bean.ReplaceSm; import org.jsmpp.bean.SubmitMulti; -import org.jsmpp.bean.SubmitMultiResult; import org.jsmpp.bean.SubmitSm; import org.jsmpp.extra.PendingResponse; import org.jsmpp.extra.ProcessRequestException; diff --git a/jsmpp/src/test/java/org/jsmpp/session/TestSmppServer.java b/jsmpp/src/test/java/org/jsmpp/session/TestSmppServer.java index 82373ba6..8e625d3a 100644 --- a/jsmpp/src/test/java/org/jsmpp/session/TestSmppServer.java +++ b/jsmpp/src/test/java/org/jsmpp/session/TestSmppServer.java @@ -49,9 +49,9 @@ import org.jsmpp.bean.RegisteredDelivery; import org.jsmpp.bean.ReplaceSm; import org.jsmpp.bean.SubmitMulti; -import org.jsmpp.bean.SubmitMultiResult; import org.jsmpp.bean.SubmitSm; import org.jsmpp.bean.TypeOfNumber; +import org.jsmpp.bean.UnsuccessDelivery; import org.jsmpp.extra.ProcessRequestException; import org.jsmpp.extra.SessionState; import org.jsmpp.util.AbsoluteTimeFormatter; @@ -183,7 +183,7 @@ public SubmitMultiResult onAcceptSubmitMulti(SubmitMulti submitMulti, MessageId messageId = messageIDGenerator.newMessageId(); log.info("Receiving submit_multi {}, and return message id {}", new String(submitMulti.getShortMessage()), messageId.getValue()); increment("submit_multi"); - return new SubmitMultiResult(messageId.getValue()); + return new SubmitMultiResult(messageId.getValue(), new UnsuccessDelivery[0], new OptionalParameter[0]); } @Override