Skip to content

Commit

Permalink
packet unmarshalling code into main server loop
Browse files Browse the repository at this point in the history
This means we are now passing full packets to the workers. This makes
the request id's available which is required for fixing packet ordering.
Benchmark tests weren't affected at all.
  • Loading branch information
eikenb committed Mar 23, 2017
1 parent 4f1fe4f commit 0b42971
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Server struct {
serverConn
debugStream io.Writer
readOnly bool
pktChan chan rxPacket
pktChan chan packet
openFiles map[string]*os.File
openFilesLock sync.RWMutex
handleCount int
Expand Down Expand Up @@ -83,7 +83,7 @@ func NewServer(rwc io.ReadWriteCloser, options ...ServerOption) (*Server, error)
},
},
debugStream: ioutil.Discard,
pktChan: make(chan rxPacket, sftpServerWorkerCount),
pktChan: make(chan packet, sftpServerWorkerCount),
openFiles: make(map[string]*os.File),
maxTxPacket: 1 << 15,
}
Expand Down Expand Up @@ -123,12 +123,7 @@ type rxPacket struct {

// Up to N parallel servers
func (svr *Server) sftpServerWorker() error {
for p := range svr.pktChan {

pkt, err := makePacket(p)
if err != nil {
return err
}
for pkt := range svr.pktChan {

// readonly checks
readonly := true
Expand Down Expand Up @@ -297,14 +292,22 @@ func (svr *Server) Serve() error {
}

var err error
var pkt packet
var pktType uint8
var pktBytes []byte
for {
pktType, pktBytes, err = svr.recvPacket()
if err != nil {
break
}
svr.pktChan <- rxPacket{fxp(pktType), pktBytes}

pkt, err = makePacket(rxPacket{fxp(pktType), pktBytes})
if err != nil {
svr.conn.Close() // shuts down recvPacket
break
}

svr.pktChan <- pkt
}

close(svr.pktChan) // shuts down sftpServerWorkers
Expand Down

0 comments on commit 0b42971

Please sign in to comment.