Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/bigquery/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@
"duplexify": "^3.2.0",
"extend": "^3.0.0",
"is": "^3.0.1",
"modelo": "^4.2.0",
"stream-events": "^1.0.1",
"string-format-obj": "^1.0.0"
},
"devDependencies": {
"@google-cloud/storage": "*",
"async": "^2.0.1",
"methmeth": "^1.1.0",
"mocha": "^3.0.1",
"node-uuid": "^1.4.3",
"propprop": "^0.3.0",
Expand Down
92 changes: 12 additions & 80 deletions packages/bigquery/src/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
'use strict';

var common = require('@google-cloud/common');
var events = require('events');
var is = require('is');
var modelo = require('modelo');
var util = require('util');

/*! Developer Documentation
*
Expand Down Expand Up @@ -144,15 +143,13 @@ function Job(bigQuery, id) {
getMetadata: true
};

common.ServiceObject.call(this, {
common.Operation.call(this, {
parent: bigQuery,
baseUrl: '/jobs',
id: id,
methods: methods
});

events.EventEmitter.call(this);

this.bigQuery = bigQuery;

// The API endpoint for cancel is: .../bigquery/v2/project/projectId/...
Expand All @@ -166,14 +163,9 @@ function Job(bigQuery, id) {
return reqOpts;
}
});

this.completeListeners = 0;
this.hasActiveListeners = false;

this.listenForEvents_();
}

modelo.inherits(Job, common.ServiceObject, events.EventEmitter);
util.inherits(Job, common.Operation);

/**
* Cancel a job. Use {module:bigquery/job#getMetadata} to see if the cancel
Expand Down Expand Up @@ -313,93 +305,33 @@ Job.prototype.getQueryResultsStream = function(options) {
};

/**
* Convenience method that wraps the `complete` and `error` events in a
* Promise.
*
* @return {promise}
*
* @example
* job.promise().then(function(metadata) {
* // The job is complete.
* }, function(err) {
* // An error occurred during the job.
* });
*/
Job.prototype.promise = function() {
var self = this;

return new self.Promise(function(resolve, reject) {
self
.on('error', reject)
.on('complete', function(metadata) {
resolve([metadata]);
});
});
};

/**
* Begin listening for events on the job. This method keeps track of how many
* "complete" listeners are registered and removed, making sure polling is
* handled automatically.
* Poll for a status update. Execute the callback:
*
* As long as there is one active "complete" listener, the connection is open.
* When there are no more listeners, the polling stops.
* - callback(err): Job failed
* - callback(): Job incomplete
* - callback(null, metadata): Job complete
*
* @private
*/
Job.prototype.listenForEvents_ = function() {
var self = this;

this.on('newListener', function(event) {
if (event === 'complete') {
self.completeListeners++;

if (!self.hasActiveListeners) {
self.hasActiveListeners = true;
self.startPolling_();
}
}
});

this.on('removeListener', function(event) {
if (event === 'complete' && --self.completeListeners === 0) {
self.hasActiveListeners = false;
}
});
};

/**
* Poll `getMetadata` to check the operation's status. This runs a loop to ping
* the API on an interval.
*
* Note: This method is automatically called once a "complete" event handler is
* registered on the operation.
*
* @private
* @param {function} callback
*/
Job.prototype.startPolling_ = function() {
var self = this;

if (!this.hasActiveListeners) {
return;
}

Job.prototype.poll_ = function(callback) {
this.getMetadata(function(err, metadata, apiResponse) {
if (apiResponse.status && apiResponse.status.errors) {
err = common.util.ApiError(apiResponse.status);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

}

if (err) {
self.emit('error', err);
callback(err);
return;
}

if (metadata.status.state !== 'DONE') {
setTimeout(self.startPolling_.bind(self), 500);
callback();
return;
}

self.emit('complete', metadata);
callback(null, metadata);
});
};

Expand Down
31 changes: 23 additions & 8 deletions packages/bigquery/system-test/bigquery.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

var assert = require('assert');
var async = require('async');
var exec = require('methmeth');
var fs = require('fs');
var uuid = require('node-uuid');

Expand Down Expand Up @@ -210,13 +211,31 @@ describe('BigQuery', function() {

job.getQueryResults(function(err, rows) {
assert.ifError(err);
assert.equal(rows.length, 100);
assert.equal(typeof rows[0].url, 'string');
assert.strictEqual(rows.length, 100);
assert.strictEqual(typeof rows[0].url, 'string');
done();
});
});
});

it('should run a query job as a promise', function() {
var job;

return bigquery.startQuery(query)
.then(function(response) {
job = response[0];
return job.promise();
})
.then(function() {
return job.getQueryResults();
})
.then(function(response) {
var rows = response[0];
assert.strictEqual(rows.length, 100);
assert.strictEqual(typeof rows[0].url, 'string');
});
});

it('should get query results as a stream', function(done) {
bigquery.startQuery(query, function(err, job) {
assert.ifError(err);
Expand Down Expand Up @@ -606,9 +625,7 @@ describe('BigQuery', function() {
return;
}

async.each(files, function(file, next) {
file.delete(next);
}, function(err) {
async.each(files, exec('delete'), function(err) {
if (err) {
callback(err);
return;
Expand Down Expand Up @@ -640,9 +657,7 @@ describe('BigQuery', function() {
return;
}

async.each(datasets, function(dataset, next) {
dataset.delete({ force: true }, next);
}, callback);
async.each(datasets, exec('delete', { force: true }), callback);
});
}
});
Loading