diff --git a/.gitignore b/.gitignore index bcd26510..46c5e574 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,15 @@ jtimon-darwin-amd64 jtimon-linux-amd64 *.log *.logs -.* debug.test jtimon-windows-amd64.exe coverage.out +up-* +cp-* +cp-configs +bin +sample-log +config-file-lists +config-list.txt +*.tar +diff-* diff --git a/config.go b/config.go index b1084771..e0792ebf 100644 --- a/config.go +++ b/config.go @@ -36,6 +36,7 @@ type Config struct { Alias string `json:"alias"` PasswordDecoder string `json:"password-decoder"` EnableUintSupport bool `json:"enable-uint"` + TCP *TCPConfig `json:"tcp"` } // GnmiConfig definition @@ -361,6 +362,12 @@ func ConfigRead(jctx *JCtx, init bool, restart *bool) error { if err := KafkaInit(jctx); err != nil { log.Printf("KafkaInit error : %v", err) } + // TCP Client + if *tcpPush { + if err := TcpClientInit(jctx.config.TCP, &jctx.tcpCtx); err != nil { + log.Printf("TCPClientInit error : %v", err) + } + } } else { err := HandleConfigChange(jctx, config, restart) if err != nil { diff --git a/jtimon_test.go b/jtimon_test.go index 6d5c5f60..6c81b6fd 100644 --- a/jtimon_test.go +++ b/jtimon_test.go @@ -139,3 +139,16 @@ func prometheusCollect(host string, port int, jctx *JCtx) error { return nil } + +func tcpExport(host string, port int, jctx *JCtx) error { + // var f *os.File + // var err error + + // if f, err = os.Create(jctx.file + ".testres"); err != nil { + // return err + // } + // defer f.Close() + + // url := fmt.Sprintf("http://%s:%d/metrics", host, port) + return nil +} diff --git a/main.go b/main.go index 9f6a11dd..18232ce8 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,7 @@ var ( myCert = flag.String("cert", "./certs/self_signed/server-cert.pem", "Path of server cert") myKey = flag.String("pem", "./certs/self_signed/server-key.pem", "Path of server key") kafkaBroker = flag.String("kafka-broker", "kafka:9092", "Comma seperated list of Kafka brokers each in the form ip:port") + tcpPush = flag.Bool("tcp-push", false, "Send telemetry packet to TCP endpoint as JSON") jtimonVersion = "version-not-available" buildTime = "build-time-not-available" diff --git a/outfile b/outfile new file mode 100644 index 00000000..e69de29b diff --git a/sample-config/3.json b/sample-config/3.json new file mode 100644 index 00000000..7cfbfde2 --- /dev/null +++ b/sample-config/3.json @@ -0,0 +1,26 @@ +{ + "host": "host-or-ip", + "port": 50051, + "cid": "mac1", + "paths": [{ + "path": "/interfaces", + "freq": 2000 + }], + "influx": { + "server": "127.0.0.1", + "port": 8086, + "dbname": "db", + "user": "influx", + "password": "influxdb", + "recreate": true, + "measurement": "m" + }, + "log": { + "file": "jtimon.log" + }, + "tcp": { + "server": "", + "port": 10517, + "": "" + } +} diff --git a/subscribe_juniper_junos.go b/subscribe_juniper_junos.go index 60fc3846..3ac55296 100644 --- a/subscribe_juniper_junos.go +++ b/subscribe_juniper_junos.go @@ -9,10 +9,9 @@ import ( "encoding/json" "os" "syscall" - + na_pb "github.com/nileshsimaria/jtimon/telemetry" "github.com/golang/protobuf/proto" auth_pb "github.com/nileshsimaria/jtimon/authentication" - na_pb "github.com/nileshsimaria/jtimon/telemetry" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -94,11 +93,11 @@ func handleOnePacket(ocData *na_pb.OpenConfigData, jctx *JCtx) { } // subSendAndReceive handles the following -// - Opens up a stream for receiving the telemetry data -// - Handles SIGHUP by terminating the current stream and requests the -// caller to restart the streaming by setting the corresponding return -// code -// - In case of an error, Set the error code to restart the connection. +// - Opens up a stream for receiving the telemetry data +// - Handles SIGHUP by terminating the current stream and requests the +// caller to restart the streaming by setting the corresponding return +// code +// - In case of an error, Set the error code to restart the connection. func subSendAndReceive(conn *grpc.ClientConn, jctx *JCtx, subReqM na_pb.SubscriptionRequest) SubErrorCode { @@ -131,7 +130,6 @@ func subSendAndReceive(conn *grpc.ClientConn, jctx *JCtx, go func() { // Go Routine which actually starts the streaming connection and receives the data jLog(jctx, fmt.Sprintf("Receiving telemetry data from %s:%d\n", jctx.config.Host, jctx.config.Port)) - for { ocData, err := stream.Recv() if err == io.EOF { @@ -158,6 +156,8 @@ func subSendAndReceive(conn *grpc.ClientConn, jctx *JCtx, if b, err := json.MarshalIndent(ocData, "", " "); err == nil { jLog(jctx, fmt.Sprintf("%s\n", b)) } + } else { + jLog(jctx, fmt.Sprintf("%s\n", ocData)) } if *print || *stateHandler || IsVerboseLogging(jctx) { @@ -170,7 +170,6 @@ func subSendAndReceive(conn *grpc.ClientConn, jctx *JCtx, } else { go addIDB(ocData, jctx, rtime) } - // to prometheus if *prom { if *noppgoroutines { @@ -185,6 +184,14 @@ func subSendAndReceive(conn *grpc.ClientConn, jctx *JCtx, } else { go addKafka(ocData, jctx, rtime) } + // to tcp endpoint + if *tcpPush { + if *noppgoroutines { + AddTcpEndpoint(ocData, jctx) + } else { + go AddTcpEndpoint(ocData, jctx) + } + } } }() for { diff --git a/subscribe_juniper_junos_test.go b/subscribe_juniper_junos_test.go index fc28cf15..4f118126 100644 --- a/subscribe_juniper_junos_test.go +++ b/subscribe_juniper_junos_test.go @@ -493,6 +493,9 @@ func TestInflux(t *testing.T) { } }) } +} +func TestTCP(t *testing.T) { + } func TestJTISIMMaxRun(t *testing.T) { tests := []struct { diff --git a/tcp_exporter.go b/tcp_exporter.go new file mode 100644 index 00000000..557ed712 --- /dev/null +++ b/tcp_exporter.go @@ -0,0 +1,166 @@ +package main // TODO: package as standalone + + +import ( + "encoding/json" + "bytes" + "fmt" + "log" + "net" + "sync" + "time" + backoff "github.com/cenkalti/backoff/v4" + na_pb "github.com/nileshsimaria/jtimon/telemetry" +) + +// Signal enums +const ( + TCPDisconnected = iota +) + +// Config Definition +type TCPConfig struct { + Host string `json:"host"` + Port int `json:"port"` +} + +// Runtime contexts +type TCPClient struct { + conn net.Conn + cv *sync.Cond +} +type TCPCtx struct { + client *TCPClient + backoff *backoff.ExponentialBackOff + statusCh chan int + dataCh chan []byte +} + +func tcpConnect(tcfg *TCPConfig, tctx *TCPCtx) error { + addr := fmt.Sprintf("%s:%d", tcfg.Host, tcfg.Port) + conn, err := net.Dial("tcp", addr) + if err != nil { + log.Printf("error dialing...\n") + sendStatusDisconnect(tctx) + return err + } + log.Printf("TCP endpoint: connected to %v\n", addr) + tctx.client.conn = conn + tctx.client.cv.Broadcast() + tctx.backoff.Reset() + return nil +} + +func sendStatusDisconnect(tctx *TCPCtx) { + select { + case tctx.statusCh <- TCPDisconnected: + default: + } + tctx.client.cv.L.Lock() + tctx.client.conn = nil + tctx.client.cv.L.Unlock() +} + +func heartbeat(tctx *TCPCtx) { + for { + var err error = nil + func() { + tctx.client.cv.L.Lock() + defer tctx.client.cv.L.Unlock() + if tctx.client.conn == nil { + return + } + b := []byte{} + _, err = tctx.client.conn.Write(b) + }() + if err != nil { + log.Printf("TCP endpoint: heartbeat lost\n") + sendStatusDisconnect(tctx) + } + time.Sleep(1 * time.Second) + } +} + +func TcpClientInit(tcfg *TCPConfig, tctx **TCPCtx) error { + if tcfg == nil { + return fmt.Errorf("no TCP endpoint configuration provided") + } + *tctx = &TCPCtx{dataCh: make(chan []byte), statusCh: make(chan int, 1)} + (*tctx).client = &TCPClient{conn: nil, cv: sync.NewCond(&sync.Mutex{})} + (*tctx).backoff = backoff.NewExponentialBackOff() + (*tctx).backoff.MaxInterval = 20 * time.Second + + go heartbeat(*tctx) + go func() { + connect: { + go func() { + if err := tcpConnect(tcfg, *tctx); err != nil { + log.Printf("TCP endpoint: connection error: %v\n", err) + } + }() + for { + select { + case data, ok := <-(*tctx).dataCh: + if !ok { + return + } + go func() { + if err := sendData(*tctx, data); err != nil { + log.Printf("TCP endpoint: data send error: %v\n", err) + } + }() + case <-(*tctx).statusCh: + goto reconnect + } + } + } + reconnect: { + time.Sleep((*tctx).backoff.NextBackOff()) + goto connect + } + }() + return nil +} + +func tcpClientTeardown(tctx *TCPCtx) { + if tctx != nil { + defer tctx.client.conn.Close() + } +} + +func sendData(tctx *TCPCtx, m[]byte) error { + tctx.client.cv.L.Lock() + defer tctx.client.cv.L.Unlock() + for tctx.client.conn == nil { + tctx.client.cv.Wait() + } + if _, err := tctx.client.conn.Write(m); err != nil { + return err + } + return nil +} + +func PushTcpEndpoint(tctx *TCPCtx, b []byte) error { + tctx.dataCh <- b + return nil +} + +func AddTcpEndpoint(ocData *na_pb.OpenConfigData, jctx *JCtx) { + b, err := processOcData(ocData) + if err != nil { + jLog(jctx, fmt.Sprintf("marshal error: %v", err)) + return + } + PushTcpEndpoint(jctx.tcpCtx, b) +} + +func processOcData(ocData *na_pb.OpenConfigData) ([]byte, error) { + b, err := json.Marshal(ocData) + if err != nil { + return nil, err + } + stripped := bytes.Replace(b, []byte("\n"), []byte(""), -1) + stripped = bytes.Replace(stripped, []byte("\r"), []byte(""), -1) + capped := append(stripped, '\n') + return capped, nil +} diff --git a/workers.go b/workers.go index 3748c838..e0e5a5e5 100644 --- a/workers.go +++ b/workers.go @@ -43,6 +43,7 @@ type JCtx struct { testExp *os.File testRes *os.File receivedSyncRsp bool + tcpCtx *TCPCtx } // JWorkers holds worker @@ -273,6 +274,9 @@ func NewJWorker(file string, wg *sync.WaitGroup, wsChan chan string) (*JWorker, return w, err } log.Printf("%v, jctx.config.Kafka.producer: %v", jctx.config.Host, jctx.config.Kafka) + log.Printf("%v, jctx.config.TCP: %v", jctx.config.Host, jctx.config.TCP) + + if alias, err := NewAlias(jctx.config.Alias); err == nil { jctx.alias = alias } else {