Skip to content

Commit

Permalink
Always enable cleaner for clocktrail reader
Browse files Browse the repository at this point in the history
  • Loading branch information
xingfeng2510 committed Apr 10, 2018
1 parent 6f507c0 commit 26a3807
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
5 changes: 3 additions & 2 deletions cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func NewCleaner(conf conf.MapConf, meta *reader.Meta, cleanChan chan<- CleanSign
if !enable {
return
}
if meta.GetMode() != reader.ModeDir && meta.GetMode() != reader.ModeFile {
log.Errorf("cleaner only support reader mode in dir or file, now mode is %v, cleaner disabled", meta.GetMode())
mode := meta.GetMode()
if mode != reader.ModeDir && mode != reader.ModeFile && mode != reader.ModeClockTrail {
log.Errorf("cleaner only support reader mode dir|file|clocktrail, now mode is %v, cleaner disabled", meta.GetMode())
return
}
interval, _ := conf.GetIntOr(clean_interval, 0) //单位,秒
Expand Down
16 changes: 16 additions & 0 deletions mgr/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,22 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, p
rd reader.Reader
cl *cleaner.Cleaner
)
mode := rc.ReaderConfig["mode"]
if mode == reader.ModeClockTrail {
syncDir := rc.ReaderConfig[reader.KeySyncDirectory]
if syncDir == "" {
syncDir = reader.DefaultSyncDirectory
}
rc.ReaderConfig[reader.KeyLogPath] = syncDir

if len(rc.CleanerConfig) == 0 {
rc.CleanerConfig = conf.MapConf{
"delete_enable": "true",
"delete_interval": "60",
"reserve_file_number": "50",
}
}
}
meta, err := reader.NewMetaWithConf(rc.ReaderConfig)
if err != nil {
return nil, err
Expand Down
9 changes: 7 additions & 2 deletions reader/clocktrail.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ const (
KeySyncConcurrent = "sync_concurrent"
)

const (
DefaultSyncDirectory = "./data"
DefaultSyncMetaStore = "./.metastore"
)

var (
ignoredSuffixes = []string{".json.gz"}
)
Expand Down Expand Up @@ -124,14 +129,14 @@ func buildSyncOptions(conf conf.MapConf) (*syncOptions, error) {
return nil, emptyConfigError(KeyS3Bucket)
}
opts.prefix, _ = conf.GetStringOr(KeyS3Prefix, "")
opts.directory, _ = conf.GetStringOr(KeySyncDirectory, "./data")
opts.directory, _ = conf.GetStringOr(KeySyncDirectory, DefaultSyncDirectory)
if opts.directory == "" {
return nil, emptyConfigError(KeySyncDirectory)
}
if err = os.MkdirAll(opts.directory, 0755); err != nil {
return nil, fmt.Errorf("cannot create target directory %q: %v", opts.directory, err)
}
opts.metastore, _ = conf.GetStringOr(KeySyncMetastore, "./.metastore")
opts.metastore, _ = conf.GetStringOr(KeySyncMetastore, DefaultSyncMetaStore)
if opts.metastore == "" {
return nil, emptyConfigError(KeySyncMetastore)
}
Expand Down

0 comments on commit 26a3807

Please sign in to comment.