· 5 years ago · Mar 02, 2020, 07:39 PM
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.Data;
5using System.Data.SqlClient;
6using System.IO;
7using System.IO.Packaging;
8using System.Linq;
9using System.Security.Cryptography;
10using System.Threading;
11using System.Threading.Tasks;
12using System.Xml.Linq;
13
14using Cogito.Threading;
15
16using SomeCompany.Data.SqlClient;
17
18using Microsoft.SqlServer.Dac;
19
20using Polly;
21
22using Serilog;
23
24namespace SomeCompany.SqlServer.Deployment
25{
26
27 /// <summary>
28 /// Deploys instances of SQL environment databases
29 /// </summary>
30 public abstract class SqlEnvironmentDatabaseBuilder
31 {
32
/// <summary>
/// XML namespace of the elements read from a DAC deploy report.
/// </summary>
static readonly XNamespace dacRptNs = XNamespace.Get("http://schemas.microsoft.com/sqlserver/dac/DeployReport/2012/02");

/// <summary>
/// XML namespace of the elements read from the DACPAC's Origin.xml part.
/// </summary>
static readonly XNamespace dacSrsNs = XNamespace.Get("http://schemas.microsoft.com/sqlserver/dac/Serialization/2012/02");

/// <summary>
/// Lock shared by every builder instance.
/// </summary>
static readonly AsyncLock staticSync = new AsyncLock();

/// <summary>
/// Cache of computed DACTAG values, keyed by the full path of the DACPAC file.
/// </summary>
static readonly ConcurrentDictionary<string, string> dacTagCache = new ConcurrentDictionary<string, string>();

/// <summary>
/// Retry policy applied to deployment work: any exception triggers up to three retries.
/// </summary>
static readonly AsyncPolicy RETRY_POLICY = Policy.Handle<Exception>().RetryAsync(3);
42
/// <summary>
/// Returns a rooted form of the given path. A path that is already rooted is returned unchanged;
/// any other path is combined with the base directory of the current application domain.
/// </summary>
/// <param name="path">Rooted or relative file path.</param>
/// <returns>The rooted path.</returns>
static string GetFullPath(string path)
{
    return Path.IsPathRooted(path)
        ? path
        : Path.Combine(AppDomain.CurrentDomain.BaseDirectory, path);
}
55
// provider used to resolve SQL instances by ID
readonly SqlEnvironmentInstanceProvider instances;

// destination for deployment log output
readonly ILogger logger;

/// <summary>
/// Initializes a new instance.
/// </summary>
/// <param name="instances">Provider used to resolve SQL instances by ID.</param>
/// <param name="logger">Logger that receives deployment messages.</param>
/// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
public SqlEnvironmentDatabaseBuilder(SqlEnvironmentInstanceProvider instances, ILogger logger)
{
    if (instances == null)
        throw new ArgumentNullException(nameof(instances));
    if (logger == null)
        throw new ArgumentNullException(nameof(logger));

    this.instances = instances;
    this.logger = logger;
}
69
/// <summary>
/// Loads a DACPAC file into memory. A relative name is resolved through <see cref="GetFullPath(string)"/>.
/// </summary>
/// <param name="relativeFileName">Rooted or relative path of the DACPAC file.</param>
/// <returns>The loaded package.</returns>
DacPackage LoadDacPackage(string relativeFileName)
{
    var fullPath = GetFullPath(relativeFileName);
    return DacPackage.Load(fullPath, DacSchemaModelStorageType.Memory);
}
79
/// <summary>
/// Loads a DAC publish profile. A relative name is resolved through <see cref="GetFullPath(string)"/>.
/// </summary>
/// <param name="relativeFileName">Rooted or relative path of the profile file.</param>
/// <returns>The loaded profile.</returns>
DacProfile LoadDacProfile(string relativeFileName)
{
    var fullPath = GetFullPath(relativeFileName);
    return DacProfile.Load(fullPath);
}
89
/// <summary>
/// Resolves the target instance, deploys the DACPAC to the named database on it and returns a
/// record describing that database.
/// </summary>
/// <param name="instanceId">ID of the instance to resolve through the instance provider.</param>
/// <param name="databaseName">Name of the database to deploy to.</param>
/// <param name="dacPacFileName">Path of the DACPAC file.</param>
/// <param name="profileFileName">Path of the DAC publish profile.</param>
/// <param name="includeStoredProcedures">When <c>false</c>, stored procedures are excluded from the deployment.</param>
/// <param name="sqlCommandVariables">Optional SQLCMD variable values applied on top of the profile.</param>
/// <param name="cancellationToken">Token handed to the deployment.</param>
/// <returns>The record for the deployed database.</returns>
protected async Task<SqlEnvironmentDatabase> BuildDatabaseAsync(
    string instanceId,
    string databaseName,
    string dacPacFileName,
    string profileFileName,
    bool includeStoredProcedures = true,
    IDictionary<string, string> sqlCommandVariables = null,
    CancellationToken cancellationToken = default)
{
    // look up the instance the database lives on
    var target = await instances.GetInstanceAsync(instanceId);

    // publish the DACPAC against it
    await DeployDacPacAsync(
        instance: target,
        databaseName: databaseName,
        dacPacFileName: dacPacFileName,
        profileFileName: profileFileName,
        includeStoredProcedures: includeStoredProcedures,
        sqlCommandVariables: sqlCommandVariables,
        cancellationToken: cancellationToken);

    // hand back a record pointing at the deployed database
    var database = new SqlEnvironmentDatabase(target, databaseName);
    return database;
}
125
/// <summary>
/// Deploys the specified DACPAC to the named database on the given instance.
/// </summary>
/// <remarks>
/// The work runs under <c>RETRY_POLICY</c> on a thread-pool thread while holding a SQL application
/// lock named after the database. Deployment is skipped when the DACTAG stored on the database
/// equals the tag computed from the DACPAC file. After a deployment, files are created for file
/// groups that have none, the database owner is set and the new DACTAG is recorded.
/// </remarks>
/// <param name="instance">Instance to deploy to.</param>
/// <param name="databaseName">Name of the target database.</param>
/// <param name="dacPacFileName">Path of the DACPAC file; relative paths are resolved through <see cref="GetFullPath(string)"/>.</param>
/// <param name="profileFileName">Path of the DAC publish profile; relative paths are resolved through <see cref="GetFullPath(string)"/>.</param>
/// <param name="includeStoredProcedures">When <c>false</c>, stored procedures are excluded from the deployment.</param>
/// <param name="sqlCommandVariables">Optional SQLCMD variable values applied on top of the profile.</param>
/// <param name="cancellationToken">Token observed by the retry policy and passed to the DAC services calls.</param>
/// <returns>A task that completes when the deployment has finished.</returns>
/// <exception cref="ArgumentNullException">A required argument is <c>null</c>.</exception>
/// <exception cref="FileNotFoundException">The DACPAC or the profile file does not exist.</exception>
Task DeployDacPacAsync(
    SqlEnvironmentInstance instance,
    string databaseName,
    string dacPacFileName,
    string profileFileName,
    bool includeStoredProcedures = true,
    IDictionary<string, string> sqlCommandVariables = null,
    CancellationToken cancellationToken = default)
{
    if (instance == null)
        throw new ArgumentNullException(nameof(instance));
    if (databaseName == null)
        throw new ArgumentNullException(nameof(databaseName));
    if (dacPacFileName == null)
        throw new ArgumentNullException(nameof(dacPacFileName));
    if (profileFileName == null)
        throw new ArgumentNullException(nameof(profileFileName));

    // Resolve both files the same way the loaders do, so the existence check, the hash and the
    // actual load all look at the same file regardless of the current working directory.
    var dacPacPath = GetFullPath(dacPacFileName);
    var profilePath = GetFullPath(profileFileName);

    if (File.Exists(dacPacPath) == false)
        throw new FileNotFoundException("Missing DACPAC. Ensure project has been built successfully.", dacPacPath);
    if (File.Exists(profilePath) == false)
        throw new FileNotFoundException("Missing DAC profile. Ensure project has been built successfully.", profilePath);

    // Passing the token to the policy stops a cancelled deployment from being retried.
    return RETRY_POLICY.ExecuteAsync(ct => Task.Run(async () =>
    {
        using (var cnn = await instance.OpenConnectionAsync())
        {
            // lock database for deployment
            if (await cnn.GetAppLock($"SqlEnvironmentDatabaseBuilder::{databaseName}", timeout: (int)TimeSpan.FromMinutes(5).TotalMilliseconds) < 0)
                throw new InvalidOperationException("Unable to acquire lock.");

            try
            {
                // a tag derived from the DACPAC contents is stored on the database; a match means no change
                var tag = GetDacTag(dacPacPath);
                if (tag == await GetDacTag(instance.DbInstance, databaseName))
                {
                    logger.Information("{Instance}.{Database} is up to date.", instance.DbInstance.ServerName, databaseName);
                    return;
                }

                using (var dac = LoadDacPackage(dacPacPath))
                {
                    // load up the DAC services
                    var svc = new DacServices(instance.DbInstance.CreateConnectionStringBuilder().ToString());
                    svc.Message += (s, a) => LogDacServiceMessage(instance.DbInstance.ServerName, databaseName, a);
                    svc.ProgressChanged += (s, a) => LogDacServiceProgress(instance.DbInstance.ServerName, databaseName, a);

                    var opt = await CreateDeployOptionsAsync(instance, profilePath, includeStoredProcedures, sqlCommandVariables);

                    // an existing database may publish tables that have to leave replication first
                    if (await cnn.ExecuteScalarAsync($"SELECT db_id({databaseName})") is short)
                    {
                        cnn.ChangeDatabase(databaseName);
                        await DropReplicatedTablesAffectedByDeployAsync(cnn, svc, dac, databaseName, opt, ct);
                    }

                    // deploy database
                    logger.Information("Publishing {DacPacFile} to {Instance}:{Database}.", dacPacFileName, instance.DbInstance.ServerName, databaseName);
                    svc.Deploy(dac, databaseName, true, opt, ct);
                }

                // generate files for file groups
                foreach (var group in await GetFileGroupsWithMissingFiles(instance.DbInstance, databaseName))
                    await CreateDefaultFilesForFileGroup(instance.DbInstance, databaseName, group);

                // change database owner for test environment
                await SetDatabaseOwner(instance.DbInstance, databaseName);

                // record the version we just deployed
                await SetDacTag(instance.DbInstance, databaseName, tag);
            }
            finally
            {
                try
                {
                    await cnn.ReleaseAppLock($"SqlEnvironmentDatabaseBuilder::{databaseName}");
                }
                catch (SqlException)
                {
                    // best effort: a failed release must not mask the outcome of the deployment
                }
            }
        }
    }, ct), cancellationToken);
}

/// <summary>
/// Loads the publish profile and adjusts its deploy options for the given instance and caller settings.
/// </summary>
/// <param name="instance">Instance the deployment targets; queried for full-text support.</param>
/// <param name="profilePath">Path of the DAC publish profile.</param>
/// <param name="includeStoredProcedures">When <c>false</c>, stored procedures are excluded.</param>
/// <param name="sqlCommandVariables">Optional SQLCMD variable values applied on top of the profile.</param>
/// <returns>The adjusted deploy options.</returns>
async Task<DacDeployOptions> CreateDeployOptionsAsync(
    SqlEnvironmentInstance instance,
    string profilePath,
    bool includeStoredProcedures,
    IDictionary<string, string> sqlCommandVariables)
{
    var opt = LoadDacProfile(profilePath).DeployOptions;

    // start from the exclusions of the profile and add unsupported features
    var exclude = new List<ObjectType>(opt.ExcludeObjectTypes);

    // remove full text stuff
    var fti = await instance.GetIsFullTextInstalled();
    if (fti == false)
    {
        exclude.Add(ObjectType.FullTextCatalogs);
        exclude.Add(ObjectType.FullTextStoplists);
    }

    // told to remove stored procedures
    if (!includeStoredProcedures)
        exclude.Add(ObjectType.StoredProcedures);

    opt.ExcludeObjectTypes = exclude.ToArray();

    // additional options
    opt.PopulateFilesOnFileGroups = true;
    opt.BlockOnPossibleDataLoss = false;
    opt.DoNotAlterReplicatedObjects = false;

    // apply manual variables
    if (sqlCommandVariables != null)
        foreach (var kvp in sqlCommandVariables)
            opt.SqlCommandVariableValues[kvp.Key] = kvp.Value;

    // the contributor filters out objects marked to be filtered; it receives "key=value" pairs joined by ';'
    var args = new Dictionary<string, string>
    {
        ["IsFullTextInstalled"] = fti.ToString()
    };
    opt.AdditionalDeploymentContributors = typeof(SqlDacDeploymentPlanObjectFilter).FullName;
    opt.AdditionalDeploymentContributorArguments = string.Join(";", args.Select(i => $"{i.Key}={i.Value}"));

    return opt;
}

/// <summary>
/// When the current database is transactionally published and has articles, generates a deploy
/// report and drops from replication every table or column item listed under a TableRebuild, Drop,
/// Alter or Rename operation. The connection must already be switched to the target database.
/// </summary>
/// <param name="cnn">Open connection switched to the target database.</param>
/// <param name="svc">DAC services used to generate the deploy report.</param>
/// <param name="dac">Package about to be deployed.</param>
/// <param name="databaseName">Name of the target database.</param>
/// <param name="opt">Deploy options that will be used for the deployment.</param>
/// <param name="cancellationToken">Token passed to report generation.</param>
async Task DropReplicatedTablesAffectedByDeployAsync(
    SqlConnection cnn,
    DacServices svc,
    DacPackage dac,
    string databaseName,
    DacDeployOptions opt,
    CancellationToken cancellationToken)
{
    var helpDbReplicationOption = await cnn.ExecuteSpHelpReplicationDbOptionAsync(databaseName);
    if (!helpDbReplicationOption.TransactionalPublish)
        return;

    if ((int)await cnn.ExecuteScalarAsync("SELECT COUNT(*) FROM sysarticles") <= 0)
        return;

    var operationNames = new HashSet<string> { "TableRebuild", "Drop", "Alter", "Rename" };
    var itemTypes = new HashSet<string> { "SqlTable", "SqlSimpleColumn" };

    var reportTxt = svc.GenerateDeployReport(dac, databaseName, opt, cancellationToken);
    var reportXml = XDocument.Parse(reportTxt);

    // NOTE(review): SqlSimpleColumn values presumably carry a three-part column name, which the
    // two-part table match in DropReplicatedTableAsync would never hit - confirm against a real report.
    var affected = reportXml.Root
        .Elements(dacRptNs + "Operations")
        .Elements(dacRptNs + "Operation")
        .Where(operation => operationNames.Contains((string)operation.Attribute("Name")))
        .Elements(dacRptNs + "Item")
        .Where(item => itemTypes.Contains((string)item.Attribute("Type")))
        .Select(item => (string)item.Attribute("Value"))
        .Distinct()
        .ToList();

    foreach (var tableName in affected)
        await DropReplicatedTableAsync(cnn, tableName);
}
277
/// <summary>
/// Removes the given table from replication: first every subscription on articles that publish
/// the table is dropped, then the articles themselves.
/// </summary>
/// <param name="cnn">Open connection to the published database.</param>
/// <param name="tableName">Table name in the form <c>[schema].[table]</c>, as compared by the queries.</param>
/// <returns>A task that completes when the subscriptions and articles have been dropped.</returns>
async Task DropReplicatedTableAsync(SqlConnection cnn, string tableName)
{
    // step 1: publication/article pairs that still have subscriptions for this table
    var subscribed = await cnn.LoadDataTableAsync($@"
        SELECT DISTINCT
            syspublications.name pubname,
            sysarticles.name artname
        FROM sysarticles
        INNER JOIN syspublications
            ON syspublications.pubid = sysarticles.pubid
        INNER JOIN syssubscriptions
            ON syssubscriptions.artid = sysarticles.artid
        INNER JOIN sys.tables systables
            ON systables.object_id = sysarticles.objid
        INNER JOIN sys.schemas sysschemas
            ON sysschemas.schema_id = systables.schema_id
        WHERE sysarticles.type = 1
            AND '[' + sysschemas.name + '].[' + systables.name + ']' = {tableName}");

    foreach (DataRow subscription in subscribed.Rows)
    {
        await cnn.ExecuteNonQueryAsync($@"EXEC sp_dropsubscription @publication = {subscription["pubname"]}, @article = {subscription["artname"]}, @subscriber = N'all'");
    }

    // step 2: publication/article pairs that publish this table
    var published = await cnn.LoadDataTableAsync($@"
        SELECT DISTINCT
            syspublications.name pubname,
            sysarticles.name artname
        FROM sysarticles
        INNER JOIN syspublications
            ON syspublications.pubid = sysarticles.pubid
        INNER JOIN sys.tables systables
            ON systables.object_id = sysarticles.objid
        INNER JOIN sys.schemas sysschemas
            ON sysschemas.schema_id = systables.schema_id
        WHERE sysarticles.type = 1
            AND '[' + sysschemas.name + '].[' + systables.name + ']' = {tableName}");

    foreach (DataRow article in published.Rows)
    {
        await cnn.ExecuteNonQueryAsync($@"EXEC sp_droparticle @publication = {article["pubname"]}, @article = {article["artname"]}");
    }
}
322
/// <summary>
/// Makes <c>sa</c> the owner of the database unless it already is.
/// </summary>
/// <param name="instance">Instance hosting the database.</param>
/// <param name="databaseName">Name of the database.</param>
/// <returns>A task that completes when the owner has been checked and, if needed, changed.</returns>
async Task SetDatabaseOwner(ISqlDbInstance instance, string databaseName)
{
    using (var connection = instance.CreateConnection())
    {
        await connection.OpenAsync();
        connection.ChangeDatabase(databaseName);

        // current owner, resolved to a login name
        var currentOwner = (string)await connection.ExecuteScalarAsync($"SELECT SUSER_SNAME(owner_sid) owner_name FROM sys.databases WHERE name = {databaseName}");
        if (currentOwner == "sa")
            return;

        logger.Information("Changing owner of {DatabaseName} to 'sa'.", databaseName);
        await connection.ExecuteNonQueryAsync((string)$@"ALTER AUTHORIZATION ON DATABASE::[{databaseName}] TO [sa]");
    }
}
344
/// <summary>
/// Stores the DACTAG on the given database as a database-level extended property named <c>DACTAG</c>.
/// </summary>
/// <remarks>
/// The property is updated when it already exists and added otherwise, so recording a new tag on a
/// database that still carries an older one does not fail.
/// </remarks>
/// <param name="instance">Instance hosting the database.</param>
/// <param name="databaseName">Name of the database to tag.</param>
/// <param name="tag">Tag value to store.</param>
/// <returns>A task that completes when the property has been written.</returns>
async Task SetDacTag(ISqlDbInstance instance, string databaseName, string tag)
{
    using (var cnn = instance.CreateConnection())
    {
        await cnn.OpenAsync();
        cnn.ChangeDatabase(databaseName);

        // sp_addextendedproperty raises an error for an existing property, hence the add-or-update
        await cnn.ExecuteNonQueryAsync($@"
            IF EXISTS ( SELECT * FROM sys.extended_properties WHERE class = 0 AND name = N'DACTAG' )
                EXEC sys.sp_updateextendedproperty @name = N'DACTAG', @value = {tag}
            ELSE
                EXEC sys.sp_addextendedproperty @name = N'DACTAG', @value = {tag}");
    }
}
360
/// <summary>
/// Reads the DACTAG stored on the given database.
/// </summary>
/// <param name="instance">Instance hosting the database.</param>
/// <param name="databaseName">Name of the database to inspect.</param>
/// <returns>
/// The value of the database-level <c>DACTAG</c> extended property, or <c>null</c> when the database
/// does not exist or carries no string-valued tag.
/// </returns>
async Task<string> GetDacTag(ISqlDbInstance instance, string databaseName)
{
    using (var cnn = instance.CreateConnection())
    {
        await cnn.OpenAsync();

        // check if database exists; the name travels as a parameter rather than as SQL text
        if (await cnn.ExecuteScalarAsync($"SELECT db_id({databaseName})") is short)
        {
            // switch to database
            cnn.ChangeDatabase(databaseName);

            // select tag
            if (await cnn.ExecuteScalarAsync("SELECT TOP 1 value FROM sys.extended_properties WHERE class = 0 AND name = 'DACTAG'") is string value)
                return value;
        }
    }

    return null;
}
387
/// <summary>
/// Gets the DACTAG used to detect modifications to the given DAC file.
/// </summary>
/// <remarks>
/// The tag is the upper-case hexadecimal SHA-256 hash of the concatenated streams returned by
/// <see cref="GetStreamsToHashForDacTag(string)"/>. Results are cached per full path, and the same
/// full path is used to open the file.
/// </remarks>
/// <param name="file">Path of the DACPAC file; relative paths are resolved through <see cref="GetFullPath(string)"/>.</param>
/// <returns>The tag as a hexadecimal string without separators.</returns>
string GetDacTag(string file)
{
    return dacTagCache.GetOrAdd(GetFullPath(file), fullPath =>
    {
        using (var sha256 = SHA256.Create())
        {
            // Fixed-size read buffer: HashAlgorithm.InputBlockSize is 1 unless overridden, which
            // made the previous buffer a single byte. Chunk size does not affect the hash.
            var buffer = new byte[4096];

            // hash each set of hashable data
            foreach (var stream in GetStreamsToHashForDacTag(fullPath))
            {
                int read;
                while ((read = stream.Read(buffer, 0, buffer.Length)) > 0)
                    sha256.TransformBlock(buffer, 0, read, buffer, 0);
            }

            // end hashing
            sha256.TransformFinalBlock(buffer, 0, 0);

            // final hash output as string
            return BitConverter.ToString(sha256.Hash).Replace("-", "");
        }
    });
}
416
/// <summary>
/// Yields the streams that contribute to the DACTAG hash: the checksum values listed in
/// <c>/Origin.xml</c> (ordered by their <c>Uri</c> attribute), followed by the contents of
/// <c>/predeploy.sql</c> and <c>/postdeploy.sql</c> when those parts exist.
/// </summary>
/// <remarks>
/// The package is opened read-only with shared read access, so hashing neither requires write
/// permission on the file nor blocks other readers. Each yielded stream is disposed once the
/// consumer moves on to the next one.
/// </remarks>
/// <param name="file">Path of the DACPAC file.</param>
/// <returns>A lazily evaluated sequence of streams to hash, in order.</returns>
IEnumerable<Stream> GetStreamsToHashForDacTag(string file)
{
    using (var pkg = Package.Open(file, FileMode.Open, FileAccess.Read, FileShare.Read))
    {
        if (GetPart(pkg, "/Origin.xml") is PackagePart origin)
        {
            // the document is fully loaded here, so the part stream can be released right away
            XDocument originXml;
            using (var originStream = origin.GetStream())
                originXml = XDocument.Load(originStream);

            foreach (var checksum in
                originXml
                    .Elements(dacSrsNs + "DacOrigin")
                    .Elements(dacSrsNs + "Checksums")
                    .Elements(dacSrsNs + "Checksum")
                    .OrderBy(i => (string)i.Attribute("Uri"))
                    .Select(i => i.Value))
                using (var bytes = new MemoryStream(ParseHex(checksum)))
                    yield return bytes;
        }

        if (GetPart(pkg, "/predeploy.sql") is PackagePart predeploy)
            using (var stream = predeploy.GetStream())
                yield return stream;

        if (GetPart(pkg, "/postdeploy.sql") is PackagePart postdeploy)
            using (var stream = postdeploy.GetStream())
                yield return stream;
    }
}
445
/// <summary>
/// Looks up a part of the package by its relative URI.
/// </summary>
/// <param name="pkg">Package to search.</param>
/// <param name="partUri">Relative URI of the part, for example <c>/Origin.xml</c>.</param>
/// <returns>The part, or <c>null</c> when the package has no part at that URI.</returns>
PackagePart GetPart(Package pkg, string partUri)
{
    var uri = new Uri(partUri, UriKind.Relative);

    if (!pkg.PartExists(uri))
        return null;

    return pkg.GetPart(uri);
}
457
/// <summary>
/// Decodes a string of hexadecimal digit pairs into bytes. A trailing unpaired digit is ignored.
/// </summary>
/// <param name="input">Hexadecimal text, two digits per byte.</param>
/// <returns>One byte per complete digit pair, in input order.</returns>
byte[] ParseHex(string input)
{
    var result = new byte[input.Length / 2];

    // walk the text two characters at a time; 'offset + 1' must still be inside the string
    for (int offset = 0, index = 0; offset + 1 < input.Length; offset += 2, index++)
    {
        var pair = input.Substring(offset, 2);
        result[index] = Convert.ToByte(Convert.ToInt32(pair, 16));
    }

    return result;
}
479
/// <summary>
/// Forwards a <see cref="DacMessage"/> to the logger: messages at debug level, warnings at warning
/// level and errors at error level. Other message types are not logged.
/// </summary>
/// <param name="instanceName">Server name included in the log entry.</param>
/// <param name="databaseName">Database name included in the log entry.</param>
/// <param name="args">Event arguments carrying the message; also attached to the log context.</param>
void LogDacServiceMessage(string instanceName, string databaseName, DacMessageEventArgs args)
{
    const string template = "{Instance}.{Database}: {Message}";

    var contextual = logger.ForContext("DacMessageEventArgs", args);
    var type = args.Message.MessageType;

    if (type == DacMessageType.Message)
        contextual.Debug(template, instanceName, databaseName, args.Message.Message);
    else if (type == DacMessageType.Warning)
        contextual.Warning(template, instanceName, databaseName, args.Message.Message);
    else if (type == DacMessageType.Error)
        contextual.Error(template, instanceName, databaseName, args.Message.Message);
}
503
/// <summary>
/// Writes a DAC progress notification to the logger at information level.
/// </summary>
/// <param name="instanceName">Server name included in the log entry.</param>
/// <param name="databaseName">Database name included in the log entry.</param>
/// <param name="args">Event arguments carrying the progress message; also attached to the log context.</param>
void LogDacServiceProgress(string instanceName, string databaseName, DacProgressEventArgs args)
{
    var contextual = logger.ForContext("DacProgressEventArgs", args);
    contextual.Information("{Instance}.{Database}: {Message}", instanceName, databaseName, args.Message);
}
516
/// <summary>
/// Lists the file groups of the given database that have no file assigned to them.
/// </summary>
/// <param name="instance">Instance hosting the database.</param>
/// <param name="databaseName">Name of the database to inspect.</param>
/// <returns>The names of the file groups without files; empty when there are none.</returns>
async Task<string[]> GetFileGroupsWithMissingFiles(ISqlDbInstance instance, string databaseName)
{
    using (var connection = instance.CreateConnection())
    {
        await connection.OpenAsync();
        connection.ChangeDatabase(databaseName);

        using (var command = connection.CreateCommand())
        {
            // the outer join leaves fileid NULL for groups that own no file
            command.CommandText = @"
                SELECT *
                FROM sysfilegroups
                LEFT JOIN sysfiles
                    ON sysfiles.groupid = sysfilegroups.groupid
                WHERE sysfiles.fileid IS NULL";

            var groupNames = new List<string>();

            using (var reader = await command.ExecuteReaderAsync())
            {
                while (await reader.ReadAsync())
                {
                    groupNames.Add((string)reader["groupname"]);
                }
            }

            return groupNames.ToArray();
        }
    }
}
548
/// <summary>
/// Returns the directory of the first file the database reports, which serves as the default
/// location for new files.
/// </summary>
/// <param name="instance">Instance hosting the database.</param>
/// <param name="databaseName">Name of the database to inspect.</param>
/// <returns>The directory portion of the first non-null file name in <c>sysfiles</c>.</returns>
async Task<string> GetDefaultFilePathForDatabase(ISqlDbInstance instance, string databaseName)
{
    using (var connection = instance.CreateConnection())
    {
        await connection.OpenAsync();
        connection.ChangeDatabase(databaseName);

        var firstFile = (string)await connection.ExecuteScalarAsync($@"
            SELECT TOP 1
                filename
            FROM sysfiles
            WHERE filename IS NOT NULL");

        return new FileInfo(firstFile).DirectoryName;
    }
}
572
/// <summary>
/// Adds a data file to the given file group. The file is named
/// <c>{databaseName}_{groupName}.mdf</c> and placed in the directory returned by
/// <see cref="GetDefaultFilePathForDatabase(ISqlDbInstance, string)"/>.
/// </summary>
/// <remarks>
/// The statement cannot take these names as parameters, so the database, logical file and file
/// group names are emitted as bracket-quoted identifiers and the file path as an escaped literal.
/// </remarks>
/// <param name="instance">Instance hosting the database.</param>
/// <param name="databaseName">Name of the database to alter.</param>
/// <param name="groupName">Name of the file group that receives the file.</param>
/// <returns>A task that completes when the file has been added.</returns>
async Task CreateDefaultFilesForFileGroup(ISqlDbInstance instance, string databaseName, string groupName)
{
    var rootPath = await GetDefaultFilePathForDatabase(instance, databaseName);
    var dataPath = Path.Combine(rootPath, databaseName + "_" + groupName + ".mdf");

    // escape for use inside [...] identifiers and '...' literals
    var databaseIdentifier = databaseName.Replace("]", "]]");
    var groupIdentifier = groupName.Replace("]", "]]");
    var dataPathLiteral = dataPath.Replace("'", "''");

    using (var cnn = instance.CreateConnection())
    {
        await cnn.OpenAsync();
        cnn.ChangeDatabase(databaseName);

        await cnn.ExecuteNonQueryAsync((string)$@"
            ALTER DATABASE [{databaseIdentifier}]
            ADD FILE ( NAME = [{groupIdentifier}_DATA], FILENAME = '{dataPathLiteral}' )
            TO FILEGROUP [{groupIdentifier}]");
    }
}
596
/// <summary>
/// Builds a push subscription to the given publication on the named database of the subscriber instance.
/// </summary>
/// <remarks>
/// The work runs under <c>RETRY_POLICY</c>. It grants the distributor account access on the
/// subscriber, creates the subscriber database if missing, adds the subscription and any articles
/// not yet subscribed, then configures the push agent and starts the publication snapshot.
/// Failures of the last two steps are logged and do not fail the build.
/// </remarks>
/// <param name="instanceId">ID of the subscriber instance.</param>
/// <param name="databaseName">Name of the subscriber database.</param>
/// <param name="publication">Publication to subscribe to.</param>
/// <param name="cancellationToken">Token observed by the retry policy between attempts.</param>
/// <returns>The record for the subscriber database.</returns>
/// <exception cref="ArgumentNullException"><paramref name="databaseName"/> or <paramref name="publication"/> is <c>null</c>.</exception>
/// <exception cref="InvalidOperationException">The subscriber instance could not be resolved.</exception>
protected async Task<SqlEnvironmentDatabase> BuildSubscriptionAsync(string instanceId, string databaseName, SqlEnvironmentPublication publication, CancellationToken cancellationToken = default)
{
    if (databaseName == null)
        throw new ArgumentNullException(nameof(databaseName));
    if (publication == null)
        throw new ArgumentNullException(nameof(publication));

    var subscriberInstance = await instances.GetInstanceAsync(instanceId);
    if (subscriberInstance == null)
        throw new InvalidOperationException($"Unable to resolve SQL instance '{instanceId}'.");

    // Passing the token to the policy stops a cancelled build from being retried.
    await RETRY_POLICY.ExecuteAsync(ct => Task.Run(async () =>
    {
        using (var publisher = publication.Database.Instance.DbInstance.CreateConnection())
        using (var subscriber = subscriberInstance.DbInstance.CreateConnection())
        {
            await publisher.OpenAsync();
            await subscriber.OpenAsync();

            using (await staticSync.LockAsync())
            {
                // only allow a single executor to modify the permissions
                // discover distributor info from publisher
                var distributorInfo = await publisher.ExecuteSpHelpDistributorAsync();
                if (distributorInfo?.Account != null)
                {
                    await subscriber.CreateWindowsLoginIfNotExistsAsync(distributorInfo.Account);
                    await subscriber.ExecuteSpAddSrvRoleMemberAsync(distributorInfo.Account, "sysadmin");
                }
            }

            // ensure subscriber database exists; CREATE DATABASE takes no parameters, so the name
            // is escaped for the literal and the bracketed identifier
            var databaseLiteral = databaseName.Replace("'", "''");
            var databaseIdentifier = databaseName.Replace("]", "]]");
            await subscriber.ExecuteNonQueryAsync((string)$@"
                IF NOT EXISTS ( SELECT * FROM sys.databases WHERE name = N'{databaseLiteral}' )
                    CREATE DATABASE [{databaseIdentifier}]");

            publisher.ChangeDatabase(publication.Database.Name);

            await publisher.ExecuteNonQueryAsync($@"
                IF NOT EXISTS ( SELECT * FROM syssubscriptions WHERE srvname = {subscriberInstance.DbInstance.ServerName} AND dest_db = {databaseName} )
                    EXEC sp_addsubscription
                        @publication = {publication.Name},
                        @subscriber = {subscriberInstance.DbInstance.ServerName},
                        @destination_db = {databaseName},
                        @subscription_type = N'Push',
                        @sync_type = N'automatic',
                        @article = N'all',
                        @update_mode = N'read only',
                        @subscriber_type = 0");

            // NOTE(review): the placeholders below are bare, matching the other parameterized
            // queries in this class; they were previously wrapped in N'...', which presumably
            // turned them into literal text - confirm against LoadDataTableAsync.
            var articles = (await publisher.LoadDataTableAsync($@"
                SELECT DISTINCT
                    NULLIF(a.artid, '') as article_id,
                    NULLIF(a.name, '') as article_name,
                    COALESCE(u.srvname, '') as subscriber_name
                FROM syspublications p
                INNER JOIN sysarticles a
                    ON a.pubid = p.pubid
                INNER JOIN sys.tables t
                    ON t.object_id = a.objid
                INNER JOIN sys.schemas s
                    ON s.schema_id = t.schema_id
                LEFT JOIN syssubscriptions u
                    ON u.artid = a.artid
                    AND u.srvname = {subscriberInstance.DbInstance.ServerName}
                WHERE p.name = {publication.Name}"))
                .Rows.Cast<DataRow>()
                .Select(i => new
                {
                    // NULLIF can produce NULL; 'as string' maps DBNull to null instead of throwing
                    ArticleName = i["article_name"] as string,
                    SubscriberName = i["subscriber_name"] as string
                })
                .ToList();

            // add missing articles
            foreach (var article in articles)
                if (!string.IsNullOrEmpty(article.ArticleName) && string.IsNullOrEmpty(article.SubscriberName))
                    await publisher.ExecuteSpAddSubscriptionAsync(
                        publication: publication.Name,
                        subscriber: subscriberInstance.DbInstance.ServerName,
                        subscriberType: 0,
                        subscriptionType: "Push",
                        destinationDb: databaseName,
                        article: article.ArticleName,
                        syncType: "automatic",
                        updateMode: "read only");

            using (await staticSync.LockAsync())
            {
                try
                {
                    // configure push agent
                    await publisher.ExecuteSpAddPushSubscriptionAgentAsync(
                        publication: publication.Name,
                        subscriber: subscriberInstance.DbInstance.ServerName,
                        subscriberDb: databaseName,
                        subscriberSecurityMode: 1);
                }
                catch (Exception e)
                {
                    logger.Error(e, "Exception adding push subscription agent.");
                }

                // start publication
                try
                {
                    await publisher.ExecuteSpStartPublicationSnapshotAsync(publication.Name);
                }
                catch (Exception e)
                {
                    // still best effort, but no longer silent
                    logger.Warning(e, "Exception starting publication snapshot.");
                }
            }
        }
    }, ct), cancellationToken);

    return new SqlEnvironmentDatabase(subscriberInstance, databaseName);
}
717
/// <summary>
/// ID of the SQL instance that hosts the database. Supplied by the derived builder.
/// </summary>
public abstract string InstanceId { get; }
722
/// <summary>
/// ID of the database this builder produces. Supplied by the derived builder.
/// </summary>
public abstract string DatabaseId { get; }
727
/// <summary>
/// Builds the database this builder is responsible for. Implemented by the derived builder.
/// </summary>
/// <param name="cancellationToken">Token that can be used to request cancellation of the build.</param>
/// <returns>A task that yields the record of the built database.</returns>
public abstract Task<SqlEnvironmentDatabase> BuildDatabaseAsync(CancellationToken cancellationToken = default);
734
735 }
736
737}