Skip to content

Commit 719fef2

Browse files
committed
tests: integeration: Add out_syslog cases for UDP, TCP, and DTLS
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent c194c54 commit 719fef2

4 files changed

Lines changed: 329 additions & 0 deletions

File tree

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
service:
2+
flush: 1
3+
log_level: info
4+
http_server: on
5+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
6+
7+
pipeline:
8+
inputs:
9+
- name: dummy
10+
tag: out_syslog
11+
dummy: '{"message":"hello from out_syslog"}'
12+
rate: 1
13+
14+
outputs:
15+
- name: syslog
16+
match: out_syslog
17+
host: 127.0.0.1
18+
port: ${SYSLOG_RECEIVER_PORT}
19+
mode: dtls
20+
tls: on
21+
tls.verify: off
22+
syslog_message_key: message
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
service:
2+
flush: 1
3+
log_level: info
4+
http_server: on
5+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
6+
7+
pipeline:
8+
inputs:
9+
- name: dummy
10+
tag: out_syslog
11+
dummy: '{"message":"hello from out_syslog"}'
12+
rate: 1
13+
14+
outputs:
15+
- name: syslog
16+
match: out_syslog
17+
host: 127.0.0.1
18+
port: ${SYSLOG_RECEIVER_PORT}
19+
mode: tcp
20+
syslog_message_key: message
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
service:
2+
flush: 1
3+
log_level: info
4+
http_server: on
5+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
6+
7+
pipeline:
8+
inputs:
9+
- name: dummy
10+
tag: out_syslog
11+
dummy: '{"message":"hello from out_syslog"}'
12+
rate: 1
13+
14+
outputs:
15+
- name: syslog
16+
match: out_syslog
17+
host: 127.0.0.1
18+
port: ${SYSLOG_RECEIVER_PORT}
19+
mode: udp
20+
syslog_message_key: message
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
import os
2+
import shutil
3+
import socket
4+
import subprocess
5+
import threading
6+
import time
7+
8+
import pytest
9+
10+
from utils.test_service import FluentBitTestService
11+
12+
13+
class UdpReceiver:
14+
def __init__(self, host, port):
15+
self.host = host
16+
self.port = port
17+
self.message = None
18+
self.error = None
19+
self._ready = threading.Event()
20+
self._done = threading.Event()
21+
self._thread = threading.Thread(target=self._run, daemon=True)
22+
23+
def _run(self):
24+
try:
25+
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server:
26+
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
27+
server.bind((self.host, self.port))
28+
server.settimeout(120)
29+
self._ready.set()
30+
31+
data, _ = server.recvfrom(4096)
32+
self.message = data
33+
self._done.set()
34+
except Exception as exc:
35+
self.error = exc
36+
self._ready.set()
37+
self._done.set()
38+
39+
def start(self):
40+
self._thread.start()
41+
42+
def wait_ready(self, timeout=5):
43+
if not self._ready.wait(timeout):
44+
raise TimeoutError("Timed out waiting for UDP receiver readiness")
45+
46+
def wait_message(self, timeout=10):
47+
if not self._done.wait(timeout):
48+
raise TimeoutError("Timed out waiting for UDP syslog payload")
49+
50+
if self.error is not None:
51+
raise self.error
52+
53+
return self.message
54+
55+
56+
class TcpReceiver:
57+
def __init__(self, host, port):
58+
self.host = host
59+
self.port = port
60+
self.message = None
61+
self.error = None
62+
self._ready = threading.Event()
63+
self._done = threading.Event()
64+
self._thread = threading.Thread(target=self._run, daemon=True)
65+
66+
def _run(self):
67+
try:
68+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
69+
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
70+
server.bind((self.host, self.port))
71+
server.listen(1)
72+
server.settimeout(120)
73+
self._ready.set()
74+
conn, _ = server.accept()
75+
76+
with conn:
77+
conn.settimeout(20)
78+
chunks = []
79+
80+
while True:
81+
chunk = conn.recv(4096)
82+
if not chunk:
83+
break
84+
chunks.append(chunk)
85+
if b"\n" in chunk:
86+
break
87+
88+
self.message = b"".join(chunks)
89+
self._done.set()
90+
except Exception as exc:
91+
self.error = exc
92+
self._ready.set()
93+
self._done.set()
94+
95+
def start(self):
96+
self._thread.start()
97+
98+
def wait_ready(self, timeout=5):
99+
if not self._ready.wait(timeout):
100+
raise TimeoutError("Timed out waiting for TCP receiver readiness")
101+
102+
def wait_message(self, timeout=10):
103+
if not self._done.wait(timeout):
104+
raise TimeoutError("Timed out waiting for TCP syslog payload")
105+
106+
if self.error is not None:
107+
raise self.error
108+
109+
return self.message
110+
111+
112+
class DtlsReceiver:
113+
def __init__(self, port, cert_file, key_file):
114+
self.port = port
115+
self.cert_file = cert_file
116+
self.key_file = key_file
117+
self.process = None
118+
119+
def start(self):
120+
self.process = subprocess.Popen(
121+
[
122+
"openssl",
123+
"s_server",
124+
"-dtls",
125+
"-accept",
126+
str(self.port),
127+
"-cert",
128+
self.cert_file,
129+
"-key",
130+
self.key_file,
131+
"-naccept",
132+
"1",
133+
"-ign_eof",
134+
],
135+
stdin=subprocess.PIPE,
136+
stdout=subprocess.PIPE,
137+
stderr=subprocess.PIPE,
138+
)
139+
140+
time.sleep(0.5)
141+
if self.process.poll() is not None:
142+
output = self._read_output(timeout=2)
143+
raise RuntimeError(f"DTLS receiver failed to start: {output}")
144+
145+
def wait_ready(self, timeout=5):
146+
deadline = time.time() + timeout
147+
while time.time() < deadline:
148+
if self.process.poll() is not None:
149+
output = self._read_output(timeout=2)
150+
raise RuntimeError(f"DTLS receiver terminated early: {output}")
151+
time.sleep(0.1)
152+
153+
def _read_output(self, timeout=2):
154+
stdout, stderr = self.process.communicate(timeout=timeout)
155+
return (stdout + stderr).decode("utf-8", errors="replace")
156+
157+
def wait_message(self, timeout=30):
158+
try:
159+
output = self._read_output(timeout=timeout)
160+
except subprocess.TimeoutExpired as exc:
161+
raise TimeoutError("Timed out waiting for DTLS handshake") from exc
162+
163+
return output
164+
165+
def stop(self):
166+
if self.process is None:
167+
return
168+
169+
if self.process.poll() is None:
170+
self.process.terminate()
171+
try:
172+
self.process.wait(timeout=5)
173+
except subprocess.TimeoutExpired:
174+
self.process.kill()
175+
self.process.wait(timeout=5)
176+
177+
178+
class Service:
179+
def __init__(self, config_file, receiver_type):
180+
self.config_file = os.path.abspath(os.path.join(os.path.dirname(__file__), "../config", config_file))
181+
self.receiver_type = receiver_type
182+
self.receiver = None
183+
184+
cert_dir = os.path.abspath(
185+
os.path.join(os.path.dirname(__file__), "../../in_splunk/certificate")
186+
)
187+
self.tls_crt_file = os.path.join(cert_dir, "certificate.pem")
188+
self.tls_key_file = os.path.join(cert_dir, "private_key.pem")
189+
190+
self.service = FluentBitTestService(
191+
self.config_file,
192+
pre_start=self._start_receiver,
193+
post_stop=self._stop_receiver,
194+
)
195+
196+
def _start_receiver(self, service):
197+
self.receiver_port = service.allocate_port_env("SYSLOG_RECEIVER_PORT")
198+
199+
if self.receiver_type == "udp":
200+
self.receiver = UdpReceiver("127.0.0.1", self.receiver_port)
201+
elif self.receiver_type == "tcp":
202+
self.receiver = TcpReceiver("127.0.0.1", self.receiver_port)
203+
elif self.receiver_type == "dtls":
204+
self.receiver = DtlsReceiver(self.receiver_port, self.tls_crt_file, self.tls_key_file)
205+
else:
206+
raise ValueError(f"Unknown receiver type: {self.receiver_type}")
207+
208+
self.receiver.start()
209+
self.receiver.wait_ready(timeout=5)
210+
211+
def _stop_receiver(self, _service):
212+
if self.receiver_type == "dtls" and self.receiver is not None:
213+
self.receiver.stop()
214+
215+
def start(self):
216+
self.service.start()
217+
218+
def stop(self):
219+
self.service.stop()
220+
221+
222+
def _assert_syslog_payload(payload):
223+
text = payload.decode("utf-8", errors="replace")
224+
assert "hello from out_syslog" in text
225+
assert text.startswith("<")
226+
227+
228+
def _assert_dtls_payload(output):
229+
assert "ACCEPT" in output
230+
assert "DONE" in output
231+
232+
233+
def test_out_syslog_udp():
234+
service = Service("out_syslog_udp.yaml", "udp")
235+
service.start()
236+
237+
try:
238+
payload = service.receiver.wait_message(timeout=20)
239+
finally:
240+
service.stop()
241+
242+
_assert_syslog_payload(payload)
243+
244+
245+
def test_out_syslog_tcp():
246+
service = Service("out_syslog_tcp.yaml", "tcp")
247+
service.start()
248+
249+
try:
250+
payload = service.receiver.wait_message(timeout=15)
251+
finally:
252+
service.stop()
253+
254+
_assert_syslog_payload(payload)
255+
256+
257+
@pytest.mark.skipif(not shutil.which("openssl"), reason="openssl is required for DTLS test")
258+
def test_out_syslog_dtls():
259+
service = Service("out_syslog_dtls.yaml", "dtls")
260+
service.start()
261+
262+
try:
263+
output = service.receiver.wait_message(timeout=30)
264+
finally:
265+
service.stop()
266+
267+
_assert_dtls_payload(output)

0 commit comments

Comments
 (0)