Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
  • Loading branch information
Steve Yeom committed May 11, 2020
1 parent 8e8ce6f commit 3577bdd
Show file tree
Hide file tree
Showing 11 changed files with 770 additions and 0 deletions.
48 changes: 48 additions & 0 deletions examples/drive-logclient/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package main

import (
"fmt"
"log"
"os"
"time"

logclient "github.com/steveyeom/go-btfs-logclient/logclient"
)

func main() {
// init configuration
conf := &logclient.Configuration{
Labels: `{job="btnode-log"}`, // TODO: add cid.String() for the current btfs node as "instance"
LogAPIEnabled: false,
URL: "http://localhost:3100/loki/api/v1/push",
Destination: "loki",
BatchWaitDuration: 10 * time.Second,
BatchCapacity: 5,
NetworkSendTimeout: 15 * time.Second,
NetworkSendRetries: logclient.DEFAULT_NUM_OF_RETRIES,
}

// Open operators
logc, err := logclient.NewLogClient(conf)
if err != nil {
log.Printf("error: failed to create LogClient: %s\n", err)
os.Exit(1)
}
log.Println("demo: opened LogClient.")

// Execute in a loop
var lineEntries []logclient.Entry
inputChan := logc.InputChan
for i := 0; i < 100; i++ {
text := fmt.Sprintf("Error: test error message [%d], possibly monitored from loki and grafana.", i+1)
log.Printf("Sending a log event entry [%s]", text)
lineEntries = append(lineEntries, logclient.LineEntry{Text: text})
inputChan <- lineEntries

time.Sleep(time.Second * 1)
lineEntries = []logclient.Entry{}
}

logc.Close()
log.Println("demo: closed LogAPI.")
}
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module github.com/steveyeom/go-btfs-client

require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.1
github.com/jpillora/backoff v1.0.0
)

go 1.13

replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/steveyeom/go-btfs-logclient v0.0.0-20200511203122-8e8ce6f68590 h1:p+npyH0vy70RkhWyKwfC0pHG99sLtAd4Cbe0SapL2Xk=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
33 changes: 33 additions & 0 deletions logclient/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package logclient

import (
"time"
)

const (
DEFAULT_NUM_OF_RETRIES = 3
)

type Configuration struct {
// label set for the current client.
Labels string // TODO: <-model.LabelSet

// The current plan operator tree related parameters
LogAPIEnabled bool

// URL is the log server http push endpoint
URL string

// Destination is the key value of the centralized server into which
// the current configuration based client is pushing its data.
// For example, "loki" for a log client to push log events into loki server
Destination string

// Transport batch related parameters
BatchWaitDuration time.Duration
BatchCapacity int

// NetworkOut operator parameters
NetworkSendTimeout time.Duration
NetworkSendRetries int
}
59 changes: 59 additions & 0 deletions logclient/entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package logclient

import (
"github.com/steveyeom/go-btfs-logclient/logproto"
"fmt"
)

type Entry interface {
Type() int
Value() interface{}
SetValue(interface{}) error
}

const (
LINE_ENTRY = iota + 1
PROTO_ENTRY
)

type LineEntry struct {
Text string
}

func (line LineEntry) Type() int {
return LINE_ENTRY
}

func (line LineEntry) Value() interface{} {
return line.Text
}

func (line LineEntry) SetValue(v interface{}) error {
ent, ok := v.(LineEntry)
if !ok {
return fmt.Errorf("expected LineEntry type, but got %T", v)
}
line.Text = ent.Text
return nil
}

type ProtoEntry struct {
Pentry *logproto.Entry
}

func (pe ProtoEntry) Type() int {
return PROTO_ENTRY
}

func (pe ProtoEntry) Value() interface{} {
return pe.Pentry
}

func (pe ProtoEntry) SetValue(v interface{}) error {
ent, ok := v.(*logproto.Entry)
if !ok {
return fmt.Errorf("expected *logproto.Entry type, but got %T", v)
}
pe.Pentry = ent
return nil
}
66 changes: 66 additions & 0 deletions logclient/logclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package logclient

