Merge pull request cubefs#1282 from shuoranliu/enhance-extent-fragmentation-problem

enhance: mitigate the pain of extents fragmentation
shuoranliu authored Dec 24, 2021
2 parents 1cb6e0b + fa7c53d commit d72f7d9
Showing 5 changed files with 112 additions and 26 deletions.
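At a glance, the change attacks fragmentation on the write path: instead of opening a brand-new extent for every small append, the client now looks for a normal extent that already ends exactly at the write offset and keeps appending to it. The sketch below is a simplified, self-contained illustration of that reuse decision; the `extent` type and `reusableTail` helper are illustrative stand-ins, not the SDK's real `proto.ExtentKey`/`ExtentCache` API.

```go
package main

import "fmt"

// extent is a stand-in for proto.ExtentKey: one contiguous range of file data
// stored in a single extent of a data partition.
type extent struct {
    fileOffset uint64 // start offset within the file
    size       uint32 // length of the extent
    tiny       bool   // whether it lives in the tiny-extent store
}

// reusableTail captures the idea behind the new GetEnd lookup: if some normal
// extent ends exactly at the write offset, the writer can append to it instead
// of allocating a fresh extent (which is what fragments the file).
func reusableTail(cached []extent, writeOffset uint64) *extent {
    for i := range cached {
        ek := &cached[i]
        if ek.tiny {
            continue // tiny extents are never extended in place
        }
        if ek.fileOffset+uint64(ek.size) == writeOffset {
            return ek
        }
    }
    return nil
}

func main() {
    cached := []extent{
        {fileOffset: 0, size: 4096, tiny: true},
        {fileOffset: 4096, size: 8192},
    }
    if ek := reusableTail(cached, 12288); ek != nil {
        fmt.Printf("append to the extent starting at %d\n", ek.fileOffset)
    } else {
        fmt.Println("open a new extent handler")
    }
}
```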
39 changes: 39 additions & 0 deletions sdk/data/stream/extent_cache.go
@@ -165,6 +165,25 @@ func (cache *ExtentCache) RemoveDiscard(discardExtents []proto.ExtentKey) {
    }
}

func (cache *ExtentCache) TruncDiscard(size uint64) {
    cache.Lock()
    defer cache.Unlock()
    if size >= cache.size {
        return
    }
    pivot := &proto.ExtentKey{FileOffset: size}
    discardExtents := make([]proto.ExtentKey, 0, cache.discard.Len())
    cache.discard.AscendGreaterOrEqual(pivot, func(i btree.Item) bool {
        found := i.(*proto.ExtentKey)
        discardExtents = append(discardExtents, *found)
        return true
    })
    for _, key := range discardExtents {
        cache.discard.Delete(&key)
    }
    log.LogDebugf("truncate ExtentCache discard: ino(%v) size(%v) discard(%v)", cache.inode, size, discardExtents)
}

// Max returns the max extent key in the cache.
func (cache *ExtentCache) Max() *proto.ExtentKey {
    cache.RLock()
@@ -222,6 +241,26 @@ func (cache *ExtentCache) Get(offset uint64) (ret *proto.ExtentKey) {
    return ret
}

// GetEnd returns the extent key whose end offset equals the given offset.
func (cache *ExtentCache) GetEnd(offset uint64) (ret *proto.ExtentKey) {
    pivot := &proto.ExtentKey{FileOffset: offset}
    cache.RLock()
    defer cache.RUnlock()

    cache.root.DescendLessOrEqual(pivot, func(i btree.Item) bool {
        ek := i.(*proto.ExtentKey)
        // skip if the start offset matches the given offset
        if offset == ek.FileOffset {
            return true
        }
        if offset == ek.FileOffset+uint64(ek.Size) {
            ret = ek
        }
        return false
    })
    return ret
}

// PrepareReadRequests classifies the incoming request.
func (cache *ExtentCache) PrepareReadRequests(offset, size int, data []byte) []*ExtentRequest {
    requests := make([]*ExtentRequest, 0)
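Two things stand out in the cache changes above: GetEnd looks up the extent whose end offset equals a given offset (a pure-append boundary), and TruncDiscard removes every discard marker at or beyond the truncated size by first collecting matches during the B-tree ascent and only deleting them afterwards, rather than mutating the tree mid-iteration. Below is a minimal, self-contained sketch of that collect-then-delete pattern, using the same github.com/google/btree package but a simplified stand-in item type instead of proto.ExtentKey.

```go
package main

import (
    "fmt"

    "github.com/google/btree"
)

// off is a stand-in item keyed by file offset, like the entries in the
// extent cache's discard tree.
type off uint64

func (o off) Less(than btree.Item) bool { return o < than.(off) }

// truncDiscard mirrors the shape of ExtentCache.TruncDiscard: collect every
// key at or past `size` during the ascend, then delete the collected keys in
// a second pass.
func truncDiscard(tree *btree.BTree, size uint64) []off {
    collected := make([]off, 0, tree.Len())
    tree.AscendGreaterOrEqual(off(size), func(i btree.Item) bool {
        collected = append(collected, i.(off))
        return true // keep ascending to the end of the tree
    })
    for _, k := range collected {
        tree.Delete(k)
    }
    return collected
}

func main() {
    tree := btree.New(32)
    for _, o := range []off{0, 4096, 8192, 12288} {
        tree.ReplaceOrInsert(o)
    }
    fmt.Println(truncDiscard(tree, 8192)) // [8192 12288]
    fmt.Println(tree.Len())               // 2
}
```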
54 changes: 36 additions & 18 deletions sdk/data/stream/extent_handler.go
@@ -108,12 +108,13 @@ type ExtentHandler struct {
}

// NewExtentHandler returns a new extent handler.
func NewExtentHandler(stream *Streamer, offset int, storeMode int) *ExtentHandler {
func NewExtentHandler(stream *Streamer, offset int, storeMode int, size int) *ExtentHandler {
    eh := &ExtentHandler{
        stream:     stream,
        id:         GetExtentHandlerID(),
        inode:      stream.inode,
        fileOffset: offset,
        size:       size,
        storeMode:  storeMode,
        empty:      make(chan struct{}, 1024),
        request:    make(chan *Packet, 1024),
@@ -214,9 +215,15 @@ func (eh *ExtentHandler) sender() {
    if err = eh.allocateExtent(); err != nil {
        eh.setClosed()
        eh.setRecovery()
        eh.setError()
        // if dp is not specified and yet we failed, then error out.
        // otherwise, just try to recover.
        if eh.key == nil {
            eh.setError()
            log.LogErrorf("sender: eh(%v) err(%v)", eh, err)
        } else {
            log.LogWarnf("sender: eh(%v) err(%v)", eh, err)
        }
        eh.reply <- packet
        log.LogErrorf("sender: eh(%v) err(%v)", eh, err)
        continue
    }
}
@@ -466,7 +473,7 @@ func (eh *ExtentHandler) recoverPacket(packet *Packet) error {
        // Always use normal extent store mode for recovery.
        // Because tiny extent files are limited, tiny store
        // failures might be due to the lack of tiny extent files.
        handler = NewExtentHandler(eh.stream, int(packet.KernelOffset), proto.NormalExtentType)
        handler = NewExtentHandler(eh.stream, int(packet.KernelOffset), proto.NormalExtentType, 0)
        handler.setClosed()
    }
    handler.pushToRequest(packet)
@@ -497,28 +504,39 @@ func (eh *ExtentHandler) allocateExtent() (err error) {
    exclude := make(map[string]struct{})

    for i := 0; i < MaxSelectDataPartitionForWrite; i++ {
        if dp, err = eh.stream.client.dataWrapper.GetDataPartitionForWrite(exclude); err != nil {
            log.LogWarnf("allocateExtent: failed to get write data partition, eh(%v) exclude(%v)", eh, exclude)
            continue
        }
        if eh.key == nil {
            if dp, err = eh.stream.client.dataWrapper.GetDataPartitionForWrite(exclude); err != nil {
                log.LogWarnf("allocateExtent: failed to get write data partition, eh(%v) exclude(%v)", eh, exclude)
                continue
            }

        extID = 0
        if eh.storeMode == proto.NormalExtentType {
            extID, err = eh.createExtent(dp)
        }
        if err != nil {
            log.LogWarnf("allocateExtent: delete dp[%v] caused by create extent failed, eh(%v) err(%v) exclude(%v)",
                dp, eh, err, exclude)
            eh.stream.client.dataWrapper.RemoveDataPartitionForWrite(dp.PartitionID)
            dp.CheckAllHostsIsAvail(exclude)
            continue
            extID = 0
            if eh.storeMode == proto.NormalExtentType {
                extID, err = eh.createExtent(dp)
            }
            if err != nil {
                log.LogWarnf("allocateExtent: exclude dp[%v] for write caused by create extent failed, eh(%v) err(%v) exclude(%v)",
                    dp, eh, err, exclude)
                eh.stream.client.dataWrapper.RemoveDataPartitionForWrite(dp.PartitionID)
                dp.CheckAllHostsIsAvail(exclude)
                continue
            }
        } else {
            if dp, err = eh.stream.client.dataWrapper.GetDataPartition(eh.key.PartitionId); err != nil {
                log.LogWarnf("allocateExtent: failed to get write data partition, eh(%v)", eh)
                break
            }
            extID = int(eh.key.ExtentId)
        }

        if conn, err = StreamConnPool.GetConnect(dp.Hosts[0]); err != nil {
            log.LogWarnf("allocateExtent: failed to create connection, eh(%v) err(%v) dp(%v) exclude(%v)",
                eh, err, dp, exclude)
            // If storeMode is tinyExtentType and can't create connection, we also check host status.
            dp.CheckAllHostsIsAvail(exclude)
            if eh.key != nil {
                break
            }
            continue
        }

29 changes: 25 additions & 4 deletions sdk/data/stream/stream_writer.go
@@ -16,15 +16,17 @@ package stream

import (
    "fmt"
    "golang.org/x/net/context"
    "hash/crc32"
    "net"
    "sync/atomic"
    "syscall"
    "time"

    "golang.org/x/net/context"

    "github.com/chubaofs/chubaofs/proto"
    "github.com/chubaofs/chubaofs/sdk/data/wrapper"
    "github.com/chubaofs/chubaofs/storage"
    "github.com/chubaofs/chubaofs/util"
    "github.com/chubaofs/chubaofs/util/errors"
    "github.com/chubaofs/chubaofs/util/log"
@@ -33,7 +35,7 @@ import (
const (
    MaxSelectDataPartitionForWrite = 32
    MaxNewHandlerRetry             = 3
    MaxPacketErrorCount            = 32
    MaxPacketErrorCount            = 128
    MaxDirtyListLen                = 0
)

@@ -398,17 +400,34 @@ func (s *Streamer) doWrite(data []byte, offset, size int, direct bool) (total in
        storeMode int
    )

    if offset+size > s.tinySizeLimit() {
    // Small files are usually written in a single write, so use tiny extent
    // store only for the first write operation.
    if offset > 0 {
        storeMode = proto.NormalExtentType
    } else {
        storeMode = proto.TinyExtentType
    }

    log.LogDebugf("doWrite enter: ino(%v) offset(%v) size(%v) storeMode(%v)", s.inode, offset, size, storeMode)

    if s.handler == nil && storeMode == proto.NormalExtentType {
        if currentEK := s.extents.GetEnd(uint64(offset)); currentEK != nil && !storage.IsTinyExtent(currentEK.ExtentId) {
            handler := NewExtentHandler(s, int(currentEK.FileOffset), storeMode, int(currentEK.Size))
            handler.key = &proto.ExtentKey{
                FileOffset:   currentEK.FileOffset,
                PartitionId:  currentEK.PartitionId,
                ExtentId:     currentEK.ExtentId,
                ExtentOffset: currentEK.ExtentOffset,
                Size:         currentEK.Size,
            }
            s.handler = handler
            s.dirty = false
        }
    }

    for i := 0; i < MaxNewHandlerRetry; i++ {
        if s.handler == nil {
            s.handler = NewExtentHandler(s, offset, storeMode)
            s.handler = NewExtentHandler(s, offset, storeMode, 0)
            s.dirty = false
        }

@@ -421,6 +440,7 @@ func (s *Streamer) doWrite(data []byte, offset, size int, direct bool) (total in
            break
        }

        log.LogDebugf("doWrite handler write failed so close open handler: ino(%v) offset(%v) size(%v) storeMode(%v)", s.inode, offset, size, storeMode)
        s.closeOpenHandler()
    }

@@ -584,6 +604,7 @@ func (s *Streamer) truncate(size int) error {
        return nil
    }

    s.extents.TruncDiscard(uint64(size))
    return s.GetExtents()
}

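The writer-side changes boil down to two decisions: the store-mode choice no longer compares offset+size against a tiny-size limit but simply routes everything after the very first write (offset 0) to normal extents, and doWrite seeds a handler with the extent returned by GetEnd so the append can reuse it. A tiny sketch of the new store-mode rule follows; the constants are stand-ins, not the real proto.TinyExtentType/proto.NormalExtentType values.

```go
package main

import "fmt"

const (
    tinyExtentType   = 0 // stand-in for proto.TinyExtentType
    normalExtentType = 1 // stand-in for proto.NormalExtentType
)

// chooseStoreMode mirrors the new doWrite rule: only the very first write of a
// file (offset 0) may land in the tiny-extent store; any later write, however
// small, goes to a normal extent so it can be extended in place.
func chooseStoreMode(offset int) int {
    if offset > 0 {
        return normalExtentType
    }
    return tinyExtentType
}

func main() {
    fmt.Println(chooseStoreMode(0))    // 0: first write of a small file
    fmt.Println(chooseStoreMode(4096)) // 1: subsequent writes use normal extents
}
```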
9 changes: 5 additions & 4 deletions sdk/meta/conn.go
@@ -48,7 +48,6 @@ func (mc *MetaConn) String() string {
func (mw *MetaWrapper) getConn(partitionID uint64, addr string) (*MetaConn, error) {
    conn, err := mw.conns.GetConnect(addr)
    if err != nil {
        log.LogWarnf("GetConnect conn: addr(%v) err(%v)", addr, err)
        return nil, err
    }
    mc := &MetaConn{conn: conn, id: partitionID, addr: addr}
@@ -72,19 +71,20 @@ func (mw *MetaWrapper) sendToMetaPartition(mp *MetaPartition, req *proto.Packet)

    addr = mp.LeaderAddr
    if addr == "" {
        err = errors.New(fmt.Sprintf("sendToMetaPartition failed: leader addr empty, req(%v) mp(%v)", req, mp))
        err = errors.New(fmt.Sprintf("sendToMetaPartition: failed due to empty leader addr and goto retry, req(%v) mp(%v)", req, mp))
        goto retry
    }
    mc, err = mw.getConn(mp.PartitionID, addr)
    if err != nil {
        log.LogWarnf("sendToMetaPartition: getConn failed and goto retry, req(%v) mp(%v) addr(%v) err(%v)", req, mp, addr, err)
        goto retry
    }
    resp, err = mc.send(req)
    mw.putConn(mc, err)
    if err == nil && !resp.ShouldRetry() {
        goto out
    }
    log.LogWarnf("sendToMetaPartition: leader failed req(%v) mp(%v) mc(%v) err(%v) resp(%v)", req, mp, mc, err, resp)
    log.LogWarnf("sendToMetaPartition: leader failed and goto retry, req(%v) mp(%v) mc(%v) err(%v) resp(%v)", req, mp, mc, err, resp)

retry:
    start = time.Now()
@@ -93,6 +93,7 @@
        mc, err = mw.getConn(mp.PartitionID, addr)
        errs[j] = err
        if err != nil {
            log.LogWarnf("sendToMetaPartition: getConn failed and continue to retry, req(%v) mp(%v) addr(%v) err(%v)", req, mp, addr, err)
            continue
        }
        resp, err = mc.send(req)
@@ -119,7 +120,7 @@ out:
    if err != nil || resp == nil {
        return nil, errors.New(fmt.Sprintf("sendToMetaPartition failed: req(%v) mp(%v) errs(%v) resp(%v)", req, mp, errs, resp))
    }
    log.LogDebugf("sendToMetaPartition successful: req(%v) mc(%v) resp(%v)", req, mc, resp)
    log.LogDebugf("sendToMetaPartition: succeed! req(%v) mc(%v) resp(%v)", req, mc, resp)
    return resp, nil
}

7 changes: 7 additions & 0 deletions storage/extent.go
@@ -220,6 +220,13 @@ func (e *Extent) Write(data []byte, offset, size int64, crc uint32, writeType in
    if err = e.checkOffsetAndSize(offset, size); err != nil {
        return
    }

    // Check if extent file size matches the write offset just in case
    // multiple clients are writing concurrently.
    if IsAppendWrite(writeType) && e.dataSize != offset {
        err = NewParameterMismatchErr(fmt.Sprintf("extent current size = %v write offset=%v write size=%v", e.dataSize, offset, size))
        return
    }
    if _, err = e.file.WriteAt(data[:size], int64(offset)); err != nil {
        return
    }
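The server-side guard above is what makes extent reuse safe: an append-type write must start exactly at the extent's current size, so if two clients race to append to the same extent, the late one gets a parameter-mismatch error instead of silently overlapping data. Below is a self-contained sketch of that check; checkAppendOffset and errMismatch are illustrative stand-ins for the store's Write path and NewParameterMismatchErr.

```go
package main

import (
    "errors"
    "fmt"
)

// errMismatch stands in for the parameter-mismatch error type used by the
// extent store.
var errMismatch = errors.New("parameter mismatch")

// checkAppendOffset mirrors the new guard in Extent.Write: an append-type
// write is only valid if it starts exactly at the extent's current size.
func checkAppendOffset(currentSize, offset, size int64) error {
    if currentSize != offset {
        return fmt.Errorf("%w: extent current size=%v write offset=%v write size=%v",
            errMismatch, currentSize, offset, size)
    }
    return nil
}

func main() {
    fmt.Println(checkAppendOffset(8192, 8192, 4096)) // <nil>: well-formed append
    fmt.Println(checkAppendOffset(8192, 4096, 4096)) // rejected: another writer got there first
}
```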
