Skip to content

Commit 2c041a3

Browse files
authored
feat: add max pending entries to all logger plugins (#12709)
1 parent 7b1e78a commit 2c041a3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+988
-33
lines changed

apisix/plugins/clickhouse-logger.lua

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
1919
local log_util = require("apisix.utils.log-util")
20+
local plugin = require("apisix.plugin")
2021
local core = require("apisix.core")
2122
local http = require("resty.http")
2223
local url = require("net.url")
@@ -71,7 +72,12 @@ local metadata_schema = {
7172
properties = {
7273
log_format = {
7374
type = "object"
74-
}
75+
},
76+
max_pending_entries = {
77+
type = "integer",
78+
description = "maximum number of pending entries in the batch processor",
79+
minimum = 1,
80+
},
7581
},
7682
}
7783

@@ -174,9 +180,12 @@ end
174180

175181

176182
function _M.log(conf, ctx)
183+
local metadata = plugin.plugin_metadata(plugin_name)
184+
local max_pending_entries = metadata and metadata.value and
185+
metadata.value.max_pending_entries or nil
177186
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
178187

179-
if batch_processor_manager:add_entry(conf, entry) then
188+
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
180189
return
181190
end
182191

@@ -201,7 +210,7 @@ function _M.log(conf, ctx)
201210
return send_http_data(conf, data)
202211
end
203212

204-
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
213+
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
205214
end
206215

207216

apisix/plugins/elasticsearch-logger.lua

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ local core = require("apisix.core")
1919
local http = require("resty.http")
2020
local log_util = require("apisix.utils.log-util")
2121
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
22-
22+
local plugin = require("apisix.plugin")
2323
local ngx = ngx
2424
local str_format = core.string.format
2525
local math_random = math.random
@@ -104,7 +104,12 @@ local metadata_schema = {
104104
properties = {
105105
log_format = {
106106
type = "object"
107-
}
107+
},
108+
max_pending_entries = {
109+
type = "integer",
110+
description = "maximum number of pending entries in the batch processor",
111+
minimum = 1,
112+
},
108113
},
109114
}
110115

@@ -264,17 +269,21 @@ function _M.access(conf)
264269
end
265270

266271
function _M.log(conf, ctx)
272+
local metadata = plugin.plugin_metadata(plugin_name)
273+
local max_pending_entries = metadata and metadata.value and
274+
metadata.value.max_pending_entries or nil
267275
local entry = get_logger_entry(conf, ctx)
268276

269-
if batch_processor_manager:add_entry(conf, entry) then
277+
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
270278
return
271279
end
272280

273281
local process = function(entries)
274282
return send_to_elasticsearch(conf, entries)
275283
end
276284

277-
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process)
285+
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
286+
process, max_pending_entries)
278287
end
279288

280289

apisix/plugins/google-cloud-logging.lua

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
--
1717

1818
local core = require("apisix.core")
19+
local plugin = require("apisix.plugin")
1920
local tostring = tostring
2021
local http = require("resty.http")
2122
local log_util = require("apisix.utils.log-util")
@@ -110,7 +111,12 @@ local metadata_schema = {
110111
properties = {
111112
log_format = {
112113
type = "object"
113-
}
114+
},
115+
max_pending_entries = {
116+
type = "integer",
117+
description = "maximum number of pending entries in the batch processor",
118+
minimum = 1,
119+
},
114120
},
115121
}
116122

@@ -241,6 +247,9 @@ end
241247

242248

243249
function _M.log(conf, ctx)
250+
local metadata = plugin.plugin_metadata(plugin_name)
251+
local max_pending_entries = metadata and metadata.value and
252+
metadata.value.max_pending_entries or nil
244253
local oauth, err = core.lrucache.plugin_ctx(lrucache, ctx, nil,
245254
create_oauth_object, conf)
246255
if not oauth then
@@ -250,15 +259,16 @@ function _M.log(conf, ctx)
250259

251260
local entry = get_logger_entry(conf, ctx, oauth)
252261

253-
if batch_processor_manager:add_entry(conf, entry) then
262+
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
254263
return
255264
end
256265

257266
local process = function(entries)
258267
return send_to_google(oauth, entries)
259268
end
260269

261-
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process)
270+
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
271+
process, max_pending_entries)
262272
end
263273

264274

apisix/plugins/http-logger.lua

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
--
1717

1818
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
19+
local plugin = require("apisix.plugin")
1920
local log_util = require("apisix.utils.log-util")
2021
local core = require("apisix.core")
2122
local http = require("resty.http")
@@ -63,7 +64,12 @@ local metadata_schema = {
6364
properties = {
6465
log_format = {
6566
type = "object"
66-
}
67+
},
68+
max_pending_entries = {
69+
type = "integer",
70+
description = "maximum number of pending entries in the batch processor",
71+
minimum = 1,
72+
},
6773
},
6874
}
6975

@@ -168,13 +174,16 @@ end
168174

169175

170176
function _M.log(conf, ctx)
177+
local metadata = plugin.plugin_metadata(plugin_name)
178+
local max_pending_entries = metadata and metadata.value and
179+
metadata.value.max_pending_entries or nil
171180
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
172181

173182
if not entry.route_id then
174183
entry.route_id = "no-matched"
175184
end
176185

177-
if batch_processor_manager:add_entry(conf, entry) then
186+
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
178187
return
179188
end
180189

@@ -216,7 +225,7 @@ function _M.log(conf, ctx)
216225
return send_http_data(conf, data)
217226
end
218227

219-
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
228+
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
220229
end
221230

222231

apisix/plugins/loki-logger.lua

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
1919
local log_util = require("apisix.utils.log-util")
2020
local core = require("apisix.core")
21+
local plugin = require("apisix.plugin")
2122
local http = require("resty.http")
2223
local new_tab = require("table.new")
2324

