Skip to content

Commit faa0d98

Browse files
authored
feat: add error log skywalking reporter (#4633)
1 parent 137f09f commit faa0d98

File tree

5 files changed

+567
-65
lines changed

5 files changed

+567
-65
lines changed

apisix/plugins/error-log-logger.lua

Lines changed: 132 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ local errlog = require("ngx.errlog")
2020
local batch_processor = require("apisix.utils.batch-processor")
2121
local plugin = require("apisix.plugin")
2222
local timers = require("apisix.timers")
23+
local http = require("resty.http")
2324
local plugin_name = "error-log-logger"
2425
local table = core.table
2526
local schema_def = core.schema
2627
local ngx = ngx
2728
local tcp = ngx.socket.tcp
2829
local tostring = tostring
29-
local ipairs = ipairs
30+
local ipairs = ipairs
31+
local string = require("string")
3032
local lrucache = core.lrucache.new({
3133
ttl = 300, count = 32
3234
})
@@ -35,23 +37,50 @@ local lrucache = core.lrucache.new({
3537
local metadata_schema = {
3638
type = "object",
3739
properties = {
38-
host = schema_def.host_def,
39-
port = {type = "integer", minimum = 0},
40-
tls = {type = "boolean", default = false},
41-
tls_server_name = {type = "string"},
42-
timeout = {type = "integer", minimum = 1, default = 3},
43-
keepalive = {type = "integer", minimum = 1, default = 30},
40+
tcp = {
41+
type = "object",
42+
properties = {
43+
host = schema_def.host_def,
44+
port = {type = "integer", minimum = 0},
45+
tls = {type = "boolean", default = false},
46+
tls_server_name = {type = "string"},
47+
},
48+
required = {"host", "port"}
49+
},
50+
skywalking = {
51+
type = "object",
52+
properties = {
53+
endpoint_addr = {schema_def.uri, default = "http://127.0.0.1:12900/v3/logs"},
54+
service_name = {type = "string", default = "APISIX"},
55+
service_instance_name = {type="string", default = "APISIX Service Instance"},
56+
},
57+
},
58+
host = {schema_def.host_def, description = "Deprecated, use `tcp.host` instead."},
59+
port = {type = "integer", minimum = 0, description = "Deprecated, use `tcp.port` instead."},
60+
tls = {type = "boolean", default = false,
61+
description = "Deprecated, use `tcp.tls` instead."},
62+
tls_server_name = {type = "string",
63+
description = "Deprecated, use `tcp.tls_server_name` instead."},
4464
name = {type = "string", default = plugin_name},
4565
level = {type = "string", default = "WARN", enum = {"STDERR", "EMERG", "ALERT", "CRIT",
4666
"ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"}},
67+
timeout = {type = "integer", minimum = 1, default = 3},
68+
keepalive = {type = "integer", minimum = 1, default = 30},
4769
batch_max_size = {type = "integer", minimum = 0, default = 1000},
4870
max_retry_count = {type = "integer", minimum = 0, default = 0},
4971
retry_delay = {type = "integer", minimum = 0, default = 1},
5072
buffer_duration = {type = "integer", minimum = 1, default = 60},
5173
inactive_timeout = {type = "integer", minimum = 1, default = 3},
5274
},
53-
required = {"host", "port"}
75+
oneOf = {
76+
{required = {"skywalking"}},
77+
{required = {"tcp"}},
78+
-- for compatibility with the old schema
79+
{required = {"host", "port"}}
80+
}
5481
}
82+
83+
5584
local schema = {
5685
type = "object",
5786
}
@@ -84,7 +113,15 @@ local _M = {
84113
}
85114

86115

87-
local function send_to_server(data)
116+
--- Validate a configuration table against the schema matching its kind.
-- Plugin metadata is checked against `metadata_schema`; anything else is
-- checked against the (per-route) `schema`.
-- @param conf the configuration table to validate
-- @param schema_type schema kind selector, compared with core.schema.TYPE_METADATA
-- @return whatever core.schema.check returns for the selected schema
function _M.check_schema(conf, schema_type)
    local target_schema = schema
    if schema_type == core.schema.TYPE_METADATA then
        target_schema = metadata_schema
    end

    return core.schema.check(target_schema, conf)
end
122+
123+
124+
local function send_to_tcp_server(data)
88125
local sock, soc_err = tcp()
89126

90127
if not sock then
@@ -93,33 +130,89 @@ local function send_to_server(data)
93130

94131
sock:settimeout(config.timeout * 1000)
95132

96-
local ok, err = sock:connect(config.host, config.port)
133+
local tcp_config = config.tcp
134+
local ok, err = sock:connect(tcp_config.host, tcp_config.port)
97135
if not ok then
98-
return false, "failed to connect the TCP server: host[" .. config.host
99-
.. "] port[" .. tostring(config.port) .. "] err: " .. err
136+
return false, "failed to connect the TCP server: host[" .. tcp_config.host
137+
.. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
100138
end
101139

102-
if config.tls then
103-
ok, err = sock:sslhandshake(false, config.tls_server_name, false)
140+
if tcp_config.tls then
141+
ok, err = sock:sslhandshake(false, tcp_config.tls_server_name, false)
104142
if not ok then
105143
sock:close()
106144
return false, "failed to perform TLS handshake to TCP server: host["
107-
.. config.host .. "] port[" .. tostring(config.port) .. "] err: " .. err
145+
.. tcp_config.host .. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
108146
end
109147
end
110148

111149
local bytes, err = sock:send(data)
112150
if not bytes then
113151
sock:close()
114-
return false, "failed to send data to TCP server: host[" .. config.host
115-
.. "] port[" .. tostring(config.port) .. "] err: " .. err
152+
return false, "failed to send data to TCP server: host[" .. tcp_config.host
153+
.. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
116154
end
117155

118156
sock:setkeepalive(config.keepalive * 1000)
119157
return true
120158
end
121159

122160

161+
--- Send one batch of error-log lines to the SkyWalking log endpoint over HTTP.
-- Every reported line becomes one JSON log record carrying the configured
-- service and service-instance names; the whole batch is POSTed in a single
-- request to `config.skywalking.endpoint_addr`.
-- @param log_message array of log strings; only the odd indexes (1, 3, 5, ...)
--        are reported. NOTE(review): presumably the even slots hold separators
--        appended by the caller -- confirm against `process()`.
-- @return true on success; false plus an error message when the HTTP client
--         cannot be created, the request fails, or the server answers with a
--         status code >= 400.
local function send_to_skywalking(log_message)
    local skywalking = config.skywalking
    local endpoint_addr = skywalking.endpoint_addr
    core.log.info("sending a batch logs to ", endpoint_addr)

    local httpc, new_err = http.new()
    if not httpc then
        return false, "error while sending data to skywalking["
                      .. endpoint_addr .. "] " .. tostring(new_err)
    end
    httpc:set_timeout(config.timeout * 1000)

    local entries = {}
    for i = 1, #log_message, 2 do
        local content = {
            service = skywalking.service_name,
            serviceInstance = skywalking.service_instance_name,
            endpoint = "",
            body = {
                text = {
                    text = log_message[i]
                }
            }
        }
        table.insert(entries, content)
    end

    local httpc_res, httpc_err = httpc:request_uri(
        endpoint_addr,
        {
            method = "POST",
            body = core.json.encode(entries),
            keepalive_timeout = config.keepalive * 1000,
            headers = {
                ["Content-Type"] = "application/json",
            }
        }
    )

    if not httpc_res then
        return false, "error while sending data to skywalking["
                      .. endpoint_addr .. "] " .. tostring(httpc_err)
    end

    -- some error occurred in the server
    if httpc_res.status >= 400 then
        -- request_uri() has already consumed the response into `body`, so it
        -- is read from there instead of calling read_body() a second time.
        -- The endpoint is the string itself, not a field of it.
        return false, string.format(
            "server returned status code[%s] skywalking[%s] body[%s]",
            httpc_res.status,
            endpoint_addr,
            tostring(httpc_res.body)
        )
    end

    return true
end
214+
215+
123216
local function update_filter(value)
124217
local level = log_level[value.level]
125218
local status, err = errlog.set_filter_level(level)
@@ -133,6 +226,14 @@ local function update_filter(value)
133226
end
134227

135228

229+
--- Dispatch a batch to the configured reporter.
-- SkyWalking is used when `config.skywalking` is set; otherwise the batch
-- goes to the TCP server.
-- @param data the batch handed over by the batch processor
-- @return the result of the selected sender
local function send(data)
    -- both senders are functions (always truthy), so `and/or` is a safe selector
    local sender = config.skywalking and send_to_skywalking or send_to_tcp_server
    return sender(data)
end
235+
236+
136237
local function process()
137238
local metadata = plugin.plugin_metadata(plugin_name)
138239
if not (metadata and metadata.value and metadata.modifiedIndex) then
@@ -145,7 +246,19 @@ local function process()
145246
core.log.warn("set log filter failed for ", err)
146247
return
147248
end
148-
249+
if not (config.tcp or config.skywalking) then
250+
config.tcp = {
251+
host = config.host,
252+
port = config.port,
253+
tls = config.tls,
254+
tls_server_name = config.tls_server_name
255+
}
256+
core.log.warn(
257+
string.format("The schema is out of date. Please update to the new configuration, "
258+
.. "for example: {\"tcp\": {\"host\": \"%s\", \"port\": \"%s\"}}",
259+
config.host, config.port
260+
))
261+
end
149262
end
150263

151264
local err_level = log_level[metadata.value.level]
@@ -184,7 +297,7 @@ local function process()
184297
}
185298

186299
local err
187-
log_buffer, err = batch_processor:new(send_to_server, config_bat)
300+
log_buffer, err = batch_processor:new(send, config_bat)
188301

189302
if not log_buffer then
190303
core.log.warn("error when creating the batch processor: ", err)

docs/en/latest/plugins/error-log-logger.md

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,31 +30,38 @@ title: error-log-logger
3030

3131
## Name
3232

33-
`error-log-logger` is a plugin which pushes the log data of APISIX's error.log to TCP servers.
33+
`error-log-logger` is a plugin which pushes the log data of APISIX's `error.log` to TCP servers or [Apache SkyWalking](https://skywalking.apache.org/).
3434

35-
This plugin will provide the ability to send the log data which selected by the level to Monitoring tools and other TCP servers.
35+
This plugin provides the ability to send log data, filtered by level, to monitoring tools and other TCP servers, and to SkyWalking over HTTP.
3636

37-
This plugin provides the ability as a batch to push the log data to your external TCP servers. If not receive the log data, don't worry, it will automatically send the logs after the timer function expires in our Batch Processor.
37+
This plugin pushes the log data in batches to your external TCP servers or monitoring tools. If you do not receive the log data right away, don't worry: the logs are sent automatically once the timer in our Batch Processor expires.
3838

3939
For more information on the Batch-Processor in Apache APISIX, please refer to:
4040
[Batch-Processor](../batch-processor.md)
4141

4242
## Attributes
4343

44-
| Name | Type | Requirement | Default | Valid | Description |
45-
| ---------------- | ------- | ----------- | ------- | ------- | ---------------------------------------------------------------------------------------- |
46-
| host | string | required | | | IP address or the Hostname of the TCP server. |
47-
| port | integer | required | | [0,...] | Target upstream port. |
48-
| timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to connect and send, unit: second. |
49-
| keepalive | integer | optional | 30 | [1,...] | Time for keeping the cosocket alive, unit: second. |
50-
| level | string | optional | WARN | | The filter's log level, default warn, choose the level in ["STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"], the value ERR equals ERROR. |
51-
| tls | boolean | optional | false | | Control whether to perform SSL verification |
52-
| tls_server_name | string | optional | | | The server name for the new TLS extension SNI |
53-
| batch_max_size | integer | optional | 1000 | [1,...] | Max size of each batch |
54-
| inactive_timeout | integer | optional | 3 | [1,...] | Maximum age in seconds when the buffer will be flushed if inactive |
55-
| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed |
56-
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line |
57-
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails |
44+
| Name | Type | Requirement | Default | Valid | Description |
45+
| -------------------------------- | ------- | ----------- | ------------------------------ | ------- | ---------------------------------------------------------------------------------------------------- |
46+
| tcp.host | string | required | | | IP address or the Hostname of the TCP server. |
47+
| tcp.port | integer | required | | [0,...] | Target upstream port. |
48+
| tcp.tls | boolean | optional | false | | Control whether to perform SSL verification. |
49+
| tcp.tls_server_name | string | optional | | | The server name for the new TLS extension SNI. |
50+
| skywalking.endpoint_addr | string | optional | http://127.0.0.1:12900/v3/logs | | The HTTP endpoint of SkyWalking. |
51+
| skywalking.service_name | string | optional | APISIX | | Service name for the SkyWalking reporter. |
52+
| skywalking.service_instance_name | string | optional | APISIX Service Instance | | Service instance name for the SkyWalking reporter; set it to `$hostname` to use the local hostname directly. |
53+
| host | string | optional | | | (`Deprecated`, use `tcp.host` instead) IP address or the Hostname of the TCP server. |
54+
| port | integer | optional | | [0,...] | (`Deprecated`, use `tcp.port` instead) Target upstream port. |
55+
| tls | boolean | optional | false | | (`Deprecated`, use `tcp.tls` instead) Control whether to perform SSL verification. |
56+
| tls_server_name | string | optional | | | (`Deprecated`, use `tcp.tls_server_name` instead) The server name for the new TLS extension SNI. |
57+
| timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to connect and send, unit: second. |
58+
| keepalive | integer | optional | 30 | [1,...] | Time for keeping the cosocket alive, unit: second. |
59+
| level | string | optional | WARN | | The filter's log level, default warn, choose the level in ["STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"], the value ERR equals ERROR. |
60+
| batch_max_size | integer | optional | 1000 | [1,...] | Max size of each batch. |
61+
| inactive_timeout | integer | optional | 3 | [1,...] | Maximum age in seconds when the buffer will be flushed if inactive. |
62+
| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed. |
63+
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. |
64+
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. |
5865

5966
## How To Enable And Disable
6067

@@ -96,8 +103,24 @@ Step: update the attributes of the plugin
96103
```shell
97104
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/error-log-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
98105
{
99-
"host": "127.0.0.1",
100-
"port": 1999,
106+
"tcp": {
107+
"host": "127.0.0.1",
108+
"port": 1999
109+
},
110+
"inactive_timeout": 1
111+
}'
112+
```
113+
114+
## How to set the SkyWalking OAP server address
115+
116+
Step: update the attributes of the plugin
117+
118+
```shell
119+
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/error-log-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
120+
{
121+
"skywalking": {
122+
"endpoint_addr":"http://127.0.0.1:12800/v3/logs"
123+
},
101124
"inactive_timeout": 1
102125
}'
103126
```

0 commit comments

Comments
 (0)