Skip to content

Commit

Permalink
优化
Browse files Browse the repository at this point in the history
  • Loading branch information
injoyai committed Oct 10, 2023
1 parent e5cbe25 commit 129c0a7
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 75 deletions.
28 changes: 20 additions & 8 deletions extend/p2p/p2p_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,35 @@ const (
)

type Msg struct {
Type string `json:"type"` //消息类型
MsgID string `json:"msgId"` //消息标识
Data interface{} `json:"data"` //消息数据
Code int `json:"code,omitempty"` //状态
Type string `json:"type"` //消息类型
MsgID string `json:"msgId"` //消息标识
Data interface{} `json:"data,omitempty"` //消息数据
}

type MsgRegister struct {
Name string `json:"name"` //名称
NodeID string `json:"nodeID"` //名称
Version string `json:"version"` //版本信息
StartTime int64 `json:"startTime"` //运行时间
ConnectKey string `json:"connectKey"` //连接秘钥
LocalAddr string `json:"localAddr"` //本地地址
RemoteAddr string `json:"-"` //远程地址
}

type MsgGetPeer struct {
RemoteAddr string `json:"remoteAddr"` //远程外网地址
type MsgGetRegister struct {
NodeID string `json:"nodeID"` //节点标识
}

// MsgFind 查找站点信息
type MsgFind struct {
type MsgConnectNotice struct {
NodeID string `json:"nodeID"` //节点标识
}

type MsgConnect struct {
NodeID string `json:"nodeID"` //名称
}

type MsgError struct {
Code int `json:"code"`
Data interface{} `json:"data"`
Msg string `json:"msg"`
}
185 changes: 148 additions & 37 deletions extend/p2p/p2p_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package p2p

import (
"encoding/json"
"github.com/injoyai/base/g"
"github.com/injoyai/base/maps"
"github.com/injoyai/base/maps/wait"
"github.com/injoyai/conv"
"github.com/injoyai/io"
"github.com/injoyai/io/listen"
Expand All @@ -17,9 +19,17 @@ var (
const (
Version = "1.0.0"

TypeRegister = "register" //注册自己节点信息
TypeGetPeers = "getPeers" //获取其他节点信息
TypeConnect = "connect" //连接其他节点
CodeNotFind = 403
CodeSuccess = 200

TypeError = "error" //错误信息
TypeRegisterReq = "registerReq" //注册自己节点信息
TypeRegisterRes = "registerRes" //注册自己节点信息
TypeGetRegisterReq = "getRegisterReq" //获取其他节点信息
TypeGetRegisterRes = "getRegisterRes" //获取节点信息响应
TypeConnectReq = "connectReq" //连接其他节点
TypeConnectNoticeReq = "connectNotice" //连接其他节点通知
TypeConnectNoticeRes = "connectNoticeRes" //连接其他节点通知
)

type Peer interface {
Expand All @@ -28,25 +38,107 @@ type Peer interface {

}

func NewPeer(port int, options ...io.OptionServer) (*peer, error) {
s, err := listen.NewUDPServer(port, func(s *io.Server) {
func NewPeer(port int, options ...io.OptionServer) (p *peer, err error) {
p = &peer{
localAddr: &net.UDPAddr{Port: port},
clients: maps.NewSafe(),
wait: wait.New(time.Second * 2),
}
p.Server, err = listen.NewUDPServer(port, func(s *io.Server) {
s.SetReadWriteWithPkg()
s.SetDealFunc(func(c *io.Client, msg io.Message) {

m := new(Msg)
json.Unmarshal(msg.Bytes(), m)

switch m.Type {
case TypeRegister:

case TypeError:
//响应的错误信息
errMsg := new(MsgError)
json.Unmarshal(conv.Bytes(m.Data), errMsg)
switch errMsg.Code {
case CodeNotFind:
s.Tag().Del(TypeRegisterReq + "_" + conv.String(errMsg.Data))
}
p.wait.Done(m.MsgID, errMsg.Msg)

case TypeRegisterReq:
//上报注册信息
registerMsg := new(MsgRegister)
json.Unmarshal(conv.Bytes(m.Data), registerMsg)
//保存注册信息
s.Tag().Set(TypeRegister, registerMsg)

case TypeGetPeers:
getPeerMsg := new(MsgGetPeer)
json.Unmarshal(conv.Bytes(m.Data), getPeerMsg)
s.Tag().Get(getPeerMsg.RemoteAddr)
registerMsg.RemoteAddr = c.GetKey()
c.SetKey(registerMsg.NodeID)
c.Tag().Set("register", registerMsg)
s.Tag().Set(TypeRegisterReq+"_"+registerMsg.NodeID, registerMsg)

case TypeRegisterRes:
//响应注册成功
p.wait.Done(m.MsgID, nil)

case TypeGetRegisterReq:
//上报获取注册信息
getRegisterMsg := new(MsgGetRegister)
json.Unmarshal(conv.Bytes(m.Data), getRegisterMsg)
registerMsg := new(MsgRegister)
v, ok := s.Tag().Get(getRegisterMsg.NodeID)
if ok {
registerMsg = v.(*MsgRegister)
}
c.WriteAny(Msg{
Type: TypeRegisterRes,
MsgID: m.MsgID,
Data: registerMsg,
})

case TypeGetRegisterRes:
//响应注册信息成功
registerMsg := new(MsgRegister)
json.Unmarshal(conv.Bytes(m.Data), registerMsg)
p.wait.Done(m.MsgID, registerMsg)

case TypeConnectNoticeReq:
//连接其他节点
connectMsg := new(MsgConnectNotice)
json.Unmarshal(conv.Bytes(m.Data), connectMsg)
nextPeer := s.GetClient(connectMsg.NodeID)
if nextPeer == nil {
c.WriteAny(Msg{
Type: TypeError,
MsgID: m.MsgID,
Data: CodeNotFind,
})
return
}
uuid := g.UUID()
nextPeer.WriteAny(Msg{
Type: TypeConnectReq,
MsgID: uuid,
Data: c.Tag().Get("register"),
})
_, err := p.wait.Wait(uuid)
if err != nil {
c.WriteAny(Msg{
Type: TypeError,
MsgID: m.MsgID,
Data: MsgError{
Code: 0,
Data: nil,
Msg: "",
},
})
} else {
c.WriteAny(Msg{
Type: TypeConnectNoticeRes,
MsgID: m.MsgID,
})
}

case TypeConnectNoticeRes:
p.wait.Done(m.MsgID, nil)

case TypeConnectReq:
//连接其他节点

}

Expand All @@ -56,37 +148,35 @@ func NewPeer(port int, options ...io.OptionServer) (*peer, error) {
if err != nil {
return nil, err
}
return &peer{
Server: s,
localAddr: &net.UDPAddr{Port: port},
clients: maps.NewSafe(),
}, nil
return p, nil
}

type peer struct {
*io.Server
Name string
NodeID string
localAddr *net.UDPAddr
clients *maps.Safe
nat *maps.Safe
wait *wait.Entity
}

func (this *peer) WriteTo(addr string, p []byte) (int, error) {
c, err := this.GetClientOrDial(addr, func() (io.ReadWriteCloser, string, error) {
func (this *peer) getClient(addr string) (*io.Client, error) {
return this.GetClientOrDial(addr, func() (io.ReadWriteCloser, string, error) {
c, err := this.Listener().(*listen.UDPServer).NewUDPClient(addr)
return c, addr, err
})
}

func (this *peer) WriteTo(addr string, p []byte) (int, error) {
c, err := this.getClient(addr)
if err != nil {
return 0, err
}
return c.Write(p)
}

func (this *peer) Ping(addr string, timeout ...time.Duration) error {
c, err := this.GetClientOrDial(addr, func() (io.ReadWriteCloser, string, error) {
c, err := this.Listener().(*listen.UDPServer).NewUDPClient(addr)
return c, addr, err
})
c, err := this.getClient(addr)
if err != nil {
return err
}
Expand All @@ -95,35 +185,56 @@ func (this *peer) Ping(addr string, timeout ...time.Duration) error {

// Register 向服务端注册节点信息
func (this *peer) Register(addr string) error {
c, err := this.GetClientOrDial(addr, func() (io.ReadWriteCloser, string, error) {
c, err := this.Listener().(*listen.UDPServer).NewUDPClient(addr)
return c, addr, err
})
c, err := this.getClient(addr)
if err != nil {
return err
}

uuid := g.UUID()
_, err = c.WriteAny(Msg{
Type: TypeRegister,
Type: TypeRegisterReq,
MsgID: uuid,
Data: MsgRegister{
Name: this.Name,
NodeID: this.NodeID,
Version: Version,
StartTime: StartTime.Unix(),
ConnectKey: "",
LocalAddr: this.localAddr.String(),
},
})

_, err = this.wait.Wait(uuid)
return err
}

func (this *peer) Find(addr string) (MsgFind, error) {
//TODO implement me
panic("implement me")
func (this *peer) GetRegister(addr string, nodeID string) (*MsgRegister, error) {
c, err := this.getClient(addr)
if err != nil {
return nil, err
}

uuid := g.UUID()
_, err = c.WriteAny(Msg{
Type: TypeGetRegisterReq,
MsgID: uuid,
Data: MsgGetRegister{
NodeID: nodeID,
},
})
if err != nil {
return nil, err
}

res, err := wait.Wait(uuid)
if err != nil {
return nil, err
}

return res.(*MsgRegister), nil
}

func (this *peer) Connect(addr string) error {
//TODO implement me
panic("implement me")
func (this *peer) Connect() {

}

func (this *peer) LocalAddr() net.Addr {
Expand Down
30 changes: 26 additions & 4 deletions extend/p2p/p2p_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,23 @@ import (
"github.com/injoyai/io"
"github.com/injoyai/logs"
"testing"
"time"
)

func TestNewPeer(t *testing.T) {
remoteAddr := "127.0.0.1:20001"
remoteAddr1 := "39.107.120.124:20001"
remoteAddr2 := "39.107.120.124:20002"
p, err := NewPeer(20000)
if err != nil {
t.Log(err)
return
}
go p.Run()
//logs.Debug("开始")
t.Log(p.Ping(remoteAddr))
logs.Debug("结束")
for {
<-time.After(time.Second * 5)
p.WriteTo(remoteAddr1, []byte("666"))
p.WriteTo(remoteAddr2, []byte("666"))
}
select {}
}

Expand All @@ -35,3 +39,21 @@ func TestNewPeer2(t *testing.T) {
})
t.Log(p.Run())
}

func TestNewPeer3(t *testing.T) {
remoteAddr1 := "39.107.120.124:20001"
remoteAddr2 := "39.107.120.124:20002"

p, err := NewPeer(20000)
if err != nil {
t.Log(err)
return
}
go p.Run()
for {
<-time.After(time.Second * 5)
p.WriteTo(remoteAddr1, []byte("666"))
p.WriteTo(remoteAddr2, []byte("666"))
}
select {}
}
5 changes: 4 additions & 1 deletion io_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewClientWithContext(ctx context.Context, i ReadWriteCloser, options ...Opt
IReadCloser: NewIReadCloserWithContext(ctx, i),
IWriter: NewIWriter(i),
i: i,
tag: maps.NewSafe(),
tag: nil,
createTime: time.Now(),
}
c.SetKey(fmt.Sprintf("%p", i))
Expand Down Expand Up @@ -106,6 +106,9 @@ func (this *Client) CreateTime() time.Time {

// Tag 自定义信息,方便记录连接信息 例:c.Tag().GetString("imei")
func (this *Client) Tag() *maps.Safe {
if this.tag == nil {
this.tag = maps.NewSafe()
}
return this.tag
}

Expand Down
9 changes: 7 additions & 2 deletions io_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,14 @@ func SwapClient(c1, c2 *Client) {
}

// Swap same two Copy IO数据交换
func Swap(i1, i2 ReadWriter) {
func Swap(i1, i2 ReadWriter) error {
go Copy(i1, i2)
Copy(i2, i1)
_, err := Copy(i2, i1)
return err
}

func Bridge(i1, i2 ReadWriter) error {
return Swap(i1, i2)
}

/*
Expand Down
Loading

0 comments on commit 129c0a7

Please sign in to comment.