Skip to content

Commit d904efc

Browse files
mauroservientiabparticulardanielmarbach
authored
Fixed outbox test for multiple subscribers handling the same event (#7680) (#7686)
* Fixed outbox test for multiple subscribers handling the same event * Fix all issues caused by Mauro's merge 😅 * Only for persisters supporting outbox * Add back tracing * Update When_subscribers_handles_the_same_event.cs * More tracing, syntax error * syntax * Use routing instead of SendOpetions * Refactor handlers and routing in `When_subscribers_handles_the_same_event` acceptance test for clarity and alignment with naming conventions. --------- Co-authored-by: Andreas Bednarz <110360248+abparticular@users.noreply.github.com> Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com>
1 parent 07db2ae commit d904efc

1 file changed

Lines changed: 60 additions & 45 deletions

File tree

Lines changed: 60 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
namespace NServiceBus.AcceptanceTests.Outbox;
22

3-
using System.Threading;
43
using System.Threading.Tasks;
54
using AcceptanceTesting;
65
using AcceptanceTesting.Customization;
@@ -9,17 +8,17 @@ namespace NServiceBus.AcceptanceTests.Outbox;
98
using Features;
109
using NUnit.Framework;
1110

12-
public class When_subscribers_handles_the_same_event : NServiceBusAcceptanceTest
11+
public class When_outbox_is_used_by_multiple_subscribers_for_the_same_event : NServiceBusAcceptanceTest
1312
{
14-
[Test, CancelAfter(10_000)]
15-
public async Task Should_be_processed_by_all_subscribers(CancellationToken cancellationToken = default)
13+
[Test]
14+
public async Task Each_subscriber_should_dispatch_its_own_transport_operations()
1615
{
1716
Requires.OutboxPersistence();
1817

1918
var context = await Scenario.Define<Context>()
20-
.WithEndpoint<Publisher>(b =>
21-
b.When(c => c.Subscriber1Subscribed && c.Subscriber2Subscribed, session => session.Publish(new MyEvent()))
22-
)
19+
.WithEndpoint<Publisher>(b => b.When(
20+
c => c is { Subscriber1Subscribed: true, Subscriber2Subscribed: true },
21+
session => session.Publish(new MyEvent())))
2322
.WithEndpoint<Subscriber1>(b => b.When(async (session, ctx) =>
2423
{
2524
await session.Subscribe<MyEvent>();
@@ -46,92 +45,108 @@ public async Task Should_be_processed_by_all_subscribers(CancellationToken cance
4645
ctx.AddTrace("Subscriber2 has now asked to be subscribed to MyEvent");
4746
}
4847
}))
49-
.Run(cancellationToken);
48+
.WithEndpoint<Collector>()
49+
.Run();
5050

5151
using (Assert.EnterMultipleScope())
5252
{
53-
Assert.That(context.Subscriber1GotTheEvent, Is.True);
54-
Assert.That(context.Subscriber2GotTheEvent, Is.True);
53+
Assert.That(context.FailedMessages.IsEmpty, Is.True);
54+
Assert.That(context.Subscriber1ProcessedConfirmed, Is.True);
55+
Assert.That(context.Subscriber2ProcessedConfirmed, Is.True);
5556
}
5657
}
5758

5859
public class Context : ScenarioContext
5960
{
6061
public bool Subscriber1Subscribed { get; set; }
6162
public bool Subscriber2Subscribed { get; set; }
63+
public bool Subscriber1ProcessedConfirmed { get; set; }
64+
public bool Subscriber2ProcessedConfirmed { get; set; }
6265

63-
public bool Subscriber1GotTheEvent { get; set; }
64-
public bool Subscriber2GotTheEvent { get; set; }
65-
66-
public void MaybeCompleted() => MarkAsCompleted(Subscriber1GotTheEvent, Subscriber2GotTheEvent);
66+
public void MaybeCompleted() => MarkAsCompleted(Subscriber1ProcessedConfirmed, Subscriber2ProcessedConfirmed);
6767
}
6868

6969
public class Publisher : EndpointConfigurationBuilder
7070
{
7171
public Publisher() => EndpointSetup<DefaultPublisher>(c =>
7272
{
73-
c.OnEndpointSubscribed<Context>((s, context) =>
73+
c.OnEndpointSubscribed<Context>((s, ctx) =>
7474
{
7575
var subscriber1 = Conventions.EndpointNamingConvention(typeof(Subscriber1));
7676
if (s.SubscriberEndpoint.Contains(subscriber1))
7777
{
78-
context.Subscriber1Subscribed = true;
79-
context.AddTrace($"{subscriber1} is now subscribed");
78+
ctx.Subscriber1Subscribed = true;
79+
ctx.AddTrace($"{subscriber1} is now subscribed");
8080
}
8181

8282
var subscriber2 = Conventions.EndpointNamingConvention(typeof(Subscriber2));
8383
if (s.SubscriberEndpoint.Contains(subscriber2))
8484
{
85-
context.Subscriber2Subscribed = true;
86-
context.AddTrace($"{subscriber2} is now subscribed");
85+
ctx.Subscriber2Subscribed = true;
86+
ctx.AddTrace($"{subscriber2} is now subscribed");
8787
}
8888
});
8989
}, metadata => metadata.RegisterSelfAsPublisherFor<MyEvent>(this));
9090
}
9191

9292
public class Subscriber1 : EndpointConfigurationBuilder
9393
{
94-
public Subscriber1() =>
95-
EndpointSetup<DefaultServer>(c =>
96-
{
97-
c.DisableFeature<AutoSubscribe>();
98-
99-
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
100-
c.EnableOutbox();
101-
}, metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));
94+
public Subscriber1() => EndpointSetup<DefaultServer>(c =>
95+
{
96+
c.DisableFeature<AutoSubscribe>();
97+
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
98+
c.EnableOutbox();
99+
c.ConfigureRouting().RouteToEndpoint(typeof(ProcessBySubscriber1), typeof(Collector));
100+
}, metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));
102101

103-
public class MyHandler(Context testContext) : IHandleMessages<MyEvent>
102+
public class MyEventMessageHandler : IHandleMessages<MyEvent>
104103
{
105-
public Task Handle(MyEvent message, IMessageHandlerContext context)
106-
{
107-
testContext.Subscriber1GotTheEvent = true;
108-
testContext.MaybeCompleted();
109-
return Task.CompletedTask;
110-
}
104+
public Task Handle(MyEvent message, IMessageHandlerContext context) => context.Send(new ProcessBySubscriber1());
111105
}
112106
}
113107

114108
public class Subscriber2 : EndpointConfigurationBuilder
115109
{
116-
public Subscriber2() =>
117-
EndpointSetup<DefaultServer>(c =>
118-
{
119-
c.DisableFeature<AutoSubscribe>();
110+
public Subscriber2() => EndpointSetup<DefaultServer>(c =>
111+
{
112+
c.DisableFeature<AutoSubscribe>();
113+
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
114+
c.EnableOutbox();
115+
c.ConfigureRouting().RouteToEndpoint(typeof(ProcessBySubscriber2), typeof(Collector));
116+
}, metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));
120117

121-
c.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
122-
c.EnableOutbox();
123-
}, metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));
118+
public class MyEventMessageHandler : IHandleMessages<MyEvent>
119+
{
120+
public Task Handle(MyEvent message, IMessageHandlerContext context) => context.Send(new ProcessBySubscriber2());
121+
}
122+
}
123+
124+
public class Collector : EndpointConfigurationBuilder
125+
{
126+
public Collector() => EndpointSetup<DefaultServer>();
127+
128+
public class Subscriber1ProcessedHandler(Context testContext) : IHandleMessages<ProcessBySubscriber1>
129+
{
130+
public Task Handle(ProcessBySubscriber1 message, IMessageHandlerContext context)
131+
{
132+
testContext.Subscriber1ProcessedConfirmed = true;
133+
testContext.MaybeCompleted();
134+
return Task.CompletedTask;
135+
}
136+
}
124137

125-
public class MyHandler(Context testContext) : IHandleMessages<MyEvent>
138+
public class Subscriber2ProcessedHandler(Context testContext) : IHandleMessages<ProcessBySubscriber2>
126139
{
127-
public Task Handle(MyEvent messageThatIsEnlisted, IMessageHandlerContext context)
140+
public Task Handle(ProcessBySubscriber2 message, IMessageHandlerContext context)
128141
{
129-
testContext.Subscriber2GotTheEvent = true;
142+
testContext.Subscriber2ProcessedConfirmed = true;
130143
testContext.MaybeCompleted();
131144
return Task.CompletedTask;
132145
}
133146
}
134147
}
135148

136149
public class MyEvent : IEvent;
137-
}
150+
public class ProcessBySubscriber1 : IMessage;
151+
public class ProcessBySubscriber2 : IMessage;
152+
}

0 commit comments

Comments
 (0)