Skip to content

Commit 2c97668

Browse files
committed
feature: support event service
Signed-off-by: Michael Wan <[email protected]>
1 parent f76d3bc commit 2c97668

File tree

22 files changed

+1370
-8
lines changed

22 files changed

+1370
-8
lines changed

apis/filters/parse.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package filters
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"strings"
7+
)
8+
9+
// Args stores filter arguments as map key:{map key: bool}.
10+
// It contains an aggregation of the map of arguments (which are in the form
11+
// of -f 'key=value') based on the key, and stores values for the same key
12+
// in a map with string keys and boolean values.
13+
// e.g given -f 'label=label1=1' -f 'label=label2=2' -f 'image.name=ubuntu'
14+
// the args will be {"image.name":{"ubuntu":true},"label":{"label1=1":true,"label2=2":true}}
15+
type Args struct {
16+
fields map[string]map[string]bool
17+
}
18+
19+
// KeyValuePair is used to initialize a new Args
20+
type KeyValuePair struct {
21+
Key string
22+
Value string
23+
}
24+
25+
// Arg creates a new KeyValuePair for initializing Args
26+
func Arg(key, value string) KeyValuePair {
27+
return KeyValuePair{Key: key, Value: value}
28+
}
29+
30+
// NewArgs returns a new Args populated with the initial args
31+
func NewArgs(initialArgs ...KeyValuePair) Args {
32+
args := Args{fields: map[string]map[string]bool{}}
33+
for _, arg := range initialArgs {
34+
args.Add(arg.Key, arg.Value)
35+
}
36+
return args
37+
}
38+
39+
// Get returns the list of values associated with the key
40+
func (args Args) Get(key string) []string {
41+
values := args.fields[key]
42+
if values == nil {
43+
return make([]string, 0)
44+
}
45+
slice := make([]string, 0, len(values))
46+
for key := range values {
47+
slice = append(slice, key)
48+
}
49+
return slice
50+
}
51+
52+
// Add a new value to the set of values
53+
func (args Args) Add(key, value string) {
54+
if _, ok := args.fields[key]; ok {
55+
args.fields[key][value] = true
56+
} else {
57+
args.fields[key] = map[string]bool{value: true}
58+
}
59+
}
60+
61+
// Del removes a value from the set
62+
func (args Args) Del(key, value string) {
63+
if _, ok := args.fields[key]; ok {
64+
delete(args.fields[key], value)
65+
if len(args.fields[key]) == 0 {
66+
delete(args.fields, key)
67+
}
68+
}
69+
}
70+
71+
// Len returns the number of fields in the arguments.
72+
func (args Args) Len() int {
73+
return len(args.fields)
74+
}
75+
76+
// ExactMatch returns true if the source matches exactly one of the filters.
77+
func (args Args) ExactMatch(field, source string) bool {
78+
fieldValues, ok := args.fields[field]
79+
//do not filter if there is no filter set or cannot determine filter
80+
if !ok || len(fieldValues) == 0 {
81+
return true
82+
}
83+
84+
// try to match full name value to avoid O(N) regular expression matching
85+
return fieldValues[source]
86+
}
87+
88+
// MarshalJSON returns a JSON byte representation of the Args
89+
func (args Args) MarshalJSON() ([]byte, error) {
90+
if len(args.fields) == 0 {
91+
return []byte{}, nil
92+
}
93+
return json.Marshal(args.fields)
94+
}
95+
96+
// UnmarshalJSON populates the Args from JSON encode bytes
97+
func (args Args) UnmarshalJSON(raw []byte) error {
98+
if len(raw) == 0 {
99+
return nil
100+
}
101+
return json.Unmarshal(raw, &args.fields)
102+
}
103+
104+
// ErrBadFormat is an error returned when a filter is not in the form key=value
105+
//
106+
// Deprecated: this error will be removed in a future version
107+
var ErrBadFormat = errors.New("bad format of filter (expected name=value)")
108+
109+
// ParseFlag parses a key=value string and adds it to an Args.
110+
//
111+
// Deprecated: Use Args.Add()
112+
func ParseFlag(arg string, prev Args) (Args, error) {
113+
filters := prev
114+
if len(arg) == 0 {
115+
return filters, nil
116+
}
117+
118+
if !strings.Contains(arg, "=") {
119+
return filters, ErrBadFormat
120+
}
121+
122+
f := strings.SplitN(arg, "=", 2)
123+
124+
name := strings.ToLower(strings.TrimSpace(f[0]))
125+
value := strings.TrimSpace(f[1])
126+
127+
filters.Add(name, value)
128+
129+
return filters, nil
130+
}
131+
132+
// ToParam packs the Args into a string for easy transport from client to server.
133+
func ToParam(a Args) (string, error) {
134+
if a.Len() == 0 {
135+
return "", nil
136+
}
137+
138+
buf, err := json.Marshal(a)
139+
return string(buf), err
140+
}
141+
142+
// FromParam decodes a JSON encoded string into Args
143+
func FromParam(p string) (Args, error) {
144+
args := NewArgs()
145+
146+
if p == "" {
147+
return args, nil
148+
}
149+
150+
raw := []byte(p)
151+
err := json.Unmarshal(raw, &args)
152+
if err != nil {
153+
return args, err
154+
}
155+
return args, nil
156+
}

