Skip to content

Commit

Permalink
去除了pipe包,移动到了dial包,整理了部分函数,端口转发测试通过,修复了pkg的bug(4090字节的问题,需要多次读取)
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] committed Apr 7, 2023
1 parent 9affced commit ce1bdc8
Show file tree
Hide file tree
Showing 18 changed files with 176 additions and 232 deletions.
20 changes: 7 additions & 13 deletions dial/dial_dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,17 @@ package dial
import (
"context"
"github.com/injoyai/io"
"strings"
"testing"
"time"
)

func TestNewWebsocket(t *testing.T) {
//"ws://192.168.10.3:1880/node-red/comms"
RedialWebsocket("ws://192.168.10.103:1880/comms", map[string][]string{
"Sn": {"EA060900FFFBEBBF"},
}, func(ctx context.Context, c *io.Client) {
c.SetRedialMaxTime(time.Second * 2)
c.Debug()
c.SetDealQueueFunc(10, func(msg io.Message) {
t.Log(msg)
})
})
url := "ws://192.168.10.24:8200/ops/notice/ws"
url += "?token=jbYKl72cbOGvbVRwIqM4r6eoirw8f1JRD44+4D5E/URRY4L6TTZYYb/9yhedvd2Ii2GtLo9MieBy5FBeUhugK5jHvppFjExz3B5DVFPqsomF5wezKDFc8a2hZSQ9IDHTS/C+j/3ESSRdbkVHPFxbzQ=="
url = strings.ReplaceAll(url, "+", "%2B")
t.Log(url)
RedialWebsocket(url, map[string][]string{}, io.WithClientDebug())
select {}
}

Expand All @@ -31,8 +27,6 @@ func TestNewTCP(t *testing.T) {
}

func TestRtsp(t *testing.T) {
RedialTCP("34.227.104.115:554", func(ctx context.Context, c *io.Client) {
c.Debug()
})
RedialTCP("34.227.104.115:554", io.WithClientDebug())
select {}
}
73 changes: 73 additions & 0 deletions dial/dial_pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package dial

import (
"context"
"github.com/injoyai/io"
)

/*
Client
抽象管道概念
例如 用485线通讯,正常的TCP连接 都属于管道
需要 客户端对客户端 客户端对服务端 2种方式
需要 一个管道通讯多个io数据,并且不能长期占用 写入前建议分包
只做数据加密(可选),不解析数据,不分包数据
提供io.Reader io.Writer接口
写入数据会封装一层(封装连接信息,动作,数据)
*/

// RedialPipe 通道客户端
func RedialPipe(addr string, fn ...func(ctx context.Context, c *io.Client)) *io.Client {
return RedialTCP(addr, func(ctx context.Context, c *io.Client) {
c.SetReadWriteWithPkg()
c.SetKeepAlive(io.DefaultTimeout)
c.SetPrintFunc(func(msg io.Message, tag ...string) {
io.PrintWithASCII(msg, append([]string{"PI|C"}, tag...)...)
})
for _, v := range fn {
v(ctx, c)
}
})
}

// NewPipeServer 通道服务端
func NewPipeServer(port int, fn ...func(s *io.Server)) (*io.Server, error) {
return NewTCPServer(port, func(s *io.Server) {
s.SetReadWriteWithPkg()
s.SetPrintFunc(func(msg io.Message, tag ...string) {
io.PrintWithASCII(msg, append([]string{"PI|S"}, tag...)...)
})
for _, v := range fn {
v(s)
}
})
}

// NewPipeTransmit 通过客户端数据转发,例如客户端1的数据会广播其他所有客户端
func NewPipeTransmit(port int, fn ...func(s *io.Server)) (*io.Server, error) {
return NewPipeServer(port, func(s *io.Server) {
s.SetPrintFunc(func(msg io.Message, tag ...string) {
if len(tag) > 0 {
switch tag[0] {
case io.TagWrite, io.TagRead:
default:
io.PrintWithASCII(msg, append([]string{"PI|T"}, tag...)...)
}
}
})
s.SetDealFunc(func(msg *io.IMessage) {
//当另一端代理未开启时,无法转发数据
for _, v := range s.GetClientMap() {
if v.GetKey() != msg.GetKey() {
//队列执行,避免阻塞其他
v.WriteQueue(msg.Bytes())
}
}
})
for _, v := range fn {
v(s)
}
})
}
43 changes: 0 additions & 43 deletions dial/pipe/pipe_client.go

