Skip to content

Commit 48f4e40

Browse files
authored
chore: Add doc and rename function for flushing strategy (#740)
# Motivation It took me quite some effort to understand flushing strategies. I want to make it easier to understand for me and future developers. # This PR Tries to make flushing strategy code more readable: 1. Add/move comments 2. Create an enum `ConcreteFlushStrategy`, which doesn't contain `Default` because it is required to be resolved to a concrete strategy 3. Rename `should_adapt` to `evaluate_concrete_strategy()` # To reviewers There are still a few things I don't understand, which are marked with `TODO`. Appreciate explanation! Also correct me if any comment I added is wrong.
1 parent 04a8b7d commit 48f4e40

File tree

4 files changed

+102
-57
lines changed

4 files changed

+102
-57
lines changed

bottlecap/src/config/flush_strategy.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,27 @@ pub struct PeriodicStrategy {
88

99
#[derive(Clone, Copy, Debug, PartialEq)]
1010
pub enum FlushStrategy {
11+
// Flush every 1s and at the end of the invocation
1112
Default,
13+
// User specifies the interval in milliseconds, will not block on the runtimeDone event
14+
Periodically(PeriodicStrategy),
15+
// Always flush at the end of the invocation
1216
End,
17+
// Flush both (1) at the end of the invocation and (2) periodically with the specified interval
1318
EndPeriodically(PeriodicStrategy),
19+
// Flush in a non-blocking, asynchronous manner, so the next invocation can start without waiting
20+
// for the flush to complete
21+
Continuously(PeriodicStrategy),
22+
}
23+
24+
// A restricted subset of `FlushStrategy`. The Default strategy is now allowed, which is required to be
25+
// translated into a concrete strategy.
26+
#[allow(clippy::module_name_repetitions)]
27+
#[derive(Clone, Copy, Debug, PartialEq)]
28+
pub enum ConcreteFlushStrategy {
1429
Periodically(PeriodicStrategy),
30+
End,
31+
EndPeriodically(PeriodicStrategy),
1532
Continuously(PeriodicStrategy),
1633
}
1734

bottlecap/src/config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ pub struct Config {
240240
pub api_key: String,
241241
pub log_level: LogLevel,
242242

243+
// Timeout for the request to flush data to Datadog endpoint
243244
pub flush_timeout: u64,
244245

245246
// Proxy

bottlecap/src/lifecycle/flush_control.rs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::config::flush_strategy::FlushStrategy;
1+
use crate::config::flush_strategy::{ConcreteFlushStrategy, FlushStrategy};
22
use std::time;
33
use tokio::time::{Interval, MissedTickBehavior::Skip};
44

@@ -15,6 +15,7 @@ pub struct FlushControl {
1515
flush_timeout: u64,
1616
}
1717

18+
// The flush behavior for the current moment
1819
#[derive(Copy, Clone, Debug, PartialEq)]
1920
pub enum FlushDecision {
2021
Continuous,
@@ -23,12 +24,6 @@ pub enum FlushDecision {
2324
Dont,
2425
}
2526

26-
// 1. Default Strategy
27-
// - Flush every 1s and at the end of the invocation
28-
// 2. Periodic Strategy
29-
// - User specifies the interval in milliseconds, will not block on the runtimeDone event
30-
// 3. End strategy
31-
// - Always flush at the end of the invocation
3227
impl FlushControl {
3328
#[must_use]
3429
pub fn new(flush_strategy: FlushStrategy, flush_timeout: u64) -> FlushControl {
@@ -58,29 +53,29 @@ impl FlushControl {
5853
i
5954
}
6055
FlushStrategy::End => {
56+
// Set the race flush interval to the maximum value of Lambda timeout, so flush will
57+
// only happen at the end of the invocation, and race flush will never happen.
6158
tokio::time::interval(tokio::time::Duration::from_millis(FIFTEEN_MINUTES))
6259
}
6360
}
6461
}
6562

63+
// Evaluate the flush decision for the current moment, based on the flush strategy, current time,
64+
// and the past invocation times.
6665
#[must_use]
6766
pub fn evaluate_flush_decision(&mut self) -> FlushDecision {
6867
let now = time::SystemTime::now()
6968
.duration_since(time::UNIX_EPOCH)
7069
.expect("unable to poll clock, unrecoverable")
7170
.as_secs();
7271
self.invocation_times.add(now);
73-
let evaluated_flush_strategy = if self.flush_strategy == FlushStrategy::Default {
74-
&self.invocation_times.should_adapt(now, self.flush_timeout)
75-
} else {
76-
// User specified one
77-
&self.flush_strategy
78-
};
79-
match evaluated_flush_strategy {
80-
FlushStrategy::Default => {
81-
unreachable!("should_adapt must translate default strategy to concrete strategy")
82-
}
83-
FlushStrategy::Periodically(strategy) => {
72+
let concrete_flush_strategy = self.invocation_times.evaluate_concrete_strategy(
73+
now,
74+
self.flush_timeout,
75+
self.flush_strategy,
76+
);
77+
match concrete_flush_strategy {
78+
ConcreteFlushStrategy::Periodically(strategy) => {
8479
if self.interval_passed(now, strategy.interval) {
8580
self.last_flush = now;
8681
// TODO calculate periodic rate. if it's more frequent than the flush_timeout
@@ -90,7 +85,7 @@ impl FlushControl {
9085
FlushDecision::Dont
9186
}
9287
}
93-
FlushStrategy::Continuously(strategy) => {
88+
ConcreteFlushStrategy::Continuously(strategy) => {
9489
if self.interval_passed(now, strategy.interval) {
9590
self.last_flush = now;
9691
// TODO calculate periodic rate. if it's more frequent than the flush_timeout
@@ -100,8 +95,8 @@ impl FlushControl {
10095
FlushDecision::Dont
10196
}
10297
}
103-
FlushStrategy::End => FlushDecision::End,
104-
FlushStrategy::EndPeriodically(strategy) => {
98+
ConcreteFlushStrategy::End => FlushDecision::End,
99+
ConcreteFlushStrategy::EndPeriodically(strategy) => {
105100
if self.interval_passed(now, strategy.interval) {
106101
self.last_flush = now;
107102
FlushDecision::End

bottlecap/src/lifecycle/invocation_times.rs

Lines changed: 68 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::config::flush_strategy::{FlushStrategy, PeriodicStrategy};
1+
use crate::config::flush_strategy::{ConcreteFlushStrategy, FlushStrategy, PeriodicStrategy};
22

33
const TWENTY_SECONDS: u64 = 20 * 1000;
44
const LOOKBACK_COUNT: usize = 20;
@@ -23,39 +23,62 @@ impl InvocationTimes {
2323
self.head = (self.head + 1) % LOOKBACK_COUNT;
2424
}
2525

26-
pub(crate) fn should_adapt(&self, now: u64, flush_timeout: u64) -> FlushStrategy {
27-
// If the buffer isn't full, then we haven't seen enough invocations, so we should flush.
28-
for idx in self.head..LOOKBACK_COUNT {
29-
if self.times[idx] == 0 {
30-
return FlushStrategy::End;
26+
// Translate FlushStrategy to a ConcreteFlushStrategy
27+
// For FlushStrategy::Default, evaluate based on past invocation times. Otherwise, return the
28+
// strategy as is.
29+
pub(crate) fn evaluate_concrete_strategy(
30+
&self,
31+
now: u64,
32+
flush_timeout: u64,
33+
flush_strategy: FlushStrategy,
34+
) -> ConcreteFlushStrategy {
35+
match flush_strategy {
36+
FlushStrategy::Periodically(p) => ConcreteFlushStrategy::Periodically(p),
37+
FlushStrategy::End => ConcreteFlushStrategy::End,
38+
FlushStrategy::Continuously(p) => ConcreteFlushStrategy::Continuously(p),
39+
FlushStrategy::EndPeriodically(p) => ConcreteFlushStrategy::EndPeriodically(p),
40+
FlushStrategy::Default => {
41+
// If the buffer isn't full, then we haven't seen enough invocations, so we should flush
42+
// at the end of the invocation.
43+
for idx in self.head..LOOKBACK_COUNT {
44+
if self.times[idx] == 0 {
45+
return ConcreteFlushStrategy::End;
46+
}
47+
}
48+
49+
// Now we've seen at least 20 invocations. Possible cases:
50+
// 1. If the average time between invocations is longer than 2 minutes, stick to End strategy.
51+
// 2. If average interval is shorter than 2 minutes:
52+
// 2.1 If it's very short, use the continuous strategy to minimize delaying the next invocation.
53+
// 2.2 If it's not too short, use the periodic strategy to minimize the risk that
54+
// flushing is delayed due to the Lambda environment being frozen between invocations.
55+
// We get the average time between each invocation by taking the difference between newest (`now`) and the
56+
// oldest invocation in the buffer, then dividing by `LOOKBACK_COUNT - 1`.
57+
let oldest = self.times[self.head];
58+
59+
let elapsed = now - oldest;
60+
let should_adapt =
61+
(elapsed as f64 / (LOOKBACK_COUNT - 1) as f64) < ONE_TWENTY_SECONDS;
62+
if should_adapt {
63+
// Both units here are in seconds
64+
if elapsed < flush_timeout {
65+
return ConcreteFlushStrategy::Continuously(PeriodicStrategy {
66+
interval: TWENTY_SECONDS,
67+
});
68+
}
69+
return ConcreteFlushStrategy::Periodically(PeriodicStrategy {
70+
interval: TWENTY_SECONDS,
71+
});
72+
}
73+
ConcreteFlushStrategy::End
3174
}
3275
}
33-
34-
// Now we've seen at least 20 invocations. Switch to periodic if we're invoked at least once every 2 minutes.
35-
// We get the average time between each invocation by taking the difference between newest (`now`) and the
36-
// oldest invocation in the buffer, then dividing by `LOOKBACK_COUNT - 1`.
37-
let oldest = self.times[self.head];
38-
39-
let elapsed = now - oldest;
40-
let should_adapt = (elapsed as f64 / (LOOKBACK_COUNT - 1) as f64) < ONE_TWENTY_SECONDS;
41-
if should_adapt {
42-
// Both units here are in seconds
43-
if elapsed < flush_timeout {
44-
return FlushStrategy::Continuously(PeriodicStrategy {
45-
interval: TWENTY_SECONDS,
46-
});
47-
}
48-
return FlushStrategy::Periodically(PeriodicStrategy {
49-
interval: TWENTY_SECONDS,
50-
});
51-
}
52-
FlushStrategy::End
5376
}
5477
}
5578

5679
#[cfg(test)]
5780
mod tests {
58-
use crate::config::flush_strategy::{FlushStrategy, PeriodicStrategy};
81+
use crate::config::flush_strategy::{ConcreteFlushStrategy, FlushStrategy, PeriodicStrategy};
5982
use crate::lifecycle::invocation_times::{self, TWENTY_SECONDS};
6083

6184
#[test]
@@ -75,7 +98,10 @@ mod tests {
7598
invocation_times.add(timestamp);
7699
assert_eq!(invocation_times.times[0], timestamp);
77100
assert_eq!(invocation_times.head, 1);
78-
assert_eq!(invocation_times.should_adapt(1, 60), FlushStrategy::End);
101+
assert_eq!(
102+
invocation_times.evaluate_concrete_strategy(1, 60, FlushStrategy::Default),
103+
ConcreteFlushStrategy::End
104+
);
79105
}
80106

81107
#[test]
@@ -88,8 +114,8 @@ mod tests {
88114
assert_eq!(invocation_times.times[0], 20);
89115
assert_eq!(invocation_times.head, 1);
90116
assert_eq!(
91-
invocation_times.should_adapt(21, 60),
92-
FlushStrategy::Continuously(PeriodicStrategy {
117+
invocation_times.evaluate_concrete_strategy(21, 60, FlushStrategy::Default),
118+
ConcreteFlushStrategy::Continuously(PeriodicStrategy {
93119
interval: TWENTY_SECONDS
94120
})
95121
);
@@ -105,8 +131,8 @@ mod tests {
105131
assert_eq!(invocation_times.times[0], 20);
106132
assert_eq!(invocation_times.head, 1);
107133
assert_eq!(
108-
invocation_times.should_adapt(21, 1),
109-
FlushStrategy::Periodically(PeriodicStrategy {
134+
invocation_times.evaluate_concrete_strategy(21, 1, FlushStrategy::Default),
135+
ConcreteFlushStrategy::Periodically(PeriodicStrategy {
110136
interval: TWENTY_SECONDS
111137
})
112138
);
@@ -122,7 +148,10 @@ mod tests {
122148
// should wrap around
123149
assert_eq!(invocation_times.times[0], 5019);
124150
assert_eq!(invocation_times.head, 1);
125-
assert_eq!(invocation_times.should_adapt(10000, 60), FlushStrategy::End);
151+
assert_eq!(
152+
invocation_times.evaluate_concrete_strategy(10000, 60, FlushStrategy::Default),
153+
ConcreteFlushStrategy::End
154+
);
126155
}
127156

128157
#[test]
@@ -140,8 +169,8 @@ mod tests {
140169
1901
141170
);
142171
assert_eq!(
143-
invocation_times.should_adapt(2501, 60),
144-
FlushStrategy::Periodically(PeriodicStrategy {
172+
invocation_times.evaluate_concrete_strategy(2501, 60, FlushStrategy::Default),
173+
ConcreteFlushStrategy::Periodically(PeriodicStrategy {
145174
interval: TWENTY_SECONDS
146175
})
147176
);
@@ -161,6 +190,9 @@ mod tests {
161190
invocation_times.times[invocation_times::LOOKBACK_COUNT - 1],
162191
2471
163192
);
164-
assert_eq!(invocation_times.should_adapt(3251, 60), FlushStrategy::End);
193+
assert_eq!(
194+
invocation_times.evaluate_concrete_strategy(3251, 60, FlushStrategy::Default),
195+
ConcreteFlushStrategy::End
196+
);
165197
}
166198
}

0 commit comments

Comments
 (0)