// admin.js (from a fork of Blizzard/node-rdkafka)
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
'use strict';
// Public surface of this module: only the factory is exported, under the name
// `create`. createAdminClient is a function declaration further down this
// file, so it is hoisted and already defined when this assignment runs.
module.exports = {
create: createAdminClient,
};
var Client = require('./client');
var util = require('util');
var Kafka = require('../librdkafka');
var LibrdKafkaError = require('./error');
var shallowCopy = require('./util').shallowCopy;
/**
 * Create a new AdminClient for making topics, partitions, and more.
 *
 * This is a factory method because it immediately starts an
 * active handle with the brokers: the client is constructed, optionally
 * given an OAuthBearer token, and then connected before being returned.
 *
 * @param {object} conf - Key value pairs to configure the admin client.
 * @param {string} [initOauthBearerToken] - Optional OAuthBearer token. When
 * truthy, it is set on the client (via refreshOauthBearerToken) before the
 * client connects.
 * @return {AdminClient} The client, after connect() has completed.
 * @throws {Error} If the token is rejected by refreshOauthBearerToken, or if
 * connect() throws.
 */
function createAdminClient(conf, initOauthBearerToken) {
  var client = new AdminClient(conf);

  if (initOauthBearerToken) {
    client.refreshOauthBearerToken(initOauthBearerToken);
  }

  // connect() already passes the native status through LibrdKafkaError.wrap
  // and returns nothing, so there is no status code left to wrap here; a
  // failed connect surfaces as the exception thrown by connect() itself.
  client.connect();

  return client;
}
/**
 * NewTopic model.
 *
 * This is the representation of a new topic that is requested to be made
 * using the Admin client.
 *
 * @typedef {object} AdminClient~NewTopic
 * @property {string} topic - the topic name to create
 * @property {number} num_partitions - the number of partitions to give the topic
 * @property {number} replication_factor - the replication factor of the topic
 * @property {object} config - a list of key values to be passed as configuration
 * for the topic.
 */

/**
 * AdminClient class for administering Kafka
 *
 * This client is the way you can interface with the Kafka Admin APIs.
 * Instances should be obtained through the factory method rather than by
 * calling this constructor directly.
 *
 * <code>
 * var client = AdminClient.create({ ... });
 * </code>
 *
 * Constructing the object creates the native admin handle from a copy of
 * the given configuration and marks the client as not connected. Unlike the
 * other node-rdkafka classes, this class does not ensure that it is
 * connected to the upstream broker. Instead, making an action will
 * validate that.
 *
 * @param {object} conf - Key value pairs to configure the admin client. The
 * object is passed through shallowCopy before use, and that copy is what is
 * stored as globalConfig.
 * @constructor
 */
function AdminClient(conf) {
  // Allow calling without `new`.
  if (this instanceof AdminClient === false) {
    return new AdminClient(conf);
  }

  var ownConf = shallowCopy(conf);

  this._client = new Kafka.AdminClient(ownConf);
  this._isConnected = false;
  this.globalConfig = ownConf;
}
/**
 * Connect using the admin client.
 *
 * Normally invoked by the factory method, so it should not need to be
 * called from outside.
 *
 * Unlike the other connect methods, this one is synchronous: the native
 * connect call is made, its result is handed to LibrdKafkaError.wrap, and
 * only then is the client flagged as connected. If wrap throws, the flag is
 * left untouched.
 */
AdminClient.prototype.connect = function() {
  var status = this._client.connect();

  LibrdKafkaError.wrap(status, true);

  this._isConnected = true;
};
/**
 * Disconnect the admin client.
 *
 * This is a synchronous method: the native disconnect call is made, its
 * result is handed to LibrdKafkaError.wrap, and then the client is flagged
 * as disconnected. If wrap throws, the flag is left untouched.
 */
AdminClient.prototype.disconnect = function() {
  var status = this._client.disconnect();

  LibrdKafkaError.wrap(status, true);

  this._isConnected = false;
};
/**
 * Refresh the OAuthBearer token (the initial one is supplied through the
 * factory method).
 *
 * Both arguments are forwarded unchanged to the native client's setToken.
 * How a missing lifetime is turned into an expiry is decided by the native
 * binding, not here (see connection.cc).
 *
 * @param {string} tokenStr - OAuthBearer token string; must be a non-empty string.
 * @param {number} lifetimeMs - Optional lifetime in milliseconds. Falsy
 * values are passed through without a type check.
 * @throws {Error} If tokenStr is empty or not a string, or if lifetimeMs is
 * truthy and not a number.
 * @see connection.cc
 */
