Skip to content

Commit 3824370

Browse files
authored
chore(transforms): Refactor output types into sub-module (#24604)
1 parent 383c2ff commit 3824370

File tree

2 files changed

+364
-359
lines changed

2 files changed

+364
-359
lines changed

lib/vector-core/src/transform/mod.rs

Lines changed: 6 additions & 359 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,19 @@
1-
use std::{collections::HashMap, error, pin::Pin, sync::Arc, time::Instant};
1+
use std::{collections::HashMap, pin::Pin, sync::Arc};
22

33
use futures::{Stream, StreamExt};
4-
use vector_common::{
5-
EventDataEq,
6-
byte_size_of::ByteSizeOf,
7-
internal_event::{
8-
self, CountByteSize, DEFAULT_OUTPUT, EventsSent, InternalEventHandle as _, Registered,
9-
register,
10-
},
11-
json_size::JsonSize,
12-
};
134

145
use crate::{
15-
config,
16-
config::{ComponentKey, OutputId},
17-
event::{
18-
EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventMutRef, EventRef,
19-
into_event_stream,
20-
},
21-
fanout::{self, Fanout},
22-
schema,
6+
config::OutputId,
7+
event::{Event, EventArray, EventContainer, EventMutRef, into_event_stream},
238
schema::Definition,
249
};
2510

11+
mod outputs;
2612
#[cfg(feature = "lua")]
2713
pub mod runtime_transform;
2814

15+
pub use outputs::{OutputBuffer, TransformOutputs, TransformOutputsBuf};
16+
2917
/// Transforms come in two variants. Functions, or tasks.
3018
///
3119
/// While function transforms can be run out of order, or concurrently, task
@@ -182,132 +170,6 @@ impl SyncTransform for Box<dyn FunctionTransform> {
182170
}
183171
}
184172

185-
struct TransformOutput {
186-
fanout: Fanout,
187-
events_sent: Registered<EventsSent>,
188-
log_schema_definitions: HashMap<OutputId, Arc<schema::Definition>>,
189-
output_id: Arc<OutputId>,
190-
}
191-
192-
pub struct TransformOutputs {
193-
outputs_spec: Vec<config::TransformOutput>,
194-
primary_output: Option<TransformOutput>,
195-
named_outputs: HashMap<String, TransformOutput>,
196-
}
197-
198-
impl TransformOutputs {
199-
pub fn new(
200-
outputs_in: Vec<config::TransformOutput>,
201-
component_key: &ComponentKey,
202-
) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
203-
let outputs_spec = outputs_in.clone();
204-
let mut primary_output = None;
205-
let mut named_outputs = HashMap::new();
206-
let mut controls = HashMap::new();
207-
208-
for output in outputs_in {
209-
let (fanout, control) = Fanout::new();
210-
211-
let log_schema_definitions = output
212-
.log_schema_definitions
213-
.into_iter()
214-
.map(|(id, definition)| (id, Arc::new(definition)))
215-
.collect();
216-
217-
match output.port {
218-
None => {
219-
primary_output = Some(TransformOutput {
220-
fanout,
221-
events_sent: register(EventsSent::from(internal_event::Output(Some(
222-
DEFAULT_OUTPUT.into(),
223-
)))),
224-
log_schema_definitions,
225-
output_id: Arc::new(OutputId {
226-
component: component_key.clone(),
227-
port: None,
228-
}),
229-
});
230-
controls.insert(None, control);
231-
}
232-
Some(name) => {
233-
named_outputs.insert(
234-
name.clone(),
235-
TransformOutput {
236-
fanout,
237-
events_sent: register(EventsSent::from(internal_event::Output(Some(
238-
name.clone().into(),
239-
)))),
240-
log_schema_definitions,
241-
output_id: Arc::new(OutputId {
242-
component: component_key.clone(),
243-
port: Some(name.clone()),
244-
}),
245-
},
246-
);
247-
controls.insert(Some(name.clone()), control);
248-
}
249-
}
250-
}
251-
252-
let me = Self {
253-
outputs_spec,
254-
primary_output,
255-
named_outputs,
256-
};
257-
258-
(me, controls)
259-
}
260-
261-
pub fn new_buf_with_capacity(&self, capacity: usize) -> TransformOutputsBuf {
262-
TransformOutputsBuf::new_with_capacity(self.outputs_spec.clone(), capacity)
263-
}
264-
265-
/// Sends the events in the buffer to their respective outputs.
266-
///
267-
/// # Errors
268-
///
269-
/// If an error occurs while sending events to their respective output, an error variant will be
270-
/// returned detailing the cause.
271-
pub async fn send(
272-
&mut self,
273-
buf: &mut TransformOutputsBuf,
274-
) -> Result<(), Box<dyn error::Error + Send + Sync>> {
275-
if let Some(primary) = self.primary_output.as_mut() {
276-
let buf = buf
277-
.primary_buffer
278-
.as_mut()
279-
.unwrap_or_else(|| unreachable!("mismatched outputs"));
280-
Self::send_single_buffer(buf, primary).await?;
281-
}
282-
for (key, buf) in &mut buf.named_buffers {
283-
let output = self
284-
.named_outputs
285-
.get_mut(key)
286-
.unwrap_or_else(|| unreachable!("unknown output"));
287-
Self::send_single_buffer(buf, output).await?;
288-
}
289-
Ok(())
290-
}
291-
292-
async fn send_single_buffer(
293-
buf: &mut OutputBuffer,
294-
output: &mut TransformOutput,
295-
) -> Result<(), Box<dyn error::Error + Send + Sync>> {
296-
for event in buf.events_mut() {
297-
update_runtime_schema_definition(
298-
event,
299-
&output.output_id,
300-
&output.log_schema_definitions,
301-
);
302-
}
303-
let count = buf.len();
304-
let byte_size = buf.estimated_json_encoded_size_of();
305-
buf.send(&mut output.fanout).await?;
306-
output.events_sent.emit(CountByteSize(count, byte_size));
307-
Ok(())
308-
}
309-
}
310-
311173
#[allow(clippy::implicit_hasher)]
312174
/// `event`: The event that will be updated
313175
/// `output_id`: The `output_id` that the current even is being sent to (will be used as the new `parent_id`)
@@ -334,221 +196,6 @@ pub fn update_runtime_schema_definition(
334196
event.metadata_mut().set_upstream_id(Arc::clone(output_id));
335197
}
336198