@@ -115,7 +116,12 @@ local metadata_schema = {
115116
properties = {
116117
log_format = {
117118
type = "object"
118-
}
119+
},
120+
max_pending_entries = {
121+
type = "integer",
122+
description = "maximum number of pending entries in the batch processor",
123+
minimum = 1,
124+
},
119125
},
120126
}
121127

@@ -193,6 +199,9 @@ end
193199

194200

195201
function _M.log(conf, ctx)
202+
local metadata = plugin.plugin_metadata(plugin_name)
203+
local max_pending_entries = metadata and metadata.value and
204+
metadata.value.max_pending_entries or nil
196205
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
197206

198207
if not entry.route_id then
@@ -205,7 +214,7 @@ function _M.log(conf, ctx)
205214
-- and then add 6 zeros by string concatenation
206215
entry.loki_log_time = tostring(ngx.req.start_time() * 1000) .. "000000"
207216

208-
if batch_processor_manager:add_entry(conf, entry) then
217+
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
209218
return
210219
end
211220

@@ -244,7 +253,7 @@ function _M.log(conf, ctx)
244253
return send_http_data(conf, data)
245254
end
246255

247-
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
256+
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
248257
end
249258

250259

apisix/plugins/rocketmq-logger.lua

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
-- limitations under the License.
1616
--
1717
local core = require("apisix.core")
18+
local plugin = require("apisix.plugin")
1819
local log_util = require("apisix.utils.log-util")
1920
local producer = require ("resty.rocketmq.producer")
2021
local acl_rpchook = require("resty.rocketmq.acl_rpchook")
@@ -77,7 +78,12 @@ local metadata_schema = {
7778
properties = {
7879
log_format = {
7980
type = "object"
80-
}
81+
},
82+
max_pending_entries = {
83+
type = "integer",
84+
description = "maximum number of pending entries in the batch processor",
85+
minimum = 1,
86+
},
8187
},
8288
}
8389

@@ -138,14 +144,17 @@ end
138144

139145

140146
function _M.log(conf, ctx)
147+
local metadata = plugin.plugin_metadata(plugin_name)
148+
local max_pending_entries = metadata and metadata.value and
149+
metadata.value.max_pending_entries or nil
141150
local entry
142151
if conf.meta_format == "origin" then
143152
entry = log_util.get_req_original(ctx, conf)
144153
else
145154
entry = log_util.get_log_entry(plugin_name, conf, ctx)
146155
end
147156

148-
if batch_processor_manager:add_entry(conf, entry) then
157+
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
149158
return
150159
end
151160

@@ -184,7 +193,7 @@ function _M.log(conf, ctx)
184193
return send_rocketmq_data(conf, data, prod)
185194
end
186195

187-
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
196+
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
188197
end
189198

190199

apisix/plugins/skywalking-logger.lua

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
--
1717

1818
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
19+
local plugin = require("apisix.plugin")
1920
local log_util = require("apisix.utils.log-util")
2021
local core = require("apisix.core")
2122
local http = require("resty.http")
@@ -64,7 +65,12 @@ local metadata_schema = {
6465
properties = {
6566
log_format = {
6667
type = "object"
67-
}
68+
},
69+
max_pending_entries = {
70+
type = "integer",
71+
description = "maximum number of pending entries in the batch processor",
72+
minimum = 1,
73+
},
6874
},
6975
}
7076

@@ -139,6 +145,9 @@ end
139145

140146

141147
function _M.log(conf, ctx)
148+
local metadata = plugin.plugin_metadata(plugin_name)
149+
local max_pending_entries = metadata and metadata.value and
150+
metadata.value.max_pending_entries or nil
142151
local log_body = log_util.get_log_entry(plugin_name, conf, ctx)
143152
local trace_context
144153
local sw_header = ngx.req.get_headers()["sw8"]
@@ -173,7 +182,7 @@ function _M.log(conf, ctx)
173182
endpoint = ctx.var.uri,
174183
}
175184

176-
if batch_processor_manager:add_entry(conf, entry) then
185+
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
177186
return
178187
end
179188

@@ -187,7 +196,7 @@ function _M.log(conf, ctx)
187196
return send_http_data(conf, data)
188197
end
189198

190-
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
199+
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
191200
end
192201

193202

apisix/plugins/splunk-hec-logging.lua

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ local ngx_now = ngx.now
2121
local http = require("resty.http")
2222
local log_util = require("apisix.utils.log-util")
2323
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
24+
local plugin = require("apisix.plugin")
2425
local table_insert = core.table.insert
2526
local table_concat = core.table.concat
2627
local ipairs = ipairs
@@ -75,6 +76,11 @@ local metadata_schema = {
7576
properties = {
7677
log_format = {
7778
type = "object"
79+
},
80+
max_pending_entries = {
81+
type = "integer",
82+
description = "maximum number of pending entries in the batch processor",
83+
minimum = 1,
7884
}
7985
},
8086
}
@@ -169,17 +175,21 @@ end
169175

170176

171177
function _M.log(conf, ctx)
178+
local metadata = plugin.plugin_metadata(plugin_name)
179+
local max_pending_entries = metadata and metadata.value and
180+
metadata.value.max_pending_entries or nil
172181
local entry = get_logger_entry(conf, ctx)
173182

174-
if batch_processor_manager:add_entry(conf, entry) then
183+
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
175184
return
176185
end
177186

178187
local process = function(entries)
179188
return send_to_splunk(conf, entries)
180189
end
181190

182-
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process)
191+
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
192+
process, max_pending_entries)
183193
end
184194

185195

0 commit comments

Comments
 (0)