apis/filters/parse_test.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package filters
2+
3+
import (
4+
"testing"
5+
)
6+
7+
func TestParseArgs(t *testing.T) {
8+
// equivalent of `docker ps -f 'created=today' -f 'image.name=ubuntu*' -f 'image.name=*untu'`
9+
flagArgs := []string{
10+
"created=today",
11+
"image.name=ubuntu*",
12+
"image.name=*untu",
13+
}
14+
var (
15+
args = NewArgs()
16+
err error
17+
)
18+
19+
for i := range flagArgs {
20+
args, err = ParseFlag(flagArgs[i], args)
21+
if err != nil {
22+
t.Fatalf("ParseFlag got err: %v", err)
23+
}
24+
}
25+
26+
if len(args.Get("created")) != 1 {
27+
t.Fatalf("got unexpected created keys: %v", args.Get("created"))
28+
}
29+
if len(args.Get("image.name")) != 2 {
30+
t.Fatalf("got unexpected image.name keys: %v", args.Get("image.name"))
31+
}
32+
}
33+
34+
func TestAdd(t *testing.T) {
35+
f := NewArgs()
36+
f.Add("status", "running")
37+
v := f.fields["status"]
38+
if len(v) != 1 || !v["running"] {
39+
t.Fatalf("Expected to include a running status, got %v", v)
40+
}
41+
42+
f.Add("status", "paused")
43+
if len(v) != 2 || !v["paused"] {
44+
t.Fatalf("Expected to include a paused status, got %v", v)
45+
}
46+
}
47+
48+
func TestDel(t *testing.T) {
49+
f := NewArgs()
50+
f.Add("status", "running")
51+
f.Del("status", "running")
52+
v := f.fields["status"]
53+
if v["running"] {
54+
t.Fatal("Expected to not include a running status filter, got true")
55+
}
56+
}
57+
58+
func TestLen(t *testing.T) {
59+
f := NewArgs()
60+
if f.Len() != 0 {
61+
t.Fatal("Expected to not include any field")
62+
}
63+
f.Add("status", "running")
64+
if f.Len() != 1 {
65+
t.Fatal("Expected to include one field")
66+
}
67+
}
68+
69+
func TestExactMatch(t *testing.T) {
70+
f := NewArgs()
71+
72+
if !f.ExactMatch("status", "running") {
73+
t.Fatal("Expected to match `running` when there are no filters, got false")
74+
}
75+
76+
f.Add("status", "running")
77+
f.Add("status", "pause*")
78+
79+
if !f.ExactMatch("status", "running") {
80+
t.Fatal("Expected to match `running` with one of the filters, got false")
81+
}
82+
83+
if f.ExactMatch("status", "paused") {
84+
t.Fatal("Expected to not match `paused` with one of the filters, got true")
85+
}
86+
}
87+
88+
func TestToParam(t *testing.T) {
89+
fields := map[string]map[string]bool{
90+
"created": {"today": true},
91+
"image.name": {"ubuntu*": true, "*untu": true},
92+
}
93+
a := Args{fields: fields}
94+
95+
_, err := ToParam(a)
96+
if err != nil {
97+
t.Errorf("failed to marshal the filters: %s", err)
98+
}
99+
}
100+
101+
func TestFromParam(t *testing.T) {
102+
invalids := []string{
103+
"anything",
104+
"['a','list']",
105+
"{'key': 'value'}",
106+
`{"key": "value"}`,
107+
`{"key": ["value"]}`,
108+
}
109+
valid := map[*Args][]string{
110+
{fields: map[string]map[string]bool{"key": {"value": true}}}: {
111+
`{"key": {"value": true}}`,
112+
},
113+
{fields: map[string]map[string]bool{"key": {"value1": true, "value2": true}}}: {
114+
`{"key": {"value1": true, "value2": true}}`,
115+
},
116+
{fields: map[string]map[string]bool{"key1": {"value1": true}, "key2": {"value2": true}}}: {
117+
`{"key1": {"value1": true}, "key2": {"value2": true}}`,
118+
},
119+
}
120+
121+
for _, invalid := range invalids {
122+
if _, err := FromParam(invalid); err == nil {
123+
t.Fatalf("Expected an error with %v, got nothing", invalid)
124+
}
125+
}
126+
127+
for expectedArgs, matchers := range valid {
128+
for _, json := range matchers {
129+
args, err := FromParam(json)
130+
if err != nil {
131+
t.Fatal(err)
132+
}
133+
if args.Len() != expectedArgs.Len() {
134+
t.Fatalf("Expected %v, go %v", expectedArgs, args)
135+
}
136+
for key, expectedValues := range expectedArgs.fields {
137+
values := args.Get(key)
138+
139+
if len(values) != len(expectedValues) {
140+
t.Fatalf("Expected %v, go %v", expectedArgs, args)
141+
}
142+
143+
for _, v := range values {
144+
if !expectedValues[v] {
145+
t.Fatalf("Expected %v, go %v", expectedArgs, args)
146+
}
147+
}
148+
}
149+
}
150+
}
151+
}

