· 6 years ago · Sep 17, 2019, 08:52 PM
1%%%-------------------------------------------------------------------
2%%% @author deadhorse <deadhorse@deb7.4>
3%%% @copyright (C) 2015, deadhorse
4%%% @doc
5%%%
6%%% @end
7%%% Created : 5 May 2015 by deadhorse <deadhorse@deb7.4>
8%%%-------------------------------------------------------------------
9-module(dvcs_task_queue).
10-include("../../dvcs_common/src/dvcs_types.hrl").
11-behaviour(gen_server).
12
13%% API
14-export([start_link/0, stop/0]).
15-export([add_task/1, remove_task/1, refresh/0,
16 show_queue/0, show_successful/0, show_failed/0, show_running/0]).
17-export([max_running/0, set_max_running/1]).
18-export([task_finished/2, task_failed/2]).
19-export([test1/1]).
20%% gen_server callbacks
21-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
22 terminate/2, code_change/3]).
23
24-define(SERVER, ?MODULE).
25-define(SUCCESSFUL, "successful_tasks.dets").
26-define(FAILED, "failed_tasks.dets").
27-define(QUEUE, "queue").
28-define(MAX_RUNNING_DEFAULT, 10).
29-define(ATTEMPTS_MAX, 4).
30
31-record(state, {queue,
32 running, %% {Key, Value} - {Pid, Task}
33 successful,
34 failed}).
35%% To do:
36
37%% Нужно реализовать функцию добавления в успешно завершенные задачи.
38%% Дочерний процесс перед собственным завершением должен сообщить родителю
39%% о результате, положительном или отрицательном. На задачу дается 5 попыток.
40%% Нужно в task добавить task_hist, в которую будет заноситься:
41%% когда добавлено в очередь, когда она была возвращена, с каким результатом.
42%% Каждая попытка должна сохраняться. [{added, returned, success}].
43%%%===================================================================
44%%% API
45%%%===================================================================
46
47%%--------------------------------------------------------------------
48%% @doc
49%% Starts the server
50%%
51%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
52%% @end
53%%--------------------------------------------------------------------
54start_link() ->
55 gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
56
57stop() ->
58 gen_server:call(?SERVER, stop).
59
60add_task(Task) ->
61 gen_server:call(?SERVER, {add_task, Task}).
62
63remove_task(TaskId) ->
64 gen_server:call(?SERVER, {remove_task, TaskId}).
65
66refresh() ->
67 gen_server:call(?SERVER, refresh).
68
69show_queue() ->
70 gen_server:call(?SERVER, get_queue).
71
72show_successful() ->
73 gen_server:call(?SERVER, {get_successful, 10}).
74
75show_failed() ->
76 gen_server:call(?SERVER, {get_failed, 10}).
77
78show_running() ->
79 gen_server:call(?SERVER, get_running).
80
81%% настройки:
82max_running() ->
83 dvcs_settings:value(max_running).
84set_max_running(N) ->
85 dvcs_settings:store(max_running, N).
86
87%% Для dvcs_watcher:
88task_finished(Pid, Task) ->
89 gen_server:call(?SERVER, {task_finished, Pid, Task}).
90
91task_failed(Pid, Task) ->
92 gen_server:call(?SERVER, {task_failed, Pid, Task}).
93
94%% тесты
95test1(N) ->
96 Tasks = [#task{id = Id} || Id <- lists:seq(1,N) ],
97 refresh(),
98 lists:foreach(fun (X) ->
99 dvcs_task_queue:add_task(X),
100 dvcs_task_queue:remove_task(X#task.id),
101 dvcs_task_queue:remove_task(X#task.id)
102 end, Tasks).
103 %% timer:sleep(N*1000),
104 %% dvcs_task_queue:stop().
105
106%%%===================================================================
107%%% gen_server callbacks
108%%%===================================================================
109
110%%--------------------------------------------------------------------
111%% @private
112%% @doc
113%% Initializes the server
114%%
115%% @spec init(Args) -> {ok, State} |
116%% {ok, State, Timeout} |
117%% ignore |
118%% {stop, Reason}
119%% @end
120%%--------------------------------------------------------------------
121init([]) ->
122 {ok, S} = dets:open_file(successful_tasks,
123 [{access, read_write},
124 {keypos, 2},
125 {file, ?SUCCESSFUL},
126 {type, set}]),
127 {ok, F} = dets:open_file(failed_tasks,
128 [{access, read_write},
129 {keypos, 2},
130 {file, ?FAILED},
131 {type, set}]),
132 {Q, R} = case read_state() of
133 {error, cannot_read_file} ->
134 {queue:new(), dict:new()};
135 {A, B} ->
136 UndoneTasks = B,
137 NewQ = lists:foldl(fun (Task, Queue) ->
138 queue:in_r(Task, Queue)
139 end, A, UndoneTasks),
140 {NewQ, dict:new()}
141 end,
142 case max_running() of
143 undefined ->
144 set_max_running(?MAX_RUNNING_DEFAULT);
145 _ -> []
146 end,
147 process_q(),
148 {ok, #state{queue = Q, running = R, successful = S, failed = F}}.
149
150%%--------------------------------------------------------------------
151%% @private
152%% @doc
153%% Handling call messages
154%%
155%% @spec handle_call(Request, From, State) ->
156%% {reply, Reply, State} |
157%% {reply, Reply, State, Timeout} |
158%% {noreply, State} |
159%% {noreply, State, Timeout} |
160%% {stop, Reason, Reply, State} |
161%% {stop, Reason, State}
162%% @end
163%%--------------------------------------------------------------------
164
165%% добавление задачи в очередь пользователем
166handle_call({add_task, Task = #task{}}, _From, State = #state{queue = Q}) ->
167 {Reply, NewState} = case check_if_new(Task, State) of
168 true ->
169 {ok, State#state{queue = append_task_to_q(Task, Q)}};
170 false ->
171 {{error, bad_id}, State}
172 end,
173 save_state(NewState),
174 process_q(),
175 {reply, Reply, NewState};
176
177%% удаление задачи из очереди либо прерывание её выполнения (если она уже запущена)
178handle_call({remove_task, TaskId}, _From, State = #state{queue = Q, running = R}) ->
179 ToKill = dict:filter(fun (_Pid, #task{id = Id}) -> Id == TaskId end, R),
180 lists:foreach(fun ({Pid, _}) ->
181 dvcs_watcher:stop(Pid)
182 end, dict:to_list(ToKill)),
183 NewRunning = dict:filter(fun (_Pid, #task{id = Id}) -> Id /= TaskId end, R),
184 NewQ = queue:filter(fun (#task{id = Id}) -> Id /= TaskId end, Q),
185 NewState = State#state{queue = NewQ, running = NewRunning},
186 save_state(NewState),
187 process_q(),
188 {reply, ok, NewState};
189
190handle_call(refresh, _From, State = #state{queue = _Q, running = R, failed = F, successful = S}) ->
191 lists:foreach(fun ({Pid, _Task}) ->
192 dvcs_watcher:stop(Pid)
193 end, dict:to_list(R)),
194 NewState = State#state{queue = queue:new(), running = dict:new()},
195 save_state(NewState),
196 dets:delete_all_objects(F),
197 dets:delete_all_objects(S),
198 {reply, ok, NewState};
199
200
201handle_call({task_finished, Pid, Task = #task{hist = [HistH | HistT]}}, _From,
202 State = #state{running = R, successful = S}) ->
203 NewTask = Task#task{hist = [HistH#task_hist{finished = os:timestamp(), success = true}|HistT]},
204 dets:insert_new(S, NewTask),
205 dvcs_watcher:stop(Pid),
206 NewR = dict:erase(Pid, R),
207 NewState = State#state{running = NewR},
208 save_state(NewState),
209 process_q(),
210 {reply, ok, NewState};
211
212handle_call({task_failed, Pid, Task = #task{hist = [HistH | HistT]}}, _From,
213 State = #state{queue = Q, running = R, failed = F}) ->
214 NewTask = Task#task{hist = [HistH#task_hist{finished = os:timestamp(), success = false}|HistT]},
215 NewR = dict:erase(Pid, R),
216 NewQ = case length(Task#task.hist) < ?ATTEMPTS_MAX of
217 true ->
218 append_task_to_q(NewTask, Q);
219 false ->
220 dets:insert_new(F, NewTask),
221 Q
222 end,
223 dvcs_watcher:stop(Pid),
224 NewState = State#state{queue = NewQ, running = NewR},
225 save_state(NewState),
226 process_q(),
227 {reply, ok, NewState};
228
229
230
231handle_call(get_queue, _From, State = #state{queue = Q}) ->
232 {reply, Q, State};
233
234handle_call(get_running, _From, State = #state{running = R}) ->
235 {reply, R, State};
236
237handle_call({get_successful, N}, _From, State = #state{successful = S}) ->
238 Reply = case dets:match(S, '$1', N) of
239 {error, Reason} -> {error, Reason};
240 {Match,_Contin} -> lists:flatten(Match);
241 '$end_of_table' -> []
242 end,
243 {reply, Reply, State};
244
245handle_call({get_failed, N}, _From, State = #state{failed = F}) ->
246 Reply = case dets:match(F, '$1', N) of
247 {error, Reason} -> {error, Reason};
248 {Match,_Contin} -> lists:flatten(Match);
249 '$end_of_table' -> []
250 end,
251 {reply, Reply, State};
252
253handle_call(stop, _From, State) ->
254 {stop, normal_stop, State};
255
256handle_call(_Request, _From, State) ->
257 Reply = ok,
258 {reply, Reply, State}.
259
260%%--------------------------------------------------------------------
261%% @private
262%% @doc
263%% Handling cast messages
264%%
265%% @spec handle_cast(Msg, State) -> {noreply, State} |
266%% {noreply, State, Timeout} |
267%% {stop, Reason, State}
268%% @end
269%%--------------------------------------------------------------------
270
271handle_cast(process_q, State = #state{queue = Q, running = R}) ->
272 {NewQ, NewR} = process_q(Q, R, max_running()),
273 NewState = State#state{queue = NewQ, running = NewR},
274 save_state(NewState),
275 {noreply, NewState};
276
277
278
279handle_cast(_Msg, State) ->
280 {noreply, State}.
281
282%%--------------------------------------------------------------------
283%% @private
284%% @doc
285%% Handling all non call/cast messages
286%%
287%% @spec handle_info(Info, State) -> {noreply, State} |
288%% {noreply, State, Timeout} |
289%% {stop, Reason, State}
290%% @end
291%%--------------------------------------------------------------------
292
293handle_info({'DOWN', _Ref, process, Pid, Reason},
294 State = #state{queue = Q, running = R}) when Reason /= normal ->
295
296 NewR = dict:erase(Pid, R),
297 NewQ = case dict:find(Pid, R) of
298 {ok, Task} ->
299 io:format("down with ~p; task ~p~n", [Reason, Task#task.id]),
300 append_task_to_q(Task, Q);
301 error ->
302 Q
303 end,
304 NewState = State#state{queue = NewQ, running = NewR},
305 save_state(NewState),
306 process_q(),
307 {noreply, NewState};
308
309handle_info(_Info, State) ->
310 {noreply, State}.
311
312%%--------------------------------------------------------------------
313%% @private
314%% @doc
315%% This function is called by a gen_server when it is about to
316%% terminate. It should be the opposite of Module:init/1 and do any
317%% necessary cleaning up. When it returns, the gen_server terminates
318%% with Reason. The return value is ignored.
319%%
320%% @spec terminate(Reason, State) -> void()
321%% @end
322%%--------------------------------------------------------------------
323
324
325terminate(_Reason, State = #state{queue = _Q, running = R,
326 successful = S, failed = F}) ->
327 lists:foreach(fun ({Pid, _Task}) ->
328 dvcs_watcher:stop(Pid)
329 end, dict:to_list(R)),
330 save_state(State),
331 dets:close(S),
332 dets:close(F),
333 ok.
334
335%%--------------------------------------------------------------------
336%% @private
337%% @doc
338%% Convert process state when code is changed
339%%
340%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
341%% @end
342%%--------------------------------------------------------------------
343code_change(_OldVsn, State, _Extra) ->
344 {ok, State}.
345
346%%%===================================================================
347%%% Internal functions
348%%%===================================================================
349
350%% Считывает из файла очередь задач и последний список выполняемых задач.
351read_state() ->
352 case file:consult(?QUEUE) of
353 {ok, [PropList | _]} ->
354 {proplists:get_value(queue, PropList),
355 proplists:get_value(running, PropList)};
356 _ ->
357 {error, cannot_read_file}
358 end.
359
360%% Сохраняет очередь и список текущих задач в файл "queue"
361save_state(_State = #state{queue = Q, running = R}) ->
362 {ok, File} = file:open(?QUEUE, [write]),
363 R2 = [Task || {_Pid, Task} <- dict:to_list(R)],
364 io:fwrite(File, "[{queue, ~p}, {running, ~p}].~n", [Q,R2]),
365 file:close(File).
366
367%% Проверяет, был ли использован Id в старых задачах, как в выполненных, так и в текущих.
368check_if_new(_Task = #task{id = Id}, _State = #state{queue = Q, running = R,
369 successful = S, failed = F}) ->
370 Qlist = queue:to_list(Q),
371 A = lists:foldl(fun (El, Acc) ->
372 ( El#task.id == Id ) or Acc
373 end, false, Qlist),
374 B = dets:member(S, Id),
375 C = dets:member(F, Id),
376 D = lists:member(Id, [ X || {_, #task{id = X}} <- dict:to_list(R)]),
377 not ( A or B or C or D ).
378
379
380append_task_to_q(Task = #task{hist = Hist}, Q) ->
381 NewTask = Task#task{hist = [#task_hist{added_to_q = os:timestamp()}|Hist]},
382 queue:in(NewTask, Q).
383
384%% асинхронное сообщение о том, что нужно снова обработать очередь.
385%% Для внутреннего использования
386process_q() ->
387 gen_server:cast(?SERVER, process_q).
388
389%%
390process_q(Q, R, MaxR) ->
391 case MaxR > dict:size(R) andalso not queue:is_empty(Q) of
392 false ->
393 {Q, R};
394 true ->
395 {{value,Task = #task{hist = [HistH|HistT]}}, Q2} = queue:out(Q),
396 NewTask = Task#task{hist = [ HistH#task_hist{taken = os:timestamp()} | HistT]},
397 {ok, Pid} = dvcs_watcher:start_link(NewTask),
398 erlang:monitor(process, Pid),
399 R2 = dict:store(Pid, NewTask, R),
400 process_q(Q2, R2, MaxR)
401 end.