Skip to content

Commit

Permalink
Idendation and code cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
aadeetyeah committed Dec 31, 2022
1 parent 7ba42a6 commit db24a86
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ public class Transaction {

@Enumerated(value = EnumType.STRING)
private TransactionStatus transactionStatus;

private String purpose;

private Double amount;

@CreationTimestamp
private Date transactionDate;

private String sender;
private String receiver;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ public class TransactionController {
public String doTransaction(@RequestBody TransactionRequest transactionRequest){
//Sender
//Receiver
//text msg
//text msg(Purpose)
//amount

if(!transactionRequest.validate()){
throw new RuntimeException("Request is not valid");
}
String TransactionId = transactionService.doTransaction(transactionRequest);
return TransactionId;
return transactionService.doTransaction(transactionRequest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,36 +61,39 @@ public void updateTransaction(String msg){
String receiver = (String) walletUpdate.get(CommonConstants.RECEIVER_ATTRIBUTE);
Double amount = (Double) walletUpdate.get(AMOUNT_ATTRIBUTE);

JSONObject receiverNotification = null;

//Send Notification to users of credit and debit
JSONObject senderNotification = new JSONObject();
senderNotification.put(EMAIL_ATTRIBUTE,sender);
senderNotification.put("isSender",true);
senderNotification.put(AMOUNT_ATTRIBUTE,amount);
senderNotification.put(TRANSACTION_ID_ATTRIBUTE,trxId);
senderNotification.put(TRANSACTION_STATUS_ATTRIBUTE,TransactionStatus.SUCCESS.name());

if(WALLET_UPDATE_SUCCESS_STATUS.equals(trxStatus)){

transactionRepository.updateTrx(trxId,TransactionStatus.SUCCESS);
receiverNotification = new JSONObject();

JSONObject receiverNotification = new JSONObject();
receiverNotification.put(EMAIL_ATTRIBUTE,receiver);
receiverNotification.put(AMOUNT_ATTRIBUTE,amount);
receiverNotification.put("isSender",false);
receiverNotification.put(TRANSACTION_ID_ATTRIBUTE,trxId);
receiverNotification.put(TRANSACTION_STATUS_ATTRIBUTE,TransactionStatus.SUCCESS.name());

//notify receiver
kafkaTemplate.send(TRANSACTION_COMPLETE_TOPIC,receiverNotification.toString());

}else{
transactionRepository.updateTrx(trxId,TransactionStatus.FAILED);

transactionRepository.updateTrx(trxId,TransactionStatus.FAILED);
senderNotification.put(TRANSACTION_STATUS_ATTRIBUTE,TransactionStatus.FAILED.name());
}

//Failed TRX notify to sender
//Success TRX notify to sender and receiver

//Send Notification to users of credit and debit
JSONObject senderNotification = new JSONObject();
senderNotification.put(EMAIL_ATTRIBUTE,sender);
senderNotification.put("isSender",true);
senderNotification.put(AMOUNT_ATTRIBUTE,amount);
senderNotification.put(TRANSACTION_ID_ATTRIBUTE,trxId);
senderNotification.put(TRANSACTION_STATUS_ATTRIBUTE,TransactionStatus.SUCCESS.name());

//notify sender
kafkaTemplate.send(TRANSACTION_COMPLETE_TOPIC,senderNotification.toString());

//notify receiver
kafkaTemplate.send(TRANSACTION_COMPLETE_TOPIC,receiverNotification.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,4 @@ public class Wallet {

@CreationTimestamp
private Date createdOn;

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public class WalletConfig {

Properties getKafkaProps(){
Properties properties=new Properties();
Properties properties = new Properties();

//Producer Properties
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
Expand Down Expand Up @@ -43,8 +43,11 @@ ConsumerFactory<String,String> getConsumerFactory(){

@Bean
ConcurrentKafkaListenerContainerFactory<String,String> getConcurrentKafkaListenerFactory(){
ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory=new ConcurrentKafkaListenerContainerFactory();
ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory();

concurrentKafkaListenerContainerFactory.setConsumerFactory(getConsumerFactory());

return concurrentKafkaListenerContainerFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,37 +49,35 @@ public void createWallet(String msg){

@KafkaListener(topics = {CommonConstants.TRANSACTION_CREATE_TOPIC},groupId="ewallet_grp")
public void updateWallet(String msg){
JSONParser jsonParser = new JSONParser();
JSONObject updateWalletRequest = null;
JSONParser jsonParser = new JSONParser();
JSONObject updateWalletRequest = null;

try {
updateWalletRequest = (JSONObject) jsonParser.parse(msg);
updateWalletRequest = (JSONObject) jsonParser.parse(msg);
}catch (ParseException parseException){
logger.warning("Empty or null transaction cannot update wallet.");
}

String trxId = (String) updateWalletRequest.get(CommonConstants.TRANSACTION_ID_ATTRIBUTE);
String sender = (String) updateWalletRequest.get(CommonConstants.SENDER_ATTRIBUTE);
String receiver = (String) updateWalletRequest.get(CommonConstants.RECEIVER_ATTRIBUTE);
Double amount = (Double) updateWalletRequest.get(CommonConstants.AMOUNT_ATTRIBUTE);
String trxId = (String) updateWalletRequest.get(CommonConstants.TRANSACTION_ID_ATTRIBUTE);
String sender = (String) updateWalletRequest.get(CommonConstants.SENDER_ATTRIBUTE);
String receiver = (String) updateWalletRequest.get(CommonConstants.RECEIVER_ATTRIBUTE);
Double amount = (Double) updateWalletRequest.get(CommonConstants.AMOUNT_ATTRIBUTE);

Wallet senderWallet = walletRepository.findByEmail(sender);

JSONObject trxStatusObj = new JSONObject();
Wallet senderWallet = walletRepository.findByEmail(sender);

JSONObject trxStatusObj = new JSONObject();
trxStatusObj.put(CommonConstants.TRANSACTION_ID_ATTRIBUTE,trxId);
trxStatusObj.put(CommonConstants.WALLET_UPDATE_STATUS_ATTRIBUTE,CommonConstants.WALLET_UPDATE_FAILED_STATUS);
trxStatusObj.put(CommonConstants.SENDER_ATTRIBUTE,sender);
trxStatusObj.put(CommonConstants.RECEIVER_ATTRIBUTE,receiver);
trxStatusObj.put(CommonConstants.AMOUNT_ATTRIBUTE,amount);

if(senderWallet!=null && senderWallet.getBalance()-amount>=0){
if(senderWallet != null && senderWallet.getBalance()-amount >= 0){
walletRepository.updateWallet(receiver,amount);
walletRepository.updateWallet(sender,0-amount);
trxStatusObj.put(CommonConstants.WALLET_UPDATE_STATUS_ATTRIBUTE,CommonConstants.WALLET_UPDATE_SUCCESS_STATUS);
}

kafkaTemplate.send(CommonConstants.WALLET_UPDATE_TOPIC,trxStatusObj.toString());

}
}

0 comments on commit db24a86

Please sign in to comment.