· 6 years ago · Aug 18, 2019, 07:56 AM
1from pathlib import Path
2from typing import Union, Optional
3import shutil
4import os
5import json
6import sqlite3
7import logging
8
# Anything accepted where a filesystem path is expected: a plain string or a Path.
Pathish = Union[str, Path]
10
11
class Task:
    """Abstract class for qcp Tasks. Should not be instantiated directly.

    Every task carries an integer ``type`` tag; :meth:`from_dict` dispatches on
    that tag to rebuild the matching subclass.
    """

    def __init__(self) -> None:
        self.type = 0

    def run(self) -> None:
        """Run the Task. The base implementation does nothing."""
        pass

    @staticmethod
    def from_dict(x, validate: bool = False) -> "Task":
        """
        Create a Task of the appropriate subclass from a python dict

        :param x: mapping with a ``"type"`` key plus the keys required by the
            selected subclass (``"msg"``, ``"src"``, ``"dst"``)
        :type x: dict
        :param validate: forwarded to the constructors of file-based tasks
        :type validate: bool
        :raises KeyError: if ``"type"`` or a key needed by the subclass is missing
        :raises NotImplementedError: for task type 6
        :raises ValueError: for any other unknown task type
        """
        task_type = x["type"]

        if task_type == -1:
            return KillTask()
        elif task_type == 0:
            return Task()
        elif task_type == 1:
            return EchoTask(x["msg"])
        elif task_type == 2:
            return FileTask(x["src"], validate=validate)
        elif task_type == 3:
            return DeleteTask(x["src"], validate=validate)
        elif task_type == 4:
            return CopyTask(x["src"], x["dst"], validate=validate)
        elif task_type == 5:
            return MoveTask(x["src"], x["dst"], validate=validate)
        elif task_type == 6:
            raise NotImplementedError
        else:
            raise ValueError(f"unknown task type: {task_type!r}")

    def __repr__(self) -> str:
        return 'NULL'

    def __eq__(self, other) -> bool:
        # Returning NotImplemented lets Python fall back to its default
        # comparison instead of failing on objects that have no __dict__.
        if not isinstance(other, Task):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __ne__(self, other) -> bool:
        if not isinstance(other, Task):
            return NotImplemented
        return self.__dict__ != other.__dict__
54
55
class KillTask(Task):
    """Kill the qcp server"""
    def __init__(self) -> None:
        super().__init__()
        # Assign the tag only after the base initialiser has run: Task.__init__
        # sets ``type`` to 0 and would otherwise overwrite the -1.
        self.type = -1

    def run(self) -> None:
        """Not implemented yet; always raises :class:`NotImplementedError`."""
        raise NotImplementedError

    def __repr__(self) -> str:
        return 'KILL'
67
68
class EchoTask(Task):
    """Task that prints a fixed message when run"""
    def __init__(self, msg: str) -> None:
        """
        :param msg: text to print when the task is run
        :type msg: str
        """
        super().__init__()
        self.msg = msg
        self.type = 1

    def run(self) -> None:
        """Write the stored message to standard output"""
        print(self.msg)

    def __repr__(self) -> str:
        return 'Echo: "{}"'.format(self.msg)
81
82
class FileTask(Task):
    """Common base for tasks that operate on a source path"""
    def __init__(self, src: Pathish, validate: bool = True) -> None:
        """
        :param src: path the task operates on; stored as a POSIX-style string
        :type src: Path or str
        :param validate: check the source path immediately on construction
        :type validate: bool
        """
        super().__init__()
        self.validate = validate
        self.src = Path(src).as_posix()
        self.type = 2
        if validate:
            self.__validate__()

    def __validate__(self) -> None:
        """
        Check that the source path exists and is a regular file or directory

        :raises FileNotFoundError: if the source path does not exist
        :raises TypeError: if the source path is neither a file nor a directory
        """
        source = Path(self.src)
        if not source.exists():
            raise FileNotFoundError(f'{self.src} does not exist')
        if not source.is_dir() and not source.is_file():
            raise TypeError(f'{self.src} is neither a file nor directory')
98
99
class DeleteTask(FileTask):
    """Task that removes the file at ``src``"""
    def __init__(self, src: Pathish, validate: bool = True) -> None:
        """
        :param src: path of the file to delete
        :type src: Path or str
        :param validate: check the source path immediately on construction
        :type validate: bool
        """
        super().__init__(src, validate)
        self.type = 3

    def run(self) -> None:
        """Unlink the source path"""
        os.unlink(self.src)

    def __repr__(self) -> str:
        return 'DEL {}'.format(self.src)
