· 6 years ago · Mar 10, 2019, 04:48 AM
1const { Database } = require("sqlite3");
2
3const callback = (res, rej) => (err, val) => err ? rej(err) : res(val);
4const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
5
6const createTable = (name) => `
7 CREATE TABLE IF NOT EXISTS ${name} (
8 id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
9 at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
10 priority INTEGER NOT NULL,
11 json TEXT NOT NULL
12 )
13`;
14
15const pushItem = (name) =>
16 `INSERT INTO ${name} (priority, json) VALUES (?, ?)`;
17
18const selectSize = (name) =>
19 `SELECT COUNT(1) AS count FROM ${name}`;
20
21const selectTop = (name) =>
22 `SELECT * FROM ${name} ORDER BY priority DESC, at ASC LIMIT 1`;
23
24const deleteItem = (name, id) =>
25 `DELETE FROM ${name} WHERE id = ${id}`;
26
27class SQLite3Queue {
28 constructor(options = {}) {
29 this._filename = options.filename || ":memory:";
30 this._table = options.table || "queue";
31 this._maxRetries = options.maxRetries || -1;
32 this._retryAfter = options.retryAfter || 100;
33 }
34
35 async connect() {
36 if (!this._database) {
37 this._database = new Database(this._filename);
38
39 await new Promise((resolve, reject) => {
40 this._database.run(
41 createTable(this._table),
42 callback(resolve, reject),
43 );
44 });
45 }
46
47 return this._database;
48 }
49
50 push(value, priority = 0) {
51 return this._attempt(() => this._push(value, priority));
52 }
53
54 async _push(value, priority) {
55 const database = await this.connect();
56 const statement = database.prepare(pushItem(this._table));
57
58 await new Promise((resolve, reject) => {
59 statement.on("error", reject);
60
61 statement.run([
62 parseInt(priority, 10),
63 JSON.stringify(value)
64 ]);
65
66 statement.finalize(callback(resolve, reject));
67 });
68 }
69
70 async size() {
71 return this._attempt(() => this._size());
72 }
73
74 async _size() {
75 const database = await this.connect();
76
77 const row = await new Promise((resolve, reject) => {
78 database.get(selectSize(this._table), callback(resolve, reject));
79 });
80
81 return row.count;
82 }
83
84 async _peek() {
85 const database = await this.connect();
86
87 const top = await new Promise((resolve, reject) => {
88 database.get(selectTop(this._table), callback(resolve, reject));
89 });
90
91 return top;
92 }
93
94 pop() {
95 return this._attempt(() => this._pop());
96 }
97
98 async _pop() {
99 const database = await this.connect();
100 const top = await this._peek();
101
102 if (top) {
103 const changes = await new Promise((resolve, reject) => {
104 database.run(
105 deleteItem(this._table, top.id),
106 function (err) {
107 return err ? reject(err) : resolve(this.changes);
108 },
109 );
110 });
111
112 if (changes) {
113 return JSON.parse(top.json);
114 }
115 }
116
117 return undefined;
118 }
119
120 async _attempt(fn) {
121 for (let count = 0; ; count++) {
122 try {
123 const value = await fn();
124 return value;
125 } catch (err) {
126 console.log(err);
127
128 if (err.code !== "SQLITE_BUSY" || count === this._maxRetries) {
129 throw err;
130 }
131
132 await sleep(this._retryAfter * count);
133 }
134 }
135 }
136}
137
138module.exports = SQLite3Queue;