Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
49b5167
feat: add shield sink
ishanarya0 Sep 6, 2022
0f2fec9
fix: use correct user asset keys
ishanarya0 Sep 6, 2022
1a92095
test: add sink test
ishanarya0 Sep 6, 2022
e0d7f46
docs: add README
ishanarya0 Sep 7, 2022
d09e216
docs: fix white spaces
ishanarya0 Sep 8, 2022
eac937b
test: add success test
ishanarya0 Sep 8, 2022
f13225d
chore: replace tabs with spaces
ishanarya0 Sep 12, 2022
d613f07
refactor: code review changes
ishanarya0 Sep 12, 2022
ef24c8f
feat: pass context to http request
ishanarya0 Sep 13, 2022
cc3908c
fix: http req & res close
ishanarya0 Sep 16, 2022
9099fe5
chore: improve errors
ishanarya0 Sep 16, 2022
13a17f6
fix: remove urlPathEscape for host
ishanarya0 Sep 16, 2022
91fe83b
test: fix payload error messages
ishanarya0 Sep 16, 2022
ae89972
Merge branch 'main' into shield-sink
ishanarya0 Sep 16, 2022
ef94575
feat: add grpc client
ishanarya0 Sep 18, 2022
6a5c772
test: tests to use grpc client
ishanarya0 Sep 18, 2022
61bae92
chore: fix linting issue
ishanarya0 Sep 18, 2022
3253adf
chore: change http to grpc in docs
ishanarya0 Sep 18, 2022
3f87648
docs: add shield sink
ishanarya0 Sep 19, 2022
90a8589
test: refactor test
ishanarya0 Sep 22, 2022
6b3a1dd
test: remove retry error test
ishanarya0 Sep 22, 2022
27fad0b
test: add retry error test
ishanarya0 Sep 22, 2022
9ba47bd
fix: code review changes
ishanarya0 Sep 26, 2022
a97ad79
feat: handle err when non User struct is sent
ishanarya0 Sep 26, 2022
5cc8fbf
fix: skip a bached record when build fails
ishanarya0 Sep 27, 2022
8f1c098
fix: error handling
ishanarya0 Oct 28, 2022
8edebe1
chore: shield proto variable name change
ishanarya0 Oct 28, 2022
f1e14ad
fix: shield sink test
ishanarya0 Oct 28, 2022
12241a5
chore: remove empty line
ishanarya0 Oct 28, 2022
fddd652
test: remove test
ishanarya0 Oct 28, 2022
0488a95
test: add test
ishanarya0 Oct 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/sinks/populate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ import (
_ "github.com/odpf/meteor/plugins/sinks/file"
_ "github.com/odpf/meteor/plugins/sinks/http"
_ "github.com/odpf/meteor/plugins/sinks/kafka"
_ "github.com/odpf/meteor/plugins/sinks/shield"
_ "github.com/odpf/meteor/plugins/sinks/stencil"
)
19 changes: 19 additions & 0 deletions plugins/sinks/shield/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Shield

Shield is a cloud-native role-based authorization-aware reverse-proxy service that helps you manage the authorization of given resources. With Shield, you can create groups and manage members, manage policies of the resources.

## Usage

```yaml
sinks:
name: shield
config:
host: https://shield.com
headers:
X-Shield-Email: [email protected]
X-Other-Header: value1, value2
```

## Contributing

Refer to the contribution guidelines for information on contributing to this module.
7 changes: 7 additions & 0 deletions plugins/sinks/shield/shield.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package shield

type RequestPayload struct {
Name string `json:"name"`
Email string `json:"email"`
Metadata map[string]interface{} `json:"metadata"`
}
196 changes: 196 additions & 0 deletions plugins/sinks/shield/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package shield

import (
"bytes"
"context"
_ "embed"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"

"github.com/MakeNowJust/heredoc"
"github.com/odpf/meteor/models"
assetsv1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
"github.com/pkg/errors"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/anypb"
)

//go:embed README.md
var summary string

