@@ -6,6 +6,7 @@ namespace Emulsion.Tests
66
77open System
88open System.Threading
9+ open System.Threading .Channels
910open System.Threading .Tasks
1011open Emulsion
1112open Emulsion.Messaging
@@ -30,9 +31,11 @@ type MessagingCoreTests(output: ITestOutputHelper) =
3031 let waitProcessingError lt ( core : MessagingCore ) =
3132 Signals.WaitWithTimeout lt core.MessageProcessingError waitTimeout " message processed with error"
3233
33- let newMessageSystem ( receivedMessages : ResizeArray < _ >) = {
34+ let newMessageSystem ( receivedMessages : Channel < _ >) = {
3435 new IMessageSystem with
35- override this.PutMessage m = lock receivedMessages ( fun () -> receivedMessages.Add m)
36+ override this.PutMessage m =
37+ let result = receivedMessages.Writer.TryWrite m
38+ Assert.True( result, " Channel should accept an incoming message" )
3639 override this.RunSynchronously _ = ()
3740 }
3841
@@ -45,29 +48,28 @@ type MessagingCoreTests(output: ITestOutputHelper) =
4548 member _. ``MessagingCore calls archive if it's present`` (): Task = task {
4649 use ld = new LifetimeDefinition()
4750 let lt = ld.Lifetime
48- let messages = ResizeArray ()
51+ let messages = Channel.CreateUnbounded ()
4952 let archive = {
5053 new IMessageArchive with
5154 override this.Archive ( message ) =
52- lock messages ( fun () -> messages.Add message )
53- async.Return ()
55+ messages.Writer.WriteAsync ( message ). AsTask ( )
56+ |> Async.AwaitTask
5457 }
5558
5659 let core = MessagingCore( lt, logger, Some archive)
57- let awaitMessage = waitSuccessfulProcessing lt core
5860 core.Start( dummyMessageSystem, dummyMessageSystem)
5961
60- let message = IncomingMessage.TelegramMessage( testMessage)
61- core.ReceiveMessage message
62- do ! awaitMessage
62+ let expected = IncomingMessage.TelegramMessage( testMessage)
63+ core.ReceiveMessage expected
64+ let! actual = messages.Reader.ReadAsync ()
6365
64- Assert.Equal([| message |], messages )
66+ Assert.Equal( expected , actual )
6567 }
6668
6769 [<Fact>]
6870 member _. ``MessagingCore sends XMPP message to Telegram and vise - versa`` (): Task = task {
69- let telegramReceived = ResizeArray ()
70- let xmppReceived = ResizeArray ()
71+ let telegramReceived = Channel.CreateUnbounded ()
72+ let xmppReceived = Channel.CreateUnbounded ()
7173
7274 let xmpp = newMessageSystem xmppReceived
7375 let telegram = newMessageSystem telegramReceived
@@ -77,19 +79,16 @@ type MessagingCoreTests(output: ITestOutputHelper) =
7779 let core = MessagingCore( lt, logger, None)
7880 core.Start( telegram, xmpp)
7981
80- let sendMessageAndAssertReceival incomingMessage text ( received : _ seq ) = task {
81- let awaitMessage = waitSuccessfulProcessing lt core
82+ let sendMessageAndAssertReceival incomingMessage text ( received : Channel < _ >) = task {
8283 let message = Authored {
8384 author = " cthulhu"
8485 text = text
8586 }
8687
8788 let incoming = incomingMessage message
8889 core.ReceiveMessage incoming
89- do ! awaitMessage
90- lock received ( fun () ->
91- Assert.Equal([| OutgoingMessage message|], received)
92- )
90+ let! outgoing = received.Reader.ReadAsync()
91+ Assert.Equal( OutgoingMessage message, outgoing)
9392 }
9493
9594 do ! sendMessageAndAssertReceival XmppMessage " text1" telegramReceived
@@ -98,21 +97,19 @@ type MessagingCoreTests(output: ITestOutputHelper) =
9897
9998 [<Fact>]
10099 member _. ``MessagingCore buffers the message received before start`` (): Task = task {
101- let telegramReceived = ResizeArray ()
100+ let telegramReceived = Channel.CreateUnbounded ()
102101 let telegram = newMessageSystem telegramReceived
103102
104103 use ld = new LifetimeDefinition()
105104 let lt = ld.Lifetime
106105 let core = MessagingCore( lt, logger, None)
107106
108107 core.ReceiveMessage( XmppMessage testMessage)
109- Assert.Empty( lock telegramReceived ( fun () -> telegramReceived))
108+ let hasMessages , _ = telegramReceived.Reader.TryPeek()
109+ Assert.False( hasMessages, " No message is expected to be available." )
110110
111- let waitForProcessing = waitSuccessfulProcessing lt core
112111 core.Start( telegram, dummyMessageSystem)
113- do ! waitForProcessing
114-
115- let receivedMessage = Assert.Single( lock telegramReceived ( fun () -> telegramReceived))
112+ let! receivedMessage = telegramReceived.Reader.ReadAsync()
116113 Assert.Equal( OutgoingMessage testMessage, receivedMessage)
117114 }
118115
0 commit comments