const (
DebugLevel = iota + 1
InfoLevel
WarnLevel
ErrorLevel
DevPanicLevel
PanicLevel
)

const (
LogClientAPIEnabled = true
MininumCollectionLogLevel = 4
)

type LogClient struct {
conf *Configuration
logReader *LogReader
networkOut *NetworkOut
// LogClient input channel to which an application sends input entries.
InputChan chan []Entry
// Log events pass through LogClient when their log level is > minCollectLogLevel.
minCollectLogLevel int
}

func NewLogClient(conf *Configuration) (*LogClient, error) {
// initialize operators top down
var inputChan chan []Entry
ntkOut, err := NewNetworkOut(conf)
if err != nil {
return nil, err
}
inputChan = ntkOut.inputChan

var logR *LogReader

if !conf.LogAPIEnabled {
logR, err = NewLogReader(conf, ntkOut.inputChan)
if err != nil {
return nil, err
}
inputChan = logR.inputChan
}

return &LogClient{
minCollectLogLevel: MininumCollectionLogLevel,
conf: conf,
logReader: logR,
networkOut: ntkOut,
InputChan: inputChan,
}, nil
}

func (lc *LogClient) Close() {
lc.logReader.Close()
lc.networkOut.Close()
}

func (lc *LogClient) LogReader() *LogReader {
return lc.logReader
}

func (lc *LogClient) NetworkOut() *NetworkOut {
return lc.networkOut
}
1 change: 1 addition & 0 deletions logclient/logclient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package logclient
106 changes: 106 additions & 0 deletions logclient/logreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package logclient

import (
"github.com/steveyeom/go-btfs-logclient/logproto"
"fmt"
"github.com/golang/protobuf/ptypes"
"log"
"sync"
"time"
)

type Line struct {
Text string
}

type LogReader struct {
conf *Configuration
inputChan chan []Entry
outputChan chan []Entry
stopChan chan struct{}
waitGroup sync.WaitGroup
}

func NewLogReader(conf *Configuration, outChan chan []Entry) (*LogReader, error) {
logReader := &LogReader{
conf: conf,
inputChan: make(chan []Entry),
outputChan: outChan,
stopChan: make(chan struct{}),
}
logReader.waitGroup.Add(1)
go logReader.run()
return logReader, nil
}

func (logReader *LogReader) run() {
// Define variables
var lineEntries []Entry
conf := logReader.conf
retry := 1
currBatchSize := 0

// init
defer func() {
logReader.waitGroup.Done()
}()

// main loop
for true {
select {
case inEntries := <-logReader.inputChan:
for _, inEnt := range inEntries {
le, ok := inEnt.(LineEntry)
if !ok {
log.Printf("Error: LogReader: %s", fmt.Errorf("expected *logproto.Entry type, but got %T", inEnt.Value()))
return
}
lineEntries = append(lineEntries, le)
currBatchSize++
}
if currBatchSize >= conf.BatchCapacity {
err := logReader.sendBatch(lineEntries)
if err != nil {
log.Printf("Error: LogReader: tried %d time: %s\n", retry, err)
// TODO: check retry times and save..
} else {
log.Printf("Info: LogReader: sendBatch OK, number of lines: %d\n", len(lineEntries))
}
lineEntries = []Entry{}
currBatchSize = 0
}
case <-logReader.stopChan:
return
}
retry = 1
}
}

// Send a batch after converting lines to logproto.Entry's
func (logReader *LogReader) sendBatch(lines []Entry) error {
ts, err := ptypes.TimestampProto(time.Now())
if err != nil {
return err
}
var protoEntries []Entry
for _, line := range lines {
le, _ := line.(LineEntry)
protoEntries = append(protoEntries, ProtoEntry{&logproto.Entry{
Timestamp: ts,
Line: le.Text,
}})
}
logReader.outputChan <- protoEntries
return nil
}

func (logReader *LogReader) Close() {
close(logReader.stopChan)
}

func (logReader *LogReader) InputChan() (chan []Entry, error) {
if logReader == nil {
return nil, fmt.Errorf("logReader is nil")
}
return logReader.inputChan, nil
}
Loading

0 comments on commit 3577bdd

Please sign in to comment.