
Commit 5f1fdbd

Merge #921: Add comments to the UDP server
d1c2d15 fix: [#918] revision for UDP active reqeust buffer comments (Cameron Garnham)
6495a4c docs: [#918] add comments to the UDP server (Jose Celano)

Pull request description:

After a discussion with @da2ce7 I understood the reasons why he implemented the active requests buffer the way it is implemented. This PR adds some clarifications.

My main concern was task starvation: if we always accept new requests (spawning new tasks), we might not give the currently active tasks enough time to finish within a reasonable period. @da2ce7 clarified that yielding with `tokio::task::yield_now().await;` should give a task enough time to finish.

My second concern was why we immediately spawn new tasks for incoming requests instead of waiting until there is room in the buffer. @da2ce7 replied that we are going to run those tasks anyway, because we force a place for the new tasks.

ACKs for top commit:
josecelano: ACK d1c2d15

Tree-SHA512: f6b17fe1daa0582c5c235fdbcfa6f8d36c81d8a0cf44608b910494bd35c84d57894a09d208e97e6fa6e079bd799e3a6891781bacc59459c270efc261a693a03e
2 parents d4e3208 + d1c2d15 commit 5f1fdbd
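The eviction policy discussed above can be modeled with a small, self-contained sketch. This is not the tracker's actual implementation (which stores tokio `AbortHandle`s in a `StaticRb<AbortHandle, 50>` and yields before aborting); the `Task` struct, its `finished` flag, and the capacity of 3 are illustrative stand-ins.

```rust
// Minimal model of the "force_push" policy: a fixed-capacity buffer that,
// when full, first drops finished tasks and only evicts ("aborts") the
// oldest unfinished task as a last resort.
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy)]
struct Task {
    id: u32,
    finished: bool,
}

struct ActiveRequests {
    buf: VecDeque<Task>,
    capacity: usize,
}

impl ActiveRequests {
    fn new(capacity: usize) -> Self {
        Self { buf: VecDeque::new(), capacity }
    }

    /// Always makes room for `new_task`: finished tasks are dropped first;
    /// if none are finished, the oldest unfinished task is dropped.
    /// (The real code pops handles one by one and re-inserts the first
    /// unfinished one; this model simplifies that to a single pass.)
    fn force_push(&mut self, new_task: Task) {
        if self.buf.len() < self.capacity {
            self.buf.push_back(new_task);
            return;
        }
        let before = self.buf.len();
        self.buf.retain(|t| !t.finished); // drop finished tasks
        if self.buf.len() == before {
            self.buf.pop_front(); // no finished tasks: evict the oldest
        }
        self.buf.push_back(new_task);
    }
}

fn main() {
    let mut reqs = ActiveRequests::new(3);
    reqs.force_push(Task { id: 1, finished: false });
    reqs.force_push(Task { id: 2, finished: true });
    reqs.force_push(Task { id: 3, finished: false });
    // Buffer is full; task 2 is finished, so it is dropped to make room.
    reqs.force_push(Task { id: 4, finished: false });
    let ids: Vec<u32> = reqs.buf.iter().map(|t| t.id).collect();
    println!("{ids:?}"); // [1, 3, 4]
    // Full again with no finished tasks: the oldest (1) is evicted.
    reqs.force_push(Task { id: 5, finished: false });
    let ids: Vec<u32> = reqs.buf.iter().map(|t| t.id).collect();
    println!("{ids:?}"); // [3, 4, 5]
}
```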

2 files changed: 117 additions & 80 deletions

File tree

src/servers/udp/server/launcher.rs

Lines changed: 15 additions & 23 deletions
```diff
@@ -103,7 +103,7 @@ impl Launcher {
     }
 
     async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc<Tracker>) {
-        let reqs = &mut ActiveRequests::default();
+        let active_requests = &mut ActiveRequests::default();
 
         let addr = receiver.bound_socket_address();
         let local_addr = format!("udp://{addr}");
@@ -127,37 +127,29 @@ impl Launcher {
                     }
                 };
 
-                /* code-review:
-
-                   Does it make sense to spawn a new request processor task when
-                   the ActiveRequests buffer is full?
-
-                   We could store the UDP request in a secondary buffer and wait
-                   until active tasks are finished. When an active request is finished
-                   we can move a new UDP request from the pending-to-process requests
-                   buffer to the active requests buffer.
-
-                   This forces us to define an explicit timeout for active requests.
-
-                   In the current solution the timeout is dynamic: it depends on
-                   the system load. With high load we can remove tasks without
-                   giving them enough time to be processed. With low load we could
-                   keep processing running longer than a reasonable time for
-                   the client to receive the response.
-                */
-
-                let abort_handle =
+                // We spawn the new task even if the active requests buffer is
+                // full. This could seem counterintuitive because we are accepting
+                // more requests and consuming more memory even if the server is
+                // already busy. However, we "force_push" the new task into the
+                // buffer. That means, in the worst case, we will abort a
+                // running task to make room for the new one.
+                //
+                // One concern could be reaching a starvation point where we
+                // are only adding and removing tasks without giving them the
+                // chance to finish. However, the buffer yields before
+                // aborting a task, giving it the chance to finish.
+                let abort_handle: tokio::task::AbortHandle =
                     tokio::task::spawn(Launcher::process_request(req, tracker.clone(), receiver.bound_socket.clone()))
                         .abort_handle();
 
                 if abort_handle.is_finished() {
                     continue;
                 }
 
-                reqs.force_push(abort_handle, &local_addr).await;
+                active_requests.force_push(abort_handle, &local_addr).await;
             } else {
                 tokio::task::yield_now().await;
+
                 // the request iterator returned `None`.
                 tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server breaking: (ran dry, should not happen in production!)");
                 break;
```
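One way to read the "dynamic timeout" point from the removed code-review note: in the worst case (no buffered task ever finishes), each new request evicts the oldest of the 50 buffered handles, so the time a task gets before being aborted shrinks as the arrival rate grows. A rough, hypothetical back-of-the-envelope model (the arrival rates are made up; only the capacity of 50 comes from the source):

```rust
// Worst-case time a task survives in a capacity-N ring where every new
// request evicts the oldest unfinished task: roughly N / arrival_rate.
fn time_before_eviction(capacity: u32, requests_per_second: u32) -> f64 {
    f64::from(capacity) / f64::from(requests_per_second)
}

fn main() {
    // The real buffer holds 50 handles (StaticRb<AbortHandle, 50>).
    println!("{:.2}", time_before_eviction(50, 100)); // low load: 0.50 s
    println!("{:.2}", time_before_eviction(50, 5000)); // high load: 0.01 s
}
```

This is why the comment above calls the timeout "dynamic": it is not configured anywhere but emerges from the load.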

src/servers/udp/server/request_buffer.rs

Lines changed: 102 additions & 57 deletions
```diff
@@ -4,10 +4,15 @@ use tokio::task::AbortHandle;
 
 use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
 
-/// Ring-Buffer of Active Requests
+/// A ring buffer for managing active UDP request abort handles.
+///
+/// The `ActiveRequests` struct maintains a fixed-size ring buffer of abort
+/// handles for UDP request processor tasks. It ensures that at most 50 requests
+/// are handled concurrently, and provides mechanisms to handle buffer overflow
+/// by removing finished or oldest unfinished tasks.
 #[derive(Default)]
 pub struct ActiveRequests {
-    rb: StaticRb<AbortHandle, 50>, // the number of requests we handle at the same time.
+    rb: StaticRb<AbortHandle, 50>, // The number of requests handled simultaneously.
 }
 
 impl std::fmt::Debug for ActiveRequests {
@@ -29,67 +34,107 @@ impl Drop for ActiveRequests {
 }
 
 impl ActiveRequests {
-    /// It inserts the abort handle for the UDP request processor tasks.
+    /// Inserts an abort handle for a UDP request processor task.
     ///
-    /// If there is no room for the new task, it tries to make place:
+    /// If the buffer is full, this method attempts to make space by:
     ///
-    /// - Firstly, removing finished tasks.
-    /// - Secondly, removing the oldest unfinished tasks.
+    /// 1. Removing finished tasks.
+    /// 2. Removing the oldest unfinished task if no finished tasks are found.
     ///
     /// # Panics
     ///
-    /// Will panics if it can't make space for the new handle.
-    pub async fn force_push(&mut self, abort_handle: AbortHandle, local_addr: &str) {
-        // fill buffer with requests
-        let Err(abort_handle) = self.rb.try_push(abort_handle) else {
-            return;
-        };
-
-        let mut finished: u64 = 0;
-        let mut unfinished_task = None;
-
-        // buffer is full.. lets make some space.
-        for h in self.rb.pop_iter() {
-            // remove some finished tasks
-            if h.is_finished() {
-                finished += 1;
-                continue;
-            }
-
-            // task is unfinished.. give it another chance.
-            tokio::task::yield_now().await;
-
-            // if now finished, we continue.
-            if h.is_finished() {
-                finished += 1;
-                continue;
-            }
-
-            tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, removed_count = finished, "Udp::run_udp_server::loop (got unfinished task)");
-
-            if finished == 0 {
-                // we have _no_ finished tasks.. will abort the unfinished task to make space...
-                h.abort();
-
-                tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop aborting request: (no finished tasks)");
-
-                break;
-            }
-
-            // we have space, return unfinished task for re-entry.
-            unfinished_task = Some(h);
-        }
-
-        // re-insert the previous unfinished task.
-        if let Some(h) = unfinished_task {
-            self.rb.try_push(h).expect("it was previously inserted");
-        }
-
-        // insert the new task.
-        if !abort_handle.is_finished() {
-            self.rb
-                .try_push(abort_handle)
-                .expect("it should remove at least one element.");
-        }
+    /// This method will panic if it cannot make space for adding a new handle.
+    ///
+    /// # Arguments
+    ///
+    /// * `abort_handle` - The `AbortHandle` for the UDP request processor task.
+    /// * `local_addr` - A string slice representing the local address for logging.
+    pub async fn force_push(&mut self, new_task: AbortHandle, local_addr: &str) {
+        // Attempt to add the new handle to the buffer.
+        match self.rb.try_push(new_task) {
+            Ok(()) => {
+                // Successfully added the task; no further action needed.
+            }
+            Err(new_task) => {
+                // Buffer is full; attempt to make space.
+
+                let mut finished: u64 = 0;
+                let mut unfinished_task = None;
+
+                for old_task in self.rb.pop_iter() {
+                    // We found a finished task: increase the counter and
+                    // continue searching for more.
+                    if old_task.is_finished() {
+                        finished += 1;
+                        continue;
+                    }
+
+                    // The task just removed is not finished.
+
+                    // Give it a second chance to finish.
+                    tokio::task::yield_now().await;
+
+                    // Recheck whether it finished: if so, increase the
+                    // counter and continue searching for more.
+                    if old_task.is_finished() {
+                        finished += 1;
+                        continue;
+                    }
+
+                    // At this point we have found a "definitive" unfinished task.
+
+                    // Log the unfinished task.
+                    tracing::debug!(
+                        target: UDP_TRACKER_LOG_TARGET,
+                        local_addr,
+                        removed_count = finished,
+                        "Udp::run_udp_server::loop (got unfinished task)"
+                    );
+
+                    // If no finished tasks were found, abort the current
+                    // unfinished task.
+                    if finished == 0 {
+                        // We make space by aborting this task.
+                        old_task.abort();
+
+                        tracing::warn!(
+                            target: UDP_TRACKER_LOG_TARGET,
+                            local_addr,
+                            "Udp::run_udp_server::loop aborting request: (no finished tasks)"
+                        );
+
+                        break;
+                    }
+
+                    // At this point we have found at least one finished task,
+                    // but the current one is not finished and it was removed
+                    // from the buffer, so we need to re-insert it in the buffer.
+
+                    // Save the unfinished task for re-entry.
+                    unfinished_task = Some(old_task);
+                }
+
+                // After this point there can't be a race condition because only
+                // one thread owns the active buffer. There is no way for the
+                // buffer to become full again, which means the `expect`s below
+                // should never fail.
+
+                // Reinsert the unfinished task, if any.
+                if let Some(h) = unfinished_task {
+                    self.rb.try_push(h).expect("it was previously inserted");
+                }
+
+                // Insert the new task.
+                //
+                // Notice that space has already been made for this new task in
+                // the buffer: one or more old tasks have already finished or
+                // yielded, freeing space, or a single unfinished task has been
+                // aborted to make space for this new task.
+                if !new_task.is_finished() {
+                    self.rb.try_push(new_task).expect("it should have space for this new task");
+                }
+            }
+        };
     }
 }
```
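The "second chance" idea in `force_push` — check `is_finished()`, yield, then recheck before aborting — can be illustrated with plain std threads. This is a hedged stand-in, not the tracker's code: std threads replace tokio tasks, a `sleep` replaces `tokio::task::yield_now().await`, and the 10/100 ms durations are arbitrary.

```rust
use std::thread;
use std::time::Duration;

fn main() {
    // A stand-in for a request processor task that takes ~10 ms.
    let handle = thread::spawn(|| {
        thread::sleep(Duration::from_millis(10));
    });

    // First check: the task has most likely not finished yet.
    let _finished_on_first_check = handle.is_finished();

    // Instead of aborting immediately, give the task a chance to finish
    // (the real code awaits `tokio::task::yield_now()` at this point).
    thread::sleep(Duration::from_millis(100));

    // Second check: by now the task should have completed, so no abort
    // would be needed.
    println!("finished after second chance: {}", handle.is_finished());
    handle.join().unwrap();
}
```

The same structure explains why the counter `finished` matters: only when *no* task finishes even after yielding does the buffer fall back to aborting one.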
