Skip to content

Commit c5ecbf7

Browse files
Merge pull request #388 from stephenplusplus/spp--pubsub-batch-methods
Implement publishBatch and pullBatch
2 parents d3fe788 + 3ec367c commit c5ecbf7

8 files changed

Lines changed: 423 additions & 173 deletions

File tree

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,9 @@ pubsub.createTopic('my-new-topic', function(err, topic) {});
246246
var topic = pubsub.topic('my-existing-topic');
247247

248248
// Publish a message to the topic.
249-
topic.publish('New message!', function(err) {});
249+
topic.publish({
250+
data: 'New message!'
251+
}, function(err) {});
250252

251253
// Subscribe to the topic.
252254
topic.subscribe('new-subscription', function(err, subscription) {

lib/common/util.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,14 @@ module.exports.extendGlobalConfig = extendGlobalConfig;
9292
* // [ 'Hi' ]
9393
*/
9494
function arrayize(input) {
95+
if (input === null || input === undefined) {
96+
return [];
97+
}
98+
9599
if (!Array.isArray(input)) {
96100
return [input];
97101
}
102+
98103
return input;
99104
}
100105

lib/pubsub/subscription.js

Lines changed: 67 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -146,16 +146,24 @@ Subscription.formatName_ = function(projectId, name) {
146146
* @private
147147
*/
148148
Subscription.formatMessage_ = function(msg) {
149+
var event = msg.pubsubEvent;
150+
149151
var message = {
150-
id: msg.ackId
152+
ackId: msg.ackId
151153
};
152-
var evt = msg.pubsubEvent;
153-
if (evt && evt.message && evt.message.data) {
154-
message.data = new Buffer(evt.message.data, 'base64').toString('utf-8');
155-
try {
156-
message.data = JSON.parse(message.data);
157-
} catch(e) {}
154+
155+
if (event && event.message) {
156+
message.id = event.message.messageId;
157+
158+
if (event.message.data) {
159+
message.data = new Buffer(event.message.data, 'base64').toString('utf-8');
160+
161+
try {
162+
message.data = JSON.parse(message.data);
163+
} catch(e) {}
164+
}
158165
}
166+
159167
return message;
160168
};
161169

@@ -216,6 +224,7 @@ Subscription.prototype.startPulling_ = function() {
216224
return;
217225
}
218226
this.pull({
227+
maxResults: 1,
219228
returnImmediately: false
220229
}, function(err, message) {
221230
if (err) {
@@ -243,7 +252,7 @@ Subscription.prototype.startPulling_ = function() {
243252
Subscription.prototype.ack = function(ids, callback) {
244253
if (!ids || ids.length === 0) {
245254
throw new Error(
246-
'At least one ID must be specified before it can be acknowledged');
255+
'At least one ID must be specified before it can be acknowledged.');
247256
}
248257
ids = util.arrayize(ids);
249258
var body = {
@@ -278,50 +287,86 @@ Subscription.prototype.delete = function(callback) {
278287

279288
/**
280289
* Pull messages from the subscribed topic. If messages were found, your
281-
* callback is executed with the message object.
290+
* callback is executed with an array of message objects.
282291
*
283292
* Note that messages are pulled automatically once you register your first
284293
* event listener to the subscription, thus the call to `pull` is handled for
285294
* you. If you don't want to start pulling, simply don't register a
286295
* `subscription.on('message', function() {})` event handler.
287296
*
297+
* @todo Should not be racing with other pull.
298+
* @todo Fix API to return a list of messages.
299+
*
288300
* @param {object=} options - Configuration object.
289-
* @param {boolean=} options.returnImmediately - If set, the system will respond
301+
* @param {boolean} options.returnImmediately - If set, the system will respond
290302
* immediately. Otherwise, wait until new messages are available. Returns if
291303
* timeout is reached.
304+
* @param {number} options.maxResults - Limit the amount of messages pulled.
292305
* @param {function} callback - The callback function.
293306
*
294307
* @example
295-
* subscription.pull(function(err, message) {
296-
* // message.id = ID used to acknowledge its receival.
297-
* // message.data = Contents of the message.
308+
* //-
309+
* // Pull all available messages.
310+
* //-
311+
* subscription.pull(function(err, messages) {
312+
* // messages = [
313+
* // {
314+
* // ackId: '', // ID used to acknowledge its receival.
315+
* // id: '', // Unique message ID.
316+
* // data: '' // Contents of the message.
317+
* // },
318+
* // // ...
319+
* // ]
298320
* });
321+
*
322+
* //-
323+
* // Pull a single message.
324+
* //-
325+
* var opts = {
326+
* maxResults: 1
327+
* };
328+
*
329+
* subscription.pull(opts, function(err, messages) {});
299330
*/
300331
Subscription.prototype.pull = function(options, callback) {
301332
var that = this;
302-
// TODO(jbd): Should not be racing with other pull.
333+
var MAX_EVENTS_LIMIT = 1000;
334+
var apiEndpoint = 'subscriptions/pullBatch';
335+
303336
if (!callback) {
304337
callback = options;
305338
options = {};
306339
}
340+
341+
if (!util.is(options.maxResults, 'number')) {
342+
options.maxResults = MAX_EVENTS_LIMIT;
343+
}
344+
307345
var body = {
308346
subscription: this.name,
309-
returnImmediately: !!options.returnImmediately
347+
returnImmediately: !!options.returnImmediately,
348+
maxEvents: options.maxResults
310349
};
311-
this.makeReq_(
312-
'POST', 'subscriptions/pull', null, body, function(err, message) {
313-
// TODO(jbd): Fix API to return a list of messages.
350+
351+
this.makeReq_('POST', apiEndpoint, null, body, function(err, response) {
314352
if (err) {
315353
callback(err);
316354
return;
317355
}
318-
message = Subscription.formatMessage_(message);
356+
357+
var messages = response.pullResponses || [response];
358+
messages = messages.map(Subscription.formatMessage_);
359+
319360
if (that.autoAck) {
320-
that.ack(message.id, function(err) {
321-
callback(err, message);
361+
var ackIds = messages.map(function(message) {
362+
return message.ackId;
363+
});
364+
365+
that.ack(ackIds, function(err) {
366+
callback(err, messages);
322367
});
323368
} else {
324-
callback(null, message);
369+
callback(null, messages);
325370
}
326371
});
327372
};

lib/pubsub/topic.js

Lines changed: 56 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,23 @@ function Topic(pubsub, options) {
6262
this.pubsub = pubsub;
6363
}
6464

65+
/**
66+
* Format a message object as the upstream API expects it.
67+
*
68+
* @private
69+
*
70+
* @return {object}
71+
*/
72+
Topic.formatMessage_ = function(message) {
73+
if (!util.is(message.data, 'buffer')) {
74+
message.data = new Buffer(JSON.stringify(message.data));
75+
}
76+
77+
message.data = message.data.toString('base64');
78+
79+
return message;
80+
};
81+
6582
/**
6683
* Format the name of a topic. A Topic's full name is in the format of
6784
* /topics/{projectId}/{name}.
@@ -83,59 +100,55 @@ Topic.formatName_ = function(projectId, name) {
83100
*
84101
* @throws {Error} If no message is provided.
85102
*
86-
* @param {*} message - The message to publish.
103+
* @param {object|object[]} message - The message(s) to publish.
104+
* @param {*} message.data - The contents of the message.
105+
* @param {array=} message.labels - Labels to apply to the message.
87106
* @param {function=} callback - The callback function.
88107
*
89108
* @example
90-
* topic.publish('New message!', function(err) {});
91-
*
92-
* topic.publish({
93-
* user_id: 3,
94-
* name: 'Stephen',
95-
* message: 'Hello from me!'
96-
* }, function(err) {});
97-
*/
98-
Topic.prototype.publish = function(message, callback) {
99-
if (!message) {
100-
throw new Error('Cannot publish an empty message.');
101-
}
102-
callback = callback || util.noop;
103-
if (!util.is(message, 'string') && !util.is(message, 'buffer')) {
104-
message = JSON.stringify(message);
105-
}
106-
this.publishRaw({
107-
data: new Buffer(message).toString('base64')
108-
}, callback);
109-
};
110-
111-
/**
112-
* Publish a raw message.
109+
* var registrationMessage = {
110+
* data: {
111+
* userId: 3,
112+
* name: 'Stephen',
113+
* event: 'new user'
114+
* },
115+
* labels: [
116+
* 'registration'
117+
* ]
118+
* };
119+
* topic.publish(registrationMessage, function(err) {});
113120
*
114-
* @throws {Error} If no message is provided.
115-
*
116-
* @param {object} message - Raw message to publish.
117-
* @param {array=} message.label - List of labels for the message.
118-
* @param {string} message.data - The base64-encoded contents of the message.
119-
* @param {function=} callback - The callback function.
121+
* //-
122+
* // You can publish a batch of messages at once by supplying an array.
123+
* //-
124+
* var purchaseMessage = {
125+
* data: {
126+
* userId: 3,
127+
* product: 'computer',
128+
* event: 'purchase'
129+
* }
130+
* };
120131
*
121-
* @example
122-
* topic.publishRaw({
123-
* data: new Buffer('New message!').toString('base64')
124-
* }, function(err) {});
132+
* topic.publish([
133+
* registrationMessage,
134+
* purchaseMessage
135+
* ], function(err) {});
125136
*/
126-
Topic.prototype.publishRaw = function(message, callback) {
127-
if (!message) {
128-
throw new Error('Cannot publish an empty message.');
137+
Topic.prototype.publish = function(messages, callback) {
138+
messages = util.arrayize(messages);
139+
140+
if (messages.length === 0) {
141+
throw new Error('Cannot publish without a message.');
129142
}
143+
130144
callback = callback || util.noop;
131-
if (!util.is(message.data, 'string') && !util.is(message.data, 'buffer')) {
132-
message.data = new Buffer(JSON.stringify(message.data)).toString('base64');
133-
}
145+
134146
var body = {
135-
message: message,
136-
topic: this.name
147+
topic: this.name,
148+
messages: messages.map(Topic.formatMessage_)
137149
};
138-
this.makeReq_('POST', 'topics/publish', null, body, callback);
150+
151+
this.makeReq_('POST', 'topics/publishBatch', null, body, callback);
139152
};
140153

141154
/**

0 commit comments

Comments
 (0)