· 6 years ago · Sep 17, 2019, 08:54 PM
1%%%-------------------------------------------------------------------
2%%% @doc
3%%%
4%%% @end
5%%%-------------------------------------------------------------------
6-module(dvcs_task_queue).
7-include("../../dvcs_common/src/dvcs_types.hrl").
8-behaviour(gen_server).
9
10%% API
11-export([start_link/0, stop/0]).
12-export([add_task/1, remove_task/1, refresh/0,
13 show_queue/0, show_successful/0, show_failed/0, show_running/0]).
14-export([max_running/0, set_max_running/1]).
15-export([task_finished/2, task_failed/2]).
16-export([test1/1]).
17%% gen_server callbacks
18-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
19 terminate/2, code_change/3]).
20
21-define(SERVER, ?MODULE).
22-define(SUCCESSFUL, "successful_tasks.dets").
23-define(FAILED, "failed_tasks.dets").
24-define(QUEUE, "queue").
25-define(MAX_RUNNING_DEFAULT, 10).
26-define(ATTEMPTS_MAX, 4).
27
28-record(state, {queue,
29 running, %% {Key, Value} - {Pid, Task}
30 successful,
31 failed}).
32%% To do:
33
34%% Нужно реализовать функцию добавления в успешно завершенные задачи.
35%% Дочерний процесс перед собственным завершением должен сообщить родителю
36%% о результате, положительном или отрицательном. На задачу дается 5 попыток.
37%% Нужно в task добавить task_hist, в которую будет заноситься:
38%% когда добавлено в очередь, когда она была возвращена, с каким результатом.
39%% Каждая попытка должна сохраняться. [{added, returned, success}].
40%%%===================================================================
41%%% API
42%%%===================================================================
43
44%%--------------------------------------------------------------------
45%% @doc
46%% Starts the server
47%%
48%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
49%% @end
50%%--------------------------------------------------------------------
51start_link() ->
52 gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
53
54stop() ->
55 gen_server:call(?SERVER, stop).
56
57add_task(Task) ->
58 gen_server:call(?SERVER, {add_task, Task}).
59
60remove_task(TaskId) ->
61 gen_server:call(?SERVER, {remove_task, TaskId}).
62
63refresh() ->
64 gen_server:call(?SERVER, refresh).
65
66show_queue() ->
67 gen_server:call(?SERVER, get_queue).
68
69show_successful() ->
70 gen_server:call(?SERVER, {get_successful, 10}).
71
72show_failed() ->
73 gen_server:call(?SERVER, {get_failed, 10}).
74
75show_running() ->
76 gen_server:call(?SERVER, get_running).
77
78%% настройки:
79max_running() ->
80 dvcs_settings:value(max_running).
81set_max_running(N) ->
82 dvcs_settings:store(max_running, N).
83
84%% Для dvcs_watcher:
85task_finished(Pid, Task) ->
86 gen_server:call(?SERVER, {task_finished, Pid, Task}).
87
88task_failed(Pid, Task) ->
89 gen_server:call(?SERVER, {task_failed, Pid, Task}).
90
91%% тесты
92test1(N) ->
93 Tasks = [#task{id = Id} || Id <- lists:seq(1,N) ],
94 refresh(),
95 lists:foreach(fun (X) ->
96 dvcs_task_queue:add_task(X),
97 dvcs_task_queue:remove_task(X#task.id),
98 dvcs_task_queue:remove_task(X#task.id)
99 end, Tasks).
100 %% timer:sleep(N*1000),
101 %% dvcs_task_queue:stop().
102
103%%%===================================================================
104%%% gen_server callbacks
105%%%===================================================================
106
107%%--------------------------------------------------------------------
108%% @private
109%% @doc
110%% Initializes the server
111%%
112%% @spec init(Args) -> {ok, State} |
113%% {ok, State, Timeout} |
114%% ignore |
115%% {stop, Reason}
116%% @end
117%%--------------------------------------------------------------------
118init([]) ->
119 {ok, S} = dets:open_file(successful_tasks,
120 [{access, read_write},
121 {keypos, 2},
122 {file, ?SUCCESSFUL},
123 {type, set}]),
124 {ok, F} = dets:open_file(failed_tasks,
125 [{access, read_write},
126 {keypos, 2},
127 {file, ?FAILED},
128 {type, set}]),
129 {Q, R} = case read_state() of
130 {error, cannot_read_file} ->
131 {queue:new(), dict:new()};
132 {A, B} ->
133 UndoneTasks = B,
134 NewQ = lists:foldl(fun (Task, Queue) ->
135 queue:in_r(Task, Queue)
136 end, A, UndoneTasks),
137 {NewQ, dict:new()}
138 end,
139 case max_running() of
140 undefined ->
141 set_max_running(?MAX_RUNNING_DEFAULT);
142 _ -> []
143 end,
144 process_q(),
145 {ok, #state{queue = Q, running = R, successful = S, failed = F}}.
146
147%%--------------------------------------------------------------------
148%% @private
149%% @doc
150%% Handling call messages
151%%
152%% @spec handle_call(Request, From, State) ->
153%% {reply, Reply, State} |
154%% {reply, Reply, State, Timeout} |
155%% {noreply, State} |
156%% {noreply, State, Timeout} |
157%% {stop, Reason, Reply, State} |
158%% {stop, Reason, State}
159%% @end
160%%--------------------------------------------------------------------
161
162%% добавление задачи в очередь пользователем
163handle_call({add_task, Task = #task{}}, _From, State = #state{queue = Q}) ->
164 {Reply, NewState} = case check_if_new(Task, State) of
165 true ->
166 {ok, State#state{queue = append_task_to_q(Task, Q)}};
167 false ->
168 {{error, bad_id}, State}
169 end,
170 save_state(NewState),
171 process_q(),
172 {reply, Reply, NewState};
173
174%% удаление задачи из очереди либо прерывание её выполнения (если она уже запущена)
175handle_call({remove_task, TaskId}, _From, State = #state{queue = Q, running = R}) ->
176 ToKill = dict:filter(fun (_Pid, #task{id = Id}) -> Id == TaskId end, R),
177 lists:foreach(fun ({Pid, _}) ->
178 dvcs_watcher:stop(Pid)
179 end, dict:to_list(ToKill)),
180 NewRunning = dict:filter(fun (_Pid, #task{id = Id}) -> Id /= TaskId end, R),
181 NewQ = queue:filter(fun (#task{id = Id}) -> Id /= TaskId end, Q),
182 NewState = State#state{queue = NewQ, running = NewRunning},
183 save_state(NewState),
184 process_q(),
185 {reply, ok, NewState};
186
187handle_call(refresh, _From, State = #state{queue = _Q, running = R, failed = F, successful = S}) ->
188 lists:foreach(fun ({Pid, _Task}) ->
189 dvcs_watcher:stop(Pid)
190 end, dict:to_list(R)),
191 NewState = State#state{queue = queue:new(), running = dict:new()},
192 save_state(NewState),
193 dets:delete_all_objects(F),
194 dets:delete_all_objects(S),
195 {reply, ok, NewState};
196
197
198handle_call({task_finished, Pid, Task = #task{hist = [HistH | HistT]}}, _From,
199 State = #state{running = R, successful = S}) ->
200 NewTask = Task#task{hist = [HistH#task_hist{finished = os:timestamp(), success = true}|HistT]},
201 dets:insert_new(S, NewTask),
202 dvcs_watcher:stop(Pid),
203 NewR = dict:erase(Pid, R),
204 NewState = State#state{running = NewR},
205 save_state(NewState),
206 process_q(),
207 {reply, ok, NewState};
208
209handle_call({task_failed, Pid, Task = #task{hist = [HistH | HistT]}}, _From,
210 State = #state{queue = Q, running = R, failed = F}) ->
211 NewTask = Task#task{hist = [HistH#task_hist{finished = os:timestamp(), success = false}|HistT]},
212 NewR = dict:erase(Pid, R),
213 NewQ = case length(Task#task.hist) < ?ATTEMPTS_MAX of
214 true ->
215 append_task_to_q(NewTask, Q);
216 false ->
217 dets:insert_new(F, NewTask),
218 Q
219 end,
220 dvcs_watcher:stop(Pid),
221 NewState = State#state{queue = NewQ, running = NewR},
222 save_state(NewState),
223 process_q(),
224 {reply, ok, NewState};
225
226
227
228handle_call(get_queue, _From, State = #state{queue = Q}) ->
229 {reply, Q, State};
230
231handle_call(get_running, _From, State = #state{running = R}) ->
232 {reply, R, State};
233
234handle_call({get_successful, N}, _From, State = #state{successful = S}) ->
235 Reply = case dets:match(S, '$1', N) of
236 {error, Reason} -> {error, Reason};
237 {Match,_Contin} -> lists:flatten(Match);
238 '$end_of_table' -> []
239 end,
240 {reply, Reply, State};
241
242handle_call({get_failed, N}, _From, State = #state{failed = F}) ->
243 Reply = case dets:match(F, '$1', N) of
244 {error, Reason} -> {error, Reason};
245 {Match,_Contin} -> lists:flatten(Match);
246 '$end_of_table' -> []
247 end,
248 {reply, Reply, State};
249
250handle_call(stop, _From, State) ->
251 {stop, normal_stop, State};
252
253handle_call(_Request, _From, State) ->
254 Reply = ok,
255 {reply, Reply, State}.
256
257%%--------------------------------------------------------------------
258%% @private
259%% @doc
260%% Handling cast messages
261%%
262%% @spec handle_cast(Msg, State) -> {noreply, State} |
263%% {noreply, State, Timeout} |
264%% {stop, Reason, State}
265%% @end
266%%--------------------------------------------------------------------
267
268handle_cast(process_q, State = #state{queue = Q, running = R}) ->
269 {NewQ, NewR} = process_q(Q, R, max_running()),
270 NewState = State#state{queue = NewQ, running = NewR},
271 save_state(NewState),
272 {noreply, NewState};
273
274
275
276handle_cast(_Msg, State) ->
277 {noreply, State}.
278
279%%--------------------------------------------------------------------
280%% @private
281%% @doc
282%% Handling all non call/cast messages
283%%
284%% @spec handle_info(Info, State) -> {noreply, State} |
285%% {noreply, State, Timeout} |
286%% {stop, Reason, State}
287%% @end
288%%--------------------------------------------------------------------
289
290handle_info({'DOWN', _Ref, process, Pid, Reason},
291 State = #state{queue = Q, running = R}) when Reason /= normal ->
292
293 NewR = dict:erase(Pid, R),
294 NewQ = case dict:find(Pid, R) of
295 {ok, Task} ->
296 io:format("down with ~p; task ~p~n", [Reason, Task#task.id]),
297 append_task_to_q(Task, Q);
298 error ->
299 Q
300 end,
301 NewState = State#state{queue = NewQ, running = NewR},
302 save_state(NewState),
303 process_q(),
304 {noreply, NewState};
305
306handle_info(_Info, State) ->
307 {noreply, State}.
308
309%%--------------------------------------------------------------------
310%% @private
311%% @doc
312%% This function is called by a gen_server when it is about to
313%% terminate. It should be the opposite of Module:init/1 and do any
314%% necessary cleaning up. When it returns, the gen_server terminates
315%% with Reason. The return value is ignored.
316%%
317%% @spec terminate(Reason, State) -> void()
318%% @end
319%%--------------------------------------------------------------------
320
321
322terminate(_Reason, State = #state{queue = _Q, running = R,
323 successful = S, failed = F}) ->
324 lists:foreach(fun ({Pid, _Task}) ->
325 dvcs_watcher:stop(Pid)
326 end, dict:to_list(R)),
327 save_state(State),
328 dets:close(S),
329 dets:close(F),
330 ok.
331
332%%--------------------------------------------------------------------
333%% @private
334%% @doc
335%% Convert process state when code is changed
336%%
337%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
338%% @end
339%%--------------------------------------------------------------------
340code_change(_OldVsn, State, _Extra) ->
341 {ok, State}.
342
343%%%===================================================================
344%%% Internal functions
345%%%===================================================================
346
347%% Считывает из файла очередь задач и последний список выполняемых задач.
348read_state() ->
349 case file:consult(?QUEUE) of
350 {ok, [PropList | _]} ->
351 {proplists:get_value(queue, PropList),
352 proplists:get_value(running, PropList)};
353 _ ->
354 {error, cannot_read_file}
355 end.
356
357%% Сохраняет очередь и список текущих задач в файл "queue"
358save_state(_State = #state{queue = Q, running = R}) ->
359 {ok, File} = file:open(?QUEUE, [write]),
360 R2 = [Task || {_Pid, Task} <- dict:to_list(R)],
361 io:fwrite(File, "[{queue, ~p}, {running, ~p}].~n", [Q,R2]),
362 file:close(File).
363
364%% Проверяет, был ли использован Id в старых задачах, как в выполненных, так и в текущих.
365check_if_new(_Task = #task{id = Id}, _State = #state{queue = Q, running = R,
366 successful = S, failed = F}) ->
367 Qlist = queue:to_list(Q),
368 A = lists:foldl(fun (El, Acc) ->
369 ( El#task.id == Id ) or Acc
370 end, false, Qlist),
371 B = dets:member(S, Id),
372 C = dets:member(F, Id),
373 D = lists:member(Id, [ X || {_, #task{id = X}} <- dict:to_list(R)]),
374 not ( A or B or C or D ).
375
376
377append_task_to_q(Task = #task{hist = Hist}, Q) ->
378 NewTask = Task#task{hist = [#task_hist{added_to_q = os:timestamp()}|Hist]},
379 queue:in(NewTask, Q).
380
381%% асинхронное сообщение о том, что нужно снова обработать очередь.
382%% Для внутреннего использования
383process_q() ->
384 gen_server:cast(?SERVER, process_q).
385
386%%
387process_q(Q, R, MaxR) ->
388 case MaxR > dict:size(R) andalso not queue:is_empty(Q) of
389 false ->
390 {Q, R};
391 true ->
392 {{value,Task = #task{hist = [HistH|HistT]}}, Q2} = queue:out(Q),
393 NewTask = Task#task{hist = [ HistH#task_hist{taken = os:timestamp()} | HistT]},
394 {ok, Pid} = dvcs_watcher:start_link(NewTask),
395 erlang:monitor(process, Pid),
396 R2 = dict:store(Pid, NewTask, R),
397 process_q(Q2, R2, MaxR)
398 end.