Skip to content

Commit

Permalink
runsim pushes logs to S3 bucket (cosmos#4677)
Browse files Browse the repository at this point in the history
  • Loading branch information
mircea-c authored and Alessio Treglia committed Jul 4, 2019
1 parent 1bfac99 commit 74915f1
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 44 deletions.
32 changes: 24 additions & 8 deletions contrib/runsim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var (
genesis string
exitOnFail bool
githubConfig string
gitRevision string
logObjKey string
slackConfig string

// integration with Slack and Github
Expand Down Expand Up @@ -82,7 +82,7 @@ func init() {
flag.StringVar(&genesis, "g", "", "Genesis file")
flag.StringVar(&seedOverrideList, "seeds", "", "run the supplied comma-separated list of seeds instead of defaults")
flag.BoolVar(&exitOnFail, "e", false, "Exit on fail during multi-sim, print error")
flag.StringVar(&gitRevision, "rev", "", "git revision")
flag.StringVar(&logObjKey, "log", "", "S3 object key for log files")
flag.StringVar(&githubConfig, "github", "", "Report results to Github's PR")
flag.StringVar(&slackConfig, "slack", "", "Report results to slack channel")

Expand Down Expand Up @@ -224,7 +224,16 @@ func makeCmd(cmdStr string) *exec.Cmd {
}

func makeFilename(seed int) string {
return fmt.Sprintf("app-simulation-seed-%d-date-%s", seed, time.Now().Format("01-02-2006_15:04:05.000000000"))
return fmt.Sprintf("app-simulation-seed-%d-date-%s", seed, time.Now().Format("01-02-2006_150405"))
}

func makeFailSlackMsg(seed int, stdoutKey, stderrKey, bucket string, logsPushed bool) string {
if logsPushed {
return fmt.Sprintf("*Seed %s: FAILED*. *<https://%s.s3.amazonaws.com/%s|stdout>* *<https://%s.s3.amazonaws.com/%s|stderr>*\nTo reproduce run: ```\n%s\n```",
strconv.Itoa(seed), bucket, stdoutKey, bucket, stderrKey, buildCommand(testname, blocks, period, genesis, seed))
}
return fmt.Sprintf("*Seed %s: FAILED*. \nTo reproduce run: ```\n%s\n```\n*Could not upload logs:* ```\n%s\n```",
strconv.Itoa(seed), buildCommand(testname, blocks, period, genesis, seed), bucket)
}

func worker(id int, seeds <-chan int) {
Expand All @@ -236,18 +245,25 @@ func worker(id int, seeds <-chan int) {
log.Printf("[W%d] Seed %d: FAILED", id, seed)
log.Printf("To reproduce run: %s", buildCommand(testname, blocks, period, genesis, seed))
if slackConfigSupplied() {
slackMessage(slackToken, slackChannel, nil, "Seed "+strconv.Itoa(seed)+" failed. To reproduce, run: "+buildCommand(testname, blocks, period, genesis, seed))
objKeys, bucket, err := pushLogs(stdOut, stdErr, logObjKey)
if err != nil {
slackMessage(slackToken, slackChannel, nil, makeFailSlackMsg(seed, "", "", err.Error(), false))
}
slackMessage(slackToken, slackChannel, nil, makeFailSlackMsg(seed, objKeys[0], objKeys[1], *bucket, true))
}
if exitOnFail {
log.Printf("\bERROR OUTPUT \n\n%s", err)
panic("halting simulations")
}
} else {
log.Printf("[W%d] Seed %d: OK", id, seed)
}
pushLogs(stdOut, stdErr, gitRevision)
log.Printf("[W%d] Seed %d: OK", id, seed)
if slackConfigSupplied() {
_, _, err = pushLogs(stdOut, stdErr, logObjKey)
if err != nil {
log.Printf("%v", err)
}
}
}

log.Printf("[W%d] no seeds left, shutting down", id)
}

Expand Down
75 changes: 39 additions & 36 deletions contrib/runsim/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os"
"path/filepath"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand All @@ -20,61 +19,65 @@ const (
awsRegion = "us-east-1"
)

var (
simTimeStamp = time.Now().Format("01-02-2006_15:05:05")
)

func awsErrHandler(err error) {
func awsErrHandler(err error) error {
if awsErr, ok := err.(awserr.Error); ok {
switch awsErr.Code() {
default:
log.Println(awsErr.Error())
return awsErr
}
} else {
log.Println(err.Error())
}
return err
}

func makeObjKey(folderName string, fileName string) string {
return fmt.Sprintf("%s/%s/%s", folderName, simTimeStamp, fileName)
func makeObjKey(objKeyPrefix string, fileName string) string {
return fmt.Sprintf("%s/%s", objKeyPrefix, fileName)
}

func putObj(fileHandle *os.File, svc *s3.S3, folderName string, bucketName string) {
_, _ = fileHandle.Seek(0, 0)

stdOutObjInput := &s3.PutObjectInput{
Body: aws.ReadSeekCloser(fileHandle),
Bucket: aws.String(bucketName),
Key: aws.String(makeObjKey(folderName, filepath.Base(fileHandle.Name()))),
}
if output, err := svc.PutObject(stdOutObjInput); err != nil {
awsErrHandler(err)
} else {
log.Printf("Log file pushed: %s", output.String())
// putObjects attempts to upload to an S3 bucket the content of each file from fileHandles.
// File descriptors have their read offset set to 0 to ensure all the content is uploaded.
// Each file will become an S3 bucket object that can be accessed via its object key.
//
// Function returns the list of object keys and an error, if any.
func putObjects(svc *s3.S3, objKeyPrefix string, bucketName string, fileHandles ...*os.File) ([]string, error) {
objKeys := make([]string, len(fileHandles))
for index, fileHandle := range fileHandles {
_, _ = fileHandle.Seek(0, 0)
objKey := makeObjKey(objKeyPrefix, filepath.Base(fileHandle.Name()))
stdOutObjInput := &s3.PutObjectInput{
Body: aws.ReadSeekCloser(fileHandle),
Bucket: aws.String(bucketName),
Key: aws.String(objKey),
}
_, err := svc.PutObject(stdOutObjInput)
if err != nil {
return nil, awsErrHandler(err)
}
objKeys[index] = objKey
}
return objKeys, nil
}

func pushLogs(stdOut *os.File, stdErr *os.File, folderName string) {
func pushLogs(stdOut *os.File, stdErr *os.File, folderName string) ([]string, *string, error) {
var logBucket *string

sessionS3 := s3.New(session.Must(session.NewSession(&aws.Config{
Region: aws.String(awsRegion),
})))
if listBucketsOutput, err := sessionS3.ListBuckets(&s3.ListBucketsInput{}); err != nil {
awsErrHandler(err)
} else {
for _, bucket := range listBucketsOutput.Buckets {
if strings.Contains(*bucket.Name, logBucketPrefix) {
logBucket = bucket.Name
putObj(stdOut, sessionS3, folderName, *logBucket)
putObj(stdErr, sessionS3, folderName, *logBucket)
break
listBucketsOutput, err := sessionS3.ListBuckets(&s3.ListBucketsInput{})
if err != nil {
return nil, nil, awsErrHandler(err)
}
for _, bucket := range listBucketsOutput.Buckets {
if strings.Contains(*bucket.Name, logBucketPrefix) {
logBucket = bucket.Name
objKeys, err := putObjects(sessionS3, folderName, *logBucket, stdOut, stdErr)
if err != nil {
return nil, nil, err
}
return objKeys, bucket.Name, nil
}
}
if logBucket == nil {
log.Println("Log bucket not found")
}
return nil, nil, nil
}

func slackMessage(token string, channel string, threadTS *string, message string) {
Expand Down

0 comments on commit 74915f1

Please sign in to comment.