Skip to content

Commit

Permalink
Merge pull request #29 from holdno/feat/org
Browse files Browse the repository at this point in the history
Feat/org
  • Loading branch information
holdno authored Sep 17, 2023
2 parents 842c5db + ca3f974 commit 55359a6
Show file tree
Hide file tree
Showing 186 changed files with 4,904 additions and 1,493 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ gopherCronClient
.idea
_build
.DS_Store
/script/image.sh
/script/image.sh
debug
23 changes: 18 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
FROM alpine
MAINTAINER label <[email protected]>
FROM golang:1.20.8-alpine3.18 AS builder

ENV GOPROXY=https://goproxy.cn,direct

WORKDIR /gophercron
COPY ./_build/config /gophercron/config
COPY ./_build/view /gophercron/view
COPY ./_build/gophercron /gophercron/gophercron
COPY . .
RUN mkdir -p _build
RUN go build -a -ldflags '-extldflags "-static"' -o _build/gophercron ./cmd/


FROM alpine:3.18
LABEL MAINTAINER <[email protected]>

RUN apk update && apk add tzdata diffutils curl && cp -r -f /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

WORKDIR /gophercron
COPY --from=builder /gophercron/_build/config /gophercron/config
COPY --from=builder /gophercron/_build/view /gophercron/view
COPY --from=builder /gophercron/_build/gophercron /gophercron/gophercron

CMD ["./gophercron", "service", "-c", "./config/service-config-default.toml"]
27 changes: 13 additions & 14 deletions agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"context"
"errors"
"fmt"
"strings"

