Skip to content

Commit

Permalink
export flows to unix socket file
Browse files Browse the repository at this point in the history
  • Loading branch information
javadmohebbi committed Sep 18, 2022
1 parent 361ec73 commit 2efe067
Showing 1 changed file with 104 additions and 1 deletion.
105 changes: 104 additions & 1 deletion collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/goNfCollector/influxdb"
"github.com/goNfCollector/location"
"github.com/gookit/color"
"github.com/ip2location/ip2location-go"
"github.com/sirupsen/logrus"
"github.com/tehmaze/netflow"
"github.com/tehmaze/netflow/ipfix"
Expand Down Expand Up @@ -461,7 +462,6 @@ func (nf *Collector) getExporters() []exporters.Exporter {

// export if needed
func (nf *Collector) export(metrics []common.Metric) {

nf.waitGroup.Add(1)

// check if there are valid exporters
Expand All @@ -478,6 +478,109 @@ func (nf *Collector) export(metrics []common.Metric) {
}
}

// export to unix socket client
nf.exportFSClient(metrics)

// go nf.ztdb.Store(metrics)
}

// export to unix socket client
func (nf *Collector) exportFSClient(metrics []common.Metric) {

var _metrics []common.Metric

for _, _metric := range metrics {

metr := _metric

ilSrc := nf.getLocation(metr.SrcIP)
metr.SrcIp2lCountryShort = ilSrc.Country_long
metr.SrcIp2lCountryShort = ilSrc.Country_short
metr.SrcIp2lState = ilSrc.Region
metr.SrcIp2lCity = ilSrc.City
metr.SrcIp2lLat = fmt.Sprintf("%f", ilSrc.Latitude)
metr.SrcIp2lLong = fmt.Sprintf("%f", ilSrc.Longitude)

ilDst := nf.getLocation(metr.DstIP)
metr.DstIp2lCountryShort = ilDst.Country_long
metr.DstIp2lCountryShort = ilDst.Country_short
metr.DstIp2lState = ilDst.Region
metr.DstIp2lCity = ilDst.City
metr.DstIp2lLat = fmt.Sprintf("%f", ilDst.Latitude)
metr.DstIp2lLong = fmt.Sprintf("%f", ilDst.Longitude)

_metrics = append(_metrics, metr)
}

// create requests
req := fwsock.ClientServerReqResp{
Command: fwsock.CMD_EXPORTED,
RequestID: fmt.Sprintf("reqId:%d", time.Now().Unix()),
Payload: _metrics,
}

bts, err := req.JSONToStringClientServerReqResp()
if err != nil {
nf.d.Verbose(fmt.Sprintf("[%d]-%s: (%v)",
configurations.ERROR_CAN_T_EXPORT_COL_SERVER_LINUX_SOCKET.Int(),
configurations.ERROR_CAN_T_EXPORT_COL_SERVER_LINUX_SOCKET, err),
logrus.ErrorLevel,
)
// os.Exit(configurations.ERROR_CAN_T_EXPORT_COL_SERVER_LINUX_SOCKET.Int())
}
_, err = nf.FwSockClient.Conn.Write([]byte(fmt.Sprintf("%s\n", bts)))
if err != nil {
nf.d.Verbose(fmt.Sprintf("[%d]-%s: (%v)",
configurations.ERROR_CAN_T_EXPORT_COL_SERVER_LINUX_SOCKET.Int(),
configurations.ERROR_CAN_T_EXPORT_COL_SERVER_LINUX_SOCKET, err),
logrus.ErrorLevel,
)
// os.Exit(configurations.ERROR_CAN_T_EXPORT_COL_SERVER_LINUX_SOCKET.Int())
}

}

func (nf *Collector) getLocation(ip string) *ip2location.IP2Locationrecord {
// get public ip
il, _ := nf.iploc.GetAll(ip)

if il.Country_short == "-" {
// maybe a local IP address
il, _ = nf.iploc.GetAllPrivate(ip)
}

//remove -,_ from strings in order to use them as tag in influxDB
il.Country_long = nf.removeInvalidCharFromTags(il.Country_long)
il.Country_short = nf.removeInvalidCharFromTags(il.Country_short)
il.City = nf.removeInvalidCharFromTags(il.City)
il.Region = nf.removeInvalidCharFromTags(il.Region)
il.Isp = nf.removeInvalidCharFromTags(il.Isp)
il.Domain = nf.removeInvalidCharFromTags(il.Domain)
il.Netspeed = nf.removeInvalidCharFromTags(il.Netspeed)
il.Iddcode = nf.removeInvalidCharFromTags(il.Iddcode)
il.Areacode = nf.removeInvalidCharFromTags(il.Areacode)
il.Weatherstationcode = nf.removeInvalidCharFromTags(il.Weatherstationcode)
il.Weatherstationname = nf.removeInvalidCharFromTags(il.Weatherstationname)
il.Mcc = nf.removeInvalidCharFromTags(il.Mcc)
il.Mnc = nf.removeInvalidCharFromTags(il.Mnc)
il.Mobilebrand = nf.removeInvalidCharFromTags(il.Mobilebrand)
il.Usagetype = nf.removeInvalidCharFromTags(il.Usagetype)

// return ip2location info
return il
}

func (nf *Collector) removeInvalidCharFromTags(s string) string {
if s == "-" {
return "NA"
}
if strings.Contains(s, "Please upgrade the data file") {
return "NA"
}

// rs := strings.Replace(s, ",", " ", -1)
// rs = strings.Replace(rs, "'", " ", -1)
// rs = strings.Replace(rs, " ", "_", -1)

return s
}

0 comments on commit 2efe067

Please sign in to comment.