· 5 years ago · Aug 02, 2020, 12:58 PM
1// Copyright (c) 2020 DDN. All rights reserved.
2// Use of this source code is governed by a MIT-style
3// license that can be found in the LICENSE file.
4
5use crate::diff::calculate_diff;
6use crate::error::ImlManagerCliError;
7use crate::gen_tree::{apply_diff, Item, Node, State, Tree};
8use futures::{future, FutureExt, TryFutureExt};
9use iml_api_utils::dependency_tree::{build_direct_dag, DependencyDAG, Deps, Rich};
10use iml_wire_types::{ApiList, AvailableAction, Command, EndpointName, FlatQuery, Host, Job, Step};
11use indicatif::{MultiProgress, ProgressBar, ProgressStyle, ProgressDrawTarget};
12use lazy_static::lazy_static;
13use regex::{Captures, Regex};
14use serde::export::Formatter;
15use std::collections::HashSet;
16use std::fmt::Display;
17use std::sync::atomic::AtomicBool;
18use std::sync::{Arc, Mutex};
19use std::{collections::HashMap, fmt::Debug, iter, time::Duration};
20use std::{fmt, fs};
21use tokio::task::JoinError;
22use tokio::{task::spawn_blocking, time::delay_for};
23use crate::var::kit::{Rng, get_action, Action};
24use std::cell::Cell;
25use itertools::Itertools;
26use console::style;
27
/// Separator inserted between node captions when `print_error` prints the path to a node.
/// Glyph variants that were considered: = ═ - ▬ > ▷ ▶ ► ➤
const ARROW: &str = " ═➤ ";
/// Prefix put in front of every console line printed by `print_error`.
const SPACE: &str = " ";
/// Pause, in milliseconds, between two state refreshes in `wait_for_commands`.
const FETCH_DELAY_MS: u64 = 1000;
/// Pause, in milliseconds, between two spinner ticks in `wait_for_commands`.
const SHOW_DELAY_MS: u64 = 150;
32
33type Job0 = Job<Option<serde_json::Value>>;
34type RichCommand = Rich<i32, Arc<Command>>;
35type RichJob = Rich<i32, Arc<Job0>>;
36type RichStep = Rich<i32, Arc<Step>>;
37
/// Newtype around the numeric id of a command.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CmdId(i32);
40
/// Newtype around the numeric id of a job.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct JobId(i32);
43
// region declaration of types TypedId, Stuff, ProgressBarIndicator
/// Key of a node in the progress tree: a numeric id tagged with the kind of
/// entity (command, job or step) it belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TypedId {
    Cmd(i32),
    Job(i32),
    Step(i32),
}

impl Default for TypedId {
    /// The default key is the command with id `0`.
    fn default() -> Self {
        Self::Cmd(0)
    }
}

impl Display for TypedId {
    /// Writes a one-letter tag (`c`, `j` or `s`) immediately followed by the id.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (tag, id) = match *self {
            Self::Cmd(id) => ('c', id),
            Self::Job(id) => ('j', id),
            Self::Step(id) => ('s', id),
        };
        write!(f, "{}{}", tag, id)
    }
}
67
/// Payload attached to a tree node: the caption that is displayed and the
/// console output lines that `print_error` prints for the node.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stuff {
    pub msg: String,
    pub console: Vec<String>,
}

