diff --git a/.golangci.yml b/.golangci.yml index 4d66ebb..e0afa50 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -367,6 +367,7 @@ linters: - goconst - goheader - goimports + - gomnd - gomodguard - goprintffuncname - ineffassign @@ -394,7 +395,6 @@ linters: - gocyclo - godox - goerr113 - - gomnd - nestif - nlreturn - structcheck @@ -424,6 +424,7 @@ issues: - errcheck # - dupl - gosec + - funlen # Exclude known linters from partially hard-vendored code, # which is impossible to exclude via "nolint" comments. diff --git a/README.md b/README.md index 35ce87e..5abb742 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # GoFlow - Dataflow and Flow-based programming library for Go (golang) -[![Build Status](https://travis-ci.com/trustmaster/goflow.svg?branch=master)](https://travis-ci.com/trustmaster/goflow) [![codecov](https://codecov.io/gh/trustmaster/goflow/branch/master/graph/badge.svg)](https://codecov.io/gh/trustmaster/goflow) +[![Build Status](https://travis-ci.com/trustmaster/goflow.svg?branch=master)](https://travis-ci.com/trustmaster/goflow) +[![codecov](https://codecov.io/gh/trustmaster/goflow/branch/master/graph/badge.svg)](https://codecov.io/gh/trustmaster/goflow) +[![PkgGoDev](https://pkg.go.dev/badge/github.com/trustmaster/goflow)](https://pkg.go.dev/github.com/trustmaster/goflow) ### _Status of this branch (WIP)_ diff --git a/address.go b/address.go new file mode 100644 index 0000000..23d8e34 --- /dev/null +++ b/address.go @@ -0,0 +1,132 @@ +package goflow + +import ( + "fmt" + "strconv" + "strings" + "unicode" +) + +// address is a full port accessor including the index part. +type address struct { + proc string // Process name + port string // Component port name + key string // Port key (only for map ports) + index int // Port index (only for array ports) +} + +// noIndex is a "zero" index value. Not a `0` since 0 is a valid array index. +const noIndex = -1 + +type portKind uint + +const ( + portKindNone portKind = iota + portKindChan + portKindArray + portKindMap +) + +func (a address) kind() portKind { + switch { + case len(a.proc) == 0 || len(a.port) == 0: + return portKindNone + case a.index != noIndex: + return portKindArray + case len(a.key) != 0: + return portKindMap + default: + return portKindChan + } +} + +func (a address) String() string { + switch a.kind() { + case portKindChan: + return fmt.Sprintf("%s.%s", a.proc, a.port) + case portKindArray: + return fmt.Sprintf("%s.%s[%d]", a.proc, a.port, a.index) + case portKindMap: + return fmt.Sprintf("%s.%s[%s]", a.proc, a.port, a.key) + case portKindNone: // makes go-lint happy + } + + return "" +} + +// parseAddress validates and constructs a port address. +// port parameter may include an array index ("[]") or a hashmap key ("[]"). +func parseAddress(proc, port string) (address, error) { + switch { + case len(proc) == 0: + return address{}, fmt.Errorf("empty process name") + case len(port) == 0: + return address{}, fmt.Errorf("empty port name") + } + + // Validate the proc contents + for i, r := range proc { + if !unicode.IsLetter(r) && !unicode.IsDigit(r) { + return address{}, fmt.Errorf("unexpected %q at process name index %d", r, i) + } + } + + keyPos := 0 + a := address{ + proc: proc, + port: port, + index: noIndex, + } + + // Validate and parse the port contents in one scan + for i, r := range port { + switch { + case r == '[': + if i == 0 || keyPos > 0 { + // '[' at the very beginning of the port or a second '[' found + return address{}, fmt.Errorf("unexpected '[' at port name index %d", i) + } + + keyPos = i + 1 + a.port = port[:i] + case r == ']': + switch { + case keyPos == 0: + // No preceding matching '[' + return address{}, fmt.Errorf("unexpected ']' at port name index %d", i) + case i != len(port)-1: + // Closing bracket is not the last rune + return address{}, fmt.Errorf("unexpected %q at port name index %d", port[i+1:], i) + } + + if idx, err := strconv.Atoi(port[keyPos:i]); err != nil { + a.key = port[keyPos:i] + } else { + a.index = idx + } + case !unicode.IsLetter(r) && !unicode.IsDigit(r): + return address{}, fmt.Errorf("unexpected %q at port name index %d", r, i) + } + } + + if keyPos != 0 && len(a.key) == 0 && a.index == noIndex { + return address{}, fmt.Errorf("unmatched '[' at port name index %d", keyPos-1) + } + + a.port = capitalizePortName(a.port) + + return a, nil +} + +// capitalizePortName converts port names defined in UPPER or lower case to Title case, +// which is more common for structs in Go. +func capitalizePortName(name string) string { + lower := strings.ToLower(name) + upper := strings.ToUpper(name) + + if name == lower || name == upper { + return strings.Title(lower) + } + + return name +} diff --git a/address_test.go b/address_test.go new file mode 100644 index 0000000..cecdfce --- /dev/null +++ b/address_test.go @@ -0,0 +1,188 @@ +package goflow + +import "testing" + +func Test_address_kind(t *testing.T) { + type fields struct { + proc string + port string + key string + index int + } + + tests := []struct { + name string + fields fields + want portKind + }{ + { + name: "empty", + fields: fields{proc: "", port: "", key: "", index: noIndex}, + want: portKindNone, + }, + { + name: "no port name", + fields: fields{proc: "echo", port: "", key: "", index: noIndex}, + want: portKindNone, + }, + { + name: "no proc name", + fields: fields{proc: "", port: "in", key: "", index: noIndex}, + want: portKindNone, + }, + { + name: "chan port", + fields: fields{proc: "echo", port: "in", key: "", index: noIndex}, + want: portKindChan, + }, + { + name: "map port", + fields: fields{proc: "echo", port: "in", key: "key", index: noIndex}, + want: portKindMap, + }, + { + name: "array port", + fields: fields{proc: "echo", port: "in", key: "", index: 10}, + want: portKindArray, + }, + } + + for i := range tests { + tt := tests[i] + t.Run(tt.name, func(t *testing.T) { + a := address{ + proc: tt.fields.proc, + port: tt.fields.port, + key: tt.fields.key, + index: tt.fields.index, + } + if got := a.kind(); got != tt.want { + t.Errorf("address.kind() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_address_String(t *testing.T) { + type fields struct { + proc string + port string + key string + index int + } + + tests := []struct { + name string + fields fields + want string + }{ + { + name: "empty", + fields: fields{proc: "", port: "", key: "", index: noIndex}, + want: "", + }, + { + name: "no port name", + fields: fields{proc: "echo", port: "", key: "", index: noIndex}, + want: "", + }, + { + name: "no proc name", + fields: fields{proc: "", port: "in", key: "", index: noIndex}, + want: "", + }, + { + name: "chan port", + fields: fields{proc: "echo", port: "in", key: "", index: noIndex}, + want: "echo.in", + }, + { + name: "map port", + fields: fields{proc: "echo", port: "in", key: "key", index: noIndex}, + want: "echo.in[key]", + }, + { + name: "array port", + fields: fields{proc: "echo", port: "in", key: "", index: 10}, + want: "echo.in[10]", + }, + } + + for i := range tests { + tt := tests[i] + t.Run(tt.name, func(t *testing.T) { + a := address{ + proc: tt.fields.proc, + port: tt.fields.port, + key: tt.fields.key, + index: tt.fields.index, + } + if got := a.String(); got != tt.want { + t.Errorf("address.String() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_parseAddress(t *testing.T) { + type args struct { + proc string + port string + } + + tests := []struct { + name string + args args + want address + wantErr bool + }{ + {name: "empty", args: args{proc: "", port: ""}, want: address{}, wantErr: true}, + {name: "empty proc", args: args{proc: "", port: "in"}, want: address{}, wantErr: true}, + {name: "empty port", args: args{proc: "echo", port: ""}, want: address{}, wantErr: true}, + {name: "malformed port", args: args{proc: "echo", port: "in[[key1]"}, want: address{}, wantErr: true}, + {name: "unmatched opening bracket", args: args{proc: "echo", port: "in[3"}, want: address{}, wantErr: true}, + {name: "unmatched closing bracket", args: args{proc: "echo", port: "in]3"}, want: address{}, wantErr: true}, + {name: "chars after closing bracket", args: args{proc: "echo", port: "in[3]abc"}, want: address{}, wantErr: true}, + {name: "non-UTF-8 in proc", args: args{proc: "echo\xbd", port: "in"}, want: address{}, wantErr: true}, + {name: "non-UTF-8 in port", args: args{proc: "echo", port: "in\xb2"}, want: address{}, wantErr: true}, + { + name: "chan port", + args: args{proc: "echo", port: "in1"}, + want: address{proc: "echo", port: "In1", key: "", index: noIndex}, + wantErr: false, + }, + { + name: "non-Latin chan port", + args: args{proc: "эхо", port: "ввод"}, + want: address{proc: "эхо", port: "Ввод", key: "", index: noIndex}, + wantErr: false, + }, + { + name: "map port", + args: args{proc: "echo", port: "in[key1]"}, + want: address{proc: "echo", port: "In", key: "key1", index: noIndex}, + wantErr: false, + }, + { + name: "array port", + args: args{proc: "echo", port: "in[10]"}, + want: address{proc: "echo", port: "In", key: "", index: 10}, + wantErr: false, + }, + } + + for i := range tests { + tt := tests[i] + t.Run(tt.name, func(t *testing.T) { + got, err := parseAddress(tt.args.proc, tt.args.port) + if (err != nil) != tt.wantErr { + t.Errorf("parseAddress() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if got != tt.want { + t.Errorf("parseAddress() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/components_for_test.go b/components_for_test.go index 389b382..80ff359 100644 --- a/components_for_test.go +++ b/components_for_test.go @@ -184,30 +184,25 @@ func (c *irouter) Process() { } func RegisterTestComponents(f *Factory) error { - f.Register("echo", func() (interface{}, error) { - return new(echo), nil - }) + f.Register("echo", func() (Component, error) { return new(echo), nil }) f.Annotate("echo", Annotation{ Description: "Passes an int from in to out without changing it", Icon: "arrow-right", }) - f.Register("doubler", func() (interface{}, error) { - return new(doubler), nil - }) + + f.Register("doubler", func() (Component, error) { return new(doubler), nil }) f.Annotate("doubler", Annotation{ Description: "Doubles its input", Icon: "times-circle", }) - f.Register("repeater", func() (interface{}, error) { - return new(repeater), nil - }) + + f.Register("repeater", func() (Component, error) { return new(repeater), nil }) f.Annotate("repeater", Annotation{ Description: "Repeats Word given numer of Times", Icon: "times-circle", }) - f.Register("adder", func() (interface{}, error) { - return new(adder), nil - }) + + f.Register("adder", func() (Component, error) { return new(adder), nil }) f.Annotate("adder", Annotation{ Description: "Sums integers coming to its inports", Icon: "plus-circle", diff --git a/factory.go b/factory.go index 7016308..74b03e9 100644 --- a/factory.go +++ b/factory.go @@ -15,7 +15,7 @@ type registryEntry struct { } // Constructor is used to create a component instance at run-time. -type Constructor func() (interface{}, error) +type Constructor func() (Component, error) // NewFactory creates a new component Factory instance. func NewFactory() *Factory { @@ -73,7 +73,7 @@ func (f *Factory) Unregister(componentName string) error { } // Create creates a new instance of a component registered under a specific name. -func (f *Factory) Create(componentName string) (interface{}, error) { +func (f *Factory) Create(componentName string) (Component, error) { info, exists := f.registry[componentName] if !exists { return nil, fmt.Errorf("factory error: component '%s' does not exist", componentName) diff --git a/factory_test.go b/factory_test.go index 864062e..d032033 100644 --- a/factory_test.go +++ b/factory_test.go @@ -39,9 +39,7 @@ func TestFactoryRegistration(t *testing.T) { return } - err := f.Register("echo", func() (interface{}, error) { - return new(echo), nil - }) + err := f.Register("echo", func() (Component, error) { return new(echo), nil }) if err == nil { t.Errorf("Expected an error") return diff --git a/graph.go b/graph.go index 1762eac..afc3c99 100644 --- a/graph.go +++ b/graph.go @@ -14,15 +14,15 @@ type GraphConfig struct { // Graph represents a graph of processes connected with packet channels. type Graph struct { - conf GraphConfig // Graph configuration - waitGrp *sync.WaitGroup // Wait group for a graceful termination - procs map[string]interface{} // Network processes - inPorts map[string]port // Map of network incoming ports to component ports - outPorts map[string]port // Map of network outgoing ports to component ports - connections []connection // Network graph edges (inter-process connections) - chanListenersCount map[uintptr]uint // Tracks how many outports use the same channel - chanListenersCountLock sync.Locker // Used to synchronize operations on the chanListenersCount map - iips []iip // Initial Information Packets to be sent to the network on start + conf GraphConfig // Graph configuration + waitGrp *sync.WaitGroup // Wait group for a graceful termination + procs map[string]Component // Network processes + inPorts map[string]port // Map of network incoming ports to component ports + outPorts map[string]port // Map of network outgoing ports to component ports + connections []connection // Network graph edges (inter-process connections) + chanListenersCount map[uintptr]uint // Tracks how many outports use the same channel + chanListenersCountLock sync.Locker // Used to synchronize operations on the chanListenersCount map + iips []iip // Initial Information Packets to be sent to the network on start } // NewGraph returns a new initialized empty graph instance. @@ -35,7 +35,7 @@ func NewGraph(config ...GraphConfig) *Graph { return &Graph{ conf: conf, waitGrp: new(sync.WaitGroup), - procs: make(map[string]interface{}), + procs: make(map[string]Component), inPorts: make(map[string]port), outPorts: make(map[string]port), chanListenersCount: make(map[uintptr]uint), @@ -44,9 +44,7 @@ func NewGraph(config ...GraphConfig) *Graph { } // NewDefaultGraph is a ComponentConstructor for the factory. -func NewDefaultGraph() interface{} { - return NewGraph() -} +func NewDefaultGraph() *Graph { return NewGraph() } // // Register an empty graph component in the registry // func init() { @@ -58,15 +56,11 @@ func NewDefaultGraph() interface{} { // } // Add adds a new process with a given name to the network. -func (n *Graph) Add(name string, c interface{}) error { - // c should be either graph or a component - _, isComponent := c.(Component) - _, isGraph := c.(Graph) - - if !isComponent && !isGraph { - return fmt.Errorf("could not add process '%s': instance is neither Component nor Graph", name) +func (n *Graph) Add(name string, c Component) error { + if _, found := n.procs[name]; found { + return fmt.Errorf("could not add process %q: already added", name) } - // Add to the map of processes + n.procs[name] = c return nil diff --git a/graph_connect.go b/graph_connect.go index 5ac97c6..1f8020c 100644 --- a/graph_connect.go +++ b/graph_connect.go @@ -3,26 +3,8 @@ package goflow import ( "fmt" "reflect" - "strconv" - "strings" ) -// address is a full port accessor including the index part. -type address struct { - proc string // Process name - port string // Component port name - key string // Port key (only for map ports) - index int // Port index (only for array ports) -} - -func (a address) String() string { - if a.key != "" { - return fmt.Sprintf("%s.%s[%s]", a.proc, a.port, a.key) - } - - return fmt.Sprintf("%s.%s", a.proc, a.port) -} - // connection stores information about a connection within the net. type connection struct { src address @@ -34,7 +16,6 @@ type connection struct { // Connect a sender to a receiver and create a channel between them using BufferSize graph configuration. // Normally such a connection is unbuffered but you can change by setting flow.DefaultBufferSize > 0 or // by using ConnectBuf() function instead. -// It returns true on success or panics and returns false if error occurs. func (n *Graph) Connect(senderName, senderPort, receiverName, receiverPort string) error { return n.ConnectBuf(senderName, senderPort, receiverName, receiverPort, n.conf.BufferSize) } @@ -42,14 +23,20 @@ func (n *Graph) Connect(senderName, senderPort, receiverName, receiverPort strin // ConnectBuf connects a sender to a receiver using a channel with a buffer of a given size. // It returns true on success or panics and returns false if error occurs. func (n *Graph) ConnectBuf(senderName, senderPort, receiverName, receiverPort string, bufferSize int) error { - sendAddr := parseAddress(senderName, senderPort) + sendAddr, err := parseAddress(senderName, senderPort) + if err != nil { + return fmt.Errorf("bad sender address: %w", err) + } sendPort, err := n.getProcPort(senderName, sendAddr.port, reflect.SendDir) if err != nil { return fmt.Errorf("connect: %w", err) } - recvAddr := parseAddress(receiverName, receiverPort) + recvAddr, err := parseAddress(receiverName, receiverPort) + if err != nil { + return fmt.Errorf("bad receiver address: %w", err) + } recvPort, err := n.getProcPort(receiverName, recvAddr.port, reflect.RecvDir) if err != nil { @@ -98,11 +85,10 @@ func (n *Graph) ConnectBuf(senderName, senderPort, receiverName, receiverPort st // getProcPort finds an assignable port field in one of the subprocesses. func (n *Graph) getProcPort(procName, portName string, dir reflect.ChanDir) (reflect.Value, error) { - nilValue := reflect.ValueOf(nil) // Check if process exists proc, ok := n.procs[procName] if !ok { - return nilValue, fmt.Errorf("getProcPort: process '%s' not found", procName) + return reflect.Value{}, fmt.Errorf("getProcPort: process '%s' not found", procName) } // Check if process is settable @@ -112,7 +98,7 @@ func (n *Graph) getProcPort(procName, portName string, dir reflect.ChanDir) (ref } if !val.CanSet() { - return nilValue, fmt.Errorf("getProcPort: process '%s' is not settable", procName) + return reflect.Value{}, fmt.Errorf("getProcPort: process '%s' is not settable", procName) } // Get the port value @@ -134,7 +120,7 @@ func (n *Graph) getProcPort(procName, portName string, dir reflect.ChanDir) (ref p, ok := ports[portName] if !ok { - return nilValue, fmt.Errorf("getProcPort: subgraph '%s' does not have inport '%s'", procName, portName) + return reflect.Value{}, fmt.Errorf("getProcPort: subgraph '%s' does not have inport '%s'", procName, portName) } portVal, err = net.getProcPort(p.addr.proc, p.addr.port, dir) @@ -148,22 +134,24 @@ func (n *Graph) getProcPort(procName, portName string, dir reflect.ChanDir) (ref } if err != nil { - return nilValue, fmt.Errorf("getProcPort: %w", err) + return reflect.Value{}, fmt.Errorf("getProcPort: %w", err) } return portVal, nil } func attachPort(port reflect.Value, addr address, dir reflect.ChanDir, ch reflect.Value, bufSize int) (reflect.Value, error) { - if addr.index > -1 { + switch addr.kind() { + case portKindChan: + return attachChanPort(port, dir, ch, bufSize) + case portKindArray: return attachArrayPort(port, addr.index, dir, ch, bufSize) - } - - if addr.key != "" { + case portKindMap: return attachMapPort(port, addr.key, dir, ch, bufSize) + case portKindNone: // makes go-lint happy } - return attachChanPort(port, dir, ch, bufSize) + return reflect.Value{}, fmt.Errorf("invalid address %v", addr) } func attachChanPort(port reflect.Value, dir reflect.ChanDir, ch reflect.Value, bufSize int) (reflect.Value, error) { @@ -171,8 +159,8 @@ func attachChanPort(port reflect.Value, dir reflect.ChanDir, ch reflect.Value, b return ch, err } - if err := validateCanSet(port); err != nil { - return ch, err + if !port.CanSet() { + return ch, fmt.Errorf("port is not assignable") } ch = selectOrMakeChan(ch, port, port.Type().Elem(), bufSize) @@ -205,16 +193,18 @@ func attachArrayPort(port reflect.Value, key int, dir reflect.ChanDir, ch reflec return ch, err } - if port.IsNil() { - m := reflect.MakeSlice(port.Type(), 0, 32) - port.Set(m) - } + const scalingFactor = 2 - if port.Cap() <= key { - port.SetCap(2 * key) - } - - if port.Len() <= key { + switch { + case port.IsNil(): + // Allocate a new slice + port.Set(reflect.MakeSlice(port.Type(), key+1, scalingFactor*(key+1))) + case port.Cap() <= key: + // Allocate a new slice and copy all of the old contents, fallthrough since length is always less than capacity. + port.Set(reflect.AppendSlice(reflect.MakeSlice(port.Type(), 0, scalingFactor*(key+1)), port)) + fallthrough + case port.Len() <= key: + // Extend the slice port.SetLen(key + 1) } @@ -226,108 +216,38 @@ func attachArrayPort(port reflect.Value, key int, dir reflect.ChanDir, ch reflec } func validateChanDir(portType reflect.Type, dir reflect.ChanDir) error { - if portType.Kind() != reflect.Chan { + switch { + case portType.Kind() != reflect.Chan: return fmt.Errorf("not a channel") - } - - if portType.ChanDir()&dir == 0 { + case portType.ChanDir()&dir == 0: return fmt.Errorf("channel does not support direction %s", dir.String()) } return nil } -func validateCanSet(portVal reflect.Value) error { - if !portVal.CanSet() { - return fmt.Errorf("port is not assignable") - } - - return nil -} - func selectOrMakeChan(new, existing reflect.Value, t reflect.Type, bufSize int) reflect.Value { - if !new.IsValid() || new.IsNil() { - if existing.IsValid() && !existing.IsNil() { - return existing - } - - chanType := reflect.ChanOf(reflect.BothDir, t) - new = reflect.MakeChan(chanType, bufSize) - } - - return new -} - -// parseAddress unfolds a string port name into parts, including array index or hashmap key. -func parseAddress(proc, port string) address { - n := address{ - proc: proc, - port: port, - index: -1, - } - keyPos := 0 - key := "" - - for i, r := range port { - if r == '[' { - keyPos = i + 1 - n.port = port[0:i] - } - - if r == ']' { - key = port[keyPos:i] - } - } - - n.port = capitalizePortName(n.port) - - if key == "" { - return n + switch { + case new.IsValid() && !new.IsNil(): + return new + case existing.IsValid() && !existing.IsNil(): + return existing } - if i, err := strconv.Atoi(key); err == nil { - n.index = i - } else { - n.key = key - } - - n.key = key - - return n -} - -// capitalizePortName converts port names defined in UPPER or lower case to Title case, -// which is more common for structs in Go. -func capitalizePortName(name string) string { - lower := strings.ToLower(name) - upper := strings.ToUpper(name) - - if name == lower || name == upper { - return strings.Title(lower) - } - - return name + return reflect.MakeChan(reflect.ChanOf(reflect.BothDir, t), bufSize) } // findExistingChan returns a channel attached to receiver if it already exists among connections. func (n *Graph) findExistingChan(addr address, dir reflect.ChanDir) reflect.Value { - var channel reflect.Value // Find existing channel attached to the receiver for i := range n.connections { - var a address - if dir == reflect.SendDir { - a = n.connections[i].src - } else { - a = n.connections[i].tgt - } - - if a == addr { - channel = n.connections[i].channel - break + if dir == reflect.SendDir && n.connections[i].src == addr || + dir == reflect.RecvDir && n.connections[i].tgt == addr { + return n.connections[i].channel } } - return channel + return reflect.Value{} } // incChanListenersCount increments SendChanRefCount. diff --git a/graph_connect_test.go b/graph_connect_test.go index 9b1a1ee..ca11a99 100644 --- a/graph_connect_test.go +++ b/graph_connect_test.go @@ -161,7 +161,7 @@ func TestSubgraphReceiver(t *testing.T) { func newFanOutFanIn() (*Graph, error) { n := NewGraph() - components := map[string]interface{}{ + components := map[string]Component{ "e1": new(echo), "d1": new(doubler), "d2": new(doubler), @@ -251,7 +251,7 @@ func TestFanOutFanIn(t *testing.T) { func newMapPorts() (*Graph, error) { n := NewGraph() - components := map[string]interface{}{ + components := map[string]Component{ "e1": new(echo), "e11": new(echo), "e22": new(echo), @@ -354,7 +354,7 @@ func TestMapPorts(t *testing.T) { func newArrayPorts() (*Graph, error) { n := NewGraph() - components := map[string]interface{}{ + components := map[string]Component{ "e0": new(echo), "e00": new(echo), "e11": new(echo), diff --git a/graph_iip.go b/graph_iip.go index c2c3369..5e2d5e1 100644 --- a/graph_iip.go +++ b/graph_iip.go @@ -14,7 +14,10 @@ type iip struct { // AddIIP adds an Initial Information packet to the network. func (n *Graph) AddIIP(processName, portName string, data interface{}) error { - addr := parseAddress(processName, portName) + addr, err := parseAddress(processName, portName) + if err != nil { + return fmt.Errorf("bad address: %w", err) + } if _, exists := n.procs[processName]; exists { n.iips = append(n.iips, iip{data: data, addr: addr}) @@ -26,7 +29,11 @@ func (n *Graph) AddIIP(processName, portName string, data interface{}) error { // RemoveIIP detaches an IIP from specific process and port. func (n *Graph) RemoveIIP(processName, portName string) error { - addr := parseAddress(processName, portName) + addr, err := parseAddress(processName, portName) + if err != nil { + return fmt.Errorf("bad address: %w", err) + } + for i := range n.iips { if n.iips[i].addr == addr { // Remove item from the slice diff --git a/graph_ports.go b/graph_ports.go index 5c006f5..4fdc4e1 100644 --- a/graph_ports.go +++ b/graph_ports.go @@ -13,9 +13,15 @@ type port struct { } // MapInPort adds an inport to the net and maps it to a contained proc's port. -func (n *Graph) MapInPort(name, procName, procPort string) { - addr := parseAddress(procName, procPort) +func (n *Graph) MapInPort(name, procName, procPort string) error { + addr, err := parseAddress(procName, procPort) + if err != nil { + return err + } + n.inPorts[name] = port{addr: addr} + + return nil } // // AnnotateInPort sets optional run-time annotation for the port utilized by @@ -39,9 +45,15 @@ func (n *Graph) MapInPort(name, procName, procPort string) { // } // MapOutPort adds an outport to the net and maps it to a contained proc's port. -func (n *Graph) MapOutPort(name, procName, procPort string) { - addr := parseAddress(procName, procPort) +func (n *Graph) MapOutPort(name, procName, procPort string) error { + addr, err := parseAddress(procName, procPort) + if err != nil { + return err + } + n.outPorts[name] = port{addr: addr} + + return nil } // // AnnotateOutPort sets optional run-time annotation for the port utilized by diff --git a/graph_test.go b/graph_test.go index c1a1da8..fcf9fc3 100644 --- a/graph_test.go +++ b/graph_test.go @@ -72,12 +72,14 @@ func testGraphWithNumberSequence(n *Graph, t *testing.T) { <-wait } -func TestAddInvalidProcess(t *testing.T) { - s := struct{ Name string }{"This is not a Component"} +func TestAddDuplicateProcess(t *testing.T) { n := NewGraph() - err := n.Add("wrong", s) - if err == nil { + if err := n.Add("echo", new(echo)); err != nil { + t.Error(err) + } + + if err := n.Add("echo", new(echo)); err == nil { t.Errorf("Expected an error") } } @@ -103,7 +105,7 @@ func TestRemove(t *testing.T) { } func RegisterTestGraph(f *Factory) error { - f.Register("doubleEcho", func() (interface{}, error) { + f.Register("doubleEcho", func() (Component, error) { return newDoubleEcho() })