From b98260672707fca0216e0125dd2e14a50f6cdcea Mon Sep 17 00:00:00 2001 From: injoyai <1113655791@qq.com> Date: Thu, 27 Jun 2024 09:39:44 +0800 Subject: [PATCH] 101 --- dial/dial_mqtt.go | 11 ++- go.mod | 1 + go.sum | 6 ++ internal/mqtt/mqtt_ack.go | 12 +++ internal/mqtt/mqtt_io.go | 12 +-- internal/rabbitmq/rabbitmq_io.go | 115 ++++++++++++++++++++++++ io_func.go | 22 +++++ io_type_cover.go | 150 +++++++++++++++++++++++++++++++ io_type_reader.go | 4 +- 9 files changed, 324 insertions(+), 9 deletions(-) create mode 100644 internal/mqtt/mqtt_ack.go create mode 100644 internal/rabbitmq/rabbitmq_io.go create mode 100644 io_type_cover.go diff --git a/dial/dial_mqtt.go b/dial/dial_mqtt.go index 2fe0377..18ef1e9 100644 --- a/dial/dial_mqtt.go +++ b/dial/dial_mqtt.go @@ -84,7 +84,7 @@ func (this *MQTTClient) Read(p []byte) (int, error) { func (this *MQTTClient) ReadAck() (io.Acker, error) { msg := <-this.ch - return msg, nil + return &Message{msg}, nil } func (this *MQTTClient) Write(p []byte) (int, error) { @@ -191,3 +191,12 @@ type MQTTSubscribe struct { Topic string Qos uint8 } + +type Message struct { + mqtt.Message +} + +func (this *Message) Ack() error { + this.Message.Ack() + return nil +} diff --git a/go.mod b/go.mod index 2f9c1ac..85b5539 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/injoyai/base v1.0.12 github.com/injoyai/conv v1.1.5 github.com/injoyai/logs v1.0.7 + github.com/rabbitmq/amqp091-go v1.10.0 go.bug.st/serial v1.5.0 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 ) diff --git a/go.sum b/go.sum index 989e3de..6f506cc 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,7 @@ github.com/injoyai/logs v1.0.7 h1:pVBB8/rJ/nsnlSvxZoBfCfu+kL84q2pqEx5oN4qPG7I= github.com/injoyai/logs v1.0.7/go.mod h1:CLchJCGhb39Obyrci816R+KMtbxZhgPs0FuikhyixK4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -42,6 +43,8 @@ github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOS github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -56,6 +59,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= go.bug.st/serial v1.5.0 h1:ThuUkHpOEmCVXxGEfpoExjQCS2WBVV4ZcUKVYInM9T4= go.bug.st/serial v1.5.0/go.mod h1:UABfsluHAiaNI+La2iESysd9Vetq7VRdpxvjx7CmmOE= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -85,6 +90,7 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= diff --git a/internal/mqtt/mqtt_ack.go b/internal/mqtt/mqtt_ack.go new file mode 100644 index 0000000..8fe7de1 --- /dev/null +++ b/internal/mqtt/mqtt_ack.go @@ -0,0 +1,12 @@ +package mqtt + +import mqtt "github.com/eclipse/paho.mqtt.golang" + +type Message struct { + mqtt.Message +} + +func (this *Message) Ack() error { + this.Message.Ack() + return nil +} diff --git a/internal/mqtt/mqtt_io.go b/internal/mqtt/mqtt_io.go index ba191ef..e79a7d1 100644 --- a/internal/mqtt/mqtt_io.go +++ b/internal/mqtt/mqtt_io.go @@ -24,7 +24,7 @@ type MQTT struct { func (this *MQTT) ReadAck() (io.Acker, error) { msg := <-this.ch - return msg, nil + return &Message{msg}, nil } func (this *MQTT) Write(p []byte) (int, error) { @@ -53,14 +53,14 @@ func (this *MQTT) Close() error { return err } -func New(cfg *Config, topic *Topic) (io.ReadWriteCloser, string, error) { +func New(cfg *Config, topic *Topic) (io.ReadWriteCloser, error) { c := mqtt.NewClient(cfg) token := c.Connect() if !token.WaitTimeout(cfg.ConnectTimeout) { - return nil, "", io.ErrWithConnectTimeout + return nil, io.ErrWithConnectTimeout } if token.Error() != nil { - return nil, "", token.Error() + return nil, token.Error() } r := &MQTT{ Client: c, @@ -78,10 +78,10 @@ func New(cfg *Config, topic *Topic) (io.ReadWriteCloser, string, error) { err = token.Error() } } - return r, cfg.Servers[0].Host, err + return r, err } -func NewEasy(cfg *EasyConfig, topic *Topic) (io.ReadWriteCloser, string, error) { +func NewEasy(cfg *EasyConfig, topic *Topic) (io.ReadWriteCloser, error) { cfg.init() return New(WithMQTTBase(cfg), topic) } diff --git a/internal/rabbitmq/rabbitmq_io.go b/internal/rabbitmq/rabbitmq_io.go new file mode 100644 index 0000000..edc535e --- /dev/null +++ b/internal/rabbitmq/rabbitmq_io.go @@ -0,0 +1,115 @@ +package rabbitmq + +import ( + "context" + "github.com/injoyai/io" + amqp "github.com/rabbitmq/amqp091-go" +) + +type Config struct { + Address string + Name string //队列名称 + Durable bool //是否持久化,true为是。持久化会把队列存盘,服务器重启后,不会丢失队列以及队列内的信息。(注:1、不丢失是相对的,如果宕机时有消息没来得及存盘,还是会丢失的。2、存盘影响性能。) + AutoDelete bool //是否自动删除,true为是。至少有一个消费者连接到队列时才可以触发。当所有消费者都断开时,队列会自动删除。 + Exclusive bool //是否设置排他,true为是。如果设置为排他,则队列仅对首次声明他的连接可见,并在连接断开时自动删除。(注意,这里说的是连接不是信道,相同连接不同信道是可见的)。 + Nowait bool //是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用) + Debug bool //调试模式 +} + +type Client struct { + //cfg *Config + conn *amqp.Connection + channel *amqp.Channel + ctx context.Context +} + +func (this *Client) Closed() bool { + return this.conn.IsClosed() +} + +func (this *Client) ReadWriteCloser(cfg *Config) (io.ReadWriteCloser, error) { + r, err := this.AckReader(cfg) + if err != nil { + return nil, err + } + w, err := this.Writer(cfg) + if err != nil { + return nil, err + } + return io.NewAReadWriteCloser(r, w, this.conn), nil +} + +func (this *Client) AckReader(cfg *Config) (io.AckReader, error) { + queue, err := this.channel.QueueDeclare(cfg.Name, cfg.Durable, cfg.AutoDelete, cfg.Exclusive, cfg.Nowait, nil) + if err != nil { + return nil, err + } + return &AckReader{ + Client: this, + queue: queue, + }, nil +} + +func (this *Client) Writer(cfg *Config) (io.Writer, error) { + queue, err := this.channel.QueueDeclare(cfg.Name, cfg.Durable, cfg.AutoDelete, cfg.Exclusive, cfg.Nowait, nil) + if err != nil { + return nil, err + } + return &Writer{ + Client: this, + queue: queue, + }, nil +} + +type AckReader struct { + *Client + queue amqp.Queue +} + +func (this *AckReader) ReadAck() (io.Acker, error) { + //获取一个消息 + msg, ok, err := this.channel.Get(this.queue.Name, false) + if err != nil { + return nil, err + } + if !ok { + return nil, nil + } + return &Message{&msg}, nil +} + +type Writer struct { + *Client + queue amqp.Queue +} + +func (this *Writer) Write(p []byte) (int, error) { + /* + exchange:要发送到的交换机名称,对应图中exchangeName。 + key:路由键,对应图中RoutingKey。 + mandatory:直接false,不建议使用,后面有专门章节讲解。 + immediate :直接false,不建议使用,后面有专门章节讲解。 + msg:要发送的消息,msg对应一个Publishing结构,Publishing结构里面有很多参数,这里只强调几个参数,其他参数暂时列出,但不解释。 + */ + err := this.channel.PublishWithContext(this.ctx, "", this.queue.Name, false, false, amqp.Publishing{ + ContentType: "text/plain", //消息的类型,通常为“text/plain” + ContentEncoding: "", //消息的编码,一般默认不用写 + DeliveryMode: 0, //消息是否持久化,2表示持久化,0或1表示非持久化。 + Priority: 0, //消息的优先级 0 to 9 + Body: p, + }) + return len(p), err +} + +type Message struct { + *amqp.Delivery +} + +func (this *Message) Payload() []byte { + return this.Delivery.Body +} + +func (this *Message) Ack() error { + //当参数为true时,标识多重确认,会确认之前所有同一队列的未确认 + return this.Delivery.Ack(false) +} diff --git a/io_func.go b/io_func.go index d8da991..f5d027c 100644 --- a/io_func.go +++ b/io_func.go @@ -142,6 +142,13 @@ func Swap(i1, i2 ReadWriter) error { return err } +// SwapClose 交换数据并关闭 +func SwapClose(c1, c2 io.ReadWriteCloser) error { + defer c1.Close() + defer c2.Close() + return Swap(c1, c2) +} + // Bridge 桥接,桥接两个ReadWriter // 例如,桥接串口(客户端)和网口(tcp客户端),可以实现通过串口上网 func Bridge(i1, i2 ReadWriter) error { @@ -177,9 +184,24 @@ func ReadFuncToAck(f func(r *bufio.Reader) ([]byte, error)) func(r *bufio.Reader } func NewReadWriteCloser(r io.Reader, w io.Writer, c io.Closer) io.ReadWriteCloser { + if c == nil { + c = NullCloser + } return struct { io.Reader io.Writer io.Closer }{r, w, c} } + +func NewAReadWriteCloser(r AckReader, w Writer, c io.Closer) io.ReadWriteCloser { + if c == nil { + c = NullCloser + } + return struct { + Reader + AckReader + Writer + io.Closer + }{AReaderToReader(r), r, w, c} +} diff --git a/io_type_cover.go b/io_type_cover.go new file mode 100644 index 0000000..e841179 --- /dev/null +++ b/io_type_cover.go @@ -0,0 +1,150 @@ +package io + +import ( + "encoding/base64" + "encoding/hex" + "github.com/injoyai/conv" + "io" + "sync/atomic" +) + +//=============================覆盖读写============================= + +// CoverWriter 覆盖写,写入经过handler处理后的数据 +type CoverWriter struct { + io.Writer + Handler func(p []byte) ([]byte, error) +} + +func (this *CoverWriter) Write(bs []byte) (n int, err error) { + if this.Handler != nil { + bs, err = this.Handler(bs) + if err != nil { + return 0, err + } + } + return this.Writer.Write(bs) +} + +// WriteString 写入字符串,实现io.StringWriter +func (this *CoverWriter) WriteString(s string) (int, error) { + return this.Write([]byte(s)) +} + +// WriteHEX 写入16进制数据 +func (this *CoverWriter) WriteHEX(s string) (int, error) { + bytes, err := hex.DecodeString(s) + if err != nil { + return 0, err + } + return this.Write(bytes) +} + +// WriteBase64 写入base64数据 +func (this *CoverWriter) WriteBase64(s string) (int, error) { + bytes, err := base64.StdEncoding.DecodeString(s) + if err != nil { + return 0, err + } + return this.Write(bytes) +} + +// WriteAny 写入任意数据,根据conv转成字节 +func (this *CoverWriter) WriteAny(any interface{}) (int, error) { + return this.Write(conv.Bytes(any)) +} + +// WriteSplit 写入字节,分片写入,例如udp需要写入字节小于(1500-20-8=1472) +func (this *CoverWriter) WriteSplit(p []byte, length int) (int, error) { + if length <= 0 { + return this.Write(p) + } + for len(p) > 0 { + var data []byte + if len(p) >= length { + data, p = p[:length], p[length:] + } else { + data, p = p, p[:0] + } + _, err := this.Write(data) + if err != nil { + return 0, err + } + } + return len(p), nil +} + +// WriteReader io.Reader +func (this *CoverWriter) WriteReader(reader Reader) (int64, error) { + return Copy(this, reader) +} + +// WriteChan 监听通道并写入 +func (this *CoverWriter) WriteChan(c chan interface{}) (int64, error) { + var total int64 + for data := range c { + n, err := this.Write(conv.Bytes(data)) + if err != nil { + return 0, err + } + total += int64(n) + } + return total, nil +} + +//=================================================== + +// CoverReader 覆盖读,返回经过handler处理后的数据 +type CoverReader struct { + io.Reader + Handler func(p []byte) ([]byte, error) +} + +func (this *CoverReader) Read(bs []byte) (n int, err error) { + n, err = this.Reader.Read(bs) + if err != nil { + return 0, err + } + if this.Handler != nil { + bs, err = this.Handler(bs) + if err != nil { + return 0, err + } + } + return +} + +//=================================================== + +// CoverCloser 覆盖关闭,返回经过handler处理后的数据 +type CoverCloser struct { + io.Closer + Handler func(err error) + closed uint32 //0是未关闭,1是关闭中,2是已关闭 +} + +func (this *CoverCloser) Close() error { + return this.CloseWithErr(ErrHandClose) +} + +func (this *CoverCloser) CloseWithErr(err error) error { + if err == nil { + return nil + } + if this.Closed() { + return nil + } + if err := this.Closer.Close(); err != nil { + return err + } + defer atomic.StoreUint32(&this.closed, 1) + if this.Handler != nil { + this.Handler(err) + } + return nil +} + +// Closed 是否已关闭 +func (this *CoverCloser) Closed() bool { + return atomic.LoadUint32(&this.closed) > 0 +} diff --git a/io_type_reader.go b/io_type_reader.go index c95dc39..0c8ba18 100644 --- a/io_type_reader.go +++ b/io_type_reader.go @@ -11,7 +11,7 @@ type AckReader interface { type Acker interface { Payload() []byte - Ack() + Ack() error } // MessageReader 读取分包后的数据 @@ -98,7 +98,7 @@ func (this *_aReaderToReader) Read(p []byte) (int, error) { type Ack []byte -func (this Ack) Ack() {} +func (this Ack) Ack() error { return nil } func (this Ack) Payload() []byte { return this }