Skip to content

Commit 26d868b

Browse files
ferhatelmasLinkinStars
authored andcommitted
refactor(queue): improve queues
* fix race condition for registering handler * add close method * use generics to reduce duplication * rename packages to drop underscore for go convention * rename interface to drop stutter with package name Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>
1 parent a1f0b09 commit 26d868b

31 files changed

Lines changed: 553 additions & 399 deletions

cmd/wire_gen.go

Lines changed: 27 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/base/queue/queue.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package queue
21+
22+
import (
23+
"context"
24+
"sync"
25+
26+
"github.com/segmentfault/pacman/log"
27+
)
28+
29+
// Queue is a generic message queue service that processes messages asynchronously.
30+
// It is thread-safe and supports graceful shutdown.
31+
type Queue[T any] struct {
32+
name string
33+
queue chan T
34+
handler func(ctx context.Context, msg T) error
35+
mu sync.RWMutex
36+
closed bool
37+
wg sync.WaitGroup
38+
}
39+
40+
// New creates a new queue with the given name and buffer size.
41+
func New[T any](name string, bufferSize int) *Queue[T] {
42+
q := &Queue[T]{
43+
name: name,
44+
queue: make(chan T, bufferSize),
45+
}
46+
q.startWorker()
47+
return q
48+
}
49+
50+
// Send enqueues a message to be processed asynchronously.
51+
// It will block if the queue is full.
52+
func (q *Queue[T]) Send(ctx context.Context, msg T) {
53+
q.mu.RLock()
54+
closed := q.closed
55+
q.mu.RUnlock()
56+
57+
if closed {
58+
log.Warnf("[%s] queue is closed, dropping message", q.name)
59+
return
60+
}
61+
62+
select {
63+
case q.queue <- msg:
64+
log.Debugf("[%s] enqueued message: %+v", q.name, msg)
65+
case <-ctx.Done():
66+
log.Warnf("[%s] context cancelled while sending message", q.name)
67+
}
68+
}
69+
70+
// RegisterHandler sets the handler function for processing messages.
71+
// This is thread-safe and can be called at any time.
72+
func (q *Queue[T]) RegisterHandler(handler func(ctx context.Context, msg T) error) {
73+
q.mu.Lock()
74+
defer q.mu.Unlock()
75+
q.handler = handler
76+
}
77+
78+
// Close gracefully shuts down the queue, waiting for pending messages to be processed.
79+
func (q *Queue[T]) Close() {
80+
q.mu.Lock()
81+
if q.closed {
82+
q.mu.Unlock()
83+
return
84+
}
85+
q.closed = true
86+
q.mu.Unlock()
87+
88+
close(q.queue)
89+
q.wg.Wait()
90+
log.Infof("[%s] queue closed", q.name)
91+
}
92+
93+
// startWorker starts the background goroutine that processes messages.
94+
func (q *Queue[T]) startWorker() {
95+
q.wg.Add(1)
96+
go func() {
97+
defer q.wg.Done()
98+
for msg := range q.queue {
99+
q.processMessage(msg)
100+
}
101+
}()
102+
}
103+
104+
// processMessage handles a single message with proper synchronization.
105+
func (q *Queue[T]) processMessage(msg T) {
106+
q.mu.RLock()
107+
handler := q.handler
108+
q.mu.RUnlock()
109+
110+
if handler == nil {
111+
log.Warnf("[%s] no handler registered, dropping message: %+v", q.name, msg)
112+
return
113+
}
114+
115+
// Use background context for async processing
116+
// TODO: Consider adding timeout or using a derived context
117+
if err := handler(context.TODO(), msg); err != nil {
118+
log.Errorf("[%s] handler error: %v", q.name, err)
119+
}
120+
}

0 commit comments

Comments
 (0)