
Commit 2b9dd83

Fix unique scheduling issues (#619)
PR #590 introduced a more flexible unique job mechanism that allows the unique states to be customized, all while still benefiting from the performance improvement of using a unique index rather than an advisory lock. The `retryable` state is included in the default list of states, but can be removed if the user doesn't want an erroring job to block duplicate inserts. However, this created an issue: scheduling a `retryable` job (moving it to `available`) could conflict with a duplicate unique job. To handle this, special logic was added to deal with this scenario for unique jobs, moving the conflicting row to `discarded` rather than `available`.

Unfortunately this logic had issues and was insufficiently tested. Two specific scenarios caused problems:

- A unique job being scheduled, because it was either inserted as `scheduled` or had errored and become `retryable`, could be considered a conflict with itself because the query didn't properly exclude the row being scheduled.
- Attempting to schedule two duplicate `retryable` unique jobs at the same time would lead to a unique conflict, because there was no mechanism to prevent this.

The query changes in this PR address both of the above cases, along with added test coverage. The increased complexity is unfortunate, and we're probably nearing the limit of what should be dealt with in a single SQL query. If this still isn't complete, I'm more inclined to fix any remaining issues by catching these conflicts at the application level, explicitly moving the conflicting row(s) to `discarded`, and trying again. This can be looped with a backoff or recursed to ensure that progress keeps being made as individual conflicts get resolved. But hopefully that won't be necessary.

Fixes #618.
1 parent bd43738 commit 2b9dd83

5 files changed

Lines changed: 342 additions & 103 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - The `BatchCompleter` that marks jobs as completed can now batch database updates for _all_ states of jobs that have finished execution. Prior to this change, only `completed` jobs were batched into a single `UPDATE` call, while jobs moving to any other state used a single `UPDATE` per job. This change should significantly reduce database and pool contention on high volume system when jobs get retried, snoozed, cancelled, or discarded following execution. [PR #617](https://github.com/riverqueue/river/pull/617).
 
+### Fixed
+
+- Unique job changes from v0.12.0 / [PR #590](https://github.com/riverqueue/river/pull/590) introduced a bug with scheduled or retryable unique jobs where they could be considered in conflict with themselves and moved to `discarded` by mistake. There was also a possibility of a broken job scheduler if duplicate `retryable` unique jobs were attempted to be scheduled at the same time. The job scheduling query was corrected to address these issues along with missing test coverage. [PR #619](https://github.com/riverqueue/river/pull/619).
+
 ## [0.12.0] - 2024-09-23
 
 ⚠️ Version 0.12.0 contains a new database migration, version 6. See [documentation on running River migrations](https://riverqueue.com/docs/migrations). If migrating with the CLI, make sure to update it to its latest version:

internal/riverinternaltest/riverdrivertest/riverdrivertest.go

Lines changed: 176 additions & 37 deletions
@@ -10,9 +10,11 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"github.com/tidwall/gjson"
 	"golang.org/x/text/cases"
 	"golang.org/x/text/language"
 
+	"github.com/riverqueue/river/internal/dbunique"
 	"github.com/riverqueue/river/internal/notifier"
 	"github.com/riverqueue/river/internal/rivercommon"
 	"github.com/riverqueue/river/riverdriver"
@@ -1503,54 +1505,191 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
 	t.Run("JobSchedule", func(t *testing.T) {
 		t.Parallel()
 
-		exec, _ := setup(ctx, t)
+		t.Run("BasicScheduling", func(t *testing.T) {
+			exec, _ := setup(ctx, t)
 
-		var (
-			horizon       = time.Now()
-			beforeHorizon = horizon.Add(-1 * time.Minute)
-			afterHorizon  = horizon.Add(1 * time.Minute)
-		)
+			var (
+				horizon       = time.Now()
+				beforeHorizon = horizon.Add(-1 * time.Minute)
+				afterHorizon  = horizon.Add(1 * time.Minute)
+			)
+
+			job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)})
+			job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
+			job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
 
-		job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)})
-		job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
-		job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
+			// States that aren't scheduled.
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateAvailable)})
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCompleted)})
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateDiscarded)})
 
-		// States that aren't scheduled.
-		_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateAvailable)})
-		_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCompleted)})
-		_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateDiscarded)})
+			// Right state, but after horizon.
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)})
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
 
-		// Right state, but after horizon.
-		_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)})
-		_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
+			// First two scheduled because of limit.
+			result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
+				Max: 2,
+				Now: horizon,
+			})
+			require.NoError(t, err)
+			require.Len(t, result, 2)
+
+			// And then job3 scheduled.
+			result, err = exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
+				Max: 2,
+				Now: horizon,
+			})
+			require.NoError(t, err)
+			require.Len(t, result, 1)
 
