Skip to content

Commit 6321563

Browse files
scorteanucosminMrBlue
authored andcommitted
Add Message broker service pooling back and improve writing performance
1 parent 5c23e80 commit 6321563

1 file changed

Lines changed: 50 additions & 56 deletions

File tree

src/CompilerStream/MessageBrokerService.cs

Lines changed: 50 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
extern alias References;
22
using System;
3+
using System.IO;
34
using System.IO.Pipes;
45
using System.Threading;
56
using System.Threading.Tasks;
67
using Oxide.CompilerServices;
78
using Oxide.Core;
89
using Oxide.CSharp.Common;
10+
using Oxide.Pooling;
911

1012
namespace Oxide.CSharp.CompilerStream
1113
{
@@ -36,6 +38,20 @@ public void Start(string pipeName)
3638
Task.Run(() => WorkerAsync(cancellationToken), cancellationToken);
3739
}
3840

41+
public void SendMessage(CompilerMessage message) => WriteMessage(message);
42+
43+
public int SendShutdownMessage()
44+
{
45+
CompilerMessage message = new()
46+
{
47+
Id = _messageId++,
48+
Type = MessageType.Shutdown,
49+
};
50+
51+
SendMessage(message);
52+
return message.Id;
53+
}
54+
3955
private async Task WorkerAsync(CancellationToken cancellationToken)
4056
{
4157
while (!cancellationToken.IsCancellationRequested)
@@ -72,49 +88,63 @@ private async Task WorkerAsync(CancellationToken cancellationToken)
7288
}
7389
}
7490

75-
public void SendMessage(CompilerMessage message) => WriteMessage(message);
76-
7791
private void WriteMessage(CompilerMessage message)
7892
{
93+
byte[] headerBuffer = ArrayPool<byte>.Shared.Take(sizeof(int));
7994
try
8095
{
81-
byte[] data = Constants.Serializer.Serialize(message);
82-
byte[] buffer = new byte[sizeof(int) + data.Length];
83-
int destinationIndex = data.Length.WriteBigEndian(buffer);
84-
Array.Copy(data, 0, buffer, destinationIndex, data.Length);
85-
OnWrite(buffer, 0, buffer.Length);
96+
using MemoryStream memoryStream = new();
97+
Constants.Serializer.SerializeToStream(memoryStream, message, DefaultMaxBufferSize);
98+
99+
int length = (int)memoryStream.Length;
100+
101+
length.WriteBigEndian(headerBuffer);
102+
103+
_pipeServer.Write(headerBuffer, 0, sizeof(int));
104+
_pipeServer.Write(memoryStream.GetBuffer(), 0, length);
86105
}
87106
catch (Exception exception)
88107
{
89108
Interface.Oxide.LogError($"Error sending message to compiler: {exception}");
90109
}
110+
finally
111+
{
112+
ArrayPool<byte>.Shared.Return(headerBuffer);
113+
}
91114
}
92115

93116
private CompilerMessage? ReadMessage()
94117
{
95-
byte[] buffer = new byte[sizeof(int)];
118+
byte[] headerBuffer = ArrayPool<byte>.Shared.Take(sizeof(int));
96119
int read = 0;
97120
try
98121
{
99-
while (read < buffer.Length)
122+
while (read < headerBuffer.Length)
100123
{
101-
read += OnRead(buffer, read, buffer.Length - read);
124+
read += OnRead(headerBuffer, read, headerBuffer.Length - read);
102125
if (read == 0)
103126
{
104127
return null;
105128
}
106129
}
107130

108-
int length = buffer.ReadBigEndian();
109-
byte[] buffer2 = new byte[length];
131+
int length = headerBuffer.ReadBigEndian();
132+
byte[] messageBuffer = ArrayPool<byte>.Shared.Take(length);
133+
try
134+
{
110135

111-
read = 0;
112-
while (read < length)
136+
read = 0;
137+
while (read < length)
138+
{
139+
read += OnRead(messageBuffer, read, length - read);
140+
}
141+
142+
return Constants.Serializer.Deserialize<CompilerMessage>(messageBuffer);
143+
}
144+
finally
113145
{
114-
read += OnRead(buffer2, read, length - read);
146+
ArrayPool<byte>.Shared.Return(messageBuffer);
115147
}
116-
117-
return Constants.Serializer.Deserialize<CompilerMessage>(buffer2);
118148
}
119149
catch (Exception exception)
120150
{
@@ -126,21 +156,9 @@ private void WriteMessage(CompilerMessage message)
126156
Interface.Oxide.LogError($"Error reading message from compiler: {exception}");
127157
return null;
128158
}
129-
}
130-
131-
private void OnWrite(byte[] buffer, int index, int count)
132-
{
133-
Validate(buffer, index, count);
134-
135-
int remaining = count;
136-
int written = 0;
137-
while (remaining > 0)
159+
finally
138160
{
139-
int toWrite = Math.Min(DefaultMaxBufferSize, remaining);
140-
_pipeServer.Write(buffer, index + written, toWrite);
141-
remaining -= toWrite;
142-
written += toWrite;
143-
_pipeServer.Flush();
161+
ArrayPool<byte>.Shared.Return(headerBuffer);
144162
}
145163
}
146164

@@ -168,31 +186,6 @@ private int OnRead(byte[] buffer, int index, int count)
168186
return read;
169187
}
170188

171-
public int SendShutdownMessage()
172-
{
173-
CompilerMessage message = new CompilerMessage
174-
{
175-
Id = _messageId++,
176-
Type = MessageType.Shutdown,
177-
};
178-
179-
SendMessage(message);
180-
return message.Id;
181-
}
182-
183-
public int SendCompileMessage(CompilerData project)
184-
{
185-
CompilerMessage message = new CompilerMessage
186-
{
187-
Id = _messageId++,
188-
Type = MessageType.Data,
189-
Data = Constants.Serializer.Serialize(project)
190-
};
191-
192-
SendMessage(message);
193-
return message.Id;
194-
}
195-
196189
private void Validate(byte[] buffer, int index, int count)
197190
{
198191
if (buffer == null)
@@ -220,6 +213,7 @@ private void Validate(byte[] buffer, int index, int count)
220213
public void Stop()
221214
{
222215
_cancellationTokenSource.Cancel();
216+
_cancellationTokenSource.Dispose();
223217
_pipeServer.Disconnect();
224218
_pipeServer.Dispose();
225219
}

0 commit comments

Comments
 (0)