diff --git a/go.mod b/go.mod index 37878746e..d94a74b80 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,12 @@ go 1.16 require ( cloud.google.com/go/bigquery v1.8.0 cloud.google.com/go/bigtable v1.10.1 + cloud.google.com/go/logging v1.4.2 cloud.google.com/go/storage v1.15.0 github.com/ClickHouse/clickhouse-go v1.4.5 github.com/MakeNowJust/heredoc v1.0.0 github.com/Microsoft/go-winio v0.5.0 // indirect + github.com/alecthomas/assert v0.0.0-20170929043011-405dbfeb8e38 github.com/aws/aws-sdk-go v1.38.35 // indirect github.com/blastrain/vitess-sqlparser v0.0.0-20201030050434-a139afbb1aba github.com/cenkalti/backoff/v4 v4.1.1 @@ -55,6 +57,7 @@ require ( golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914 golang.org/x/sys v0.0.0-20210903071746-97244b99971b // indirect google.golang.org/api v0.47.0 - google.golang.org/protobuf v1.26.0 + google.golang.org/genproto v0.0.0-20211115160612-a5da7257a6f7 + google.golang.org/protobuf v1.27.1 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) diff --git a/go.sum b/go.sum index f2355ed3a..f59c7cdf5 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ cloud.google.com/go/bigtable v1.10.1/go.mod h1:cyHeKlx6dcZCO0oSQucYdauseD8kIENGu cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk= +cloud.google.com/go/logging v1.4.2 h1:Mu2Q75VBDQlW1HlBMjTX4X84UFR73G1TiLlRYc/b7tA= +cloud.google.com/go/logging v1.4.2/go.mod h1:jco9QZSx8HiVVqLJReq7z7bVdj0P1Jb9PDFs63T+axo= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= @@ -117,6 +119,7 @@ github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/containerd/console v1.0.2/go.mod h1:ytZPjGgY2oeTkAONYafi2kSj0aYggsf8acV1PGKCbzQ= github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/containerd/continuity v0.1.0 h1:UFRRY5JemiAhPZrr/uE0n8fMTLcZsUvySPr1+D7pgr8= @@ -169,6 +172,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/etsy/statsd v0.9.0 h1:GLP1pAzn1fGE7/kM2S5QXSU0ZTUV6QnZsyZVMx7IVF4= github.com/etsy/statsd v0.9.0/go.mod h1:rmx2gVm1TEkQUIcU/KAM4prmC/AAUU8Wndeule9gvW4= @@ -682,6 +686,7 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -1060,8 +1065,9 @@ google.golang.org/genproto v0.0.0-20210429181445-86c259c2b4ab/go.mod h1:P3QM42oQ google.golang.org/genproto v0.0.0-20210513213006-bf773b8c8384/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A= google.golang.org/genproto v0.0.0-20210517163617-5e0236093d7a/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A= google.golang.org/genproto v0.0.0-20210601144548-a796c710e9b6/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A= -google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c h1:wtujag7C+4D6KMoulW9YauvK2lgdvCMS260jsqqBXr0= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= +google.golang.org/genproto v0.0.0-20211115160612-a5da7257a6f7 h1:0LoCYJF53PEqtJOntKxGD72X/c8Xto5EZ4HLrt9D80I= +google.golang.org/genproto v0.0.0-20211115160612-a5da7257a6f7/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -1085,8 +1091,9 @@ google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.37.1/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= -google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q= +google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -1098,8 +1105,9 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/models/odpf/assets/table.pb.go b/models/odpf/assets/table.pb.go index 26a463f6a..fdbce81c8 100644 --- a/models/odpf/assets/table.pb.go +++ b/models/odpf/assets/table.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.26.0 -// protoc v3.18.1 +// protoc v3.19.1 // source: odpf/assets/table.proto package assets @@ -163,8 +163,10 @@ type TableProfile struct { TotalRows int64 `protobuf:"varint,1,opt,name=total_rows,json=totalRows,proto3" json:"total_rows,omitempty"` // The number of rows in the table that are not deleted. // Example: `event_timestamp`. - PartitionKey string `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"` - PartitionValue string `protobuf:"bytes,3,opt,name=partition_value,json=partitionValue,proto3" json:"partition_value,omitempty"` + PartitionKey string `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"` + PartitionValue string `protobuf:"bytes,3,opt,name=partition_value,json=partitionValue,proto3" json:"partition_value,omitempty"` + UsageCount int64 `protobuf:"varint,4,opt,name=usage_count,json=usageCount,proto3" json:"usage_count,omitempty"` + CommonJoin []*TableCommonJoin `protobuf:"bytes,5,rep,name=common_join,json=commonJoin,proto3" json:"common_join,omitempty"` } func (x *TableProfile) Reset() { @@ -220,6 +222,77 @@ func (x *TableProfile) GetPartitionValue() string { return "" } +func (x *TableProfile) GetUsageCount() int64 { + if x != nil { + return x.UsageCount + } + return 0 +} + +func (x *TableProfile) GetCommonJoin() []*TableCommonJoin { + if x != nil { + return x.CommonJoin + } + return nil +} + +// TableCommonJoin is the metric of which are other tables that +// are joined with this table +type TableCommonJoin struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Urn string `protobuf:"bytes,1,opt,name=urn,proto3" json:"urn,omitempty"` + Count int64 `protobuf:"varint,2,opt,name=count,proto3" json:"count,omitempty"` +} + +func (x *TableCommonJoin) Reset() { + *x = TableCommonJoin{} + if protoimpl.UnsafeEnabled { + mi := &file_odpf_assets_table_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TableCommonJoin) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TableCommonJoin) ProtoMessage() {} + +func (x *TableCommonJoin) ProtoReflect() protoreflect.Message { + mi := &file_odpf_assets_table_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TableCommonJoin.ProtoReflect.Descriptor instead. +func (*TableCommonJoin) Descriptor() ([]byte, []int) { + return file_odpf_assets_table_proto_rawDescGZIP(), []int{2} +} + +func (x *TableCommonJoin) GetUrn() string { + if x != nil { + return x.Urn + } + return "" +} + +func (x *TableCommonJoin) GetCount() int64 { + if x != nil { + return x.Count + } + return 0 +} + var File_odpf_assets_table_proto protoreflect.FileDescriptor var file_odpf_assets_table_proto_rawDesc = []byte{ @@ -242,7 +315,7 @@ var file_odpf_assets_table_proto_rawDesc = []byte{ 0x73, 0x65, 0x74, 0x73, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x6f, 0x64, 0x70, 0x66, 0x2f, 0x61, 0x73, 0x73, 0x65, 0x74, 0x73, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, - 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8c, 0x04, 0x0a, 0x05, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x86, 0x04, 0x0a, 0x05, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x38, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x6f, 0x64, 0x70, 0x66, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x74, 0x73, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, @@ -275,19 +348,29 @@ var file_odpf_assets_table_proto_rawDesc = []byte{ 0x6d, 0x70, 0x73, 0x12, 0x2f, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x64, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6f, 0x64, 0x70, 0x66, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x74, 0x73, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x65, - 0x76, 0x65, 0x6e, 0x74, 0x4a, 0x04, 0x08, 0x04, 0x10, 0x05, 0x22, 0x7b, 0x0a, 0x0c, 0x54, 0x61, - 0x62, 0x6c, 0x65, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x6f, - 0x74, 0x61, 0x6c, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, - 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x52, 0x6f, 0x77, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x27, - 0x0a, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x3b, 0x0a, 0x0e, 0x69, 0x6f, 0x2e, 0x6f, 0x64, - 0x70, 0x66, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x74, 0x73, 0x42, 0x0a, 0x54, 0x61, 0x62, 0x6c, 0x65, - 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x1d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x6f, 0x64, 0x70, 0x66, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x6e, 0x2f, 0x61, 0x73, - 0x73, 0x65, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x76, 0x65, 0x6e, 0x74, 0x22, 0xdb, 0x01, 0x0a, 0x0c, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x50, 0x72, + 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x72, + 0x6f, 0x77, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, + 0x52, 0x6f, 0x77, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x56, 0x61, 0x6c, + 0x75, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x75, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x63, 0x6f, 0x75, 0x6e, + 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x75, 0x73, 0x61, 0x67, 0x65, 0x43, 0x6f, + 0x75, 0x6e, 0x74, 0x12, 0x3d, 0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x5f, 0x6a, 0x6f, + 0x69, 0x6e, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x6f, 0x64, 0x70, 0x66, 0x2e, + 0x61, 0x73, 0x73, 0x65, 0x74, 0x73, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x43, 0x6f, 0x6d, 0x6d, + 0x6f, 0x6e, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x4a, 0x6f, + 0x69, 0x6e, 0x22, 0x39, 0x0a, 0x0f, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, + 0x6e, 0x4a, 0x6f, 0x69, 0x6e, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6e, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6e, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x42, 0x3b, 0x0a, + 0x0e, 0x69, 0x6f, 0x2e, 0x6f, 0x64, 0x70, 0x66, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x74, 0x73, 0x42, + 0x0a, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x1d, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x64, 0x70, 0x66, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x6e, 0x2f, 0x61, 0x73, 0x73, 0x65, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -302,34 +385,36 @@ func file_odpf_assets_table_proto_rawDescGZIP() []byte { return file_odpf_assets_table_proto_rawDescData } -var file_odpf_assets_table_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_odpf_assets_table_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_odpf_assets_table_proto_goTypes = []interface{}{ (*Table)(nil), // 0: odpf.assets.Table (*TableProfile)(nil), // 1: odpf.assets.TableProfile - (*common.Resource)(nil), // 2: odpf.assets.common.Resource - (*facets.Columns)(nil), // 3: odpf.assets.facets.Columns - (*facets.Preview)(nil), // 4: odpf.assets.facets.Preview - (*facets.Ownership)(nil), // 5: odpf.assets.facets.Ownership - (*facets.Lineage)(nil), // 6: odpf.assets.facets.Lineage - (*facets.Properties)(nil), // 7: odpf.assets.facets.Properties - (*common.Timestamp)(nil), // 8: odpf.assets.common.Timestamp - (*common.Event)(nil), // 9: odpf.assets.common.Event + (*TableCommonJoin)(nil), // 2: odpf.assets.TableCommonJoin + (*common.Resource)(nil), // 3: odpf.assets.common.Resource + (*facets.Columns)(nil), // 4: odpf.assets.facets.Columns + (*facets.Preview)(nil), // 5: odpf.assets.facets.Preview + (*facets.Ownership)(nil), // 6: odpf.assets.facets.Ownership + (*facets.Lineage)(nil), // 7: odpf.assets.facets.Lineage + (*facets.Properties)(nil), // 8: odpf.assets.facets.Properties + (*common.Timestamp)(nil), // 9: odpf.assets.common.Timestamp + (*common.Event)(nil), // 10: odpf.assets.common.Event } var file_odpf_assets_table_proto_depIdxs = []int32{ - 2, // 0: odpf.assets.Table.resource:type_name -> odpf.assets.common.Resource - 1, // 1: odpf.assets.Table.profile:type_name -> odpf.assets.TableProfile - 3, // 2: odpf.assets.Table.schema:type_name -> odpf.assets.facets.Columns - 4, // 3: odpf.assets.Table.preview:type_name -> odpf.assets.facets.Preview - 5, // 4: odpf.assets.Table.ownership:type_name -> odpf.assets.facets.Ownership - 6, // 5: odpf.assets.Table.lineage:type_name -> odpf.assets.facets.Lineage - 7, // 6: odpf.assets.Table.properties:type_name -> odpf.assets.facets.Properties - 8, // 7: odpf.assets.Table.timestamps:type_name -> odpf.assets.common.Timestamp - 9, // 8: odpf.assets.Table.event:type_name -> odpf.assets.common.Event - 9, // [9:9] is the sub-list for method output_type - 9, // [9:9] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 3, // 0: odpf.assets.Table.resource:type_name -> odpf.assets.common.Resource + 1, // 1: odpf.assets.Table.profile:type_name -> odpf.assets.TableProfile + 4, // 2: odpf.assets.Table.schema:type_name -> odpf.assets.facets.Columns + 5, // 3: odpf.assets.Table.preview:type_name -> odpf.assets.facets.Preview + 6, // 4: odpf.assets.Table.ownership:type_name -> odpf.assets.facets.Ownership + 7, // 5: odpf.assets.Table.lineage:type_name -> odpf.assets.facets.Lineage + 8, // 6: odpf.assets.Table.properties:type_name -> odpf.assets.facets.Properties + 9, // 7: odpf.assets.Table.timestamps:type_name -> odpf.assets.common.Timestamp + 10, // 8: odpf.assets.Table.event:type_name -> odpf.assets.common.Event + 2, // 9: odpf.assets.TableProfile.common_join:type_name -> odpf.assets.TableCommonJoin + 10, // [10:10] is the sub-list for method output_type + 10, // [10:10] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_odpf_assets_table_proto_init() } @@ -362,6 +447,18 @@ func file_odpf_assets_table_proto_init() { return nil } } + file_odpf_assets_table_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TableCommonJoin); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -369,7 +466,7 @@ func file_odpf_assets_table_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_odpf_assets_table_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 0, }, diff --git a/plugins/extractors/bigquery/README.md b/plugins/extractors/bigquery/README.md index aaef69f8f..9490bb610 100644 --- a/plugins/extractors/bigquery/README.md +++ b/plugins/extractors/bigquery/README.md @@ -21,6 +21,8 @@ source: "auth_provider_x509_cert_url": "xxxxxxx", "client_x509_cert_url": "xxxxxxx" } + collect_table_usage: false + usage_period_in_day: 7 ``` ## Inputs @@ -32,6 +34,8 @@ source: | `table_pattern` | `string` | `gofood.fact_` | Regex pattern to filter which bigquery table to scan (whitelist) | *optional* | | `include_column_profile` | `bool` | `true` | true if you want to profile the column value such min, max, med, avg, top, and freq | *optional* | | `max_preview_rows` | `int` | `30` | max number of preview rows to fetch, `0` will skip preview fetching. Default to `30`. | *optional* | +| `collect_table_usage` | `bools` | `false` | toggle feature to collect table usage, `true` will enable collecting table usage. Default to `false`. | *optional* | +| `usage_period_in_day` | `int` | `7` | collecting log from `(now - usage_period_in_day)` until `now`. only matter if `collect_table_usage` is true. Default to `7`. | *optional* | ### *Notes* @@ -46,6 +50,8 @@ Leaving `credentials_json` blank will default to [Google's default authenticatio | `resource.service` | `bigquery` | | `description` | `table description` | | `profile.total_rows` | `2100` | +| `profile.usage_count` | `15` | +| `profile.common_join` | [][CommonJoin](#CommonJoin) | | `schema` | [][Column](#column) | ### Column @@ -59,6 +65,14 @@ Leaving `credentials_json` blank will default to [Google's default authenticatio | `length` | `12,2` | | `profile` | `{"min":...,"max": ...,"unique": ...}` | +### CommonJoin + +| Field | Sample Value | +| :---- | :---- | +| `urn` | `project_id.dataset_name.table_name` | +| `count` | `3` | + + ## Contributing Refer to the [contribution guidelines](../../../docs/contribute/guide.md#adding-a-new-extractor) for information on contributing to this module. diff --git a/plugins/extractors/bigquery/auditlog/auditlog.go b/plugins/extractors/bigquery/auditlog/auditlog.go new file mode 100644 index 000000000..9242744ec --- /dev/null +++ b/plugins/extractors/bigquery/auditlog/auditlog.go @@ -0,0 +1,177 @@ +package auditlog + +import ( + "context" + "fmt" + "time" + + "cloud.google.com/go/logging/logadmin" + "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/utils" + "github.com/odpf/salt/log" + "github.com/pkg/errors" + "google.golang.org/api/iterator" + "google.golang.org/api/option" + auditpb "google.golang.org/genproto/googleapis/cloud/audit" + loggingpb "google.golang.org/genproto/googleapis/cloud/bigquery/logging/v1" + "google.golang.org/protobuf/encoding/protojson" +) + +type Config struct { + ProjectID string `mapstructure:"project_id" validate:"required"` + ServiceAccountJSON string `mapstructure:"service_account_json"` + IsCollectTableUsage bool `mapstructure:"collect_table_usage" default:"false"` + UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"` +} + +const advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` + + `resource.type="bigquery_resource" AND NOT ` + + `protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` + + `timestamp >= "%s" AND timestamp < "%s"` + +type AuditLog struct { + logger log.Logger + client *logadmin.Client + config Config +} + +func New(logger log.Logger) *AuditLog { + return &AuditLog{ + logger: logger, + } +} + +func (l *AuditLog) Init(ctx context.Context, configMap map[string]interface{}) (err error) { + err = utils.BuildConfig(configMap, &l.config) + if err != nil { + return plugins.InvalidConfigError{} + } + + l.client, err = l.createClient(ctx) + if err != nil { + err = errors.Wrap(err, "failed to create logadmin client") + return + } + + return +} + +func (l *AuditLog) createClient(ctx context.Context) (client *logadmin.Client, err error) { + if l.config.ServiceAccountJSON == "" { + l.logger.Info("credentials are not specified, creating logadmin using default credentials...") + client, err = logadmin.NewClient(ctx, l.config.ProjectID) + return + } + + client, err = logadmin.NewClient(ctx, l.config.ProjectID, option.WithCredentialsJSON([]byte(l.config.ServiceAccountJSON))) + if client == nil { + err = errors.New("client is nil, failed initiating client") + } + return +} + +func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err error) { + tableStats = NewTableStats() + + filter := l.buildFilter() + it := l.client.Entries(ctx, + logadmin.ProjectIDs([]string{l.config.ProjectID}), + logadmin.Filter(filter)) + + l.logger.Info("getting logs with the filter", "filter", filter) + + for { + entry, errF := it.Next() + if errF == iterator.Done { + break + } + if errF != nil { + err = errors.Wrap(errF, "error iterating logEntries") + break + } + logData, errF := parsePayload(entry.Payload) + if errF != nil { + l.logger.Warn("error parsing LogEntry payload", "payload", entry.Payload) + continue + } + + tableStats.Populate(logData) + } + return +} + +func (l *AuditLog) buildFilter() string { + + timeNow := time.Now().UTC() + dayDuration := time.Duration(24*l.config.UsagePeriodInDay) * time.Hour + timeFrom := timeNow.Add(-1 * dayDuration) + + timeNowFormatted := timeNow.Format(time.RFC3339) + timeFromFormatted := timeFrom.Format(time.RFC3339) + + return fmt.Sprintf(advancedFilterTemplate, timeFromFormatted, timeNowFormatted) +} + +func parsePayload(payload interface{}) (ld *LogData, err error) { + + ad := &loggingpb.AuditData{} + pl, ok := payload.(*auditpb.AuditLog) + if !ok { + err = errors.New("cannot parse payload to AuditLog") + return + } + + if errPB := getAuditData(pl, ad); errPB != nil { + err = errors.Wrap(err, "failed to get audit data from metadata") + return + } + + ld = &LogData{ad} + err = ld.validateAuditData() + return +} + +func getAuditData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) (err error) { + // ServiceData is deprecated and suggested to be replaced with Metadata + // But in some logs, ServiceData is still being used + //nolint:staticcheck + if pl.GetServiceData() == nil { + // perhaps with metadata + if errPB := getAuditDataFromMetadata(pl, ad); errPB != nil { + err = errors.Wrap(err, "failed to get audit data from metadata") + return + } + err = errors.New("serviceData not found") + return + } + + // if ServiceData is not nil, the log is still using the old one + if errPB := getAuditDataFromServiceData(pl, ad); errPB != nil { + err = errors.Wrap(err, "failed to get audit data from service data") + return + } + return +} + +func getAuditDataFromServiceData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) (err error) { + //nolint:staticcheck + if errPB := pl.GetServiceData().UnmarshalTo(ad); errPB != nil { + err = errors.New("failed to marshal service data to audit data") + return + } + return +} + +func getAuditDataFromMetadata(pl *auditpb.AuditLog, ad *loggingpb.AuditData) (err error) { + mdJSON, err := pl.GetMetadata().MarshalJSON() + if err != nil { + err = errors.New("cannot marshal payload metadata") + return + } + + if errPB := protojson.Unmarshal(mdJSON, ad); errPB != nil { + err = errors.New("cannot parse service data to Audit") + return + } + return +} diff --git a/plugins/extractors/bigquery/auditlog/auditlog_test.go b/plugins/extractors/bigquery/auditlog/auditlog_test.go new file mode 100644 index 000000000..8a9610aad --- /dev/null +++ b/plugins/extractors/bigquery/auditlog/auditlog_test.go @@ -0,0 +1,34 @@ +package auditlog + +import ( + "context" + "testing" + + "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/test/utils" + "github.com/stretchr/testify/assert" +) + +func TestInit(t *testing.T) { + t.Run("should return error if config is invalid", func(t *testing.T) { + la := New(utils.Logger) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := la.Init(ctx, map[string]interface{}{ + "wrong-config": "sample-project", + }) + + assert.Equal(t, plugins.InvalidConfigError{}, err) + }) + + t.Run("should not return invalid config error if config is valid", func(t *testing.T) { + la := New(utils.Logger) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := la.Init(ctx, map[string]interface{}{ + "project_id": "sample-project", + }) + + assert.NotEqual(t, plugins.InvalidConfigError{}, err) + }) +} diff --git a/plugins/extractors/bigquery/auditlog/logdata.go b/plugins/extractors/bigquery/auditlog/logdata.go new file mode 100644 index 000000000..3c72dd558 --- /dev/null +++ b/plugins/extractors/bigquery/auditlog/logdata.go @@ -0,0 +1,68 @@ +package auditlog + +import ( + "github.com/odpf/meteor/models" + "github.com/pkg/errors" + loggingpb "google.golang.org/genproto/googleapis/cloud/bigquery/logging/v1" +) + +const serviceName = "bigquery" + +type LogData struct { + *loggingpb.AuditData +} + +func (ld *LogData) validateAuditData() (err error) { + if ld.GetJobCompletedEvent() == nil { + err = errors.New("can't found jobCompletedEvent field") + return + } + + job := ld.GetJobCompletedEvent().GetJob() + if job == nil { + err = errors.New("can't found jobCompletedEvent.job field") + return + } + + jobStatus := job.GetJobStatus() + if jobStatus == nil { + err = errors.New("can't found jobCompletedEvent.job.jobStatus field") + return + } + + jobState := jobStatus.GetState() + if jobState == "" { + err = errors.New("jobCompletedEvent.job.jobStatus.state is empty") + return + } + + // ignoring the job that has not finished + if jobState != "DONE" { + err = errors.New("job status state is not DONE") + return + } + + if jobStatus.GetError() != nil { + if jobErrMsg := jobStatus.GetError().GetMessage(); jobErrMsg != "" { + err = errors.Errorf("job status has error: %s", jobErrMsg) + return + } + err = errors.Errorf("job status error is not nil but cannot get the message") + return + } + + return +} + +func (ld *LogData) GetReferencedTablesURN() (refTablesURN []string) { + refTablesURN = []string{} + var stats *loggingpb.JobStatistics + if stats = ld.GetJobCompletedEvent().GetJob().GetJobStatistics(); stats == nil { + return + } + for _, rt := range stats.ReferencedTables { + tableURN := models.TableURN(serviceName, rt.ProjectId, rt.DatasetId, rt.TableId) + refTablesURN = append(refTablesURN, tableURN) + } + return +} diff --git a/plugins/extractors/bigquery/auditlog/logdata_test.go b/plugins/extractors/bigquery/auditlog/logdata_test.go new file mode 100644 index 000000000..a451bb93f --- /dev/null +++ b/plugins/extractors/bigquery/auditlog/logdata_test.go @@ -0,0 +1,168 @@ +package auditlog + +import ( + "testing" + + "github.com/odpf/meteor/models" + "github.com/stretchr/testify/assert" + loggingpb "google.golang.org/genproto/googleapis/cloud/bigquery/logging/v1" + statuspb "google.golang.org/genproto/googleapis/rpc/status" +) + +func TestValidateAuditData(t *testing.T) { + + t.Run("return error if AuditData does not have JobCompletedEvent data", func(t *testing.T) { + ld := &LogData{ + &loggingpb.AuditData{}, + } + err := ld.validateAuditData() + + assert.EqualError(t, err, "can't found jobCompletedEvent field") + }) + + t.Run("return error if JobCompletedEvent does not have Job data", func(t *testing.T) { + ld := &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + }, + }, + } + err := ld.validateAuditData() + + assert.EqualError(t, err, "can't found jobCompletedEvent.job field") + }) + + t.Run("return error if JobCompletedEvent.Job does not have JobStatus info", func(t *testing.T) { + ld := &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{}, + }, + }, + } + err := ld.validateAuditData() + + assert.EqualError(t, err, "can't found jobCompletedEvent.job.jobStatus field") + }) + + t.Run("return error if JobCompletedEvent.Job.JobStatus's state is empty", func(t *testing.T) { + ld := &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobStatus: &loggingpb.JobStatus{}, + }, + }, + }, + } + err := ld.validateAuditData() + + assert.EqualError(t, err, "jobCompletedEvent.job.jobStatus.state is empty") + }) + + t.Run("return error if JobCompletedEvent.Job.JobStatus's state is not done", func(t *testing.T) { + ld := &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobStatus: &loggingpb.JobStatus{ + State: "WORKING", + }, + }, + }, + }, + } + err := ld.validateAuditData() + + assert.EqualError(t, err, "job status state is not DONE") + }) + + t.Run("return error if JobCompletedEvent.Job.JobStatus error is not nil but empty", func(t *testing.T) { + ld := &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobStatus: &loggingpb.JobStatus{ + State: "DONE", + Error: &statuspb.Status{}, + }, + }, + }, + }, + } + err := ld.validateAuditData() + + assert.EqualError(t, err, "job status error is not nil but cannot get the message") + }) + + t.Run("return error if JobCompletedEvent.Job.JobStatus error is not nil and has an error message", func(t *testing.T) { + ld := &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobStatus: &loggingpb.JobStatus{ + State: "DONE", + Error: &statuspb.Status{ + Message: "error parsing the data", + }, + }, + }, + }, + }, + } + err := ld.validateAuditData() + + assert.EqualError(t, err, "job status has error: error parsing the data") + }) + + t.Run("return nil if AuditData's Job is DONE and no error", func(t *testing.T) { + ld := &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobStatus: &loggingpb.JobStatus{ + State: "DONE", + }, + }, + }, + }, + } + err := ld.validateAuditData() + + assert.Nil(t, err) + }) +} + +func TestGetReferencedTablesURN(t *testing.T) { + t.Run("return empty slice if JobStatistics is nil", func(t *testing.T) { + ld := &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{}, + }, + }, + } + rts := ld.GetReferencedTablesURN() + + assert.Empty(t, rts) + }) + + t.Run("return slice of urns if referenced tables exists if JobStatistics", func(t *testing.T) { + rts := testDataLogData1.GetReferencedTablesURN() + + expectedRefTablesURN := []string{ + models.TableURN("bigquery", "project1", "dataset1", "table1"), + models.TableURN("bigquery", "project2", "dataset1", "table1"), + models.TableURN("bigquery", "project3", "dataset1", "table1"), + } + assert.EqualValues(t, expectedRefTablesURN, rts) + }) +} diff --git a/plugins/extractors/bigquery/auditlog/stats.go b/plugins/extractors/bigquery/auditlog/stats.go new file mode 100644 index 000000000..33a859142 --- /dev/null +++ b/plugins/extractors/bigquery/auditlog/stats.go @@ -0,0 +1,71 @@ +package auditlog + +type TableStats struct { + TableUsage map[string]int64 `json:"table_usage_count"` + JoinUsage map[string]map[string]int64 `json:"join"` + processedLog *LogData `json:"-"` +} + +func NewTableStats() *TableStats { + ts := &TableStats{} + ts.initPopulate() + return ts +} + +func (b *TableStats) initPopulate() { + b.TableUsage = map[string]int64{} + b.JoinUsage = map[string]map[string]int64{} +} + +func (b *TableStats) Populate(ld *LogData) { + b.processedLog = ld + + // if 0, query is not involving table + refTablesURN := b.processedLog.GetReferencedTablesURN() + if len(refTablesURN) == 0 { + return + } + + // populate table usage + b.populateTableUsage(refTablesURN) + // populate join usage + b.populateJoinUsage(refTablesURN) +} + +func (b *TableStats) populateTableUsage(refTablesURN []string) { + for _, rt := range refTablesURN { + + // single table usage + if _, exist := b.TableUsage[rt]; !exist { + b.TableUsage[rt] = 0 + } + b.TableUsage[rt]++ + } +} + +func (b *TableStats) populateJoinUsage(refTablesURN []string) { + // no common join if only 1 referenced tables + if len(refTablesURN) < 2 { + return + } + + for _, rtPtr := range refTablesURN { + if _, exist := b.JoinUsage[rtPtr]; !exist { + b.JoinUsage[rtPtr] = map[string]int64{} + } + + for _, selectedTableURN := range refTablesURN { + if selectedTableURN == rtPtr { + continue + } + + juCnt, exist := b.JoinUsage[rtPtr][selectedTableURN] + if !exist { + b.JoinUsage[rtPtr][selectedTableURN] = 1 + continue + } + juCnt++ + b.JoinUsage[rtPtr][selectedTableURN] = juCnt + } + } +} diff --git a/plugins/extractors/bigquery/auditlog/stats_test.go b/plugins/extractors/bigquery/auditlog/stats_test.go new file mode 100644 index 000000000..4d8504333 --- /dev/null +++ b/plugins/extractors/bigquery/auditlog/stats_test.go @@ -0,0 +1,73 @@ +package auditlog + +import ( + "testing" + + "github.com/stretchr/testify/assert" + loggingpb "google.golang.org/genproto/googleapis/cloud/bigquery/logging/v1" +) + +func TestCreatingTableStats(t *testing.T) { + ts := NewTableStats() + + assert.EqualValues(t, ts.TableUsage, map[string]int64{}) + assert.EqualValues(t, ts.JoinUsage, map[string]map[string]int64{}) +} + +func TestPopulate(t *testing.T) { + t.Run("populate table usage by counting every table in referenced tables", func(t *testing.T) { + ts := NewTableStats() + + ts.populateTableUsage(testDataRefTables1) + ts.populateTableUsage(testDataRefTables2) + ts.populateTableUsage(testDataRefTables3) + ts.populateTableUsage(testDataRefTables4) + + assert.EqualValues(t, testDataTableUsage1234, ts.TableUsage) + }) + + t.Run("populate join usage by counting every table occurences in referenced tables", func(t *testing.T) { + ts := NewTableStats() + + ts.populateJoinUsage(testDataRefTables1) + ts.populateJoinUsage(testDataRefTables2) + ts.populateJoinUsage(testDataRefTables3) + ts.populateJoinUsage(testDataRefTables4) + + assert.EqualValues(t, testDataJoinUsage1234, ts.JoinUsage) + }) + + t.Run("populate all usage data from log data", func(t *testing.T) { + ts := NewTableStats() + + ts.Populate(testDataLogData1) + ts.Populate(testDataLogData2) + ts.Populate(testDataLogData3) + ts.Populate(testDataLogData4) + + assert.EqualValues(t, testDataTableUsage1234, ts.TableUsage) + assert.EqualValues(t, testDataJoinUsage1234, ts.JoinUsage) + }) + + t.Run("not populating table stats if no referenced tables found in log data", func(t *testing.T) { + ld := &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobStatistics: &loggingpb.JobStatistics{ + ReferencedTables: []*loggingpb.TableName{}, + }, + }, + }, + }, + } + + ts := NewTableStats() + + ts.Populate(ld) + + assert.Empty(t, ts.TableUsage) + assert.Empty(t, ts.JoinUsage) + }) +} diff --git a/plugins/extractors/bigquery/auditlog/testdata.go b/plugins/extractors/bigquery/auditlog/testdata.go new file mode 100644 index 000000000..3dcdbc127 --- /dev/null +++ b/plugins/extractors/bigquery/auditlog/testdata.go @@ -0,0 +1,151 @@ +package auditlog + +import ( + "github.com/odpf/meteor/models" + loggingpb "google.golang.org/genproto/googleapis/cloud/bigquery/logging/v1" +) + +var testDataRefTables1 = []string{ + models.TableURN("bigquery", "project1", "dataset1", "table1"), + models.TableURN("bigquery", "project2", "dataset1", "table1"), + models.TableURN("bigquery", "project3", "dataset1", "table1"), +} + +var testDataRefTables2 = []string{ + models.TableURN("bigquery", "project1", "dataset1", "table1"), + models.TableURN("bigquery", "project3", "dataset1", "table1"), + models.TableURN("bigquery", "project4", "dataset1", "table1"), +} + +var testDataRefTables3 = []string{ + models.TableURN("bigquery", "project1", "dataset1", "table1"), + models.TableURN("bigquery", "project3", "dataset1", "table1"), +} + +var testDataRefTables4 = []string{ + models.TableURN("bigquery", "project1", "dataset1", "table1"), +} + +var testDataLogData1 = &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobStatistics: &loggingpb.JobStatistics{ + ReferencedTables: []*loggingpb.TableName{ + { + ProjectId: "project1", + DatasetId: "dataset1", + TableId: "table1", + }, { + ProjectId: "project2", + DatasetId: "dataset1", + TableId: "table1", + }, { + ProjectId: "project3", + DatasetId: "dataset1", + TableId: "table1", + }, + }, + }, + }, + }, + }, +} + +var testDataLogData2 = &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobStatistics: &loggingpb.JobStatistics{ + ReferencedTables: []*loggingpb.TableName{ + { + ProjectId: "project1", + DatasetId: "dataset1", + TableId: "table1", + }, { + ProjectId: "project3", + DatasetId: "dataset1", + TableId: "table1", + }, { + ProjectId: "project4", + DatasetId: "dataset1", + TableId: "table1", + }, + }, + }, + }, + }, + }, +} + +var testDataLogData3 = &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobStatistics: &loggingpb.JobStatistics{ + ReferencedTables: []*loggingpb.TableName{ + { + ProjectId: "project1", + DatasetId: "dataset1", + TableId: "table1", + }, { + ProjectId: "project3", + DatasetId: "dataset1", + TableId: "table1", + }, + }, + }, + }, + }, + }, +} + +var testDataLogData4 = &LogData{ + &loggingpb.AuditData{ + JobCompletedEvent: &loggingpb.JobCompletedEvent{ + EventName: "", + Job: &loggingpb.Job{ + JobStatistics: &loggingpb.JobStatistics{ + ReferencedTables: []*loggingpb.TableName{ + { + ProjectId: "project1", + DatasetId: "dataset1", + TableId: "table1", + }, + }, + }, + }, + }, + }, +} + +var testDataJoinUsage1234 = map[string]map[string]int64{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + models.TableURN("bigquery", "project2", "dataset1", "table1"): 1, + models.TableURN("bigquery", "project3", "dataset1", "table1"): 3, + models.TableURN("bigquery", "project4", "dataset1", "table1"): 1, + }, + models.TableURN("bigquery", "project2", "dataset1", "table1"): { + models.TableURN("bigquery", "project1", "dataset1", "table1"): 1, + models.TableURN("bigquery", "project3", "dataset1", "table1"): 1, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): { + models.TableURN("bigquery", "project1", "dataset1", "table1"): 3, + models.TableURN("bigquery", "project2", "dataset1", "table1"): 1, + models.TableURN("bigquery", "project4", "dataset1", "table1"): 1, + }, + models.TableURN("bigquery", "project4", "dataset1", "table1"): { + models.TableURN("bigquery", "project1", "dataset1", "table1"): 1, + models.TableURN("bigquery", "project3", "dataset1", "table1"): 1, + }, +} + +var testDataTableUsage1234 = map[string]int64{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): 4, + models.TableURN("bigquery", "project2", "dataset1", "table1"): 1, + models.TableURN("bigquery", "project3", "dataset1", "table1"): 3, + models.TableURN("bigquery", "project4", "dataset1", "table1"): 1, +} diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go index 9200dc2db..6a83a4274 100644 --- a/plugins/extractors/bigquery/bigquery.go +++ b/plugins/extractors/bigquery/bigquery.go @@ -14,6 +14,7 @@ import ( "github.com/odpf/meteor/models/odpf/assets/common" "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/plugins/extractors/bigquery/auditlog" "github.com/odpf/meteor/registry" "github.com/odpf/meteor/utils" "github.com/odpf/salt/log" @@ -34,6 +35,8 @@ type Config struct { TablePattern string `mapstructure:"table_pattern"` IncludeColumnProfile bool `mapstructure:"include_column_profile"` MaxPreviewRows int `mapstructure:"max_preview_rows" default:"30"` + IsCollectTableUsage bool `mapstructure:"collect_table_usage" default:"false"` + UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"` } var sampleConfig = ` @@ -51,18 +54,24 @@ service_account_json: |- "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "xxxxxxx", "client_x509_cert_url": "xxxxxxx" - }` + } +collect_table_usage: false +usage_period_in_day: 7` // Extractor manages the communication with the bigquery service type Extractor struct { - logger log.Logger - client *bigquery.Client - config Config + logger log.Logger + client *bigquery.Client + config Config + galClient *auditlog.AuditLog + tableStats *auditlog.TableStats } func New(logger log.Logger) *Extractor { + galc := auditlog.New(logger) return &Extractor{ - logger: logger, + logger: logger, + galClient: galc, } } @@ -93,11 +102,27 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) return errors.Wrap(err, "failed to create client") } + if e.config.IsCollectTableUsage { + errL := e.galClient.Init(ctx, configMap) + if errL != nil { + e.logger.Error("failed to create google audit log client", "err", errL) + } + } + return } // Extract checks if the table is valid and extracts the table schema func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) { + if e.config.IsCollectTableUsage { + // Fetch and extract logs first to build a map + ts, errL := e.galClient.Collect(ctx) + e.tableStats = ts + if errL != nil { + e.logger.Error("error populating table stats usage", errL) + } + } + // Fetch and iterate over datasets it := e.client.Datasets(ctx) for { @@ -150,6 +175,9 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit // Build the bigquery table metadata func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigquery.TableMetadata) *assets.Table { tableFQN := t.FullyQualifiedName() + tableURN := models.TableURN("bigquery", t.ProjectID, t.DatasetID, t.TableID) + + tableProfile := e.buildTableProfile(tableURN) var partitionField string if md.TimePartitioning != nil { @@ -167,7 +195,7 @@ func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigqu return &assets.Table{ Resource: &common.Resource{ - Urn: models.TableURN("bigquery", t.ProjectID, t.DatasetID, t.TableID), + Urn: tableURN, Name: t.TableID, Service: "bigquery", }, @@ -185,6 +213,7 @@ func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigqu }), Labels: md.Labels, }, + Profile: tableProfile, Timestamps: &common.Timestamp{ CreateTime: timestamppb.New(md.CreationTime), UpdateTime: timestamppb.New(md.LastModifiedTime), diff --git a/plugins/extractors/bigquery/bigquery_test.go b/plugins/extractors/bigquery/bigquery_test.go index 520cc69e9..7b79f2a0c 100644 --- a/plugins/extractors/bigquery/bigquery_test.go +++ b/plugins/extractors/bigquery/bigquery_test.go @@ -1,3 +1,4 @@ +//go:build integration // +build integration package bigquery_test diff --git a/plugins/extractors/bigquery/profile.go b/plugins/extractors/bigquery/profile.go new file mode 100644 index 000000000..6b79925f5 --- /dev/null +++ b/plugins/extractors/bigquery/profile.go @@ -0,0 +1,30 @@ +package bigquery + +import "github.com/odpf/meteor/models/odpf/assets" + +func (e *Extractor) buildTableProfile(tableURN string) (tp *assets.TableProfile) { + var tableUsage int64 + var commonJoins []*assets.TableCommonJoin + + if e.config.IsCollectTableUsage && e.tableStats != nil { + // table usage + tableUsage = e.tableStats.TableUsage[tableURN] + + // common join + if cjList, exist := e.tableStats.JoinUsage[tableURN]; exist { + for cjURN, cjCount := range cjList { + commonJoins = append(commonJoins, &assets.TableCommonJoin{ + Urn: cjURN, + Count: cjCount, + }) + } + } + } + + tp = &assets.TableProfile{ + UsageCount: tableUsage, + CommonJoin: commonJoins, + } + + return +} diff --git a/plugins/extractors/bigquery/profile_test.go b/plugins/extractors/bigquery/profile_test.go new file mode 100644 index 000000000..062e0aa53 --- /dev/null +++ b/plugins/extractors/bigquery/profile_test.go @@ -0,0 +1,78 @@ +package bigquery + +import ( + "testing" + + "github.com/alecthomas/assert" + "github.com/odpf/meteor/models" + "github.com/odpf/meteor/models/odpf/assets" + "github.com/odpf/meteor/plugins/extractors/bigquery/auditlog" +) + +func TestBuildTableProfile(t *testing.T) { + tableURN := models.TableURN("bigquery", "project1", "dataset1", "table1") + t.Run("table profile usage related fields are empty if usage collection is disabled", func(t *testing.T) { + extr := &Extractor{ + config: Config{ + IsCollectTableUsage: false, + }, + } + + tp := extr.buildTableProfile(tableURN) + + assert.Empty(t, tp.UsageCount) + assert.Empty(t, tp.CommonJoin) + }) + + t.Run("table profile usage related fields are empty if table stats is nil", func(t *testing.T) { + extr := &Extractor{ + config: Config{ + IsCollectTableUsage: true, + }, + tableStats: nil, + } + + tp := extr.buildTableProfile(tableURN) + + assert.Empty(t, tp.UsageCount) + assert.Empty(t, tp.CommonJoin) + }) + + t.Run("table profile usage related fields are populated if table stats is not nil and usage collection is enabled", func(t *testing.T) { + extr := &Extractor{ + config: Config{ + IsCollectTableUsage: true, + }, + tableStats: &auditlog.TableStats{ + TableUsage: map[string]int64{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): 5, + models.TableURN("bigquery", "project2", "dataset1", "table1"): 3, + models.TableURN("bigquery", "project3", "dataset1", "table1"): 1, + }, + JoinUsage: map[string]map[string]int64{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + models.TableURN("bigquery", "project2", "dataset1", "table1"): 1, + models.TableURN("bigquery", "project3", "dataset1", "table1"): 3, + models.TableURN("bigquery", "project4", "dataset1", "table1"): 1, + }, + }, + }, + } + + tp := extr.buildTableProfile(tableURN) + + assert.EqualValues(t, 5, tp.UsageCount) + assert.Contains(t, tp.CommonJoin, &assets.TableCommonJoin{ + Urn: models.TableURN("bigquery", "project2", "dataset1", "table1"), + Count: 1, + }) + assert.Contains(t, tp.CommonJoin, &assets.TableCommonJoin{ + Urn: models.TableURN("bigquery", "project3", "dataset1", "table1"), + Count: 3, + }) + assert.Contains(t, tp.CommonJoin, &assets.TableCommonJoin{ + Urn: models.TableURN("bigquery", "project4", "dataset1", "table1"), + Count: 1, + }) + }) +}