Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
49b5167
feat: add shield sink
ishanarya0 Sep 6, 2022
0f2fec9
fix: use correct user asset keys
ishanarya0 Sep 6, 2022
1a92095
test: add sink test
ishanarya0 Sep 6, 2022
e0d7f46
docs: add README
ishanarya0 Sep 7, 2022
d09e216
docs: fix white spaces
ishanarya0 Sep 8, 2022
eac937b
test: add success test
ishanarya0 Sep 8, 2022
f13225d
chore: replace tabs with spaces
ishanarya0 Sep 12, 2022
d613f07
refactor: code review changes
ishanarya0 Sep 12, 2022
ef24c8f
feat: pass context to http request
ishanarya0 Sep 13, 2022
cc3908c
fix: http req & res close
ishanarya0 Sep 16, 2022
9099fe5
chore: improve errors
ishanarya0 Sep 16, 2022
13a17f6
fix: remove urlPathEscape for host
ishanarya0 Sep 16, 2022
91fe83b
test: fix payload error messages
ishanarya0 Sep 16, 2022
ae89972
Merge branch 'main' into shield-sink
ishanarya0 Sep 16, 2022
ef94575
feat: add grpc client
ishanarya0 Sep 18, 2022
6a5c772
test: tests to use grpc client
ishanarya0 Sep 18, 2022
61bae92
chore: fix linting issue
ishanarya0 Sep 18, 2022
3253adf
chore: change http to grpc in docs
ishanarya0 Sep 18, 2022
3f87648
docs: add shield sink
ishanarya0 Sep 19, 2022
90a8589
test: refactor test
ishanarya0 Sep 22, 2022
6b3a1dd
test: remove retry error test
ishanarya0 Sep 22, 2022
27fad0b
test: add retry error test
ishanarya0 Sep 22, 2022
9ba47bd
fix: code review changes
ishanarya0 Sep 26, 2022
a97ad79
feat: handle err when non User struct is sent
ishanarya0 Sep 26, 2022
5cc8fbf
fix: skip a bached record when build fails
ishanarya0 Sep 27, 2022
8f1c098
fix: error handling
ishanarya0 Oct 28, 2022
8edebe1
chore: shield proto variable name change
ishanarya0 Oct 28, 2022
f1e14ad
fix: shield sink test
ishanarya0 Oct 28, 2022
12241a5
chore: remove empty line
ishanarya0 Oct 28, 2022
fddd652
test: remove test
ishanarya0 Oct 28, 2022
0488a95
test: add test
ishanarya0 Oct 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/docs/reference/sinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ sinks:
send_format_header: false
```

## Shield

`shield`

Upsert users to shield service running at a given 'host'. Request will be sent via GRPC.

```yaml
sinks:
name: shield
config:
host: shield.com
headers:
X-Shield-Email: [email protected]
X-Other-Header: value1, value2
```

_**Notes**_

Compass' Type requires certain fields to be sent, hence why `mapping` config is needed to map value from any of our metadata models to any field name when sending to Compass. Supports getting value from nested fields.
1 change: 1 addition & 0 deletions plugins/sinks/populate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ import (
_ "github.com/odpf/meteor/plugins/sinks/file"
_ "github.com/odpf/meteor/plugins/sinks/http"
_ "github.com/odpf/meteor/plugins/sinks/kafka"
_ "github.com/odpf/meteor/plugins/sinks/shield"
_ "github.com/odpf/meteor/plugins/sinks/stencil"
)
19 changes: 19 additions & 0 deletions plugins/sinks/shield/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Shield

Shield is a cloud-native role-based authorization-aware reverse-proxy service that helps you manage the authorization of given resources. With Shield, you can create groups and manage members, manage policies of the resources.

## Usage

```yaml
sinks:
name: shield
config:
host: shield.com
headers:
X-Shield-Email: [email protected]
X-Other-Header: value1, value2
```

## Contributing

Refer to the contribution guidelines for information on contributing to this module.
81 changes: 81 additions & 0 deletions plugins/sinks/shield/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package shield

import (
"context"
"fmt"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
sh "github.com/odpf/shield/proto/v1beta1"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
GRPCMaxClientSendSize = 45 << 20 // 45MB
GRPCMaxClientRecvSize = 45 << 20 // 45MB
GRPCMaxRetry uint = 3
)

type Client interface {
sh.ShieldServiceClient
Connect(ctx context.Context, host string) error
Close() error
}

func newClient() Client {
return &client{}
}

type client struct {
sh.ShieldServiceClient
conn *grpc.ClientConn
}

func (c *client) Connect(ctx context.Context, host string) (err error) {
dialTimeoutCtx, dialCancel := context.WithTimeout(ctx, time.Second*2)
defer dialCancel()

if c.conn, err = c.createConnection(dialTimeoutCtx, host); err != nil {
err = fmt.Errorf("error creating connection: %w", err)
return
}

c.ShieldServiceClient = sh.NewShieldServiceClient(c.conn)

return
}

func (c *client) Close() error {
return c.conn.Close()
}

func (c *client) createConnection(ctx context.Context, host string) (*grpc.ClientConn, error) {
retryOpts := []grpc_retry.CallOption{
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100 * time.Millisecond)),
grpc_retry.WithMax(GRPCMaxRetry),
}
var opts []grpc.DialOption
opts = append(opts,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(GRPCMaxClientSendSize),
grpc.MaxCallRecvMsgSize(GRPCMaxClientRecvSize),
),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(retryOpts...),
otelgrpc.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
)),
)

return grpc.DialContext(ctx, host, opts...)
}
7 changes: 7 additions & 0 deletions plugins/sinks/shield/shield.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package shield

type RequestPayload struct {
Name string `json:"name"`
Email string `json:"email"`
Metadata map[string]interface{} `json:"metadata"`
}
167 changes: 167 additions & 0 deletions plugins/sinks/shield/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package shield

import (
"context"
_ "embed"
"fmt"
"strings"

"github.com/MakeNowJust/heredoc"
"github.com/odpf/meteor/models"
assetsv1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
sh "github.com/odpf/shield/proto/v1beta1"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

//go:embed README.md
var summary string

type Config struct {
Host string `mapstructure:"host" validate:"required"`
Headers map[string]string `mapstructure:"headers"`
}

var info = plugins.Info{
Description: "Send user information to shield grpc service",
Summary: summary,
Tags: []string{"grpc", "sink"},
SampleConfig: heredoc.Doc(`
# The hostname of the shield service
host: shield.com:5556
# Additional headers send to shield, multiple headers value are separated by a comma
headers:
X-Shield-Email: [email protected]
X-Other-Header: value1, value2
`),
}

type Sink struct {
plugins.BasePlugin
client Client
config Config
logger log.Logger
}

func New(c Client, logger log.Logger) plugins.Syncer {
s := &Sink{
logger: logger,
client: c,
}
s.BasePlugin = plugins.NewBasePlugin(info, &s.config)

return s
}

func (s *Sink) Init(ctx context.Context, config plugins.Config) error {
if err := s.BasePlugin.Init(ctx, config); err != nil {
return err
}

if err := s.client.Connect(ctx, s.config.Host); err != nil {
return fmt.Errorf("error connecting to host: %w", err)
}

return nil
}

func (s *Sink) Sink(ctx context.Context, batch []models.Record) error {
for _, record := range batch {
asset := record.Data()
s.logger.Info("sinking record to shield", "record", asset.GetUrn())

userRequestBody, err := s.buildUserRequestBody(asset)
if err != nil {
s.logger.Error("failed to build shield payload", "err", err, "record", asset.Name)
continue
}

if err = s.send(ctx, userRequestBody); err != nil {
return errors.Wrap(err, "error sending data")
}

s.logger.Info("successfully sinked record to shield", "record", asset.Name)
}

return nil
}

func (s *Sink) Close() (err error) {
return
//TODO: Connection closes even when some records are unpiblished
//TODO: return s.client.Close()
}

func (s *Sink) send(ctx context.Context, userRequestBody *sh.UserRequestBody) error {
for hdrKey, hdrVal := range s.config.Headers {
hdrVals := strings.Split(hdrVal, ",")
for _, val := range hdrVals {
val = strings.TrimSpace(val)
md := metadata.New(map[string]string{hdrKey: val})
ctx = metadata.NewOutgoingContext(ctx, md)
}
}

_, err := s.client.UpdateUser(ctx, &sh.UpdateUserRequest{
Id: userRequestBody.Email,
Body: userRequestBody,
})
if err == nil {
return nil
}

if e, ok := status.FromError(err); ok {
err = fmt.Errorf("shield returns code %d: %v", e.Code(), e.Message())
switch e.Code() {
case codes.Unavailable:
return plugins.NewRetryError(err)
default:
return err
}
} else {
err = fmt.Errorf("not able to parse error returned %v", err)
}

return err
}

func (s *Sink) buildUserRequestBody(asset *assetsv1beta2.Asset) (*sh.UserRequestBody, error) {
data := asset.GetData()

var user assetsv1beta2.User
err := data.UnmarshalTo(&user)
if err != nil {
return &sh.UserRequestBody{}, errors.Wrap(err, "not a User struct")
}

if user.FullName == "" {
return &sh.UserRequestBody{}, errors.New("empty user name")
}
if user.Email == "" {
return &sh.UserRequestBody{}, errors.New("empty user email")
}
if user.Attributes == nil {
return &sh.UserRequestBody{}, errors.New("empty user attributes")
}

requestBody := &sh.UserRequestBody{
Name: user.FullName,
Email: user.Email,
Metadata: user.Attributes,
}

return requestBody, nil
}

func init() {
if err := registry.Sinks.Register("shield", func() plugins.Syncer {
return New(newClient(), plugins.GetLog())
}); err != nil {
panic(err)
}
}
Loading