Skip to content

Commit

Permalink
[PDR-16627][fix(cloudtrail)]: 修复读取s3 gzip压缩文件内存溢出问题
Browse files Browse the repository at this point in the history
  • Loading branch information
PapaPiya committed Mar 4, 2022
1 parent 480816e commit cc4380f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 22 deletions.
42 changes: 23 additions & 19 deletions reader/cloudtrail/cloudtrail.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,9 +582,8 @@ DONE:
continue
}
basename := filepath.Base(s3file)
unzipPath := strings.TrimSuffix(basename, ".gz")
if !s.syncedFiles[basename] {
filePath := strings.Join([]string{s.target, unzipPath}, "/")
filePath := filepath.Join(s.target, basename)
if filepath.Dir(filePath) != "." {
err := os.MkdirAll(filepath.Dir(filePath), 0755)
if err != nil {
Expand All @@ -609,19 +608,24 @@ DONE:
pool <- struct{}{}
}(filePath, bucket, s3file)
} else {
log.Debugf("Runner[%v] %q already synced, skipped this time", s.meta.RunnerName, unzipPath)
log.Debugf("Runner[%v] %q already synced, skipped this time", s.meta.RunnerName, basename)
}
}
wg.Wait()
}

func writeFile(filename string, bucket *s3.Bucket, path string) error {
data, err := bucket.Get(path)
if err != nil {
return err
if strings.HasSuffix(filename, ".tar.gz") {
log.Warnf("cloudtrail not support archive file, will ignore %s", filename)
return nil
}
if strings.HasSuffix(filename, ".zip") {
rd, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
data, err := bucket.Get(path)
if err != nil {
return err
}
r := bytes.NewReader(data)
rd, err := zip.NewReader(r, int64(len(data)))
if err != nil {
log.Errorf("reader file %v as zip error %v", filename, err)
return ioutil.WriteFile(filename, data, os.FileMode(0644))
Expand All @@ -635,20 +639,20 @@ func writeFile(filename string, bucket *s3.Bucket, path string) error {
}
return writeErr
}
if utils.IsGzipped(data) {
gzipData, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
log.Errorf("reader file %v as gzip error %v, write to file as raw_text", filename, err)
return ioutil.WriteFile(filename, data, os.FileMode(0644))
}
gdata, err := ioutil.ReadAll(gzipData)
if err != nil {
log.Errorf("reader gzip reader error %v, write to file as raw_text", err)
return ioutil.WriteFile(filename, data, os.FileMode(0644))
r, err := bucket.GetReader(path)
if err != nil {
return err
}
defer r.Close()
if strings.HasSuffix(filename, ".gz") {
gr, err := gzip.NewReader(r)
if err == nil {
defer gr.Close()
return utils.WriteReaderToFile(gr, filename)
}
return ioutil.WriteFile(filename, gdata, os.FileMode(0644))
log.Errorf("reader file %v as gzip error %v, write to file as raw_text", filename, err)
}
return ioutil.WriteFile(filename, data, os.FileMode(0644))
return utils.WriteReaderToFile(r, filename)
}

func newPool(concurrent int) chan struct{} {
Expand Down
10 changes: 7 additions & 3 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,16 @@ func WriteZipToFile(zipf *zip.File, filename string) error {
return err
}
defer srcF.Close()
distF, err := os.OpenFile(filepath.Join(filepath.Dir(filename), zipf.Name), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.FileMode(0644))
return WriteReaderToFile(srcF, filename)
}

func WriteReaderToFile(src io.Reader, dstFile string) error {
dstF, err := os.OpenFile(dstFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.FileMode(0644))
if err != nil {
return err
}
defer distF.Close()
_, err = io.Copy(distF, srcF)
defer dstF.Close()
_, err = io.Copy(dstF, src)
if err != nil {
return err
}
Expand Down

0 comments on commit cc4380f

Please sign in to comment.