337-
#[derive(Debug, Clone)]
338-
pub struct TransformOutputsBuf {
339-
primary_buffer: Option<OutputBuffer>,
340-
named_buffers: HashMap<String, OutputBuffer>,
341-
}
342-
343-
impl TransformOutputsBuf {
344-
pub fn new_with_capacity(outputs_in: Vec<config::TransformOutput>, capacity: usize) -> Self {
345-
let mut primary_buffer = None;
346-
let mut named_buffers = HashMap::new();
347-
348-
for output in outputs_in {
349-
match output.port {
350-
None => {
351-
primary_buffer = Some(OutputBuffer::with_capacity(capacity));
352-
}
353-
Some(name) => {
354-
named_buffers.insert(name.clone(), OutputBuffer::default());
355-
}
356-
}
357-
}
358-
359-
Self {
360-
primary_buffer,
361-
named_buffers,
362-
}
363-
}
364-
365-
/// Adds a new event to the named output buffer.
366-
///
367-
/// # Panics
368-
///
369-
/// Panics if there is no output with the given name.
370-
pub fn push(&mut self, name: Option<&str>, event: Event) {
371-
match name {
372-
Some(name) => self.named_buffers.get_mut(name),
373-
None => self.primary_buffer.as_mut(),
374-
}
375-
.expect("unknown output")
376-
.push(event);
377-
}
378-
379-
/// Drains the default output buffer.
380-
///
381-
/// # Panics
382-
///
383-
/// Panics if there is no default output.
384-
pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
385-
self.primary_buffer
386-
.as_mut()
387-
.expect("no default output")
388-
.drain()
389-
}
390-
391-
/// Drains the named output buffer.
392-
///
393-
/// # Panics
394-
///
395-
/// Panics if there is no output with the given name.
396-
pub fn drain_named(&mut self, name: &str) -> impl Iterator<Item = Event> + '_ {
397-
self.named_buffers
398-
.get_mut(name)
399-
.expect("unknown output")
400-
.drain()
401-
}
402-
403-
/// Takes the default output buffer.
404-
///
405-
/// # Panics
406-
///
407-
/// Panics if there is no default output.
408-
pub fn take_primary(&mut self) -> OutputBuffer {
409-
std::mem::take(self.primary_buffer.as_mut().expect("no default output"))
410-
}
411-
412-
pub fn take_all_named(&mut self) -> HashMap<String, OutputBuffer> {
413-
std::mem::take(&mut self.named_buffers)
414-
}
415-
}
416-
417-
impl ByteSizeOf for TransformOutputsBuf {
418-
fn allocated_bytes(&self) -> usize {
419-
self.primary_buffer.size_of()
420-
+ self
421-
.named_buffers
422-
.values()
423-
.map(ByteSizeOf::size_of)
424-
.sum::<usize>()
425-
}
426-
}
427-
428-
#[derive(Debug, Default, Clone)]
429-
pub struct OutputBuffer(Vec<EventArray>);
430-
431-
impl OutputBuffer {
432-
pub fn with_capacity(capacity: usize) -> Self {
433-
Self(Vec::with_capacity(capacity))
434-
}
435-
436-
pub fn push(&mut self, event: Event) {
437-
// Coalesce multiple pushes of the same type into one array.
438-
match (event, self.0.last_mut()) {
439-
(Event::Log(log), Some(EventArray::Logs(logs))) => {
440-
logs.push(log);
441-
}
442-
(Event::Metric(metric), Some(EventArray::Metrics(metrics))) => {
443-
metrics.push(metric);
444-
}
445-
(Event::Trace(trace), Some(EventArray::Traces(traces))) => {
446-
traces.push(trace);
447-
}
448-
(event, _) => {
449-
self.0.push(event.into());
450-
}
451-
}
452-
}
453-
454-
pub fn append(&mut self, events: &mut Vec<Event>) {
455-
for event in events.drain(..) {
456-
self.push(event);
457-
}
458-
}
459-
460-
pub fn extend(&mut self, events: impl Iterator<Item = Event>) {
461-
for event in events {
462-
self.push(event);
463-
}
464-
}
465-
466-
pub fn is_empty(&self) -> bool {
467-
self.0.is_empty()
468-
}
469-
470-
pub fn len(&self) -> usize {
471-
self.0.iter().map(EventArray::len).sum()
472-
}
473-
474-
pub fn capacity(&self) -> usize {
475-
self.0.capacity()
476-
}
477-
478-
pub fn first(&self) -> Option<EventRef<'_>> {
479-
self.0.first().and_then(|first| match first {
480-
EventArray::Logs(l) => l.first().map(Into::into),
481-
EventArray::Metrics(m) => m.first().map(Into::into),
482-
EventArray::Traces(t) => t.first().map(Into::into),
483-
})
484-
}
485-
486-
pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
487-
self.0.drain(..).flat_map(EventArray::into_events)
488-
}
489-
490-
async fn send(
491-
&mut self,
492-
output: &mut Fanout,
493-
) -> Result<(), Box<dyn error::Error + Send + Sync>> {
494-
let send_start = Some(Instant::now());
495-
for array in std::mem::take(&mut self.0) {
496-
output.send(array, send_start).await?;
497-
}
498-
499-
Ok(())
500-
}
501-
502-
fn iter_events(&self) -> impl Iterator<Item = EventRef<'_>> {
503-
self.0.iter().flat_map(EventArray::iter_events)
504-
}
505-
506-
fn events_mut(&mut self) -> impl Iterator<Item = EventMutRef<'_>> {
507-
self.0.iter_mut().flat_map(EventArray::iter_events_mut)
508-
}
509-
510-
pub fn into_events(self) -> impl Iterator<Item = Event> {
511-
self.0.into_iter().flat_map(EventArray::into_events)
512-
}
513-
}
514-
515-
impl ByteSizeOf for OutputBuffer {
516-
fn allocated_bytes(&self) -> usize {
517-
self.0.iter().map(ByteSizeOf::size_of).sum()
518-
}
519-
}
520-
521-
impl EventDataEq<Vec<Event>> for OutputBuffer {
522-
fn event_data_eq(&self, other: &Vec<Event>) -> bool {
523-
struct Comparator<'a>(EventRef<'a>);
524-
525-
impl PartialEq<&Event> for Comparator<'_> {
526-
fn eq(&self, that: &&Event) -> bool {
527-
self.0.event_data_eq(that)
528-
}
529-
}
530-
531-
self.iter_events().map(Comparator).eq(other.iter())
532-
}
533-
}
534-
535-
impl EstimatedJsonEncodedSizeOf for OutputBuffer {
536-
fn estimated_json_encoded_size_of(&self) -> JsonSize {
537-
self.0
538-
.iter()
539-
.map(EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of)
540-
.sum()
541-
}
542-
}
543-
544-
impl From<Vec<Event>> for OutputBuffer {
545-
fn from(events: Vec<Event>) -> Self {
546-
let mut result = Self::default();
547-
result.extend(events.into_iter());
548-
result
549-
}
550-
}
551-
552199
struct WrapEventTask<T>(T);
553200

554201
impl<T: TaskTransform<Event> + Send + 'static> TaskTransform<EventArray> for WrapEventTask<T> {

0 commit comments

Comments
 (0)