Skip to content

Commit

Permalink
fix race conditioning
Browse files Browse the repository at this point in the history
  • Loading branch information
sijms committed Mar 23, 2024
1 parent 800bce3 commit 4984576
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
5 changes: 4 additions & 1 deletion v2/network/data_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"sync"
)

type DataPacket struct {
Expand All @@ -23,9 +24,11 @@ func (pck *DataPacket) bytes() []byte {
return ret.Bytes()
}

func newDataPacket(initialData []byte, sessionCtx *SessionContext) (*DataPacket, error) {
func newDataPacket(initialData []byte, sessionCtx *SessionContext, mu *sync.Mutex) (*DataPacket, error) {
//var outputData []byte = initialData
var err error
mu.Lock()
defer mu.Unlock()
if sessionCtx.AdvancedService.HashAlgo != nil {
hashData := sessionCtx.AdvancedService.HashAlgo.Compute(initialData)
initialData = append(initialData, hashData...)
Expand Down
18 changes: 9 additions & 9 deletions v2/network/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func NewSession(connOption *ConnectionOption) *Session {
}

// SaveState save current session state and accept new state
// if new state is nil the session will be resetted
// if new state is nil the session will be reset
func (session *Session) SaveState(newState *SessionState) {
session.mu.Lock()
defer session.mu.Unlock()
Expand Down Expand Up @@ -188,10 +188,10 @@ func (session *Session) StartContext(ctx context.Context) {
done := make(chan struct{})
session.doneContext = append(session.doneContext, done)
go func(idone chan struct{}, mu *sync.Mutex) {
//mu.Lock()
//defer mu.Unlock()
var err error
mu.Lock()
var tracer = session.Context.ConnOption.Tracer
mu.Unlock()
select {
case <-idone:
return
Expand Down Expand Up @@ -417,7 +417,7 @@ func (session *Session) RestoreIndex() bool {
return false
}

// BreakConnection elicit connetion break to cancel the current operation
// BreakConnection elicit connection break to cancel the current operation
func (session *Session) BreakConnection() error {
tracer := session.Context.ConnOption.Tracer
tracer.Print("Break Connection")
Expand Down Expand Up @@ -597,7 +597,7 @@ func (session *Session) Connect(ctx context.Context) error {
}

func (session *Session) WriteFinalPacket() error {
data, err := newDataPacket(nil, session.Context)
data, err := newDataPacket(nil, session.Context, &session.mu)
if err != nil {
return err
}
Expand Down Expand Up @@ -627,7 +627,7 @@ func (session *Session) Write() error {
size := session.outBuffer.Len()
if size == 0 {
// send empty data packet
pck, err := newDataPacket(nil, session.Context)
pck, err := newDataPacket(nil, session.Context, &session.mu)
if err != nil {
return err
}
Expand All @@ -641,7 +641,7 @@ func (session *Session) Write() error {
segment := make([]byte, segmentLen)
for size > segmentLen {
copy(segment, outputBytes[offset:offset+segmentLen])
pck, err := newDataPacket(segment, session.Context)
pck, err := newDataPacket(segment, session.Context, &session.mu)
if err != nil {
return err
}
Expand All @@ -655,7 +655,7 @@ func (session *Session) Write() error {
}
}
if size != 0 {
pck, err := newDataPacket(outputBytes[offset:], session.Context)
pck, err := newDataPacket(outputBytes[offset:], session.Context, &session.mu)
if err != nil {
return err
}
Expand Down Expand Up @@ -877,7 +877,7 @@ func (session *Session) read(numBytes int) ([]byte, error) {
ret := session.inBuffer[session.index : session.index+numBytes]
session.index += numBytes
if len(ret) < requiredLen {
return nil, errors.New("buffer underrun during read operation")
return nil, errors.New("buffer under-run during read operation")
}
return ret, nil
}
Expand Down

0 comments on commit 4984576

Please sign in to comment.