· 7 years ago · Feb 01, 2019, 02:22 AM
1use std::collections::HashSet;
2use std::iter;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use graph::data::subgraph::schema::*;
6use graph::prelude::{
7 CreateSubgraphResult, SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait,
8 SubgraphRegistrar as SubgraphRegistrarTrait, *,
9};
10
11pub struct SubgraphRegistrar<L, P, S, CS> {
12 logger: Logger,
13 resolver: Arc<L>,
14 provider: Arc<P>,
15 store: Arc<S>,
16 chain_store: Arc<CS>,
17 node_id: NodeId,
18 version_switching_mode: SubgraphVersionSwitchingMode,
19 assignment_event_stream_cancel_guard: CancelGuard, // cancels on drop
20}
21
22impl<L, P, S, CS> SubgraphRegistrar<L, P, S, CS>
23where
24 L: LinkResolver,
25 P: SubgraphAssignmentProviderTrait,
26 S: Store,
27 CS: ChainStore,
28{
29 pub fn new(
30 logger: Logger,
31 resolver: Arc<L>,
32 provider: Arc<P>,
33 store: Arc<S>,
34 chain_store: Arc<CS>,
35 node_id: NodeId,
36 version_switching_mode: SubgraphVersionSwitchingMode,
37 ) -> Self {
38 let logger = logger.new(o!("component" => "SubgraphRegistrar"));
39
40 SubgraphRegistrar {
41 logger,
42 resolver,
43 provider,
44 store,
45 chain_store,
46 node_id,
47 version_switching_mode,
48 assignment_event_stream_cancel_guard: CancelGuard::new(),
49 }
50 }
51
52 pub fn start(&self) -> impl Future<Item = (), Error = Error> {
53 let logger_clone1 = self.logger.clone();
54 let logger_clone2 = self.logger.clone();
55 let provider = self.provider.clone();
56 let node_id = self.node_id.clone();
57 let assignment_event_stream_cancel_handle =
58 self.assignment_event_stream_cancel_guard.handle();
59
60 // The order of the following three steps is important:
61 // - Start assignment event stream
62 // - Read assignments table and start assigned subgraphs
63 // - Start processing assignment event stream
64 //
65 // Starting the event stream before reading the assignments table ensures that no
66 // assignments are missed in the period of time between the table read and starting event
67 // processing.
68 // Delaying the start of event processing until after the table has been read and processed
69 // ensures that Remove events happen after the assigned subgraphs have been started, not
70 // before (otherwise a subgraph could be left running due to a race condition).
71 //
72 // The discrepancy between the start time of the event stream and the table read can result
73 // in some extraneous events on start up. Examples:
74 // - The event stream sees an Add event for subgraph A, but the table query finds that
75 // subgraph A is already in the table.
76 // - The event stream sees a Remove event for subgraph B, but the table query finds that
77 // subgraph B has already been removed.
78 // The `handle_assignment_events` function handles these cases by ignoring AlreadyRunning
79 // (on subgraph start) or NotRunning (on subgraph stop) error types, which makes the
80 // operations idempotent.
81
82 // Start event stream
83 let assignment_event_stream = self.assignment_events();
84
85 // Deploy named subgraphs found in store
86 self.start_assigned_subgraphs().and_then(move |()| {
87 // Spawn a task to handle assignment events
88 tokio::spawn(future::lazy(move || {
89 assignment_event_stream
90 .map_err(SubgraphAssignmentProviderError::Unknown)
91 .map_err(CancelableError::Error)
92 .cancelable(&assignment_event_stream_cancel_handle, || {
93 CancelableError::Cancel
94 })
95 .for_each(move |assignment_event| {
96 assert_eq!(assignment_event.node_id(), &node_id);
97 handle_assignment_event(assignment_event, provider.clone(), &logger_clone1)
98 })
99 .map_err(move |e| match e {
100 CancelableError::Cancel => {}
101 CancelableError::Error(e) => {
102 error!(logger_clone2, "assignment event stream failed: {}", e);
103 panic!("assignment event stream error: {}", e);
104 }
105 })
106 }));
107
108 Ok(())
109 })
110 }
111
112 pub fn assignment_events(&self) -> impl Stream<Item = AssignmentEvent, Error = Error> + Send {
113 let store = self.store.clone();
114 let node_id = self.node_id.clone();
115
116 store
117 .subscribe(vec![
118 SubgraphDeploymentAssignmentEntity::subgraph_entity_pair(),
119 ])
120 .map_err(|()| format_err!("Entity change stream failed"))
121 .and_then(
122 move |entity_change| -> Result<Box<Stream<Item = _, Error = _> + Send>, _> {
123 let subgraph_hash = SubgraphDeploymentId::new(entity_change.entity_id.clone())
124 .map_err(|()| format_err!("Invalid subgraph hash in assignment entity"))?;
125
126 match entity_change.operation {
127 EntityChangeOperation::Added | EntityChangeOperation::Updated => {
128 store
129 .get(SubgraphDeploymentAssignmentEntity::key(
130 subgraph_hash.clone(),
131 ))
132 .map_err(|e|
133 format_err!("Failed to get subgraph assignment entity: {}", e)
134 )
135 .map(|entity_opt| -> Box<Stream<Item = _, Error = _> + Send> {
136 if let Some(entity) = entity_opt {
137 if entity.get("nodeId") == Some(&node_id.to_string().into()) {
138 let subgraph = match get_subgraph_for_deployment_id(store.clone(), &subgraph_hash) {
139 Ok(subgraph) => subgraph,
140 Err(e) => return Box::new(stream::once(Err(format_err!("Failed to resolve subgraph for deployment {}: {}", subgraph_hash, e)))),
141 };
142
143 // Start subgraph on this node
144 Box::new(stream::once(Ok(AssignmentEvent::Add {
145 subgraph_name: subgraph.name,
146 subgraph_id: subgraph_hash,
147 node_id: node_id.clone(),
148 })))
149 } else {
150 // Ensure it is removed from this node
151 Box::new(stream::once(Ok(AssignmentEvent::Remove {
152 subgraph_id: subgraph_hash,
153 node_id: node_id.clone(),
154 })))
155 }
156 } else {
157 // Was added/updated, but is now gone.
158 // We will get a separate Removed event later.
159 Box::new(stream::empty())
160 }
161 })
162 }
163 EntityChangeOperation::Removed => {
164 // Send remove event without checking node ID.
165 // If node ID does not match, then this is a no-op when handled in
166 // assignment provider.
167 Ok(Box::new(stream::once(Ok(AssignmentEvent::Remove {
168 subgraph_id: subgraph_hash,
169 node_id: node_id.clone(),
170 }))))
171 }
172 }
173 },
174 )
175 .flatten()
176 }
177
178 fn start_assigned_subgraphs(&self) -> impl Future<Item = (), Error = Error> {
179 let provider = self.provider.clone();
180 let logger = self.logger.clone();
181 let store = self.store.clone();
182
183 // Create a query to find all assignments with this node ID
184 let assignment_query = SubgraphDeploymentAssignmentEntity::query()
185 .filter(EntityFilter::new_equal("nodeId", self.node_id.to_string()));
186
187 future::result(self.store.find(assignment_query))
188 .map_err(|e| format_err!("Error querying subgraph assignments: {}", e))
189 .and_then(move |assignment_entities| {
190 assignment_entities
191 .into_iter()
192 .map(|assignment_entity| {
193 // Parse as subgraph hash
194 assignment_entity.id().and_then(|id| {
195 SubgraphDeploymentId::new(id).map_err(|()| {
196 format_err!("Invalid subgraph hash in assignment entity")
197 })
198 })
199 })
200 .collect::<Result<HashSet<SubgraphDeploymentId>, _>>()
201 })
202 .and_then(move |subgraph_ids| {
203 let provider = provider.clone();
204 let store = store.clone();
205
206 stream::iter_ok(subgraph_ids).for_each(move |id| {
207 let subgraph = get_subgraph_for_deployment_id(store.clone(), &id)
208 .expect("failed to resolve subgraph for deployment");
209
210 start_subgraph(subgraph.name, id, &*provider, logger.clone())
211 .map_err(|()| unreachable!())
212 })
213 })
214 }
215}
216
217impl<L, P, S, CS> SubgraphRegistrarTrait for SubgraphRegistrar<L, P, S, CS>
218where
219 L: LinkResolver,
220 P: SubgraphAssignmentProviderTrait,
221 S: Store,
222 CS: ChainStore,
223{
224 fn create_subgraph(
225 &self,
226 name: SubgraphName,
227 ) -> Box<Future<Item = CreateSubgraphResult, Error = SubgraphRegistrarError> + Send + 'static>
228 {
229 Box::new(future::result(create_subgraph(
230 &self.logger,
231 self.store.clone(),
232 name,
233 )))
234 }
235
236 fn create_subgraph_version(
237 &self,
238 name: SubgraphName,
239 hash: SubgraphDeploymentId,
240 node_id: NodeId,
241 ) -> Box<Future<Item = (), Error = SubgraphRegistrarError> + Send + 'static> {
242 let logger = self.logger.clone();
243 let store = self.store.clone();
244 let version_switching_mode = self.version_switching_mode;
245 let chain_store = self.chain_store.clone();
246
247 Box::new(
248 SubgraphManifest::resolve(hash.to_ipfs_link(), self.resolver.clone())
249 .map_err(SubgraphRegistrarError::ResolveError)
250 .and_then(move |manifest| {
251 create_subgraph_version(
252 &logger,
253 store,
254 chain_store,
255 name,
256 manifest,
257 node_id,
258 version_switching_mode,
259 )
260 }),
261 )
262 }
263
264 fn remove_subgraph(
265 &self,
266 name: SubgraphName,
267 ) -> Box<Future<Item = (), Error = SubgraphRegistrarError> + Send + 'static> {
268 Box::new(future::result(remove_subgraph(
269 &self.logger,
270 self.store.clone(),
271 name,
272 )))
273 }
274
275 fn list_subgraphs(
276 &self,
277 ) -> Box<Future<Item = Vec<SubgraphName>, Error = SubgraphRegistrarError> + Send + 'static>
278 {
279 Box::new(
280 future::result(self.store.find(SubgraphEntity::query()))
281 .from_err()
282 .and_then(|subgraph_entities| {
283 subgraph_entities
284 .into_iter()
285 .map(|mut entity| {
286 let name_string = entity.remove("name").unwrap().as_string().unwrap();
287 SubgraphName::new(name_string.to_owned())
288 .map_err(|()| {
289 format_err!(
290 "Subgraph name in store has invalid format: {:?}",
291 name_string
292 )
293 })
294 .map_err(SubgraphRegistrarError::from)
295 })
296 .collect::<Result<_, _>>()
297 }),
298 )
299 }
300}
301
302fn handle_assignment_event<P>(
303 event: AssignmentEvent,
304 provider: Arc<P>,
305 logger: &Logger,
306) -> Box<Future<Item = (), Error = CancelableError<SubgraphAssignmentProviderError>> + Send>
307where
308 P: SubgraphAssignmentProviderTrait,
309{
310 let logger = logger.to_owned();
311
312 debug!(logger, "Received assignment event: {:?}", event);
313
314 match event {
315 AssignmentEvent::Add {
316 subgraph_name,
317 subgraph_id,
318 node_id: _,
319 } => Box::new(
320 start_subgraph(subgraph_name, subgraph_id, &*provider, logger)
321 .map_err(|()| unreachable!()),
322 ),
323 AssignmentEvent::Remove {
324 subgraph_id,
325 node_id: _,
326 } => Box::new(
327 provider
328 .stop(subgraph_id)
329 .then(|result| match result {
330 Ok(()) => Ok(()),
331 Err(SubgraphAssignmentProviderError::NotRunning(_)) => Ok(()),
332 Err(e) => Err(e),
333 })
334 .map_err(CancelableError::Error),
335 ),
336 }
337}
338
339/// Never errors.
340fn start_subgraph<P: SubgraphAssignmentProviderTrait>(
341 subgraph_name: SubgraphName,
342 subgraph_id: SubgraphDeploymentId,
343 provider: &P,
344 logger: Logger,
345) -> impl Future<Item = (), Error = ()> + 'static {
346 provider
347 .start(subgraph_name.clone(), subgraph_id.clone())
348 .then(move |result| -> Result<(), _> {
349 match result {
350 Ok(()) => Ok(()),
351 Err(SubgraphAssignmentProviderError::AlreadyRunning(_)) => Ok(()),
352 Err(e) => {
353 // Errors here are likely an issue with the subgraph.
354 // These will be recorded eventually so that they can be displayed
355 // in a UI.
356 error!(
357 logger,
358 "Subgraph instance failed to start";
359 "error" => e.to_string(),
360 "subgraph_name" => subgraph_name.to_string(),
361 "subgraph_id" => subgraph_id.to_string()
362 );
363 Ok(())
364 }
365 }
366 })
367}
368
369fn create_subgraph(
370 logger: &Logger,
371 store: Arc<impl Store>,
372 name: SubgraphName,
373) -> Result<CreateSubgraphResult, SubgraphRegistrarError> {
374 let mut ops = vec![];
375
376 // Check if this subgraph already exists
377 let subgraph_entity_opt = store.find_one(
378 SubgraphEntity::query().filter(EntityFilter::new_equal("name", name.to_string())),
379 )?;
380 if subgraph_entity_opt.is_some() {
381 debug!(
382 logger,
383 "Subgraph name already exists: {:?}",
384 name.to_string()
385 );
386 return Err(SubgraphRegistrarError::NameExists(name.to_string()));
387 }
388
389 ops.push(EntityOperation::AbortUnless {
390 description: "Subgraph entity should not exist".to_owned(),
391 query: SubgraphEntity::query().filter(EntityFilter::new_equal("name", name.to_string())),
392 entity_ids: vec![],
393 });
394
395 let created_at = SystemTime::now()
396 .duration_since(UNIX_EPOCH)
397 .unwrap()
398 .as_secs();
399 let entity = SubgraphEntity::new(name.clone(), None, None, created_at);
400 let entity_id = generate_entity_id();
401 ops.extend(entity.write_operations(&entity_id));
402
403 store.apply_entity_operations(ops, EventSource::None)?;
404
405 debug!(logger, "Created subgraph"; "subgraph_name" => name.to_string());
406
407 Ok(CreateSubgraphResult { id: entity_id })
408}
409
410fn create_subgraph_version(
411 logger: &Logger,
412 store: Arc<impl Store>,
413 chain_store: Arc<impl ChainStore>,
414 name: SubgraphName,
415 manifest: SubgraphManifest,
416 node_id: NodeId,
417 version_switching_mode: SubgraphVersionSwitchingMode,
418) -> Result<(), SubgraphRegistrarError> {
419 let mut ops = vec![];
420
421 // Look up subgraph entity by name
422 let subgraph_entity_opt = store.find_one(
423 SubgraphEntity::query().filter(EntityFilter::new_equal("name", name.to_string())),
424 )?;
425 let subgraph_entity = subgraph_entity_opt.ok_or_else(|| {
426 debug!(
427 logger,
428 "Subgraph not found, could not create_subgraph_version";
429 "subgraph_name" => name.to_string()
430 );
431 SubgraphRegistrarError::NameNotFound(name.to_string())
432 })?;
433 let subgraph_entity_id = subgraph_entity.id()?;
434 let current_version_id_opt = match subgraph_entity.get("currentVersion") {
435 Some(Value::String(current_version_id)) => Some(current_version_id.to_owned()),
436 Some(Value::Null) => None,
437 None => None,
438 Some(_) => panic!("subgraph entity has invalid type in currentVersion field"),
439 };
440 let pending_version_id_opt = match subgraph_entity.get("pendingVersion") {
441 Some(Value::String(pending_version_id)) => Some(pending_version_id.to_owned()),
442 Some(Value::Null) => None,
443 None => None,
444 Some(_) => panic!("subgraph entity has invalid type in pendingVersion field"),
445 };
446 ops.push(EntityOperation::AbortUnless {
447 description:
448 "Subgraph entity must still exist, have same name/currentVersion/pendingVersion"
449 .to_owned(),
450 query: SubgraphEntity::query().filter(EntityFilter::And(vec![
451 EntityFilter::new_equal("name", name.to_string()),
452 EntityFilter::new_equal("currentVersion", current_version_id_opt.clone()),
453 EntityFilter::new_equal("pendingVersion", pending_version_id_opt.clone()),
454 ])),
455 entity_ids: vec![subgraph_entity_id.clone()],
456 });
457
458 // Look up current version's deployment hash
459 let current_version_hash_opt = match current_version_id_opt {
460 Some(ref current_version_id) => Some(get_subgraph_version_deployment_id(
461 store.clone(),
462 current_version_id.clone(),
463 )?),
464 None => None,
465 };
466
467 // Look up pending version's deployment hash
468 let pending_version_hash_opt = match pending_version_id_opt {
469 Some(ref pending_version_id) => Some(get_subgraph_version_deployment_id(
470 store.clone(),
471 pending_version_id.clone(),
472 )?),
473 None => None,
474 };
475
476 // Find all subgraph version entities that point to this hash or a hash
477 let (version_summaries_before, read_summaries_ops) = store.read_subgraph_version_summaries(
478 iter::once(manifest.id.clone())
479 .chain(current_version_hash_opt)
480 .chain(pending_version_hash_opt)
481 .collect(),
482 )?;
483 ops.extend(read_summaries_ops);
484
485 // Create the subgraph version entity
486 let version_entity_id = generate_entity_id();
487 let created_at = SystemTime::now()
488 .duration_since(UNIX_EPOCH)
489 .unwrap()
490 .as_secs();
491 ops.extend(
492 SubgraphVersionEntity::new(subgraph_entity_id.clone(), manifest.id.clone(), created_at)
493 .write_operations(&version_entity_id),
494 );
495
496 // Simulate the creation of the new version and updating of Subgraph.pending/current
497 let version_summaries_after = match version_switching_mode {
498 SubgraphVersionSwitchingMode::Instant => {
499 // Previously pending or current versions will no longer be pending/current
500 let mut version_summaries_after = version_summaries_before
501 .clone()
502 .into_iter()
503 .map(|mut version_summary| {
504 if Some(&version_summary.id) == pending_version_id_opt.as_ref() {
505 version_summary.pending = false;
506 }
507
508 if Some(&version_summary.id) == current_version_id_opt.as_ref() {
509 version_summary.current = false;
510 }
511
512 version_summary
513 })
514 .collect::<Vec<_>>();
515
516 // Add new version, which will immediately be current
517 version_summaries_after.push(SubgraphVersionSummary {
518 id: version_entity_id.clone(),
519 subgraph_id: subgraph_entity_id.clone(),
520 deployment_id: manifest.id.clone(),
521 pending: false,
522 current: true,
523 });
524
525 version_summaries_after
526 }
527 SubgraphVersionSwitchingMode::Synced => {
528 // Just set the new version is pending, unless there is no current versions yet
529 if current_version_id_opt.is_some() {
530 // Previous pending version (if there was one) is no longer pending
531 let mut version_summaries_after = version_summaries_before
532 .clone()
533 .into_iter()
534 .map(|mut version_summary| {
535 if Some(&version_summary.id) == pending_version_id_opt.as_ref() {
536 version_summary.pending = false;
537 }
538
539 version_summary
540 })
541 .collect::<Vec<_>>();
542
543 // Add new version, which will be pending but not current
544 version_summaries_after.push(SubgraphVersionSummary {
545 id: version_entity_id.clone(),
546 subgraph_id: subgraph_entity_id.clone(),
547 deployment_id: manifest.id.clone(),
548 pending: true,
549 current: false,
550 });
551
552 version_summaries_after
553 } else {
554 // No need to process list, as there is no current version
555 let mut version_summaries_after = version_summaries_before.clone();
556
557 // Add new version, which will immediately be current
558 version_summaries_after.push(SubgraphVersionSummary {
559 id: version_entity_id.clone(),
560 subgraph_id: subgraph_entity_id.clone(),
561 deployment_id: manifest.id.clone(),
562 pending: false,
563 current: true,
564 });
565
566 version_summaries_after
567 }
568 }
569 };
570
571 // Check if subgraph deployment already exists for this hash
572 let deployment_entity_opt = store.get(SubgraphDeploymentEntity::key(manifest.id.clone()))?;
573 ops.push(EntityOperation::AbortUnless {
574 description: "Subgraph deployment entity must continue to exist/not exist".to_owned(),
575 query: SubgraphDeploymentEntity::query()
576 .filter(EntityFilter::new_equal("id", manifest.id.to_string())),
577 entity_ids: if deployment_entity_opt.is_some() {
578 vec![manifest.id.to_string()]
579 } else {
580 vec![]
581 },
582 });
583
584 // Create deployment only if it does not exist already
585 if deployment_entity_opt.is_none() {
586 let chain_head_ptr_opt = chain_store.chain_head_ptr()?;
587 let chain_head_block_number = match chain_head_ptr_opt {
588 Some(chain_head_ptr) => chain_head_ptr.number,
589 None => 0,
590 };
591 let genesis_block_ptr = chain_store.genesis_block_ptr()?;
592 ops.extend(
593 SubgraphDeploymentEntity::new(
594 &manifest,
595 false,
596 false,
597 genesis_block_ptr,
598 chain_head_block_number,
599 )
600 .create_operations(&manifest.id),
601 );
602 }
603
604 // Possibly add assignment for new deployment hash, and possibly remove assignments for old
605 // current/pending
606 ops.extend(store.reconcile_assignments(
607 logger,
608 version_summaries_before,
609 version_summaries_after,
610 Some(node_id),
611 ));
612
613 // Update current/pending versions in Subgraph entity
614 match version_switching_mode {
615 SubgraphVersionSwitchingMode::Instant => {
616 ops.extend(SubgraphEntity::update_pending_version_operations(
617 &subgraph_entity_id,
618 None,
619 ));
620 ops.extend(SubgraphEntity::update_current_version_operations(
621 &subgraph_entity_id,
622 Some(version_entity_id),
623 ));
624 }
625 SubgraphVersionSwitchingMode::Synced => {
626 // Just set the new version is pending, unless there is no current versions yet
627 if current_version_id_opt.is_some() {
628 ops.extend(SubgraphEntity::update_pending_version_operations(
629 &subgraph_entity_id,
630 Some(version_entity_id),
631 ));
632 } else {
633 ops.extend(SubgraphEntity::update_pending_version_operations(
634 &subgraph_entity_id,
635 None,
636 ));
637 ops.extend(SubgraphEntity::update_current_version_operations(
638 &subgraph_entity_id,
639 Some(version_entity_id),
640 ));
641 }
642 }
643 }
644
645 // Commit entity ops
646 store.apply_entity_operations(ops, EventSource::None)?;
647
648 debug!(
649 logger,
650 "Wrote new subgraph version to store";
651 "subgraph_name" => name.to_string(),
652 "subgraph_hash" => manifest.id.to_string()
653 );
654
655 Ok(())
656}
657
658fn get_subgraph_version_deployment_id(
659 store: Arc<impl Store>,
660 version_id: String,
661) -> Result<SubgraphDeploymentId, SubgraphRegistrarError> {
662 let version_entity = store
663 .get(SubgraphVersionEntity::key(version_id))?
664 .ok_or_else(|| TransactionAbortError::Other(format!("Subgraph version entity missing")))
665 .map_err(StoreError::from)?;
666
667 Ok(SubgraphDeploymentId::new(
668 version_entity
669 .get("deployment")
670 .unwrap()
671 .to_owned()
672 .as_string()
673 .unwrap(),
674 )
675 .unwrap())
676}
677
678/// Performs a reverse lookup from a deployment to a subgraph.
679fn get_subgraph_for_deployment_id(
680 store: Arc<impl Store>,
681 deployment_id: &SubgraphDeploymentId,
682) -> Result<SubgraphEntity, Error> {
683 // Resolve the deployment into the version it belongs to
684 let version = SubgraphVersionEntity::try_from_entity(
685 store
686 .find_one(SubgraphVersionEntity::query_from_deployment(deployment_id))?
687 .ok_or_else(|| {
688 format_err!("Missing subgraph version for deployment {}", deployment_id)
689 })?,
690 )?;
691
692 // Resolve the version into the subgraph the version (and thus,
693 // the deployment) belongs to
694 store
695 .get_typed(SubgraphEntity::key_from_version(&version))
696 .map_err(Error::from)
697 .and_then(|subgraph| {
698 subgraph.ok_or_else(|| format_err!("Missing subgraph for deployment {}", deployment_id))
699 })
700}
701
702fn remove_subgraph(
703 logger: &Logger,
704 store: Arc<impl Store>,
705 name: SubgraphName,
706) -> Result<(), SubgraphRegistrarError> {
707 let mut ops = vec![];
708
709 // Find the subgraph entity
710 let subgraph_entity_opt = store
711 .find_one(SubgraphEntity::query().filter(EntityFilter::new_equal("name", name.to_string())))
712 .map_err(|e| format_err!("query execution error: {}", e))?;
713 let subgraph_entity = subgraph_entity_opt
714 .ok_or_else(|| SubgraphRegistrarError::NameNotFound(name.to_string()))?;
715
716 ops.push(EntityOperation::AbortUnless {
717 description: "Subgraph entity must still exist".to_owned(),
718 query: SubgraphEntity::query().filter(EntityFilter::new_equal("name", name.to_string())),
719 entity_ids: vec![subgraph_entity.id().unwrap()],
720 });
721
722 // Find subgraph version entities
723 let subgraph_version_entities = store.find(SubgraphVersionEntity::query().filter(
724 EntityFilter::new_equal("subgraph", subgraph_entity.id().unwrap()),
725 ))?;
726
727 ops.push(EntityOperation::AbortUnless {
728 description: "Subgraph must have same set of versions".to_owned(),
729 query: SubgraphVersionEntity::query().filter(EntityFilter::new_equal(
730 "subgraph",
731 subgraph_entity.id().unwrap(),
732 )),
733 entity_ids: subgraph_version_entities
734 .iter()
735 .map(|entity| entity.id().unwrap())
736 .collect(),
737 });
738
739 // Remove subgraph version entities, and their deployment/assignment when applicable
740 ops.extend(remove_subgraph_versions(
741 logger,
742 store.clone(),
743 subgraph_version_entities,
744 )?);
745
746 // Remove the subgraph entity
747 ops.push(EntityOperation::Remove {
748 key: SubgraphEntity::key(subgraph_entity.id()?),
749 });
750
751 store.apply_entity_operations(ops, EventSource::None)?;
752
753 debug!(logger, "Removed subgraph"; "subgraph_name" => name.to_string());
754
755 Ok(())
756}
757
758/// Remove a set of subgraph versions atomically.
759///
760/// It may seem like it would be easier to generate the EntityOperations for subgraph versions
761/// removal one at a time, but that approach is significantly complicated by the fact that the
762/// store does not reflect the EntityOperations that have been accumulated so far. Earlier subgraph
763/// version creations/removals can affect later ones by affecting whether or not a subgraph deployment
764/// or assignment needs to be created/removed.
765fn remove_subgraph_versions(
766 logger: &Logger,
767 store: Arc<impl Store>,
768 version_entities_to_delete: Vec<Entity>,
769) -> Result<Vec<EntityOperation>, SubgraphRegistrarError> {
770 let mut ops = vec![];
771
772 let version_entity_ids_to_delete = version_entities_to_delete
773 .iter()
774 .map(|version_entity| version_entity.id().unwrap())
775 .collect::<HashSet<_>>();
776
777 // Get hashes that are referenced by versions that will be deleted.
778 // These are candidates for clean up.
779 let referenced_subgraph_hashes = version_entities_to_delete
780 .iter()
781 .map(|version_entity| {
782 SubgraphDeploymentId::new(
783 version_entity
784 .get("deployment")
785 .unwrap()
786 .to_owned()
787 .as_string()
788 .unwrap(),
789 )
790 .unwrap()
791 })
792 .collect::<HashSet<_>>();
793
794 // Find all subgraph version entities that point to these subgraph deployments
795 let (version_summaries, read_summaries_ops) =
796 store.read_subgraph_version_summaries(referenced_subgraph_hashes.into_iter().collect())?;
797 ops.extend(read_summaries_ops);
798
799 // Simulate the planned removal of SubgraphVersion entities
800 let version_summaries_after_delete = version_summaries
801 .clone()
802 .into_iter()
803 .filter(|version_summary| !version_entity_ids_to_delete.contains(&version_summary.id))
804 .collect::<Vec<_>>();
805
806 // Create/remove assignments based on the subgraph version changes.
807 // We are only deleting versions here, so no assignments will be created,
808 // and we can safely pass None for the node ID.
809 ops.extend(store.reconcile_assignments(
810 logger,
811 version_summaries,
812 version_summaries_after_delete,
813 None,
814 ));
815
816 // Actually remove the subgraph version entities.
817 // Note: we do this last because earlier AbortUnless ops depend on these entities still
818 // existing.
819 ops.extend(
820 version_entities_to_delete
821 .iter()
822 .map(|version_entity| EntityOperation::Remove {
823 key: SubgraphVersionEntity::key(version_entity.id().unwrap()),
824 }),
825 );
826
827 Ok(ops)
828}