Skip to content

Commit

Permalink
101
Browse files Browse the repository at this point in the history
  • Loading branch information
injoyai committed Jun 28, 2024
1 parent 3adf1fb commit c02fb7e
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 153 deletions.
2 changes: 1 addition & 1 deletion extend/proxy/proxy_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (this *Entity) Read(p []byte) (n int, err error) {
return 0, nil
}

// ReadMessage 实现接口 io.MessageReader
// ReadMessage 实现接口 io.MReader
func (this *Entity) ReadMessage() (bs []byte, _ error) {
return <-this.buff, nil
}
Expand Down
4 changes: 2 additions & 2 deletions internal/rabbitmq/rabbitmq_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (this *Client) Closed() bool {
return this.conn.IsClosed()
}

func (this *Client) ReadWriteCloser(cfg *Config) (io.ReadWriteCloser, error) {
func (this *Client) ReadWriteCloser(cfg *Config) (io.AReadWriteCloser, error) {
r, err := this.AckReader(cfg)
if err != nil {
return nil, err
Expand All @@ -39,7 +39,7 @@ func (this *Client) ReadWriteCloser(cfg *Config) (io.ReadWriteCloser, error) {
return io.NewAReadWriteCloser(r, w, this.conn), nil
}

func (this *Client) AckReader(cfg *Config) (io.AckReader, error) {
func (this *Client) AckReader(cfg *Config) (io.AReader, error) {
queue, err := this.channel.QueueDeclare(cfg.Name, cfg.Durable, cfg.AutoDelete, cfg.Exclusive, cfg.Nowait, nil)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions io_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,12 @@ func (this *Client) reset(i ReadWriteCloser, key string, options ...OptionClient
switch v := i.(type) {
case nil:

case AckReader:
case AReader:
aReader := AReaderToReader(v)
defaultReadFunc = aReader.ReadAck
i = NewReadWriteCloser(aReader, i, i)

case MessageReader:
case MReader:
mReader := MReaderToReader(v)
defaultReadFunc = ReadFuncToAck(mReader.ReadFunc)
i = NewReadWriteCloser(mReader, i, i)
Expand Down
2 changes: 1 addition & 1 deletion io_extend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
LimitReader = io.LimitReader

// Copy 从reader中读取所有数据到writer
Copy = io.Copy
// Copy = io.Copy

// CopyBuffer 从reader中读取所有数据到writer,设置读取缓存大小
CopyBuffer = io.CopyBuffer
Expand Down
180 changes: 87 additions & 93 deletions io_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,22 @@ package io
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
)

// NewMessageReader Reader转MessageReader
func NewMessageReader(r io.Reader, read func(buf *bufio.Reader) ([]byte, error)) MessageReader {
return &messageReader{bufio.NewReader(r), read}
// NewMReader Reader转MessageReader
func NewMReader(r io.Reader, read func(buf *bufio.Reader) ([]byte, error)) MReader {
return &mReader{bufio.NewReader(r), read}
}

// DealMessageReader 处理MessageReader
func DealMessageReader(r MessageReader, fn func(msg Message) error) error {
// NewAReader Reader转AckReader
func NewAReader(r io.Reader, read func(buf *bufio.Reader) ([]byte, error)) AReader {
return &aReader{bufio.NewReader(r), read}
}

// DealMReader 处理MessageReader
func DealMReader(r MReader, fn func(msg Message) error) error {
for {
bs, err := r.ReadMessage()
if err != nil {
Expand All @@ -25,6 +30,20 @@ func DealMessageReader(r MessageReader, fn func(msg Message) error) error {
}
}

// DealAReader 处理AckReader
func DealAReader(r AReader, fn func(msg Message) error) error {
for {
a, err := r.ReadAck()
if err != nil {
return err
}
if err = fn(a.Payload()); err != nil {
return err
}
a.Ack()
}
}

// DealReader 处理Reader
func DealReader(r io.Reader, fn func(buf *bufio.Reader) error) (err error) {
buf := bufio.NewReader(r)
Expand Down Expand Up @@ -70,82 +89,6 @@ func ReadByte(r Reader) (byte, error) {
return b[0], err
}

// CopyWith 复制数据,每次固定4KB,并提供函数监听
func CopyWith(w Writer, r Reader, fn func(buf []byte)) (int64, error) {
return CopyNWith(w, r, DefaultBufferSize, fn)
}

// CopyWithPlan 复制数据,返回进度情况
func CopyWithPlan(w Writer, r Reader, f func(p *Plan)) (int64, error) {
p := &Plan{
Index: 0,
Current: 0,
Total: 0,
Bytes: nil,
}
return CopyWith(w, r, func(buf []byte) {
p.Index++
p.Current += int64(len(buf))
p.Bytes = buf
if f != nil {
f(p)
}
})
}

// CopyNWith 复制数据,每次固定大小,并提供函数监听
func CopyNWith(w Writer, r Reader, n int64, fn func(buf []byte)) (int64, error) {
buff := bufio.NewReader(r)
length := int64(0)
buf := make([]byte, n)
for {
num, err := buff.Read(buf)
if err != nil && err != io.EOF {
return length, err
}
length += int64(num)
if fn != nil {
fn(buf[:num])
}
if _, err := w.Write(buf[:num]); err != nil {
return length, err
}
if err == io.EOF {
return length, nil
}
}
}

// SwapClient 数据交换
func SwapClient(c1, c2 *Client) {
c1.SetReadWithWriter(c2)
c1.SetCloseWithCloser(c2)
c2.SetReadWithWriter(c1)
c2.SetCloseWithCloser(c1)
go c1.Run()
go c2.Run()
}

// Swap same two Copy IO数据交换
func Swap(i1, i2 ReadWriter) error {
go Copy(i1, i2)
_, err := Copy(i2, i1)
return err
}

// SwapClose 交换数据并关闭
func SwapClose(c1, c2 io.ReadWriteCloser) error {
defer c1.Close()
defer c2.Close()
return Swap(c1, c2)
}

// Bridge 桥接,桥接两个ReadWriter
// 例如,桥接串口(客户端)和网口(tcp客户端),可以实现通过串口上网
func Bridge(i1, i2 ReadWriter) error {
return Swap(i1, i2)
}

// SplitWithLength 按最大长度分割字节
func SplitWithLength(p []byte, max uint64) [][]byte {
if max == 0 {
Expand Down Expand Up @@ -216,21 +159,27 @@ func NewMReadWriteCloser(r MReader, w Writer, c io.Closer) MReadWriteCloser {
}{r, w, c}
}

func Swap2[T ReadWriter | MReadWriter | AReadWriter](i1, i2 T) error {
go Copy2(interface{}(i1).(io.Writer), i2)
_, err := Copy2(interface{}(i2).(io.Writer), i1)
return err
}
/*
func Copy2[T Reader | MReader | AReader](w Writer, r T) (int64, error) {
return CopyWith2(w, r, nil)
*/

// Copy 如何使用接口约束 [T Reader | MReader | AReader]
func Copy[T any](w Writer, r T) (int64, error) {
return CopyWith(w, r, nil)
}

func CopyWith2[T Reader | MReader | AReader](w Writer, r T, f func(p []byte) ([]byte, error)) (int64, error) {
return CopyNWith2(w, r, 32*1024, f)
// CopyWith 如何使用接口约束 [T Reader | MReader | AReader]
// 复制数据,Reader类型每次固定4KB,并提供函数监听
func CopyWith[T any](w Writer, r T, f func(p []byte) ([]byte, error)) (int64, error) {
return CopyNWith(w, r, 32*1024, f)
}

func CopyNWith2[T Reader | MReader | AReader](w Writer, r T, max int64, f func(p []byte) ([]byte, error)) (int64, error) {
// CopyNWith 复制数据,每次固定大小,并提供函数监听
// 如何使用接口约束 [T Reader | MReader | AReader]
func CopyNWith[T any](w Writer, r T, max int64, f func(p []byte) ([]byte, error)) (int64, error) {

read := func() (Acker, error) {
switch v := interface{}(r).(type) {
Expand All @@ -250,7 +199,7 @@ func CopyNWith2[T Reader | MReader | AReader](w Writer, r T, max int64, f func(p
return v.ReadAck()

default:
return nil, errors.New("unknown type")
return nil, fmt.Errorf("未知类型: %T, 未实现[Reader|MReader|AReader]", r)

}
}
Expand Down Expand Up @@ -279,3 +228,48 @@ func CopyNWith2[T Reader | MReader | AReader](w Writer, r T, max int64, f func(p
}

}

// Swap 如何使用接口约束 [T ReadWriter | MReadWriter | AReadWriter]
func Swap[T io.Writer](i1, i2 T) error {
go Copy(interface{}(i1).(io.Writer), i2)
_, err := Copy(interface{}(i2).(io.Writer), i1)
return err
}

// SwapClient 数据交换
func SwapClient(c1, c2 *Client) {
c1.SetReadWithWriter(c2)
c1.SetCloseWithCloser(c2)
c2.SetReadWithWriter(c1)
c2.SetCloseWithCloser(c1)
go c1.Run()
go c2.Run()
}

// SwapClose 交换数据并关闭
// 约束[T ReadWritCloser | MReadWritCloser | AReadWritCloser]
func SwapClose(c1, c2 WriteCloser) error {
defer c1.Close()
defer c2.Close()
return Swap(c1, c2)
}

// Bridge 桥接,桥接两个ReadWriter
// 例如,桥接串口(客户端)和网口(tcp客户端),可以实现通过串口上网
func Bridge(i1, i2 Writer) error {
return Swap(i1, i2)
}

// CopyWithPlan 复制数据,返回进度情况
func CopyWithPlan(w Writer, r Reader, f func(p *Plan)) (int64, error) {
p := &Plan{}
return CopyWith(w, r, func(buf []byte) ([]byte, error) {
if f != nil {
p.Index++
p.Current += int64(len(buf))
p.Bytes = buf
f(p)
}
return buf, nil
})
}
6 changes: 3 additions & 3 deletions io_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func TestSwap(t *testing.T) {

func TestReadPrefix(t *testing.T) {
r := bytes.NewReader([]byte("hello world woworld"))
t.Log(ReadPrefix(r, []byte("llo"))) //nil
t.Log(ReadPrefix(r, []byte("wor"))) //nil
t.Log(ReadPrefix(r, []byte("wor"))) //nil
t.Log(ReadPrefix(r, []byte("llo"))) //llo nil
t.Log(ReadPrefix(r, []byte("wor"))) //wor nil
t.Log(ReadPrefix(r, []byte("wor"))) //wor nil
t.Log(ReadPrefix(r, []byte("llo"))) //EOF
t.Log(ReadPrefix(r, []byte("aaa"))) //EOF
t.Log(ReadPrefix(r, []byte("aaa"))) //EOF
Expand Down
Loading

0 comments on commit c02fb7e

Please sign in to comment.