impl Display for Stuff {
    /// Only the caption is rendered; the console lines are not part of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.msg)
    }
}
79
80/// It is pretty expensive to set the style on the progress bar on each iteration,
81/// so we need to keep track what the style and whether it has been set for the progress bar.
82/// See [`set_progress_bar_message`] function.
83#[derive(Clone, Debug)]
84pub struct ProgressBarIndicator {
85 pub progress_bar: ProgressBar,
86 pub active_style: Cell<Option<bool>>,
87}
88// endregion
89
90#[derive(Clone, Debug)]
91pub struct TreeState {
92 pub has_read: bool,
93 pub index: usize,
94 pub commands: Vec<Command>,
95 pub jobs: Vec<Job0>,
96 pub steps: Vec<Step>,
97}
98
99lazy_static! {
100 static ref RNG: Mutex<Rng> = Mutex::new(Rng::new());
101 static ref TREE_STATE: Mutex<TreeState> = {
102 let str = fs::read_to_string("ops/commands/cmd-37_38.json").unwrap();
103 let mut command_list = serde_json::from_str::<ApiList<Command>>(&str).unwrap();
104 for cmd in &mut command_list.objects {
105 set_cmd_state(cmd, State::Progressing);
106 }
107 let str = fs::read_to_string("ops/jobs/jobs-227-242.json").unwrap();
108 let mut job_list = serde_json::from_str::<ApiList<Job0>>(&str).unwrap();
109 for job in &mut job_list.objects {
110 set_job_state(job, State::Progressing);
111 }
112 let str = fs::read_to_string("ops/steps/steps-400-x.json").unwrap();
113 let mut step_list = serde_json::from_str::<ApiList<Step>>(&str).unwrap();
114 for step in &mut step_list.objects {
115 set_step_state(step, State::Progressing);
116 }
117 Mutex::new(TreeState {
118 has_read: false,
119 index: 0,
120 commands: command_list.objects,
121 jobs: job_list.objects,
122 steps: step_list.objects,
123 })
124 };
125}
126
127#[derive(serde::Serialize)]
128pub struct SendJob<T> {
129 pub class_name: String,
130 pub args: T,
131}
132
133#[derive(serde::Serialize)]
134pub struct SendCmd<T> {
135 pub jobs: Vec<SendJob<T>>,
136 pub message: String,
137}
138
139pub async fn create_command<T: serde::Serialize>(
140 cmd_body: SendCmd<T>,
141) -> Result<Command, ImlManagerCliError> {
142 let resp = post(Command::endpoint_name(), cmd_body)
143 .await?
144 .error_for_status()?;
145
146 let cmd = resp.json().await?;
147
148 tracing::debug!("Resp JSON is {:?}", cmd);
149
150 Ok(cmd)
151}
152
153fn cmd_state(cmd: &Command) -> State {
154 if cmd.cancelled {
155 State::Cancelled
156 } else if cmd.errored {
157 State::Errored
158 } else if cmd.complete {
159 State::Completed
160 } else {
161 State::Progressing
162 }
163}
164
165fn set_cmd_state(cmd: &mut Command, state: State) {
166 cmd.complete = false;
167 cmd.errored = false;
168 cmd.cancelled = false;
169 match state {
170 State::Cancelled => cmd.cancelled = true,
171 State::Errored => cmd.errored = true,
172 State::Completed => cmd.complete = true,
173 State::Progressing => {},
174 }
175}
176
177fn job_state(job: &Job0) -> State {
178 // job.state can be "pending", "tasked" or "complete"
179 // if a job is errored or cancelled, it is also complete
180 if job.cancelled {
181 State::Cancelled
182 } else if job.errored {
183 State::Errored
184 } else if job.state == "complete" {
185 State::Completed
186 } else {
187 State::Progressing
188 }
189}
190
191fn set_job_state(job: &mut Job0, state: State) {
192 job.cancelled = false;
193 job.errored = false;
194 job.state = "incomplete".to_string();
195 match state {
196 State::Cancelled => job.cancelled = true,
197 State::Errored => job.errored = true,
198 State::Completed => job.state = "complete".to_string(),
199 State::Progressing => {},
200 }
201}
202
203fn step_state(step: &Step) -> State {
204 // step.state can be "success", "failed" or "incomplete"
205 match &step.state[..] {
206 "cancelled" => State::Cancelled,
207 "failed" => State::Errored,
208 "success" => State::Completed,
209 _ /* "incomplete" */ => State::Progressing,
210 }
211}
212
213fn set_step_state(step: &mut Step, state: State) {
214 match state {
215 State::Cancelled => step.state = "cancelled".to_string(),
216 State::Errored => step.state = "failed".to_string(),
217 State::Completed => step.state = "success".to_string(),
218 State::Progressing => step.state = "incomplete".to_string(),
219 }
220}
221
222fn cmd_finished(cmd: &Command) -> bool {
223 cmd_state(cmd) == State::Completed
224}
225
226fn job_finished(job: &Job0) -> bool {
227 job_state(job) == State::Completed
228}
229
230fn step_finished(step: &Step) -> bool {
231 step_state(step) != State::Progressing
232}
233
234pub async fn wait_for_command(cmd: Command) -> Result<Command, ImlManagerCliError> {
235 loop {
236 if cmd_finished(&cmd) {
237 return Ok(cmd);
238 }
239
240 delay_for(Duration::from_millis(1000)).await;
241
242 let client = iml_manager_client::get_client()?;
243
244 let cmd = iml_manager_client::get(
245 client,
246 &format!("command/{}", cmd.id),
247 Vec::<(String, String)>::new(),
248 )
249 .await?;
250
251 if cmd_finished(&cmd) {
252 return Ok(cmd);
253 }
254 }
255}
256
257pub async fn fetch_api_list<T>(ids: Vec<i32>) -> Result<ApiList<T>, ImlManagerCliError>
258where
259 T: EndpointName + serde::de::DeserializeOwned + std::fmt::Debug,
260{
261 let query: Vec<_> = ids
262 .into_iter()
263 .map(|x| ["id__in".into(), x.to_string()])
264 .chain(iter::once(["limit".into(), "0".into()]))
265 .collect();
266 get(T::endpoint_name(), query).await
267}
268
269/// Waits for command completion and prints progress messages
270/// This *does not* error on command failure, it only tracks command
271/// completion
272pub async fn wait_for_commands(commands: &[Command]) -> Result<Vec<Command>, ImlManagerCliError> {
273 let multi_progress = Arc::new(MultiProgress::new());
274 multi_progress.set_draw_target(ProgressDrawTarget::stdout());
275 let sty_main = ProgressStyle::default_bar().template("{bar:60.green/yellow} {pos:>4}/{len:4}");
276 let pb_main = multi_progress.add(ProgressBar::new(commands.len() as u64));
277 pb_main.set_style(sty_main);
278 pb_main.tick();
279
280 // `current_items` will have only commands at first
281 // and then will be extended after `fetch_and_update` succeeds
282 let current_items = Arc::new(tokio::sync::Mutex::new(
283 build_initial_items(commands, |i, y| {
284 // similar to what is passed to apply_diff
285 let pb = ProgressBarIndicator {
286 progress_bar: multi_progress.insert(i, ProgressBar::new(1_000_000)),
287 active_style: Cell::new(None),
288 };
289 set_progress_bar_message(&pb, y);
290 pb
291 })
292 ));
293 let (cmds, cmd_ids) = build_initial_commands(commands);
294 let is_done = Arc::new(AtomicBool::new(false));
295
296 // multi-progress waiting loop
297 // fut1: ErrInto<Map<JoinHandle<Result<()>>, fn(Result<Result<(), Error>, JoinError>)
298 let fut1 = {
299 let multi_progress = Arc::clone(&multi_progress);
300 spawn_blocking(move || multi_progress.join()).map(
301 |r: Result<Result<(), std::io::Error>, JoinError>| {
302 r.map_err(|e: JoinError| e.into())
303 .and_then(std::convert::identity)
304 },
305 ).err_into()
306 };
307
308 // updating loop
309 // fut2: impl Future<Output=Result<Vec<Command>, JoinError>>
310 let fut2 = {
311 let is_done = Arc::clone(&is_done);
312 let multi_progress = Arc::clone(&multi_progress);
313 let current_items = Arc::clone(¤t_items);
314 async move {
315 let mut cmds: HashMap<i32, RichCommand> = cmds;
316 let mut jobs: HashMap<i32, RichJob> = HashMap::new();
317 let mut steps: HashMap<i32, RichStep> = HashMap::new();
318
319 loop {
320 if cmds.values().all(|cmd| cmd_state(cmd) != State::Progressing) {
321 tracing::debug!("All commands complete. Returning");
322 is_done.store(true, std::sync::atomic::Ordering::SeqCst);
323
324 // Now return the commands from the future.
325 // Unfortunately, there is no easy unsafe way to move out from Arc,
326 // so the `clone` maybe needed.
327 let mut commands: Vec<Command> = Vec::with_capacity(cmds.len());
328 for id in cmd_ids {
329 if let Some(rich_cmd) = cmds.remove(&id) {
330 if let Ok(cmd) = Arc::try_unwrap(rich_cmd.inner) {
331 commands.push(cmd);
332 } else {
333 commands.push((*cmd.inner).clone());
334 }
335 }
336 }
337 return Ok::<_, ImlManagerCliError>(commands);
338 }
339 fetch_and_update(&cmd_ids, &mut cmds, &mut jobs, &mut steps).await?;
340 let (mut fresh_items, p, l) = build_fresh_items(&cmd_ids, &cmds, &jobs, &steps);
341 pb_main.set_length(l);
342 pb_main.set_position(p);
343 let diff = calculate_diff(¤t_items.lock().await, &fresh_items);
344 apply_diff(
345 &mut (*current_items.lock().await),
346 &mut fresh_items,
347 &diff,
348 |i, y| {
349 let pb = ProgressBarIndicator {
350 progress_bar: multi_progress.insert(i, ProgressBar::new(1_000_000)),
351 active_style: Cell::new(None),
352 };
353 set_progress_bar_message(&pb, y);
354 pb
355 },
356 |_, pb, y| set_progress_bar_message(pb, y),
357 |_, pb| multi_progress.remove(&pb.progress_bar),
358 );
359
360 for it in current_items.lock().await.iter() {
361 if let Some(ic) = &it.indicator {
362 if it.state == State::Progressing {
363 ic.progress_bar.inc(1);
364 }
365 }
366 }
367 delay_for(Duration::from_millis(FETCH_DELAY_MS)).await;
368 }
369 }
370 };
371
372 // showing loop
373 // fut3: impl Future<Output=Result<(), Error>>
374 let fut3 = {
375 let is_done = Arc::clone(&is_done);
376 let current_items = Arc::clone(¤t_items);
377 async move {
378 while !is_done.load(std::sync::atomic::Ordering::SeqCst) {
379 for it in current_items.lock().await.iter() {
380 if it.state == State::Progressing {
381 if let Some(ic) = &it.indicator {
382 ic.progress_bar.inc(1);
383 }
384 }
385 }
386 delay_for(Duration::from_millis(SHOW_DELAY_MS)).await;
387 }
388 Ok(())
389 }
390 };
391
392 let (_, cmds, _) = future::try_join3(fut1, fut2, fut3).await?;
393 Ok(cmds)
394}
395
396/// wrap each command and build `cmd_ids` to maintain the order of the commands
397fn build_initial_commands(commands: &[Command]) -> (HashMap<i32, RichCommand>, Vec<i32>) {
398 let mut cmds = HashMap::new();
399 let mut cmd_ids = Vec::new();
400 for command in commands.iter() {
401 let (id, deps) = extract_children_from_cmd(command);
402 let inner = Arc::new(command.clone());
403 cmds.insert(id, Rich { id, deps, inner });
404 cmd_ids.push(id);
405 }
406 (cmds, cmd_ids)
407}
408
409fn build_initial_items(
410 commands: &[Command],
411 create_bar: impl Fn(usize, &Item<TypedId, Stuff, ProgressBarIndicator>) -> ProgressBarIndicator,
412) -> Vec<Item<TypedId, Stuff, ProgressBarIndicator>> {
413 let mut current_items = Vec::new();
414 for (i, cmd) in commands.iter().enumerate() {
415 let mut it = Item {
416 id: TypedId::Cmd(cmd.id),
417 indent: "".to_owned(),
418 state: cmd_state(cmd),
419 outer: Stuff {
420 msg: cmd.message.clone(),
421 console: vec![],
422 },
423 indicator: None,
424 };
425 it.indicator = Some(create_bar(i, &it));
426 current_items.push(it);
427 }
428 current_items
429}
430
431async fn fetch_and_update(
432 cmd_ids: &[i32],
433 commands: &mut HashMap<i32, RichCommand>,
434 jobs: &mut HashMap<i32, RichJob>,
435 steps: &mut HashMap<i32, RichStep>,
436) -> Result<(), ImlManagerCliError> {
437 if !TREE_STATE.lock().unwrap().has_read {
438 TREE_STATE.lock().unwrap().has_read = true;
439 update_commands(commands, TREE_STATE.lock().unwrap().commands.clone());
440 update_jobs(jobs, TREE_STATE.lock().unwrap().jobs.clone());
441 update_steps(steps, TREE_STATE.lock().unwrap().steps.clone());
442 } else {
443 let trees = build_trees(cmd_ids, commands, jobs, steps);
444 for tree in trees {
445 let action: Option<Action> = get_action(&mut RNG.lock().unwrap(), &tree);
446 match action {
447 None => {},
448 Some(Action::KeepProgress) => {},
449 Some(Action::AddNode(typed_id, name)) => match typed_id {
450 TypedId::Job(j) => {
451 let new_job_id = jobs.keys().copied().max().unwrap_or(300) + 1;
452 let mut parent_job = (*jobs[&j].inner).clone();
453 parent_job.wait_for.push(format!("/api/job/{}/", new_job_id));
454 let new_job = Job0 {
455 id: new_job_id,
456 resource_uri: format!("/api/job/{}/", new_job_id),
457 cancelled: false,
458 errored: false,
459 state: "incomplete".to_string(),
460 class_name: format!("{}-Class", name.to_uppercase()),
461 commands: parent_job.commands.clone(),
462 description: name.to_string(),
463 created_at: parent_job.created_at.clone(),
464 modified_at: parent_job.modified_at.clone(),
465 step_results: Default::default(),
466 wait_for: vec![],
467 steps: vec![],
468 available_transitions: vec![],
469 read_locks: vec![],
470 write_locks: vec![]
471 };
472 update_jobs(jobs, vec![parent_job, new_job]);
473 },
474 _ => {},
475 },
476 Some(Action::CompleteNode(typed_id)) => match typed_id {
477 TypedId::Cmd(x) => {
478 let mut cmd = (*commands[&x].inner).clone();
479 set_cmd_state(&mut cmd, State::Completed);
480 update_commands(commands, vec![cmd]);
481 },
482 TypedId::Job(x) => {
483 let mut job = (*jobs[&x].inner).clone();
484 set_job_state(&mut job, State::Completed);
485 update_jobs(jobs, vec![job]);
486 },
487 TypedId::Step(x) => {
488 let mut step = (*steps[&x].inner).clone();
489 set_step_state(&mut step, State::Completed);
490 update_steps(steps, vec![step]);
491 },
492 },
493 Some(Action::FailNode(typed_id)) => match typed_id {
494 TypedId::Cmd(x) => {
495 let mut cmd = (*commands[&x].inner).clone();
496 set_cmd_state(&mut cmd, State::Errored);
497 update_commands(commands, vec![cmd]);
498 },
499 TypedId::Job(x) => {
500 let mut job = (*jobs[&x].inner).clone();
501 set_job_state(&mut job, State::Errored);
502 update_jobs(jobs, vec![job]);
503 },
504 TypedId::Step(x) => {
505 let mut step = (*steps[&x].inner).clone();
506 set_step_state(&mut step, State::Errored);
507 step.console = CONSOLE.to_owned();
508 step.backtrace = BACKTRACE.to_owned();
509 update_steps(steps, vec![step]);
510 },
511 },
512 }
513 }
514 }
515 // let (load_cmd_ids, load_job_ids, load_step_ids) = extract_ids_to_load(&commands, &jobs, &steps);
516 // let loaded_cmds: ApiList<Command> = fetch_api_list(load_cmd_ids).await?;
517 // let loaded_jobs: ApiList<Job0> = fetch_api_list(load_job_ids).await?;
518 // let loaded_steps: ApiList<Step> = fetch_api_list(load_step_ids).await?;
519 // update_commands(commands, loaded_cmds.objects);
520 // update_jobs(jobs, loaded_jobs.objects);
521 // update_steps(steps, loaded_steps.objects);
522 Ok(())
523}
524
525fn update_commands(commands: &mut HashMap<i32, RichCommand>, loaded_cmds: Vec<Command>) {
526 let new_commands = loaded_cmds
527 .into_iter()
528 .map(|t| {
529 let (id, deps) = extract_children_from_cmd(&t);
530 let inner = Arc::new(t);
531 (id, Rich { id, deps, inner })
532 })
533 .collect::<HashMap<i32, RichCommand>>();
534 commands.extend(new_commands);
535}
536
537fn update_jobs(jobs: &mut HashMap<i32, RichJob>, loaded_jobs: Vec<Job0>) {
538 let new_jobs = loaded_jobs
539 .into_iter()
540 .map(|t| {
541 let (id, deps) = extract_children_from_job(&t);
542 let inner = Arc::new(t);
543 (id, Rich { id, deps, inner })
544 })
545 .collect::<HashMap<i32, RichJob>>();
546 jobs.extend(new_jobs);
547}
548
549fn update_steps(steps: &mut HashMap<i32, RichStep>, loaded_steps: Vec<Step>) {
550 let new_steps = loaded_steps
551 .into_iter()
552 .map(|t| {
553 let (id, deps) = extract_children_from_step(&t);
554 let inner = Arc::new(t);
555 (id, Rich { id, deps, inner })
556 })
557 .collect::<HashMap<i32, RichStep>>();
558 steps.extend(new_steps);
559}
560
561fn extract_ids_to_load(
562 commands: &HashMap<i32, RichCommand>,
563 jobs: &HashMap<i32, RichJob>,
564 steps: &HashMap<i32, RichStep>,
565) -> (Vec<i32>, Vec<i32>, Vec<i32>) {
566 let load_cmd_ids = extract_sorted_keys(&commands)
567 .into_iter()
568 .filter(|c| {
569 commands
570 .get(c)
571 .map(|cmd| !cmd_finished(cmd))
572 .unwrap_or(true)
573 })
574 .collect::<Vec<i32>>();
575 let load_job_ids = load_cmd_ids
576 .iter()
577 .filter(|c| commands.contains_key(c))
578 .flat_map(|c| commands[c].deps())
579 .filter(|j| jobs.get(j).map(|job| !job_finished(job)).unwrap_or(true))
580 .copied()
581 .collect::<Vec<i32>>();
582 let load_step_ids = load_job_ids
583 .iter()
584 .filter(|j| jobs.contains_key(j))
585 .flat_map(|j| jobs[j].deps())
586 .filter(|s| {
587 steps
588 .get(s)
589 .map(|step| !step_finished(step))
590 .unwrap_or(true)
591 })
592 .copied()
593 .collect::<Vec<i32>>();
594 (load_cmd_ids, load_job_ids, load_step_ids)
595}
596
/// Returns the keys of `hm` in ascending order.
fn extract_sorted_keys<T>(hm: &HashMap<i32, T>) -> Vec<i32> {
    let mut keys: Vec<i32> = Vec::with_capacity(hm.len());
    keys.extend(hm.keys().copied());
    keys.sort_unstable();
    keys
}
602
603pub fn print_error(tree: &Tree<TypedId, Stuff>, id: TypedId, print: impl Fn(&str)) {
604 let path = tree.get_path_from_root(id);
605 let caption = path
606 .iter()
607 .filter_map(|id| tree.get_node(*id))
608 .map(|n| n.inner.msg.clone())
609 .join(ARROW);
610 print(&caption);
611 if let Some(lid) = path.last() {
612 if let Some(node) = tree.get_node(*lid) {
613 for line in node.inner.console.iter() {
614 print(&format!("{}{}", SPACE, style(line).red()));
615 }
616 }
617 }
618}
619
620/// Waits for command completion and prints progress messages.
621/// This will error on command failure and print failed commands in the error message.
622pub async fn wait_for_cmds_success(cmds: &[Command]) -> Result<Vec<Command>, ImlManagerCliError> {
623 let cmds = wait_for_commands(cmds).await?;
624
625 let (failed, passed): (Vec<_>, Vec<_>) =
626 cmds.into_iter().partition(|x| x.errored || x.cancelled);
627
628 if !failed.is_empty() {
629 Err(failed.into())
630 } else {
631 Ok(passed)
632 }
633}
634
635pub async fn get_available_actions(
636 id: u32,
637 content_type_id: u32,
638) -> Result<ApiList<AvailableAction>, ImlManagerCliError> {
639 get(
640 AvailableAction::endpoint_name(),
641 vec![
642 (
643 "composite_ids",
644 format!("{}:{}", content_type_id, id).as_ref(),
645 ),
646 ("limit", "0"),
647 ],
648 )
649 .await
650}
651
652/// Given an `ApiList`, this fn returns the first item or errors.
653pub fn first<T: EndpointName>(x: ApiList<T>) -> Result<T, ImlManagerCliError> {
654 x.objects
655 .into_iter()
656 .next()
657 .ok_or_else(|| ImlManagerCliError::DoesNotExist(T::endpoint_name()))
658}
659
660/// Wrapper for a `GET` to the Api.
661pub async fn get<T: serde::de::DeserializeOwned + std::fmt::Debug>(
662 endpoint: &str,
663 query: impl serde::Serialize,
664) -> Result<T, ImlManagerCliError> {
665 let client = iml_manager_client::get_client()?;
666
667 iml_manager_client::get(client, endpoint, query)
668 .await
669 .map_err(|e| e.into())
670}
671
672/// Wrapper for a `POST` to the Api.
673pub async fn post(
674 endpoint: &str,
675 body: impl serde::Serialize,
676) -> Result<iml_manager_client::Response, ImlManagerCliError> {
677 let client = iml_manager_client::get_client()?;
678
679 iml_manager_client::post(client, endpoint, body)
680 .await
681 .map_err(|e| e.into())
682}
683
684/// Wrapper for a `PUT` to the Api.
685pub async fn put(
686 endpoint: &str,
687 body: impl serde::Serialize,
688) -> Result<iml_manager_client::Response, ImlManagerCliError> {
689 let client = iml_manager_client::get_client()?;
690 iml_manager_client::put(client, endpoint, body)
691 .await
692 .map_err(|e| e.into())
693}
694
695/// Wrapper for a `DELETE` to the Api.
696pub async fn delete(
697 endpoint: &str,
698 query: impl serde::Serialize,
699) -> Result<iml_manager_client::Response, ImlManagerCliError> {
700 let client = iml_manager_client::get_client().expect("Could not create API client");
701 iml_manager_client::delete(client, endpoint, query)
702 .await
703 .map_err(|e| e.into())
704}
705
706pub async fn get_hosts() -> Result<ApiList<Host>, ImlManagerCliError> {
707 get(Host::endpoint_name(), Host::query()).await
708}
709
710pub async fn get_all<T: EndpointName + FlatQuery + Debug + serde::de::DeserializeOwned>(
711) -> Result<ApiList<T>, ImlManagerCliError> {
712 get(T::endpoint_name(), T::query()).await
713}
714
715pub async fn get_one<T: EndpointName + FlatQuery + Debug + serde::de::DeserializeOwned>(
716 query: Vec<(&str, &str)>,
717) -> Result<T, ImlManagerCliError> {
718 let mut q = T::query();
719 q.extend(query);
720 first(get(T::endpoint_name(), q).await?)
721}
722
723pub async fn get_influx<T: serde::de::DeserializeOwned + std::fmt::Debug>(
724 db: &str,
725 influxql: &str,
726) -> Result<T, ImlManagerCliError> {
727 let client = iml_manager_client::get_client()?;
728
729 iml_manager_client::get_influx(client, db, influxql)
730 .await
731 .map_err(|e| e.into())
732}
733
734fn extract_children_from_cmd(cmd: &Command) -> (i32, Vec<i32>) {
735 let mut deps = cmd
736 .jobs
737 .iter()
738 .filter_map(|s| extract_uri_id::<Job0>(s))
739 .collect::<Vec<i32>>();
740 deps.sort();
741 (cmd.id, deps)
742}
743
744fn extract_children_from_job(job: &Job0) -> (i32, Vec<i32>) {
745 let mut deps = job
746 .steps
747 .iter()
748 .filter_map(|s| extract_uri_id::<Step>(s))
749 .collect::<Vec<i32>>();
750 deps.sort();
751 (job.id, deps)
752}
753
754fn extract_children_from_step(step: &Step) -> (i32, Vec<i32>) {
755 (step.id, Vec::new()) // steps have no descendants
756}
757
758fn extract_wait_fors_from_job(job: &Job0, jobs: &HashMap<i32, RichJob>) -> (i32, Vec<i32>) {
759 // Extract the interdependencies between jobs.
760 // See [command_modal::tests::test_jobs_ordering]
761 let mut deps = job
762 .wait_for
763 .iter()
764 .filter_map(|s| extract_uri_id::<Job0>(s))
765 .collect::<Vec<i32>>();
766 let triple = |id: &i32| {
767 jobs
768 .get(id)
769 .map(|arj| (-(arj.deps.len() as i32), &arj.description[..], arj.id))
770 .unwrap_or((0, "", *id))
771 };
772 deps.sort_by(|i1, i2| {
773 let t1 = triple(i1);
774 let t2 = triple(i2);
775 t1.cmp(&t2)
776 });
777 (job.id, deps)
778}
779
780fn extract_uri_id<T: EndpointName>(input: &str) -> Option<i32> {
781 lazy_static::lazy_static! {
782 static ref RE: Regex = Regex::new(r"/api/(\w+)/(\d+)/").unwrap();
783 }
784 RE.captures(input).and_then(|cap: Captures| {
785 let s = cap.get(1).unwrap().as_str();
786 let t = cap.get(2).unwrap().as_str();
787 if s == T::endpoint_name() {
788 t.parse::<i32>().ok()
789 } else {
790 None
791 }
792 })
793}
794
795// TODO debug only
796fn build_trees(
797 cmd_ids: &[i32],
798 commands: &HashMap<i32, RichCommand>,
799 jobs: &HashMap<i32, RichJob>,
800 steps: &HashMap<i32, RichStep>,
801) -> Vec<Tree<TypedId, Stuff>> {
802 let mut trees = Vec::with_capacity(cmd_ids.len());
803 for c in cmd_ids {
804 let cmd = &commands[&c];
805 if cmd.deps().iter().all(|j| jobs.contains_key(j)) {
806 let extract_fun = |job: &Arc<Job0>| extract_wait_fors_from_job(job, &jobs);
807 let jobs_graph_data = cmd
808 .deps()
809 .iter()
810 .map(|k| RichJob::new(Arc::clone(&jobs[k].inner), extract_fun))
811 .collect::<Vec<RichJob>>();
812 let dag = build_direct_dag(&jobs_graph_data);
813 let tree = build_gen_tree(cmd, &dag, &steps);
814 trees.push(tree);
815 }
816 }
817 trees
818}
819
820// region functions build_fresh_items / build_gen_tree
821fn build_fresh_items(
822 cmd_ids: &[i32],
823 commands: &HashMap<i32, RichCommand>,
824 jobs: &HashMap<i32, RichJob>,
825 steps: &HashMap<i32, RichStep>,
826) -> (Vec<Item<TypedId, Stuff, ProgressBarIndicator>>, u64, u64) {
827 let mut all_items = Vec::new();
828 let mut main_position = 0;
829 let mut main_length = 0;
830 for c in cmd_ids {
831 let cmd = &commands[&c];
832 let it = Item {
833 id: TypedId::Cmd(*c),
834 indent: "".to_owned(),
835 state: cmd_state(cmd),
836 outer: Stuff {
837 msg: cmd.message.clone(),
838 console: Vec::new(),
839 },
840 indicator: None,
841 };
842 all_items.push(it);
843 if cmd.deps().iter().all(|j| jobs.contains_key(j)) {
844 let extract_fun = |job: &Arc<Job0>| extract_wait_fors_from_job(job, &jobs);
845 let jobs_graph_data = cmd
846 .deps()
847 .iter()
848 .map(|k| RichJob::new(Arc::clone(&jobs[k].inner), extract_fun))
849 .collect::<Vec<RichJob>>();
850 let dag = build_direct_dag(&jobs_graph_data);
851 let mut tree = build_gen_tree(cmd, &dag, &steps);
852 main_length += tree.len() as u64;
853 main_position += tree.filter_node_keys(|n| n.state != State::Progressing).len() as u64;
854
855 // The collapsing is needed to reduce some deep levels of the
856 // tree so that all the trees fit into terminal screens.
857 let pairs = tree.calculate_states_to_level(2);
858 for (id, s) in pairs {
859 tree.get_node_mut(id).map(|n| {
860 n.collapsed = true;
861 n.state = s;
862 });
863 }
864 let items = tree.render();
865 for it in items.into_iter() {
866 all_items.push(it);
867 }
868 }
869 }
870 (all_items, main_position, main_length)
871}
872
873fn build_gen_tree(
874 cmd: &RichCommand,
875 graph: &DependencyDAG<i32, RichJob>,
876 steps: &HashMap<i32, RichStep>,
877) -> Tree<TypedId, Stuff> {
878 fn traverse(
879 graph: &DependencyDAG<i32, RichJob>,
880 job: Arc<RichJob>,
881 steps: &HashMap<i32, RichStep>,
882 parent: Option<TypedId>,
883 visited: &mut HashSet<TypedId>,
884 tree: &mut Tree<TypedId, Stuff>,
885 ) {
886 let is_new = visited.insert(TypedId::Job(job.id));
887 let node = Node {
888 key: TypedId::Job(job.id),
889 parent: None,
890 deps: Vec::with_capacity(job.deps.len()),
891 collapsed: false,
892 state: job_state(&job),
893 inner: Stuff {
894 msg: job.description.clone(),
895 console: Vec::new(),
896 },
897 };
898 let pk = tree.add_child_node(parent, node);
899 let new_parent = Some(pk);
900
901 // add child jobs to the tree
902 if let Some(deps) = graph.links.get(&job.id()) {
903 if is_new {
904 for d in deps {
905 traverse(graph, Arc::clone(d), steps, new_parent, visited, tree);
906 }
907 }
908 }
909 // add steps if any
910 for step_id in &job.steps {
911 if let Some(step_id) = extract_uri_id::<Step>(step_id) {
912 if let Some(step) = steps.get(&step_id) {
913 let node = Node {
914 key: TypedId::Step(step_id),
915 parent: None,
916 collapsed: false,
917 deps: Vec::new(),
918 state: step_state(step),
919 inner: Stuff {
920 msg: step.class_name.clone(),
921 console: step
922 .console
923 .lines()
924 .map(|s| s.to_owned())
925 .collect::<Vec<_>>(),
926 },
927 };
928 tree.add_child_node(new_parent, node);
929 }
930 }
931 }
932 }
933 let mut tree = Tree::new();
934 let p = tree.add_child_node(
935 None,
936 Node {
937 key: TypedId::Cmd(cmd.id),
938 parent: None,
939 collapsed: false,
940 deps: vec![],
941 state: cmd_state(cmd),
942 inner: Stuff {
943 msg: cmd.message.clone(),
944 console: vec![],
945 },
946 },
947 );
948 tree.root = Some(p);
949 let mut visited = HashSet::new();
950 for r in &graph.roots {
951 traverse(
952 graph,
953 Arc::clone(r),
954 steps,
955 Some(p),
956 &mut visited,
957 &mut tree,
958 );
959 }
960 tree
961}
962
963pub fn set_progress_bar_message(
964 ind: &ProgressBarIndicator,
965 item: &Item<TypedId, Stuff, ProgressBarIndicator>,
966) {
967 // let spinner_style = ProgressStyle::default_spinner()
968 // .tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈ ")
969 // .template("{prefix:.bold.dim} {spinner} {wide_msg}");
970
971 // two styles are applied because indicatif doesn't able to set the spinner symbol
972 // after the progress bar completed.
973 let sty_aux = ProgressStyle::default_bar().template("{prefix} {spinner:.green} {msg}");
974 let sty_aux_finish = ProgressStyle::default_bar().template("{prefix} {msg}");
975
976 match item.state {
977 State::Progressing => {
978 if ind.active_style.get() != Some(true) {
979 ind.progress_bar.set_style(sty_aux.clone());
980 ind.active_style.set(Some(true));
981 }
982 ind.progress_bar.set_prefix(&item.indent);
983 ind.progress_bar.set_message(&item.outer.to_string());
984 }
985 _ => {
986 if ind.active_style.get() != Some(false) {
987 ind.progress_bar.set_style(sty_aux_finish.clone());
988 ind.active_style.set(Some(false));
989 }
990 ind.progress_bar.set_prefix(&item.indent);
991 ind.progress_bar.set_message(&format!("{} {}", item.state, item.outer.to_string()));
992 }
993 }
994}
995// endregion
996
#[cfg(test)]
mod tests {
    use super::*;
    use crate::gen_tree::iterate_items;