111
112
class CopyTask(FileTask):
    """Copy a file"""
    def __init__(self, src: Pathish, dst: Pathish, validate: bool = True) -> None:
        """
        :param src: path of the file to copy
        :type src: Path or str
        :param dst: destination path; must not exist yet
        :type dst: Path or str
        :param validate: check source and destination immediately on construction
        :type validate: bool
        """
        # The base initialiser must not validate yet: validation is overridden
        # below and needs ``self.dst``, which is only assigned afterwards.
        super().__init__(src=src, validate=False)
        self.dst = Path(dst).as_posix()
        self.type = 4
        self.validate = validate
        if validate:
            self.__validate__()

    def __repr__(self) -> str:
        return f'COPY {self.src} -> {self.dst}'

    def __validate__(self) -> None:
        """
        Check the source path and make sure the destination is free

        :raises FileNotFoundError: if the source path does not exist
        :raises TypeError: if the source path is neither a file nor a directory
        :raises FileExistsError: if the destination path already exists
        """
        super().__validate__()
        if Path(self.dst).exists():
            raise FileExistsError(f'{self.dst} already exists')

    def run(self) -> None:
        """Validate the paths again, then copy ``src`` to ``dst``"""
        self.__validate__()
        shutil.copy(self.src, self.dst)
134
135
class MoveTask(CopyTask):
    """Task that moves ``src`` to ``dst``"""
    def __init__(self, src: Pathish, dst: Pathish, validate: bool = True) -> None:
        """
        :param src: path of the file to move
        :type src: Path or str
        :param dst: destination path
        :type dst: Path or str
        :param validate: check source and destination immediately on construction
        :type validate: bool
        """
        super().__init__(src, dst, validate)
        self.type = 5

    def run(self) -> None:
        """Run the parent class's validation, then move ``src`` to ``dst``"""
        super().__validate__()
        shutil.move(self.src, self.dst)

    def __repr__(self) -> str:
        return 'MOVE {} -> {}'.format(self.src, self.dst)
148
149
class TaskQueueElement:
    """An enqueued Task

    Elements are ordered by ``priority`` and compare equal when all of their
    instance attributes are equal.
    """

    task = None  #: A Task
    status = None  #: Status of the queued Task
    priority = None  #: Priority of the queued Task

    def __init__(self, task: "Task", priority: int = 1) -> None:
        """
        :param task: the task being queued
        :type task: Task
        :param priority: priority used for ordering elements
        :type priority: int
        """
        self.task = task
        self.priority = priority

    def __lt__(self, other) -> bool:
        return self.priority < other.priority

    def __gt__(self, other) -> bool:
        return self.priority > other.priority

    def __eq__(self, other) -> bool:
        # Returning NotImplemented lets Python fall back to its default
        # comparison instead of failing on objects that have no __dict__.
        if not isinstance(other, TaskQueueElement):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __ne__(self, other) -> bool:
        if not isinstance(other, TaskQueueElement):
            return NotImplemented
        return self.__dict__ != other.__dict__
172
173
class TaskQueue:
    """A prioritized queue for tasks, persisted in an SQLite database

    Each row of the ``tasks`` table holds a JSON-serialised task together with
    its priority, a status code and an owner id. Status codes used by this
    class: 0 = pending, 1 = running, 2 = done, -1 = failed.
    """
    def __init__(self, path: Pathish = 'qcp.db') -> None:
        """
        Instantiate a TaskQueue

        :param path: Path to store the persistent queue
        :type path: Path or str
        """

        self.con = sqlite3.connect(path, isolation_level="EXCLUSIVE")
        self.path = Path(path)

        cur = self.con.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                priority INTEGER,
                task TEXT,
                status INTEGER,
                owner INTEGER
            )
        """)
        self.con.commit()

    def _count_with_status(self, status: int) -> int:
        """Return the number of rows whose status column equals `status`"""
        cur = self.con.cursor()
        cur.execute("SELECT COUNT(1) FROM tasks WHERE status = ?", (status, ))
        return cur.fetchone()[0]

    @property
    def n_total(self) -> int:
        """Count of all tasks in queue (including failed and completed)"""
        cur = self.con.cursor()
        return cur.execute("SELECT COUNT(1) from tasks").fetchone()[0]

    @property
    def n_pending(self) -> int:
        """Number of pending tasks"""
        return self._count_with_status(0)

    @property
    def n_running(self) -> int:
        """Count of currently running tasks"""
        return self._count_with_status(1)

    @property
    def n_done(self) -> int:
        """Count of completed tasks"""
        return self._count_with_status(2)

    @property
    def n_failed(self) -> int:
        """Count of failed tasks"""
        return self._count_with_status(-1)

    def put(self, task: "Task", priority: Optional[int] = None) -> None:
        """
        Enqueue a task

        The task is stored as the JSON dump of its instance ``__dict__`` with
        status "pending" (0).

        :param task: Task to be added to the queue
        :type task: Task
        :param priority: (optional) priority for executing `task` (tasks with lower priority will be executed earlier)
        :type priority: int
        """

        cur = self.con.cursor()
        cur.execute(
            "INSERT INTO tasks (priority, task, status) VALUES (?, ?, ?)", (priority, json.dumps(task.__dict__), 0)
        )
        self.con.commit()

    def pop(self) -> "Task":
        """
        Retrieves Task object and sets status of Task in database to "in progress" (1)

        The returned task carries the row id of its database record as the
        string attribute ``oid``.

        :raises IndexError: If there is no pending task in the queue
        :raises AlreadyUnderEvaluationError: If trying to pop a tasks that is already being processed (i.e. if a race
            condition occurs if the queue is processed in parallel)
        """
        cur = self.con.cursor()
        cur.execute("SELECT _ROWID_ from tasks WHERE status = 0 ORDER BY priority LIMIT 1")
        row = cur.fetchone()
        if row is None:
            raise IndexError("pop from a TaskQueue without pending tasks")
        oid = str(row[0])
        self.mark_running(oid, id(self))

        # Bind the id as a one-element tuple. A bare string is treated as a
        # sequence of characters, so any row id with two or more digits would
        # supply the wrong number of parameters.
        cur.execute("SELECT owner, task FROM tasks WHERE _ROWID_ = ?", (oid, ))
        owner, payload = cur.fetchone()
        if owner != id(self):
            raise AlreadyUnderEvaluationError

        task = Task.from_dict(json.loads(payload))
        task.oid = oid
        return task

    def peek(self) -> "Task":
        """
        Retrieves Task object without changing its status in the queue

        The returned task carries the row id of its database record as the
        string attribute ``oid``.

        :raises IndexError: If the queue holds no tasks at all
        """
        # NOTE(review): unlike pop(), this does not filter on status = 0, so it
        # may return a task that is already running, done or failed - confirm
        # whether that is intended.
        cur = self.con.cursor()
        cur.execute("SELECT _ROWID_, task from tasks ORDER BY priority LIMIT 1")
        record = cur.fetchone()
        if record is None:
            raise IndexError("peek into an empty TaskQueue")
        task = Task.from_dict(json.loads(record[1]), validate=False)
        task.oid = str(record[0])
        return task

    def print(self, n: int = 10) -> None:
        """
        Print an overview of the queue

        Each line shows the status code in brackets followed by the task.

        :param n: number of tasks to preview
        :type n: int
        """
        assert isinstance(n, int) and n > 0
        cur = self.con.cursor()
        cur.execute("SELECT status, task from tasks ORDER BY priority LIMIT ?", (n, ))
        for status, payload in cur.fetchall():
            print(f"[{status}] {Task.from_dict(json.loads(payload))}")

    def mark_pending(self, oid: int) -> None:
        """
        Mark the operation with the _ROWID_ `oid` as "pending" (0)

        :param oid: ID of the task to mark
        :type oid: int
        """
        cur = self.con.cursor()
        cur.execute("UPDATE tasks SET status = 0, owner = NULL where _ROWID_ = ?", (oid, ))
        self.con.commit()

    def mark_running(self, oid: int, owner: int) -> None:
        """Mark the operation with the _ROWID_ `oid` as "running" (1). The "owner" Id is to ensure no two processes
        are trying to execute the same operation

        :param oid: ID of the task to mark
        :type oid: int
        :param owner: Id of the process that is handling the operation
        :type owner: int
        """
        cur = self.con.cursor()
        cur.execute("UPDATE tasks SET status = 1, owner = ? where _ROWID_ = ?", (owner, oid))
        self.con.commit()

    def mark_done(self, oid: int) -> None:
        """
        Mark the operation with the _ROWID_ `oid` as "done" (2)

        :param oid: ID of the task to mark
        :type oid: int
        """
        cur = self.con.cursor()
        cur.execute("UPDATE tasks SET status = 2, owner = NULL where _ROWID_ = ?", (oid, ))
        self.con.commit()

    def mark_failed(self, oid: int) -> None:
        """
        Mark the operation with the _ROWID_ `oid` as "failed" (-1)

        :param oid: ID of the task to mark
        :type oid: int
        """
        cur = self.con.cursor()
        cur.execute("UPDATE tasks SET status = -1, owner = NULL where _ROWID_ = ?", (oid, ))
        self.con.commit()

    def run(self) -> None:
        """
        Execute all pending tasks

        Tasks are popped one at a time and marked "done" after a successful
        run. If a task raises, it is marked "failed" and the exception is
        re-raised, so the remaining tasks stay pending.
        """
        if self.n_pending < 1:
            logging.getLogger().warning("Queue is empty")

        while self.n_pending > 0:
            op = self.pop()
            try:
                op.run()
            except Exception:
                # Do not leave the record stuck in the "running" state.
                self.mark_failed(op.oid)
                raise
            self.mark_done(op.oid)
345
346
class AlreadyUnderEvaluationError(Exception):
    """Raised when a Task is already being processed by a different worker"""
350
351import tasks
352import pytest
353
def test_TaskQueue(tmp_path):
    """A TaskQueue executes queued copy, move and delete tasks"""
    src = tmp_path / "foo"
    copied = tmp_path / "copied_file"
    moved = tmp_path / "moved_file"
    src.touch()

    q = tasks.TaskQueue(tmp_path / "qcp.db")

    # Copy: the new file appears next to the source.
    q.put(tasks.CopyTask(src, copied))
    q.run()
    assert copied.is_file()

    # Move: the copy disappears and shows up under its new name.
    q.put(tasks.MoveTask(copied, moved))
    q.run()
    assert not copied.is_file()
    assert moved.is_file()

    # Delete: the moved file is removed, the original source is untouched.
    q.put(tasks.DeleteTask(moved))
    q.run()
    assert not moved.is_file()
    assert src.is_file()