-
-
Notifications
You must be signed in to change notification settings - Fork 7
feat(TcpSocket): support multiple DataPackageAdapter #542
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3bf37c5
9f25885
2077f6b
2629fb2
e15229f
511b080
cf1e474
f890775
a0c8444
cf15e9d
4a14f36
3d16438
797298f
2ce7544
c2a499f
8c8f744
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,6 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the Apache 2.0 License | ||
| // See the LICENSE file in the project root for more information. | ||
| // Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone | ||
| // Copyright (c) BootstrapBlazor & Argo Zhang (argo@live.ca). All rights reserved. | ||
| // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
| // Website: https://www.blazor.zone or https://argozhang.github.io/ | ||
|
|
||
| using Microsoft.Extensions.DependencyInjection; | ||
| using Microsoft.Extensions.Options; | ||
|
|
@@ -52,6 +51,55 @@ public static ValueTask<bool> ConnectAsync(this ITcpSocketClient client, string | |
| return client.ConnectAsync(endPoint, token); | ||
| } | ||
|
|
||
| private static readonly Dictionary<ITcpSocketClient, List<(IDataPackageAdapter Adapter, Func<ReadOnlyMemory<byte>, ValueTask> Callback)>> _cache = []; | ||
|
|
||
| /// <summary> | ||
| /// 增加 <see cref="ITcpSocketClient"/> 数据适配器及其对应的回调方法 | ||
| /// </summary> | ||
| /// <param name="client"></param> | ||
| /// <param name="adapter"></param> | ||
| /// <param name="callback"></param> | ||
| public static void AddDataPackageAdapter(this ITcpSocketClient client, IDataPackageAdapter adapter, Func<ReadOnlyMemory<byte>, ValueTask> callback) | ||
| { | ||
| if (_cache.TryGetValue(client, out var list)) | ||
| { | ||
| list.Add((adapter, cb)); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): Variable 'cb' is used before its declaration. 'cb' is referenced in list.Add and _cache.Add before it is declared as an async local function, which will cause a compilation error. Please declare 'cb' before using it. |
||
| } | ||
| else | ||
| { | ||
| _cache.Add(client, [(adapter, cb)]); | ||
| } | ||
|
|
||
| client.ReceivedCallBack += cb; | ||
|
Comment on lines
+64
to
+73
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): Callback removal logic may not match added callbacks. Since the callback added is a wrapper function, direct comparison with the original callback may fail. Store and compare the actual delegate instance used in ReceivedCallBack to ensure correct removal. |
||
|
|
||
| // 设置 DataPackageAdapter 的回调函数 | ||
| adapter.ReceivedCallBack = callback; | ||
|
|
||
| async ValueTask cb(ReadOnlyMemory<byte> buffer) | ||
| { | ||
| // 将接收到的数据传递给 DataPackageAdapter 进行数据处理合规数据触发 ReceivedCallBack 回调 | ||
| await adapter.HandlerAsync(buffer); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// 移除 <see cref="ITcpSocketClient"/> 数据适配器及其对应的回调方法 | ||
| /// </summary> | ||
| /// <param name="client"></param> | ||
| /// <param name="callback"></param> | ||
| public static void RemoveDataPackageAdapter(this ITcpSocketClient client, Func<ReadOnlyMemory<byte>, ValueTask> callback) | ||
| { | ||
| if (_cache.TryGetValue(client, out var list)) | ||
| { | ||
| var items = list.Where(i => i.Adapter.ReceivedCallBack == callback).ToList(); | ||
| foreach (var c in items) | ||
| { | ||
| client.ReceivedCallBack -= c.Callback; | ||
| list.Remove(c); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Configures the specified <see cref="ITcpSocketClient"/> to use the provided <see cref="IDataPackageAdapter"/> | ||
| /// for processing received data and sets a callback to handle processed data. | ||
|
|
@@ -67,6 +115,16 @@ public static ValueTask<bool> ConnectAsync(this ITcpSocketClient client, string | |
| /// containing the processed data and returns a <see cref="ValueTask"/>.</param> | ||
| public static void SetDataPackageAdapter(this ITcpSocketClient client, IDataPackageAdapter adapter, Func<ReadOnlyMemory<byte>, ValueTask> callback) | ||
| { | ||
| // 释放缓存 | ||
| if (_cache.TryGetValue(client, out var list)) | ||
| { | ||
| foreach (var (Adapter, Callback) in list) | ||
| { | ||
| client.ReceivedCallBack -= Callback; | ||
| } | ||
| list.Clear(); | ||
| } | ||
|
|
||
| // 设置 ITcpSocketClient 的回调函数 | ||
| client.ReceivedCallBack = async buffer => | ||
| { | ||
|
|
@@ -75,7 +133,18 @@ public static void SetDataPackageAdapter(this ITcpSocketClient client, IDataPack | |
| }; | ||
|
|
||
| // 设置 DataPackageAdapter 的回调函数 | ||
| adapter.ReceivedCallBack = buffer => callback(buffer); | ||
| adapter.ReceivedCallBack = callback; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// 通过指定 <see cref="IDataPackageHandler"/> 数据处理实例,设置数据适配器并配置回调方法 | ||
| /// </summary> | ||
| /// <param name="client"><see cref="ITcpSocketClient"/> 实例</param> | ||
| /// <param name="handler"><see cref="IDataPackageHandler"/> 数据处理实例</param> | ||
| /// <param name="callback">回调方法</param> | ||
| public static void SetDataPackageAdapter(this ITcpSocketClient client, IDataPackageHandler handler, Func<ReadOnlyMemory<byte>, ValueTask> callback) | ||
| { | ||
| client.SetDataPackageAdapter(new DataPackageAdapter(handler), callback); | ||
| } | ||
|
|
||
| /// <summary> | ||
|
|
@@ -92,6 +161,16 @@ public static void SetDataPackageAdapter(this ITcpSocketClient client, IDataPack | |
| /// <param name="callback">The callback function to be invoked with the converted entity.</param> | ||
| public static void SetDataPackageAdapter<TEntity>(this ITcpSocketClient client, IDataPackageAdapter adapter, IDataConverter<TEntity> socketDataConverter, Func<TEntity?, Task> callback) | ||
| { | ||
| // 释放缓存 | ||
| if (_cache.TryGetValue(client, out var list)) | ||
| { | ||
| foreach (var (Adapter, Callback) in list) | ||
| { | ||
| client.ReceivedCallBack -= Callback; | ||
| } | ||
| list.Clear(); | ||
| } | ||
|
|
||
| // 设置 ITcpSocketClient 的回调函数 | ||
| client.ReceivedCallBack = async buffer => | ||
| { | ||
|
|
@@ -111,6 +190,19 @@ public static void SetDataPackageAdapter<TEntity>(this ITcpSocketClient client, | |
| }; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// 通过指定 <see cref="IDataPackageHandler"/> 数据处理实例,设置数据适配器并配置回调方法 | ||
| /// </summary> | ||
| /// <typeparam name="TEntity"></typeparam> | ||
| /// <param name="client"></param> | ||
| /// <param name="handler"></param> | ||
| /// <param name="socketDataConverter"></param> | ||
| /// <param name="callback"></param> | ||
| public static void SetDataPackageAdapter<TEntity>(this ITcpSocketClient client, IDataPackageHandler handler, IDataConverter<TEntity> socketDataConverter, Func<TEntity?, Task> callback) | ||
| { | ||
| client.SetDataPackageAdapter(new DataPackageAdapter(handler), socketDataConverter, callback); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Configures the specified <see cref="ITcpSocketClient"/> to use a custom data package adapter and callback | ||
| /// function. | ||
|
|
@@ -126,6 +218,16 @@ public static void SetDataPackageAdapter<TEntity>(this ITcpSocketClient client, | |
| /// <param name="callback">The callback function to invoke with the processed entity of type <typeparamref name="TEntity"/>.</param> | ||
| public static void SetDataPackageAdapter<TEntity>(this ITcpSocketClient client, IDataPackageAdapter adapter, Func<TEntity?, Task> callback) | ||
| { | ||
| // 释放缓存 | ||
| if (_cache.TryGetValue(client, out var list)) | ||
| { | ||
| foreach (var (Adapter, Callback) in list) | ||
| { | ||
| client.ReceivedCallBack -= Callback; | ||
| } | ||
| list.Clear(); | ||
| } | ||
|
|
||
| // 设置 ITcpSocketClient 的回调函数 | ||
| client.ReceivedCallBack = async buffer => | ||
| { | ||
|
|
@@ -163,6 +265,17 @@ public static void SetDataPackageAdapter<TEntity>(this ITcpSocketClient client, | |
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// 通过指定 <see cref="IDataPackageHandler"/> 数据处理实例,设置数据适配器并配置回调方法 | ||
| /// </summary> | ||
| /// <param name="client"><see cref="ITcpSocketClient"/> 实例</param> | ||
| /// <param name="handler"><see cref="IDataPackageHandler"/> 数据处理实例</param> | ||
| /// <param name="callback">回调方法</param> | ||
| public static void SetDataPackageAdapter<TEntity>(this ITcpSocketClient client, IDataPackageHandler handler, Func<TEntity?, Task> callback) | ||
| { | ||
| client.SetDataPackageAdapter(new DataPackageAdapter(handler), callback); | ||
| } | ||
|
|
||
| private static void SetDataAdapterCallback<TEntity>(this IDataPackageAdapter adapter, IDataConverter<TEntity> converter, Func<TEntity?, Task> callback) | ||
| { | ||
| adapter.ReceivedCallBack = async buffer => | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Consider thread safety for the static _cache dictionary.
Since _cache is modified and read by multiple extension methods, this can cause race conditions in multi-threaded use. Use a thread-safe collection like ConcurrentDictionary or implement locking to prevent concurrent access issues.