Skip to content
This repository was archived by the owner on Feb 2, 2025. It is now read-only.

Commit 2e7b8ca

Browse files
benjaminhuokaran56625
authored andcommitted
Merge pull request fluent#825 from verrazzano/karak/support_file_system_as_storage
Support file system as storage layer in service section of fluenbit
1 parent d8debc6 commit 2e7b8ca

18 files changed

Lines changed: 473 additions & 0 deletions

apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,27 @@ type FluentBitConfigSpec struct {
4646
Namespace *string `json:"namespace,omitempty"`
4747
}
4848

49+
type Storage struct {
50+
// Select an optional location in the file system to store streams and chunks of data/
51+
Path string `json:"path,omitempty"`
52+
// Configure the synchronization mode used to store the data into the file system
53+
// +kubebuilder:validation:Enum:=normal;full
54+
Sync string `json:"sync,omitempty"`
55+
// Enable the data integrity check when writing and reading data from the filesystem
56+
// +kubebuilder:validation:Enum:=on;off
57+
Checksum string `json:"checksum,omitempty"`
58+
// This option configure a hint of maximum value of memory to use when processing these records
59+
BacklogMemLimit string `json:"backlogMemLimit,omitempty"`
60+
// If the input plugin has enabled filesystem storage type, this property sets the maximum number of Chunks that can be up in memory
61+
MaxChunksUp *int64 `json:"maxChunksUp,omitempty"`
62+
// If http_server option has been enabled in the Service section, this option registers a new endpoint where internal metrics of the storage layer can be consumed
63+
// +kubebuilder:validation:Enum:=on;off
64+
Metrics string `json:"metrics,omitempty"`
65+
// When enabled, irrecoverable chunks will be deleted during runtime, and any other irrecoverable chunk located in the configured storage path directory will be deleted when Fluent-Bit starts.
66+
// +kubebuilder:validation:Enum:=on;off
67+
DeleteIrrecoverableChunks string `json:"deleteIrrecoverableChunks,omitempty"`
68+
}
69+
4970
type Service struct {
5071
// If true go to background on start
5172
Daemon *bool `json:"daemon,omitempty"`
@@ -80,6 +101,8 @@ type Service struct {
80101
LogLevel string `json:"logLevel,omitempty"`
81102
// Optional 'parsers' config file (can be multiple)
82103
ParsersFile string `json:"parsersFile,omitempty"`
104+
// Configure a global environment for the storage layer in Service. It is recommended to configure the volume and volumeMount separately for this storage. The hostPath type should be used for that Volume in Fluentbit daemon set.
105+
Storage *Storage `json:"storage,omitempty"`
83106
}
84107

85108
// +kubebuilder:object:root=true
@@ -149,6 +172,29 @@ func (s *Service) Params() *params.KVs {
149172
if s.ParsersFile != "" {
150173
m.Insert("Parsers_File", s.ParsersFile)
151174
}
175+
if s.Storage != nil {
176+
if s.Storage.Path != "" {
177+
m.Insert("storage.path", s.Storage.Path)
178+
}
179+
if s.Storage.Sync != "" {
180+
m.Insert("storage.sync", s.Storage.Sync)
181+
}
182+
if s.Storage.Checksum != "" {
183+
m.Insert("storage.checksum", s.Storage.Checksum)
184+
}
185+
if s.Storage.BacklogMemLimit != "" {
186+
m.Insert("storage.backlog.mem_limit", s.Storage.BacklogMemLimit)
187+
}
188+
if s.Storage.Metrics != "" {
189+
m.Insert("storage.metrics", s.Storage.Metrics)
190+
}
191+
if s.Storage.MaxChunksUp != nil {
192+
m.Insert("storage.max_chunks_up", fmt.Sprint(*s.Storage.MaxChunksUp))
193+
}
194+
if s.Storage.DeleteIrrecoverableChunks != "" {
195+
m.Insert("storage.delete_irrecoverable_chunks", s.Storage.DeleteIrrecoverableChunks)
196+
}
197+
}
152198
return m
153199
}
154200

apis/fluentbit/v1alpha2/plugins/input/systemd_types.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ type Systemd struct {
4444
// Remove the leading underscore of the Journald field (key). For example the Journald field _PID becomes the key PID.
4545
// +kubebuilder:validation:Enum:=on;off
4646
StripUnderscores string `json:"stripUnderscores,omitempty"`
47+
// Specify the buffering mechanism to use. It can be memory or filesystem
48+
// +kubebuilder:validation:Enum:=filesystem;memory
49+
StorageType string `json:"storageType,omitempty"`
50+
// Specifies if the input plugin should be paused (stop ingesting new data) when the storage.max_chunks_up value is reached.
51+
// +kubebuilder:validation:Enum:=on;off
52+
PauseOnChunksOverlimit string `json:"pauseOnChunksOverlimit,omitempty"`
4753
}
4854

4955
func (_ *Systemd) Name() string {
@@ -85,6 +91,12 @@ func (s *Systemd) Params(_ plugins.SecretLoader) (*params.KVs, error) {
8591
if s.StripUnderscores != "" {
8692
kvs.Insert("Strip_Underscores", s.StripUnderscores)
8793
}
94+
if s.StorageType != "" {
95+
kvs.Insert("storage.type", s.StorageType)
96+
}
97+
if s.PauseOnChunksOverlimit != "" {
98+
kvs.Insert("storage.pause_on_chunks_overlimit", s.PauseOnChunksOverlimit)
99+
}
88100

89101
return kvs, nil
90102
}

apis/fluentbit/v1alpha2/plugins/input/tail_types.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ type Tail struct {
9393
// This will help to reassembly multiline messages originally split by Docker or CRI
9494
//Specify one or Multiline Parser definition to apply to the content.
9595
MultilineParser string `json:"multilineParser,omitempty"`
96+
// Specify the buffering mechanism to use. It can be memory or filesystem
97+
// +kubebuilder:validation:Enum:=filesystem;memory
98+
StorageType string `json:"storageType,omitempty"`
99+
// Specifies if the input plugin should be paused (stop ingesting new data) when the storage.max_chunks_up value is reached.
100+
// +kubebuilder:validation:Enum:=on;off
101+
PauseOnChunksOverlimit string `json:"pauseOnChunksOverlimit,omitempty"`
96102
}
97103

98104
func (_ *Tail) Name() string {
@@ -179,5 +185,11 @@ func (t *Tail) Params(_ plugins.SecretLoader) (*params.KVs, error) {
179185
if t.MultilineParser != "" {
180186
kvs.Insert("multiline.parser", t.MultilineParser)
181187
}
188+
if t.StorageType != "" {
189+
kvs.Insert("storage.type", t.StorageType)
190+
}
191+
if t.PauseOnChunksOverlimit != "" {
192+
kvs.Insert("storage.pause_on_chunks_overlimit", t.PauseOnChunksOverlimit)
193+
}
182194
return kvs, nil
183195
}

apis/fluentbit/v1alpha2/plugins/output/open_search_types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ type OpenSearch struct {
104104
// Enables dedicated thread(s) for this output. Default value is set since version 1.8.13. For previous versions is 0.
105105
Workers *int32 `json:"Workers,omitempty"`
106106
*plugins.TLS `json:"tls,omitempty"`
107+
// Limit the maximum number of Chunks in the filesystem for the current output logical destination.
108+
TotalLimitSize string `json:"totalLimitSize,omitempty"`
107109
}
108110

109111
// Name implement Section() method
@@ -240,5 +242,8 @@ func (o *OpenSearch) Params(sl plugins.SecretLoader) (*params.KVs, error) {
240242
}
241243
kvs.Merge(tls)
242244
}
245+
if o.TotalLimitSize != "" {
246+
kvs.Insert("storage.total_limit_size", o.TotalLimitSize)
247+
}
243248
return kvs, nil
244249
}

charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterfluentbitconfigs.yaml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,58 @@ spec:
296296
parsersFile:
297297
description: Optional 'parsers' config file (can be multiple)
298298
type: string
299+
storage:
300+
description: Configure a global environment for the storage layer
301+
in Service. It is recommended to configure the volume and volumeMount
302+
separately for this storage. The hostPath type should be used
303+
for that Volume in Fluentbit daemon set.
304+
properties:
305+
backlogMemLimit:
306+
description: This option configure a hint of maximum value
307+
of memory to use when processing these records
308+
type: string
309+
checksum:
310+
description: Enable the data integrity check when writing
311+
and reading data from the filesystem
312+
enum:
313+
- "on"
314+
- "off"
315+
type: string
316+
deleteIrrecoverableChunks:
317+
description: When enabled, irrecoverable chunks will be deleted
318+
during runtime, and any other irrecoverable chunk located
319+
in the configured storage path directory will be deleted
320+
when Fluent-Bit starts.
321+
enum:
322+
- "on"
323+
- "off"
324+
type: string
325+
maxChunksUp:
326+
description: If the input plugin has enabled filesystem storage
327+
type, this property sets the maximum number of Chunks that
328+
can be up in memory
329+
format: int64
330+
type: integer
331+
metrics:
332+
description: If http_server option has been enabled in the
333+
Service section, this option registers a new endpoint where
334+
internal metrics of the storage layer can be consumed
335+
enum:
336+
- "on"
337+
- "off"
338+
type: string
339+
path:
340+
description: Select an optional location in the file system
341+
to store streams and chunks of data/
342+
type: string
343+
sync:
344+
description: Configure the synchronization mode used to store
345+
the data into the file system
346+
enum:
347+
- normal
348+
- full
349+
type: string
350+
type: object
299351
type: object
300352
type: object
301353
type: object

charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,28 @@ spec:
167167
not set, the plugin will use default paths to read local-only
168168
logs.
169169
type: string
170+
pauseOnChunksOverlimit:
171+
description: Specifies if the input plugin should be paused (stop
172+
ingesting new data) when the storage.max_chunks_up value is
173+
reached.
174+
enum:
175+
- "on"
176+
- "off"
177+
type: string
170178
readFromTail:
171179
description: Start reading new entries. Skip entries already stored
172180
in Journald.
173181
enum:
174182
- "on"
175183
- "off"
176184
type: string
185+
storageType:
186+
description: Specify the buffering mechanism to use. It can be
187+
memory or filesystem
188+
enum:
189+
- filesystem
190+
- memory
191+
type: string
177192
stripUnderscores:
178193
description: Remove the leading underscore of the Journald field
179194
(key). For example the Journald field _PID becomes the key PID.
@@ -319,6 +334,14 @@ spec:
319334
file as part of the record. The value assigned becomes the key
320335
in the map.
321336
type: string
337+
pauseOnChunksOverlimit:
338+
description: Specifies if the input plugin should be paused (stop
339+
ingesting new data) when the storage.max_chunks_up value is
340+
reached.
341+
enum:
342+
- "on"
343+
- "off"
344+
type: string
322345
readFromHead:
323346
description: For new discovered files on start (without a database
324347
offset/position), read the content from the head of the file,
@@ -341,6 +364,13 @@ spec:
341364
behavior and instruct Fluent Bit to skip long lines and continue
342365
processing other lines that fits into the buffer size.
343366
type: boolean
367+
storageType:
368+
description: Specify the buffering mechanism to use. It can be
369+
memory or filesystem
370+
enum:
371+
- filesystem
372+
- memory
373+
type: string
344374
tag:
345375
description: Set a tag (with regex-extract fields) that will be
346376
placed on lines read. E.g. kube.<namespace_name>.<pod_name>.<container_name>

charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,6 +1573,10 @@ spec:
15731573
description: Hostname to be used for TLS SNI extension
15741574
type: string
15751575
type: object
1576+
totalLimitSize:
1577+
description: Limit the maximum number of Chunks in the filesystem
1578+
for the current output logical destination.
1579+
type: string
15761580
traceError:
15771581
description: When enabled print the elasticsearch API calls to
15781582
stdout when elasticsearch returns an error

charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,6 +1573,10 @@ spec:
15731573
description: Hostname to be used for TLS SNI extension
15741574
type: string
15751575
type: object
1576+
totalLimitSize:
1577+
description: Limit the maximum number of Chunks in the filesystem
1578+
for the current output logical destination.
1579+
type: string
15761580
traceError:
15771581
description: When enabled print the elasticsearch API calls to
15781582
stdout when elasticsearch returns an error

charts/fluent-operator/templates/fluentbit-clusterinput-systemd.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ spec:
2525
{{- toYaml .Values.fluentbit.input.systemd.systemdFilter.filters | nindent 6 }}
2626
{{- end }}
2727
{{- end }}
28+
storageType: {{ .Values.fluentbit.input.systemd.storageType }}
29+
{{- if eq .Values.fluentbit.input.systemd.storageType "filesystem" }}
30+
pauseOnChunksOverlimit: {{ .Values.fluentbit.input.systemd.pauseOnChunksOverlimit | quote }}
31+
{{- end }}
2832
{{- end }}
2933
{{- end }}
3034
{{- end }}

charts/fluent-operator/templates/fluentbit-clusterinput-tail.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ spec:
2525
skipLongLines: {{ .Values.fluentbit.input.tail.skipLongLines }}
2626
db: /fluent-bit/tail/pos.db
2727
dbSync: Normal
28+
storageType: {{ .Values.fluentbit.input.tail.storageType }}
29+
{{- if eq .Values.fluentbit.input.tail.storageType "filesystem" }}
30+
pauseOnChunksOverlimit: {{ .Values.fluentbit.input.tail.pauseOnChunksOverlimit | quote }}
31+
{{- end }}
2832
{{- end }}
2933
{{- end }}
3034
{{- end }}

0 commit comments

Comments
 (0)