Skip to content

Commit

Permalink
101
Browse files Browse the repository at this point in the history
  • Loading branch information
injoyai committed Jun 27, 2024
1 parent b5d6057 commit b982606
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 9 deletions.
11 changes: 10 additions & 1 deletion dial/dial_mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (this *MQTTClient) Read(p []byte) (int, error) {

func (this *MQTTClient) ReadAck() (io.Acker, error) {
msg := <-this.ch
return msg, nil
return &Message{msg}, nil
}

func (this *MQTTClient) Write(p []byte) (int, error) {
Expand Down Expand Up @@ -191,3 +191,12 @@ type MQTTSubscribe struct {
Topic string
Qos uint8
}

type Message struct {
mqtt.Message
}

func (this *Message) Ack() error {
this.Message.Ack()
return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/injoyai/base v1.0.12
github.com/injoyai/conv v1.1.5
github.com/injoyai/logs v1.0.7
github.com/rabbitmq/amqp091-go v1.10.0
go.bug.st/serial v1.5.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
)
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ github.com/injoyai/logs v1.0.7 h1:pVBB8/rJ/nsnlSvxZoBfCfu+kL84q2pqEx5oN4qPG7I=
github.com/injoyai/logs v1.0.7/go.mod h1:CLchJCGhb39Obyrci816R+KMtbxZhgPs0FuikhyixK4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand All @@ -42,6 +43,8 @@ github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOS
github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand All @@ -56,6 +59,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
go.bug.st/serial v1.5.0 h1:ThuUkHpOEmCVXxGEfpoExjQCS2WBVV4ZcUKVYInM9T4=
go.bug.st/serial v1.5.0/go.mod h1:UABfsluHAiaNI+La2iESysd9Vetq7VRdpxvjx7CmmOE=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down Expand Up @@ -85,6 +90,7 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
Expand Down
12 changes: 12 additions & 0 deletions internal/mqtt/mqtt_ack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package mqtt

import mqtt "github.com/eclipse/paho.mqtt.golang"

type Message struct {
mqtt.Message
}

func (this *Message) Ack() error {
this.Message.Ack()
return nil
}
12 changes: 6 additions & 6 deletions internal/mqtt/mqtt_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type MQTT struct {

func (this *MQTT) ReadAck() (io.Acker, error) {
msg := <-this.ch
return msg, nil
return &Message{msg}, nil
}

func (this *MQTT) Write(p []byte) (int, error) {
Expand Down Expand Up @@ -53,14 +53,14 @@ func (this *MQTT) Close() error {
return err
}

func New(cfg *Config, topic *Topic) (io.ReadWriteCloser, string, error) {
func New(cfg *Config, topic *Topic) (io.ReadWriteCloser, error) {
c := mqtt.NewClient(cfg)
token := c.Connect()
if !token.WaitTimeout(cfg.ConnectTimeout) {
return nil, "", io.ErrWithConnectTimeout
return nil, io.ErrWithConnectTimeout
}
if token.Error() != nil {
return nil, "", token.Error()
return nil, token.Error()
}
r := &MQTT{
Client: c,
Expand All @@ -78,10 +78,10 @@ func New(cfg *Config, topic *Topic) (io.ReadWriteCloser, string, error) {
err = token.Error()
}
}
return r, cfg.Servers[0].Host, err
return r, err
}

func NewEasy(cfg *EasyConfig, topic *Topic) (io.ReadWriteCloser, string, error) {
func NewEasy(cfg *EasyConfig, topic *Topic) (io.ReadWriteCloser, error) {
cfg.init()
return New(WithMQTTBase(cfg), topic)
}
Expand Down
115 changes: 115 additions & 0 deletions internal/rabbitmq/rabbitmq_io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package rabbitmq

import (
"context"
"github.com/injoyai/io"
amqp "github.com/rabbitmq/amqp091-go"
)

type Config struct {
Address string
Name string //队列名称
Durable bool //是否持久化,true为是。持久化会把队列存盘,服务器重启后,不会丢失队列以及队列内的信息。(注:1、不丢失是相对的,如果宕机时有消息没来得及存盘,还是会丢失的。2、存盘影响性能。)
AutoDelete bool //是否自动删除,true为是。至少有一个消费者连接到队列时才可以触发。当所有消费者都断开时,队列会自动删除。
Exclusive bool //是否设置排他,true为是。如果设置为排他,则队列仅对首次声明他的连接可见,并在连接断开时自动删除。(注意,这里说的是连接不是信道,相同连接不同信道是可见的)。
Nowait bool //是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
Debug bool //调试模式
}

type Client struct {
//cfg *Config
conn *amqp.Connection
channel *amqp.Channel
ctx context.Context
}

func (this *Client) Closed() bool {
return this.conn.IsClosed()
}

func (this *Client) ReadWriteCloser(cfg *Config) (io.ReadWriteCloser, error) {
r, err := this.AckReader(cfg)
if err != nil {
return nil, err
}
w, err := this.Writer(cfg)
if err != nil {
return nil, err
}
return io.NewAReadWriteCloser(r, w, this.conn), nil
}

func (this *Client) AckReader(cfg *Config) (io.AckReader, error) {
queue, err := this.channel.QueueDeclare(cfg.Name, cfg.Durable, cfg.AutoDelete, cfg.Exclusive, cfg.Nowait, nil)
if err != nil {
return nil, err
}
return &AckReader{
Client: this,
queue: queue,
}, nil
}

func (this *Client) Writer(cfg *Config) (io.Writer, error) {
queue, err := this.channel.QueueDeclare(cfg.Name, cfg.Durable, cfg.AutoDelete, cfg.Exclusive, cfg.Nowait, nil)
if err != nil {
return nil, err
}
return &Writer{
Client: this,
queue: queue,
}, nil
}

type AckReader struct {
*Client
queue amqp.Queue
}

func (this *AckReader) ReadAck() (io.Acker, error) {
//获取一个消息
msg, ok, err := this.channel.Get(this.queue.Name, false)
if err != nil {
return nil, err
}
if !ok {
return nil, nil
}
return &Message{&msg}, nil
}

type Writer struct {
*Client
queue amqp.Queue
}

func (this *Writer) Write(p []byte) (int, error) {
/*
exchange:要发送到的交换机名称,对应图中exchangeName。
key:路由键,对应图中RoutingKey。
mandatory:直接false,不建议使用,后面有专门章节讲解。
immediate :直接false,不建议使用,后面有专门章节讲解。
msg:要发送的消息,msg对应一个Publishing结构,Publishing结构里面有很多参数,这里只强调几个参数,其他参数暂时列出,但不解释。
*/
err := this.channel.PublishWithContext(this.ctx, "", this.queue.Name, false, false, amqp.Publishing{
ContentType: "text/plain", //消息的类型,通常为“text/plain”
ContentEncoding: "", //消息的编码,一般默认不用写
DeliveryMode: 0, //消息是否持久化,2表示持久化,0或1表示非持久化。
Priority: 0, //消息的优先级 0 to 9
Body: p,
})
return len(p), err
}

type Message struct {
*amqp.Delivery
}

func (this *Message) Payload() []byte {
return this.Delivery.Body
}

func (this *Message) Ack() error {
//当参数为true时,标识多重确认,会确认之前所有同一队列的未确认
return this.Delivery.Ack(false)
}
22 changes: 22 additions & 0 deletions io_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ func Swap(i1, i2 ReadWriter) error {
return err
}

// SwapClose 交换数据并关闭
func SwapClose(c1, c2 io.ReadWriteCloser) error {
defer c1.Close()
defer c2.Close()
return Swap(c1, c2)
}

// Bridge 桥接,桥接两个ReadWriter
// 例如,桥接串口(客户端)和网口(tcp客户端),可以实现通过串口上网
func Bridge(i1, i2 ReadWriter) error {
Expand Down Expand Up @@ -177,9 +184,24 @@ func ReadFuncToAck(f func(r *bufio.Reader) ([]byte, error)) func(r *bufio.Reader
}

func NewReadWriteCloser(r io.Reader, w io.Writer, c io.Closer) io.ReadWriteCloser {
if c == nil {
c = NullCloser
}
return struct {
io.Reader
io.Writer
io.Closer
}{r, w, c}
}

func NewAReadWriteCloser(r AckReader, w Writer, c io.Closer) io.ReadWriteCloser {
if c == nil {
c = NullCloser
}
return struct {
Reader
AckReader
Writer
io.Closer
}{AReaderToReader(r), r, w, c}
}
Loading

0 comments on commit b982606

Please sign in to comment.