· 7 years ago · Feb 12, 2019, 08:24 PM
1import std.stdio;
2import std.array;
3import std.conv;
4import std.string;
5import std.path;
6import std.file;
7import requests;
8import arsd.database, arsd.sqlite;
9import std.regex;
10import core.time : days;
11import std.datetime;
12import std.algorithm;
13import std.zip;
14import std.range;
15import std.process;
16import std.concurrency;
17import core.thread;
18
// FTP connection settings. They are assigned by the module constructor
// (`static this`), which D runs in every thread, according to fz_name.
string ftp_url;
string ftp_login;
string ftp_pass;
string root_ftp_dir;

// Counter printed by processSingleFile() after each XML file (thread-local).
int count = 0;

// Per-thread request/response state and database connection.
// Module-level variables are thread-local in D, so each worker thread has its own copies.
Request rq;
Response rs;
Database db;

string ftp_root_uri = "ftp://ftp.zakupki.gov.ru/";
// Directory containing the Python loader script (main.py).
string python_script_dir = r"D:\code\2018\zakupki-downloader\source2";

// Local folder for downloaded archives. It is __gshared so that the value
// assigned in main() is visible to the worker threads it spawns; a plain
// module variable would be thread-local and stay empty in the workers.
__gshared string files_folder;

// SQLite database file. It is __gshared for the same reason as files_folder:
// worker threads open their own connection using this path.
__gshared string db_name = "my.db";

string fz_name = "44"; // or "223"
38
/// One archive found on the FTP server. Filled by listOfFinalFolders() and
/// written to the ftp_files table by saveToDB().
struct MyStruct
{
    string region_name, section_name, full_file_path, arch_date;
}

/// Archives collected so far (thread-local) and waiting to be saved.
MyStruct[] mystructs;
48
49
/// Module constructor (D runs it in every thread). It selects the FTP
/// credentials and root folder for the law chosen by fz_name. Any value other
/// than "44" gets the 223-FZ settings.
static this()
{
    // Both laws are served from the same host.
    ftp_url = "ftp.zakupki.gov.ru";

    immutable bool isFz44 = (fz_name == "44");
    ftp_login    = isFz44 ? "free" : "fz223free";
    ftp_pass     = isFz44 ? "free" : "fz223free";
    root_ftp_dir = isFz44 ? "/fcs_regions/" : "/out/published/";
}
67
/// Entry point. Opens the SQLite database and creates its tables if needed,
/// switches to the Python loader's directory, and spawns 10 threads that run
/// extractFromDBAndProcess().
void main()
{
    writeln(thisExePath());

    // Downloaded archives are stored in a "files" folder next to the executable.
    files_folder = dirName(thisExePath()) ~ `/files/`;

    // Make the database path absolute before chdir() below changes the
    // working directory of the whole process; workers re-open the DB by name.
    db_name = absolutePath(db_name);

    // Open the connection first: init_db() issues its queries through `db`.
    // The original called init_db() while `db` was still null.
    db = new Sqlite(db_name);
    init_db(); // uses CREATE TABLE IF NOT EXISTS, so it is safe on an existing DB

    //downloadFile();

    chdir(python_script_dir);

    foreach (task; iota(0, 10))
        spawn(&extractFromDBAndProcess);

    //extractFromDBAndProcess();

    //listOfFinalFolders(); // only load the FTP file list into the DB
}
96
/// Creates the tables used by this tool if they do not exist yet:
///   ftp_files        - archives found on the FTP server (filled by saveToDB());
///                      processing_status and isProcessing track worker progress.
///   processing_files - per-XML results written by updateXMLStatus().
void init_db()
{
    // Open the connection on demand so this works even if `db` is not set yet.
    if (db is null)
        db = new Sqlite(db_name);

    // isProcessing is read and written by extractFromDBAndProcess(), so freshly
    // created databases need the column as well.
    string sql_create_1 = `CREATE TABLE IF NOT EXISTS ftp_files (ID INTEGER PRIMARY KEY AUTOINCREMENT, region TEXT NOT NULL, section_name TEXT NOT NULL, ftp_file_full_path TEXT NOT NULL UNIQUE, arch_date TEXT NOT NULL, processing_status TEXT, isProcessing INTEGER)`;
    db.query(sql_create_1);

    string sql_create_2 = `CREATE TABLE IF NOT EXISTS "processing_files"("arch_full_name" Text, "error_message" Text, "status" Text, "id" Integer NOT NULL PRIMARY KEY AUTOINCREMENT, "xml_name" Text );`;
    db.query(sql_create_2);
}
105
/// Builds the list of archives (files with full server paths) for the law
/// selected by fz_name and stores it in the ftp_files table via saveToDB().
/// Only the "moskva" region and archives dated 2018 or later are collected.
/// For an unknown fz_name it prints "Unknown FZ" and saves nothing.
void listOfFinalFolders()
{
    rq.verbosity = 3;
    rq.authenticator = new BasicAuthentication(ftp_login, ftp_pass);

    string root_url;   // folder listing the region directories
    string period_dir; // per-section subfolder that holds the archives
    switch (fz_name)
    {
        case "223":
            root_url = `ftp://ftp.zakupki.gov.ru/out/published/`;
            period_dir = `/daily/`;
            break;
        case "44":
            root_url = `ftp://ftp.zakupki.gov.ru/fcs_regions/`;
            period_dir = `/currMonth/`;
            break;
        default:
            // Previously this message was also printed for 223, because the
            // `else` was attached only to the "44" check.
            writeln("Unknown FZ");
            return;
    }

    collectRegionArchives(root_url, period_dir, listOfSectionForProcessing());

    saveToDB();
}

