diff --git a/docs/docs/concepts/sink.md b/docs/docs/concepts/sink.md index 2191b7cf5..94d747fac 100644 --- a/docs/docs/concepts/sink.md +++ b/docs/docs/concepts/sink.md @@ -46,9 +46,23 @@ sinks: path: "./dir/sample.yaml" format: "yaml" ``` - Sinks metadata to a file in `json/yaml` format as per the config defined. +* **Stencil** + +```yaml +sinks: + name: stencil + config: + host: https://stencil.com + namespace_id: myNamespace + schema_id: mySchema + format: json + send_format_header: false +``` + +Uploads metadata as a schema of the given `format` to the existing namespace `namespace_id` in Stencil. The request is sent over HTTP to the given host. + ## Upcoming sinks * HTTP diff --git a/docs/docs/reference/sinks.md b/docs/docs/reference/sinks.md index 2b9b2b603..9218ce515 100644 --- a/docs/docs/reference/sinks.md +++ b/docs/docs/reference/sinks.md @@ -47,6 +47,23 @@ sinks: format: "yaml" ``` +## Stencil + +`stencil` + +Uploads metadata as a schema of the given `format` to the existing namespace `namespace_id` in [Stencil](https://github.com/odpf/stencil). The request is sent over HTTP to the given host. + +```yaml +sinks: + name: stencil + config: + host: https://stencil.com + namespace_id: myNamespace + schema_id: mySchema + format: json + send_format_header: false +``` + _**Notes**_ Compass' Type requires certain fields to be sent, hence why `mapping` config is needed to map value from any of our metadata models to any field name when sending to Compass. Supports getting value from nested fields. 
diff --git a/plugins/sinks/populate.go b/plugins/sinks/populate.go index a1dfa9dab..882b5e85a 100644 --- a/plugins/sinks/populate.go +++ b/plugins/sinks/populate.go @@ -5,4 +5,5 @@ import ( _ "github.com/odpf/meteor/plugins/sinks/console" _ "github.com/odpf/meteor/plugins/sinks/file" _ "github.com/odpf/meteor/plugins/sinks/kafka" + _ "github.com/odpf/meteor/plugins/sinks/stencil" ) diff --git a/plugins/sinks/stencil/README.md b/plugins/sinks/stencil/README.md new file mode 100644 index 000000000..e1fca95b8 --- /dev/null +++ b/plugins/sinks/stencil/README.md @@ -0,0 +1,31 @@ +# Stencil + +Stencil is a schema registry that provides schema management and validation dynamically, efficiently, and reliably to ensure data compatibility across applications. + +## Usage + +```yaml +sinks: + name: stencil + config: + host: https://stencil.com + namespace_id: test-namespace + schema_id: example + format: json + send_format_header: false +``` + +## Config Definition + +| Key | Value | Example | Description | | +| :-- | :---- | :------ | :---------- | :-- | +|`host` | `string` | `https://stencil.com` | The hostname of the stencil service | *required*| +| `namespace_id` | `string` | `myNamespace` | The namespace ID of the stencil service | *required* | +|`schema_id` | `string` | `mySchema` | The schema ID which will be created in the above-mentioned namespace | *required*| +|`format` | `string` | `json` | The schema format in which data will sink to stencil | *optional*| +|`send_format_header` | `bool` | `false` | Set to `true` when the schema format needs to be changed (for example from json to avro), and give the new format in the `format` config. | *optional*| + + +## Contributing + +Refer to the contribution guidelines for information on contributing to this module. 
\ No newline at end of file
diff --git a/plugins/sinks/stencil/schemas.go b/plugins/sinks/stencil/schemas.go
new file mode 100644
index 000000000..9a31c0334
--- /dev/null
+++ b/plugins/sinks/stencil/schemas.go
@@ -0,0 +1,50 @@
+package stencil
+
+// JsonType is a type name as written into the "type" entries of a JsonSchema.
+type JsonType string
+
+// AvroType is a type name as written into the type list of an AvroFields entry.
+type AvroType string
+
+const (
+	// Type names used in generated JSON schemas.
+	JsonTypeObject  JsonType = "object"
+	JsonTypeString  JsonType = "string"
+	JsonTypeNumber  JsonType = "number"
+	JsonTypeArray   JsonType = "array"
+	JsonTypeBoolean JsonType = "boolean"
+	JsonTypeNull    JsonType = "null"
+
+	// Type names used in generated Avro schemas.
+	AvroTypeNull    AvroType = "null"
+	AvroTypeBoolean AvroType = "boolean"
+	AvroTypeInteger AvroType = "int"
+	AvroTypeLong    AvroType = "long"
+	AvroTypeFloat   AvroType = "float"
+	AvroTypeDouble  AvroType = "double"
+	AvroTypeBytes   AvroType = "bytes"
+	AvroTypeString  AvroType = "string"
+	AvroTypeRecord  AvroType = "record"
+	AvroTypeArray   AvroType = "array"
+	AvroTypeMap     AvroType = "map"
+)
+
+// JsonSchema is the request body built for a table when the sink format is "json".
+type JsonSchema struct {
+	Id         string                  `json:"$id"`
+	Schema     string                  `json:"$schema"`
+	Title      string                  `json:"title"`
+	Type       JsonType                `json:"type"`
+	Properties map[string]JsonProperty `json:"properties"` // keyed by column name
+}
+
+// JsonProperty describes one column inside JsonSchema.Properties.
+type JsonProperty struct {
+	Type        []JsonType `json:"type"` // the column type, followed by "null" for nullable columns
+	Description string     `json:"description"`
+}
+
+// AvroSchema is the request body built for a table when the sink format is "avro".
+type AvroSchema struct {
+	Type      string       `json:"type"`
+	Namespace string       `json:"namespace"`
+	Name      string       `json:"name"`
+	Fields    []AvroFields `json:"fields"`
+}
+
+// AvroFields describes one column inside AvroSchema.Fields.
+type AvroFields struct {
+	Name string      `json:"name"`
+	Type interface{} `json:"type"` // buildAvroFields stores a []AvroType here
+}
diff --git a/plugins/sinks/stencil/stencil.go b/plugins/sinks/stencil/stencil.go
new file mode 100644
index 000000000..3d33f7354
--- /dev/null
+++ b/plugins/sinks/stencil/stencil.go
@@ -0,0 +1,334 @@
+package stencil
+
+import (
+	"bytes"
+	"context"
+	_ "embed"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"strings"
+
+	"github.com/odpf/meteor/models"
+	assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1"
+	"github.com/odpf/meteor/plugins"
+	"github.com/odpf/meteor/registry"
+	"github.com/odpf/meteor/utils"
+	"github.com/odpf/salt/log"
+	"github.com/pkg/errors"
+)
+
+// summary holds the contents of README.md, embedded at build time and
+// returned from Info.
+//go:embed README.md
+var summary string
+
+// Config holds the set of configuration options for the sink.
+//
+// Host and NamespaceID are required; Format must be "json" or "avro".
+// NOTE(review): the default tag suggests "json" is used when format is omitted;
+// that depends on utils.BuildConfig, which is not visible here — confirm.
+// NOTE(review): the README and docs added in this change also list schema_id
+// and send_format_header, which this struct does not declare — confirm which
+// side is intended.
+type Config struct {
+	Host        string `mapstructure:"host" validate:"required"`
+	NamespaceID string `mapstructure:"namespace_id" validate:"required"`
+	Format      string `mapstructure:"format" validate:"oneof=json avro" default:"json"`
+}
+
+// sampleConfig is the example configuration returned from Info.
+var sampleConfig = `
+# The hostname of the stencil service
+host: https://stencil.com
+# The namespace ID of the stencil service
+namespace_id: myNamespace
+# The schema format in which data will sink to stencil
+format: avro
+`
+
+// httpClient holds the set of methods required for sending a request.
+// *http.Client satisfies it (see init); tests pass a mock instead.
+type httpClient interface {
+	Do(*http.Request) (*http.Response, error)
+}
+
+// Sink manages the sinking of data to Stencil
+type Sink struct {
+	client httpClient
+	config Config // populated by Init
+	logger log.Logger
+}
+
+// New returns a pointer to an initialized Sink Object.
+// Only the client and logger are set here; the configuration is loaded by Init.
+func New(c httpClient, logger log.Logger) plugins.Syncer {
+	sink := &Sink{client: c, logger: logger}
+	return sink
+}
+
+// Info returns the brief information about the sink: a description, the
+// sample configuration, the embedded README as summary, and its tags.
+func (s *Sink) Info() plugins.Info {
+	return plugins.Info{
+		Description:  "Send metadata to stencil http service",
+		SampleConfig: sampleConfig,
+		Summary:      summary,
+		Tags:         []string{"http", "sink"},
+	}
+}
+
+// Validate validates the configuration of the sink by building it into a
+// throwaway Config; the sink's own config is not modified.
+func (s *Sink) Validate(configMap map[string]interface{}) (err error) {
+	return utils.BuildConfig(configMap, &Config{})
+}
+
+// Init initializes the sink by loading configMap into s.config.
+// Any error from utils.BuildConfig is replaced with a plain
+// plugins.InvalidConfigError for the sink plugin type; the underlying error
+// is not included.
+func (s *Sink) Init(_ context.Context, configMap map[string]interface{}) (err error) {
+	if err = utils.BuildConfig(configMap, &s.config); err != nil {
+		return plugins.InvalidConfigError{Type: plugins.PluginTypeSink}
+	}
+
+	return
+}
+
+// Sink helps to sink record to stencil.
+// For every record in batch whose data is a *assetsv1beta1.Table it builds a
+// schema payload in the configured format and sends it; records of any other
+// type are skipped. It returns on the first build or send error. The context
+// argument is not used.
+func (s *Sink) Sink(_ context.Context, batch []models.Record) (err error) {
+	var stencilPayload interface{}
+
+	for _, 
record := range batch {
+		metadata := record.Data()
+
+		table, ok := metadata.(*assetsv1beta1.Table)
+		if !ok {
+			continue
+		}
+		// Use the nil-safe getters so a table without a Resource cannot panic.
+		urn := table.GetResource().GetUrn()
+		s.logger.Info("sinking record to stencil", "record", urn)
+
+		switch s.config.Format {
+		case "avro":
+			stencilPayload, err = s.buildAvroStencilPayload(table)
+		case "json":
+			stencilPayload, err = s.buildJsonStencilPayload(table)
+		}
+
+		if err != nil {
+			return errors.Wrap(err, "failed to build stencil payload")
+		}
+		if err = s.send(urn, stencilPayload); err != nil {
+			return errors.Wrap(err, "error sending data")
+		}
+
+		s.logger.Info("successfully sinked record to stencil", "record", urn)
+	}
+
+	return
+}
+
+// Close will be called once after everything is done
+func (s *Sink) Close() (err error) { return }
+
+// buildJsonStencilPayload builds the JSON schema payload for a table.
+// The schema id is the resource URN with ".json" appended and the title is
+// the resource name. The returned error is always nil.
+func (s *Sink) buildJsonStencilPayload(table *assetsv1beta1.Table) (JsonSchema, error) {
+	resource := table.GetResource()
+	jsonProperties := buildJsonProperties(table)
+
+	record := JsonSchema{
+		Id:         resource.GetUrn() + ".json",
+		Schema:     "https://json-schema.org/draft/2020-12/schema",
+		Title:      resource.GetName(),
+		Type:       JsonTypeObject,
+		Properties: jsonProperties,
+	}
+
+	return record, nil
+}
+
+// buildAvroStencilPayload builds the Avro schema payload for a table.
+// The record is named after the resource and placed in the configured
+// namespace. The returned error is always nil.
+func (s *Sink) buildAvroStencilPayload(table *assetsv1beta1.Table) (AvroSchema, error) {
+	resource := table.GetResource()
+	avroFields := buildAvroFields(table)
+
+	record := AvroSchema{
+		Type:      "record",
+		Namespace: s.config.NamespaceID,
+		Name:      resource.GetName(),
+		Fields:    avroFields,
+	}
+
+	return record, nil
+}
+
+// send helps to pass data to stencil.
+// It posts record as JSON to the schema endpoint of the configured namespace;
+// the schema ID is tableURN with every "/" replaced by ".". Only a
+// 201 Created response counts as success. Any other status becomes an error
+// carrying the status code and response body, wrapped in a retry error when
+// the status is 5xx.
+func (s *Sink) send(tableURN string, record interface{}) (err error) {
+	payloadBytes, err := json.Marshal(record)
+	if err != nil {
+		return
+	}
+
+	// send request
+	schemaID := strings.ReplaceAll(tableURN, "/", ".")
+	url := fmt.Sprintf("%s/v1beta1/namespaces/%s/schemas/%s", s.config.Host, s.config.NamespaceID, schemaID)
+	req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(payloadBytes))
+	if err != nil {
+		return
+	}
+
+	if s.config.Format == "json" {
+		req.Header.Add("X-Compatibility", "COMPATIBILITY_UNSPECIFIED")
+	}
+
+	res, err := s.client.Do(req)
+	if err != nil {
+		return
+	}
+	// The body must be closed on every path, success included, or the
+	// underlying connection is leaked.
+	defer res.Body.Close()
+
+	if res.StatusCode == http.StatusCreated {
+		return
+	}
+
+	var bodyBytes []byte
+	bodyBytes, err = ioutil.ReadAll(res.Body)
+	if err != nil {
+		return
+	}
+	err = fmt.Errorf("stencil returns %d: %v", res.StatusCode, string(bodyBytes))
+
+	switch code := res.StatusCode; {
+	case code >= 500:
+		return plugins.NewRetryError(err)
+	default:
+		return err
+	}
+}
+
+// buildJsonProperties builds the json schema properties, one per column and
+// keyed by column name. It returns nil when the table has no schema or no
+// columns. Nullable columns get "null" appended to their type list.
+func buildJsonProperties(table *assetsv1beta1.Table) map[string]JsonProperty {
+	columnRecord := make(map[string]JsonProperty)
+	service := table.GetResource().GetService()
+	schema := table.GetSchema()
+	if schema == nil {
+		return nil
+	}
+	columns := schema.GetColumns()
+	if len(columns) == 0 {
+		return nil
+	}
+
+	for _, column := range columns {
+		dataType := typeToJsonSchemaType(service, column.DataType)
+		columnType := []JsonType{dataType}
+
+		if column.IsNullable {
+			columnType = append(columnType, JsonTypeNull)
+		}
+
+		columnRecord[column.Name] = JsonProperty{
+			Type:        columnType,
+			Description: column.GetDescription(),
+		}
+	}
+
+	return columnRecord
+}
+
+// typeToJsonSchemaType converts particular service type to Json type
+func typeToJsonSchemaType(service string, columnType string) (dataType JsonType) {
+
+	if service == "bigquery" {
+		switch columnType {
+		case "STRING", "DATE", "DATETIME", "TIME", "TIMESTAMP", "GEOGRAPHY":
+			dataType = JsonTypeString
+		case "INT64", "NUMERIC", "FLOAT64", "INT", "FLOAT", "BIGNUMERIC":
+			dataType = JsonTypeNumber
+		case "BYTES":
+			dataType = JsonTypeArray
+		case "BOOLEAN":
+			dataType = JsonTypeBoolean
+		case "RECORD":
+			dataType = JsonTypeObject
+		default:
+			dataType = JsonTypeString
+		}
+	}
+	if 
service == "postgres" {
+		switch columnType {
+		case "integer", "decimal", "smallint", "bigint", "bit", "bit varying", "numeric", "real", "double precision", "serial", "bigserial", "money":
+			dataType = JsonTypeNumber
+		// uuid and the network address types are textual values, not numbers.
+		case "uuid", "cidr", "inet", "macaddr", "varchar", "text", "character", "character varying", "date", "time", "timestamp", "interval", "point", "line", "path":
+			dataType = JsonTypeString
+		case "boolean":
+			dataType = JsonTypeBoolean
+		case "bytea", "integer[]", "character[]", "text[]":
+			dataType = JsonTypeArray
+		default:
+			dataType = JsonTypeString
+		}
+	}
+
+	// A service that is neither bigquery nor postgres would otherwise yield an
+	// empty type name, which is not a valid JSON schema type; use the same
+	// fallback as the per-service default branches.
+	if dataType == "" {
+		dataType = JsonTypeString
+	}
+
+	return
+}
+
+// buildAvroFields builds the avro schema fields, one per column in column
+// order. It returns nil when the table has no schema or no columns. Each
+// field's type is a list holding the column type, followed by "null" for
+// nullable columns.
+func buildAvroFields(table *assetsv1beta1.Table) (fields []AvroFields) {
+	service := table.GetResource().GetService()
+	schema := table.GetSchema()
+	if schema == nil {
+		return
+	}
+	columns := schema.GetColumns()
+	if len(columns) == 0 {
+		return
+	}
+
+	for _, column := range columns {
+		dataType := typeToAvroSchemaType(service, column.DataType)
+		columnType := []AvroType{dataType}
+
+		if column.IsNullable {
+			columnType = []AvroType{dataType, AvroTypeNull}
+		}
+
+		fields = append(fields, AvroFields{
+			Name: column.Name,
+			Type: columnType,
+		})
+	}
+
+	return fields
+}
+
+// typeToAvroSchemaType converts particular service type to avro type.
+// Column types that are not recognised, and services other than bigquery and
+// postgres, map to the avro string type.
+func typeToAvroSchemaType(service string, columnType string) (dataType AvroType) {
+
+	if service == "bigquery" {
+		// NOTE(review): INT64/INT and FLOAT64/FLOAT are 64-bit in BigQuery while
+		// avro "int" and "float" are 32-bit; "long" and "double" look like the
+		// lossless choice. Left unchanged because TestSink pins INT -> int and
+		// FLOAT -> float — confirm and update the tests together.
+		switch columnType {
+		case "STRING", "DATE", "DATETIME", "TIME", "TIMESTAMP", "GEOGRAPHY":
+			dataType = AvroTypeString
+		case "INT64", "NUMERIC", "INT", "BIGNUMERIC":
+			dataType = AvroTypeInteger
+		case "FLOAT64", "FLOAT":
+			dataType = AvroTypeFloat
+		case "BYTES":
+			dataType = AvroTypeBytes
+		case "BOOLEAN":
+			dataType = AvroTypeBoolean
+		case "RECORD":
+			dataType = AvroTypeRecord
+		default:
+			dataType = AvroTypeString
+		}
+	}
+	if service == "postgres" {
+		switch columnType {
+		case "integer", "smallint", "bit", "bit varying", "serial":
+			dataType = AvroTypeInteger
+		// 64-bit integers do not fit avro "int".
+		case "bigint", "bigserial":
+			dataType = AvroTypeLong
+		// Fractional types must not be declared as integers.
+		case "real":
+			dataType = AvroTypeFloat
+		case "double precision", "decimal", "numeric", "money":
+			dataType = AvroTypeDouble
+		// uuid and the network address types are textual values, not numbers.
+		case "uuid", "cidr", "inet", "macaddr", "varchar", "text", "character", "character varying", "date", "time", "timestamp", "interval", "point", "line", "path":
+			dataType = AvroTypeString
+		case "boolean":
+			dataType = AvroTypeBoolean
+		case "bytea", "integer[]", "character[]", "text[]":
+			dataType = AvroTypeArray
+		default:
+			dataType = AvroTypeString
+		}
+	}
+
+	// A service that is neither bigquery nor postgres would otherwise yield an
+	// empty type name, which is not a valid avro type; use the same fallback
+	// as the per-service default branches.
+	if dataType == "" {
+		dataType = AvroTypeString
+	}
+
+	return
+}
+
+// init register the sink to the catalog under the name "stencil", backed by
+// a default http.Client; it panics if registration fails.
+func init() {
+	if err := registry.Sinks.Register("stencil", func() plugins.Syncer {
+		return New(&http.Client{}, plugins.GetLog())
+	}); err != nil {
+		panic(err)
+	}
+}
diff --git a/plugins/sinks/stencil/stencil_test.go b/plugins/sinks/stencil/stencil_test.go
new file mode 100644
index 000000000..9cced537e
--- /dev/null
+++ b/plugins/sinks/stencil/stencil_test.go
@@ -0,0 +1,645 @@
+package stencil_test
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"testing"
+
+	"github.com/odpf/meteor/models"
+	commonv1beta1 "github.com/odpf/meteor/models/odpf/assets/common/v1beta1"
+	facetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/facets/v1beta1"
+	assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1"
+	"github.com/odpf/meteor/plugins/sinks/stencil"
+
+	"github.com/odpf/meteor/plugins"
+	testUtils "github.com/odpf/meteor/test/utils"
+	"github.com/stretchr/testify/assert"
+)
+
+var (
+	host        = "https://stencil.com"
+	namespaceID = "test-namespace"
+	tableURN    = "test-table-urn"
+	url         = fmt.Sprintf("%s/v1beta1/namespaces/%s/schemas/%s", host, namespaceID, tableURN)
+)
+
+func TestInit(t *testing.T) {
+	t.Run("should return InvalidConfigError on invalid config", func(t *testing.T) {
+		invalidConfigs := []map[string]interface{}{
+			{
+				"host":         "",
+				"namespace_id": "",
+				"format":       "",
+			},
+		}
+		for i, config := range invalidConfigs {
+			t.Run(fmt.Sprintf("test invalid config #%d", i+1), func(t *testing.T) {
+				
stencilSink := stencil.New(newMockHTTPClient(config, http.MethodPost, url, stencil.JsonSchema{}), testUtils.Logger)
+				err := stencilSink.Init(context.TODO(), config)
+
+				assert.Equal(t, plugins.InvalidConfigError{Type: plugins.PluginTypeSink}, err)
+			})
+		}
+	})
+}
+
+// TestSink covers the error paths of Sink (non-201 response, 5xx retry
+// errors) and then checks, for json and avro formats, that the request sent
+// to the mock client has the expected method, URL and JSON body.
+func TestSink(t *testing.T) {
+	t.Run("should return error if stencil host returns error", func(t *testing.T) {
+		stencilError := `{"code": 0,"message": "string","details": [{"typeUrl": "string","value": "string"}]}`
+
+		errMessage := "error sending data: stencil returns 404: {\"code\": 0,\"message\": \"string\",\"details\": [{\"typeUrl\": \"string\",\"value\": \"string\"}]}"
+		// setup mock client
+		url := fmt.Sprintf("%s/v1beta1/namespaces/%s/schemas/%s", host, namespaceID, tableURN)
+		client := newMockHTTPClient(map[string]interface{}{}, http.MethodPost, url, stencil.JsonSchema{})
+		client.SetupResponse(404, stencilError)
+		ctx := context.TODO()
+
+		stencilSink := stencil.New(client, testUtils.Logger)
+		err := stencilSink.Init(ctx, map[string]interface{}{
+			"host":         host,
+			"namespace_id": namespaceID,
+			"format":       "json",
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		data := &assetsv1beta1.Table{Resource: &commonv1beta1.Resource{Urn: tableURN}}
+		err = stencilSink.Sink(ctx, []models.Record{models.NewRecord(data)})
+		assert.Equal(t, errMessage, err.Error())
+	})
+
+	t.Run("should return RetryError if stencil returns certain status code", func(t *testing.T) {
+		for _, code := range []int{500, 501, 502, 503, 504, 505} {
+			t.Run(fmt.Sprintf("%d status code", code), func(t *testing.T) {
+				url := fmt.Sprintf("%s/v1beta1/namespaces/%s/schemas/%s", host, namespaceID, tableURN)
+				client := newMockHTTPClient(map[string]interface{}{}, http.MethodPost, url, stencil.JsonSchema{})
+				client.SetupResponse(code, `{"reason":"internal server error"}`)
+				ctx := context.TODO()
+
+				stencilSink := stencil.New(client, testUtils.Logger)
+				err := stencilSink.Init(ctx, map[string]interface{}{
+					"host":         host,
+					"namespace_id": namespaceID,
+					"format":       "json",
+				})
+				if err != nil {
+					t.Fatal(err)
+				}
+
+				data := &assetsv1beta1.Table{Resource: &commonv1beta1.Resource{Urn: tableURN}}
+				err = stencilSink.Sink(ctx, []models.Record{models.NewRecord(data)})
+				assert.True(t, errors.Is(err, plugins.RetryError{}))
+			})
+		}
+	})
+
+	// Each case sinks one table with format "json" and states the JSON schema
+	// expected as the request body. Columns without a DataType are expected to
+	// come out as "string".
+	successJsonTestCases := []struct {
+		description string
+		data        *assetsv1beta1.Table
+		config      map[string]interface{}
+		expected    stencil.JsonSchema
+	}{
+		{
+			description: "should create the right request from json schema to stencil when bigquery is the service",
+			data: &assetsv1beta1.Table{
+				Resource: &commonv1beta1.Resource{
+					Urn:     tableURN,
+					Name:    "table-name",
+					Service: "bigquery",
+				},
+				Schema: &facetsv1beta1.Columns{
+					Columns: []*facetsv1beta1.Column{
+						{
+							Name:        "id",
+							Description: "It is the ID",
+							DataType:    "INT",
+							IsNullable:  true,
+						},
+						{
+							Name:        "user_id",
+							Description: "It is the user ID",
+							DataType:    "STRING",
+							IsNullable:  false,
+						},
+						{
+							Name:        "email_id",
+							Description: "It is the email ID",
+							IsNullable:  true,
+						},
+						{
+							Name:        "description",
+							Description: "It is the description",
+							DataType:    "STRING",
+							IsNullable:  true,
+						},
+						{
+							Name:        "is_active",
+							Description: "It shows user regularity",
+							DataType:    "BOOLEAN",
+							IsNullable:  false,
+						},
+						{
+							Name:        "address",
+							Description: "It shows user address",
+							DataType:    "RECORD",
+							IsNullable:  false,
+						},
+						{
+							Name:        "range",
+							Description: "It is the range",
+							DataType:    "BYTES",
+							IsNullable:  false,
+						},
+					},
+				},
+			},
+			config: map[string]interface{}{
+				"host":         host,
+				"namespace_id": namespaceID,
+				"format":       "json",
+			},
+			expected: stencil.JsonSchema{
+				Id:     fmt.Sprintf("%s.json", tableURN),
+				Schema: "https://json-schema.org/draft/2020-12/schema",
+				Title:  "table-name",
+				Type:   "object",
+				Properties: map[string]stencil.JsonProperty{
+					"id": {
+						Type:        []stencil.JsonType{stencil.JsonTypeNumber, stencil.JsonTypeNull},
+						Description: "It is the ID",
+					},
+					"user_id": {
+						Type:        []stencil.JsonType{stencil.JsonTypeString},
+						Description: "It is the user ID",
+					},
+					"email_id": {
+						Type:        []stencil.JsonType{stencil.JsonTypeString, stencil.JsonTypeNull},
+						Description: "It is the email ID",
+					},
+					"description": {
+						Type:        []stencil.JsonType{stencil.JsonTypeString, stencil.JsonTypeNull},
+						Description: "It is the description",
+					},
+					"is_active": {
+						Type:        []stencil.JsonType{stencil.JsonTypeBoolean},
+						Description: "It shows user regularity",
+					},
+					"address": {
+						Type:        []stencil.JsonType{stencil.JsonTypeObject},
+						Description: "It shows user address",
+					},
+					"range": {
+						Type:        []stencil.JsonType{stencil.JsonTypeArray},
+						Description: "It is the range",
+					},
+				},
+			},
+		},
+		{
+			description: "should create the right request from json schema to stencil when postgres is the service",
+			data: &assetsv1beta1.Table{
+				Resource: &commonv1beta1.Resource{
+					Urn:     tableURN,
+					Name:    "table-name",
+					Service: "postgres",
+				},
+				Schema: &facetsv1beta1.Columns{
+					Columns: []*facetsv1beta1.Column{
+						{
+							Name:        "id",
+							Description: "It is the ID",
+							DataType:    "integer",
+							IsNullable:  true,
+						},
+						{
+							Name:        "user_id",
+							Description: "It is the user ID",
+							DataType:    "varchar",
+							IsNullable:  false,
+						},
+						{
+							Name:        "email_id",
+							Description: "It is the email ID",
+							IsNullable:  true,
+						},
+						{
+							Name:        "description",
+							Description: "It is the description",
+							DataType:    "varchar",
+							IsNullable:  true,
+						},
+						{
+							Name:        "is_active",
+							Description: "It shows user regularity",
+							DataType:    "boolean",
+							IsNullable:  false,
+						},
+						{
+							Name:        "range",
+							Description: "It is the range",
+							DataType:    "bytea",
+							IsNullable:  false,
+						},
+					},
+				},
+			},
+			config: map[string]interface{}{
+				"host":         host,
+				"namespace_id": namespaceID,
+				"format":       "json",
+			},
+			expected: stencil.JsonSchema{
+				Id:     fmt.Sprintf("%s.json", tableURN),
+				Schema: "https://json-schema.org/draft/2020-12/schema",
+				Title:  "table-name",
+				Type:   "object",
+				Properties: map[string]stencil.JsonProperty{
+					"id": {
+						Type:        []stencil.JsonType{stencil.JsonTypeNumber, stencil.JsonTypeNull},
+						Description: "It is the ID",
+					},
+					"user_id": {
+						Type:        []stencil.JsonType{stencil.JsonTypeString},
+						Description: "It is the user ID",
+					},
+					"email_id": {
+						Type:        []stencil.JsonType{stencil.JsonTypeString, stencil.JsonTypeNull},
+						Description: "It is the email ID",
+					},
+					"description": {
+						Type:        []stencil.JsonType{stencil.JsonTypeString, stencil.JsonTypeNull},
+						Description: "It is the description",
+					},
+					"is_active": {
+						Type:        []stencil.JsonType{stencil.JsonTypeBoolean},
+						Description: "It shows user regularity",
+					},
+					"range": {
+						Type:        []stencil.JsonType{stencil.JsonTypeArray},
+						Description: "It is the range",
+					},
+				},
+			},
+		},
+		{
+			description: "should return correct schema request with valid config",
+			data: &assetsv1beta1.Table{
+				Resource: &commonv1beta1.Resource{
+					Urn:     tableURN,
+					Name:    "table-name",
+					Service: "bigquery",
+				},
+				Schema: &facetsv1beta1.Columns{
+					Columns: []*facetsv1beta1.Column{
+						{
+							Name:        "id",
+							Description: "It is the ID",
+							DataType:    "INT",
+							IsNullable:  true,
+						},
+						{
+							Name:        "user_id",
+							Description: "It is the user ID",
+							DataType:    "STRING",
+							IsNullable:  false,
+						},
+					},
+				},
+			},
+			config: map[string]interface{}{
+				"host":         host,
+				"namespace_id": namespaceID,
+				"format":       "json",
+			},
+			expected: stencil.JsonSchema{
+				Id:     fmt.Sprintf("%s.json", tableURN),
+				Schema: "https://json-schema.org/draft/2020-12/schema",
+				Title:  "table-name",
+				Type:   "object",
+				Properties: map[string]stencil.JsonProperty{
+					"id": {
+						Type:        []stencil.JsonType{stencil.JsonTypeNumber, stencil.JsonTypeNull},
+						Description: "It is the ID",
+					},
+					"user_id": {
+						Type:        []stencil.JsonType{stencil.JsonTypeString},
+						Description: "It is the user ID",
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range successJsonTestCases {
+		t.Run(tc.description, func(t *testing.T) {
+			payload := stencil.JsonSchema{
+				Id:         tc.expected.Id,
+				Schema:     tc.expected.Schema,
+				Title:      tc.expected.Title,
+				Type:       tc.expected.Type,
+				Properties: tc.expected.Properties,
+			}
+
+			// url here is the package-level URL built from host, namespaceID and tableURN.
+			client := newMockHTTPClient(tc.config, http.MethodPost, url, payload)
+			client.SetupResponse(http.StatusCreated, "")
+			ctx := context.TODO()
+
+			stencilSink := stencil.New(client, testUtils.Logger)
+			err := stencilSink.Init(ctx, tc.config)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			err = stencilSink.Sink(ctx, []models.Record{models.NewRecord(tc.data)})
+			assert.NoError(t, err)
+
+			client.Assert(t)
+		})
+	}
+
+	// Each case sinks one table with format "avro" and states the Avro schema
+	// expected as the request body. Field order follows column order.
+	successAvroTestCases := []struct {
+		description string
+		data        *assetsv1beta1.Table
+		config      map[string]interface{}
+		expected    stencil.AvroSchema
+	}{
+		{
+			description: "should create the right request from avro schema to stencil when bigquery is the service",
+			data: &assetsv1beta1.Table{
+				Resource: &commonv1beta1.Resource{
+					Urn:     tableURN,
+					Name:    "table-name",
+					Type:    "table",
+					Service: "bigquery",
+				},
+				Schema: &facetsv1beta1.Columns{
+					Columns: []*facetsv1beta1.Column{
+						{
+							Name:        "id",
+							Description: "It is the ID",
+							DataType:    "INT",
+							IsNullable:  true,
+						},
+						{
+							Name:        "user_id",
+							Description: "It is the user ID",
+							DataType:    "STRING",
+							IsNullable:  false,
+						},
+						{
+							Name:        "description",
+							Description: "It is the description",
+							IsNullable:  true,
+						},
+						{
+							Name:        "distance",
+							Description: "It is the user distance from source",
+							DataType:    "FLOAT",
+							IsNullable:  true,
+						},
+						{
+							Name:        "is_active",
+							Description: "It shows user regularity",
+							DataType:    "BOOLEAN",
+							IsNullable:  false,
+						},
+						{
+							Name:        "address",
+							Description: "It shows user address",
+							DataType:    "RECORD",
+							IsNullable:  false,
+						},
+						{
+							Name:        "range",
+							Description: "It is the range",
+							DataType:    "BYTES",
+							IsNullable:  false,
+						},
+					},
+				},
+			},
+			config: map[string]interface{}{
+				"host":         host,
+				"namespace_id": namespaceID,
+				"format":       "avro",
+			},
+			expected: stencil.AvroSchema{
+				Type:      "record",
+				Namespace: namespaceID,
+				Name:      "table-name",
+				Fields: []stencil.AvroFields{
+					{
+						Name: "id",
+						Type: []stencil.AvroType{stencil.AvroTypeInteger, stencil.AvroTypeNull},
+					},
+					{
+						Name: "user_id",
+						Type: []stencil.AvroType{stencil.AvroTypeString},
+					},
+					{
+						Name: "description",
+						Type: []stencil.AvroType{stencil.AvroTypeString, stencil.AvroTypeNull},
+					},
+					{
+						Name: "distance",
+						Type: []stencil.AvroType{stencil.AvroTypeFloat, stencil.AvroTypeNull},
+					},
+					{
+						Name: "is_active",
+						Type: []stencil.AvroType{stencil.AvroTypeBoolean},
+					},
+					{
+						Name: "address",
+						Type: []stencil.AvroType{stencil.AvroTypeRecord},
+					},
+					{
+						Name: "range",
+						Type: []stencil.AvroType{stencil.AvroTypeBytes},
+					},
+				},
+			},
+		},
+		{
+			description: "should create the right request to stencil when postgres is the service",
+			data: &assetsv1beta1.Table{
+				Resource: &commonv1beta1.Resource{
+					Urn:     tableURN,
+					Name:    "table-name",
+					Type:    "table",
+					Service: "postgres",
+				},
+				Schema: &facetsv1beta1.Columns{
+					Columns: []*facetsv1beta1.Column{
+						{
+							Name:        "id",
+							Description: "It is the ID",
+							DataType:    "integer",
+							IsNullable:  true,
+						},
+						{
+							Name:        "user_id",
+							Description: "It is the user ID",
+							DataType:    "varchar",
+							IsNullable:  false,
+						},
+						{
+							Name:        "description",
+							Description: "It is the description",
+							IsNullable:  true,
+						},
+						{
+							Name:        "is_active",
+							Description: "It shows user regularity",
+							DataType:    "boolean",
+							IsNullable:  false,
+						},
+						{
+							Name:        "range",
+							Description: "It is the range",
+							DataType:    "bytea",
+							IsNullable:  false,
+						},
+					},
+				},
+			},
+			// NOTE(review): "schema_id" has no matching field in the sink's Config
+			// struct — confirm whether it is meant to be read.
+			config: map[string]interface{}{
+				"host":         host,
+				"namespace_id": namespaceID,
+				"schema_id":    tableURN,
+				"format":       "avro",
+			},
+			expected: stencil.AvroSchema{
+				Type:      "record",
+				Namespace: namespaceID,
+				Name:      "table-name",
+				Fields: []stencil.AvroFields{
+					{
+						Name: "id",
+						Type: []stencil.AvroType{stencil.AvroTypeInteger, stencil.AvroTypeNull},
+					},
+					{
+						Name: "user_id",
+						Type: []stencil.AvroType{stencil.AvroTypeString},
+					},
+					{
+						Name: "description",
+						Type: []stencil.AvroType{stencil.AvroTypeString, stencil.AvroTypeNull},
+					},
+					{
+						Name: "is_active",
+						Type: []stencil.AvroType{stencil.AvroTypeBoolean},
+					},
+					{
+						Name: "range",
+						Type: []stencil.AvroType{stencil.AvroTypeArray},
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range successAvroTestCases {
+		t.Run(tc.description, func(t *testing.T) {
+			payload := stencil.AvroSchema{
+				Type:      tc.expected.Type,
+				Namespace: tc.expected.Namespace,
+				Name:      tc.expected.Name,
+				Fields:    tc.expected.Fields,
+			}
+
+			client := newMockHTTPClient(tc.config, http.MethodPost, url, payload)
+			client.SetupResponse(http.StatusCreated, "")
+			ctx := context.TODO()
+
+			stencilSink := stencil.New(client, testUtils.Logger)
+			err := stencilSink.Init(ctx, tc.config)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			err = stencilSink.Sink(ctx, []models.Record{models.NewRecord(tc.data)})
+			assert.NoError(t, err)
+
+			client.Assert(t)
+		})
+	}
+}
+
+// mockHTTPClient is a test double for the sink's HTTP client. It records the
+// last request passed to Do and replies with a canned status and body.
+type mockHTTPClient struct {
+	URL            string            // expected request URL, checked by Assert
+	Method         string            // expected request method, checked by Assert
+	Headers        map[string]string // taken from config["headers"]; not checked by Assert
+	RequestPayload interface{}       // expected request body, compared as JSON by Assert
+	ResponseJSON   string
+	ResponseStatus int
+	req            *http.Request // last request seen by Do
+}
+
+// newMockHTTPClient builds a mock that expects the given method, url and
+// payload. Headers come from config["headers"] when present; that value must
+// be a map[string]string or the type assertion panics.
+func newMockHTTPClient(config map[string]interface{}, method, url string, payload interface{}) *mockHTTPClient {
+	headersMap := map[string]string{}
+	if headersItf, ok := config["headers"]; ok {
+		headersMap = headersItf.(map[string]string)
+	}
+	return &mockHTTPClient{
+		Method:         method,
+		URL:            url,
+		Headers:        headersMap,
+		RequestPayload: payload,
+	}
+}
+
+// SetupResponse sets the status code and body that Do will return.
+func (m *mockHTTPClient) SetupResponse(statusCode int, json string) {
+	m.ResponseStatus = statusCode
+	m.ResponseJSON = json
+}
+
+// Do records req and returns the canned response; it never returns an error.
+func (m *mockHTTPClient) Do(req *http.Request) (res *http.Response, err error) {
+	m.req = req
+
+	res = &http.Response{
+		// default values
+		Proto:         "HTTP/1.1",
+		ProtoMajor:    1,
+		ProtoMinor:    1,
+		StatusCode:    m.ResponseStatus,
+		Request:       req,
+		Header:        make(http.Header),
+		ContentLength: int64(len(m.ResponseJSON)),
+		Body:          ioutil.NopCloser(bytes.NewBufferString(m.ResponseJSON)),
+	}
+
+	return
+}
+
+// Assert checks the recorded request against the expected method, URL
+// (scheme, host and path only) and JSON-encoded payload. Do must have been
+// called first, since m.req is dereferenced.
+func (m *mockHTTPClient) Assert(t *testing.T) {
+	assert.Equal(t, m.Method, m.req.Method)
+	actualURL := fmt.Sprintf(
+		"%s://%s%s",
+		m.req.URL.Scheme,
+		m.req.URL.Host,
+		m.req.URL.Path,
+	)
+	assert.Equal(t, m.URL, actualURL)
+
+	var bodyBytes = []byte("")
+	if m.req.Body != nil {
+		var err error
+		bodyBytes, err = ioutil.ReadAll(m.req.Body)
+		if err != nil {
+			t.Error(err)
+		}
+	}
+
+	expectedBytes, err := json.Marshal(m.RequestPayload)
+	if err != nil {
+		t.Error(err)
+	}
+
+	assert.Equal(t, string(expectedBytes), string(bodyBytes))
+}