Skip to content

Commit cf91f7f

Browse files
committed
Refactor of MessageHandler
Implemented new MessagePool class. This provides a smart pool that falls back to dynamic allocation to handle back pressure. Messages auto return to the pool when done or destroy themselves if they were dynamically allocated due to fallback
1 parent b3310a7 commit cf91f7f

2 files changed

Lines changed: 121 additions & 89 deletions

File tree

Include/Asio/MessageUtils.h

Lines changed: 115 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
#include <algorithm>
3333
#include <cstring>
3434
#include <cassert>
35+
#include <type_traits>
3536
#include "AsioDefines.h"
3637
#include "Serialization/SerializeToVector.h"
38+
#include "Threads/MutexHelpers.hpp"
3739

3840
/*! \brief The core_lib namespace. */
3941
namespace core_lib
@@ -45,6 +47,116 @@ namespace asio
4547
namespace messages
4648
{
4749

50+
// TODO: tidy this up and reuse it in TcpConnection and in MessageHandler.
51+
// Add proper doxygen comments!
52+
template <typename Header>
53+
class ReceivedMessagePool : public std::enable_shared_from_this<ReceivedMessagePool<Header>>
54+
{
55+
static_assert(std::is_default_constructible<Header>::value,
56+
"ReceivedMessagePool requires Header to be default constructible");
57+
58+
public:
59+
using msg_t = defs::ReceivedMessage<Header>;
60+
using msg_sh_ptr_t = std::shared_ptr<msg_t>;
61+
62+
ReceivedMessagePool(size_t poolSize, size_t reservedBodySize)
63+
: m_pool(poolSize)
64+
, m_reservedBodySize(reservedBodySize)
65+
{
66+
for (size_t i = 0; i < poolSize; ++i)
67+
{
68+
m_pool[i] = std::make_unique<msg_t>();
69+
m_pool[i]->body.reserve(reservedBodySize);
70+
m_available.push_back(i);
71+
}
72+
}
73+
74+
size_t PoolSize() const
75+
{
76+
STD_SHARED_LOCK(m_mutex);
77+
return m_pool.size();
78+
}
79+
80+
size_t ReservedBodySize() const
81+
{
82+
return m_reservedBodySize;
83+
}
84+
85+
msg_sh_ptr_t Acquire(std::size_t requiredBodySize)
86+
{
87+
if (requiredBodySize <= m_reservedBodySize)
88+
{
89+
STD_UNIQUE_LOCK(m_mutex);
90+
91+
if (!m_available.empty())
92+
{
93+
const auto index = m_available.front();
94+
m_available.pop_front();
95+
96+
auto* msg = m_pool[index].get();
97+
msg->header = Header{};
98+
msg->body.clear();
99+
100+
auto self = this->shared_from_this();
101+
102+
return msg_sh_ptr_t(
103+
msg,
104+
[self, index](msg_t* p)
105+
{
106+
self->ReleaseToPool(index, p);
107+
});
108+
}
109+
}
110+
111+
return MakeDynamic(requiredBodySize);
112+
}
113+
114+
private:
115+
msg_sh_ptr_t MakeDynamic(size_t requiredBodySize)
116+
{
117+
auto msg = std::make_shared<msg_t>();
118+
msg->body.reserve(requiredBodySize);
119+
return msg;
120+
}
121+
122+
void ReleaseToPool(size_t index, msg_t* msg)
123+
{
124+
// Guard against null pointer shenanigans.
125+
if (nullptr == msg)
126+
{
127+
return;
128+
}
129+
130+
msg->header = Header{};
131+
msg->body.clear();
132+
133+
// Shrink oversized buffers before reusing.
134+
//
135+
// This should never get called unless someone tampered with the
136+
// message body after acquiring from the pool, but we can be
137+
// defensive here.
138+
if (msg->body.capacity() > m_reservedBodySize)
139+
{
140+
defs::char_buffer_t tmp;
141+
tmp.reserve(m_reservedBodySize);
142+
msg->body.swap(tmp);
143+
}
144+
145+
// Reduce scope of lock to just when we return the index to the pool.
146+
STD_UNIQUE_LOCK(m_mutex);
147+
148+
// Return index to pool of available messages.
149+
m_available.push_back(index);
150+
}
151+
152+
private:
153+
mutable std::shared_mutex m_mutex;
154+
using msg_ptr_t = std::unique_ptr<msg_t>;
155+
std::vector<msg_ptr_t> m_pool;
156+
std::deque<size_t> m_available;
157+
size_t m_reservedBodySize{0};
158+
};
159+
48160
/*!
49161
* \brief Default message handler class.
50162
*
@@ -54,6 +166,8 @@ namespace messages
54166
*/
55167
class CORE_LIBRARY_DLL_SHARED_API MessageHandler final
56168
{
169+
using msg_pool_t = ReceivedMessagePool<defs::MessageHeader>;
170+
57171
public:
58172
#ifdef USE_DEFAULT_CONSTRUCTOR_
59173
/*! \brief Default constructor. */
@@ -106,16 +220,6 @@ class CORE_LIBRARY_DLL_SHARED_API MessageHandler final
106220
* \param[in] message - A received message buffer.
107221
*/
108222
static bool CheckMessage(defs::char_buf_cspan_t message);
109-
/*!
110-
* \brief Initialise message pool.
111-
* \param[in] memPoolMsgCount - Pool size as number of messages.
112-
*/
113-
void InitialiseMsgPool(size_t memPoolMsgCount);
114-
/*!
115-
* \brief Get next message to use from pool.
116-
* \return A new message object or a one from the pool.
117-
*/
118-
defs::default_received_message_ptr_t GetNewMessageObject(size_t requiredSize) const;
119223

120224
private:
121225
mutable std::mutex m_mutex;
@@ -128,12 +232,8 @@ class CORE_LIBRARY_DLL_SHARED_API MessageHandler final
128232
/*! \brief Magic string. */
129233
std::string m_magicString{defs::DEFAULT_MAGIC_STRING};
130234
#endif
131-
/*! \brief Default pool message size. */
132-
size_t m_defaultMsgSize{defs::RECV_POOL_DEFAULT_MSG_SIZE};
133-
/*! \brief Message pool index tracker */
134-
mutable size_t m_msgPoolIndex{0};
135235
/*! \brief The message pool. */
136-
std::vector<asio::defs::default_received_message_ptr_t> m_msgPool;
236+
std::shared_ptr<msg_pool_t> m_msgPool;
137237
};
138238

139239
/*!

Source/Asio/MessageUtils.cpp

Lines changed: 6 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -56,24 +56,23 @@ constexpr size_t MESSAGE_LENGTH_OFFSET = sizeof(defs::MessageHeader) - sizeof(ui
5656
#ifdef USE_DEFAULT_CONSTRUCTOR_
5757
MessageHandler::MessageHandler()
5858
: m_magicString(defs::DEFAULT_MAGIC_STRING)
59+
, m_msgPool(std::make_shared<msg_pool_t>(0, defs::RECV_POOL_DEFAULT_MSG_SIZE))
5960
{
60-
InitialiseMsgPool(0, 0);
6161
}
6262
#endif
6363

6464
#ifdef USE_EXPLICIT_MOVE_
6565
MessageHandler::MessageHandler(MessageHandler&& mh)
6666
: m_magicString(defs::DEFAULT_MAGIC_STRING)
67+
, m_msgPool(std::make_shared<msg_pool_t>(0, defs::RECV_POOL_DEFAULT_MSG_SIZE))
6768
{
68-
InitialiseMsgPool(0, 0);
6969
*this = std::move(mh);
7070
}
7171

7272
MessageHandler& MessageHandler::operator=(MessageHandler&& mh)
7373
{
7474
std::swap(m_messageDispatcher, mh.m_messageDispatcher);
7575
m_magicString.swap(mh.m_magicString);
76-
std::swap(m_msgPoolIndex, mh.m_msgPoolIndex);
7776
m_msgPool.swap(mh.m_msgPool);
7877
}
7978
#endif
@@ -83,9 +82,8 @@ MessageHandler::MessageHandler(const defs::default_message_dispatcher_t& message
8382
size_t defaultMsgSize)
8483
: m_messageDispatcher(messageDispatcher)
8584
, m_magicString(magicString)
86-
, m_defaultMsgSize(defaultMsgSize)
85+
, m_msgPool(std::make_shared<msg_pool_t>(memPoolMsgCount, defaultMsgSize))
8786
{
88-
InitialiseMsgPool(memPoolMsgCount);
8987
}
9088

9189
size_t MessageHandler::CheckBytesLeftToRead(defs::char_buf_cspan_t message) const
@@ -113,7 +111,7 @@ size_t MessageHandler::CheckBytesLeftToRead(defs::char_buf_cspan_t message) cons
113111

114112
uint32_t totalLength;
115113
std::memcpy(&totalLength, message.data() + MESSAGE_LENGTH_OFFSET, sizeof(totalLength));
116-
114+
117115
auto msgSize = static_cast<uint32_t>(message.size());
118116

119117
if (totalLength < msgSize)
@@ -137,13 +135,13 @@ void MessageHandler::MessageReceivedHandler(defs::char_buf_cspan_t message) cons
137135
DEBUG_MESSAGE_EX_ERROR("Incomplete message header");
138136
#endif
139137
}
140-
138+
141139
uint32_t totalLength;
142140
std::memcpy(&totalLength, message.data() + MESSAGE_LENGTH_OFFSET, sizeof(totalLength));
143141

144142
auto hdrLength = static_cast<uint32_t>(defs::MESSAGE_HEADER_LEN);
145143
auto requiredLength = totalLength > hdrLength ? totalLength - hdrLength : 0;
146-
auto receivedMessage = GetNewMessageObject(requiredLength);
144+
auto receivedMessage = m_msgPool->Acquire(requiredLength);
147145

148146
if (!TryConvertToPod<defs::MessageHeader>(receivedMessage->header, message))
149147
{
@@ -171,72 +169,6 @@ bool MessageHandler::CheckMessage(defs::char_buf_cspan_t message)
171169
return message.size() >= sizeof(defs::MessageHeader);
172170
}
173171

174-
void MessageHandler::InitialiseMsgPool(size_t memPoolMsgCount)
175-
{
176-
m_msgPoolIndex = 0;
177-
178-
if (0 == memPoolMsgCount)
179-
{
180-
#if defined(USE_SOCKET_DEBUG)
181-
DEBUG_MESSAGE_EX_DEBUG("Receive message pool NOT being used because memPoolMsgCount = "
182-
<< memPoolMsgCount << " and defaultMsgSize = " << defaultMsgSize);
183-
#endif
184-
m_msgPool.clear();
185-
return;
186-
}
187-
188-
#if defined(USE_SOCKET_DEBUG)
189-
DEBUG_MESSAGE_EX_DEBUG("Receive message pool will be used with memPoolMsgCount = "
190-
<< memPoolMsgCount << " and defaultMsgSize = " << defaultMsgSize);
191-
#endif
192-
193-
m_msgPool.resize(memPoolMsgCount);
194-
195-
if (0 == m_defaultMsgSize)
196-
{
197-
m_defaultMsgSize = defs::RECV_POOL_DEFAULT_MSG_SIZE;
198-
}
199-
200-
auto defaultMsgSize = m_defaultMsgSize;
201-
202-
auto generateMsg = [defaultMsgSize]()
203-
{
204-
auto msg = std::make_shared<defs::default_received_message_t>();
205-
206-
if (defaultMsgSize > 0)
207-
{
208-
msg->body.reserve(defaultMsgSize);
209-
}
210-
211-
return msg;
212-
};
213-
214-
std::generate(m_msgPool.begin(), m_msgPool.end(), generateMsg);
215-
}
216-
217-
defs::default_received_message_ptr_t MessageHandler::GetNewMessageObject(size_t requiredSize) const
218-
{
219-
defs::default_received_message_ptr_t newMessage;
220-
221-
if (m_msgPool.empty() || (requiredSize > m_defaultMsgSize))
222-
{
223-
newMessage = std::make_shared<defs::default_received_message_t>();
224-
}
225-
else
226-
{
227-
std::lock_guard<std::mutex> lock(m_mutex);
228-
229-
newMessage = m_msgPool[m_msgPoolIndex];
230-
231-
if (++m_msgPoolIndex >= m_msgPool.size())
232-
{
233-
m_msgPoolIndex = 0;
234-
}
235-
}
236-
237-
return newMessage;
238-
}
239-
240172
// ****************************************************************************
241173
// Utility functions
242174
// ****************************************************************************

0 commit comments

Comments
 (0)