Expand All @@ -13,10 +14,10 @@ import (
"github.com/holdno/gopherCron/pkg/infra/register"
"github.com/holdno/gopherCron/pkg/warning"
"github.com/holdno/gopherCron/utils"
"go.uber.org/zap"

wregister "github.com/spacegrower/watermelon/infra/register"
"github.com/spacegrower/watermelon/infra/wlog"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand All @@ -32,7 +33,6 @@ type client struct {

isClose bool
closeChan chan struct{}
ClientTaskReporter
// etcd protocol.EtcdManager
// protocol.ClientEtcdManager
warning.Warner
Expand All @@ -41,6 +41,7 @@ type client struct {
author *Author

cronpb.UnimplementedAgentServer
metrics *Metrics
}

type Client interface {
Expand Down Expand Up @@ -87,34 +88,32 @@ func (agent *client) loadConfigAndSetupAgentFunc() func() error {
// why 1024. view https://github.com/holdno/snowFlakeByGo
utils.InitIDWorker(clusterID % 1024)
agent.logger = wlog.With()
agent.scheduler = initScheduler()
agent.scheduler = initScheduler(agent)
agent.daemon = daemon.NewProjectDaemon(nil, agent.logger)

if cfg.Address != "" {
agent.localip = strings.Split(cfg.Address, ":")[0]
if cfg.RegisterAddress != "" {
agent.localip = strings.Split(cfg.RegisterAddress, ":")[0]
}
if agent.localip == "" {
var err error
if agent.localip, err = utils.GetLocalIP(); err != nil {
agent.logger.Panic("failed to get local ip", zap.Error(err))
}
}

agent.metrics = NewMonitor(agent.localip)
} else if agent.configPath == "" {
return fmt.Errorf("invalid config path")
}

if cfg.ReportAddr != "" {
agent.logger.Info(fmt.Sprintf("init http task log reporter, address: %s", cfg.ReportAddr))
reporter := warning.NewHttpReporter(cfg.ReportAddr)
agent.ClientTaskReporter = reporter
agent.Warner = reporter
agent.Warner = warning.NewHttpReporter(cfg.ReportAddr, func() (string, error) {
if agent.author != nil {
return agent.author.token, nil
}
return "", errors.New("author is not init")
})
} else {
agent.logger.Info("init default task log reporter, it must be used mysql config")
agent.ClientTaskReporter = NewDefaultTaskReporter(agent.logger, cfg.Mysql)
}

if agent.Warner == nil {
agent.Warner = warning.NewDefaultWarner(agent.logger)
}

Expand Down
15 changes: 15 additions & 0 deletions agent/executer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,30 @@ package agent
import (
"bufio"
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"strings"
"syscall"
"testing"
"time"

"github.com/holdno/gopherCron/common"
)

func TestTaskUnmarshal(t *testing.T) {
raw := `{"task_id": "12313123"}`

var data common.TaskWithOperator

if err := json.Unmarshal([]byte(raw), &data); err != nil {
t.Fatal(err)
}

t.Log(data.TaskInfo)
}

func fatal(format string, args ...interface{}) {
fmt.Println(fmt.Sprintf(format, args...))
os.Exit(1)
Expand Down
31 changes: 31 additions & 0 deletions agent/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package agent

import (
"github.com/holdno/gopherCron/pkg/metrics"

"github.com/prometheus/client_golang/prometheus"
)

type Metrics struct {
provider *metrics.Metrics
jobs *prometheus.GaugeVec
systemError *prometheus.CounterVec
}

func NewMonitor(instance string) *Metrics {
m := &Metrics{
provider: metrics.NewMetrics("agent", instance),
}

m.jobs = m.provider.NewGaugeVec("job", nil)
m.systemError = m.provider.NewCounterVec("system_error", []string{"reason"})
return m
}

func (s *Metrics) SetJobCount(count int) {
s.jobs.WithLabelValues().Set(float64(count))
}

func (s *Metrics) SystemErrInc(reason string) {
s.systemError.WithLabelValues(reason).Inc()
}
84 changes: 79 additions & 5 deletions agent/micro.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/holdno/gopherCron/pkg/infra/register"
"github.com/holdno/gopherCron/protocol"
"github.com/holdno/gopherCron/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
Expand All @@ -30,8 +32,18 @@ import (
"google.golang.org/grpc/status"
)

func prometheusHandler(r *prometheus.Registry) gin.HandlerFunc {
h := promhttp.InstrumentMetricHandler(
r, promhttp.HandlerFor(r, promhttp.HandlerOpts{}),
)

return func(c *gin.Context) {
h.ServeHTTP(c.Writer, c.Request)
}
}

func (a *client) SetupMicroService() *winfra.Srv[infra.NodeMetaRemote] {
register := a.MustSetupRemoteRegister()
register := a.MustSetupRemoteRegisterV2()
cfg := a.Cfg()
if cfg.Address == "" {
cfg.Address = a.GetIP()
Expand All @@ -42,6 +54,12 @@ func (a *client) SetupMicroService() *winfra.Srv[infra.NodeMetaRemote] {
wlog.Info("debug mode will open pprof tools")
pprof.Register(httpEngine)
}
httpEngine.GET("/heatlhy", func(ctx *gin.Context) {
ctx.String(http.StatusOK, "healthy")
})
if a.metrics != nil {
httpEngine.GET("/metrics", prometheusHandler(a.metrics.provider.Registry()))
}

newsrv := infra.NewAgentServer()
var pids []int64
Expand All @@ -57,7 +75,7 @@ func (a *client) SetupMicroService() *winfra.Srv[infra.NodeMetaRemote] {
newsrv.WithHttpServer(&http.Server{
Handler: httpEngine,
}),
newsrv.WithAddress([]infra.Address{{ListenAddress: cfg.Address}}),
newsrv.WithAddress([]infra.Address{{ListenAddress: cfg.Address, RegisterAddress: cfg.RegisterAddress}}),
newsrv.WithTags(map[string]string{
"agent-version": protocol.GetVersion(),
}),
Expand All @@ -71,6 +89,58 @@ func (a *client) SetupMicroService() *winfra.Srv[infra.NodeMetaRemote] {
return srv
}

func (a *client) MustSetupRemoteRegisterV2() wregister.ServiceRegister[infra.NodeMetaRemote] {

genMetadata := func(ctx context.Context, cc *grpc.ClientConn) context.Context {
return metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{
common.GOPHERCRON_AGENT_IP_MD_KEY: a.localip,
common.GOPHERCRON_AGENT_VERSION_KEY: protocol.GetVersion(),
}))
}

r, err := register.NewRemoteRegisterV2(a.localip, func() (register.CenterClient, error) {
if a.centerSrv.Cc != nil {
a.centerSrv.Cc.Close()
}
wlog.Debug("start build new center client")
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.cfg.Timeout)*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, a.cfg.Micro.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Second * 15,
Timeout: time.Second * 5,
}),
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = genMetadata(ctx, cc)
if method != cronpb.Center_Auth_FullMethodName {
ctx = metadata.AppendToOutgoingContext(ctx, common.GOPHERCRON_AGENT_AUTH_KEY, a.author.Auth(ctx, cc))
}
return invoker(ctx, method, req, reply, cc, opts...)
}),
grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx = genMetadata(ctx, cc)
if method != cronpb.Center_Auth_FullMethodName {
ctx = metadata.AppendToOutgoingContext(ctx, common.GOPHERCRON_AGENT_AUTH_KEY, a.author.Auth(ctx, cc))
}
return streamer(ctx, desc, cc, method, opts...)
}))
if err != nil {
wlog.Error("failed to dial center service", zap.String("endpoint", a.cfg.Micro.Endpoint), zap.Error(err))
return register.CenterClient{}, err
}
a.centerSrv = register.CenterClient{
CenterClient: cronpb.NewCenterClient(cc),
Cc: cc,
}
return a.centerSrv, nil
}, a.handlerEventFromCenterV2)
if err != nil {
panic(err)
}
return r
}

