add subchunk request cancel logic
CMA2401PT committed Jul 23, 2022
1 parent 65308bd commit cbd600d
Showing 2 changed files with 89 additions and 34 deletions.
118 changes: 85 additions & 33 deletions mirror/io/assembler/assembler.go
@@ -15,10 +15,14 @@ import (
 
 type Assembler struct {
 	airRID           uint32
+	taskMu           sync.RWMutex
 	pendingTasks     map[define.ChunkPos]*mirror.ChunkData
-	mu               sync.RWMutex
-	chunkRequestChan chan []*packet.SubChunkRequest
 	visitTime        map[define.ChunkPos]time.Time
+	chunkRequestChan chan []*packet.SubChunkRequest
+	queueMu          sync.RWMutex
+	requestQueue     map[define.ChunkPos][]*packet.SubChunkRequest
+	centerChunk      *define.ChunkPos
+	radius           int32
 }
 
 func NewAssembler() *Assembler {
@@ -31,6 +35,11 @@ func NewAssembler() *Assembler {
 		pendingTasks:     make(map[define.ChunkPos]*mirror.ChunkData),
 		chunkRequestChan: make(chan []*packet.SubChunkRequest, 10240),
 		visitTime:        make(map[define.ChunkPos]time.Time),
+		requestQueue:     make(map[define.ChunkPos][]*packet.SubChunkRequest),
+		taskMu:           sync.RWMutex{},
+		queueMu:          sync.RWMutex{},
+		centerChunk:      nil,
+		radius:           10,
 	}
 	return a

@@ -49,21 +58,21 @@ func (o *Assembler) GenRequestFromLevelChunk(pk *packet.LevelChunk) (requests []*packet.SubChunkRequest) {
 
 func (o *Assembler) AddPendingTask(pk *packet.LevelChunk) (exist bool) {
 	cp := define.ChunkPos{pk.Position.X(), pk.Position.Z()}
-	o.mu.RLock()
+	o.taskMu.RLock()
 	if _, hasK := o.pendingTasks[cp]; hasK {
-		o.mu.RUnlock()
+		o.taskMu.RUnlock()
 		return true
 	}
-	o.mu.RUnlock()
+	o.taskMu.RUnlock()
 	chunk := chunk.New(o.airRID, define.Range{-64, 319})
-	o.mu.Lock()
+	o.taskMu.Lock()
 	o.pendingTasks[cp] = &mirror.ChunkData{
 		Chunk:     chunk,
 		BlockNbts: make(map[define.CubePos]map[string]interface{}),
 		TimeStamp: time.Now().Unix(),
 		ChunkPos:  cp,
 	}
-	o.mu.Unlock()
+	o.taskMu.Unlock()
 	return false
 }
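Note on the locking in AddPendingTask above: the membership check runs under taskMu.RLock, the lock is released, and the insert happens under a separate taskMu.Lock, so two goroutines racing on the same chunk can both observe "missing" and both build a fresh ChunkData, with the second overwriting the first. A minimal sketch of a single-critical-section variant, using the repo's own types; addPendingTaskAtomic is a hypothetical name, not part of this commit:

```go
// Hypothetical variant, not in this commit: doing the check and the insert
// under one write lock makes the check-then-act atomic, at the cost of
// holding taskMu while chunk.New allocates the empty chunk.
func (o *Assembler) addPendingTaskAtomic(cp define.ChunkPos) (exist bool) {
	o.taskMu.Lock()
	defer o.taskMu.Unlock()
	if _, hasK := o.pendingTasks[cp]; hasK {
		return true
	}
	o.pendingTasks[cp] = &mirror.ChunkData{
		Chunk:     chunk.New(o.airRID, define.Range{-64, 319}),
		BlockNbts: make(map[define.CubePos]map[string]interface{}),
		TimeStamp: time.Now().Unix(),
		ChunkPos:  cp,
	}
	return false
}
```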

@@ -77,19 +86,19 @@ func (o *Assembler) OnNewSubChunk(pk *packet.SubChunk) *mirror.ChunkData {
 	}()
 	cp := define.ChunkPos{pk.SubChunkX, pk.SubChunkZ}
 	// subChunkIndex := pk.SubChunkY
-	o.mu.RLock()
+	o.taskMu.RLock()
 	if chunkData, hasK := o.pendingTasks[cp]; !hasK {
-		o.mu.RUnlock()
+		o.taskMu.RUnlock()
 		//fmt.Printf("Unexpected chunk\n")
 		return nil
 	} else {
-		o.mu.RUnlock()
+		o.taskMu.RUnlock()
 		if pk.RequestResult != packet.SubChunkRequestResultSuccess {
 			// cancel pending task
 			// fmt.Println("Cancel Pending Task")
-			o.mu.Lock()
+			o.taskMu.Lock()
 			delete(o.pendingTasks, cp)
-			o.mu.Unlock()
+			o.taskMu.Unlock()
 			return nil
 		}
 		subIndex, subChunk, nbts, err := chunk.NEMCSubChunkDecode(pk.Data)
@@ -124,48 +133,91 @@ func (o *Assembler) OnNewSubChunk(pk *packet.SubChunk) *mirror.ChunkData {
 			}
 			fmt.Printf("Finished %v\n", cp)
 		*/
-		o.mu.Lock()
+		o.taskMu.Lock()
 		delete(o.pendingTasks, cp)
 		o.visitTime[cp] = time.Now()
-		o.mu.Unlock()
+		o.taskMu.Unlock()
 		return chunkData
 	}
 
 }
 
 func (o *Assembler) ScheduleRequest(requests []*packet.SubChunkRequest) {
 	o.chunkRequestChan <- requests
 }
 
+func (o *Assembler) CancelQueueByPublishUpdate(p *packet.NetworkChunkPublisherUpdate) {
+	chunkCenterX := p.Position.X() >> 4
+	chunkCenterZ := p.Position.Z() >> 4
+	o.centerChunk = &define.ChunkPos{chunkCenterX, chunkCenterZ}
+	o.queueMu.Lock()
+	// cancelCounter := 0
+	for cp, _ := range o.requestQueue {
+		if (cp.X() < chunkCenterX-int32(o.radius)) || (cp.X() > chunkCenterX+int32(o.radius)) ||
+			(cp.Z() < chunkCenterZ-int32(o.radius)) || (cp.Z() > chunkCenterZ+int32(o.radius)) {
+			delete(o.requestQueue, cp)
+			// cancelCounter += 1
+		}
+	}
+	// if cancelCounter > 0 {
+	// 	pterm.Warning.Printfln("cancel %v pending request task, %v left", cancelCounter, len(o.requestQueue))
+	// }
+	o.queueMu.Unlock()
+}
+
 func (o *Assembler) CreateRequestScheduler(writeFn func(pk *packet.SubChunkRequest), sendPeriod time.Duration, validCacheTime time.Duration) {
-	go func() {
-		// t := time.NewTicker(time.Second / 40)
-		t := time.NewTicker(sendPeriod)
-		// visitTime := make(map[protocol.SubChunkPos]time.Time)
-		for requests := range o.chunkRequestChan {
-			// fmt.Println("request")
-			if len(o.chunkRequestChan) > 1024 {
-				pterm.Warning.Println("chunk request too busy")
+	tickerAwaked := false
+	// 16 * 16 256 ~ 420 chunks
+	requestSender := func() {
+		// pterm.Info.Println("ticker awaked")
+		t := time.NewTicker(sendPeriod / 24)
+		o.queueMu.RLock()
+		for cp, requests := range o.requestQueue {
+			pendingTasksNum := len(o.requestQueue)
+			o.queueMu.RUnlock()
+			if pendingTasksNum > 512 {
+				pterm.Warning.Printf("chunk request queue too long, pending %v tasks\n", pendingTasksNum*16)
 			}
-			first_subchunk_request := requests[0]
-			o.mu.RLock()
-			if visitTime, hasK := o.visitTime[define.ChunkPos{first_subchunk_request.Position.X(), first_subchunk_request.Position.Z()}]; hasK {
+			o.taskMu.RLock()
+			if visitTime, hasK := o.visitTime[cp]; hasK {
 				if time.Since(visitTime) < validCacheTime {
-					o.mu.RUnlock()
+					o.taskMu.RUnlock()
+					o.queueMu.RLock()
 					continue
 				}
 			}
-			o.mu.RUnlock()
-			// visitTime[request0.Position] = time.Now()
+			o.taskMu.RUnlock()
 			for _, request := range requests {
 				writeFn(request)
-				// o.o.adaptor.Write(&packet.SubChunkRequest{
-				// 	0,
-				// 	protocol.SubChunkPos{1249, 4, -1249}, nil,
-				// })
 				<-t.C
 			}
+			o.queueMu.Lock()
+			delete(o.requestQueue, cp)
+			o.queueMu.Unlock()
+			o.queueMu.RLock()
 		}
+		tickerAwaked = false
+		o.queueMu.RUnlock()
+	}
+
+	go func() {
+		for requests := range o.chunkRequestChan {
+			firstSubChunkRequest := requests[0]
+			cp := define.ChunkPos{firstSubChunkRequest.Position.X(), firstSubChunkRequest.Position.Z()}
+			if o.centerChunk != nil && ((cp.X() < o.centerChunk.X()-o.radius) || (cp.X() > o.centerChunk.X()+o.radius) ||
+				(cp.Z() < o.centerChunk.Z()-o.radius) || (cp.Z() > o.centerChunk.Z()+o.radius)) {
+				// pterm.Warning.Printfln("Discard %v", cp)
+				continue
+			}
+			o.queueMu.Lock()
+			o.requestQueue[cp] = requests
+			if !tickerAwaked {
+				tickerAwaked = true
+				o.queueMu.Unlock()
+				requestSender()
+			} else {
+				o.queueMu.Unlock()
+			}
-			<-t.C
 		}
 	}()
 }
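The new cancellation logic works in chunk coordinates: NetworkChunkPublisherUpdate carries a block position, and shifting X and Z right by 4 (dividing by 16) yields the chunk the server is currently publishing around. CancelQueueByPublishUpdate drops queued requests outside a square of `radius` chunks (10 by default) around that center, and the scheduler goroutine applies the same box test to discard out-of-range batches as they arrive from chunkRequestChan. The test is written out twice; a sketch of a shared predicate — withinRadius is a hypothetical helper, not part of the commit:

```go
// Hypothetical helper, not in this commit: reports whether chunk position cp
// lies inside the axis-aligned square of the given radius (in chunks) around
// center. Both CancelQueueByPublishUpdate and the scheduler goroutine inline
// the negation of this test.
func withinRadius(cp, center define.ChunkPos, radius int32) bool {
	return cp.X() >= center.X()-radius && cp.X() <= center.X()+radius &&
		cp.Z() >= center.Z()-radius && cp.Z() <= center.Z()+radius
}
```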
5 changes: 4 additions & 1 deletion omega/mainframe/reactor.go
@@ -261,6 +261,9 @@ func (r *Reactor) React(pkt packet.Packet) {
 				cb(chunkData)
 			}
 		}
+	case *packet.NetworkChunkPublisherUpdate:
+		r.chunkAssembler.CancelQueueByPublishUpdate(p)
+		// fmt.Println("packet.NetworkChunkPublisherUpdate", p)
 	}
 	for _, cb := range r.OnAnyPacketCallBack {
 		cb(pkt)
@@ -297,7 +300,7 @@ func (o *Reactor) onBootstrap() {
 	o.chunkAssembler = assembler.NewAssembler()
 	o.chunkAssembler.CreateRequestScheduler(func(pk *packet.SubChunkRequest) {
 		o.o.adaptor.Write(pk)
-	}, time.Second/20, time.Minute*3)
+	}, time.Second/15, time.Minute*5)
 	memoryProvider := lru.NewLRUMemoryChunkCacher(8)
 	worldDir := path.Join(o.o.GetWorldsDir(), "current")
 	fileProvider, err := mcdb.New(worldDir, opt.FlateCompression)
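The bootstrap change retunes the scheduler: the send period moves from time.Second/20 to time.Second/15, and the visit-cache validity window from three minutes to five, so recently assembled chunks are skipped for longer. Inside CreateRequestScheduler the ticker runs at sendPeriod/24, which lines up with the 24 subchunks of a full column (define.Range{-64, 319} spans 384 blocks); that reading is an inference from the diff, not stated in the commit. A quick sanity check of the resulting rates:

```go
package main

import (
	"fmt"
	"time"
)

// Quick arithmetic on the retuned parameters (an inference from the diff, not
// code from the commit): one ticker tick gates each SubChunkRequest write, so
// a 24-subchunk column is paced out over roughly one sendPeriod.
func main() {
	sendPeriod := time.Second / 15 // new value set in onBootstrap
	perSubChunk := sendPeriod / 24 // ticker period inside requestSender
	fmt.Println(sendPeriod)        // 66.666666ms per chunk column
	fmt.Println(perSubChunk)       // 2.777777ms per subchunk request
	fmt.Println(int64(time.Second / perSubChunk)) // ≈360 subchunk requests/s
}
```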
