Skip to content

Commit

Permalink
修改函数名NewWriter为NewIWriter,增加IWriteCloser
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] committed Apr 24, 2023
1 parent 7bc014b commit 70a98db
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 5 deletions.
2 changes: 1 addition & 1 deletion io_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewClientWithContext(ctx context.Context, i ReadWriteCloser) *Client {
}
c := &Client{
IReadCloser: NewIReadCloserWithContext(ctx, i),
IWriter: NewWriter(i),
IWriter: NewIWriter(i),
i: i,
tag: maps.NewSafe(),
createTime: time.Now(),
Expand Down
2 changes: 1 addition & 1 deletion io_i_read_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (this *IReadCloser) SetDealQueueFunc(num int, fn func(msg Message)) *IReadC

// Running 是否在运行
func (this *IReadCloser) Running() bool {
return this.running == 1
return atomic.LoadUint32(&this.running) == 1
}

// Run 开始运行数据读取
Expand Down
77 changes: 77 additions & 0 deletions io_i_write_closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io

import (
"context"
"sync/atomic"
)

func NewIWriteCloser(writeCloser WriteCloser) *IWriteCloser {
return NewIWriteCloserWithContext(context.Background(), writeCloser)
}

func NewIWriteCloserWithContext(ctx context.Context, writeCloser WriteCloser) *IWriteCloser {
return &IWriteCloser{
IWriter: NewIWriter(writeCloser),
ICloser: NewICloserWithContext(ctx, writeCloser),
}
}

type IWriteCloser struct {
*IWriter
*ICloser
queue chan []byte //写入队列
running uint32 //是否在运行
}

func (this *IWriteCloser) GetKey() string {
return this.IWriter.GetKey()
}

// SetKey 设置唯一标识
func (this *IWriteCloser) SetKey(key string) *IWriteCloser {
this.IWriter.SetKey(key)
this.ICloser.SetKey(key)
return this
}

// SetPrintFunc 设置打印函数
func (this *IWriteCloser) SetPrintFunc(fn PrintFunc) *IWriteCloser {
this.IWriter.SetPrintFunc(fn)
this.ICloser.SetPrintFunc(fn) //错误信息按ASCII编码?
return this
}

func (this *IWriteCloser) Debug(b ...bool) *IWriteCloser {
this.IWriter.Debug(b...)
this.ICloser.Debug(b...)
return this
}

// WriteQueue 写入队列
func (this *IWriteCloser) WriteQueue(p []byte) *IWriteCloser {
this.runQueue()
this.queue <- p
return this
}

// TryWriteQueue 尝试写入队列
func (this *IWriteCloser) TryWriteQueue(p []byte) *IWriteCloser {
this.runQueue()
select {
case this.queue <- p:
default:
}
return this
}

func (this *IWriteCloser) runQueue() {
if this.queue == nil {
this.queue = this.NewWriteQueue(this.Ctx())
}
if atomic.SwapUint32(&this.running, 1) == 0 {
go this.For(func() error {
_, err := this.Write(<-this.queue)
return err
})
}
}
4 changes: 2 additions & 2 deletions io_i_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"time"
)

// NewWriter 新建写
func NewWriter(writer Writer) *IWriter {
// NewIWriter 新建写
func NewIWriter(writer Writer) *IWriter {
if c, ok := writer.(*IWriter); ok && c != nil {
return c
}
Expand Down
2 changes: 1 addition & 1 deletion io_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func NewPool(dial DialFunc, num int, options ...func(ctx context.Context, c *Cli
p := &Pool{
client: make(map[string]*Client),
}
p.IWriter = NewWriter(p)
p.IWriter = NewIWriter(p)
p.ICloser = NewICloser(p)
go func() {
for i := 0; i < num; i++ {
Expand Down
11 changes: 11 additions & 0 deletions io_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,17 @@ func (this *Server) GetClientLen() int {
return len(this.clientMap)
}

// RangeClient 遍历客户端
func (this *Server) RangeClient(fn func(key string, c *Client) bool) {
this.clientMu.RLock()
defer this.clientMu.RUnlock()
for i, v := range this.clientMap {
if fn(i, v) {
break
}
}
}

// Read 无效,使用ReadMessage
func (this *Server) Read(p []byte) (int, error) {
return 0, nil
Expand Down

0 comments on commit 70a98db

Please sign in to comment.