Skip to content

Commit 4c92daf

Browse files
authored
Introduce remaining BigQuery to GA items (#686)
* introduce job configuration classes
* update tests/docs and add copy/load
* modify retry logic
* add create on insert logic
1 parent 7da67d2 commit 4c92daf

32 files changed

Lines changed: 3686 additions & 633 deletions

src/BigQuery/BigQueryClient.php

Lines changed: 116 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@
2121
use Google\Cloud\BigQuery\Connection\Rest;
2222
use Google\Cloud\BigQuery\Exception\JobException;
2323
use Google\Cloud\BigQuery\Job;
24+
use Google\Cloud\Core\ArrayTrait;
2425
use Google\Cloud\Core\ClientTrait;
25-
use Google\Cloud\Core\ExponentialBackoff;
2626
use Google\Cloud\Core\Int64;
2727
use Google\Cloud\Core\Iterator\ItemIterator;
2828
use Google\Cloud\Core\Iterator\PageIterator;
29+
use Google\Cloud\Core\RetryDeciderTrait;
2930
use Psr\Cache\CacheItemPoolInterface;
3031
use Psr\Http\Message\StreamInterface;
3132

@@ -43,11 +44,14 @@
4344
*/
4445
class BigQueryClient
4546
{
47+
use ArrayTrait;
4648
use ClientTrait;
47-
use JobConfigurationTrait;
49+
use RetryDeciderTrait;
4850

4951
const VERSION = '0.2.2';
5052

53+
const MAX_DELAY_MICROSECONDS = 32000000;
54+
5155
const SCOPE = 'https://www.googleapis.com/auth/bigquery';
5256
const INSERT_SCOPE = 'https://www.googleapis.com/auth/bigquery.insertdata';
5357

@@ -57,7 +61,7 @@ class BigQueryClient
5761
protected $connection;
5862

5963
/**
60-
* @var ValueMapper $mapper Maps values between PHP and BigQuery.
64+
* @var ValueMapper Maps values between PHP and BigQuery.
6165
*/
6266
private $mapper;
6367

@@ -96,15 +100,85 @@ class BigQueryClient
96100
*/
97101
public function __construct(array $config = [])
98102
{
103+
$this->setHttpRetryCodes([]);
104+
$this->setHttpRetryMessages([
105+
'rateLimitExceeded',
106+
'backendError'
107+
]);
99108
$config += [
100109
'scopes' => [self::SCOPE],
101-
'returnInt64AsObject' => false
110+
'returnInt64AsObject' => false,
111+
'restRetryFunction' => $this->getRetryFunction(),
112+
'restDelayFunction' => function ($attempt) {
113+
return min(
114+
mt_rand(0, 1000000) + (pow(2, $attempt) * 1000000),
115+
self::MAX_DELAY_MICROSECONDS
116+
);
117+
}
102118
];
103119

104120
$this->connection = new Rest($this->configureAuthentication($config));
105121
$this->mapper = new ValueMapper($config['returnInt64AsObject']);
106122
}
107123

124+
/**
125+
* Returns a job configuration to be passed to either
126+
* {@see Google\Cloud\BigQuery\BigQueryClient::runQuery()} or
127+
* {@see Google\Cloud\BigQuery\BigQueryClient::startQuery()}. A
128+
* configuration can be built using fluent setters or by providing a full
129+
* set of options at once.
130+
*
131+
* Unless otherwise specified, all configuration options will default based
132+
* on the [Jobs configuration API documentation]
133+
* (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration)
134+
* except for `configuration.query.useLegacySql`, which defaults to `false`
135+
* in this client.
136+
*
137+
* Example:
138+
* ```
139+
* $queryJobConfig = $bigQuery->query(
140+
* 'SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100'
141+
* );
142+
* ```
143+
*
144+
* ```
145+
* // Set create disposition using fluent setters.
146+
* $queryJobConfig = $bigQuery->query(
147+
* 'SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100'
148+
* )->createDisposition('CREATE_NEVER');
149+
* ```
150+
*
151+
* ```
152+
* // This is equivalent to the above example, using array configuration
153+
* // instead of fluent setters.
154+
* $queryJobConfig = $bigQuery->query(
155+
* 'SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100',
156+
* [
157+
* 'configuration' => [
158+
* 'query' => [
159+
* 'createDisposition' => 'CREATE_NEVER'
160+
* ]
161+
* ]
162+
* ]
163+
* );
164+
* ```
165+
*
166+
* @param string $query A BigQuery SQL query.
167+
* @param array $options [optional] Please see the
168+
* [API documentation for Job configuration]
169+
* (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration)
170+
* for the available options.
171+
* @return QueryJobConfiguration
172+
*/
173+
public function query($query, array $options = [])
174+
{
175+
return (new QueryJobConfiguration(
176+
$this->mapper,
177+
$this->projectId,
178+
$options
179+
))->query($query);
180+
}
181+
108182
/**
109183
* Runs a BigQuery SQL query in a synchronous fashion. Rows are returned
110184
* immediately as long as the query completes within a specified timeout. In
@@ -137,7 +211,10 @@ public function __construct(array $config = [])
137211
*
138212
* Example:
139213
* ```
140-
* $queryResults = $bigQuery->runQuery('SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100');
214+
* $queryJobConfig = $bigQuery->query(
215+
* 'SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100'
216+
* );
217+
* $queryResults = $bigQuery->runQuery($queryJobConfig);
141218
*
142219
* foreach ($queryResults as $row) {
143220
* echo $row['commit'];
@@ -148,12 +225,12 @@ public function __construct(array $config = [])
148225
* // Construct a query utilizing named parameters.
149226
* $query = 'SELECT commit FROM `bigquery-public-data.github_repos.commits`' .
150227
* ' WHERE author.date < @date AND message = @message LIMIT 100';
151-
* $queryResults = $bigQuery->runQuery($query, [
152-
* 'parameters' => [
228+
* $queryJobConfig = $bigQuery->query($query)
229+
* ->parameters([
153230
* 'date' => $bigQuery->timestamp(new \DateTime('1980-01-01 12:15:00Z')),
154231
* 'message' => 'A commit message.'
155-
* ]
156-
* ]);
232+
* ]);
233+
* $queryResults = $bigQuery->runQuery($queryJobConfig);
157234
*
158235
* foreach ($queryResults as $row) {
159236
* echo $row['commit'];
@@ -163,9 +240,9 @@ public function __construct(array $config = [])
163240
* ```
164241
* // Construct a query utilizing positional parameters.
165242
* $query = 'SELECT commit FROM `bigquery-public-data.github_repos.commits` WHERE message = ? LIMIT 100';
166-
* $queryResults = $bigQuery->runQuery($query, [
167-
* 'parameters' => ['A commit message.']
168-
* ]);
243+
* $queryJobConfig = $bigQuery->query($query)
244+
* ->parameters(['A commit message.']);
245+
* $queryResults = $bigQuery->runQuery($queryJobConfig);
169246
*
170247
* foreach ($queryResults as $row) {
171248
* echo $row['commit'];
@@ -174,7 +251,7 @@ public function __construct(array $config = [])
174251
*
175252
* @see https://cloud.google.com/bigquery/docs/reference/v2/jobs/query Query API documentation.
176253
*
177-
* @param string $query A BigQuery SQL query.
254+
* @param JobConfigurationInterface $query A BigQuery SQL query configuration.
178255
* @param array $options [optional] {
179256
* Configuration options.
180257
*
@@ -187,36 +264,23 @@ public function __construct(array $config = [])
187264
* milliseconds. **Defaults to** `10000` milliseconds (10 seconds).
188265
* @type int $maxRetries The number of times to retry, checking if the
189266
* query has completed. **Defaults to** `100`.
190-
* @type array $parameters Only available for standard SQL queries.
191-
* When providing a non-associative array positional parameters
192-
* (`?`) will be used. When providing an associative array
193-
* named parameters will be used (`@name`).
194-
* @type array $jobConfig Configuration settings for a query job are
195-
* outlined in the [API Docs for `configuration.query`](https://goo.gl/PuRa3I).
196-
* If not provided default settings will be used, with the exception
197-
* of `configuration.query.useLegacySql`, which defaults to `false`
198-
* in this client.
199267
* }
200268
* @return QueryResults
201269
* @throws JobException If the maximum number of retries while waiting for
202270
* query completion has been exceeded.
203271
*/
204-
public function runQuery($query, array $options = [])
272+
public function runQuery(JobConfigurationInterface $query, array $options = [])
205273
{
206-
$jobOptions = $this->pluckArray([
207-
'parameters',
208-
'jobConfig'
209-
], $options);
210274
$queryResultsOptions = $this->pluckArray([
211275
'maxResults',
212276
'startIndex',
213277
'timeoutMs',
214278
'maxRetries'
215279
], $options);
216280

217-
return $this->runQueryAsJob(
281+
return $this->startQuery(
218282
$query,
219-
$jobOptions + $options
283+
$options
220284
)->queryResults($queryResultsOptions + $options);
221285
}
222286

@@ -230,7 +294,10 @@ public function runQuery($query, array $options = [])
230294
*
231295
* Example:
232296
* ```
233-
* $job = $bigQuery->runQueryAsJob('SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100');
297+
* $queryJobConfig = $bigQuery->query(
298+
* 'SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100'
299+
* );
300+
* $job = $bigQuery->startQuery($queryJobConfig);
234301
* $queryResults = $job->queryResults();
235302
*
236303
* foreach ($queryResults as $row) {
@@ -240,52 +307,18 @@ public function runQuery($query, array $options = [])
240307
*
241308
* @see https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert Jobs insert API documentation.
242309
*
243-
* @param string $query A BigQuery SQL query.
244-
* @param array $options [optional] {
245-
* Configuration options.
246-
*
247-
* @type array $parameters Only available for standard SQL queries.
248-
* When providing a non-associative array positional parameters
249-
* (`?`) will be used. When providing an associative array
250-
* named parameters will be used (`@name`).
251-
* @type array $jobConfig Configuration settings for a query job are
252-
* outlined in the [API Docs for `configuration.query`](https://goo.gl/PuRa3I).
253-
* If not provided default settings will be used, with the exception
254-
* of `configuration.query.useLegacySql`, which defaults to `false`
255-
* in this client.
256-
* @type string $jobIdPrefix If given, the returned job ID will be of
257-
* format `{$jobIdPrefix-}{jobId}`. **Defaults to** `null`.
258-
* }
310+
* @param JobConfigurationInterface $query A BigQuery SQL query configuration.
311+
* @param array $options [optional] Configuration options.
259312
* @return Job
260313
*/
261-
public function runQueryAsJob($query, array $options = [])
314+
public function startQuery(JobConfigurationInterface $query, array $options = [])
262315
{
263-
$options += [
264-
'jobConfig' => []
265-
];
266-
267-
if (isset($options['parameters'])) {
268-
$options['jobConfig'] += $this->formatQueryParameters($options['parameters']);
269-
270-
unset($options['parameters']);
271-
}
272-
273-
$options['jobConfig'] += [
274-
'useLegacySql' => false
275-
];
276-
277-
$config = $this->buildJobConfig(
278-
'query',
279-
$this->projectId,
280-
['query' => $query],
281-
$options
282-
);
283-
284-
$response = $this->connection->insertJob($config);
316+
$config = $query->toArray();
317+
$response = $this->connection->insertJob($config + $options);
285318

286319
return new Job(
287320
$this->connection,
288-
$response['jobReference']['jobId'],
321+
$config['jobReference']['jobId'],
289322
$this->projectId,
290323
$this->mapper,
291324
$response
@@ -302,7 +335,7 @@ public function runQueryAsJob($query, array $options = [])
302335
* $job = $bigQuery->job('myJobId');
303336
* ```
304337
*
305-
* @param string $id The id of the job to request.
338+
* @param string $id The id of the job to request. The job must already have been run or be currently running.
306339
* @return Job
307340
*/
308341
public function job($id)
@@ -445,6 +478,9 @@ function (array $dataset) {
445478
/**
446479
* Creates a dataset.
447480
*
481+
* Please note that by default the library will not attempt to retry this
482+
* call on your behalf.
483+
*
448484
* Example:
449485
* ```
450486
* $dataset = $bigQuery->createDataset('aDataset');
@@ -469,12 +505,16 @@ public function createDataset($id, array $options = [])
469505
unset($options['metadata']);
470506
}
471507

472-
$response = $this->connection->insertDataset([
473-
'projectId' => $this->projectId,
474-
'datasetReference' => [
475-
'datasetId' => $id
508+
$response = $this->connection->insertDataset(
509+
[
510+
'projectId' => $this->projectId,
511+
'datasetReference' => [
512+
'datasetId' => $id
513+
]
476514
]
477-
] + $options);
515+
+ $options
516+
+ ['retries' => 0]
517+
);
478518

479519
return new Dataset(
480520
$this->connection,
@@ -565,30 +605,4 @@ public function timestamp(\DateTimeInterface $value)
565605
{
566606
return new Timestamp($value);
567607
}
568-
569-
/**
570-
* Formats query parameters for the API.
571-
*
572-
* @param array $parameters The parameters to format.
573-
* @return array
574-
*/
575-
private function formatQueryParameters(array $parameters)
576-
{
577-
$options = [
578-
'parameterMode' => $this->isAssoc($parameters) ? 'named' : 'positional',
579-
'useLegacySql' => false
580-
];
581-
582-
foreach ($parameters as $name => $value) {
583-
$param = $this->mapper->toParameter($value);
584-
585-
if ($options['parameterMode'] === 'named') {
586-
$param += ['name' => $name];
587-
}
588-
589-
$options['queryParameters'][] = $param;
590-
}
591-
592-
return $options;
593-
}
594608
}

src/BigQuery/Connection/Rest.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,12 +246,19 @@ private function resolveUploadOptions(array $args)
246246
$args += [
247247
'projectId' => null,
248248
'data' => null,
249-
'configuration' => []
249+
'configuration' => [],
250+
'labels' => [],
251+
'dryRun' => false,
252+
'jobReference' => []
250253
];
251254

252255
$args['data'] = Psr7\stream_for($args['data']);
253-
$args['metadata']['configuration'] = $args['configuration'];
254-
unset($args['configuration']);
256+
$args['metadata'] = $this->pluckArray([
257+
'labels',
258+
'dryRun',
259+
'jobReference',
260+
'configuration'
261+
], $args);
255262

256263
$uploaderOptionKeys = [
257264
'restOptions',

0 commit comments

Comments
 (0)