Skip to content

Commit 7d56e83

Browse files
committed
limit the number of buffers that are in the copy queue
If PostgreSQL is much slower than the data processing, the queue can become very large and lead to out-of-memory errors.
1 parent 8965e65 commit 7d56e83

2 files changed

Lines changed: 15 additions & 4 deletions

File tree

db-copy.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,18 @@ db_copy_thread_t::~db_copy_thread_t() { finish(); }
2828
void db_copy_thread_t::add_buffer(std::unique_ptr<db_cmd_t> &&buffer)
2929
{
3030
assert(m_worker.joinable()); // thread must not have been finished
31-
std::unique_lock<std::mutex> lock(m_queue_mutex);
32-
m_worker_queue.push_back(std::move(buffer));
33-
m_queue_cond.notify_one();
31+
32+
for (;;) {
33+
std::unique_lock<std::mutex> lock(m_queue_mutex);
34+
if (m_worker_queue.size() >= db_cmd_copy_t::Max_buffers) {
35+
m_queue_full_cond.wait(lock);
36+
continue;
37+
}
38+
39+
m_worker_queue.push_back(std::move(buffer));
40+
m_queue_cond.notify_one();
41+
break;
42+
}
3443
}
3544

3645
void db_copy_thread_t::sync_and_wait()
@@ -67,6 +76,7 @@ void db_copy_thread_t::worker_thread()
6776

6877
item = std::move(m_worker_queue.front());
6978
m_worker_queue.pop_front();
79+
m_queue_full_cond.notify_one();
7080
}
7181

7282
switch (item->type) {

db-copy.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class db_cmd_t
6767

6868
struct db_cmd_copy_t : public db_cmd_t
6969
{
70-
enum { Max_buf_size = 10 * 1024 * 1024 };
70+
enum { Max_buf_size = 10 * 1024 * 1024, Max_buffers = 10 };
7171
/// Name of the target table for the copy operation
7272
std::shared_ptr<db_target_descr_t> target;
7373
/// Vector with object to delete before copying
@@ -141,6 +141,7 @@ class db_copy_thread_t
141141
std::thread m_worker;
142142
std::mutex m_queue_mutex;
143143
std::condition_variable m_queue_cond;
144+
std::condition_variable m_queue_full_cond;
144145
std::deque<std::unique_ptr<db_cmd_t>> m_worker_queue;
145146

146147
// Target for copy operation currently ongoing.

0 commit comments

Comments
 (0)