Skip to content

Commit

Permalink
大改
Browse files Browse the repository at this point in the history
  • Loading branch information
injoyai committed Sep 14, 2023
1 parent eb3210b commit 9ef4787
Show file tree
Hide file tree
Showing 14 changed files with 334 additions and 97 deletions.
5 changes: 3 additions & 2 deletions dial/dial_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/gorilla/websocket"
"github.com/injoyai/base/maps"
"github.com/injoyai/io"
"github.com/injoyai/io/internal/common"
"golang.org/x/crypto/ssh"
"net"
"net/http"
Expand Down Expand Up @@ -115,11 +116,11 @@ func NewFile(path string, options ...io.OptionClient) (*io.Client, error) {

// Memory 内存
func Memory(key string) (io.ReadWriteCloser, error) {
s := memoryServerManage.MustGet(key)
s := common.MemoryServerManage.MustGet(key)
if s == nil {
return nil, errors.New("服务不存在")
}
return s.(*_memoryServer).connect()
return s.(*common.MemoryServer).Connect()
}

func WithMemory(key string) func() (io.ReadWriteCloser, error) {
Expand Down
23 changes: 0 additions & 23 deletions dial/dial_dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/injoyai/io"
"github.com/injoyai/logs"
"testing"
"time"
)

func TestRedialWebsocket(t *testing.T) {
Expand Down Expand Up @@ -70,28 +69,6 @@ func TestRedialSSH(t *testing.T) {
select {}
}

// 测试传输速度
func TestIOSpeed(t *testing.T) {
start := time.Now() //当前时间
length := 20 //传输的数据大小
go RunTCPServer(10086, func(s *io.Server) {
s.SetPrintWithHEX()
s.SetDealFunc(func(msg *io.IMessage) {
t.Log("数据长度: ", msg.Len())
t.Log("传输耗时: ", time.Now().Sub(start))
})
})
<-RedialTCP(":10086", func(c *io.Client) {
c.SetPrintWithHEX()
data := make([]byte, length)
start = time.Now()
c.Write(data)
c.SetDealFunc(func(msg *io.IMessage) {
t.Log(msg)
})
}).DoneAll()
}

func TestRedialUDP(t *testing.T) {
RedialUDP("127.0.0.1")
}
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/injoyai/base v1.0.3 h1:WfCOKd+JOYJ4F+rD6Fd9VqtM03cLsUnF/BvHYCDCvaU=
github.com/injoyai/base v1.0.3/go.mod h1:YHGPQC4ts8gmj3+JDPNZxTayLziSSVKzFCsJL0vJFns=
github.com/injoyai/conv v1.0.4 h1:hEt6jgaGQWFqZJwWFSpaXpDo1zSKkbzRGjQX7C+asK8=
github.com/injoyai/conv v1.0.4/go.mod h1:rqBoiZt/YSKjGYEH4F6z+56djYLFkgKt/kWK5/1X640=
github.com/injoyai/conv v1.0.6 h1:F2mYlLaiaXvIRkq6EljPkIl5PfQbqYPeOJ3Vqnv68hU=
github.com/injoyai/conv v1.0.6/go.mod h1:rqBoiZt/YSKjGYEH4F6z+56djYLFkgKt/kWK5/1X640=
Expand Down
50 changes: 50 additions & 0 deletions internal/common/common_memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package common

import (
"bytes"
"fmt"
"github.com/injoyai/base/maps"
"github.com/injoyai/io"
"time"
)

var MemoryServerManage = maps.NewSafe()

// MemoryServer 虚拟服务,为了实现接口
type MemoryServer struct {
Key string
Ch chan io.ReadWriteCloser
}

func (this *MemoryServer) Connect() (io.ReadWriteCloser, error) {
c := &MemoryClient{Buffer: bytes.NewBuffer(nil)}
select {
case this.Ch <- c:
case <-time.After(io.DefaultConnectTimeout):
return nil, io.ErrWithTimeout
}
return c, nil
}

func (this *MemoryServer) Accept() (io.ReadWriteCloser, string, error) {
c := <-this.Ch
return c, fmt.Sprintf("%p", c), nil
}

func (this *MemoryServer) Close() error {
MemoryServerManage.Del(this.Key)
return nil
}

func (this *MemoryServer) Addr() string {
return fmt.Sprintf("%p", this)
}

type MemoryClient struct {
*bytes.Buffer
}

func (this *MemoryClient) Close() error {
this.Reset()
return nil
}
168 changes: 168 additions & 0 deletions internal/frame/frame_frame.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package frame

import (
"bufio"
"encoding/hex"
"errors"
"github.com/injoyai/base/bytes"
"github.com/injoyai/conv"
"time"
)

// Frame 通用分包配置,适用99%的协议
type Frame struct {
*StartEndFrame
*LenFrame
Timeout time.Duration //超时时间
}

func (this *Frame) ReadMessage(buf *bufio.Reader) ([]byte, error) {

interval := time.Millisecond
result := []byte(nil)

for {
b, err := buf.ReadByte()
if err != nil {
return nil, err
}
result = append(result, b)

//校验数据是否满足帧头帧尾
seFull, err := this.StartEndFrame.Check(&result)
if err != nil {
return nil, err
}

//校验数据长度
leFull, err := this.LenFrame.Check(result)
if err != nil {
return nil, err
}

//如果满足条件,则返回结果
if seFull && leFull && !(this.StartEndFrame == nil && this.LenFrame == nil) {
return result, nil
}

//未设置任何参数,读取全部数据
if (this.StartEndFrame == nil || seFull) &&
(this.LenFrame == nil || leFull) &&
this.Timeout == 0 && buf.Buffered() == 0 {
return result, nil
}

//根据超时时间结束读取
waitTime := time.Duration(0)
for buf.Buffered() == 0 && this.Timeout > 0 {
<-time.After(interval)
waitTime += interval
if waitTime >= this.Timeout {
return result, nil
}
}

}

}

type StartEndFrame struct {
Start, End []byte //帧头,帧尾
}

// Check 校验字节,响应数据完整性和错误
func (this *StartEndFrame) Check(bs *[]byte) (bool, error) {

//未设置
if this == nil {
return true, nil
}

// 长度,减少代码长度...
lenBs, lenStart, lenEnd := len(*bs), len(this.Start), len(this.End)

//设置了帧头
if lenStart > 0 {

if lenBs <= lenStart {
isStart := true
for i, b := range *bs {
if isStart {
if b != this.Start[i] {
isStart = false
}
}

if !isStart {
//寻找帧头
if b == this.Start[0] {
*bs = append((*bs)[:0], (*bs)[i:]...)
break
} else {
*bs = append((*bs)[:0], (*bs)[:0]...)
break
}
}

}
}

}

//基本数据长度不足
if lenBs < lenStart+lenEnd {
return false, nil
}

if lenEnd > 0 {

//帧尾不符合,等待读取新的数据
if hex.EncodeToString((*bs)[lenBs-lenEnd:]) != hex.EncodeToString(this.End) {
return false, nil
}

}

//未设置帧头,帧尾,任意数据皆满足条件
return true, nil

}

type LenFrame struct {
LittleEndian bool //支持大端小端(默认false,大端),暂不支持2143,3412...
LenStart, LenEnd uint //长度起始位置,长度结束位置
LenFixed int //固定增加长度
}

func (this *LenFrame) Check(bs []byte) (bool, error) {

//未设置
if this == nil {
return true, nil
}

//设置了错误的参数
if this.LenStart > this.LenEnd {
return false, errors.New("参数设置有误")
}

//数据还不满足条件
if len(bs) <= int(this.LenEnd) {
return false, nil
}

//获取数据总长度
lenBytes := bs[this.LenStart : this.LenEnd+1]
if this.LittleEndian {
lenBytes = bytes.Entity(lenBytes).Reverse()
}
length := conv.Int(lenBytes) + this.LenFixed

//数据异常,或设置的参数有误
if length < len(bs) {
return false, errors.New("数据长度过长")
}

//返回结果
return length == len(bs), nil
}
54 changes: 53 additions & 1 deletion io_func_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package io

import (
"bufio"
"github.com/injoyai/io/internal/frame"
"io"
"time"
)

func DealReader(r io.Reader, fn DealReaderFunc) (err error) {
Expand Down Expand Up @@ -40,4 +42,54 @@ func ReadWithLine(buf *bufio.Reader) (bytes []byte, err error) {
return
}

//func
/*
*/

func NewReadWithKB(n uint) func(buf *bufio.Reader) ([]byte, error) {
return func(buf *bufio.Reader) ([]byte, error) {
bytes := make([]byte, n<<10)
length, err := buf.Read(bytes)
return bytes[:length], err
}
}

// NewReadWithWriter 新建读取到数据立马写入到io.writer
func NewReadWithWriter(write io.Writer) ReadFunc {
return func(buf *bufio.Reader) (bytes []byte, err error) {
_, err = io.Copy(write, buf)
return
}
}

// NewReadWithStartEnd 新建buf.Reader , 根据帧头帧尾
func NewReadWithStartEnd(start, end []byte) ReadFunc {
f := &frame.Frame{StartEndFrame: &frame.StartEndFrame{Start: start, End: end}}
return f.ReadMessage
}

// NewWriteWithStartEnd 新建buf.Writer,根据帧头帧尾
func NewWriteWithStartEnd(start, end []byte) WriteFunc {
return func(req []byte) ([]byte, error) {
return append(start, append(req, end...)...), nil
}
}

// NewReadWithLen 根据长度配置分包
func NewReadWithLen(l *frame.LenFrame) ReadFunc {
f := &frame.Frame{LenFrame: l}
return f.ReadMessage
}

// NewReadWithTimeout 读取全部数据,根据超时时间分包
func NewReadWithTimeout(timeout time.Duration) ReadFunc {
f := &frame.Frame{Timeout: timeout}
return f.ReadMessage
}

// NewReadWithFrame 根据Frame配置读取数据
func NewReadWithFrame(f *frame.Frame) ReadFunc {
return f.ReadMessage
}
20 changes: 13 additions & 7 deletions io_i_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io
import (
"bufio"
"github.com/injoyai/io/buf"
"github.com/injoyai/io/internal/frame"
"io"
"time"
)
Expand Down Expand Up @@ -150,25 +151,30 @@ func (this *IReader) SetReadWithKB(n uint) *IReader {

// SetReadWithStartEnd 设置根据包头包尾读取数据
func (this *IReader) SetReadWithStartEnd(packageStart, packageEnd []byte) *IReader {
return this.SetReadFunc(buf.NewReadWithStartEnd(packageStart, packageEnd))
return this.SetReadFunc(NewReadWithStartEnd(packageStart, packageEnd))
}

// SetReadWithWriter same io.Copy 注意不能设置读取超时
func (this *IReader) SetReadWithWriter(writer io.Writer) *IReader {
return this.SetReadFunc(buf.NewReadWithWriter(writer))
return this.SetReadFunc(NewReadWithWriter(writer))
}

// Bridge 桥接模式,等同SetReadWithWriter
func (this *IReader) Bridge(w ...io.Writer) *IReader {
return this.SetReadFunc(NewReadWithWriter(MultiWriter(w...)))
}

// SetReadWithLenFrame 根据动态长度读取数据
func (this *IReader) SetReadWithLenFrame(f *buf.LenFrame) *IReader {
return this.SetReadFunc(buf.NewReadWithLen(f))
func (this *IReader) SetReadWithLenFrame(f *frame.LenFrame) *IReader {
return this.SetReadFunc(NewReadWithLen(f))
}

// SetReadWithTimeout 根据超时时间读取数据(需要及时读取,避免阻塞产生粘包)
func (this *IReader) SetReadWithTimeout(timeout time.Duration) *IReader {
return this.SetReadFunc(buf.NewReadWithTimeout(timeout))
return this.SetReadFunc(NewReadWithTimeout(timeout))
}

// SetReadWithFrame 适配预大部分读取
func (this *IReader) SetReadWithFrame(f *buf.Frame) *IReader {
return this.SetReadFunc(buf.NewReadWithFrame(f))
func (this *IReader) SetReadWithFrame(f *frame.Frame) *IReader {
return this.SetReadFunc(NewReadWithFrame(f))
}
Loading

0 comments on commit 9ef4787

Please sign in to comment.