AdminClient.prototype.refreshOauthBearerToken = function (tokenStr, lifetimeMs) {
  var tokenIsUsable = typeof tokenStr === 'string' && tokenStr.length > 0;
  if (!tokenIsUsable) {
    throw new Error("OAuthBearer token is undefined/empty or not a string");
  }

  var lifetimeIsInvalid = Boolean(lifetimeMs) && typeof lifetimeMs !== 'number';
  if (lifetimeIsInvalid) {
    throw new Error("OAuthBearer lifetimeMs is not a number");
  }

  this._client.setToken(tokenStr, lifetimeMs);
};
/**
 * Create a topic with a given config.
 *
 * The timeout may be omitted, in which case the second argument is taken as
 * the callback. A missing or otherwise falsy timeout falls back to 5000 ms.
 *
 * @param {NewTopic} topic - Topic to create.
 * @param {number} timeout - Number of milliseconds to wait while trying to
 * create the topic. Defaults to 5000.
 * @param {function} cb - Optional callback executed when finished. It
 * receives the error built by LibrdKafkaError.create on failure and no
 * arguments on success.
 * @throws {Error} Synchronously, if the client is not connected.
 */
AdminClient.prototype.createTopic = function(topic, timeout, cb) {
  if (!this._isConnected) {
    throw new Error('Client is disconnected');
  }

  var done = cb;
  var waitMs = timeout;

  // Two-argument form: createTopic(topic, cb).
  if (typeof waitMs === 'function') {
    done = waitMs;
    waitMs = 5000;
  }
  waitMs = waitMs || 5000;

  this._client.createTopic(topic, waitMs, function onTopicCreated(err) {
    // Without a callback there is nobody to report the outcome to.
    if (!done) {
      return;
    }

    if (err) {
      done(LibrdKafkaError.create(err));
    } else {
      done();
    }
  });
};
/**
 * Delete a topic.
 *
 * The timeout may be omitted, in which case the second argument is taken as
 * the callback. A missing or otherwise falsy timeout falls back to 5000 ms.
 *
 * @param {string} topic - The topic to delete, by name.
 * @param {number} timeout - Number of milliseconds to wait while trying to
 * delete the topic. Defaults to 5000.
 * @param {function} cb - Optional callback executed when finished. It
 * receives the error built by LibrdKafkaError.create on failure and no
 * arguments on success.
 * @throws {Error} Synchronously, if the client is not connected.
 */
AdminClient.prototype.deleteTopic = function(topic, timeout, cb) {
  if (!this._isConnected) {
    throw new Error('Client is disconnected');
  }

  var done = cb;
  var waitMs = timeout;

  // Two-argument form: deleteTopic(topic, cb).
  if (typeof waitMs === 'function') {
    done = waitMs;
    waitMs = 5000;
  }
  waitMs = waitMs || 5000;

  this._client.deleteTopic(topic, waitMs, function onTopicDeleted(err) {
    // Without a callback there is nobody to report the outcome to.
    if (!done) {
      return;
    }

    if (err) {
      done(LibrdKafkaError.create(err));
    } else {
      done();
    }
  });
};
/**
 * Create new partitions for a topic.
 *
 * The timeout may be omitted, in which case the third argument is taken as
 * the callback. A missing or otherwise falsy timeout falls back to 5000 ms.
 *
 * @param {string} topic - The topic to add partitions to, by name.
 * @param {number} totalPartitions - The total number of partitions the topic
 * should have after the request.
 * @param {number} timeout - Number of milliseconds to wait while trying to
 * create the partitions. Defaults to 5000.
 * @param {function} cb - Optional callback executed when finished. It
 * receives the error built by LibrdKafkaError.create on failure and no
 * arguments on success.
 * @throws {Error} Synchronously, if the client is not connected.
 */
AdminClient.prototype.createPartitions = function(topic, totalPartitions, timeout, cb) {
  if (!this._isConnected) {
    throw new Error('Client is disconnected');
  }

  var done = cb;
  var waitMs = timeout;

  // Three-argument form: createPartitions(topic, totalPartitions, cb).
  if (typeof waitMs === 'function') {
    done = waitMs;
    waitMs = 5000;
  }
  waitMs = waitMs || 5000;

  this._client.createPartitions(topic, totalPartitions, waitMs, function onPartitionsCreated(err) {
    // Without a callback there is nobody to report the outcome to.
    if (!done) {
      return;
    }

    if (err) {
      done(LibrdKafkaError.create(err));
    } else {
      done();
    }
  });
};