# Paste-site header (not part of the program): 7 years ago · Mar 20, 2018, 07:44 PM
1import os
2import json
3import sqlite3
4import hashlib
5import datetime
6import threading
7import binascii
8
9from funcserver import Server
10from nfer.utils import xcode
11from crawl_sqlite3_urls import Crawling
12
def generate_random_string(length=6):
    '''
    Return a random lowercase-hex string of the requested length.

    The characters come from hex-encoding ``os.urandom`` output, so the
    result only contains ``0-9`` and ``a-f``.

    :param length: number of characters to return (default 6)
    :returns: native ``str`` holding ``length`` characters (for a
        non-negative ``length``)

    >>> len(generate_random_string(length=25))
    25

    # Test randomness. Try N times and observe no duplication
    >>> N = 100
    >>> len(set(generate_random_string(10) for i in range(N))) == N
    True
    '''
    # Each random byte hex-encodes to two characters; the extra byte covers
    # odd lengths, and the surplus is trimmed off below.
    n = int(length / 2 + 1)
    x = binascii.hexlify(os.urandom(n))
    # hexlify() yields bytes on Python 3. Decode there so callers always get
    # text; on Python 2 the value is already a native str and is left alone.
    if not isinstance(x, str):
        x = x.decode('ascii')
    return x[:length]
28
29
30
class Urls2CollectionsAPI(object):
    '''
    Job-tracking API backed by the SQLite table ``nference_jobs_data``.

    Every job is one row keyed by a random hex id. `new` inserts the row and
    starts a `Crawling` worker for it on a daemon thread; `status` and
    `terminate` read and update the row's ``status`` column.

    The methods run their statements through ``self.db.execute`` (a fresh
    cursor per call) because ``self.cursor`` is also handed to every crawler
    thread; reading results from that shared cursor could be disturbed by a
    statement executed on it elsewhere.

    NOTE(review): `new` reads ``self.log``, which this class never assigns -
    presumably the hosting server attaches a logger to the API object before
    any method is called; confirm.
    NOTE(review): the connection is opened with ``check_same_thread=False``
    and shared with worker threads; nothing in this class serialises access
    to it.
    '''

    def __init__(self, database_path, download_path, text_tool_path):
        '''
        Open the jobs database and make sure the jobs table exists.

        :param database_path: path of the SQLite database to connect to
        :param download_path: stored and passed on to each Crawling worker
        :param text_tool_path: stored and passed on to each Crawling worker
        '''
        self.database_path = database_path
        self.db = sqlite3.connect(self.database_path, check_same_thread=False)
        self.cursor = self.db.cursor()
        self.download_path = download_path
        self.text_tool_path = text_tool_path
        #TODO update table name
        # IF NOT EXISTS tolerates a table left by an earlier run without
        # hiding unrelated OperationalErrors (locked or unreadable database).
        self.cursor.execute(
            '''CREATE TABLE IF NOT EXISTS nference_jobs_data(
                id TEXT PRIMARY KEY, status TEXT, dt_created TEXT,
                corpus INTEGER, params TEXT, token_collection_name TEXT,
                setup TEXT, user TEXT, secret_key TEXT)''')

    def new(self, urls, corpus, token_collection_name, setup, user, secret_key):
        '''
        Record a new job with status ``ready`` and start crawling it.

        :param urls: JSON-serialisable value stored in the ``params`` column
        :param corpus: stored in the ``corpus`` column
        :param token_collection_name: stored and passed to the worker
        :param setup: stored and passed to the worker
        :param user: stored and passed to the worker
        :param secret_key: stored and passed to the worker
        :returns: id of the new job (32 random hex characters)
        '''
        _id = generate_random_string(length=32)
        dt_created = datetime.datetime.utcnow().isoformat()

        self.db.execute(
            '''INSERT INTO nference_jobs_data(id, status, dt_created, corpus, params, token_collection_name, setup, user, secret_key)VALUES(?,?,?,?,?,?,?,?,?)''',
            (_id,
             'ready',
             dt_created,
             corpus,
             json.dumps(urls),
             token_collection_name,
             setup,
             user,
             secret_key))
        # Commit before the worker starts so the row is already stored.
        self.db.commit()

        crawler = Crawling(_id, self.db, self.cursor, self.log,
                           self.download_path, self.text_tool_path,
                           token_collection_name, setup, user, secret_key)
        t = threading.Thread(target=crawler.run)
        # Daemon thread: it does not keep the process alive at shutdown.
        t.daemon = True
        t.start()
        return _id

    def status(self, job_id):
        '''
        Return the stored status of a job.

        :param job_id: id previously returned by `new`
        :returns: the ``status`` column value, or ``None`` when no job with
            that id exists
        '''
        row = self.db.execute(
            'SELECT status FROM nference_jobs_data WHERE id=:id',
            {"id": job_id}).fetchone()
        # fetchone() gives None for an unknown id; do not index into it.
        if row is None:
            return None
        return row[0]

    def terminate(self, job_id):
        '''
        Set a job's stored status to ``terminated``.

        Only the database row is updated here; how a running worker reacts
        to that is not visible in this class. The same message is returned
        whether or not a row matched ``job_id``.

        :param job_id: id previously returned by `new`
        :returns: the string ``'terminated <job_id>'``
        '''
        self.db.execute(
            'UPDATE nference_jobs_data SET status=:status where id=:id',
            {"status": "terminated", "id": job_id})
        self.db.commit()
        return 'terminated {id}'.format(id=job_id)
79
80
class Urls2Collections(Server):
    '''
    Server subclass whose API object is a `Urls2CollectionsAPI` built from
    three required command-line paths.
    '''

    def define_args(self, parser):
        '''
        Register the base server's arguments plus this server's three
        required path options on ``parser``.
        '''
        super(Urls2Collections, self).define_args(parser)

        # (short flag, long flag, help text) for each required path option.
        path_options = (
            ('-db', '--database-path',
             "provide database absolute path"),
            ('-dp', '--download-path',
             "provide default path where download urls should store"),
            ('-tf', '--text-tool-path',
             'provide text cleaning script path'),
        )
        for short_flag, long_flag, help_text in path_options:
            parser.add_argument(short_flag, long_flag, required=True,
                                help=help_text)

    def prepare_api(self):
        '''
        Run the base class hook, then build the API object from the parsed
        command-line paths.

        :returns: a `Urls2CollectionsAPI` instance
        '''
        super(Urls2Collections, self).prepare_api()

        cli = self.args
        return Urls2CollectionsAPI(
            cli.database_path, cli.download_path, cli.text_tool_path)
97
if __name__ == '__main__':
    # Script entry point: build the server object and call its start().
    server = Urls2Collections()
    server.start()