Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/extractors/optimus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ source:
| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :- |
| `host` | `string` | `optimus.com:80` | Optimus' GRPC host | *required* |
| `max_size_in_mb` | `int` | `45` | Max megabytes for GRPC client to receive message. Default to 45. | |

## Outputs

Expand Down
24 changes: 14 additions & 10 deletions plugins/extractors/optimus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ import (
)

const (
service = "optimus"
GRPCMaxClientSendSize = 45 << 20 // 45MB
GRPCMaxClientRecvSize = 45 << 20 // 45MB
GRPCMaxRetry uint = 3
service = "optimus"
GRPCMaxClientSendSizeMB = 45
GRPCMaxClientRecvSizeMB = 45
GRPCMaxRetry uint = 3
)

type Client interface {
pb.NamespaceServiceClient
pb.ProjectServiceClient
pb.JobSpecificationServiceClient
pb.JobRunServiceClient
Connect(ctx context.Context, host string) error
Connect(ctx context.Context, host string, maxSizeInMB int) error
Close() error
}

Expand All @@ -41,11 +41,11 @@ type client struct {
conn *grpc.ClientConn
}

func (c *client) Connect(ctx context.Context, host string) (err error) {
func (c *client) Connect(ctx context.Context, host string, maxSizeInMB int) (err error) {
dialTimeoutCtx, dialCancel := context.WithTimeout(ctx, time.Second*2)
defer dialCancel()

if c.conn, err = c.createConnection(dialTimeoutCtx, host); err != nil {
if c.conn, err = c.createConnection(dialTimeoutCtx, host, maxSizeInMB); err != nil {
err = errors.Wrap(err, "error creating connection")
return
}
Expand All @@ -62,7 +62,11 @@ func (c *client) Close() error {
return c.conn.Close()
}

func (c *client) createConnection(ctx context.Context, host string) (*grpc.ClientConn, error) {
func (c *client) createConnection(ctx context.Context, host string, maxSizeInMB int) (*grpc.ClientConn, error) {
if maxSizeInMB <= 0 {
maxSizeInMB = GRPCMaxClientRecvSizeMB
}

retryOpts := []grpc_retry.CallOption{
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100 * time.Millisecond)),
grpc_retry.WithMax(GRPCMaxRetry),
Expand All @@ -72,8 +76,8 @@ func (c *client) createConnection(ctx context.Context, host string) (*grpc.Clien
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(GRPCMaxClientSendSize),
grpc.MaxCallRecvMsgSize(GRPCMaxClientRecvSize),
grpc.MaxCallSendMsgSize(GRPCMaxClientSendSizeMB<<20),
grpc.MaxCallRecvMsgSize(maxSizeInMB<<20),
),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(retryOpts...),
Expand Down
5 changes: 3 additions & 2 deletions plugins/extractors/optimus/optimus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ var summary string

// Config holds the set of configuration for the bigquery extractor
type Config struct {
Host string `mapstructure:"host" validate:"required"`
Host string `mapstructure:"host" validate:"required"`
MaxSizeInMB int `mapstructure:"max_size_in_mb"`
}

var sampleConfig = `
Expand Down Expand Up @@ -64,7 +65,7 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})
return plugins.InvalidConfigError{}
}

if err := e.client.Connect(ctx, e.config.Host); err != nil {
if err := e.client.Connect(ctx, e.config.Host, e.config.MaxSizeInMB); err != nil {
return errors.Wrap(err, "error connecting to host")
}

Expand Down
8 changes: 4 additions & 4 deletions plugins/extractors/optimus/optimus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestInit(t *testing.T) {
ctx := context.TODO()

client := new(mockClient)
client.On("Connect", ctx, validConfig["host"]).Return(nil)
client.On("Connect", ctx, validConfig["host"], 0).Return(nil)
defer client.AssertExpectations(t)

extr := optimus.New(testutils.Logger, client)
Expand Down Expand Up @@ -78,8 +78,8 @@ type mockClient struct {
mock.Mock
}

func (c *mockClient) Connect(ctx context.Context, host string) (err error) {
args := c.Called(ctx, host)
func (c *mockClient) Connect(ctx context.Context, host string, maxSizeInMB int) (err error) {
args := c.Called(ctx, host, maxSizeInMB)

return args.Error(0)
}
Expand Down Expand Up @@ -118,7 +118,7 @@ func (c *mockClient) GetJobTask(
}

func setupExtractExpectation(ctx context.Context, client *mockClient) {
client.On("Connect", ctx, validConfig["host"]).Return(nil).Once()
client.On("Connect", ctx, validConfig["host"], 0).Return(nil).Once()

client.On("ListProjects", ctx, &pb.ListProjectsRequest{}, mock.Anything).Return(&pb.ListProjectsResponse{
Projects: []*pb.ProjectSpecification{
Expand Down