diff --git a/async_worker_group.go b/async_worker_group.go index 3ed870a..d1a1e15 100644 --- a/async_worker_group.go +++ b/async_worker_group.go @@ -1,13 +1,14 @@ package bqstreamer import ( - "errors" "net/http" "sync" "time" "golang.org/x/oauth2" + "golang.org/x/oauth2/google" "golang.org/x/oauth2/jwt" + "google.golang.org/api/bigquery/v2" ) // AsyncWorkerGroup asynchronously streams rows to BigQuery in bulk. @@ -50,11 +51,21 @@ type AsyncWorkerGroup struct { } // New returns a new AsyncWorkerGroup using given OAuth2/JWT configuration. +// Set jwtConfig to nil if your system corresponds to either of the following conditions: +// - a system that has called "gcloud auth application-default login" +// - a system running in Google Application Engine +// - a system running in Google Compute Engine +// ref: https://developers.google.com/identity/protocols/application-default-credentials func NewAsyncWorkerGroup(jwtConfig *jwt.Config, options ...AsyncOptionFunc) (*AsyncWorkerGroup, error) { if jwtConfig == nil { - return nil, errors.New("jwt.Config is nil") + ctx := oauth2.NoContext + client, err := google.DefaultClient(ctx, bigquery.BigqueryInsertdataScope) + if err != nil { + return nil, err + } + newHTTPClient := func() *http.Client { return client } + return newAsyncWorkerGroup(newHTTPClient, options...) } - // Create a new Streamer, with OAuth2/JWT http.Client constructor function. newHTTPClient := func() *http.Client { return jwtConfig.Client(oauth2.NoContext) } return newAsyncWorkerGroup(newHTTPClient, options...) diff --git a/sync_worker.go b/sync_worker.go index be21a6a..6a1663a 100644 --- a/sync_worker.go +++ b/sync_worker.go @@ -3,6 +3,7 @@ package bqstreamer import ( + "context" "net/http" "time" @@ -146,16 +147,22 @@ func (w *SyncWorker) insertAll(insertFunc func(projectID, datasetID, tableID str // TODO cache bigquery service instead of creating a new one every insertTable() call // TODO add support for SkipInvalidRows, IgnoreUnknownValues func (w *SyncWorker) insertTable(projectID, datasetID, tableID string, tbl table) *TableInsertErrors { - res, err := bigquery.NewTabledataService(w.service). - InsertAll( - projectID, datasetID, tableID, - &bigquery.TableDataInsertAllRequest{ - Kind: "bigquery#tableDataInsertAllRequest", - Rows: tbl, - IgnoreUnknownValues: w.ignoreUnknownValues, - SkipInvalidRows: w.skipInvalidRows, - }). - Do() + tabledataInsertAllCall := bigquery.NewTabledataService(w.service).InsertAll( + projectID, datasetID, tableID, + &bigquery.TableDataInsertAllRequest{ + Kind: "bigquery#tableDataInsertAllRequest", + Rows: tbl, + IgnoreUnknownValues: w.ignoreUnknownValues, + SkipInvalidRows: w.skipInvalidRows, + }) + + // Set a timeout on the bigtable insert + // TODO make this configurable by passing through a context + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + tabledataInsertAllCall.Context(ctx) + res, err := tabledataInsertAllCall.Do() var rows []*bigquery.TableDataInsertAllResponseInsertErrors if res != nil {