-		// First two scheduled because of limit.
-		result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
-			Max: 2,
-			Now: horizon,
+			updatedJob1, err := exec.JobGetByID(ctx, job1.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State)
+
+			updatedJob2, err := exec.JobGetByID(ctx, job2.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State)
+
+			updatedJob3, err := exec.JobGetByID(ctx, job3.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State)
 		})
-		require.NoError(t, err)
-		require.Len(t, result, 2)
 
-		// And then job3 scheduled.
-		result, err = exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
-			Max: 2,
-			Now: horizon,
+		t.Run("HandlesUniqueConflicts", func(t *testing.T) {
+			t.Parallel()
+
+			exec, _ := setup(ctx, t)
+
+			var (
+				horizon       = time.Now()
+				beforeHorizon = horizon.Add(-1 * time.Minute)
+			)
+
+			defaultUniqueStates := []rivertype.JobState{
+				rivertype.JobStateAvailable,
+				rivertype.JobStatePending,
+				rivertype.JobStateRetryable,
+				rivertype.JobStateRunning,
+				rivertype.JobStateScheduled,
+			}
+			// The default unique state list, minus retryable to allow for these conflicts:
+			nonRetryableUniqueStates := []rivertype.JobState{
+				rivertype.JobStateAvailable,
+				rivertype.JobStatePending,
+				rivertype.JobStateRunning,
+				rivertype.JobStateScheduled,
+			}
+
+			job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRetryable),
+				UniqueKey:    []byte("unique-key-1"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+			job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRetryable),
+				UniqueKey:    []byte("unique-key-2"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+			// job3 has no conflict (it's the only one with this key), so it should be
+			// scheduled.
+			job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRetryable),
+				UniqueKey:    []byte("unique-key-3"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(defaultUniqueStates),
+			})
+
+			// This one is a conflict with job1 because it's already running and has
+			// the same unique properties:
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRunning),
+				UniqueKey:    []byte("unique-key-1"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+			// This one is *not* a conflict with job2 because it's completed, which
+			// isn't in the unique states:
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateCompleted),
+				UniqueKey:    []byte("unique-key-2"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+
+			result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
+				Max: 100,
+				Now: horizon,
+			})
+			require.NoError(t, err)
+			require.Len(t, result, 3)
+
+			updatedJob1, err := exec.JobGetByID(ctx, job1.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateDiscarded, updatedJob1.State)
+			require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob1.Metadata, "unique_key_conflict").String())
+
+			updatedJob2, err := exec.JobGetByID(ctx, job2.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State)
+			require.False(t, gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").Exists())
+
+			updatedJob3, err := exec.JobGetByID(ctx, job3.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State)
+			require.False(t, gjson.GetBytes(updatedJob3.Metadata, "unique_key_conflict").Exists())
 		})
-		require.NoError(t, err)
-		require.Len(t, result, 1)
 
-		updatedJob1, err := exec.JobGetByID(ctx, job1.ID)
-		require.NoError(t, err)
-		require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State)
+		t.Run("SchedulingTwoRetryableJobsThatWillConflictWithEachOther", func(t *testing.T) {
+			t.Parallel()
 
-		updatedJob2, err := exec.JobGetByID(ctx, job2.ID)
-		require.NoError(t, err)
-		require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State)
+			exec, _ := setup(ctx, t)
 
-		updatedJob3, err := exec.JobGetByID(ctx, job3.ID)
-		require.NoError(t, err)
-		require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State)
+			var (
+				horizon       = time.Now()
+				beforeHorizon = horizon.Add(-1 * time.Minute)
+			)
+
+			// The default unique state list, minus retryable to allow for these conflicts:
+			nonRetryableUniqueStates := []rivertype.JobState{
+				rivertype.JobStateAvailable,
+				rivertype.JobStatePending,
+				rivertype.JobStateRunning,
+				rivertype.JobStateScheduled,
+			}
+
+			job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRetryable),
+				UniqueKey:    []byte("unique-key-1"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+			job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRetryable),
+				UniqueKey:    []byte("unique-key-1"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+
+			result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
+				Max: 100,
+				Now: horizon,
+			})
+			require.NoError(t, err)
+			require.Len(t, result, 2)
+
+			updatedJob1, err := exec.JobGetByID(ctx, job1.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State)
+			require.False(t, gjson.GetBytes(updatedJob1.Metadata, "unique_key_conflict").Exists())
+
+			updatedJob2, err := exec.JobGetByID(ctx, job2.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateDiscarded, updatedJob2.State)
+			require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").String())
+		})
 	})
 
 	t.Run("JobSetCompleteIfRunningMany", func(t *testing.T) {

riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Lines changed: 54 additions & 22 deletions
Some generated files are not rendered by default.
