Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigqu
Resource: &commonv1beta1.Resource{
Urn: tableURN,
Name: t.TableID,
Type: "table",
Description: md.Description,
Service: "bigquery",
},
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (e *Extractor) getTablesInfo(ctx context.Context, emit plugins.Emit) (err e
Urn: fmt.Sprintf("%s.%s.%s", e.config.ProjectID, instance, table),
Name: table,
Service: "bigtable",
Type: "table",
},
Properties: &facetsv1beta1.Properties{
Attributes: utils.TryParseMapToProto(map[string]interface{}{
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (e *Extractor) processTable(keyspace string, tableName string) (err error)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", keyspace, tableName),
Name: tableName,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (e *Extractor) extractTables(emit plugins.Emit) (err error) {
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", dbName, tableName),
Name: tableName,
Type: "table",
}, Schema: &facetsv1beta1.Columns{
Columns: columns,
},
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (e *Extractor) processTable(ctx context.Context, dbName string, docID strin
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", dbName, docID),
Name: docID,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/csv/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (e *Extractor) buildTable(filePath string) (table *assetsv1beta1.Table, err
Urn: fileName,
Name: fileName,
Service: "csv",
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: e.buildColumns(content),
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/elastic/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", "elasticsearch", indexName),
Name: indexName,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (e *Extractor) buildBucket(b *storage.BucketAttrs, projectID string, blobs
Urn: fmt.Sprintf("%s/%s", projectID, b.Name),
Name: b.Name,
Service: metadataSource,
Type: "bucket",
},
Location: b.Location,
StorageType: b.StorageClass,
Expand Down
3 changes: 2 additions & 1 deletion plugins/extractors/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
}
emit(models.NewRecord(&assetsv1beta1.User{
Resource: &commonv1beta1.Resource{
Urn: usr.GetURL(),
Urn: usr.GetURL(),
Type: "user",
},
Email: usr.GetEmail(),
Username: usr.GetLogin(),
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/grafana/grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (e *Extractor) grafanaDashboardToMeteorDashboard(dashboard DashboardDetail)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("grafana.%s", dashboard.Dashboard.UID),
Name: dashboard.Meta.Slug,
Type: "dashboard",
Service: "grafana",
Url: dashboard.Meta.URL,
Description: dashboard.Dashboard.Description,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (e *Extractor) buildTopic(topic string, numOfPartitions int) *assetsv1beta1
Urn: fmt.Sprintf("kafka::%s/%s", e.config.Label, topic),
Name: topic,
Service: "kafka",
Type: "topic",
},
Profile: &assetsv1beta1.TopicProfile{
NumberOfPartitions: int64(numOfPartitions),
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/mariadb/mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", database, tableName),
Name: tableName,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/metabase/metabase.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (e *Extractor) buildDashboard(d Dashboard) (data *assetsv1beta1.Dashboard,
Urn: dashboardUrn,
Name: dashboard.Name,
Service: "metabase",
Type: "dashboard",
Description: dashboard.Description,
},
Charts: charts,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (e *Extractor) buildTable(ctx context.Context, db *mongo.Database, collecti
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", db.Name(), collectionName),
Name: collectionName,
Type: "table",
},
Profile: &assetsv1beta1.TableProfile{
TotalRows: totalRows,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/mssql/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", database, tableName),
Name: tableName,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", database, tableName),
Name: tableName,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/optimus/optimus.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (e *Extractor) buildJob(ctx context.Context, jobSpec *pb.JobSpecification,
Name: jobSpec.Name,
Service: service,
Description: jobSpec.Description,
Type: "job",
},
Ownership: &facetsv1beta1.Ownership{
Owners: []*facetsv1beta1.Owner{
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (e *Extractor) getTableMetadata(db *sql.DB, dbName string, tableName string
Urn: fmt.Sprintf("%s.%s", dbName, tableName),
Name: tableName,
Service: "Oracle",
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (e *Extractor) getTableMetadata(db *sql.DB, dbName string, tableName string
Urn: models.TableURN("postgres", e.host, dbName, tableName),
Name: tableName,
Service: "postgres",
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/presto/presto.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (e *Extractor) processTable(db *sql.DB, catalog string, database string, ta
Urn: fmt.Sprintf("%s.%s.%s", catalog, database, tableName),
Name: tableName,
Service: "presto",
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (e *Extractor) getTableMetadata(dbName string, tableName string) (result *a
Resource: &commonv1beta1.Resource{
Urn: models.TableURN("redshift", e.config.AWSRegion, dbName, tableName),
Name: tableName,
Type: "table",
Service: "redshift",
},
Schema: &facetsv1beta1.Columns{
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error)
Urn: fmt.Sprintf("%s.%s", database, tableName),
Name: tableName,
Service: "Snowflake",
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/superset/superset.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (e *Extractor) buildDashboard(id int) (data *assetsv1beta1.Dashboard, err e
Name: dashboard.DashboardTitle,
Service: "superset",
Url: dashboard.URL,
Type: "dashboard",
},
Charts: chart,
}
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/tableau/tableau.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (e *Extractor) buildDashboard(wb *Workbook) (data *assetsv1beta1.Dashboard,
Urn: dashboardURN,
Name: wb.Name,
Service: "tableau",
Type: "dashboard",
Description: wb.Description,
},
Charts: e.buildCharts(dashboardURN, wb, lineages),
Expand Down
1 change: 0 additions & 1 deletion plugins/sinks/columbus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ sinks:
name: columbus
config:
host: https://columbus.com
type: sample-columbus-type
headers:
Columbus-User-Email: [email protected]
Header-1: value11,value12
Expand Down
20 changes: 13 additions & 7 deletions plugins/sinks/columbus/columbus.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package columbus

type Record struct {
Urn string `json:"urn"`
type RequestPayload struct {
Asset Asset `json:"asset"`
Upstreams []LineageRecord `json:"upstreams"`
Downstreams []LineageRecord `json:"downstreams"`
}

type Asset struct {
URN string `json:"urn"`
Type string `json:"type"`
Name string `json:"name"`
Service string `json:"service"`
Upstreams []LineageRecord `json:"upstreams"`
Downstreams []LineageRecord `json:"downstreams"`
Owners []Owner `json:"owners"`
Description string `json:"description"`
Owners []Owner `json:"owners"`
Data interface{} `json:"data"`
Labels map[string]string `json:"labels"`
}

type LineageRecord struct {
Urn string `json:"urn"`
Type string `json:"type"`
URN string `json:"urn"`
Type string `json:"type"`
Service string `json:"service"`
}

type Owner struct {
Expand Down
59 changes: 33 additions & 26 deletions plugins/sinks/columbus/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,38 @@ import (
_ "embed"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"

"github.com/odpf/meteor/models"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/registry"
"github.com/odpf/meteor/utils"
"github.com/odpf/salt/log"
"github.com/pkg/errors"
"io/ioutil"
"net/http"
"strings"
)

//go:embed README.md
var summary string

type Config struct {
Host string `mapstructure:"host" validate:"required"`
Type string `mapstructure:"type" validate:"required"`
Headers map[string]string `mapstructure:"headers"`
Labels map[string]string `mapstructure:"labels"`
}

var sampleConfig = `
# The hostnmame of the columbus service
# The hostname of the columbus service
host: https://columbus.com
# The type of the data to send
type: sample-columbus-type
# Additional HTTP headers send to columbus, multiple headers value are separated by a comma
headers:
Columbus-User-Email: [email protected]
X-Other-Header: value1, value2`
X-Other-Header: value1, value2
# The labels to pass as payload label of the patch api
labels:
myCustom: $properties.attributes.myCustomField
sampleLabel: $properties.labels.sampleLabelField
`

type httpClient interface {
Do(*http.Request) (*http.Response, error)
Expand Down Expand Up @@ -95,15 +96,15 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) {

func (s *Sink) Close() (err error) { return }

func (s *Sink) send(record Record) (err error) {
payloadBytes, err := json.Marshal([]Record{record})
func (s *Sink) send(record RequestPayload) (err error) {
payloadBytes, err := json.Marshal(record)
if err != nil {
return
}

// send request
url := fmt.Sprintf("%s/v1beta1/types/%s/records", s.config.Host, s.config.Type)
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(payloadBytes))
url := fmt.Sprintf("%s/v1beta1/assets", s.config.Host)
req, err := http.NewRequest(http.MethodPatch, url, bytes.NewBuffer(payloadBytes))
if err != nil {
return
}
Expand Down Expand Up @@ -138,22 +139,26 @@ func (s *Sink) send(record Record) (err error) {
}
}

func (s *Sink) buildColumbusPayload(metadata models.Metadata) (Record, error) {
func (s *Sink) buildColumbusPayload(metadata models.Metadata) (RequestPayload, error) {
labels, err := s.buildLabels(metadata)
if err != nil {
return Record{}, errors.Wrap(err, "failed to build labels")
return RequestPayload{}, errors.Wrap(err, "failed to build labels")
}

upstreams, downstreams := s.buildLineage(metadata)
owners := s.buildOwners(metadata)
resource := metadata.GetResource()
record := Record{
Urn: resource.GetUrn(),
Name: resource.GetName(),
Service: resource.GetService(),
Data: metadata,
Labels: labels,
Owners: owners,
record := RequestPayload{
Asset: Asset{
URN: resource.GetUrn(),
Type: resource.GetType(),
Name: resource.GetName(),
Service: resource.GetService(),
Description: resource.GetDescription(),
Owners: owners,
Data: metadata,
Labels: labels,
},
Upstreams: upstreams,
Downstreams: downstreams,
}
Expand All @@ -174,14 +179,16 @@ func (s *Sink) buildLineage(metadata models.Metadata) (upstreams, downstreams []

for _, upstream := range lineage.Upstreams {
upstreams = append(upstreams, LineageRecord{
Urn: upstream.Urn,
Type: upstream.Type,
URN: upstream.Urn,
Type: upstream.Type,
Service: upstream.Service,
})
}
for _, downstream := range lineage.Downstreams {
downstreams = append(downstreams, LineageRecord{
Urn: downstream.Urn,
Type: downstream.Type,
URN: downstream.Urn,
Type: downstream.Type,
Service: downstream.Service,
})
}

Expand Down
Loading