Skip to content

Commit

Permalink
Move secret status scan to scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
noboruma committed Mar 22, 2023
1 parent d588934 commit f28411b
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 115 deletions.
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
[submodule "deepfence_agent/plugins/agent-plugins-grpc"]
path = deepfence_agent/plugins/agent-plugins-grpc
url = https://github.com/deepfence/agent-plugins-grpc
branch = package-scanner
branch = v2
[submodule "deepfence_agent/plugins/package-scanner"]
path = deepfence_agent/plugins/package-scanner
url = https://github.com/deepfence/package-scanner
branch = merge-vulnerability-mapper
[submodule "deepfence_agent/plugins/SecretScanner"]
path = deepfence_agent/plugins/SecretScanner
url = https://github.com/deepfence/SecretScanner
branch = kafka-rest
branch = v2
[submodule "deepfence_agent/plugins/compliance"]
path = deepfence_agent/plugins/compliance
url = https://github.com/deepfence/compliance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/bytedance/sonic"
openapi "github.com/deepfence/golang_deepfence_sdk/client"
ctl "github.com/deepfence/golang_deepfence_sdk/utils/controls"
"github.com/klauspost/compress/gzip"
"github.com/sirupsen/logrus"
"github.com/weaveworks/scope/common/xfer"
Expand Down Expand Up @@ -96,15 +95,43 @@ const (
MAX_AGENT_WORKLOAD = 2
)

func GetScannersWorkloads() int32 {
res := int32(0)
res += host.GetSecretScannerJobCount()
//TODO: Add more scanners workfload
return res
}

var upgrade atomic.Bool

func SetUpgrade() {
upgrade.Store(true)
}

func getUpgradeWorkload() int32 {
if upgrade.Load() {
return MAX_AGENT_WORKLOAD
}
return 0
}

func getMaxAllocatable() int32 {
workload := MAX_AGENT_WORKLOAD - GetScannersWorkloads() - getUpgradeWorkload()
if workload <= 0 {
workload = 0
}
logrus.Infof("Workload: %v\n", workload)
return workload
}

