Skip to content

Make Message and Client pools thread-safe using ConcurrentDictionary#180

Open
CrossV4 wants to merge 1 commit into
RiptideNetworking:mainfrom
CrossV4:main
Open

Make Message and Client pools thread-safe using ConcurrentDictionary#180
CrossV4 wants to merge 1 commit into
RiptideNetworking:mainfrom
CrossV4:main

Conversation

@CrossV4
Copy link
Copy Markdown

@CrossV4 CrossV4 commented May 23, 2026

Summary of Changes

I have developed a highly efficient, multi-threaded, and thread-safe networking architectural pattern for RiptideNetworking, specifically designed for high-concurrency projects like MMORPGs.

The Problem: Thread Contention & Race Conditions

When offloading network I/O operations to a dedicated background thread (e.g., executing Server.Update() asynchronously to keep the main game loop responsive), serious race conditions occur. The primary culprit is Riptide’s internal message pool (Message.Create(), Message.Release()). Accessing the message pool concurrently from both the Main Thread (Game Logic) and the Background Thread (Network Loop) corrupts internal byte arrays, leading to network desynchronization, corrupted data, and server crashes.

The Solution: The Command Pattern & Asynchronous Action Queue

To bridge the gap between the Main Thread and the Network Thread safely without expensive resource locking, I implemented a custom Command Pattern execution pipeline combined with a ConcurrentQueue.

  1. Incoming Request Queuing: Network message handlers running on the background thread no longer execute game logic or manipulate the message pool directly. Instead, they deserialize raw data into lightweight, immutable INetworkCommand structures and enqueue them into a thread-safe ConcurrentQueue.
public static readonly ConcurrentQueue<INetworkCommand> ACTION_QUEUE = new();
public interface INetworkCommand
{
    ushort ClientId { get; }
    void Execute();
}

private class PPromo : INetworkCommand
{
    public ushort ClientId { get; set; }
    public string promocode { get; set; }

    public void Execute()
    {
        if (CheckIsInList(ClientId, out Player player))
        {
            bool success = TryClaimPromoLocal(player, promocode);
            Message message = Message.Create(MessageSendMode.Reliable, ServerToClientId.PromoResult);
            message.AddBool(success);
            NetworkManager.Singleton.Server.Send(message, ClientId);
        }
    }
}
[MessageHandler((ushort)ClientToServerId.PromoRequest)]
private static void PROCMO_ACTIVATE(ushort fromClientId, Message message)
{
    NetworkManager.ACTION_QUEUE.Enqueue(new PPromo() { ClientId = fromClientId, promocode = message.GetString() });
}
  1. Main Thread Thread-Safe Execution & Frame Budgeting: The Main Thread dequeues and processes these commands sequentially during its standard Update() loop. To ensure server stability and prevent CPU spikes caused by a massive surge of packets, I integrated a Frame Budget Monitor using a Stopwatch. If processing takes longer than the allocated frame time budget (MaxFrameMs), the execution breaks, deferred actions are carried over to the next frame, and a warning is logged.
private readonly System.Diagnostics.Stopwatch SW = new();
private void Update()
{
        SW.Restart();
        while (ACTION_QUEUE.TryDequeue(out INetworkCommand CR))
        {
            CR.Execute();

            ACTION_REQUEST_COUNTER.AddOrUpdate(CR.ClientId, 0, (o, k) => (byte)(k - 1));

            // Frame Budgeting Control (e.g., ~33.3ms for a 30-Tick Server)
            if (SW.ElapsedMilliseconds > MaxFrameMs)
            {
                Debug.LogWarning($"{{{TICK_FORDEBUG}}} - [Performans] Bu frame'de {SW.ElapsedMilliseconds}ms harcandı! (Dikkat: {CR.GetType().Name}). Kalan istekleri bir sonrakine aktarılıyor.");
                break;
            }
        }
}
  1. Isolated Background Network Loop: With the main loop freed from networking I/O overhead, Server.Update() runs continuously on a high-priority background worker thread, throttled naturally to prevent CPU core exhaustion.
            SERVER_THREAD = new Thread(() => ServerThreadLoop())
            {
                Name = "MokSha-Rin Riptide Server",
                IsBackground = true
            };
            SERVER_THREAD.Start();
    private void ServerThreadLoop()
    {
        try
        {
            while (!GLOBAL_SOURCE.Token.IsCancellationRequested && !QUIT_FLAG)
            {
                // read Riptide packages and fill the actionqueue
                if (Server != null && Server.IsRunning)
                {
                    Server.Update();
                }

                //preventing cpu lock making the thread sleep for a tiny duration and also check for requesting server is going to shutdown
                Thread.Sleep(16);
                GLOBAL_SOURCE.Token.ThrowIfCancellationRequested();
            }
        }
        catch (Exception ex)
        {
            Debug.LogError($"{{{TICK_FORDEBUG}}} - [SERVER - CORE] Server error: {ex.Message}");
        }
        finally
        {
            Debug.Log($"{{{TICK_FORDEBUG}}} - [SERVER - CORE] Server has shutdown (Loop end).");
        }
    }
  1. Why we need this ?
public void Execute()
    {
        if (CheckIsInList(ClientId, out Player player))
        {
            bool success = TryClaimPromoLocal(player, promocode);
            Message message = Message.Create(MessageSendMode.Reliable, ServerToClientId.PromoResult);
            message.AddBool(success);
            NetworkManager.Singleton.Server.Send(message, ClientId);
        }
    }

Message message = Message.Create(MessageSendMode.Reliable, ServerToClientId.PromoResult);
Why This is Vital for the Community
By enforcing this design, Riptide’s pooling mechanism (Message.Create) is only ever touched from a single thread sequence during actual processing, completely eliminating Riptide-pool-related Multi-Thread crashes. I wanted to share this architectural solution with the Riptide community to help anyone struggling to transition their dedicated servers to an asynchronous, high-performance, and crash-proof multi-threaded environment. Hope it helps!

…and ConcurrentStack for avoiding calling Update function from another thread.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant