Skip to content

Commit 6bfeb83

Browse files
committed
Initial commit.
0 parents  commit 6bfeb83

4 files changed

Lines changed: 353 additions & 0 deletions

File tree

README.md

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
timerqueue
2+
==========
3+
4+
The timerqueue package implements a priority queue for objects scheduled to
5+
perform actions at clock times.
6+
7+
See http://godoc.org/github.com/beevik/timerqueue for godoc-formatted API
8+
documentation.
9+
10+
###Example: Scheduling timers
11+
12+
The following code declares an object implementing the Timer interface,
13+
creates a timerqueue, and adds three events to the timerqueue.
14+
15+
```go
16+
type event int
17+
18+
func (e event) OnTimer(t time.Time) {
19+
fmt.Printf("event.OnTimer %d fired at %v\n", int(e), t)
20+
}
21+
22+
queue := timerqueue.New()
23+
queue.Schedule(event(1), time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC))
24+
queue.Schedule(event(2), time.Date(2015, 1, 3, 0, 0, 0, 0, time.UTC))
25+
queue.Schedule(event(3), time.Date(2015, 1, 2, 0, 0, 0, 0, time.UTC))
26+
27+
```
28+
29+
###Example: Peeking at the next timer to be scheduled
30+
31+
Using the queue initialized in the first example, the following code
32+
examines the head of the timerqueue and outputs the id and time of
33+
the event found there.
34+
35+
```go
36+
e, t := queue.PeekFirst()
37+
if e != nil {
38+
fmt.Printf("Event %d will be first to fire at %v.\n", int(e.(event)), t)
39+
fmt.Printf("%d events remain in the timerqueue.", queue.Len())
40+
}
41+
```
42+
43+
Output:
44+
```
45+
Event 1 will be first to fire at 2015-01-01 00:00:00 +0000 UTC.
46+
3 events remain in the timerqueue.
47+
```
48+
49+
###Example: Popping the next timer to be scheduled
50+
51+
Using the queue initialized in the first example, this code
52+
removes the next timer to be executed until the queue is empty.
53+
54+
```go
55+
for queue.Len() > 0 {
56+
e, t := queue.PopFirst()
57+
fmt.Printf("Event %d fires at %v.\n", int(e.(event)), t)
58+
}
59+
```
60+
61+
Output:
62+
```
63+
Event 1 fires at 2015-01-01 00:00:00 +0000 UTC.
64+
Event 3 fires at 2015-01-02 00:00:00 +0000 UTC.
65+
Event 2 fires at 2015-01-03 00:00:00 +0000 UTC.
66+
```
67+
68+
###Example: Issuing OnTimer callbacks with Advance
69+
70+
The final example shows how to dispatch OnTimer callbacks to
71+
timers using the timerqueue's Advance method.
72+
73+
Advance calls the OnTimer method for each timer scheduled
74+
before the requested time. Timers are removed from the timerqueue
75+
in order of their scheduling.
76+
77+
```go
78+
// Call the OnTimer method for each event scheduled before
79+
// January 10, 2015. Pop the called timer from the queue.
80+
queue.Advance(time.Date(2015, 1, 10, 0, 0, 0, 0, time.UTC))
81+
```
82+
83+
Output:
84+
```
85+
event.OnTimer 1 fired at 2015-01-01 00:00:00 +0000 UTC.
86+
event.OnTimer 3 fired at 2015-01-02 00:00:00 +0000 UTC.
87+
event.OnTimer 2 fired at 2015-01-03 00:00:00 +0000 UTC.
88+
```

example_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package timerqueue_test
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/beevik/timerqueue"
8+
)
9+
10+
type event int
11+
12+
func (e event) OnTimer(t time.Time) {
13+
fmt.Printf(" Event %d executed at %v\n", int(e), t)
14+
}
15+
16+
// Schedule several events with a timerqueue, and dispatch
17+
// them by calling Advance.
18+
func ExampleQueue() {
19+
queue := timerqueue.New()
20+
21+
// Schedule an event each day from Jan 1 to Jan 7, 2015.
22+
tm := time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)
23+
for i := 1; i <= 7; i++ {
24+
queue.Schedule(event(i), tm)
25+
tm = tm.Add(24 * time.Hour)
26+
}
27+
28+
fmt.Println("Advancing to Jan 4...")
29+
queue.Advance(time.Date(2015, 1, 4, 0, 0, 0, 0, time.UTC))
30+
31+
fmt.Println("Advancing to Jan 10...")
32+
queue.Advance(time.Date(2015, 1, 10, 0, 0, 0, 0, time.UTC))
33+
34+
// Output:
35+
// Advancing to Jan 4...
36+
// Event 1 executed at 2015-01-01 00:00:00 +0000 UTC
37+
// Event 2 executed at 2015-01-02 00:00:00 +0000 UTC
38+
// Event 3 executed at 2015-01-03 00:00:00 +0000 UTC
39+
// Event 4 executed at 2015-01-04 00:00:00 +0000 UTC
40+
// Advancing to Jan 10...
41+
// Event 5 executed at 2015-01-05 00:00:00 +0000 UTC
42+
// Event 6 executed at 2015-01-06 00:00:00 +0000 UTC
43+
// Event 7 executed at 2015-01-07 00:00:00 +0000 UTC
44+
}

queue.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
// Package timerqueue implements a priority queue for objects scheduled at a
2+
// particular time.
3+
package timerqueue
4+
5+
import (
6+
"container/heap"
7+
"errors"
8+
"time"
9+
)
10+
11+
// Timer is an interface that types implement to schedule and receive OnTimer
12+
// callbacks.
13+
type Timer interface {
14+
OnTimer(t time.Time)
15+
}
16+
17+
// Queue is a time-sorted collection of Timer objects.
18+
type Queue struct {
19+
heap timerHeap
20+
table map[Timer]*timerData
21+
}
22+
23+
type timerData struct {
24+
timer Timer
25+
time time.Time
26+
index int
27+
}
28+
29+
// New creates a new timer priority queue.
30+
func New() *Queue {
31+
return &Queue{
32+
table: make(map[Timer]*timerData),
33+
}
34+
}
35+
36+
// Len returns the current number of timer objects in the queue.
37+
func (q *Queue) Len() int {
38+
return len(q.heap)
39+
}
40+
41+
// Schedule schedules a timer for exectuion at time tm. If the
42+
// timer was already scheduled, it is rescheduled.
43+
func (q *Queue) Schedule(t Timer, tm time.Time) {
44+
if data, ok := q.table[t]; !ok {
45+
data = &timerData{t, tm, 0}
46+
heap.Push(&q.heap, data)
47+
q.table[t] = data
48+
} else {
49+
data.time = tm
50+
heap.Fix(&q.heap, data.index)
51+
}
52+
}
53+
54+
// Unschedule unschedules a timer's execution.
55+
func (q *Queue) Unschedule(t Timer) {
56+
if data, ok := q.table[t]; ok {
57+
heap.Remove(&q.heap, data.index)
58+
delete(q.table, t)
59+
}
60+
}
61+
62+
// GetTime returns the time at which the timer is scheduled.
63+
// If the timer isn't currently scheduled, an error is returned.
64+
func (q *Queue) GetTime(t Timer) (tm time.Time, err error) {
65+
if data, ok := q.table[t]; ok {
66+
return data.time, nil
67+
}
68+
return time.Time{}, errors.New("timerqueue: timer not scheduled")
69+
}
70+
71+
// IsScheduled returns true if the timer is currently scheduled.
72+
func (q *Queue) IsScheduled(t Timer) bool {
73+
_, ok := q.table[t]
74+
return ok
75+
}
76+
77+
// Clear unschedules all currently scheduled timers.
78+
func (q *Queue) Clear() {
79+
q.heap, q.table = nil, make(map[Timer]*timerData)
80+
}
81+
82+
// PopFirst removes and returns the next timer to be scheduled and
83+
// the time at which it is scheduled to run.
84+
func (q *Queue) PopFirst() (t Timer, tm time.Time) {
85+
if len(q.heap) > 0 {
86+
data := heap.Pop(&q.heap).(*timerData)
87+
delete(q.table, data.timer)
88+
return data.timer, data.time
89+
}
90+
return nil, time.Time{}
91+
}
92+
93+
// PeekFirst returns the next timer to be scheduled and the time
94+
// at which it is scheduled to run. It does not modify the contents
95+
// of the timer queue.
96+
func (q *Queue) PeekFirst() (t Timer, tm time.Time) {
97+
if len(q.heap) > 0 {
98+
return q.heap[0].timer, q.heap[0].time
99+
}
100+
return nil, time.Time{}
101+
}
102+
103+
// Advance executes OnTimer callbacks for all timers scheduled to be
104+
// run before the time 'tm'. Executed timers are removed from the
105+
// timer queue.
106+
func (q *Queue) Advance(tm time.Time) {
107+
for len(q.heap) > 0 && !tm.Before(q.heap[0].time) {
108+
data := q.heap[0]
109+
heap.Remove(&q.heap, data.index)
110+
delete(q.table, data.timer)
111+
data.timer.OnTimer(data.time)
112+
}
113+
}
114+
115+
/*
116+
* timerHeap
117+
*/
118+
119+
type timerHeap []*timerData
120+
121+
func (h timerHeap) Len() int {
122+
return len(h)
123+
}
124+
125+
func (h timerHeap) Less(i, j int) bool {
126+
return h[i].time.Before(h[j].time)
127+
}
128+
129+
func (h timerHeap) Swap(i, j int) {
130+
h[i], h[j] = h[j], h[i]
131+
h[i].index, h[j].index = i, j
132+
}
133+
134+
func (h *timerHeap) Push(x interface{}) {
135+
data := x.(*timerData)
136+
*h = append(*h, data)
137+
data.index = len(*h) - 1
138+
}
139+
140+
func (h *timerHeap) Pop() interface{} {
141+
n := len(*h)
142+
data := (*h)[n-1]
143+
*h = (*h)[:n-1]
144+
data.index = -1
145+
return data
146+
}

queue_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package timerqueue
2+
3+
import (
4+
"math/rand"
5+
"testing"
6+
"time"
7+
)
8+
9+
type object struct {
10+
value int
11+
}
12+
13+
var executed int
14+
15+
func (o *object) OnTimer(t time.Time) {
16+
executed++
17+
}
18+
19+
func populateQueue(t *testing.T, now time.Time) *Queue {
20+
q := New()
21+
22+
count := 200
23+
objects := make([]*object, count)
24+
25+
// Add a bunch of objects to the queue in random order.
26+
for i, j := range rand.Perm(count) {
27+
tm := now.Add(time.Duration(i+1) * time.Hour)
28+
objects[j] = &object{j}
29+
q.Schedule(objects[j], tm)
30+
}
31+
32+
if q.Len() != count {
33+
t.Error("invalid queue length:", q.Len())
34+
}
35+
36+
return q
37+
}
38+
39+
func TestQueue(t *testing.T) {
40+
for iter := 0; iter < 100; iter++ {
41+
now := time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)
42+
queue := populateQueue(t, now)
43+
44+
// Make sure objects are removed from the queue in order.
45+
for prev := now; queue.Len() > 0; {
46+
_, tm := queue.PopFirst()
47+
if tm.Sub(prev) != time.Hour {
48+
t.Errorf("Invalid queue ordering.\n"+
49+
" Got: %v\n"+
50+
"Expected: %v\n", tm, prev.Add(time.Hour))
51+
}
52+
prev = tm
53+
}
54+
}
55+
}
56+
57+
func TestAdvance(t *testing.T) {
58+
for iter := 0; iter < 100; iter++ {
59+
now := time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)
60+
queue := populateQueue(t, now)
61+
62+
executed = 0
63+
count := queue.Len()
64+
lastTime := now.Add(time.Duration(count) * time.Hour)
65+
66+
for adv := 0; adv < 5; adv++ {
67+
queue.Advance(lastTime)
68+
if executed != count {
69+
t.Errorf("Advance failed.\n"+
70+
"Should have executed %d times.\n"+
71+
"Only executed %d times.\n", count, executed)
72+
}
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)