type Config struct {
Host string `mapstructure:"host" validate:"required"`
Headers map[string]string `mapstructure:"headers"`
}

var info = plugins.Info{
Description: "Send user information to shield http service",
Summary: summary,
Tags: []string{"http", "sink"},
SampleConfig: heredoc.Doc(`
# The hostname of the shield service
host: https://shield.com
# Additional HTTP headers send to shield, multiple headers value are separated by a comma
headers:
X-Shield-Email: [email protected]
X-Other-Header: value1, value2
`),
}

type httpClient interface {
Do(*http.Request) (*http.Response, error)
}

type Sink struct {
plugins.BasePlugin
client httpClient
config Config
logger log.Logger
}

func New(c httpClient, logger log.Logger) plugins.Syncer {
s := &Sink{
logger: logger,
client: c,
}
s.BasePlugin = plugins.NewBasePlugin(info, &s.config)

return s
}

func (s *Sink) Init(ctx context.Context, config plugins.Config) (err error) {
if err = s.BasePlugin.Init(ctx, config); err != nil {
return err
}

return
}

func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) {
for _, record := range batch {
metadata := record.Data()
s.logger.Info("sinking record to shield", "record", metadata.GetUrn())

shieldPayload, err := s.buildShieldPayload(metadata)
if err != nil {
return errors.Wrap(err, "failed to build shield payload")
}
if err = s.send(shieldPayload); err != nil {
return errors.Wrap(err, "error sending data")
}

s.logger.Info("successfully sinked record to shield", "record", metadata.Name)
}

return
}

func (s *Sink) Close() (err error) { return }

func (s *Sink) send(record RequestPayload) (err error) {
payloadBytes, err := json.Marshal(record)
if err != nil {
return
}

// send request
url := fmt.Sprintf("%s/admin/v1beta1/users/%s", s.config.Host, record.Email)
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(payloadBytes))
if err != nil {
return
}

for hdrKey, hdrVal := range s.config.Headers {
hdrVals := strings.Split(hdrVal, ",")
for _, val := range hdrVals {
req.Header.Add(hdrKey, val)
}
}

res, err := s.client.Do(req)
if err != nil {
return
}
if res.StatusCode == 200 {
return
}

var bodyBytes []byte
bodyBytes, err = ioutil.ReadAll(res.Body)
if err != nil {
return
}
err = fmt.Errorf("shield returns %d: %v", res.StatusCode, string(bodyBytes))

switch code := res.StatusCode; {
case code >= 500:
return plugins.NewRetryError(err)
default:
return err
}
}

func (s *Sink) buildShieldPayload(resource *assetsv1beta2.Asset) (RequestPayload, error) {
data := resource.GetData()

mapdata, err := s.buildShieldData(data)
if err != nil {
return RequestPayload{}, err
}

name, ok := mapdata["full_name"].(string)
if !ok {
return RequestPayload{}, errors.New("name must be a string")
}

email, ok := mapdata["email"].(string)
if !ok {
return RequestPayload{}, errors.New("email must be a string")
}

metadata, ok := mapdata["attributes"].(map[string]interface{})
if !ok {
return RequestPayload{}, errors.New("attributes must be a map[string]interface{}")
}

record := RequestPayload{
Name: name,
Email: email,
Metadata: metadata,
}

return record, nil
}

func (s *Sink) buildShieldData(anyData *anypb.Any) (map[string]interface{}, error) {
var mapData map[string]interface{}

marshaler := &protojson.MarshalOptions{
UseProtoNames: true,
}
bytes, err := marshaler.Marshal(anyData)
if err != nil {
return mapData, errors.Wrap(err, "error marshaling asset data")
}

err = json.Unmarshal(bytes, &mapData)
if err != nil {
return mapData, errors.Wrap(err, "error unmarshalling to mapdata")
}

return mapData, nil
}

func init() {
if err := registry.Sinks.Register("shield", func() plugins.Syncer {
return New(&http.Client{}, plugins.GetLog())
}); err != nil {
panic(err)
}
}
Loading