forked from byzer-org/byzer-lang
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
351 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,6 @@ | |
package streaming.dsl.mmlib.algs | ||
|
||
import java.util.{Date, Properties} | ||
|
||
import javax.mail.internet.{InternetAddress, MimeMessage} | ||
import javax.mail.{Address, Message, Session, Transport} | ||
import org.apache.spark.ml.param.Param | ||
|
@@ -28,6 +27,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} | |
import streaming.dsl.mmlib.SQLAlg | ||
import streaming.dsl.mmlib.algs.SQLSendMessage._ | ||
import streaming.dsl.mmlib.algs.param.BaseParams | ||
import tech.mlsql.common.form.{Extra, FormParams, KV, Select, Text} | ||
|
||
/** | ||
* Created by fchen on 2018/8/22. | ||
|
@@ -43,11 +43,12 @@ class SQLSendMessage(override val uid: String) extends SQLAlg with Functions wit | |
val from = params.getOrElse("from", null) | ||
val subject = params.getOrElse("subject", "no subject!") | ||
val smtpHost = params("smtpHost") | ||
val smtpPort = params.getOrElse("smtpPort", "-1") | ||
val method = params("method") | ||
|
||
require(to != null, "to is null!") | ||
require(smtpHost != null) | ||
val agent = new MailAgent(smtpHost) | ||
val agent = new MailAgent(smtpHost, smtpPort) | ||
if (df.count() == 0) { | ||
method.toUpperCase() match { | ||
case "MAIL" => | ||
|
@@ -84,18 +85,160 @@ class SQLSendMessage(override val uid: String) extends SQLAlg with Functions wit | |
train(df, path, params) | ||
} | ||
|
||
final val from: Param[String] = new Param[String](this, "from", | ||
"the sender") | ||
final val to: Param[String] = new Param[String](this, "to", | ||
"the target email") | ||
final val subject: Param[String] = new Param[String](this, "subject", | ||
"the title of email") | ||
final val smtpHost: Param[String] = new Param[String](this, "smtpHost", | ||
"email server") | ||
final val method: Param[String] = new Param[String](this, "method", | ||
"MAIL") | ||
final val content: Param[String] = new Param[String](this, "content", | ||
"the content of email if you do not configure input table") | ||
final val from: Param[String] = new Param[String] (this, "from", | ||
FormParams.toJson(Text( | ||
name = "from", | ||
value = "", | ||
extra = Extra( | ||
doc = | ||
""" | ||
| Required. the sender | ||
| e.g. from = "[email protected]" | ||
""", | ||
label = "the sender", | ||
options = Map( | ||
"valueType" -> "string", | ||
"defaultValue" -> "", | ||
"required" -> "true", | ||
"derivedType" -> "NONE" | ||
)), valueProvider = Option(() => { | ||
"" | ||
}) | ||
) | ||
) | ||
) | ||
|
||
final val to: Param[String] = new Param[String] (this, "to", | ||
FormParams.toJson(Text( | ||
name = "to", | ||
value = "", | ||
extra = Extra( | ||
doc = | ||
""" | ||
| Required. The target email | ||
""", | ||
label = "The target email", | ||
options = Map( | ||
"valueType" -> "string", | ||
"defaultValue" -> "", | ||
"required" -> "true", | ||
"derivedType" -> "NONE" | ||
)), valueProvider = Option(() => { | ||
"" | ||
}) | ||
) | ||
) | ||
) | ||
|
||
final val subject: Param[String] = new Param[String] (this, "subject", | ||
FormParams.toJson(Text( | ||
name = "subject", | ||
value = "", | ||
extra = Extra( | ||
doc = | ||
""" | ||
| The title of email | ||
""", | ||
label = "The title of email", | ||
options = Map( | ||
"valueType" -> "string", | ||
"defaultValue" -> "", | ||
"required" -> "false", | ||
"derivedType" -> "NONE" | ||
)), valueProvider = Option(() => { | ||
"" | ||
}) | ||
) | ||
) | ||
) | ||
|
||
final val smtpHost: Param[String] = new Param[String] (this, "smtpHost", | ||
FormParams.toJson(Text( | ||
name = "smtpHost", | ||
value = "", | ||
extra = Extra( | ||
doc = | ||
""" | ||
| Required. Server of email | ||
""", | ||
label = "Email Server", | ||
options = Map( | ||
"valueType" -> "string", | ||
"defaultValue" -> "", | ||
"required" -> "true", | ||
"derivedType" -> "NONE" | ||
)), valueProvider = Option(() => { | ||
"" | ||
}) | ||
) | ||
) | ||
) | ||
|
||
final val smtpPort: Param[String] = new Param[String](this, "smtpPort", | ||
FormParams.toJson(Text( | ||
name = "smtpPort", | ||
value = "", | ||
extra = Extra( | ||
doc = | ||
""" | ||
| Required. Server port of email | ||
""", | ||
label = "Email Server Port", | ||
options = Map( | ||
"valueType" -> "string", | ||
"defaultValue" -> "", | ||
"required" -> "false", | ||
"derivedType" -> "NONE" | ||
)), valueProvider = Option(() => { | ||
"" | ||
}) | ||
) | ||
) | ||
) | ||
|
||
final val method: Param[String] = new Param[String] (this, "method", | ||
FormParams.toJson(Select( | ||
name = "method", | ||
values = List(), | ||
extra = Extra( | ||
doc = | ||
""" | ||
| Required. Way of sending | ||
""", | ||
label = "Way of sending", | ||
options = Map( | ||
"valueType" -> "string", | ||
"defaultValue" -> "", | ||
"required" -> "true", | ||
"derivedType" -> "NONE" | ||
)), valueProvider = Option(() => { | ||
List(KV(Option("method"), Option("MAIL"))) | ||
}) | ||
) | ||
) | ||
) | ||
|
||
final val content: Param[String] = new Param[String] (this, "content", | ||
FormParams.toJson(Text( | ||
name = "content", | ||
value = "", | ||
extra = Extra( | ||
doc = | ||
""" | ||
| Required. The server of email | ||
""", | ||
label = "The email server", | ||
options = Map( | ||
"valueType" -> "string", | ||
"defaultValue" -> "", | ||
"required" -> "true", | ||
"derivedType" -> "NONE" | ||
)), valueProvider = Option(() => { | ||
"" | ||
}) | ||
) | ||
) | ||
) | ||
|
||
override def explainParams(sparkSession: SparkSession): DataFrame = _explainParams(sparkSession) | ||
} | ||
|
@@ -105,7 +248,7 @@ object SQLSendMessage { | |
|
||
} | ||
|
||
class MailAgent(smtpHost: String) { | ||
class MailAgent(smtpHost: String, smtpPort : String) { | ||
|
||
// throws MessagingException | ||
def sendMessage(to: String, | ||
|
@@ -131,7 +274,14 @@ class MailAgent(smtpHost: String) { | |
|
||
def createMessage: Message = { | ||
val properties = new Properties() | ||
properties.put("mail.host", smtpHost) | ||
properties.put("mail.localhost", smtpHost) | ||
properties.put("mail.smtp.host", smtpHost) | ||
properties.put("mail.smtp.localhost", smtpHost) | ||
properties.put("mail.smtp.auth", "false") | ||
properties.setProperty("mail.smtp.socketFactory.fallback", "false") | ||
properties.setProperty("mail.smtp.port", smtpPort) | ||
properties.setProperty("mail.smtp.socketFactory.port", smtpPort) | ||
val session = Session.getDefaultInstance(properties, null) | ||
return new MimeMessage(session) | ||
} | ||
|
Oops, something went wrong.