diff --git a/packages/common/package.json b/packages/common/package.json index a3c1e0616d2..5854493e876 100644 --- a/packages/common/package.json +++ b/packages/common/package.json @@ -51,6 +51,7 @@ "grpc": "^0.14.1", "is": "^3.0.1", "methmeth": "^1.0.0", + "modelo": "^4.2.0", "request": "^2.70.0", "retry-request": "^1.3.0", "split-array-stream": "^1.0.0", diff --git a/packages/common/src/grpc-operation.js b/packages/common/src/grpc-operation.js new file mode 100644 index 00000000000..ceaf72c59c4 --- /dev/null +++ b/packages/common/src/grpc-operation.js @@ -0,0 +1,198 @@ +/*! + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*! + * @module common/grpcOperation + */ + +'use strict'; + +var events = require('events'); +var modelo = require('modelo'); + +/** + * @type {module:common/grpcService} + * @private + */ +var GrpcService = require('./grpc-service.js'); + +/** + * @type {module:common/grpcServiceObject} + * @private + */ +var GrpcServiceObject = require('./grpc-service-object.js'); + +/** + * @type {module:common/util} + * @private + */ +var util = require('./util.js'); + +// jscs:disable maximumLineLength +/** + * An Operation object allows you to interact with APIs that take longer to + * process things. + * + * @constructor + * @alias module:common/grpcOperation + * + * @param {module:common/grpcService|module:common/grpcServiceObject} parent - The + * parent object. This should be configured to use the longrunning.operation + * service. + * @param {string} name - The operation name. + */ +// jscs:enable maximumLineLength +function GrpcOperation(parent, name) { + var methods = { + + /** + * Deletes an operation. + */ + delete: { + protoOpts: { + service: 'Operations', + method: 'deleteOperation' + }, + reqOpts: { + name: name + } + }, + + /** + * Checks to see if an operation exists. + */ + exists: true, + + /** + * Retrieves the operation. + */ + get: true, + + /** + * Retrieves metadata for the operation. + */ + getMetadata: { + protoOpts: { + service: 'Operations', + method: 'getOperation' + }, + reqOpts: { + name: name + } + } + }; + + var config = { + parent: parent, + id: name, + methods: methods + }; + + GrpcServiceObject.call(this, config); + events.EventEmitter.call(this); + + this.completeListeners = 0; + this.hasActiveListeners = false; + + this.listenForEvents_(); +} + +modelo.inherits(GrpcOperation, GrpcServiceObject, events.EventEmitter); + +/** + * Cancel the operation. + * + * @param {function=} callback - The callback function. + * @param {?error} callback.err - An error returned while making this + * request. + * @param {object} callback.apiResponse - The full API response. + */ +GrpcOperation.prototype.cancel = function(callback) { + var protoOpts = { + service: 'Operations', + method: 'cancelOperation' + }; + + var reqOpts = { + name: this.id + }; + + this.request(protoOpts, reqOpts, callback || util.noop); +}; + +/** + * Begin listening for events on the operation. This method keeps track of how + * many "complete" listeners are registered and removed, making sure polling is + * handled automatically. + * + * As long as there is one active "complete" listener, the connection is open. + * When there are no more listeners, the polling stops. + * + * @private + */ +GrpcOperation.prototype.listenForEvents_ = function() { + var self = this; + + this.on('newListener', function(event) { + if (event === 'complete') { + self.completeListeners++; + + if (!self.hasActiveListeners) { + self.hasActiveListeners = true; + self.startPolling_(); + } + } + }); + + this.on('removeListener', function(event) { + if (event === 'complete' && --self.completeListeners === 0) { + self.hasActiveListeners = false; + } + }); +}; + +/** + * Poll `getMetadata` to check the operation's status. This runs a loop to ping + * the API on an interval. + * + * Note: This method is automatically called once a "complete" event handler is + * registered on the operation. + * + * @private + */ +GrpcOperation.prototype.startPolling_ = function() { + var self = this; + + if (!this.hasActiveListeners) { + return; + } + + this.getMetadata(function(err, resp) { + if (err || resp.error) { + self.emit('error', err || GrpcService.decorateStatus_(resp.result)); + return; + } + + if (!resp.done) { + setTimeout(self.startPolling_.bind(self), 500); + return; + } + + self.emit('complete', resp); + }); +}; + +module.exports = GrpcOperation; diff --git a/packages/common/src/index.js b/packages/common/src/index.js index 13858023fef..290a7ce60a0 100644 --- a/packages/common/src/index.js +++ b/packages/common/src/index.js @@ -14,6 +14,12 @@ * limitations under the License. */ +/** + * @type {module:common/grpcOperation} + * @private + */ +exports.GrpcOperation = require('./grpc-operation.js'); + /** * @type {module:common/grpcService} * @private diff --git a/packages/common/test/grpc-operation.js b/packages/common/test/grpc-operation.js new file mode 100644 index 00000000000..20b51a089c4 --- /dev/null +++ b/packages/common/test/grpc-operation.js @@ -0,0 +1,347 @@ +/*! + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +var assert = require('assert'); +var proxyquire = require('proxyquire'); +var modelo = require('modelo'); +var EventEmitter = require('events').EventEmitter; +var util = require('../src/util.js'); +var nodeutil = require('util'); + +var GrpcServiceObject = require('../src/grpc-service-object.js'); +var GrpcService = require('../src/grpc-service.js'); + +function createFake(Class) { + function Fake() { + this.calledWith_ = arguments; + Class.apply(this, arguments); + } + + nodeutil.inherits(Fake, Class); + return Fake; +} + +var fakeModelo = { + inherits: function() { + this.calledWith_ = arguments; + modelo.inherits.apply(modelo, arguments); + } +}; + +var FakeGrpcServiceObject = createFake(GrpcServiceObject); +var FakeGrpcService = createFake(GrpcService); + +describe('GrpcOperation', function() { + var FAKE_SERVICE = {}; + var OPERATION_ID = '/a/b/c/d'; + + var GrpcOperation; + var grpcOperation; + + before(function() { + GrpcOperation = proxyquire('../src/grpc-operation.js', { + modelo: fakeModelo, + './grpc-service-object.js': FakeGrpcServiceObject, + './grpc-service.js': FakeGrpcService, + './util.js': util + }); + }); + + beforeEach(function() { + grpcOperation = new GrpcOperation(FAKE_SERVICE, OPERATION_ID); + }); + + describe('instantiation', function() { + it('should extend GrpcServiceObject and EventEmitter', function() { + var args = fakeModelo.calledWith_; + + assert.strictEqual(args[0], GrpcOperation); + assert.strictEqual(args[1], FakeGrpcServiceObject); + assert.strictEqual(args[2], EventEmitter); + }); + + it('should pass GrpcServiceObject the correct config', function() { + var config = grpcOperation.calledWith_[0]; + + assert.strictEqual(config.parent, FAKE_SERVICE); + assert.strictEqual(config.id, OPERATION_ID); + + assert.deepEqual(config.methods, { + delete: { + protoOpts: { + service: 'Operations', + method: 'deleteOperation' + }, + reqOpts: { + name: OPERATION_ID + } + }, + exists: true, + get: true, + getMetadata: { + protoOpts: { + service: 'Operations', + method: 'getOperation' + }, + reqOpts: { + name: OPERATION_ID + } + } + }); + }); + + it('should localize listener variables', function() { + assert.strictEqual(grpcOperation.completeListeners, 0); + assert.strictEqual(grpcOperation.hasActiveListeners, false); + }); + + it('should call listenForEvents_', function() { + var listenForEvents = GrpcOperation.prototype.listenForEvents_; + var called = false; + + GrpcOperation.prototype.listenForEvents_ = function() { + called = true; + }; + + new GrpcOperation(FAKE_SERVICE, OPERATION_ID); + assert.strictEqual(called, true); + GrpcOperation.prototype.listenForEvents_ = listenForEvents; + }); + }); + + describe('cancel', function() { + it('should provide the proper request options', function(done) { + grpcOperation.request = function(protoOpts, reqOpts, callback) { + assert.deepEqual(protoOpts, { + service: 'Operations', + method: 'cancelOperation' + }); + + assert.strictEqual(reqOpts.name, OPERATION_ID); + callback(); + }; + + grpcOperation.cancel(done); + }); + + it('should use util.noop if callback is omitted', function(done) { + grpcOperation.request = function(protoOpts, reqOpts, callback) { + assert.strictEqual(callback, util.noop); + done(); + }; + + grpcOperation.cancel(); + }); + }); + + describe('listenForEvents_', function() { + beforeEach(function() { + grpcOperation.startPolling_ = util.noop; + }); + + it('should start polling when complete listener is bound', function(done) { + grpcOperation.startPolling_ = function() { + done(); + }; + + grpcOperation.on('complete', util.noop); + }); + + it('should track the number of listeners', function() { + assert.strictEqual(grpcOperation.completeListeners, 0); + + grpcOperation.on('complete', util.noop); + assert.strictEqual(grpcOperation.completeListeners, 1); + + grpcOperation.removeListener('complete', util.noop); + assert.strictEqual(grpcOperation.completeListeners, 0); + }); + + it('should only run a single pulling loop', function() { + var startPollingCallCount = 0; + + grpcOperation.startPolling_ = function() { + startPollingCallCount++; + }; + + grpcOperation.on('complete', util.noop); + grpcOperation.on('complete', util.noop); + + assert.strictEqual(startPollingCallCount, 1); + }); + + it('should close when no more message listeners are bound', function() { + grpcOperation.on('complete', util.noop); + grpcOperation.on('complete', util.noop); + assert.strictEqual(grpcOperation.hasActiveListeners, true); + + grpcOperation.removeListener('complete', util.noop); + assert.strictEqual(grpcOperation.hasActiveListeners, true); + + grpcOperation.removeListener('complete', util.noop); + assert.strictEqual(grpcOperation.hasActiveListeners, false); + }); + }); + + describe('startPolling_', function() { + var listenForEvents_; + + before(function() { + listenForEvents_ = GrpcOperation.prototype.listenForEvents_; + }); + + after(function() { + GrpcOperation.prototype.listenForEvents_ = listenForEvents_; + }); + + beforeEach(function() { + GrpcOperation.prototype.listenForEvents_ = util.noop; + grpcOperation.hasActiveListeners = true; + }); + + afterEach(function() { + grpcOperation.hasActiveListeners = false; + }); + + it('should not call getMetadata if no listeners', function(done) { + grpcOperation.hasActiveListeners = false; + + grpcOperation.getMetadata = done; // if called, test will fail. + + grpcOperation.startPolling_(); + done(); + }); + + it('should call getMetadata if listeners are registered', function(done) { + grpcOperation.hasActiveListeners = true; + + grpcOperation.getMetadata = function() { + done(); + }; + + grpcOperation.startPolling_(); + }); + + describe('API error', function() { + var error = new Error('Error.'); + + beforeEach(function() { + grpcOperation.getMetadata = function(callback) { + callback(error); + }; + }); + + it('should emit the error', function(done) { + grpcOperation.on('error', function(err) { + assert.strictEqual(err, error); + done(); + }); + + grpcOperation.startPolling_(); + }); + }); + + describe('operation failure', function() { + var formattedError = { status: 'a' }; + + var apiResponse = { + error: true, + result: 'b' + }; + + beforeEach(function() { + grpcOperation.getMetadata = function(callback) { + callback(null, apiResponse, apiResponse); + }; + }); + + it('should emit the operation error', function(done) { + FakeGrpcService.decorateStatus_ = function(status) { + assert.strictEqual(status, apiResponse.result); + return formattedError; + }; + + grpcOperation.on('error', function(err) { + assert.strictEqual(err, formattedError); + done(); + }); + + grpcOperation.startPolling_(); + }); + }); + + describe('operation pending', function() { + var apiResponse = { done: false }; + var setTimeoutCached = global.setTimeout; + + beforeEach(function() { + grpcOperation.getMetadata = function(callback) { + callback(null, apiResponse, apiResponse); + }; + }); + + after(function() { + global.setTimeout = setTimeoutCached; + }); + + it('should call startPolling_ after 500 ms', function(done) { + var startPolling_ = grpcOperation.startPolling_; + var startPollingCalled = false; + + global.setTimeout = function(fn, timeoutMs) { + fn(); // should call startPolling_ + assert.strictEqual(timeoutMs, 500); + }; + + grpcOperation.startPolling_ = function() { + if (!startPollingCalled) { + // Call #1. + startPollingCalled = true; + startPolling_.apply(this, arguments); + return; + } + + // This is from the setTimeout call. + assert.strictEqual(this, grpcOperation); + done(); + }; + + grpcOperation.startPolling_(); + }); + }); + + describe('operation complete', function() { + var apiResponse = { done: true }; + + beforeEach(function() { + grpcOperation.getMetadata = function(callback) { + callback(null, apiResponse, apiResponse); + }; + }); + + it('should emit complete with metadata', function(done) { + grpcOperation.on('complete', function(metadata) { + assert.strictEqual(metadata, apiResponse); + done(); + }); + + grpcOperation.startPolling_(); + }); + }); + }); +});