// MustSetupRemoteRegister v2.2.1 版本之后不再维护,使用 MustSetupRemoteRegisterV2
func (a *client) MustSetupRemoteRegister() wregister.ServiceRegister[infra.NodeMetaRemote] {

genMetadata := func(ctx context.Context, cc *grpc.ClientConn) context.Context {
Expand Down Expand Up @@ -138,8 +208,8 @@ func (a *client) CheckRunning(ctx context.Context, req *cronpb.CheckRunningReque
}

func (a *client) Schedule(ctx context.Context, req *cronpb.ScheduleRequest) (*cronpb.Result, error) {
unmarshalTask := func(value []byte) (*common.TaskInfo, error) {
var task common.TaskInfo
unmarshalTask := func(value []byte) (*common.TaskWithOperator, error) {
var task common.TaskWithOperator
if err := json.Unmarshal(value, &task); err != nil {
return nil, status.Error(codes.InvalidArgument, "failed to unmarshal task")
}
Expand Down Expand Up @@ -178,7 +248,7 @@ func (a *client) Schedule(ctx context.Context, req *cronpb.ScheduleRequest) (*cr
if _, taskExecuting := a.scheduler.CheckTaskExecuting(task.SchedulerKey()); taskExecuting {
return nil, status.Error(codes.AlreadyExists, "the task already executing, try again later")
}
plan, err := common.BuildWorkflowTaskSchedulerPlan(task)
plan, err := common.BuildWorkflowTaskSchedulerPlan(task.TaskInfo)
if err != nil {
return nil, status.Error(codes.Internal, "failed to build workflow task schedule plan")
}
Expand Down Expand Up @@ -238,6 +308,10 @@ func NewAuthor(projectAuths []config.ProjectAuth) *Author {
}
}

func (a *Author) GetToken() string {
return a.token
}

func (a *Author) Auth(ctx context.Context, cc *grpc.ClientConn) string {
if !a.expTime.IsZero() && a.expTime.Before(time.Now().Add(time.Second*10)) {
return a.token
Expand Down
Loading

0 comments on commit 55359a6

Please sign in to comment.