Skip to content

Commit bdf4ba0

Browse files
authored
Merge pull request ethereum#95 from ethersphere/network-testing-framework-services
p2p/simulations: Fix starting nodes with multiple services
2 parents 02f6b66 + 111dd56 commit bdf4ba0

File tree

9 files changed

+201
-210
lines changed

9 files changed

+201
-210
lines changed

p2p/simulations/adapters/docker.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,13 @@ func (d *DockerAdapter) Name() string {
4343

4444
// NewNode returns a new DockerNode using the given config
4545
func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) {
46-
if _, exists := serviceFuncs[config.Service]; !exists {
47-
return nil, fmt.Errorf("unknown node service %q", config.Service)
46+
if len(config.Services) == 0 {
47+
return nil, errors.New("node must have at least one service")
48+
}
49+
for _, service := range config.Services {
50+
if _, exists := serviceFuncs[service]; !exists {
51+
return nil, fmt.Errorf("unknown node service %q", service)
52+
}
4853
}
4954

5055
// generate the config
@@ -76,22 +81,18 @@ type DockerNode struct {
7681
// dockerCommand returns a command which exec's the binary in a docker
7782
// container.
7883
//
79-
// It uses a shell so that we can pass the _P2P_NODE_CONFIG and _P2P_NODE_KEY
80-
// environment variables to the container using the --env flag.
84+
// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment
85+
// variable to the container using the --env flag.
8186
func (n *DockerNode) dockerCommand() *exec.Cmd {
8287
return exec.Command(
8388
"sh", "-c",
8489
fmt.Sprintf(
85-
`exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" --env _P2P_NODE_KEY="${_P2P_NODE_KEY}" %s p2p-node %s %s`,
86-
dockerImage, strings.Join(n.Services, " "), n.ID.String(),
90+
`exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`,
91+
dockerImage, strings.Join(n.Config.Node.Services, ","), n.ID.String(),
8792
),
8893
)
8994
}
9095

91-
func (n *DockerNode) GetService(name string) node.Service {
92-
return nil
93-
}
94-
9596
// dockerImage is the name of the docker image
9697
const dockerImage = "p2p-node"
9798

p2p/simulations/adapters/exec.go

Lines changed: 79 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,13 @@ func (e *ExecAdapter) Name() string {
4646

4747
// NewNode returns a new ExecNode using the given config
4848
func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
49-
if _, exists := serviceFuncs[config.Service]; !exists {
50-
return nil, fmt.Errorf("unknown node service %q", config.Service)
49+
if len(config.Services) == 0 {
50+
return nil, errors.New("node must have at least one service")
51+
}
52+
for _, service := range config.Services {
53+
if _, exists := serviceFuncs[service]; !exists {
54+
return nil, fmt.Errorf("unknown node service %q", service)
55+
}
5156
}
5257

5358
// create the node directory using the first 12 characters of the ID
@@ -88,12 +93,11 @@ func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
8893
// (so for example we can run the node in a remote Docker container and
8994
// still communicate with it).
9095
type ExecNode struct {
91-
ID *NodeId
92-
Dir string
93-
Config *execNodeConfig
94-
Cmd *exec.Cmd
95-
Info *p2p.NodeInfo
96-
Services []string
96+
ID *NodeId
97+
Dir string
98+
Config *execNodeConfig
99+
Cmd *exec.Cmd
100+
Info *p2p.NodeInfo
97101

98102
client *rpc.Client
99103
rpcMux *rpcMux
@@ -118,7 +122,7 @@ func (n *ExecNode) Client() (*rpc.Client, error) {
118122
// Start exec's the node passing the ID and service as command line arguments
119123
// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment
120124
// variable
121-
func (n *ExecNode) Start(snapshot []byte) (err error) {
125+
func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
122126
if n.Cmd != nil {
123127
return errors.New("already started")
124128
}
@@ -131,7 +135,7 @@ func (n *ExecNode) Start(snapshot []byte) (err error) {
131135

132136
// encode a copy of the config containing the snapshot
133137
confCopy := *n.Config
134-
confCopy.Snapshot = snapshot
138+
confCopy.Snapshots = snapshots
135139
confData, err := json.Marshal(confCopy)
136140
if err != nil {
137141
return fmt.Errorf("error generating node config: %s", err)
@@ -165,17 +169,13 @@ func (n *ExecNode) Start(snapshot []byte) (err error) {
165169
return nil
166170
}
167171

168-
func (n *ExecNode) GetService(name string) node.Service {
169-
return nil
170-
}
171-
172172
// execCommand returns a command which runs the node locally by exec'ing
173173
// the current binary but setting argv[0] to "p2p-node" so that the child
174174
// runs execP2PNode
175175
func (n *ExecNode) execCommand() *exec.Cmd {
176176
return &exec.Cmd{
177177
Path: reexec.Self(),
178-
Args: []string{"p2p-node", n.Services[0], n.ID.String()},
178+
Args: []string{"p2p-node", strings.Join(n.Config.Node.Services, ","), n.ID.String()},
179179
}
180180
}
181181

@@ -231,14 +231,14 @@ func (n *ExecNode) ServeRPC(conn net.Conn) error {
231231
return nil
232232
}
233233

234-
// Snapshot creates a snapshot of the service state by calling the
234+
// Snapshots creates snapshots of the services by calling the
235235
// simulation_snapshot RPC method
236-
func (n *ExecNode) Snapshot() ([]byte, error) {
236+
func (n *ExecNode) Snapshots() (map[string][]byte, error) {
237237
if n.client == nil {
238238
return nil, errors.New("RPC not started")
239239
}
240-
var snapshot []byte
241-
return snapshot, n.client.Call(&snapshot, "simulation_snapshot")
240+
var snapshots map[string][]byte
241+
return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
242242
}
243243

244244
func init() {
@@ -250,9 +250,9 @@ func init() {
250250
// execNodeConfig is used to serialize the node configuration so it can be
251251
// passed to the child process as a JSON encoded environment variable
252252
type execNodeConfig struct {
253-
Stack node.Config `json:"stack"`
254-
Node *NodeConfig `json:"node"`
255-
Snapshot []byte `json:"snapshot,omitempty"`
253+
Stack node.Config `json:"stack"`
254+
Node *NodeConfig `json:"node"`
255+
Snapshots map[string][]byte `json:"snapshot,omitempty"`
256256
}
257257

258258
// execP2PNode starts a devp2p node when the current binary is executed with
@@ -263,8 +263,8 @@ func execP2PNode() {
263263
glogger.Verbosity(log.LvlInfo)
264264
log.Root().SetHandler(glogger)
265265

266-
// read the service and ID from argv
267-
serviceName := os.Args[1]
266+
// read the services and ID from argv
267+
serviceNames := strings.Split(os.Args[1], ",")
268268
id := NewNodeIdFromHex(os.Args[2])
269269

270270
// decode the config
@@ -278,12 +278,19 @@ func execP2PNode() {
278278
}
279279
conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
280280

281-
// initialize the service
282-
serviceFunc, exists := serviceFuncs[serviceName]
283-
if !exists {
284-
log.Crit(fmt.Sprintf("unknown node service %q", serviceName))
281+
// initialize the services
282+
services := make(map[string]node.Service, len(serviceNames))
283+
for _, name := range serviceNames {
284+
serviceFunc, exists := serviceFuncs[name]
285+
if !exists {
286+
log.Crit(fmt.Sprintf("unknown node service %q", name))
287+
}
288+
var snapshot []byte
289+
if conf.Snapshots != nil {
290+
snapshot = conf.Snapshots[name]
291+
}
292+
services[name] = serviceFunc(id, snapshot)
285293
}
286-
services := serviceFunc(id, conf.Snapshot)
287294

288295
// use explicit IP address in ListenAddr so that Enode URL is usable
289296
if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") {
@@ -326,52 +333,75 @@ func execP2PNode() {
326333
stack.Wait()
327334
}
328335

329-
func startP2PNode(conf *node.Config, services []node.Service) (*node.Node, error) {
336+
func startP2PNode(conf *node.Config, services map[string]node.Service) (*node.Node, error) {
330337
stack, err := node.New(conf)
331338
if err != nil {
332339
return nil, err
333340
}
334-
for _, svc := range services {
335-
constructor := func(ctx *node.ServiceContext) (node.Service, error) {
336-
return &snapshotService{svc}, nil
341+
constructor := func(service node.Service) func(ctx *node.ServiceContext) (node.Service, error) {
342+
return func(ctx *node.ServiceContext) (node.Service, error) {
343+
return service, nil
337344
}
338-
if err := stack.Register(constructor); err != nil {
345+
}
346+
for _, service := range services {
347+
if err := stack.Register(constructor(service)); err != nil {
339348
return nil, err
340349
}
341350
}
342-
351+
if err := stack.Register(constructor(&snapshotService{services})); err != nil {
352+
return nil, err
353+
}
343354
if err := stack.Start(); err != nil {
344355
return nil, err
345356
}
346357
return stack, nil
347358
}
348359

349-
// snapshotService wraps a node.Service and injects a snapshot API into the
350-
// list of RPC APIs
360+
// snapshotService is a node.Service which wraps a list of services and
361+
// exposes an API to generate a snapshot of those services
351362
type snapshotService struct {
352-
node.Service
363+
services map[string]node.Service
353364
}
354365

355366
func (s *snapshotService) APIs() []rpc.API {
356-
return append([]rpc.API{{
367+
return []rpc.API{{
357368
Namespace: "simulation",
358369
Version: "1.0",
359-
Service: SnapshotAPI{s.Service},
360-
}}, s.Service.APIs()...)
370+
Service: SnapshotAPI{s.services},
371+
}}
361372
}
362373

363-
// SnapshotAPI provides an RPC method to create a snapshot of a node.Service
374+
func (s *snapshotService) Protocols() []p2p.Protocol {
375+
return nil
376+
}
377+
378+
func (s *snapshotService) Start(*p2p.Server) error {
379+
return nil
380+
}
381+
382+
func (s *snapshotService) Stop() error {
383+
return nil
384+
}
385+
386+
// SnapshotAPI provides an RPC method to create snapshots of services
364387
type SnapshotAPI struct {
365-
service node.Service
388+
services map[string]node.Service
366389
}
367390

368-
func (api SnapshotAPI) Snapshot() ([]byte, error) {
369-
if s, ok := api.service.(interface {
370-
Snapshot() ([]byte, error)
371-
}); ok {
372-
return s.Snapshot()
391+
func (api SnapshotAPI) Snapshot() (map[string][]byte, error) {
392+
snapshots := make(map[string][]byte)
393+
for name, service := range api.services {
394+
if s, ok := service.(interface {
395+
Snapshot() ([]byte, error)
396+
}); ok {
397+
snap, err := s.Snapshot()
398+
if err != nil {
399+
return nil, err
400+
}
401+
snapshots[name] = snap
402+
}
373403
}
374-
return nil, nil
404+
return snapshots, nil
375405
}
376406

377407
// stdioConn wraps os.Stdin / os.Stdout with a no-op Close method so we can

0 commit comments

Comments
 (0)