11using FaasNet . EventMesh . Client . Extensions ;
22using FaasNet . EventMesh . Client . Messages ;
33using FaasNet . EventMesh . Runtime . Exceptions ;
4- using FaasNet . EventMesh . Runtime . Models ;
54using FaasNet . EventMesh . Runtime . Stores ;
65using FaasNet . RaftConsensus . Client ;
76using FaasNet . RaftConsensus . Core ;
@@ -36,10 +35,20 @@ public async Task<EventMeshPackageResult> Run(Package package, IEnumerable<IPeer
3635 {
3736 var publishMessageRequest = package as PublishMessageRequest ;
3837 await CheckSession ( publishMessageRequest , cancellationToken ) ;
39- var queueNames = await GetQueueNames ( publishMessageRequest , cancellationToken ) ;
40- await BroadcastMessage ( publishMessageRequest , queueNames , cancellationToken ) ;
4138 var result = PackageResponseBuilder . PublishMessage ( package . Header . Seq ) ;
42- return EventMeshPackageResult . SendResult ( result ) ;
39+ if ( CheckPeerExists ( peers , publishMessageRequest . Topic ) )
40+ {
41+ await SendMessage ( publishMessageRequest , cancellationToken ) ;
42+ return EventMeshPackageResult . SendResult ( result ) ;
43+ }
44+
45+ var base64Message = publishMessageRequest . CloudEvent . SerializeBase64 ( ) ;
46+ return EventMeshPackageResult . AddPeer ( publishMessageRequest . Topic , result , new LogRecord
47+ {
48+ Index = 0 ,
49+ Value = base64Message ,
50+ InsertionDateTime = DateTime . UtcNow
51+ } ) ;
4352 }
4453
4554 private async Task CheckSession ( PublishMessageRequest message , CancellationToken cancellationToken )
@@ -57,36 +66,21 @@ private async Task CheckSession(PublishMessageRequest message, CancellationToken
5766 }
5867 }
5968
60- private async Task < IEnumerable < string > > GetQueueNames ( PublishMessageRequest message , CancellationToken cancellationToken )
69+ private bool CheckPeerExists ( IEnumerable < IPeerHost > peers , string topicName )
6170 {
62- IEnumerable < MessageExchange > messageExchanges ;
63- using ( var activity = EventMeshMeter . RequestActivitySource . StartActivity ( "Get all message exchange" ) )
64- {
65- messageExchanges = await _messageExchangeStore . GetAll ( cancellationToken ) ;
66- activity ? . SetStatus ( ActivityStatusCode . Ok ) ;
67- }
68-
69- var queueNames = new List < string > ( ) ;
70- foreach ( var messageExchange in messageExchanges )
71- {
72- if ( messageExchange . IsMatch ( message . Topic ) ) queueNames . AddRange ( messageExchange . QueueNames ) ;
73- }
74-
75- return queueNames . Distinct ( ) ;
71+ var peer = peers . FirstOrDefault ( p => p . Info . TermId == topicName ) ;
72+ return peer != null ;
7673 }
7774
78- private async Task BroadcastMessage ( PublishMessageRequest message , IEnumerable < string > queueNames , CancellationToken cancellationToken )
75+ private async Task SendMessage ( PublishMessageRequest message , CancellationToken cancellationToken )
7976 {
8077 var rndClusterNode = await GetRandomClusterNode ( cancellationToken ) ;
8178 var base64Message = message . CloudEvent . SerializeBase64 ( ) ;
8279 using ( var activity = EventMeshMeter . RequestActivitySource . StartActivity ( "Broadcast message" ) )
8380 {
8481 using ( var consensusClient = new ConsensusClient ( rndClusterNode . Url , rndClusterNode . Port ) )
8582 {
86- foreach ( var queueName in queueNames )
87- {
88- await consensusClient . AppendEntry ( queueName , base64Message , cancellationToken ) ;
89- }
83+ await consensusClient . AppendEntry ( message . Topic , base64Message , cancellationToken ) ;
9084 }
9185 }
9286 }
0 commit comments