|
| 1 | +import type { Kysely } from 'kysely' |
| 2 | + |
| 3 | +import { E_LOCK_NOT_OWNED } from '../errors.js' |
| 4 | +import type { KyselyOptions, LockStore } from '../types/main.js' |
| 5 | + |
| 6 | +/** |
| 7 | + * Create a new knex store |
| 8 | + */ |
| 9 | +export function knexStore(config: KyselyOptions) { |
| 10 | + return { config, factory: () => new KyselyStore(config) } |
| 11 | +} |
| 12 | + |
| 13 | +export class KyselyStore implements LockStore { |
| 14 | + /** |
| 15 | + * Knex connection instance |
| 16 | + */ |
| 17 | + #connection: Kysely<any> |
| 18 | + |
| 19 | + /** |
| 20 | + * The name of the table used to store locks |
| 21 | + */ |
| 22 | + #tableName = 'verrou' |
| 23 | + |
| 24 | + /** |
| 25 | + * A promise that resolves when the table is created |
| 26 | + */ |
| 27 | + #initialized: Promise<void> |
| 28 | + |
| 29 | + constructor(config: KyselyOptions) { |
| 30 | + this.#connection = config.connection |
| 31 | + this.#tableName = config.tableName || this.#tableName |
| 32 | + if (config.autoCreateTable !== false) { |
| 33 | + this.#initialized = this.#createTableIfNotExists() |
| 34 | + } else { |
| 35 | + this.#initialized = Promise.resolve() |
| 36 | + } |
| 37 | + } |
| 38 | + |
| 39 | + /** |
| 40 | + * Create the locks table if it doesn't exist |
| 41 | + */ |
| 42 | + async #createTableIfNotExists() { |
| 43 | + await this.#connection.schema |
| 44 | + .createTable(this.#tableName) |
| 45 | + .addColumn('key', 'varchar(255)', (col) => col.primaryKey().notNull()) |
| 46 | + .addColumn('owner', 'varchar(255)', (col) => col.notNull()) |
| 47 | + .addColumn('expiration', 'bigint') |
| 48 | + .ifNotExists() |
| 49 | + .execute() |
| 50 | + } |
| 51 | + |
| 52 | + /** |
| 53 | + * Compute the expiration date based on the provided TTL |
| 54 | + */ |
| 55 | + #computeExpiresAt(ttl: number | null) { |
| 56 | + if (ttl) return Date.now() + ttl |
| 57 | + return null |
| 58 | + } |
| 59 | + |
| 60 | + /** |
| 61 | + * Get the current owner of given lock key |
| 62 | + */ |
| 63 | + async #getCurrentOwner(key: string) { |
| 64 | + await this.#initialized |
| 65 | + const result = await this.#connection |
| 66 | + .selectFrom(this.#tableName) |
| 67 | + .where('key', '=', key) |
| 68 | + .select('owner') |
| 69 | + .executeTakeFirst() |
| 70 | + |
| 71 | + return result?.owner |
| 72 | + } |
| 73 | + |
| 74 | + /** |
| 75 | + * Save the lock in the store if not already locked by another owner |
| 76 | + * |
| 77 | + * We basically rely on primary key constraint to ensure the lock is |
| 78 | + * unique. |
| 79 | + * |
| 80 | + * If the lock already exists, we check if it's expired. If it is, we |
| 81 | + * update it with the new owner and expiration date. |
| 82 | + */ |
| 83 | + async save(key: string, owner: string, ttl: number | null) { |
| 84 | + try { |
| 85 | + await this.#initialized |
| 86 | + await this.#connection |
| 87 | + .insertInto(this.#tableName) |
| 88 | + .values({ key, owner, expiration: this.#computeExpiresAt(ttl) }) |
| 89 | + .execute() |
| 90 | + |
| 91 | + return true |
| 92 | + } catch (error) { |
| 93 | + const updated = await this.#connection |
| 94 | + .updateTable(this.#tableName) |
| 95 | + .where('key', '=', key) |
| 96 | + .where('expiration', '<=', Date.now()) |
| 97 | + .set({ owner, expiration: this.#computeExpiresAt(ttl) }) |
| 98 | + .executeTakeFirst() |
| 99 | + |
| 100 | + return updated.numUpdatedRows >= BigInt(1) |
| 101 | + } |
| 102 | + } |
| 103 | + |
| 104 | + /** |
| 105 | + * Delete the lock from the store if it is owned by the owner |
| 106 | + * Otherwise throws a E_LOCK_NOT_OWNED error |
| 107 | + */ |
| 108 | + async delete(key: string, owner: string): Promise<void> { |
| 109 | + const currentOwner = await this.#getCurrentOwner(key) |
| 110 | + if (currentOwner !== owner) throw new E_LOCK_NOT_OWNED() |
| 111 | + |
| 112 | + await this.#connection |
| 113 | + .deleteFrom(this.#tableName) |
| 114 | + .where('key', '=', key) |
| 115 | + .where('owner', '=', owner) |
| 116 | + .execute() |
| 117 | + } |
| 118 | + |
| 119 | + /** |
| 120 | + * Force delete the lock from the store. No check is made on the owner |
| 121 | + */ |
| 122 | + async forceDelete(key: string) { |
| 123 | + await this.#connection.deleteFrom(this.#tableName).where('key', '=', key).execute() |
| 124 | + } |
| 125 | + |
| 126 | + /** |
| 127 | + * Extend the lock expiration. Throws an error if the lock is not owned by the owner |
| 128 | + * Duration is in milliseconds |
| 129 | + */ |
| 130 | + async extend(key: string, owner: string, duration: number) { |
| 131 | + const updated = await this.#connection |
| 132 | + .updateTable(this.#tableName) |
| 133 | + .where('key', '=', key) |
| 134 | + .where('owner', '=', owner) |
| 135 | + .set({ expiration: Date.now() + duration }) |
| 136 | + .executeTakeFirst() |
| 137 | + |
| 138 | + if (updated.numUpdatedRows === BigInt(0)) throw new E_LOCK_NOT_OWNED() |
| 139 | + } |
| 140 | + |
| 141 | + /** |
| 142 | + * Check if the lock exists |
| 143 | + */ |
| 144 | + async exists(key: string) { |
| 145 | + await this.#initialized |
| 146 | + const result = await this.#connection |
| 147 | + .selectFrom(this.#tableName) |
| 148 | + .where('key', '=', key) |
| 149 | + .select('expiration') |
| 150 | + .executeTakeFirst() |
| 151 | + |
| 152 | + if (!result) return false |
| 153 | + if (result.expiration === null) return true |
| 154 | + |
| 155 | + return result.expiration > Date.now() |
| 156 | + } |
| 157 | + |
| 158 | + /** |
| 159 | + * Disconnect kysely connection |
| 160 | + */ |
| 161 | + disconnect() { |
| 162 | + return this.#connection.destroy() |
| 163 | + } |
| 164 | +} |
0 commit comments