1010
1111namespace BootstrapBlazor . Components ;
1212
13- sealed class DefaultTcpSocketClient ( IPEndPoint endPoint ) : ITcpSocketClient
13+ sealed class DefaultTcpSocketClient ( IPEndPoint localEndPoint ) : TcpSocketClientBase
1414{
1515 private TcpClient ? _client ;
16- private IDataPackageHandler ? _dataPackageHandler ;
1716 private CancellationTokenSource ? _receiveCancellationTokenSource ;
1817 private IPEndPoint ? _remoteEndPoint ;
1918
20- public bool IsConnected => _client ? . Online ?? false ;
19+ public override bool IsConnected => _client ? . Online ?? false ;
2120
22- public IPEndPoint LocalEndPoint { get ; set ; } = endPoint ;
21+ private IReceiver < IReceiverResult > ? _receiver ;
2322
2423 [ NotNull ]
2524 public ILogger < DefaultTcpSocketClient > ? Logger { get ; set ; }
2625
27- public int ReceiveBufferSize { get ; set ; } = 1024 * 10 ;
28-
29- public Func < ReadOnlyMemory < byte > , ValueTask > ? ReceivedCallBack { get ; set ; }
30-
31- public void SetDataHandler ( IDataPackageHandler handler )
32- {
33- _dataPackageHandler = handler ;
34- }
35-
36- public async ValueTask < bool > ConnectAsync ( IPEndPoint endPoint , CancellationToken token = default )
26+ public override async ValueTask < bool > ConnectAsync ( IPEndPoint endPoint , CancellationToken token = default )
3727 {
3828 var ret = false ;
3929 try
4030 {
4131 // 释放资源
42- Close ( ) ;
32+ await CloseAsync ( ) ;
4333
44- // 创建新的 TouchSocket TcpClient 实例
34+ // 创建新的 TcpClient 实例
4535 _client ??= new TcpClient ( ) ;
46- await _client . SetupAsync ( new TouchSocketConfig ( ) . SetBindIPHost ( new IPHost ( LocalEndPoint . Address , LocalEndPoint . Port ) ) ) ;
36+ _remoteEndPoint = endPoint ;
4737
48- await _client . ConnectAsync ( new IPHost ( endPoint . Address , endPoint . Port ) , ) ;
38+ // 设置本地端点
39+ var config = new TouchSocketConfig ( )
40+ . SetBindIPHost ( new IPHost ( localEndPoint . Address , localEndPoint . Port ) )
41+ . SetRemoteIPHost ( new IPHost ( endPoint . Address , endPoint . Port ) )
42+ . SetMaxBufferSize ( ReceiveBufferSize )
43+ . SetMinBufferSize ( ReceiveBufferSize / 10 ) ;
44+ await _client . SetupAsync ( config ) ;
4945
50- // 开始接收数据
51- //_ = Task.Run(ReceiveAsync , token);
46+ var connectTimeout = ConnectTimeout == 0 ? int . MaxValue : ConnectTimeout ;
47+ await _client . ConnectAsync ( connectTimeout , token ) ;
5248
53- //LocalEndPoint = (IPEndPoint)_client.Client.LocalEndPoint!;
54- _remoteEndPoint = endPoint ;
49+ if ( IsConnected )
50+ {
51+ // 设置本地以及远端端点信息
52+ if ( _client . MainSocket . LocalEndPoint is IPEndPoint local )
53+ {
54+ LocalEndPoint = local ;
55+ }
56+
57+ _receiver = _client . CreateReceiver ( ) ;
58+ _receiver . CacheMode = true ;
59+ _receiver . MaxCacheSize = ReceiveBufferSize ;
60+
61+ if ( _client . MainSocket . RemoteEndPoint is IPEndPoint remote )
62+ {
63+ _remoteEndPoint = remote ;
64+ }
65+ if ( IsAutoReceive )
66+ {
67+ _ = Task . Run ( AutoReceiveAsync ) ;
68+ }
69+ }
5570 ret = true ;
5671 }
5772 catch ( OperationCanceledException ex )
5873 {
59- Logger . LogWarning ( ex , "TCP Socket connect operation was canceled from {LocalEndPoint} to {RemoteEndPoint}" , LocalEndPoint , endPoint ) ;
74+ if ( token . IsCancellationRequested )
75+ {
76+ Logger . LogWarning ( ex , "TCP Socket connect operation was canceled from {LocalEndPoint} to {RemoteEndPoint}" , LocalEndPoint , endPoint ) ;
77+ }
78+ else
79+ {
80+ Logger . LogWarning ( ex , "TCP Socket connect operation timed out from {LocalEndPoint} to {RemoteEndPoint}" , LocalEndPoint , endPoint ) ;
81+ }
6082 }
6183 catch ( Exception ex )
6284 {
@@ -65,7 +87,7 @@ public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken
6587 return ret ;
6688 }
6789
68- public async ValueTask < bool > SendAsync ( ReadOnlyMemory < byte > data , CancellationToken token = default )
90+ public override async ValueTask < bool > SendAsync ( ReadOnlyMemory < byte > data , CancellationToken token = default )
6991 {
7092 if ( _client is not { Online : true } )
7193 {
@@ -75,17 +97,32 @@ public async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationTo
7597 var ret = false ;
7698 try
7799 {
78- if ( _dataPackageHandler != null )
100+ var sendToken = token ;
101+ if ( SendTimeout > 0 )
102+ {
103+ // 设置发送超时时间
104+ var sendTokenSource = new CancellationTokenSource ( SendTimeout ) ;
105+ sendToken = CancellationTokenSource . CreateLinkedTokenSource ( token , sendTokenSource . Token ) . Token ;
106+ }
107+
108+ if ( DataPackageHandler != null )
79109 {
80- data = await _dataPackageHandler . SendAsync ( data ) ;
110+ data = await DataPackageHandler . SendAsync ( data , sendToken ) ;
81111 }
82- //var stream = _client.GetStream();
83- // await stream.WriteAsync (data, token );
112+
113+ await _client . SendAsync ( data ) . WaitAsync ( sendToken ) ;
84114 ret = true ;
85115 }
86116 catch ( OperationCanceledException ex )
87117 {
88- Logger . LogWarning ( ex , "TCP Socket send operation was canceled from {LocalEndPoint} to {RemoteEndPoint}" , LocalEndPoint , _remoteEndPoint ) ;
118+ if ( token . IsCancellationRequested )
119+ {
120+ Logger . LogWarning ( ex , "TCP Socket send operation was canceled from {LocalEndPoint} to {RemoteEndPoint}" , LocalEndPoint , _remoteEndPoint ) ;
121+ }
122+ else
123+ {
124+ Logger . LogWarning ( ex , "TCP Socket send operation timed out from {LocalEndPoint} to {RemoteEndPoint}" , LocalEndPoint , _remoteEndPoint ) ;
125+ }
89126 }
90127 catch ( Exception ex )
91128 {
@@ -94,7 +131,25 @@ public async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationTo
94131 return ret ;
95132 }
96133
97- private async ValueTask ReceiveAsync ( )
134+ public override async ValueTask < Memory < byte > > ReceiveAsync ( CancellationToken token = default )
135+ {
136+ if ( _client is not { Online : true } )
137+ {
138+ throw new InvalidOperationException ( $ "TCP Socket is not connected { LocalEndPoint } ") ;
139+ }
140+
141+ if ( IsAutoReceive )
142+ {
143+ throw new InvalidOperationException ( "Cannot call ReceiveAsync when IsAutoReceive is true. Use the auto-receive mechanism instead." ) ;
144+ }
145+
146+ using var block = MemoryPool < byte > . Shared . Rent ( ReceiveBufferSize ) ;
147+ var buffer = block . Memory ;
148+ var len = await ReceiveCoreAsync ( buffer , token ) ;
149+ return buffer [ 0 ..len ] ;
150+ }
151+
152+ private async ValueTask AutoReceiveAsync ( )
98153 {
99154 _receiveCancellationTokenSource ??= new ( ) ;
100155 while ( _receiveCancellationTokenSource is { IsCancellationRequested : false } )
@@ -104,78 +159,95 @@ private async ValueTask ReceiveAsync()
104159 throw new InvalidOperationException ( $ "TCP Socket is not connected { LocalEndPoint } ") ;
105160 }
106161
107- try
162+ using var block = MemoryPool < byte > . Shared . Rent ( ReceiveBufferSize ) ;
163+ var buffer = block . Memory ;
164+ var len = await ReceiveCoreAsync ( buffer , _receiveCancellationTokenSource . Token ) ;
165+ if ( len == 0 )
108166 {
109- using var block = MemoryPool < byte > . Shared . Rent ( ReceiveBufferSize ) ;
110- var buffer = block . Memory ;
111- //var stream = _client.GetStream();
112- //var len = await stream.ReadAsync(buffer, _receiveCancellationTokenSource.Token);
113- //if (len == 0)
114- //{
115- // // 远端主机关闭链路
116- // Logger.LogInformation("TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
117- // break;
118- //}
119- //else
120- //{
121- // buffer = buffer[..len];
167+ break ;
168+ }
169+ }
170+ }
122171
123- // if (ReceivedCallBack != null)
124- // {
125- // await ReceivedCallBack(buffer);
126- // }
172+ private async ValueTask < int > ReceiveCoreAsync ( Memory < byte > buffer , CancellationToken token )
173+ {
174+ var len = 0 ;
175+ try
176+ {
177+ var receiveToken = token ;
178+ if ( ReceiveTimeout > 0 )
179+ {
180+ // 设置接收超时时间
181+ var receiveTokenSource = new CancellationTokenSource ( ReceiveTimeout ) ;
182+ receiveToken = CancellationTokenSource . CreateLinkedTokenSource ( receiveToken , receiveTokenSource . Token ) . Token ;
183+ }
127184
128- // if (_dataPackageHandler != null)
129- // {
130- // await _dataPackageHandler.ReceiveAsync(buffer);
131- // }
132- //}
185+ using var result = await _receiver ! . ReadAsync ( receiveToken ) ;
186+ if ( result . IsCompleted )
187+ {
188+ Logger . LogInformation ( "TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}" , LocalEndPoint , _remoteEndPoint ) ;
189+ return 0 ;
133190 }
134- catch ( OperationCanceledException ex )
191+
192+ result . ByteBlock . Memory . CopyTo ( buffer ) ;
193+ len = result . ByteBlock . Length ;
194+ buffer = buffer [ ..len ] ;
195+
196+ if ( ReceivedCallBack != null )
135197 {
136- Logger . LogWarning ( ex , "TCP Socket receive operation was canceled from {LocalEndPoint} to {RemoteEndPoint}" , LocalEndPoint , _remoteEndPoint ) ;
198+ await ReceivedCallBack ( buffer ) ;
137199 }
138- catch ( Exception ex )
200+
201+ if ( DataPackageHandler != null )
139202 {
140- Logger . LogError ( ex , "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}" , LocalEndPoint , _remoteEndPoint ) ;
203+ await DataPackageHandler . ReceiveAsync ( buffer , receiveToken ) ;
204+ result . ByteBlock . Seek ( len ) ;
141205 }
142206 }
207+ catch ( OperationCanceledException ex )
208+ {
209+ if ( token . IsCancellationRequested )
210+ {
211+ Logger . LogWarning ( ex , "TCP Socket receive operation canceled from {LocalEndPoint} to {RemoteEndPoint}" , LocalEndPoint , _remoteEndPoint ) ;
212+ }
213+ else
214+ {
215+ Logger . LogWarning ( ex , "TCP Socket receive operation timed out from {LocalEndPoint} to {RemoteEndPoint}" , LocalEndPoint , _remoteEndPoint ) ;
216+ }
217+ }
218+ catch ( Exception ex )
219+ {
220+ Logger . LogError ( ex , "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}" , LocalEndPoint , _remoteEndPoint ) ;
221+ }
222+ return len ;
143223 }
144224
145- public void Close ( )
146- {
147- Dispose ( true ) ;
148- }
149-
150- private void Dispose ( bool disposing )
225+ protected override async ValueTask DisposeAsync ( bool disposing )
151226 {
152227 if ( disposing )
153228 {
154229 _remoteEndPoint = null ;
155230
156231 // 取消接收数据的任务
157- if ( _receiveCancellationTokenSource is not null )
232+ if ( _receiveCancellationTokenSource != null )
158233 {
159234 _receiveCancellationTokenSource . Cancel ( ) ;
160235 _receiveCancellationTokenSource . Dispose ( ) ;
161236 _receiveCancellationTokenSource = null ;
162237 }
163238
239+ if ( _receiver != null )
240+ {
241+ _receiver . Dispose ( ) ;
242+ _receiver = null ;
243+ }
244+
164245 // 释放 TcpClient 资源
165246 if ( _client != null )
166247 {
167- _client . CloseAsync ( ) ;
248+ await _client . CloseAsync ( ) ;
168249 _client = null ;
169250 }
170251 }
171252 }
172-
173- /// <summary>
174- /// <inheritdoc/>
175- /// </summary>
176- public void Dispose ( )
177- {
178- Dispose ( true ) ;
179- GC . SuppressFinalize ( this ) ;
180- }
181253}
0 commit comments