diff --git a/Makefile b/Makefile index 7c9b30137..17fad7a30 100644 --- a/Makefile +++ b/Makefile @@ -27,5 +27,5 @@ test-e2e: generate-proto: ## regenerate protos @echo " > cloning protobuf from odpf/proton" @echo " > generating protobuf" - @buf generate --template buf.gen.yaml https://github.com/odpf/proton/archive/52353ad461321cb601b6c963c26f0ee50e0d398b.zip#strip_components=1 --path odpf/assets + @buf generate --template buf.gen.yaml https://github.com/odpf/proton/archive/135a03875fc82b501e686924457a44730fd04268.zip#strip_components=1 --path odpf/assets @echo " > protobuf compilation finished" \ No newline at end of file diff --git a/go.mod b/go.mod index 64be5c2ea..9506bd0f5 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,12 @@ go 1.16 require ( cloud.google.com/go/bigquery v1.8.0 cloud.google.com/go/bigtable v1.10.1 + cloud.google.com/go/logging v1.4.2 cloud.google.com/go/storage v1.16.1 github.com/ClickHouse/clickhouse-go v1.4.5 github.com/MakeNowJust/heredoc v1.0.0 github.com/Microsoft/go-winio v0.5.0 // indirect + github.com/alecthomas/assert v0.0.0-20170929043011-405dbfeb8e38 github.com/blastrain/vitess-sqlparser v0.0.0-20201030050434-a139afbb1aba github.com/cenkalti/backoff/v4 v4.1.1 github.com/containerd/continuity v0.1.0 // indirect @@ -55,6 +57,7 @@ require ( golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f golang.org/x/sys v0.0.0-20210903071746-97244b99971b // indirect google.golang.org/api v0.56.0 + google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83 google.golang.org/grpc v1.40.0 google.golang.org/protobuf v1.27.1 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b diff --git a/go.sum b/go.sum index d65962ffe..143838cc6 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,8 @@ cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1 cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk= cloud.google.com/go/firestore v1.5.0/go.mod h1:c4nNYR1qdq7eaZ+jSc5fonrQN2k3M7sWATcYTiakjEo= cloud.google.com/go/kms v0.1.0/go.mod h1:8Qp8PCAypHg4FdmlyW1QRAv09BGQ9Uzh7JnmIZxPk+c= +cloud.google.com/go/logging v1.4.2 h1:Mu2Q75VBDQlW1HlBMjTX4X84UFR73G1TiLlRYc/b7tA= +cloud.google.com/go/logging v1.4.2/go.mod h1:jco9QZSx8HiVVqLJReq7z7bVdj0P1Jb9PDFs63T+axo= cloud.google.com/go/monitoring v0.1.0/go.mod h1:Hpm3XfzJv+UTiXzCG5Ffp0wijzHTC7Cv4eR7o3x/fEE= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= diff --git a/models/odpf/assets/table.pb.go b/models/odpf/assets/table.pb.go index 9c2aea7cc..05bae5bfa 100644 --- a/models/odpf/assets/table.pb.go +++ b/models/odpf/assets/table.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.26.0 -// protoc v3.18.1 +// protoc v3.19.1 // source: odpf/assets/table.proto package assets @@ -163,8 +163,11 @@ type TableProfile struct { TotalRows int64 `protobuf:"varint,1,opt,name=total_rows,json=totalRows,proto3" json:"total_rows,omitempty"` // The number of rows in the table that are not deleted. // Example: `event_timestamp`. 
- PartitionKey string `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"` - PartitionValue string `protobuf:"bytes,3,opt,name=partition_value,json=partitionValue,proto3" json:"partition_value,omitempty"` + PartitionKey string `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"` + PartitionValue string `protobuf:"bytes,3,opt,name=partition_value,json=partitionValue,proto3" json:"partition_value,omitempty"` + UsageCount int64 `protobuf:"varint,4,opt,name=usage_count,json=usageCount,proto3" json:"usage_count,omitempty"` + Joins []*Join `protobuf:"bytes,5,rep,name=joins,proto3" json:"joins,omitempty"` + Filters []string `protobuf:"bytes,6,rep,name=filters,proto3" json:"filters,omitempty"` } func (x *TableProfile) Reset() { @@ -220,6 +223,91 @@ func (x *TableProfile) GetPartitionValue() string { return "" } +func (x *TableProfile) GetUsageCount() int64 { + if x != nil { + return x.UsageCount + } + return 0 +} + +func (x *TableProfile) GetJoins() []*Join { + if x != nil { + return x.Joins + } + return nil +} + +func (x *TableProfile) GetFilters() []string { + if x != nil { + return x.Filters + } + return nil +} + +// Join is the metric of which are other tables that are joined with this table +type Join struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Urn string `protobuf:"bytes,1,opt,name=urn,proto3" json:"urn,omitempty"` + Count int64 `protobuf:"varint,2,opt,name=count,proto3" json:"count,omitempty"` + Conditions []string `protobuf:"bytes,3,rep,name=conditions,proto3" json:"conditions,omitempty"` +} + +func (x *Join) Reset() { + *x = Join{} + if protoimpl.UnsafeEnabled { + mi := &file_odpf_assets_table_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Join) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Join) ProtoMessage() {} + +func (x *Join) ProtoReflect() protoreflect.Message { + mi := &file_odpf_assets_table_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Join.ProtoReflect.Descriptor instead. 
+func (*Join) Descriptor() ([]byte, []int) { + return file_odpf_assets_table_proto_rawDescGZIP(), []int{2} +} + +func (x *Join) GetUrn() string { + if x != nil { + return x.Urn + } + return "" +} + +func (x *Join) GetCount() int64 { + if x != nil { + return x.Count + } + return 0 +} + +func (x *Join) GetConditions() []string { + if x != nil { + return x.Conditions + } + return nil +} + var File_odpf_assets_table_proto protoreflect.FileDescriptor var file_odpf_assets_table_proto_rawDesc = []byte{ @@ -275,19 +363,30 @@ var file_odpf_assets_table_proto_rawDesc = []byte{ 0x6d, 0x70, 0x73, 0x12, 0x2f, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x64, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6f, 0x64, 0x70, 0x66, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x74, 0x73, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x65, - 0x76, 0x65, 0x6e, 0x74, 0x22, 0x7b, 0x0a, 0x0c, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x50, 0x72, 0x6f, - 0x66, 0x69, 0x6c, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x72, 0x6f, - 0x77, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x52, - 0x6f, 0x77, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x56, 0x61, 0x6c, 0x75, - 0x65, 0x42, 0x3b, 0x0a, 0x0e, 0x69, 0x6f, 0x2e, 0x6f, 0x64, 0x70, 0x66, 0x2e, 0x61, 0x73, 0x73, - 0x65, 0x74, 0x73, 0x42, 0x0a, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, - 0x1d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x64, 0x70, 0x66, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x6e, 0x2f, 0x61, 0x73, 0x73, 0x65, 0x74, 0x73, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x76, 0x65, 0x6e, 0x74, 0x22, 0xdf, 0x01, 0x0a, 0x0c, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x50, 0x72, + 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x72, + 0x6f, 0x77, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, + 0x52, 0x6f, 0x77, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x56, 0x61, 0x6c, + 0x75, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x75, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x63, 0x6f, 0x75, 0x6e, + 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x75, 0x73, 0x61, 0x67, 0x65, 0x43, 0x6f, + 0x75, 0x6e, 0x74, 0x12, 0x27, 0x0a, 0x05, 0x6a, 0x6f, 0x69, 0x6e, 0x73, 0x18, 0x05, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6f, 0x64, 0x70, 0x66, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x74, 0x73, + 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x05, 0x6a, 0x6f, 0x69, 0x6e, 0x73, 0x12, 0x18, 0x0a, 0x07, + 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x66, + 0x69, 0x6c, 0x74, 0x65, 0x72, 0x73, 0x22, 0x4e, 0x0a, 0x04, 0x4a, 0x6f, 0x69, 0x6e, 0x12, 0x10, + 0x0a, 
0x03, 0x75, 0x72, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6e, + 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x64, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x3b, 0x0a, 0x0e, 0x69, 0x6f, 0x2e, 0x6f, 0x64, 0x70, + 0x66, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x74, 0x73, 0x42, 0x0a, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x50, + 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x1d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x6f, 0x64, 0x70, 0x66, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x6e, 0x2f, 0x61, 0x73, 0x73, + 0x65, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -302,34 +401,36 @@ func file_odpf_assets_table_proto_rawDescGZIP() []byte { return file_odpf_assets_table_proto_rawDescData } -var file_odpf_assets_table_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_odpf_assets_table_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_odpf_assets_table_proto_goTypes = []interface{}{ (*Table)(nil), // 0: odpf.assets.Table (*TableProfile)(nil), // 1: odpf.assets.TableProfile - (*common.Resource)(nil), // 2: odpf.assets.common.Resource - (*facets.Columns)(nil), // 3: odpf.assets.facets.Columns - (*facets.Preview)(nil), // 4: odpf.assets.facets.Preview - (*facets.Ownership)(nil), // 5: odpf.assets.facets.Ownership - (*facets.Lineage)(nil), // 6: odpf.assets.facets.Lineage - (*facets.Properties)(nil), // 7: odpf.assets.facets.Properties - (*common.Timestamp)(nil), // 8: odpf.assets.common.Timestamp - (*common.Event)(nil), // 9: odpf.assets.common.Event + (*Join)(nil), // 2: odpf.assets.Join + (*common.Resource)(nil), // 3: odpf.assets.common.Resource + (*facets.Columns)(nil), // 4: odpf.assets.facets.Columns + (*facets.Preview)(nil), // 5: odpf.assets.facets.Preview + (*facets.Ownership)(nil), // 6: odpf.assets.facets.Ownership + (*facets.Lineage)(nil), // 7: odpf.assets.facets.Lineage + (*facets.Properties)(nil), // 8: odpf.assets.facets.Properties + (*common.Timestamp)(nil), // 9: odpf.assets.common.Timestamp + (*common.Event)(nil), // 10: odpf.assets.common.Event } var file_odpf_assets_table_proto_depIdxs = []int32{ - 2, // 0: odpf.assets.Table.resource:type_name -> odpf.assets.common.Resource - 1, // 1: odpf.assets.Table.profile:type_name -> odpf.assets.TableProfile - 3, // 2: odpf.assets.Table.schema:type_name -> odpf.assets.facets.Columns - 4, // 3: odpf.assets.Table.preview:type_name -> odpf.assets.facets.Preview - 5, // 4: odpf.assets.Table.ownership:type_name -> odpf.assets.facets.Ownership - 6, // 5: odpf.assets.Table.lineage:type_name -> odpf.assets.facets.Lineage - 7, // 6: odpf.assets.Table.properties:type_name -> odpf.assets.facets.Properties - 8, // 7: odpf.assets.Table.timestamps:type_name -> odpf.assets.common.Timestamp - 9, // 8: odpf.assets.Table.event:type_name -> odpf.assets.common.Event - 9, // [9:9] is the sub-list for method output_type - 9, // [9:9] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 3, // 0: odpf.assets.Table.resource:type_name -> odpf.assets.common.Resource + 1, // 1: odpf.assets.Table.profile:type_name -> odpf.assets.TableProfile + 4, // 2: odpf.assets.Table.schema:type_name -> odpf.assets.facets.Columns + 5, // 3: 
odpf.assets.Table.preview:type_name -> odpf.assets.facets.Preview
+ 6, // 4: odpf.assets.Table.ownership:type_name -> odpf.assets.facets.Ownership
+ 7, // 5: odpf.assets.Table.lineage:type_name -> odpf.assets.facets.Lineage
+ 8, // 6: odpf.assets.Table.properties:type_name -> odpf.assets.facets.Properties
+ 9, // 7: odpf.assets.Table.timestamps:type_name -> odpf.assets.common.Timestamp
+ 10, // 8: odpf.assets.Table.event:type_name -> odpf.assets.common.Event
+ 2, // 9: odpf.assets.TableProfile.joins:type_name -> odpf.assets.Join
+ 10, // [10:10] is the sub-list for method output_type
+ 10, // [10:10] is the sub-list for method input_type
+ 10, // [10:10] is the sub-list for extension type_name
+ 10, // [10:10] is the sub-list for extension extendee
+ 0, // [0:10] is the sub-list for field type_name
}

func init() { file_odpf_assets_table_proto_init() }
@@ -362,6 +463,18 @@ func file_odpf_assets_table_proto_init() {
return nil
}
}
+ file_odpf_assets_table_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Join); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -369,7 +482,7 @@ func file_odpf_assets_table_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_odpf_assets_table_proto_rawDesc,
NumEnums: 0,
- NumMessages: 2,
+ NumMessages: 3,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/plugins/extractors/bigquery/README.md b/plugins/extractors/bigquery/README.md
index aaef69f8f..d80a486e5 100644
--- a/plugins/extractors/bigquery/README.md
+++ b/plugins/extractors/bigquery/README.md
@@ -21,6 +21,8 @@ source:
"auth_provider_x509_cert_url": "xxxxxxx",
"client_x509_cert_url": "xxxxxxx"
}
+ collect_table_usage: false
+ usage_period_in_day: 7
```

## Inputs

@@ -32,10 +34,13 @@ source:
| `table_pattern` | `string` | `gofood.fact_` | Regex pattern to filter which bigquery table to scan (whitelist) | *optional* |
| `include_column_profile` | `bool` | `true` | true if you want to profile the column value such min, max, med, avg, top, and freq | *optional* |
| `max_preview_rows` | `int` | `30` | max number of preview rows to fetch, `0` will skip preview fetching. Default to `30`. | *optional* |
+| `collect_table_usage` | `bool` | `false` | toggle to enable collecting table usage; `true` turns the feature on. Defaults to `false`. | *optional* |
+| `usage_period_in_day` | `int` | `7` | collect logs from `(now - usage_period_in_day)` until `now`; only takes effect when `collect_table_usage` is `true`. Defaults to `7`. | *optional* |

### *Notes*

-Leaving `credentials_json` blank will default to [Google's default authentication](https://cloud.google.com/docs/authentication/production#automatically). It is recommended if Meteor instance runs inside the same Google Cloud environment as the BigQuery project.
+- Leaving `credentials_json` blank will default to [Google's default authentication](https://cloud.google.com/docs/authentication/production#automatically). This is recommended if the Meteor instance runs inside the same Google Cloud environment as the BigQuery project.
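+- To enable usage collection in a recipe, set the two usage keys in the source config, as in this minimal sketch (values are placeholders; the other source fields shown in the sample above are omitted):
+  ```yaml
+  collect_table_usage: true
+  usage_period_in_day: 7
+  ```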
+- The service account needs the `bigquery.privateLogsViewer` role to be able to collect BigQuery audit logs

## Outputs

@@ -46,6 +51,9 @@ Leaving `credentials_json` blank will default to [Google's default authenticatio
| `resource.service` | `bigquery` |
| `description` | `table description` |
| `profile.total_rows` | `2100` |
+| `profile.usage_count` | `15` |
+| `profile.joins` | [][CommonJoin](#commonjoin) |
+| `profile.filters` | [`"WHERE t.param_3 = 'the_param' AND t.column_1 = \"xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx\""`,`"WHERE event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"`] |
| `schema` | [][Column](#column) |

### Column

@@ -59,6 +67,15 @@ Leaving `credentials_json` blank will default to [Google's default authenticatio
| `length` | `12,2` |
| `profile` | `{"min":...,"max": ...,"unique": ...}` |
+### CommonJoin
+
+| Field | Sample Value |
+| :---- | :---- |
+| `urn` | `project_id.dataset_name.table_name` |
+| `count` | `3` |
+| `conditions` | [`"ON target.column_1 = source.column_1 and target.param_name = source.param_name"`,`"ON DATE(target.event_timestamp) = DATE(source.event_timestamp)"`] |
+
+
## Contributing

Refer to the [contribution guidelines](../../../docs/contribute/guide.md#adding-a-new-extractor) for information on contributing to this module.
diff --git a/plugins/extractors/bigquery/auditlog/auditlog.go b/plugins/extractors/bigquery/auditlog/auditlog.go
new file mode 100644
index 000000000..fc2e4f5cd
--- /dev/null
+++ b/plugins/extractors/bigquery/auditlog/auditlog.go
@@ -0,0 +1,173 @@
+package auditlog
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"cloud.google.com/go/logging/logadmin"
+	"github.com/odpf/salt/log"
+	"github.com/pkg/errors"
+	"google.golang.org/api/iterator"
+	"google.golang.org/api/option"
+	auditpb "google.golang.org/genproto/googleapis/cloud/audit"
+	loggingpb "google.golang.org/genproto/googleapis/cloud/bigquery/logging/v1"
+	"google.golang.org/protobuf/encoding/protojson"
+)
+
+type Config struct {
+	ProjectID           string `mapstructure:"project_id" validate:"required"`
+	ServiceAccountJSON  string `mapstructure:"service_account_json"`
+	IsCollectTableUsage bool   `mapstructure:"collect_table_usage" default:"false"`
+	UsagePeriodInDay    int64  `mapstructure:"usage_period_in_day" default:"7"`
+}
+
+// advancedFilterTemplate is the Cloud Logging filter for completed BigQuery
+// jobs; the two %s verbs are filled with the RFC3339 start and end timestamps
+// of the usage period
+const advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
+	`resource.type="bigquery_resource" AND NOT ` +
+	`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
+	`timestamp >= "%s" AND timestamp < "%s"`
+
+type AuditLog struct {
+	logger log.Logger
+	client *logadmin.Client
+	config Config
+}
+
+func New(logger log.Logger) *AuditLog {
+	return &AuditLog{
+		logger: logger,
+	}
+}
+
+func (l *AuditLog) Init(ctx context.Context, cfg Config) (err error) {
+	l.config = cfg
+	l.client, err = l.createClient(ctx)
+	if err != nil {
+		err = errors.Wrap(err, "failed to create logadmin client")
+		return
+	}
+
+	return
+}
+
+func (l *AuditLog) createClient(ctx context.Context) (client *logadmin.Client, err error) {
+	if l.config.ServiceAccountJSON == "" {
+		l.logger.Info("credentials are not specified, creating logadmin using default credentials...")
+		client, err = logadmin.NewClient(ctx, l.config.ProjectID)
+		return
+	}
+
+	client, err = logadmin.NewClient(ctx, l.config.ProjectID, option.WithCredentialsJSON([]byte(l.config.ServiceAccountJSON)))
+	if err != nil {
+		err = errors.New("client is nil, failed initiating client")
+	}
+	return
+}
+
+func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err error) {
+	tableStats = NewTableStats()
+
+	filter := l.buildFilter()
+	it := l.client.Entries(ctx,
+		logadmin.ProjectIDs([]string{l.config.ProjectID}),
+		logadmin.Filter(filter))
+
+	l.logger.Info("getting logs with the filter", "filter", filter)
+
+	for {
+		entry, errF := it.Next()
+		if errF == iterator.Done {
+			break
+		}
+		if errF != nil {
+			err = errors.Wrap(errF, "error iterating logEntries")
+			break
+		}
+		logData, errF := parsePayload(entry.Payload)
+		if errF != nil {
+			l.logger.Warn("error parsing LogEntry payload", "err", errF, "payload", entry.Payload)
+			continue
+		}
+
+		if errF := tableStats.Populate(logData); errF != nil {
+			l.logger.Warn("error populating table stats", "err", errF)
+			continue
+		}
+	}
+	return
+}
+
+func (l *AuditLog) buildFilter() string {
+	timeNow := time.Now().UTC()
+	dayDuration := time.Duration(24*l.config.UsagePeriodInDay) * time.Hour
+	timeFrom := timeNow.Add(-1 * dayDuration)
+
+	timeNowFormatted := timeNow.Format(time.RFC3339)
+	timeFromFormatted := timeFrom.Format(time.RFC3339)
+
+	return fmt.Sprintf(advancedFilterTemplate, timeFromFormatted, timeNowFormatted)
+}
+
+func parsePayload(payload interface{}) (ld *LogData, err error) {
+	ad := &loggingpb.AuditData{}
+	pl, ok := payload.(*auditpb.AuditLog)
+	if !ok {
+		err = errors.New("cannot parse payload to AuditLog")
+		return
+	}
+
+	if errPB := getAuditData(pl, ad); errPB != nil {
+		err = errors.Wrap(errPB, "failed to get audit data")
+		return
+	}
+
+	ld = &LogData{ad}
+	err = ld.validateAuditData()
+	return
+}
+
+func getAuditData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) (err error) {
+	// ServiceData is deprecated and suggested to be replaced with Metadata,
+	// but in some logs ServiceData is still being used
+	//nolint:staticcheck
+	if pl.GetServiceData() != nil {
+		// if ServiceData is not nil, the log is still using the old format
+		if errPB := getAuditDataFromServiceData(pl, ad); errPB != nil {
+			err = errors.Wrap(errPB, "failed to get audit data from service data")
+		}
+		return
+	}
+
+	// otherwise the audit data is expected to be in Metadata
+	if errPB := getAuditDataFromMetadata(pl, ad); errPB != nil {
+		err = errors.Wrap(errPB, "failed to get audit data from metadata")
+	}
+	return
+}
+
+func getAuditDataFromServiceData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) (err error) {
+	//nolint:staticcheck
+	if errPB := pl.GetServiceData().UnmarshalTo(ad); errPB != nil {
+		err = errors.Wrap(errPB, "failed to unmarshal service data to audit data")
+		return
+	}
+	return
+}
+
+func getAuditDataFromMetadata(pl *auditpb.AuditLog, ad *loggingpb.AuditData) (err error) {
+	mdJSON, err := pl.GetMetadata().MarshalJSON()
+	if err != nil {
+		err = errors.Wrap(err, "cannot marshal payload metadata")
+		return
+	}
+
+	if errPB := protojson.Unmarshal(mdJSON, ad); errPB != nil {
+		err = errors.Wrap(errPB, "cannot parse metadata to AuditData")
+		return
+	}
+	return
+}
diff --git a/plugins/extractors/bigquery/auditlog/auditlog_test.go b/plugins/extractors/bigquery/auditlog/auditlog_test.go
new file mode 100644
index 000000000..5c4f96119
--- /dev/null
+++ b/plugins/extractors/bigquery/auditlog/auditlog_test.go
@@ -0,0 +1,33 @@
+package auditlog
+
+import (
+	"context"
+	"testing"
+
+	"github.com/odpf/meteor/plugins"
+	"github.com/odpf/meteor/test/utils"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestInit(t
*testing.T) {
+	t.Run("should return error if failed to init client", func(t *testing.T) {
+		la := New(utils.Logger)
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		err := la.Init(ctx, Config{
+			ProjectID:          "---",
+			ServiceAccountJSON: "---",
+		})
+
+		assert.EqualError(t, err, "failed to create logadmin client: client is nil, failed initiating client")
+	})
+
+	t.Run("should not return invalid config error if config is empty", func(t *testing.T) {
+		la := New(utils.Logger)
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		err := la.Init(ctx, Config{})
+
+		assert.NotEqual(t, plugins.InvalidConfigError{}, err)
+	})
+}
diff --git a/plugins/extractors/bigquery/auditlog/logdata.go b/plugins/extractors/bigquery/auditlog/logdata.go
new file mode 100644
index 000000000..9d9538d89
--- /dev/null
+++ b/plugins/extractors/bigquery/auditlog/logdata.go
@@ -0,0 +1,95 @@
+package auditlog
+
+import (
+	"github.com/odpf/meteor/models"
+	"github.com/pkg/errors"
+	loggingpb "google.golang.org/genproto/googleapis/cloud/bigquery/logging/v1"
+)
+
+const serviceName = "bigquery"
+
+type LogData struct {
+	*loggingpb.AuditData
+}
+
+func (ld *LogData) GetReferencedTablesURN() (refTablesURN []string) {
+	refTablesURN = []string{}
+	var stats *loggingpb.JobStatistics
+	if stats = ld.GetJobCompletedEvent().GetJob().GetJobStatistics(); stats == nil {
+		return
+	}
+	for _, rt := range stats.ReferencedTables {
+		tableURN := models.TableURN(serviceName, rt.ProjectId, rt.DatasetId, rt.TableId)
+		refTablesURN = append(refTablesURN, tableURN)
+	}
+	return
+}
+
+func (ld *LogData) GetQuery() (sqlQuery string, err error) {
+	jobConfig := ld.GetJobCompletedEvent().GetJob().GetJobConfiguration()
+	if jobConfig == nil {
+		err = errors.New("jobConfiguration is nil")
+		return
+	}
+	jobConfigQuery := jobConfig.GetQuery()
+	if jobConfigQuery == nil {
+		err = errors.New("jobConfiguration.query is nil")
+		return
+	}
+	sqlQuery = jobConfigQuery.GetQuery()
+	if sqlQuery == "" {
+		err = errors.New("sql query is empty")
+	}
+	return
+}
+
+func (ld *LogData) validateAuditData() (err error) {
+	if ld.GetJobCompletedEvent() == nil {
+		err = errors.New("cannot find jobCompletedEvent field")
+		return
+	}
+
+	job := ld.GetJobCompletedEvent().GetJob()
+	if job == nil {
+		err = errors.New("cannot find jobCompletedEvent.job field")
+		return
+	}
+
+	// if referenced tables is empty, we don't count it
+	stats := job.GetJobStatistics()
+	if stats == nil {
+		err = errors.New("job statistics is nil")
+		return
+	}
+
+	if len(stats.ReferencedTables) == 0 {
+		err = errors.New("no referenced tables found")
+		return
+	}
+
+	jobStatus := job.GetJobStatus()
+	if jobStatus == nil {
+		err = errors.New("cannot find jobCompletedEvent.job.jobStatus field")
+		return
+	}
+
+	jobState := jobStatus.GetState()
+	if jobState == "" {
+		err = errors.New("jobCompletedEvent.job.jobStatus.state is empty")
+		return
+	}
+
+	// ignore jobs that have not finished
+	if jobState != "DONE" {
+		err = errors.New("job status state is not DONE")
+		return
+	}
+
+	if jobStatus.GetError() != nil {
+		if jobErrMsg := jobStatus.GetError().GetMessage(); jobErrMsg != "" {
+			err = errors.Errorf("job status has error: %s", jobErrMsg)
+			return
+		}
+	}
+
+	return
+}
diff --git a/plugins/extractors/bigquery/auditlog/logdata_test.go b/plugins/extractors/bigquery/auditlog/logdata_test.go
new file mode 100644
index 000000000..815a60a29
--- /dev/null
+++ b/plugins/extractors/bigquery/auditlog/logdata_test.go
@@ -0,0 +1,215 @@
+package auditlog
+
+import (
+	"testing"
+
+	"github.com/odpf/meteor/models"
+	"github.com/stretchr/testify/assert"
+	loggingpb "google.golang.org/genproto/googleapis/cloud/bigquery/logging/v1"
+	statuspb "google.golang.org/genproto/googleapis/rpc/status"
+)
+
+func TestValidateAuditData(t *testing.T) {
+
+	t.Run("return error if AuditData does not have JobCompletedEvent data", func(t *testing.T) {
+		ld := &LogData{
+			&loggingpb.AuditData{},
+		}
+		err := ld.validateAuditData()
+
+		assert.EqualError(t, err, "cannot find jobCompletedEvent field")
+	})
+
+	t.Run("return error if JobCompletedEvent does not have Job data", func(t *testing.T) {
+		ld := &LogData{
+			&loggingpb.AuditData{
+				JobCompletedEvent: &loggingpb.JobCompletedEvent{
+					EventName: "",
+				},
+			},
+		}
+		err := ld.validateAuditData()
+
+		assert.EqualError(t, err, "cannot find jobCompletedEvent.job field")
+	})
+
+	t.Run("return error if JobCompletedEvent.Job.JobStatistics is nil", func(t *testing.T) {
+		ld := &LogData{
+			&loggingpb.AuditData{
+				JobCompletedEvent: &loggingpb.JobCompletedEvent{
+					EventName: "",
+					Job:       &loggingpb.Job{},
+				},
+			},
+		}
+		err := ld.validateAuditData()
+
+		assert.EqualError(t, err, "job statistics is nil")
+	})
+
+	t.Run("return error if referencedTables is empty", func(t *testing.T) {
+		ld := &LogData{
+			&loggingpb.AuditData{
+				JobCompletedEvent: &loggingpb.JobCompletedEvent{
+					EventName: "",
+					Job: &loggingpb.Job{
+						JobStatistics: &loggingpb.JobStatistics{},
+					},
+				},
+			},
+		}
+		err := ld.validateAuditData()
+
+		assert.EqualError(t, err, "no referenced tables found")
+	})
+
+	t.Run("return error if JobCompletedEvent.Job does not have JobStatus info", func(t *testing.T) {
+		ld := &LogData{
+			&loggingpb.AuditData{
+				JobCompletedEvent: &loggingpb.JobCompletedEvent{
+					EventName: "",
+					Job: &loggingpb.Job{
+						JobStatistics: &loggingpb.JobStatistics{
+							ReferencedTables: []*loggingpb.TableName{
+								{
+									ProjectId: "project_id",
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+		err := ld.validateAuditData()
+
+		assert.EqualError(t, err, "cannot find jobCompletedEvent.job.jobStatus field")
+	})
+
+	t.Run("return error if JobCompletedEvent.Job.JobStatus's state is empty", func(t *testing.T) {
+		ld := &LogData{
+			&loggingpb.AuditData{
+				JobCompletedEvent: &loggingpb.JobCompletedEvent{
+					EventName: "",
+					Job: &loggingpb.Job{
+						JobStatistics: &loggingpb.JobStatistics{
+							ReferencedTables: []*loggingpb.TableName{
+								{
+									ProjectId: "project_id",
+								},
+							},
+						},
+						JobStatus: &loggingpb.JobStatus{},
+					},
+				},
+			},
+		}
+		err := ld.validateAuditData()
+
+		assert.EqualError(t, err, "jobCompletedEvent.job.jobStatus.state is empty")
+	})
+
+	t.Run("return error if JobCompletedEvent.Job.JobStatus's state is not done", func(t *testing.T) {
+		ld := &LogData{
+			&loggingpb.AuditData{
+				JobCompletedEvent: &loggingpb.JobCompletedEvent{
+					EventName: "",
+					Job: &loggingpb.Job{
+						JobStatistics: &loggingpb.JobStatistics{
+							ReferencedTables: []*loggingpb.TableName{
+								{
+									ProjectId: "project_id",
+								},
+							},
+						},
+						JobStatus: &loggingpb.JobStatus{
+							State: "WORKING",
+						},
+					},
+				},
+			},
+		}
+		err := ld.validateAuditData()
+
+		assert.EqualError(t, err, "job status state is not DONE")
+	})
+
+	t.Run("return error if JobCompletedEvent.Job.JobStatus error is not nil and has an error message", func(t *testing.T) {
+		ld := &LogData{
+			&loggingpb.AuditData{
+				JobCompletedEvent: &loggingpb.JobCompletedEvent{
EventName: "",
+					Job: &loggingpb.Job{
+						JobStatistics: &loggingpb.JobStatistics{
+							ReferencedTables: []*loggingpb.TableName{
+								{
+									ProjectId: "project_id",
+								},
+							},
+						},
+						JobStatus: &loggingpb.JobStatus{
+							State: "DONE",
+							Error: &statuspb.Status{
+								Message: "error parsing the data",
+							},
+						},
+					},
+				},
+			},
+		}
+		err := ld.validateAuditData()
+
+		assert.EqualError(t, err, "job status has error: error parsing the data")
+	})
+
+	t.Run("return nil if AuditData's Job is DONE and no error", func(t *testing.T) {
+		ld := &LogData{
+			&loggingpb.AuditData{
+				JobCompletedEvent: &loggingpb.JobCompletedEvent{
+					EventName: "",
+					Job: &loggingpb.Job{
+						JobStatistics: &loggingpb.JobStatistics{
+							ReferencedTables: []*loggingpb.TableName{
+								{
+									ProjectId: "project_id",
+								},
+							},
+						},
+						JobStatus: &loggingpb.JobStatus{
+							State: "DONE",
+						},
+					},
+				},
+			},
+		}
+		err := ld.validateAuditData()
+
+		assert.Nil(t, err)
+	})
+}
+
+func TestGetReferencedTablesURN(t *testing.T) {
+	t.Run("return empty slice if JobStatistics is nil", func(t *testing.T) {
+		ld := &LogData{
+			&loggingpb.AuditData{
+				JobCompletedEvent: &loggingpb.JobCompletedEvent{
+					EventName: "",
+					Job:       &loggingpb.Job{},
+				},
+			},
+		}
+		rts := ld.GetReferencedTablesURN()
+
+		assert.Empty(t, rts)
+	})
+
+	t.Run("return slice of urns if referenced tables exist in JobStatistics", func(t *testing.T) {
+		rts := testDataLogData1.GetReferencedTablesURN()
+
+		expectedRefTablesURN := []string{
+			models.TableURN("bigquery", "project1", "dataset1", "table1"),
+			models.TableURN("bigquery", "project2", "dataset1", "table1"),
+			models.TableURN("bigquery", "project3", "dataset1", "table1"),
+		}
+		assert.EqualValues(t, expectedRefTablesURN, rts)
+	})
+}
diff --git a/plugins/extractors/bigquery/auditlog/stats.go b/plugins/extractors/bigquery/auditlog/stats.go
new file mode 100644
index 000000000..c35a93d48
--- /dev/null
+++ b/plugins/extractors/bigquery/auditlog/stats.go
@@ -0,0 +1,129 @@
+package auditlog
+
+import (
+	"github.com/odpf/meteor/plugins/extractors/bigquery/sqlparser"
+	"github.com/pkg/errors"
+)
+
+type TableStats struct {
+	TableUsage       map[string]int64
+	JoinDetail       map[string]map[string]JoinDetail
+	FilterConditions map[string]map[string]bool
+	processedLog     *LogData
+}
+
+type JoinDetail struct {
+	Usage      int64
+	Conditions map[string]bool
+}
+
+func NewTableStats() *TableStats {
+	ts := &TableStats{}
+	ts.initPopulate()
+	return ts
+}
+
+func (b *TableStats) initPopulate() {
+	b.TableUsage = map[string]int64{}
+	b.JoinDetail = map[string]map[string]JoinDetail{}
+	b.FilterConditions = map[string]map[string]bool{}
+}
+
+func (b *TableStats) Populate(ld *LogData) (err error) {
+	b.processedLog = ld
+
+	// if empty, the query does not involve any table
+	refTablesURN := b.processedLog.GetReferencedTablesURN()
+	if len(refTablesURN) == 0 {
+		err = errors.New("got empty referenced tables")
+		return
+	}
+	// query must be there, otherwise the log is not valid
+	sqlQuery, err := b.processedLog.GetQuery()
+	if err != nil {
+		// the query does not exist in this log entry
+		err = errors.Wrap(err, "cannot get query")
+		return
+	}
+
+	jcs := sqlparser.ParseJoinConditions(sqlQuery)
+	fcs := sqlparser.ParseFilterConditions(sqlQuery)
+
+	// populate all data
+	for _, rt := range refTablesURN {
+
+		// single table usage
+		b.populateTableUsage(rt)
+
+		// no common join if there is only 1 referenced table
+		if len(refTablesURN) > 1 {
+			b.populateJoinDetail(rt, refTablesURN, jcs)
+		}
+
+		b.populateFilterConditions(rt, fcs)
+	}
+
+	return
+}
+
+func (b *TableStats)
populateTableUsage(tableURN string) {
+	// single table usage; missing keys default to zero, so the
+	// counter can be incremented directly
+	b.TableUsage[tableURN]++
+}
+
+func (b *TableStats) populateJoinDetail(tableURN string, refTablesURN []string, jcs []string) {
+	if _, exist := b.JoinDetail[tableURN]; !exist {
+		b.JoinDetail[tableURN] = map[string]JoinDetail{}
+	}
+
+	// join detail
+	for _, selectedTableURN := range refTablesURN {
+		if selectedTableURN == tableURN {
+			continue
+		}
+
+		// init or update usage
+		jd, exist := b.JoinDetail[tableURN][selectedTableURN]
+		if !exist {
+			jd.Usage = 1
+		} else {
+			jd.Usage++
+		}
+		b.JoinDetail[tableURN][selectedTableURN] = jd
+
+		// skip when the query has no join conditions
+		if len(jcs) == 0 {
+			continue
+		}
+
+		// init conditions
+		if jd.Conditions == nil {
+			jd.Conditions = map[string]bool{}
+		}
+
+		for _, jc := range jcs {
+			jd.Conditions[jc] = true
+		}
+		b.JoinDetail[tableURN][selectedTableURN] = jd
+	}
+}
+
+func (b *TableStats) populateFilterConditions(tableURN string, fcs []string) {
+	if len(fcs) == 0 {
+		return
+	}
+
+	if _, exist := b.FilterConditions[tableURN]; !exist {
+		b.FilterConditions[tableURN] = map[string]bool{}
+	}
+
+	for _, fc := range fcs {
+		b.FilterConditions[tableURN][fc] = true
+	}
+}
diff --git a/plugins/extractors/bigquery/auditlog/stats_test.go b/plugins/extractors/bigquery/auditlog/stats_test.go
new file mode 100644
index 000000000..52f813fca
--- /dev/null
+++ b/plugins/extractors/bigquery/auditlog/stats_test.go
@@ -0,0 +1,101 @@
+package auditlog
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	loggingpb "google.golang.org/genproto/googleapis/cloud/bigquery/logging/v1"
+)
+
+func TestCreatingTableStats(t *testing.T) {
+	ts := NewTableStats()
+
+	assert.EqualValues(t, ts.TableUsage, map[string]int64{})
+	assert.EqualValues(t, ts.JoinDetail, map[string]map[string]JoinDetail{})
+}
+
+func TestPopulateIndividually(t *testing.T) {
+	t.Run("populate table usage by counting every table in referenced tables", func(t *testing.T) {
+		ts := NewTableStats()
+
+		for _, td := range testDataRefTables1 {
+			ts.populateTableUsage(td)
+		}
+		for _, td := range testDataRefTables2 {
+			ts.populateTableUsage(td)
+		}
+		for _, td := range testDataRefTables3 {
+			ts.populateTableUsage(td)
+		}
+		for _, td := range testDataRefTables4 {
+			ts.populateTableUsage(td)
+		}
+
+		assert.EqualValues(t, testDataTableUsage1234, ts.TableUsage)
+	})
+
+	t.Run("populate join usage by counting every joined table in referenced tables", func(t *testing.T) {
+		ts := NewTableStats()
+
+		for _, td := range testDataRefTables1 {
+			ts.populateJoinDetail(td, testDataRefTables1, nil)
+		}
+		for _, td := range testDataRefTables2 {
+			ts.populateJoinDetail(td, testDataRefTables2, nil)
+		}
+		for _, td := range testDataRefTables3 {
+			ts.populateJoinDetail(td, testDataRefTables3, nil)
+		}
+		for _, td := range testDataRefTables4 {
+			ts.populateJoinDetail(td, testDataRefTables4, nil)
+		}
+
+		assert.EqualValues(t, testDataJoinUsage1234, ts.JoinDetail)
+	})
+}
+
+func TestPopulateAll(t *testing.T) {
+	t.Run("populate all usage data from log data", func(t *testing.T) {
+		ts := NewTableStats()
+
+		err := ts.Populate(testDataLogData1)
+		assert.Nil(t, err)
+
+		err = ts.Populate(testDataLogData2)
+		assert.Nil(t, err)
+
+		err = ts.Populate(testDataLogData3)
+		assert.Nil(t, err)
+
+		err = ts.Populate(testDataLogData4)
+		assert.Nil(t, err)
+
+		assert.EqualValues(t, testDataTableUsage1234, ts.TableUsage)
+		assert.EqualValues(t,
testDataJoinDetail1234, ts.JoinDetail) + assert.EqualValues(t, testDataFilterCondition1234, ts.FilterConditions) + }) + + t.Run("error populating table stats if no referenced tables found in log data", func(t *testing.T) { + ld := &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobStatistics: &loggingpb.JobStatistics{ + ReferencedTables: []*loggingpb.TableName{}, + }, + }, + }, + }, + } + + ts := NewTableStats() + + err := ts.Populate(ld) + + assert.EqualError(t, err, "got empty referenced tables") + assert.Empty(t, ts.TableUsage) + assert.Empty(t, ts.JoinDetail) + assert.Empty(t, ts.FilterConditions) + }) +} diff --git a/plugins/extractors/bigquery/auditlog/testdata.go b/plugins/extractors/bigquery/auditlog/testdata.go new file mode 100644 index 000000000..c5a7a989e --- /dev/null +++ b/plugins/extractors/bigquery/auditlog/testdata.go @@ -0,0 +1,321 @@ +package auditlog + +import ( + "github.com/odpf/meteor/models" + loggingpb "google.golang.org/genproto/googleapis/cloud/bigquery/logging/v1" +) + +var testDataRefTables1 = []string{ + models.TableURN("bigquery", "project1", "dataset1", "table1"), + models.TableURN("bigquery", "project2", "dataset1", "table1"), + models.TableURN("bigquery", "project3", "dataset1", "table1"), +} + +var testDataRefTables2 = []string{ + models.TableURN("bigquery", "project1", "dataset1", "table1"), + models.TableURN("bigquery", "project3", "dataset1", "table1"), + models.TableURN("bigquery", "project4", "dataset1", "table1"), +} + +var testDataRefTables3 = []string{ + models.TableURN("bigquery", "project1", "dataset1", "table1"), + models.TableURN("bigquery", "project3", "dataset1", "table1"), +} + +var testDataRefTables4 = []string{ + models.TableURN("bigquery", "project1", "dataset1", "table1"), +} + +var testDataLogData1 = &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobConfiguration: &loggingpb.JobConfiguration{ + Configuration: &loggingpb.JobConfiguration_Query_{ + Query: &loggingpb.JobConfiguration_Query{ + Query: ` + SELECT + t1.field1 AS field1, + t2.field2 AS field2, + t1.field3 AS field3, + t3.field4 AS field4` + + "FROM `project1.dataset1.table1` t1" + + "JOIN `project2.dataset1.table1` t2 ON t1.somefield = t2.anotherfield " + + "JOIN `project3.dataset1.table1` t3 ON t1.somefield = t3.yetanotherfield", + }, + }, + }, + JobStatistics: &loggingpb.JobStatistics{ + ReferencedTables: []*loggingpb.TableName{ + { + ProjectId: "project1", + DatasetId: "dataset1", + TableId: "table1", + }, { + ProjectId: "project2", + DatasetId: "dataset1", + TableId: "table1", + }, { + ProjectId: "project3", + DatasetId: "dataset1", + TableId: "table1", + }, + }, + }, + }, + }, + }, +} + +var testDataLogData2 = &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobConfiguration: &loggingpb.JobConfiguration{ + Configuration: &loggingpb.JobConfiguration_Query_{ + Query: &loggingpb.JobConfiguration_Query{ + Query: ` + WITH temp_table as + (SELECT + t1.field1 AS field1, + t2.field2 AS field2, + t1.field3 AS field3, + t3.field4 AS field4` + + "FROM `project1.dataset1.table1` t1" + + "JOIN `project3.dataset1.table1` t2 ON t1.somefield = t2.anotherfield " + + "JOIN `project4.dataset1.table1` t3 ON t1.somefield = t3.yetanotherfield)" + + `SELECT * FROM temp_table WHERE t1.field2 = 'valid';`, + }, + }, + }, + JobStatistics: 
&loggingpb.JobStatistics{ + ReferencedTables: []*loggingpb.TableName{ + { + ProjectId: "project1", + DatasetId: "dataset1", + TableId: "table1", + }, { + ProjectId: "project3", + DatasetId: "dataset1", + TableId: "table1", + }, { + ProjectId: "project4", + DatasetId: "dataset1", + TableId: "table1", + }, + }, + }, + }, + }, + }, +} + +var testDataLogData3 = &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobConfiguration: &loggingpb.JobConfiguration{ + Configuration: &loggingpb.JobConfiguration_Query_{ + Query: &loggingpb.JobConfiguration_Query{ + Query: ` + SELECT + * + (SELECT order_id FROM FROM project1.dataset1.table1 WHERE column_1 IS TRUE) + JOIN project3.dataset1.table1 + USING (somefield,anotherfield)`}, + }, + }, + JobStatistics: &loggingpb.JobStatistics{ + ReferencedTables: []*loggingpb.TableName{ + { + ProjectId: "project1", + DatasetId: "dataset1", + TableId: "table1", + }, { + ProjectId: "project3", + DatasetId: "dataset1", + TableId: "table1", + }, + }, + }, + }, + }, + }, +} + +var testDataLogData4 = &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobConfiguration: &loggingpb.JobConfiguration{ + Configuration: &loggingpb.JobConfiguration_Query_{ + Query: &loggingpb.JobConfiguration_Query{ + Query: "SELECT start_time FROM `project1`.dataset1.table1 where job_type=\"query\" and statement_type=\"insert\" order by start_time desc limit 1", + }, + }, + }, + JobStatistics: &loggingpb.JobStatistics{ + ReferencedTables: []*loggingpb.TableName{ + { + ProjectId: "project1", + DatasetId: "dataset1", + TableId: "table1", + }, + }, + }, + }, + }, + }, +} + +var testDataJoinDetail1234 = map[string]map[string]JoinDetail{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + models.TableURN("bigquery", "project2", "dataset1", "table1"): { + Usage: 1, + Conditions: map[string]bool{ + "ON t1.somefield = t2.anotherfield": true, + "ON t1.somefield = t3.yetanotherfield": true, + }, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): { + Usage: 3, + Conditions: map[string]bool{ + "ON t1.somefield = t2.anotherfield": true, + "ON t1.somefield = t3.yetanotherfield": true, + "USING (somefield,anotherfield)": true, + }, + }, + + models.TableURN("bigquery", "project4", "dataset1", "table1"): { + Usage: 1, + Conditions: map[string]bool{ + "ON t1.somefield = t2.anotherfield": true, + "ON t1.somefield = t3.yetanotherfield": true, + }, + }, + }, + models.TableURN("bigquery", "project2", "dataset1", "table1"): { + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + Usage: 1, + Conditions: map[string]bool{ + "ON t1.somefield = t2.anotherfield": true, + "ON t1.somefield = t3.yetanotherfield": true, + }, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): { + Usage: 1, + Conditions: map[string]bool{ + "ON t1.somefield = t2.anotherfield": true, + "ON t1.somefield = t3.yetanotherfield": true, + }, + }, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): { + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + Usage: 3, + Conditions: map[string]bool{ + "ON t1.somefield = t2.anotherfield": true, + "ON t1.somefield = t3.yetanotherfield": true, + "USING (somefield,anotherfield)": true, + }, + }, + models.TableURN("bigquery", "project2", "dataset1", "table1"): { + Usage: 1, + Conditions: map[string]bool{ + "ON t1.somefield = t2.anotherfield": true, + "ON 
t1.somefield = t3.yetanotherfield": true, + }, + }, + models.TableURN("bigquery", "project4", "dataset1", "table1"): { + Usage: 1, + Conditions: map[string]bool{ + "ON t1.somefield = t2.anotherfield": true, + "ON t1.somefield = t3.yetanotherfield": true, + }, + }, + }, + models.TableURN("bigquery", "project4", "dataset1", "table1"): { + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + Usage: 1, + Conditions: map[string]bool{ + "ON t1.somefield = t2.anotherfield": true, + "ON t1.somefield = t3.yetanotherfield": true, + }, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): { + Usage: 1, + Conditions: map[string]bool{ + "ON t1.somefield = t2.anotherfield": true, + "ON t1.somefield = t3.yetanotherfield": true, + }, + }, + }, +} + +var testDataJoinUsage1234 = map[string]map[string]JoinDetail{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + models.TableURN("bigquery", "project2", "dataset1", "table1"): { + Usage: 1, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): { + Usage: 3, + }, + + models.TableURN("bigquery", "project4", "dataset1", "table1"): { + Usage: 1, + }, + }, + models.TableURN("bigquery", "project2", "dataset1", "table1"): { + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + Usage: 1, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): { + Usage: 1, + }, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): { + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + Usage: 3, + }, + models.TableURN("bigquery", "project2", "dataset1", "table1"): { + Usage: 1, + }, + models.TableURN("bigquery", "project4", "dataset1", "table1"): { + Usage: 1, + }, + }, + models.TableURN("bigquery", "project4", "dataset1", "table1"): { + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + Usage: 1, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): { + Usage: 1, + }, + }, +} + +var testDataTableUsage1234 = map[string]int64{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): 4, + models.TableURN("bigquery", "project2", "dataset1", "table1"): 1, + models.TableURN("bigquery", "project3", "dataset1", "table1"): 3, + models.TableURN("bigquery", "project4", "dataset1", "table1"): 1, +} + +var testDataFilterCondition1234 = map[string]map[string]bool{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + "WHERE column_1 IS TRUE": true, + "WHERE t1.field2 = 'valid'": true, + "where job_type=\"query\" and statement_type=\"insert\"": true, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): { + "WHERE column_1 IS TRUE": true, + "WHERE t1.field2 = 'valid'": true, + }, + models.TableURN("bigquery", "project4", "dataset1", "table1"): { + "WHERE t1.field2 = 'valid'": true, + }, +} diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go index 9200dc2db..8e4a18f48 100644 --- a/plugins/extractors/bigquery/bigquery.go +++ b/plugins/extractors/bigquery/bigquery.go @@ -14,6 +14,7 @@ import ( "github.com/odpf/meteor/models/odpf/assets/common" "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/plugins/extractors/bigquery/auditlog" "github.com/odpf/meteor/registry" "github.com/odpf/meteor/utils" "github.com/odpf/salt/log" @@ -34,6 +35,8 @@ type Config struct { TablePattern string 
`mapstructure:"table_pattern"`
 IncludeColumnProfile bool `mapstructure:"include_column_profile"`
 MaxPreviewRows int `mapstructure:"max_preview_rows" default:"30"`
+	IsCollectTableUsage bool  `mapstructure:"collect_table_usage" default:"false"`
+	UsagePeriodInDay    int64 `mapstructure:"usage_period_in_day" default:"7"`
}

var sampleConfig = `
@@ -51,18 +54,24 @@ service_account_json: |-
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "xxxxxxx",
"client_x509_cert_url": "xxxxxxx"
-  }`
+  }
+collect_table_usage: false
+usage_period_in_day: 7`

// Extractor manages the communication with the bigquery service
type Extractor struct {
-	logger log.Logger
-	client *bigquery.Client
-	config Config
+	logger     log.Logger
+	client     *bigquery.Client
+	config     Config
+	galClient  *auditlog.AuditLog // client for collecting BigQuery audit logs
+	tableStats *auditlog.TableStats
}

func New(logger log.Logger) *Extractor {
+	galc := auditlog.New(logger)
	return &Extractor{
-		logger: logger,
+		logger:    logger,
+		galClient: galc,
	}
}

@@ -93,11 +102,32 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})
		return errors.Wrap(err, "failed to create client")
	}

+	if e.config.IsCollectTableUsage {
+		errL := e.galClient.Init(ctx, auditlog.Config{
+			ProjectID:           e.config.ProjectID,
+			ServiceAccountJSON:  e.config.ServiceAccountJSON,
+			IsCollectTableUsage: e.config.IsCollectTableUsage,
+			UsagePeriodInDay:    e.config.UsagePeriodInDay,
+		})
+		if errL != nil {
+			e.logger.Error("failed to create google audit log client", "err", errL)
+		}
+	}
+
	return
}

// Extract checks if the table is valid and extracts the table schema
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) {
+	if e.config.IsCollectTableUsage {
+		// Fetch and extract logs first to build a map
+		ts, errL := e.galClient.Collect(ctx)
+		e.tableStats = ts
+		if errL != nil {
+			e.logger.Error("error populating table stats usage", "err", errL)
+		}
+	}
+
	// Fetch and iterate over datasets
	it := e.client.Datasets(ctx)
	for {
@@ -150,6 +180,9 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
// Build the bigquery table metadata
func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigquery.TableMetadata) *assets.Table {
	tableFQN := t.FullyQualifiedName()
+	tableURN := models.TableURN("bigquery", t.ProjectID, t.DatasetID, t.TableID)
+
+	tableProfile := e.buildTableProfile(tableURN)

	var partitionField string
	if md.TimePartitioning != nil {
@@ -167,9 +200,10 @@ func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigqu

	return &assets.Table{
		Resource: &common.Resource{
-			Urn: models.TableURN("bigquery", t.ProjectID, t.DatasetID, t.TableID),
-			Name: t.TableID,
-			Service: "bigquery",
+			Urn:         tableURN,
+			Name:        t.TableID,
+			Description: md.Description,
+			Service:     "bigquery",
		},
		Schema: &facets.Columns{
			Columns: e.buildColumns(ctx, md),
@@ -185,6 +219,7 @@ func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigqu
			}),
			Labels: md.Labels,
		},
+		Profile: tableProfile,
		Timestamps: &common.Timestamp{
			CreateTime: timestamppb.New(md.CreationTime),
			UpdateTime: timestamppb.New(md.LastModifiedTime),
diff --git a/plugins/extractors/bigquery/bigquery_test.go b/plugins/extractors/bigquery/bigquery_test.go
index 520cc69e9..7b79f2a0c 100644
--- a/plugins/extractors/bigquery/bigquery_test.go
+++ b/plugins/extractors/bigquery/bigquery_test.go
@@ -1,3 +1,4 @@
+//go:build integration
// +build integration

package bigquery_test
diff --git a/plugins/extractors/bigquery/profile.go
b/plugins/extractors/bigquery/profile.go new file mode 100644 index 000000000..91c8755cd --- /dev/null +++ b/plugins/extractors/bigquery/profile.go @@ -0,0 +1,44 @@ +package bigquery + +import "github.com/odpf/meteor/models/odpf/assets" + +func (e *Extractor) buildTableProfile(tableURN string) (tp *assets.TableProfile) { + var tableUsage int64 + var commonJoins []*assets.Join + var filterConditions []string + + if e.config.IsCollectTableUsage && e.tableStats != nil { + // table usage + tableUsage = e.tableStats.TableUsage[tableURN] + + // common join + if jdMapping, exist := e.tableStats.JoinDetail[tableURN]; exist { + for joinedTableURN, jd := range jdMapping { + var joinConditions []string + for jc := range jd.Conditions { + joinConditions = append(joinConditions, jc) + } + commonJoins = append(commonJoins, &assets.Join{ + Urn: joinedTableURN, + Count: jd.Usage, + Conditions: joinConditions, + }) + } + } + + // filter conditions + if filterMapping, exist := e.tableStats.FilterConditions[tableURN]; exist { + for filterExpression := range filterMapping { + filterConditions = append(filterConditions, filterExpression) + } + } + } + + tp = &assets.TableProfile{ + UsageCount: tableUsage, + Joins: commonJoins, + Filters: filterConditions, + } + + return +} diff --git a/plugins/extractors/bigquery/profile_test.go b/plugins/extractors/bigquery/profile_test.go new file mode 100644 index 000000000..002871981 --- /dev/null +++ b/plugins/extractors/bigquery/profile_test.go @@ -0,0 +1,84 @@ +package bigquery + +import ( + "testing" + + "github.com/alecthomas/assert" + "github.com/odpf/meteor/models" + "github.com/odpf/meteor/models/odpf/assets" + "github.com/odpf/meteor/plugins/extractors/bigquery/auditlog" +) + +func TestBuildTableProfile(t *testing.T) { + tableURN := models.TableURN("bigquery", "project1", "dataset1", "table1") + t.Run("table profile usage related fields are empty if usage collection is disabled", func(t *testing.T) { + extr := &Extractor{ + config: Config{ + IsCollectTableUsage: false, + }, + } + + tp := extr.buildTableProfile(tableURN) + + assert.Empty(t, tp.UsageCount) + assert.Empty(t, tp.Joins) + }) + + t.Run("table profile usage related fields are empty if table stats is nil", func(t *testing.T) { + extr := &Extractor{ + config: Config{ + IsCollectTableUsage: true, + }, + tableStats: nil, + } + + tp := extr.buildTableProfile(tableURN) + + assert.Empty(t, tp.UsageCount) + assert.Empty(t, tp.Joins) + }) + + t.Run("table profile usage related fields are populated if table stats is not nil and usage collection is enabled", func(t *testing.T) { + extr := &Extractor{ + config: Config{ + IsCollectTableUsage: true, + }, + tableStats: &auditlog.TableStats{ + TableUsage: map[string]int64{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): 5, + models.TableURN("bigquery", "project2", "dataset1", "table1"): 3, + models.TableURN("bigquery", "project3", "dataset1", "table1"): 1, + }, + JoinDetail: map[string]map[string]auditlog.JoinDetail{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + models.TableURN("bigquery", "project2", "dataset1", "table1"): auditlog.JoinDetail{ + Usage: 1, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): auditlog.JoinDetail{ + Usage: 3, + }, + models.TableURN("bigquery", "project4", "dataset1", "table1"): auditlog.JoinDetail{ + Usage: 1, + }, + }, + }, + }, + } + + tp := extr.buildTableProfile(tableURN) + + 
assert.EqualValues(t, 5, tp.UsageCount)
+		assert.Contains(t, tp.Joins, &assets.Join{
+			Urn:   models.TableURN("bigquery", "project2", "dataset1", "table1"),
+			Count: 1,
+		})
+		assert.Contains(t, tp.Joins, &assets.Join{
+			Urn:   models.TableURN("bigquery", "project3", "dataset1", "table1"),
+			Count: 3,
+		})
+		assert.Contains(t, tp.Joins, &assets.Join{
+			Urn:   models.TableURN("bigquery", "project4", "dataset1", "table1"),
+			Count: 1,
+		})
+	})
}
diff --git a/plugins/extractors/bigquery/sqlparser/parser.go b/plugins/extractors/bigquery/sqlparser/parser.go
new file mode 100644
index 000000000..510234b20
--- /dev/null
+++ b/plugins/extractors/bigquery/sqlparser/parser.go
@@ -0,0 +1,95 @@
+package sqlparser
+
+import (
+	"regexp"
+	"strings"
+)
+
+var (
+	// matches -- line comments and /* ... */ block comments
+	queryCommentPatterns = regexp.MustCompile(`(--.*)|(((/\*)+?[\w\W]*?(\*/)+))`)
+
+	// join patterns
+	joinCharsRegex = "[a-zA-Z0-9@_\\.\\`-]*"
+
+	joinExpr = "(" +
+		"DATE\\(" + joinCharsRegex + "\\)" +
+		"|" +
+		joinCharsRegex +
+		")"
+	joinOnTerminals = joinExpr + "\\s*\\=\\s*" + joinExpr
+
+	joinRegex = "" +
+		"(?i)(?:ON)\\s+" + joinOnTerminals + "(\\s+(AND|OR)\\s+" + joinOnTerminals + ")*" +
+		"|" +
+		"(?i)(?:USING)\\s*\\(\\s*([a-zA-Z0-9,@_\\s `-]*)\\s*\\)"
+	joinPatterns = regexp.MustCompile(joinRegex)
+
+	// filter patterns
+	filterCharsRegex                    = "[a-zA-Z0-9@_\"\\',\\.\\x60-]*"
+	filterCharsWithWhitespaceColonRegex = "[a-zA-Z0-9@_\\:\"\\',\\s\\.\\x60-]*"
+
+	filterExprLHS = filterCharsRegex
+
+	filterExprRHS = "(" +
+		"CURRENT_TIMESTAMP\\(\\)" +
+		"|" +
+		"TIMESTAMP\\(" + filterCharsWithWhitespaceColonRegex + "\\)" +
+		"|" +
+		filterCharsRegex +
+		")"
+
+	filterTerminals = "(" +
+		filterExprLHS + "\\s*(<=|>=|!=|<>|=|<|>)\\s*" + filterExprRHS +
+		"|" +
+		filterExprLHS + "\\s+(LIKE|NOT LIKE)\\s+" + filterExprRHS +
+		"|" +
+		filterExprLHS + "\\s+(BETWEEN|NOT BETWEEN)\\s+" + filterExprRHS + "\\s+AND\\s+" + filterExprRHS +
+		"|" +
+		filterExprLHS + "\\s+IS\\s+(?:NOT\\s+)?(NULL|TRUE|FALSE)" +
+		"|" +
+		filterExprLHS + "\\s+(IN|NOT IN)\\s?\\(" + filterCharsWithWhitespaceColonRegex + "\\)" +
+		")"
+	filterRegex    = "(?i)(?:WHERE|HAVING)\\s+" + filterTerminals + "(\\s+(AND|OR)\\s+" + filterTerminals + ")*"
+	filterPatterns = regexp.MustCompile(filterRegex)
+)
+
+// ParseJoinConditions returns all join conditions (ON and USING clauses)
+// found in the SQL query, one string per join condition
+func ParseJoinConditions(sqlQuery string) (jcs []string) {
+	sqlQuery = cleanUpQuery(sqlQuery)
+
+	matches := joinPatterns.FindAllStringSubmatch(sqlQuery, -1)
+	for _, match := range matches {
+
+		const joinConditionFullIdx = 0
+		jcs = append(jcs, match[joinConditionFullIdx])
+	}
+
+	return
+}
+
+// ParseFilterConditions returns all filter conditions (WHERE and HAVING
+// clauses) found in the SQL query, one string per filter condition
+func ParseFilterConditions(sqlQuery string) (fcs []string) {
+	sqlQuery = cleanUpQuery(sqlQuery)
+
+	fcs = filterPatterns.FindAllString(sqlQuery, -1)
+	return
+}
+
+func cleanUpQuery(s string) string {
+	// remove comments from query
+	matches := queryCommentPatterns.FindAllStringSubmatch(s, -1)
+	for _, match := range matches {
+		// replace full match
+		s = strings.ReplaceAll(s, match[0], " ")
+	}
+
+	// clean leading and trailing whitespace
+	s = strings.TrimSpace(s)
+	// standardizing string
+	s = strings.Join(strings.Fields(s), " ")
+	// removing ; char
+	s = strings.ReplaceAll(s, ";", "")
+
+	return s
+}
diff --git a/plugins/extractors/bigquery/sqlparser/parser_test.go b/plugins/extractors/bigquery/sqlparser/parser_test.go
new file mode
diff --git a/plugins/extractors/bigquery/sqlparser/parser_test.go b/plugins/extractors/bigquery/sqlparser/parser_test.go new file mode 100644 index 000000000..5c4c12513 --- /dev/null +++ b/plugins/extractors/bigquery/sqlparser/parser_test.go @@ -0,0 +1,210 @@ +package sqlparser
+
+import (
+	"testing"
+
+	"github.com/alecthomas/assert"
+)
+
+func TestParseSimpleJoin(t *testing.T) {
+
+	type set map[string]bool
+	newSet := func(values ...string) set {
+		s := make(set)
+		for _, val := range values {
+			s[val] = true
+		}
+		return s
+	}
+	testCases := []struct {
+		Name     string
+		Query    string
+		JoinInfo set
+	}{
+		{
+			Name:     "simple query with simple join with on",
+			Query:    "select * from data-engineering.testing.table1 join data-engineering.testing.table2 on some_field1 = some_field2",
+			JoinInfo: newSet("on some_field1 = some_field2"),
+		},
+		{
+			Name:     "simple query with simple join with on and unformatted",
+			Query:    "select * from data-engineering.testing.table1 join data-engineering.testing.table2 on some_field1 =some_field2",
+			JoinInfo: newSet("on some_field1 =some_field2"),
+		},
+		{
+			Name:     "simple query with simple join with using",
+			Query:    "select * from data-engineering.testing.table1 join data-engineering.testing.table2 using (some_field)",
+			JoinInfo: newSet("using (some_field)"),
+		},
+		{
+			Name:     "simple query with simple join with using and multiple columns",
+			Query:    "select * from data-engineering.testing.table1 join data-engineering.testing.table2 using (some_field, some_field1, somefield3)",
+			JoinInfo: newSet("using (some_field, some_field1, somefield3)"),
+		},
+		{
+			Name:     "simple query with simple join with using and multiple columns and unformatted",
+			Query:    "select * from data-engineering.testing.table1 join data-engineering.testing.table2 using (some_field, some_field1,somefield3)",
+			JoinInfo: newSet("using (some_field, some_field1,somefield3)"),
+		},
+		{
+			Name:     "simple query with outer join and `on`",
+			Query:    "select * from data-engineering.testing.table1 full outer join data-engineering.testing.table2 on some_field1 = some_field2",
+			JoinInfo: newSet("on some_field1 = some_field2"),
+		},
+		{
+			Name:     "simple query with outer join and `using`",
+			Query:    "select * from data-engineering.testing.table1 full outer join data-engineering.testing.table2 using (some_field1, some_field2)",
+			JoinInfo: newSet("using (some_field1, some_field2)"),
+		},
+		{
+			Name:     "subquery and join",
+			Query:    "select * from (select order_id from data-engineering.testing.orders) join data-engineering.testing.table2 using (some_field1, some_field2)",
+			JoinInfo: newSet("using (some_field1, some_field2)"),
+		},
+		{
+			Name:     "`with` clause + join",
+			Query:    "with dedup_source as (select * from `project.fire.fly`) select * from dedup_source join `project.maximum.overdrive` on dedup_source.left = `project.maximum.overdrive`.right",
+			JoinInfo: newSet("on dedup_source.left = `project.maximum.overdrive`.right"),
+		},
+		{
+			Name: "more than 2 joins",
+			Query: `SELECT
+			t1.field1 AS field1,
+			t2.field2 AS field2,
+			t1.field3 AS field3,
+			t3.field4 AS field4` +
+				" FROM `project1.dataset1.table1` t1 " +
+				"JOIN `project2.dataset1.table1` t2 ON t1.somefield = t2.anotherfield " +
+				"JOIN `project3.dataset1.table1` t3 ON t1.somefield = t3.yetanotherfield",
+			JoinInfo: newSet("ON t1.somefield = t2.anotherfield", "ON t1.somefield = t3.yetanotherfield"),
+		},
+	}
+
+	for _, test := range testCases {
+		t.Run(test.Name, func(t *testing.T) {
+			jcs := ParseJoinConditions(test.Query)
+
+			assert.Equal(t, test.JoinInfo, newSet(jcs...))
+		})
+	}
+}
+
+func TestParseSimpleFilter(t *testing.T) {
+
+	type set map[string]bool
+	newSet := 
func(values ...string) set { + s := make(set) + for _, val := range values { + s[val] = true + } + return s + } + testCases := []struct { + Name string + Query string + FilterCondition set + }{ + { + Name: "simple query with where in the middle and order", + Query: "select start_time from `region-us`.information_schema.jobs_by_user where job_type=\"query\" and statement_type=\"insert\" order by start_time desc limit 1", + FilterCondition: newSet("where job_type=\"query\" and statement_type=\"insert\""), + }, + { + Name: "simple query with where in the middle and limit", + Query: "select start_time from `region-us`.information_schema.jobs_by_user where job_type=\"query\" and statement_type=\"insert\" limit 1", + FilterCondition: newSet("where job_type=\"query\" and statement_type=\"insert\""), + }, + { + Name: "`with` clause and where in subquery", + Query: "with subq1 as (select * from roster where schoolid = 52 limit 1), subq2 as (select schoolid from subq1) select distinct * from subq2;", + FilterCondition: newSet("where schoolid = 52"), + }, + { + Name: "`with` clause and where in subquery with sequence of bool expression", + Query: "with subq1 as (select * from roster where schoolid = 52 and param=\"a_string\" and param2= 4), subq2 as (select schoolid from subq1) select distinct * from subq2;", + FilterCondition: newSet("where schoolid = 52 and param=\"a_string\" and param2= 4"), + }, + { + Name: "`with` clause and where in multiple subquery with sequence of bool expression", + Query: "with subq1 as (select * from roster where schoolid = 52 and param=\"a_string\"), subq2 as (select schoolid from subq1 where schoolid = 13 and param=\"is_a_string\") select distinct * from subq2;", + FilterCondition: newSet("where schoolid = 52 and param=\"a_string\"", "where schoolid = 13 and param=\"is_a_string\""), + }, + { + Name: "simple query with simple join with using and multiple columns and where with sequence of bool expression", + Query: "select * from data-engineering.testing.table1 join data-engineering.testing.table2 using (some_field, some_field1, somefield3) where job_type=\"query\" and statement_type=\"insert\" limit 1", + FilterCondition: newSet("where job_type=\"query\" and statement_type=\"insert\""), + }, + { + Name: "simple select with where timestamp function", + Query: "SELECT * FROM `dataset-1-name.bq-dataset-all` WHERE event_timestamp between TIMESTAMP('2021-11-20') AND TIMESTAMP('2021-11-21')", + FilterCondition: newSet("WHERE event_timestamp between TIMESTAMP('2021-11-20') AND TIMESTAMP('2021-11-21')"), + }, + { + Name: "complex query with comment", + Query: `SELECT + COUNT(DISTINCT user_id) AS purchasers_count + FROM + -- PLEASE REPLACE WITH YOUR TABLE NAME. 
+ ` + "`" + "YOUR_TABLE.events_*" + "`" + ` + WHERE + event_name IN ('in_app_purchase', 'purchase') + -- PLEASE REPLACE WITH YOUR DESIRED DATE RANGE + AND _TABLE_SUFFIX BETWEEN '20180501' AND '20240131' + AND ` + "`" + "_TABLE_SUFFIX" + "`" + ` BETWEEN '1' AND '2';`, + FilterCondition: newSet("WHERE event_name IN ('in_app_purchase', 'purchase') AND _TABLE_SUFFIX BETWEEN '20180501' AND '20240131' AND `_TABLE_SUFFIX` BETWEEN '1' AND '2'"), + }, + } + + for _, test := range testCases { + t.Run(test.Name, func(t *testing.T) { + fcs := ParseFilterConditions(test.Query) + assert.Equal(t, test.FilterCondition, newSet(fcs...)) + }) + } +} + +func TestComplexQueries(t *testing.T) { + type set map[string]bool + newSet := func(values ...string) set { + s := make(set) + for _, val := range values { + s[val] = true + } + return s + } + testCases := []struct { + Name string + Query string + JoinInfo set + FilterCondition set + }{ + { + Name: "complex query 1", + Query: testDataSQL1, + JoinInfo: newSet(), + FilterCondition: newSet("WHERE event_name IN ('in_app_purchase', 'purchase') AND _TABLE_SUFFIX BETWEEN '20180501' AND '20240131' AND `_TABLE_SUFFIX` BETWEEN '1' AND '2'"), + }, + { + Name: "complex query 2", + Query: testDataSQL2, + JoinInfo: newSet("ON target.column_1 = source.column_1 and target.variant_name = source.variant_name and DATE(target.event_timestamp) = DATE(source.event_timestamp)"), + FilterCondition: newSet("WHERE t.column_type = 'tester' AND t.param_id = \"280481a2-2384-4b81-aa3e-214ac60b31db\" AND event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"), + }, + { + Name: "complex query 3", + Query: testDataSQL3, + JoinInfo: newSet(), + FilterCondition: newSet("WHERE traffic_source.source = 'google' AND traffic_source.medium = 'cpc' AND traffic_source.name = 'VTA-Test-Android' AND _TABLE_SUFFIX BETWEEN '20180521' AND '20240131'"), + }, + } + + for _, test := range testCases { + t.Run(test.Name, func(t *testing.T) { + jcs := ParseJoinConditions(test.Query) + assert.Equal(t, test.JoinInfo, newSet(jcs...)) + + fcs := ParseFilterConditions(test.Query) + assert.Equal(t, test.FilterCondition, newSet(fcs...)) + }) + } +} diff --git a/plugins/extractors/bigquery/sqlparser/testdatasql.go b/plugins/extractors/bigquery/sqlparser/testdatasql.go new file mode 100644 index 000000000..896bc1e7e --- /dev/null +++ b/plugins/extractors/bigquery/sqlparser/testdatasql.go @@ -0,0 +1,58 @@ +package sqlparser + +const testDataSQL1 = ` +SELECT +COUNT(DISTINCT user_id) AS purchasers_count +FROM +-- PLEASE REPLACE WITH YOUR TABLE NAME. 
+` + "`" + "YOUR_TABLE.events_*" + "`" + `
+WHERE
+event_name IN ('in_app_purchase', 'purchase')
+-- PLEASE REPLACE WITH YOUR DESIRED DATE RANGE
+AND _TABLE_SUFFIX BETWEEN '20180501' AND '20240131'
+AND ` + "`" + "_TABLE_SUFFIX" + "`" + ` BETWEEN '1' AND '2';`
+
+const testDataSQL2 = `
+USING (
+    SELECT
+      t.column_1,
+      t.column_2,
+      CAST(count(distinct(id)) AS BIGNUMERIC) as total,
+      TIMESTAMP("2021-10-29", "UTC") as window_start_time,
+      CAST("2021-11-22T02:01:06Z" AS TIMESTAMP) as event_timestamp
+    FROM ` + "`" + `a-dataset.a-specific_table-2021` + "`" + `,
+    UNNEST(trial) as t
+    WHERE
+      t.column_type = 'tester' AND
+      t.param_id = "280481a2-2384-4b81-aa3e-214ac60b31db" AND
+      event_timestamp >= TIMESTAMP("2021-10-29", "UTC") AND
+      event_timestamp < TIMESTAMP("2021-11-22T02:01:06Z")
+    GROUP BY t.column_1, t.column_2
+  ) source
+  ON
+    target.column_1 = source.column_1
+    and target.variant_name = source.variant_name
+    and DATE(target.event_timestamp) = DATE(source.event_timestamp)
+  WHEN matched then update set
+    target.total = source.total,
+    target.event_timestamp = source.event_timestamp
+  WHEN NOT MATCHED then insert(param1,param2,param3,param4,param5_ext) values(
+    source.param1,
+    source.param2,
+    source.param3,
+    source.param4,
+    source.param5_ext
+  )`
+
+const testDataSQL3 = `
+SELECT
+  COUNT(DISTINCT user_id) AS acquired_users_count
+FROM
+  -- PLEASE REPLACE WITH YOUR TABLE NAME.
+  ` + "`" + "YOUR_TABLE.events_*" + "`" + `
+WHERE
+  traffic_source.source = 'google'
+  AND traffic_source.medium = 'cpc'
+  AND traffic_source.name = 'VTA-Test-Android'
+  -- PLEASE REPLACE WITH YOUR DESIRED DATE RANGE.
+  AND _TABLE_SUFFIX BETWEEN '20180521' AND '20240131';`
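As a closing illustration, an in-package sketch (not part of the diff) that ties the fixtures above back to the parser, reproducing the TestComplexQueries expectation for testDataSQL1:

	package sqlparser

	import "fmt"

	// Comments are stripped, whitespace is collapsed, the trailing semicolon is
	// removed, and the whole WHERE clause comes back as one condition string.
	func ExampleParseFilterConditions() {
		fcs := ParseFilterConditions(testDataSQL1)
		fmt.Println(fcs[0])
		// Output: WHERE event_name IN ('in_app_purchase', 'purchase') AND _TABLE_SUFFIX BETWEEN '20180501' AND '20240131' AND `_TABLE_SUFFIX` BETWEEN '1' AND '2'
	}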