    /// Concatenates the lines produced by `iterate_items`, one per row.
    fn convert_items_to_string<K, U: Display, B>(items: &[Item<K, U, B>]) -> String {
        let mut acc = String::with_capacity(64);
        iterate_items(items, |_, s| {
            acc.push_str(&s);
            acc.push('\n');
        });
        acc
    }

    /// Builds the tree of every fixture command from the job graph of command 37
    /// and prints the rendered result.
    ///
    /// Depends on the JSON fixture files read by `TREE_STATE`.
    #[test]
    fn test_job_tree() {
        let mut commands = HashMap::new();
        let mut jobs = HashMap::new();
        let mut steps = HashMap::new();
        {
            let fixture = TREE_STATE.lock().unwrap();
            update_commands(&mut commands, fixture.commands.clone());
            update_jobs(&mut jobs, fixture.jobs.clone());
            update_steps(&mut steps, fixture.steps.clone());
        }
        let cmd = commands.get(&37).unwrap();

        let extract_fun = |job: &Arc<Job0>| extract_wait_fors_from_job(job, &jobs);
        let jobs_graph_data = cmd
            .deps()
            .iter()
            .map(|k| RichJob::new(Arc::clone(&jobs[k].inner), extract_fun))
            .collect::<Vec<RichJob>>();
        let dag = build_direct_dag(&jobs_graph_data);

        let mut items: Vec<Item<TypedId, Stuff, ()>> = Vec::new();

        for cmd in commands.values() {
            let tree = build_gen_tree(cmd, &dag, &steps);
            // NOTE(review): `is_valid` is not defined or imported in the visible part
            // of this file — confirm where it comes from.
            assert!(is_valid(&tree));
            let cmd_items = tree.render::<Stuff, ()>();
            items.extend(cmd_items);
        }
        let output = convert_items_to_string(&items);
        println!("{}", output);
    }
}
1040
/// Sample journal output that `fetch_and_update` stores in the `console` field of a
/// step it marks as failed.
const CONSOLE: &str = r#"-- Logs begin at Wed 2019-07-10 16:12:42 UTC, end at Wed 2019-07-10 16:52:46 UTC. --
Jul 10 16:12:50 adm.local systemd[1]: Started IML Agent Comms Service.
Jul 10 16:12:50 adm.local iml-agent-comms[3069]: [INFO iml_rabbit] creating client
Jul 10 16:12:50 adm.local iml-agent-comms[3069]: [INFO iml_agent_comms] Starting iml-agent-comms on V4(127.0.0.1:8003)
Jul 10 16:12:50 adm.local iml-agent-comms[3069]: [INFO iml_rabbit] creating client
Jul 10 16:12:50 adm.local iml-agent-comms[3069]: [ERROR iml_agent_comms] Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
Jul 10 16:12:50 adm.local iml-agent-comms[3069]: [ERROR iml_rabbit] There was an error connecting to rabbit: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
Jul 10 16:12:50 adm.local systemd[1]: iml-agent-comms.service holdoff time over, scheduling restart.
Jul 10 16:12:50 adm.local systemd[1]: Stopped IML Agent Comms Service.
Jul 10 16:12:53 adm.local systemd[1]: Started IML Agent Comms Service.
Jul 10 16:12:53 adm.local iml-agent-comms[3191]: [INFO iml_rabbit] creating client"#;
1052
/// Sample Python traceback that `fetch_and_update` stores in the `backtrace` field of
/// a step it marks as failed.
const BACKTRACE: &str = r#"Traceback (most recent call last):
 File "greetings.py", line 10, in greet_many
 greet(person)
 File "greetings.py", line 5, in greet
 print(greeting + ', ' + who_to_greet(someone))
TypeError: must be str, not int
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
 File "greetings.py", line 14, in <module>
 greet_many(['Chad', 'Dan', 1])
 File "greetings.py", line 12, in greet_many
 print('hi, ' + person)
TypeError: must be str, not int"#;
1066
1067