Skip to content

Commit b3310a7

Browse files
committed
MessageHandler receive pool improvment
1 parent 75d8177 commit b3310a7

2 files changed

Lines changed: 32 additions & 14 deletions

File tree

Include/Asio/MessageUtils.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,14 @@ class CORE_LIBRARY_DLL_SHARED_API MessageHandler final
108108
static bool CheckMessage(defs::char_buf_cspan_t message);
109109
/*!
110110
* \brief Initialise message pool.
111-
* \param[in] memPoolMsgCount - Pool size as number of messages..
112-
* \param[in] defaultMsgSize - Initial size of a message in the pool.
111+
* \param[in] memPoolMsgCount - Pool size as number of messages.
113112
*/
114-
void InitialiseMsgPool(size_t memPoolMsgCount, size_t defaultMsgSize);
113+
void InitialiseMsgPool(size_t memPoolMsgCount);
115114
/*!
116115
* \brief Get next message to use from pool.
117116
* \return A new message object or a one from the pool.
118117
*/
119-
defs::default_received_message_ptr_t GetNewMessageObject() const;
118+
defs::default_received_message_ptr_t GetNewMessageObject(size_t requiredSize) const;
120119

121120
private:
122121
mutable std::mutex m_mutex;
@@ -129,7 +128,11 @@ class CORE_LIBRARY_DLL_SHARED_API MessageHandler final
129128
/*! \brief Magic string. */
130129
std::string m_magicString{defs::DEFAULT_MAGIC_STRING};
131130
#endif
131+
/*! \brief Default pool message size. */
132+
size_t m_defaultMsgSize{defs::RECV_POOL_DEFAULT_MSG_SIZE};
133+
/*! \brief Message pool index tracker */
132134
mutable size_t m_msgPoolIndex{0};
135+
/*! \brief The message pool. */
133136
std::vector<asio::defs::default_received_message_ptr_t> m_msgPool;
134137
};
135138

Source/Asio/MessageUtils.cpp

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ MessageHandler::MessageHandler(const defs::default_message_dispatcher_t& message
8383
size_t defaultMsgSize)
8484
: m_messageDispatcher(messageDispatcher)
8585
, m_magicString(magicString)
86+
, m_defaultMsgSize(defaultMsgSize)
8687
{
87-
InitialiseMsgPool(memPoolMsgCount, defaultMsgSize);
88+
InitialiseMsgPool(memPoolMsgCount);
8889
}
8990

9091
size_t MessageHandler::CheckBytesLeftToRead(defs::char_buf_cspan_t message) const
@@ -112,8 +113,10 @@ size_t MessageHandler::CheckBytesLeftToRead(defs::char_buf_cspan_t message) cons
112113

113114
uint32_t totalLength;
114115
std::memcpy(&totalLength, message.data() + MESSAGE_LENGTH_OFFSET, sizeof(totalLength));
116+
117+
auto msgSize = static_cast<uint32_t>(message.size());
115118

116-
if (totalLength < message.size())
119+
if (totalLength < msgSize)
117120
{
118121
#if defined(USE_SOCKET_DEBUG)
119122
DEBUG_MESSAGE_EX_ERROR("Message length error, header length field ("
@@ -123,7 +126,7 @@ size_t MessageHandler::CheckBytesLeftToRead(defs::char_buf_cspan_t message) cons
123126
return std::numeric_limits<size_t>::max();
124127
}
125128

126-
return totalLength - message.size();
129+
return static_cast<size_t>(totalLength - msgSize);
127130
}
128131

129132
void MessageHandler::MessageReceivedHandler(defs::char_buf_cspan_t message) const
@@ -134,8 +137,13 @@ void MessageHandler::MessageReceivedHandler(defs::char_buf_cspan_t message) cons
134137
DEBUG_MESSAGE_EX_ERROR("Incomplete message header");
135138
#endif
136139
}
140+
141+
uint32_t totalLength;
142+
std::memcpy(&totalLength, message.data() + MESSAGE_LENGTH_OFFSET, sizeof(totalLength));
137143

138-
auto receivedMessage = GetNewMessageObject();
144+
auto hdrLength = static_cast<uint32_t>(defs::MESSAGE_HEADER_LEN);
145+
auto requiredLength = totalLength > hdrLength ? totalLength - hdrLength : 0;
146+
auto receivedMessage = GetNewMessageObject(requiredLength);
139147

140148
if (!TryConvertToPod<defs::MessageHeader>(receivedMessage->header, message))
141149
{
@@ -145,10 +153,10 @@ void MessageHandler::MessageReceivedHandler(defs::char_buf_cspan_t message) cons
145153
return;
146154
}
147155

148-
if (receivedMessage->header.totalLength > defs::MESSAGE_HEADER_LEN)
156+
if (requiredLength > 0)
149157
{
150-
receivedMessage->body.resize(message.size() - defs::MESSAGE_HEADER_LEN);
151-
std::memcpy(receivedMessage->body.data(), message.data() + defs::MESSAGE_HEADER_LEN, receivedMessage->body.size());
158+
receivedMessage->body.resize(requiredLength);
159+
std::memcpy(receivedMessage->body.data(), message.data() + defs::MESSAGE_HEADER_LEN, requiredLength);
152160
}
153161
else
154162
{
@@ -163,7 +171,7 @@ bool MessageHandler::CheckMessage(defs::char_buf_cspan_t message)
163171
return message.size() >= sizeof(defs::MessageHeader);
164172
}
165173

166-
void MessageHandler::InitialiseMsgPool(size_t memPoolMsgCount, size_t defaultMsgSize)
174+
void MessageHandler::InitialiseMsgPool(size_t memPoolMsgCount)
167175
{
168176
m_msgPoolIndex = 0;
169177

@@ -183,6 +191,13 @@ void MessageHandler::InitialiseMsgPool(size_t memPoolMsgCount, size_t defaultMsg
183191
#endif
184192

185193
m_msgPool.resize(memPoolMsgCount);
194+
195+
if (0 == m_defaultMsgSize)
196+
{
197+
m_defaultMsgSize = defs::RECV_POOL_DEFAULT_MSG_SIZE;
198+
}
199+
200+
auto defaultMsgSize = m_defaultMsgSize;
186201

187202
auto generateMsg = [defaultMsgSize]()
188203
{
@@ -199,11 +214,11 @@ void MessageHandler::InitialiseMsgPool(size_t memPoolMsgCount, size_t defaultMsg
199214
std::generate(m_msgPool.begin(), m_msgPool.end(), generateMsg);
200215
}
201216

202-
defs::default_received_message_ptr_t MessageHandler::GetNewMessageObject() const
217+
defs::default_received_message_ptr_t MessageHandler::GetNewMessageObject(size_t requiredSize) const
203218
{
204219
defs::default_received_message_ptr_t newMessage;
205220

206-
if (m_msgPool.empty())
221+
if (m_msgPool.empty() || (requiredSize > m_defaultMsgSize))
207222
{
208223
newMessage = std::make_shared<defs::default_received_message_t>();
209224
}

0 commit comments

Comments
 (0)