apis/server/router.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func initRoute(s *Server) http.Handler {
2828
s.addRoute(r, http.MethodGet, "/info", s.info)
2929
s.addRoute(r, http.MethodGet, "/version", s.version)
3030
s.addRoute(r, http.MethodPost, "/auth", s.auth)
31+
s.addRoute(r, http.MethodGet, "/events", s.events)
3132

3233
// daemon, we still list this API into system manager.
3334
s.addRoute(r, http.MethodPost, "/daemon/update", s.updateDaemon)

apis/server/system_bridge.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ import (
55
"encoding/json"
66
"net/http"
77

8+
"github.com/alibaba/pouch/apis/filters"
89
"github.com/alibaba/pouch/apis/types"
910
"github.com/alibaba/pouch/pkg/httputils"
11+
"github.com/alibaba/pouch/pkg/ioutils"
12+
13+
"github.com/pkg/errors"
1014
)
1115

1216
func (s *Server) ping(context context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
@@ -60,3 +64,34 @@ func (s *Server) auth(ctx context.Context, rw http.ResponseWriter, req *http.Req
6064
}
6165
return EncodeResponse(rw, http.StatusOK, authResp)
6266
}
67+
68+
func (s *Server) events(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
69+
ctx, cancel := context.WithCancel(ctx)
70+
defer cancel()
71+
72+
rw.Header().Set("Content-Type", "application/json")
73+
output := ioutils.NewWriteFlusher(rw)
74+
defer output.Close()
75+
output.Flush()
76+
enc := json.NewEncoder(output)
77+
78+
ef, err := filters.FromParam(req.FormValue("filters"))
79+
if err != nil {
80+
return err
81+
}
82+
83+
eventq, errq := s.SystemMgr.SubscribeToEvents(ctx, ef)
84+
for {
85+
select {
86+
case ev := <-eventq:
87+
if err := enc.Encode(ev); err != nil {
88+
return err
89+
}
90+
case err := <-errq:
91+
if err != nil {
92+
return errors.Wrapf(err, "subscribe failed")
93+
}
94+
return nil
95+
}
96+
}
97+
}

0 commit comments

Comments
 (0)