/// Appends to `mystructs` every archive dated 2018 or later that is found in
/// root_url ~ region ~ "/" ~ section ~ period_dir, for the "moskva" region
/// and each of the given sections.
private void collectRegionArchives(string root_url, string period_dir, string[] sections)
{
    // Archive names contain a date such as 20180711.
    auto date_re = regex(r"([0-9]{8})");

    rs = rq.get(root_url);

    foreach (line; splitLines(to!string(rs.responseBody)))
    {
        if (!line.startsWith(`/`))
            continue;

        string region = baseName(line); // region name
        if (region.toLower != "moskva") // only Moscow for now
            continue;

        foreach (section; sections)
        {
            string folder_path = root_url ~ region ~ `/` ~ section ~ period_dir;

            foreach (file_full_name; getListOfFolderFiles(folder_path))
            {
                auto str_date = matchFirst(file_full_name, date_re);
                if (str_date.empty)
                    continue; // no date in the name; slicing hit[0..4] would fail

                if (to!int(str_date.hit[0 .. 4]) >= 2018) // only 2018 and later
                {
                    MyStruct mystruct;
                    mystruct.region_name = region;
                    mystruct.section_name = section;
                    mystruct.full_file_path = file_full_name;
                    mystruct.arch_date = str_date.hit;
                    mystructs ~= mystruct;
                }
            }
        }
    }
}
208
/// Requests the FTP listing of `folder` and returns the lines that start with
/// '/' (the files with full paths). Also sets rq's verbosity and authenticator
/// and stores the response in the global `rs`.
string[] getListOfFolderFiles(string folder)
{
    rq.verbosity = 3;
    rq.authenticator = new BasicAuthentication(ftp_login, ftp_pass);
    rs = rq.get(folder);

    auto listing = splitLines(to!string(rs.responseBody));
    return listing.filter!(entry => entry.startsWith(`/`)).array;
}
230
231
/// Inserts every collected archive from `mystructs` into the ftp_files table.
/// ftp_file_full_path is UNIQUE, so archives that are already stored are
/// skipped instead of aborting the import on the first duplicate. All inserts
/// run in one transaction.
void saveToDB()
{
    writeln("saveToDB");

    db.query(`BEGIN TRANSACTION;`);
    scope (failure)
    {
        // Best effort: undo the partial import. The original exception still propagates.
        try db.query(`ROLLBACK;`);
        catch (Exception) {}
    }

    foreach (mystr; mystructs)
    {
        db.query(`INSERT OR IGNORE INTO ftp_files (region, section_name, ftp_file_full_path, arch_date) VALUES (?, ?, ?, ?);`,
            mystr.region_name, mystr.section_name, mystr.full_file_path, mystr.arch_date);
    }

    db.query(`COMMIT;`);
}
242
243
/// Worker loop. It repeatedly claims one archive from ftp_files that has no
/// processing_status and is not yet marked isProcessing, sets isProcessing = 1,
/// and passes it to downloadFile(). It stops when no such row is left.
///
/// main() spawns several of these threads. `db` is thread-local, so a worker
/// opens its own connection when it has none.
void extractFromDBAndProcess()
{
    if (db is null)
        db = new Sqlite(db_name);

    // NOTE(review): several connections writing to one SQLite file can get
    // SQLITE_BUSY; consider configuring a busy timeout if that shows up.
    for (;;)
    {
        string id;
        string section_name;
        string ftp_file_full_path;
        bool found = false;

        // Fetch one candidate. The result is iterated to the end, so the
        // statement is finished before the UPDATE below.
        foreach (row; db.query(`SELECT ID, section_name, ftp_file_full_path FROM ftp_files WHERE processing_status IS NULL AND COALESCE(isProcessing, 0) = 0 LIMIT 1;`))
        {
            id = row["ID"];
            section_name = row["section_name"];
            ftp_file_full_path = row["ftp_file_full_path"];
            found = true;
        }

        if (!found)
            break; // nothing left to process

        // Claim the row only if no other worker has claimed it in the meantime.
        db.query(`UPDATE ftp_files SET isProcessing = 1 WHERE ID = ? AND COALESCE(isProcessing, 0) = 0;`, id);

        bool claimed = false;
        foreach (row; db.query(`SELECT changes() AS n;`))
            claimed = row["n"] == "1";

        if (!claimed)
            continue; // another worker took it; look for the next one

        writeln(ftp_file_full_path);
        downloadFile(ftp_file_full_path, section_name, to!int(id));
    }
}
257
/// Sets ftp_files.processing_status to `status` for the row with the given ID.
void updateArchiveStatus(string status, int id)
{
    // Bound parameters: the old "%s" in double quotes is an SQL identifier,
    // which SQLite only accepts as a string literal as a legacy fallback.
    db.query(`UPDATE ftp_files SET processing_status = ? WHERE ID = ?;`, status, id);
}
263
264
/// Records the result for one XML file of an archive in processing_files.
/// When error_message is empty, the error_message column is left unset.
///
/// NOTE(review): in the schema created by init_db() the only unique key is `id`,
/// so "OR REPLACE" never replaces and every call inserts a new row. Add
/// UNIQUE(arch_full_name, xml_name) if one row per file is intended.
void updateXMLStatus(string arch_name, string xml_name, string status, string error_message = "")
{
    // Bound parameters: error text comes from the Python loader's output and
    // may contain quotes, which broke the previously formatted SQL.
    if (error_message == "")
        db.query(`INSERT OR REPLACE INTO processing_files(arch_full_name, xml_name, status) VALUES (?, ?, ?)`,
            arch_name, xml_name, status);
    else
        db.query(`INSERT OR REPLACE INTO processing_files(arch_full_name, xml_name, status, error_message) VALUES (?, ?, ?, ?)`,
            arch_name, xml_name, status, error_message);
}
275
276
/// Downloads one archive from the FTP server into the local files folder and
/// passes it to processSingleFile(). The local copy is deleted afterwards.
///
/// Params:
///   full_file_path = server path of the archive, as returned by the FTP listing
///   section_name   = section the archive belongs to; passed on to the loader
///   id             = ftp_files.ID of the archive row
void downloadFile(string full_file_path, string section_name, int id)
{
    // files_folder is set in main(); fall back to the same location if it is
    // empty in this thread. Create the folder that is actually written to.
    string target_dir = files_folder.length ? files_folder : dirName(thisExePath()) ~ `/files/`;
    if (!exists(target_dir))
        mkdirRecurse(target_dir);

    rq.authenticator = new BasicAuthentication(ftp_login, ftp_pass);

    // Join the root URI and the server path with exactly one slash.
    string url = ftp_root_uri.chomp(`/`) ~ `/` ~ full_file_path.chompPrefix(`/`);

    Response rs;
    try
    {
        rs = rq.get(url);
    }
    catch (Exception e)
    {
        // Best effort: skip this archive. Continuing would dereference a null response.
        writeln("[ERROR] FAILED request to FTP: ", e.msg);
        return;
    }

    string archive_path = target_dir ~ baseName(full_file_path);
    writeln("archive_path: ", archive_path);

    File f = File(archive_path, "wb");
    f.rawWrite(rs.responseBody.data);
    f.close();

    // The local archive is never needed after this function, even on errors.
    scope (exit)
    {
        if (exists(archive_path))
            archive_path.remove();
    }

    if (getSize(archive_path) < 256)
    {
        writeln("File too small and deleted: ", archive_path);
        return;
    }

    try
    {
        processSingleFile(archive_path, section_name, id);
    }
    catch (Exception e)
    {
        // A corrupt archive must not terminate the whole worker thread.
        writeln("[ERROR] failed to process ", archive_path, ": ", e.msg);
        updateArchiveStatus("fail", id);
    }
}
316
/// Extracts each .xml entry of the archive (except names containing "Cancel")
/// into a folder named after the archive, then runs
/// `python <python_script_dir>\main.py <xml> <section>` on it. Each result is
/// recorded via updateXMLStatus(); at the end the archive is marked "done".
void processSingleFile(string archive_path, string section_name, int id)
{
    auto zip = new ZipArchive(read(archive_path));

    // Target folder: the archive path with two extensions stripped.
    string xml_dir = archive_path.stripExtension.stripExtension;

    foreach (file_name, am; zip.directory)
    {
        if (file_name.canFind("Cancel")) // usually protocols; such files are skipped
            continue;

        if (file_name.extension != ".xml")
            continue;

        writeln("file_name: ", file_name);

        writefln("%10s %08x %s", am.expandedSize, am.crc32, file_name);
        assert(am.expandedData.length == 0);
        // decompress the archive member
        auto my_xml = zip.expand(am);

        if (!exists(xml_dir))
            mkdir(xml_dir);

        // Only the entry's base name is used on disk, so an entry name with
        // directories or "../" cannot write outside xml_dir.
        string xml_full_path = xml_dir ~ `/` ~ baseName(file_name);

        File f = File(xml_full_path, "wb");
        f.rawWrite(my_xml);
        f.close();
        writeln("xml_full_path: ", xml_full_path);

        // Arguments are passed directly rather than through a shell, so spaces
        // or shell metacharacters in paths/entry names are not interpreted.
        auto result = execute(["python", python_script_dir ~ `\main.py`, xml_full_path, section_name]);
        if (result.status == 0)
        {
            // "success" is also stored as the error text, for readability.
            updateXMLStatus(archive_path, file_name, "success", "success");
        }
        else
        {
            string error_text = findOraErrorLine(result.output);
            if (error_text.length)
                updateXMLStatus(archive_path, file_name, "fail", error_text);
            else
                updateXMLStatus(archive_path, file_name, "fail");

            writeln("----------------------------");
        }

        writeln("\ncount: ", count++);
    }

    updateArchiveStatus("done", id); // all files in the archive have been handled
}

/// Returns the last line of `output` containing "ORA" (the line explaining the
/// failure), or an empty string if there is none.
private string findOraErrorLine(string output)
{
    string found;
    foreach (line; output.splitLines())
    {
        if (line.canFind("ORA"))
            found = line;
    }
    return found;
}
406
/// Returns the FTP sections to collect for the law selected by fz_name,
/// or null for an unknown law.
string[] listOfSectionForProcessing()
{
    switch (fz_name)
    {
        case "223":
            // Also available: "purchaseNoticeAE", "purchaseProtocol".
            return ["purchaseContract", "purchaseNotice"];
        case "44":
            return ["protocols", "contracts", "notifications"];
        default:
            return null;
    }
}
423}