Skip to content

Commit

Permalink
Add MQTT Publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
kimutansk committed Oct 27, 2015
1 parent aa0cbd4 commit 5246bb8
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 9 deletions.
25 changes: 25 additions & 0 deletions src/main/scala/com/github/kimutansk/awsiot/MqttPublisher.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.github.kimutansk.awsiot

import org.eclipse.paho.client.mqttv3.{MqttMessage, MqttConnectOptions, MqttClient}

/** MQTT Publish Test Class */
object MqttPublisher {
def main(args: Array[String]) {
// Connect Target
val brokerURI:String = "ssl://A3869B885YB2CX.iot.ap-northeast-1.amazonaws.com:8883"

// SocketFactoryGenerate
val socketFactory = SocketFactoryGenerator.generateFromFilePath("/etc/cert/rootCA.pem", "/etc/cert/cert.pem", "/etc/cert/private.pem", "password")

// MQTT Client generate
val client:MqttClient = new MqttClient(brokerURI, "mqtt-publisher")
client.setCallback(new PublishMqttCallback)
val options:MqttConnectOptions = new MqttConnectOptions()
options.setSocketFactory(socketFactory)
client.connect(options)


val message:MqttMessage = new MqttMessage("Test Message".getBytes("UTF-8"))
client.publish("test-topic", message)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.github.kimutansk.awsiot

import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttMessage, MqttCallback}

/** Publish MqttCallBack */
class PublishMqttCallback extends MqttCallback{
// Nop
override def deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken): Unit = ???

override def messageArrived(s: String, mqttMessage: MqttMessage): Unit = ???

override def connectionLost(throwable: Throwable): Unit = ???
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package com.github.kimutansk.awsiot

import java.io.{ByteArrayInputStream, InputStreamReader}
import java.nio.file.{Paths, Files}
import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.file.{Files, Paths}
import java.security.cert.{CertificateFactory, X509Certificate}
import java.security.{KeyPair, KeyStore, Security}
import java.security.cert.X509Certificate
import javax.net.ssl.{SSLContext, KeyManagerFactory, TrustManagerFactory, SSLSocketFactory}
import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLSocketFactory, TrustManagerFactory}

import org.bouncycastle.cert.X509CertificateHolder
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.bouncycastle.openssl.PEMParser
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter
import org.bouncycastle.openssl.{PEMKeyPair, PEMParser}

/** Factory for [[javax.net.ssl.SSLSocketFactory]] instances. */
object SocketFactoryGenerator {
Expand All @@ -25,23 +27,26 @@ object SocketFactoryGenerator {

// load Root CA certificate
val rootCaParser:PEMParser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(rootCaFilePath)))))
val rootCaCert:X509Certificate = rootCaParser.readObject().asInstanceOf[X509Certificate]
val rootCaCertHolder:X509CertificateHolder = rootCaParser.readObject().asInstanceOf[X509CertificateHolder]
val rootCaCert:X509Certificate = convertToJavaCertificate(rootCaCertHolder)
rootCaParser.close()

// load Server certificate
val certParser:PEMParser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(certFilePath)))))
val serverCert:X509Certificate = certParser.readObject.asInstanceOf[X509Certificate]
val serverCertHolder:X509CertificateHolder = certParser.readObject.asInstanceOf[X509CertificateHolder]
val serverCert:X509Certificate = convertToJavaCertificate(serverCertHolder)
certParser.close()

// load Private Key
val keyParser:PEMParser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFilePath)))))
val keyPair:KeyPair = keyParser.readObject.asInstanceOf[KeyPair]
val pemKeyPair:PEMKeyPair = keyParser.readObject.asInstanceOf[PEMKeyPair]
val keyPair:KeyPair = new JcaPEMKeyConverter().getKeyPair(pemKeyPair)
keyParser.close()

// Root CA certificate is used to authenticate server
val rootCAKeyStore:KeyStore = KeyStore.getInstance(KeyStore.getDefaultType())
rootCAKeyStore.load(null, null)
rootCAKeyStore.setCertificateEntry("ca-certificate", rootCaCert);
rootCAKeyStore.setCertificateEntry("ca-certificate", convertToJavaCertificate(rootCaCertHolder))
val tmf:TrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
tmf.init(rootCAKeyStore);

Expand All @@ -59,4 +64,12 @@ object SocketFactoryGenerator {

context.getSocketFactory()
}


def convertToJavaCertificate(certificateHolder:X509CertificateHolder):X509Certificate = {
val is:InputStream = new ByteArrayInputStream(certificateHolder.toASN1Structure.getEncoded);
try {
CertificateFactory.getInstance("X.509").generateCertificate(is).asInstanceOf[X509Certificate]
} finally is.close()
}
}

0 comments on commit 5246bb8

Please sign in to comment.