This file was deleted.

50 changes: 0 additions & 50 deletions dial/pipe/pipe_common.go

This file was deleted.

39 changes: 0 additions & 39 deletions dial/pipe/pipe_server.go

This file was deleted.

35 changes: 0 additions & 35 deletions dial/pipe/pipe_server_test.go

This file was deleted.

19 changes: 10 additions & 9 deletions dial/proxy/proxy_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/injoyai/io"
"github.com/injoyai/io/buf"
"github.com/injoyai/io/dial"
"github.com/injoyai/io/dial/pipe"
"github.com/injoyai/logs"
)

Expand Down Expand Up @@ -54,7 +53,10 @@ func (this *Entity) Read(p []byte) (n int, err error) {
}

// ReadMessage 实现接口 io.MessageReader
func (this *Entity) ReadMessage() ([]byte, error) {
func (this *Entity) ReadMessage() (bs []byte, _ error) {
defer func() {
//logs.Debug(222, string(bs))
}()
return <-this.buff, nil
}

Expand Down Expand Up @@ -101,6 +103,7 @@ func (this *Entity) writeMessage(msg *Message) (err error) {
proxyClient.SetKey(msg.Addr)
proxyClient.SetReadFunc(buf.ReadWithAll)
proxyClient.SetDealFunc(func(m *io.IMessage) {
logs.Debug(111, m.String())
this.AddMessage(msg.Response(m.Bytes()))
})
proxyClient.SetCloseFunc(func(ctx context.Context, m *io.IMessage) {
Expand Down Expand Up @@ -150,7 +153,6 @@ func (this *Entity) SetConnectFunc(fn func(msg *Message) (i io.ReadWriteCloser,

// DefaultConnectFunc 默认连接函数
func DefaultConnectFunc(msg *Message) (i io.ReadWriteCloser, err error) {
logs.Debug(msg.ConnectType, msg.Addr, msg.Data)
err = errors.New("未实现")
switch msg.ConnectType {
case TCP:
Expand All @@ -174,17 +176,16 @@ func DefaultConnectFunc(msg *Message) (i io.ReadWriteCloser, err error) {

func NewTCPClient(addr string, fn ...func(ctx context.Context, c *io.Client, e *Entity)) *io.Client {
e := New()
return pipe.RedialTCP(addr, func(ctx context.Context, c *io.Client) {
return dial.RedialPipe(addr, func(ctx context.Context, c *io.Client) {
for _, v := range fn {
v(ctx, c, e)
}
c.Swap(e)
c.SetPrintFunc(func(msg io.Message, tag ...string) {
//打印函数的处理,不重要
if msg.String() == io.Ping || msg.String() == io.Pong {
return
}
if len(tag) > 0 {
switch true {
case msg.String() == io.Ping || msg.String() == io.Pong:
case len(tag) > 0:
switch tag[0] {
case io.TagWrite, io.TagRead:
m, err := DecodeMessage(msg.Bytes())
Expand All @@ -203,7 +204,7 @@ func NewTCPClient(addr string, fn ...func(ctx context.Context, c *io.Client, e *

// NewSwapTCPServer 和TCP服务端交换数据,带测试
func NewSwapTCPServer(port int, fn ...func(s *io.Server)) error {
s, err := pipe.NewServer(dial.TCPListenFunc(port))
s, err := dial.NewPipeServer(port)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion dial/proxy/proxy_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ type Message struct {
Key string `json:"key"` //会话标识
Addr string `json:"addr"` //目标地址
Data string `json:"data"` //内容
//DataBytes []byte `json:"-"` //内容字节,需要解析
}

func (this *Message) Response(data []byte) *Message {
Expand Down
Loading

0 comments on commit ce1bdc8

Please sign in to comment.