@@ -20,9 +20,9 @@ async fn create_test_storage() -> Arc<dyn Storage> {
 async fn test_concurrent_event_recording() {
     // Test that concurrent event recording doesn't lose data
     let agg = Arc::new(AnalyticsAggregator::new());
-    
+
     let mut handles = vec![];
-    
+
     // Spawn 10 concurrent tasks that each record 100 events
     for task_id in 0..10 {
         let agg_clone = Arc::clone(&agg);
@@ -38,15 +38,15 @@ async fn test_concurrent_event_recording() {
         });
         handles.push(handle);
     }
-    
+
     // Wait for all tasks to complete
     for handle in handles {
         handle.await.unwrap();
     }
-    
+
     // Wait a bit for events to be processed
     sleep(Duration::from_millis(500)).await;
-    
+
     // Drain events and verify count
     let events = agg.drain_events();
     assert_eq!(events.len(), 1000, "Should have all 1000 events");
@@ -57,7 +57,7 @@ async fn test_rapid_flush_cycles() {
     // Test that rapid flush cycles don't corrupt data
     let storage = create_test_storage().await;
     let agg = Arc::new(AnalyticsAggregator::new());
-    
+
     // Record initial data
     for i in 0..100 {
         let rec = AnalyticsRecord {
@@ -76,7 +76,7 @@ async fn test_rapid_flush_cycles() {
         };
         agg.record(rec);
     }
-    
+
     // Perform rapid flushes
     for _ in 0..10 {
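         // drain() appears to return aggregated entries (keyed tuples),
         // unlike drain_events() in the previous test, which yields raw events.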
         let entries = agg.drain();
@@ -95,11 +95,11 @@ async fn test_rapid_flush_cycles() {
                 )
             })
             .collect();
-        
+
         if !records.is_empty() {
             storage.upsert_analytics_batch(records).await.unwrap();
         }
-        
+
         // Record more data between flushes
         for i in 0..10 {
             let rec = AnalyticsRecord {
@@ -119,7 +119,7 @@ async fn test_rapid_flush_cycles() {
             agg.record(rec);
         }
     }
-    
+
     // Final flush
     let entries = agg.drain();
     if !entries.is_empty() {
@@ -140,48 +140,51 @@ async fn test_rapid_flush_cycles() {
             .collect();
         storage.upsert_analytics_batch(records).await.unwrap();
     }
-    
+
     // Verify all data was saved
     let analytics = storage
         .get_analytics("rapid", None, None, 1000)
         .await
         .unwrap();
-    
+
     let total: i64 = analytics.iter().map(|a| a.visit_count).sum();
-    assert_eq!(total, 200, "Should have all 200 visits (100 initial + 100 incremental)");
+    assert_eq!(
+        total, 200,
+        "Should have all 200 visits (100 initial + 100 incremental)"
+    );
 }
 
 #[tokio::test]
 async fn test_prune_preserves_data_consistency() {
     // Test that pruning maintains data consistency and doesn't lose visit counts
     let storage = create_test_storage().await;
-    
+
     // Create a short code
     storage
         .create_with_code("prune_test", "https://example.com", Some("user1"))
         .await
         .unwrap();
-    
+
     // Insert old data with various dimensions
     let old_time = chrono::Utc::now().timestamp() - (60 * 86400); // 60 days ago
     let mut records = vec![];
-    
+
     // Create 100 entries with different dimensions
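     // Assumed tuple layout: (short_code, time_bucket, country, region,
     // city, asn, ip_version, visit_count); the trailing `4, 1` below is
     // likely ip_version (IPv4) and a per-entry count of 1.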
     for i in 0..100 {
         records.push((
             "prune_test".to_string(),
-            old_time + (i * 3600), // Different hours
-            Some(format!("C{}", i % 5)), // 5 different countries
-            Some(format!("R{}", i % 10)), // 10 different regions
+            old_time + (i * 3600),           // Different hours
+            Some(format!("C{}", i % 5)),     // 5 different countries
+            Some(format!("R{}", i % 10)),    // 10 different regions
             Some(format!("City{}", i % 20)), // 20 different cities
-            Some(15169 + (i % 3)), // 3 different ASNs
+            Some(15169 + (i % 3)),           // 3 different ASNs
             4,
             1,
         ));
     }
-    
+
     storage.upsert_analytics_batch(records).await.unwrap();
-    
+
     // Verify we have 100 entries
     let before_prune = storage
         .get_analytics("prune_test", None, None, 200)
@@ -190,24 +193,24 @@ async fn test_prune_preserves_data_consistency() {
     assert_eq!(before_prune.len(), 100);
     let total_before: i64 = before_prune.iter().map(|a| a.visit_count).sum();
     assert_eq!(total_before, 100);
-    
+
     // Prune, dropping the city and region dimensions
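     // Entries older than 30 days should be re-aggregated with those
     // dimensions collapsed: fewer, coarser rows, but (as asserted below)
     // the same visit totals.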
     let (deleted, inserted) = storage
         .prune_analytics(30, &["city".to_string(), "region".to_string()])
         .await
         .unwrap();
-    
+
     assert_eq!(deleted, 100, "Should have deleted all old entries");
     assert!(inserted > 0, "Should have created aggregated entries");
-    
+
     // Verify all visits are preserved
     let after_prune = storage
         .get_analytics("prune_test", None, None, 200)
         .await
         .unwrap();
     let total_after: i64 = after_prune.iter().map(|a| a.visit_count).sum();
     assert_eq!(total_after, 100, "Total visit count should be preserved");
-    
+
     // Verify dimensions were dropped
     let has_dropped_city = after_prune
         .iter()
@@ -219,14 +222,14 @@ async fn test_prune_preserves_data_consistency() {
 async fn test_aggregation_consistency_with_multiple_upserts() {
     // Test that multiple upserts maintain consistency
     let storage = create_test_storage().await;
-    
+
     storage
         .create_with_code("multi_upsert", "https://example.com", Some("user1"))
         .await
         .unwrap();
-    
+
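     // 1698768000 is 2023-10-31 16:00:00 UTC, an hour-aligned bucket.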
     let time_bucket = 1698768000;
-    
+
     // First batch
     let records1 = vec![(
         "multi_upsert".to_string(),
@@ -239,7 +242,7 @@ async fn test_aggregation_consistency_with_multiple_upserts() {
         10,
     )];
     storage.upsert_analytics_batch(records1).await.unwrap();
-    
+
     // Second batch (same key, should increment)
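     // "Same key" means the full dimension tuple (code, time bucket, and the
     // geo/ASN/ip_version fields) matches, so the upsert adds the counts.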
     let records2 = vec![(
         "multi_upsert".to_string(),
@@ -252,7 +255,7 @@ async fn test_aggregation_consistency_with_multiple_upserts() {
         5,
     )];
     storage.upsert_analytics_batch(records2).await.unwrap();
-    
+
     // Third batch (same key, should increment again)
     let records3 = vec![(
         "multi_upsert".to_string(),
@@ -265,13 +268,13 @@ async fn test_aggregation_consistency_with_multiple_upserts() {
         3,
     )];
     storage.upsert_analytics_batch(records3).await.unwrap();
-    
+
     // Verify total is correct
     let analytics = storage
         .get_analytics("multi_upsert", None, None, 100)
         .await
         .unwrap();
-    
+
     assert_eq!(analytics.len(), 1, "Should have exactly one entry");
     assert_eq!(analytics[0].visit_count, 18, "Should have 10+5+3=18 visits");
 }
@@ -280,12 +283,12 @@ async fn test_aggregation_consistency_with_multiple_upserts() {
 async fn test_time_range_filtering_edge_cases() {
     // Test edge cases in time range filtering
     let storage = create_test_storage().await;
-    
+
     storage
         .create_with_code("time_test", "https://example.com", Some("user1"))
         .await
         .unwrap();
-    
+
     // Insert records at specific timestamps
     let records = vec![
         (
@@ -320,23 +323,23 @@ async fn test_time_range_filtering_edge_cases() {
         ),
     ];
     storage.upsert_analytics_batch(records).await.unwrap();
-    
+
     // Test exact boundary: start_time = 2000
     let result = storage
         .get_analytics("time_test", Some(2000), None, 100)
         .await
         .unwrap();
     let total: i64 = result.iter().map(|a| a.visit_count).sum();
     assert_eq!(total, 5, "Should include entries at and after 2000");
-    
+
     // Test exact boundary: end_time = 2000
     let result = storage
         .get_analytics("time_test", None, Some(2000), 100)
         .await
         .unwrap();
     let total: i64 = result.iter().map(|a| a.visit_count).sum();
     assert_eq!(total, 3, "Should include entries at and before 2000");
-    
+
     // Test exact match
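     // start == end == 2000 should match only the bucket exactly at 2000,
     // i.e. both bounds are inclusive.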
     let result = storage
         .get_analytics("time_test", Some(2000), Some(2000), 100)
@@ -350,12 +353,12 @@ async fn test_time_range_filtering_edge_cases() {
 async fn test_aggregation_with_null_dimensions() {
     // Test that aggregation correctly handles NULL dimensions
     let storage = create_test_storage().await;
-    
+
     storage
         .create_with_code("null_test", "https://example.com", Some("user1"))
         .await
         .unwrap();
-    
+
     let time_bucket = 1698768000;
     let records = vec![
         // Entry with all dimensions
@@ -392,24 +395,27 @@ async fn test_aggregation_with_null_dimensions() {
             2,
         ),
     ];
-    
+
     storage.upsert_analytics_batch(records).await.unwrap();
-    
+
     // Aggregate by country (should combine all)
     let country_agg = storage
         .get_analytics_aggregate("null_test", None, None, "country", 10)
         .await
         .unwrap();
     assert_eq!(country_agg.len(), 1);
     assert_eq!(country_agg[0].visit_count, 10);
-    
+
     // Aggregate by region (should handle NULLs correctly)
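     // Judging by the assertion below, a NULL region is surfaced as
     // "Unknown" rather than dropped from the aggregate.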
     let region_agg = storage
         .get_analytics_aggregate("null_test", None, None, "region", 10)
         .await
         .unwrap();
-    assert!(region_agg.len() >= 2, "Should have at least 2 regions (CA and Unknown)");
-    
+    assert!(
+        region_agg.len() >= 2,
+        "Should have at least 2 regions (CA and Unknown)"
+    );
+
     // Aggregate by ASN (should handle NULLs)
     let asn_agg = storage
         .get_analytics_aggregate("null_test", None, None, "asn", 10)