func (ct *OpenapiClient) StartControlsWatching(nodeId string, isClusterAgent bool) error {
workload_allocator := ctl.NewWorkloadAllocator(MAX_AGENT_WORKLOAD)
if isClusterAgent {

} else {
req := ct.client.ControlsApi.GetAgentInitControls(context.Background())
req = req.ModelInitAgentReq(
*openapi.NewModelInitAgentReq(
workload_allocator.MaxAllocable(),
getMaxAllocatable(),
nodeId,
host.AgentVersionNo,
),
Expand All @@ -117,31 +144,27 @@ func (ct *OpenapiClient) StartControlsWatching(nodeId string, isClusterAgent boo
return err
}

workload_allocator.Reserve(int32(len(ctl.Commands)))

for _, action := range ctl.Commands {
logrus.Infof("Init execute :%v", action.Id)
err := controls.ApplyControl(action)
if err != nil {
logrus.Errorf("Control %v failed: %v\n", action, err)
}
// TODO: call when work truly completes
workload_allocator.Free()
}
}

if isClusterAgent {
go func() {
req := ct.client.ControlsApi.GetKubernetesClusterControls(context.Background())
agentId := openapi.NewModelAgentId(workload_allocator.MaxAllocable(), nodeId)
agentId := openapi.NewModelAgentId(getMaxAllocatable(), nodeId)
req = req.ModelAgentId(*agentId)
for {
select {
case <-time.After(time.Second * 10):
case <-ct.stopControlListening:
break
}
agentId.SetAvailableWorkload(workload_allocator.MaxAllocable())
agentId.SetAvailableWorkload(getMaxAllocatable())
req = req.ModelAgentId(*agentId)
ctl, _, err := ct.client.ControlsApi.GetKubernetesClusterControlsExecute(req)
if err != nil {
Expand All @@ -151,31 +174,27 @@ func (ct *OpenapiClient) StartControlsWatching(nodeId string, isClusterAgent boo

ct.publishInterval.Store(ctl.Beatrate)

workload_allocator.Reserve(int32(len(ctl.Commands)))

for _, action := range ctl.Commands {
logrus.Infof("Execute :%v", action.Id)
err := controls.ApplyControl(action)
if err != nil {
logrus.Errorf("Control %v failed: %v\n", action, err)
}
// TODO: call when work truly completes
workload_allocator.Free()
}
}
}()
} else {
go func() {
req := ct.client.ControlsApi.GetAgentControls(context.Background())
agentId := openapi.NewModelAgentId(workload_allocator.MaxAllocable(), nodeId)
agentId := openapi.NewModelAgentId(getMaxAllocatable(), nodeId)
req = req.ModelAgentId(*agentId)
for {
select {
case <-time.After(time.Second * 10):
case <-ct.stopControlListening:
break
}
agentId.SetAvailableWorkload(workload_allocator.MaxAllocable())
agentId.SetAvailableWorkload(getMaxAllocatable())
req = req.ModelAgentId(*agentId)
ctl, _, err := ct.client.ControlsApi.GetAgentControlsExecute(req)
if err != nil {
Expand All @@ -185,16 +204,12 @@ func (ct *OpenapiClient) StartControlsWatching(nodeId string, isClusterAgent boo

ct.publishInterval.Store(ctl.Beatrate)

workload_allocator.Reserve(int32(len(ctl.Commands)))

for _, action := range ctl.Commands {
logrus.Infof("Execute :%v", action.Id)
err := controls.ApplyControl(action)
if err != nil {
logrus.Errorf("Control %v failed: %v\n", action, err)
}
// TODO: call when work truly completes
workload_allocator.Free()
}
}
}()
Expand Down
135 changes: 42 additions & 93 deletions deepfence_agent/tools/apache/scope/probe/host/secret_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ package host
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"time"

"github.com/Jeffail/tunny"
log "github.com/sirupsen/logrus"
Expand All @@ -21,11 +19,9 @@ import (
)

const (
ebpfSocketPath = "/tmp/secret-scanner.sock"
ssEbpfLogPath = "/var/log/fenced/secretScanner.log"
defaultScanConcurrency = 1
secretScanIndexName = "secret-scan"
secretScanLogsIndexName = "secret-scan-logs"
ebpfSocketPath = "/tmp/secret-scanner.sock"
ssEbpfLogPath = "/var/log/fenced/secretScanner.log"
defaultScanConcurrency = 1
)

var certPath = "/etc/filebeat/filebeat.crt"
Expand All @@ -36,6 +32,9 @@ var (
mgmtConsoleUrl string
deepfenceKey string
scanDir string

scanFilename = getDfInstallDir() + "/var/log/fenced/secret-scan/secret_scan.log"
scanStatusFilename = getDfInstallDir() + "/var/log/fenced/secret-scan-log/secret_scan_log.log"
)

type secretScanParameters struct {
Expand Down Expand Up @@ -72,19 +71,24 @@ func StartSecretsScan(req ctl.StartSecretScanRequest) error {
var greq pb.FindRequest
switch req.NodeType {
case ctl.Container:
greq = pb.FindRequest{Input: &pb.FindRequest_Container{
Container: &pb.Container{Id: req.BinArgs["node_id"]},
}}
greq = pb.FindRequest{
Input: &pb.FindRequest_Container{
Container: &pb.Container{Id: req.BinArgs["node_id"]},
},
ScanId: req.BinArgs["scan_id"],
}
case ctl.Image:
splits := strings.Split(req.BinArgs["node_id"], ";")
if len(splits) != 2 {
return errors.New("image id format is incorrect")
greq = pb.FindRequest{
Input: &pb.FindRequest_Image{
Image: &pb.DockerImage{Id: req.BinArgs["node_id"], Name: req.BinArgs["image_name"]},
},
ScanId: req.BinArgs["scan_id"],
}
greq = pb.FindRequest{Input: &pb.FindRequest_Image{
Image: &pb.DockerImage{Id: splits[0], Name: splits[1]},
}}
case ctl.Host:
greq = pb.FindRequest{Input: &pb.FindRequest_Path{Path: "/fenced/mnt/host"}}
greq = pb.FindRequest{
Input: &pb.FindRequest_Path{Path: "/fenced/mnt/host"},
ScanId: req.BinArgs["scan_id"],
}
}

ssClient, err := newSecretScannerClient()
Expand All @@ -111,22 +115,7 @@ func getAndPublishSecretScanResultsWrapper(scanParametersInterface interface{})
}

func getAndPublishSecretScanResults(client pb.SecretScannerClient, req *pb.FindRequest, controlArgs map[string]string, hostName string) {
var secretScanLogDoc = make(map[string]interface{})
secretScanLogDoc["scan_id"] = controlArgs["scan_id"]
secretScanLogDoc["scan_status"] = "IN_PROGRESS"
secretScanLogDoc["@timestamp"] = getCurrentTime()

byteJson, err := json.Marshal(secretScanLogDoc)
if err != nil {
fmt.Println("Error marshalling json: ", err)
return
}
// byteJson := formatToKafka(secretScanLogDoc)

err = writeScanDataToFile(string(byteJson), secretScanLogsIndexName)
if err != nil {
fmt.Println("Error in sending data to secretScanLogsIndex to mark in progress:" + err.Error())
}
res, err := client.FindSecretInfo(context.Background(), req)
if req.GetPath() != "" && err == nil && res != nil {
if scanDir == HostMountDir {
Expand All @@ -135,27 +124,17 @@ func getAndPublishSecretScanResults(client pb.SecretScannerClient, req *pb.FindR
}
}
}
currTime := getCurrentTime()

if err != nil {
secretScanLogDoc["scan_status"] = "ERROR"
secretScanLogDoc["scan_message"] = err.Error()
secretScanLogDoc["@timestamp"] = currTime
byteJson, err = json.Marshal(secretScanLogDoc)
if err != nil {
fmt.Println("Error marshalling json: ", err)
return
}
// byteJson = formatToKafka(secretScanLogDoc)
writeScanDataToFile(string(byteJson), secretScanLogsIndexName)
fmt.Println("FindSecretInfo error" + err.Error())
return
} else {
fmt.Println("Number of results received from SecretScanner for scan id:" + controlArgs["scan_id"] + " - " + strconv.Itoa(len(res.Secrets)))
}

fmt.Println("Number of results received from SecretScanner for scan id:" + controlArgs["scan_id"] + " - " + strconv.Itoa(len(res.Secrets)))

for _, secret := range res.Secrets {
var secretScanDoc = make(map[string]interface{})
secretScanDoc["masked"] = false
secretScanDoc["scan_id"] = controlArgs["scan_id"]
secretScanDoc["@timestamp"] = currTime
values := reflect.ValueOf(*secret)
typeOfS := values.Type()
for index := 0; index < values.NumField(); index++ {
Expand All @@ -168,49 +147,14 @@ func getAndPublishSecretScanResults(client pb.SecretScannerClient, req *pb.FindR
fmt.Println("Error marshalling json: ", err)
continue
}
// byteJson := formatToKafka(secretScanDoc)
err = writeScanDataToFile(string(byteJson), secretScanIndexName)
err = writeScanDataToFile(string(byteJson), scanFilename)
if err != nil {
fmt.Println("Error in sending data to secretScanIndex:" + err.Error())
}
}
if err == nil {
secretScanLogDoc["scan_status"] = "COMPLETE"
} else {
secretScanLogDoc["scan_status"] = "ERROR"
secretScanLogDoc["scan_message"] = err.Error()
}
secretScanLogDoc["@timestamp"] = currTime
byteJson, err = json.Marshal(secretScanLogDoc)
if err != nil {
fmt.Println("Error marshalling json: ", err)
return
}
// byteJson = formatToKafka(secretScanLogDoc)
err = writeScanDataToFile(string(byteJson), secretScanLogsIndexName)
if err != nil {
fmt.Println("Error in sending data to secretScanLogsIndex:" + err.Error())
}

}

func getTimestamp() int64 {
return time.Now().UTC().UnixNano() / 1000000
}

func getCurrentTime() string {
return time.Now().UTC().Format("2006-01-02T15:04:05.000") + "Z"
}

func writeScanDataToFile(secretScanMsg string, index string) error {
scanFilename := getDfInstallDir() + "/var/log/fenced/secret-scan/secret_scan.log"
scanStatusFilename := getDfInstallDir() + "/var/log/fenced/secret-scan-log/secret_scan_log.log"
files := map[string]string{
secretScanIndexName: scanFilename,
secretScanLogsIndexName: scanStatusFilename,
}

filename := files[index]
func writeScanDataToFile(secretScanMsg string, filename string) error {
err := os.MkdirAll(filepath.Dir(filename), 0755)
f, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
Expand All @@ -236,12 +180,17 @@ func newSecretScannerClient() (pb.SecretScannerClient, error) {
return pb.NewSecretScannerClient(conn), nil
}

// func formatToKafka(data map[string]interface{}) []byte {
// encoded, err := json.Marshal(&data)
// if err != nil {
// fmt.Println("Error in marshalling in progress secretScan data to json:" + err.Error())
// return nil
// }
// value := "{\"value\":" + string(encoded) + "}"
// return []byte("{\"records\":[" + value + "]}")
// }
func GetSecretScannerJobCount() int32 {
conn, err := grpc.Dial("unix://"+ebpfSocketPath, grpc.WithAuthority("dummy"),
grpc.WithInsecure())
if err != nil {
fmt.Printf("error in creating secret scanner client: %s\n", err.Error())
return 0
}
client := pb.NewScannersClient(conn)
jobReport, err := client.ReportJobsStatus(context.Background(), &pb.Empty{})
if err != nil {
return 0
}
return jobReport.RunningJobs
}
3 changes: 3 additions & 0 deletions deepfence_agent/tools/apache/scope/prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ func setClusterAgentControls(k8sClusterName string) {
}
err = controls.RegisterControl(ctl.StartAgentUpgrade,
func(req ctl.StartAgentUpgradeRequest) error {
log.Info("Start Cluster Agent Upgrade")
appclient.SetUpgrade()
return kubernetes.StartClusterAgentUpgrade(req)
})
if err != nil {
Expand Down Expand Up @@ -196,6 +198,7 @@ func setAgentControls() {
err = controls.RegisterControl(ctl.StartAgentUpgrade,
func(req ctl.StartAgentUpgradeRequest) error {
log.Info("Start Agent Upgrade")
appclient.SetUpgrade()
return host.StartAgentUpgrade(req)
})
if err != nil {
Expand Down

0 comments on commit f28411b

Please sign in to comment.