Skip to content

Commit 05244ec

Browse files
authored
Prioritize thread snapshot event processing to avoid zone initialization stalls (#47)
* Prioritize thread snapshot events * Fix some logging
1 parent 39dd0d2 commit 05244ec

4 files changed

Lines changed: 60 additions & 26 deletions

File tree

src/source_plugin/client/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,15 @@ impl Client {
176176
sender: mpsc::UnboundedSender<MonitorZoneKernelEventReply>,
177177
zone_exit_tx: mpsc::UnboundedSender<String>,
178178
) -> Result<()> {
179+
// Consistency check: should never hit this
180+
if self.zone_pump_handles.contains_key(&zone_id) {
181+
warn!(
182+
"Pump already exists for zone {}, skipping duplicate",
183+
zone_id
184+
);
185+
return Ok(());
186+
}
187+
179188
let (cancel_tx, cancel_rx) = watch::channel::<bool>(false);
180189
let done_cancel = cancel_tx.clone();
181190
let zone_id_local = zone_id.clone();
@@ -244,7 +253,7 @@ impl Client {
244253
cancel_rx: watch::Receiver<bool>,
245254
zone_exit_tx: mpsc::UnboundedSender<String>,
246255
) -> Result<JoinSet<()>> {
247-
info!("Listening for kernel events from zone {}", zone_id);
256+
debug!("listening for kernel events from zone {}", zone_id);
248257
let event = MonitorZoneKernelEventRequest {
249258
zone_id: zone_id.clone(),
250259
request: Some(zk_req::Request::Update(ZoneKernelEventStreamUpdate {

src/source_plugin/client/zone_creation_watcher.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use anyhow::Result;
55
use tonic::transport::Channel;
66

77
use super::edera_client::zone_watcher::ZoneWatcher as watcher;
8-
use log::{debug, error};
8+
use log::{debug, error, warn};
99
use tokio::{sync::broadcast, task::JoinHandle};
1010

1111
pub struct ZoneWatcher {}
@@ -35,13 +35,19 @@ impl ZoneWatcher {
3535
// to zones that are not in Ready state yet.
3636
// We don't care about zones that aren't ready yet, so filter them out.
3737
if zone.status == ZoneState::Ready {
38-
// if it's ready, add it to the list whether we've seen it or not.
39-
ready_zones.push(zid.clone());
4038
// send it if we haven't seen it before.
4139
if !last_ready_zones.contains(&zid) {
4240
debug!("got new zone {:?}", zone);
43-
let _ = tx.send(zid);
41+
if tx.send(zid.clone()).is_err() {
42+
// If send fails, don't track it, so we retry next iteration
43+
warn!(
44+
"failed to notify subcribers of zone {}, will retry",
45+
zid
46+
);
47+
continue;
48+
}
4449
}
50+
ready_zones.push(zid.clone());
4551
}
4652
}
4753

src/source_plugin/mod.rs

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,40 +45,59 @@ impl EderaSourcePluginInstance {
4545
batch: &mut EventBatch,
4646
plugin: &mut EderaPlugin,
4747
) -> Result<u32> {
48-
let mut drained_count = 0;
48+
let mut drained_event_count = 0;
49+
let mut threadsnap_count = 0;
4950
if let Some(rx) = &mut self.event_rx {
50-
for _ in 0..max_count {
51+
// This loop processes both syscall events and new-zone thread snapshots,
52+
// which are interleaved on the same channel.
53+
// The zone thread snapshots are required for initializing zone event
54+
// watchers, and so we want to prioritize processing those, and
55+
// (naturally) avoid counting them against "max_count", since we do not
56+
// include threadsnaps in the set of events we hand to falco proper.
57+
loop {
5158
match rx.try_recv() {
5259
Ok(event) => {
5360
match event.reply {
54-
// extract the internal syscall struct (if present) to
55-
// pull out the timestamp and use it as the scap event's timestamp.
56-
Some(Reply::Syscall(evt)) => {
57-
let encoded = evt.encode_length_delimited_to_vec();
58-
let mut wrapped_evt = Self::plugin_event(encoded.as_slice());
59-
// set the wrapped evt TS to the original evt ts
60-
wrapped_evt.metadata.ts = evt.timestamp;
61-
batch.add(wrapped_evt).expect("event should add");
62-
drained_count += 1;
63-
}
6461
Some(Reply::Threadsnap(snap)) => {
6562
// We will silently *take* snapshot events here,
6663
// filtering them out of the plugin event stream at this point,
6764
// and populate our internal thread table with them, rather than
6865
// adding them to the batch and passing them back to `scap`.
69-
debug!(
70-
"thread snapshot: zone_id {:?} entry_count: {}",
71-
snap.zone_id,
72-
snap.thread_info.len(),
73-
);
66+
threadsnap_count += 1;
67+
info!("discovered and initialized zone {}", snap.zone_id,);
7468
plugin.threadstate.init_zone_with_snap(snap);
7569
}
70+
Some(Reply::Syscall(evt)) => {
71+
// extract the internal syscall struct (if present) to
72+
// pull out the timestamp and use it as the scap event's timestamp.
73+
let encoded = evt.encode_length_delimited_to_vec();
74+
let mut wrapped_evt = Self::plugin_event(encoded.as_slice());
75+
// set the wrapped evt TS to the original evt ts
76+
wrapped_evt.metadata.ts = evt.timestamp;
77+
batch.add(wrapped_evt).expect("event should add");
78+
drained_event_count += 1;
79+
80+
// If we've hit max_count, stop draining
81+
if drained_event_count >= max_count {
82+
debug!(
83+
"take_events: hit max_count={}, processed {} threadsnaps, {} syscalls",
84+
max_count, threadsnap_count, drained_event_count
85+
);
86+
break;
87+
}
88+
}
7689
None => {
7790
warn!("got empty event!")
7891
}
7992
}
8093
}
81-
Err(mpsc::error::TryRecvError::Empty) => break,
94+
Err(mpsc::error::TryRecvError::Empty) => {
95+
debug!(
96+
"take_events: channel empty after processing {} threadsnaps, {} syscalls",
97+
threadsnap_count, drained_event_count
98+
);
99+
break;
100+
}
82101
Err(_) => return Err(anyhow!("channel closed")),
83102
}
84103
}
@@ -88,12 +107,12 @@ impl EderaSourcePluginInstance {
88107
// we want to handle these before dealing with any subsequent events
89108
if let Some(zone_dead_rx) = &mut self.zone_exit_rx {
90109
while let Ok(term_zone_id) = zone_dead_rx.try_recv() {
91-
debug!("zone {term_zone_id} is dead, dropping from threadsnap");
110+
info!("stopped streaming events from zone {term_zone_id}");
92111
plugin.threadstate.drop_zone_from_snap(&term_zone_id);
93112
}
94113
}
95114

96-
Ok(drained_count)
115+
Ok(drained_event_count)
97116
}
98117
}
99118

src/threadstate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3374,7 +3374,7 @@ impl ThreadState {
33743374
.expect("should parse");
33753375

33763376
let Some(zinfo) = self.zone_info.get_mut(&event.zone_id) else {
3377-
warn!("ignoring event for unknown zone {:?}", &event.zone_id);
3377+
debug!("ignoring event for unmonitored zone {:?}", &event.zone_id);
33783378
return Ok(());
33793379
};
33803380

0 commit comments

Comments
 (0)