import os
import csv
import json
import subprocess
import traceback
import lxml.html

from nfer.utils import xcode
import justext


class Crawling(object):

    def __init__(
            self,
            _id,
            db,
            cursor,
            log,
            download_path,
            text_tool_path,
            token_collection_name,
            setup,
            user,
            secret_key):

        self.log = log
        self._id = _id
        self.db = db
        self.setup = setup
        self.user = user
        self.secret_key = secret_key
        self.token_collection_name = token_collection_name
        self.cursor = cursor
        self.text_tool_path = text_tool_path
        self.download_path = download_path
        self.error_logs = open('/tmp/crawling_server_error_logs.log', 'a')
        self.minimum_threshold_value = 7

    def _sub_process(self, command):
        # stderr must be piped separately: with stderr=subprocess.STDOUT the
        # second value returned by communicate() is always None, so errors
        # were silently dropped.
        process = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE, shell=True)

        output, error = process.communicate()
        if error:
            self.error_logs.write('command used: {com}\n'.format(com=command))
            self.error_logs.write(error + '\n')
        print(output)

    def sub_process_for_downloading_urls(
            self,
            depth_level=0,
            accept_regex='',
            url=''):

        # NOTE: url and accept_regex are interpolated into a shell command
        # unescaped, so they must come from a trusted source (see the quoting
        # sketch below).
        command = 'cd {cp} && wget --tries=3 --timeout=10 --wait=1 --random-wait --recursive --level={l} --accept-regex {ar} {url}'.format(
            cp=self.current_path, l=depth_level, ar=accept_regex, url=url)
        self.log.debug('started crawling url', url=url)
        self._sub_process(command)
        self.log.debug('downloading is done and stored on disk')
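    # A minimal sketch (not part of the original flow) of how the pieces
    # interpolated above could be shell-quoted first, using Python 2's
    # stdlib pipes.quote (shlex.quote on Python 3):
    #
    #     import pipes
    #     command = 'cd {cp} && wget --level={l} --accept-regex {ar} {url}'.format(
    #         cp=pipes.quote(self.current_path), l=int(depth_level),
    #         ar=pipes.quote(accept_regex), url=pipes.quote(url))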

    def clean_text_file(self):
        self.cleaned_output_file = os.path.join(
            self.current_path, 'cleaned_final_text.txt')
        clean_text_command = './fast_clean2 -input {ip} -output {op} -threads 1 -preserve data/preserve_phrases.txt -stopwords data/stopwords.txt'.format(
            ip=self.final_text_file, op=self.cleaned_output_file)
        command = 'cd {path} && {run_command}'.format(
            path=self.text_tool_path, run_command=clean_text_command)
        self.log.debug(
            'started cleaning text file',
            path=self.cleaned_output_file,
            command=command)
        self._sub_process(command)
        self.log.debug(
            'cleaning text is done',
            file_path=self.cleaned_output_file)

    def phrases_generation(self):
        self.output_phrases_file = os.path.join(
            self.current_path, 'output_phrases.txt')
        phrases_run_command = 'python text2vocab.py run -i {ip} -v data/corpus_vocab_20170930.txt -o {op} --stop-words-file data/stopwords.txt'.format(
            ip=self.cleaned_output_file, op=self.output_phrases_file)
        phrases_gen_run_command = 'cd {path} && {comm}'.format(
            path=self.text_tool_path, comm=phrases_run_command)
        self.log.debug(
            'started generating phrases',
            file=self.output_phrases_file,
            command=phrases_gen_run_command)
        self._sub_process(phrases_gen_run_command)
        self.log.debug('phrase generation is completed')

    def sort_phrases(self):
        self.sorted_phrases_file = os.path.join(
            self.current_path, 'sorted_phrases.txt')
        # Count duplicate phrases and order them by frequency, highest first.
        command = 'cat {out_phras} | sort | uniq -c | sort -n | tac > {sor_phras}'.format(
            out_phras=self.output_phrases_file, sor_phras=self.sorted_phrases_file)
        self.log.debug('started sorting phrases', command=command)
        self._sub_process(command)
        self.log.debug('sorting completed')
        self.final_sorted_phrases_file = os.path.join(
            self.current_path, 'final_sorted_phrases.txt')
        final_sorted_file_inst = open(self.final_sorted_phrases_file, 'w')

        with open(self.sorted_phrases_file) as f:
            line_count = 0
            for line in f:
                line_chunks = [chunk for chunk in line.split(' ') if chunk]
                # Counts come out in descending order, so stop at the first
                # phrase at or below the threshold.
                if not (int(line_chunks[0]) > self.minimum_threshold_value):
                    break
                token = line_chunks[-1].split('\t')[-1].strip()
                final_sorted_file_inst.write(token + '\n')
                line_count += 1
        final_sorted_file_inst.close()
        self.log.debug('phrase filtering is done', count=line_count)
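    # Illustration (an assumption about the intermediate format, not taken
    # from the original source): `sort | uniq -c | sort -n | tac` emits
    # lines such as
    #
    #       42 protein_kinase
    #        7 cell_line
    #
    # line_chunks[0] is the count; the loop above assumes one phrase token
    # per line (no internal spaces), since only the last space-separated
    # chunk is kept.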

    def txt_to_csv_convertor(self):
        self.log.debug('started converting txt file to csv')
        self.output_csv_file_name = os.path.join(
            self.current_path, 'output_phrases.csv')

        with open(self.final_sorted_phrases_file) as input_file_inst, \
                open(self.output_csv_file_name, 'w') as output_file_inst:
            in_txt = csv.reader(input_file_inst, delimiter='\t')
            out_csv = csv.writer(output_file_inst)
            # Header row expected by the collection upload below.
            output_file_inst.write('token' + '\n')
            out_csv.writerows(in_txt)
        self.log.debug('txt to csv conversion is done')

    def upload_collections(self):
        self.cursor.execute(
            "select status from nference_jobs_data where id=:id",
            {"id": self._id})
        if self.cursor.fetchall()[-1][0] != 'terminated':
            command = 'nferx -u {u} -s {s} -a {setup} create_token_collection -t {coll} -f {fn} -d tab'.format(
                u=self.user, s=self.secret_key, setup=self.setup,
                coll=self.token_collection_name, fn=self.output_csv_file_name)
            self.log.debug(
                'started uploading collection',
                setup=self.setup,
                file_name=self.output_csv_file_name)
            self._sub_process(command)
            self.log.debug(
                'uploading collections is done',
                token_coll_name=self.token_collection_name)
        else:
            self.log.debug('job terminated, so skipping the collection upload')

    def extract_text_from_rawdata(self):
        self.log.debug('started text extraction')
        self.final_text_file = os.path.join(
            self.current_path, 'final_text.txt')
        out_file = open(self.final_text_file, 'w')
        # Walk everything wget downloaded and pull readable text out of
        # each file.
        for dir_path, _dir_names, file_names in os.walk(self.current_path):
            for _file in file_names:
                _file = os.path.join(dir_path, _file)
                if _file == self.final_text_file:
                    # Skip the output file itself; it lives inside the
                    # directory being walked.
                    continue
                self.log.info('file name', file=_file)
                with open(_file) as f:
                    raw = f.read()
                try:
                    paragraphs = justext.justext(
                        raw, justext.get_stoplist('English'))
                    for paragraph in paragraphs:
                        out_file.write(xcode(paragraph.text) + '\n')
                    out_file.flush()
                except Exception:
                    self.error_logs.write(traceback.format_exc())
                    # Fall back to a plain lxml text dump. In the original
                    # version this code sat after `continue` and never ran,
                    # and it re-read an already-exhausted file handle.
                    try:
                        hdoc = lxml.html.fromstring(raw)
                        out_file.write(''.join(hdoc.xpath('//text()')) + '\n')
                        out_file.flush()
                    except Exception:
                        continue

        out_file.close()
        self.log.debug(
            'text extraction is done',
            file_path=self.final_text_file)

    def fetch_urls(self, params):
        params = json.loads(params)
        # params is sometimes double-encoded JSON; decode again if the
        # first pass did not yield a list.
        if not isinstance(params, list):
            params = json.loads(params)

        for record in params:
            try:
                url = record.get('start_url', '')
                self.sub_process_for_downloading_urls(
                    record.get('depth', 0),
                    record.get('accept_regex', ''),
                    url)
            except Exception:
                self.log.error(
                    'got error while fetching url',
                    exception=traceback.format_exc())
                self.error_logs.write(str(traceback.format_exc()) + '\n')
                self.error_logs.flush()
                continue

    def process_job(self):
        # The queries below are parameterized; the remaining injection risk
        # is in the shell commands, not the SQL.
        command = 'select id, dt_created, corpus, params from nference_jobs_data where status="ready" and id=:id limit 1'
        self.cursor.execute(command, {'id': self._id})
        job_id, dt_created, corpus, params = self.cursor.fetchall()[-1]

        # Mark the job as running.
        update_command = 'UPDATE nference_jobs_data SET status=:status where id=:id'
        self.cursor.execute(update_command, {"id": job_id, "status": "running"})
        self.db.commit()
        path = str(dt_created) + '_' + str(corpus)
        self.log.debug(
            'date and corpus version info',
            corpus=corpus,
            date=dt_created,
            id=job_id)
        self.current_path = os.path.join(self.download_path, path)
        try:
            os.mkdir(self.current_path)
        except OSError:
            # The directory may already exist; log and keep going.
            self.error_logs.write(traceback.format_exc() + '\n')
            self.error_logs.flush()

        # Run the pipeline: crawl, extract, clean, phrase, sort, convert,
        # upload.
        self.fetch_urls(params)
        self.extract_text_from_rawdata()
        self.clean_text_file()
        self.phrases_generation()
        self.sort_phrases()
        self.txt_to_csv_convertor()
        self.upload_collections()

        # Mark the job as done.
        self.cursor.execute(update_command, {"id": job_id, "status": "done"})
        self.db.commit()

    def run(self):
        self.log.debug('job started', id=self._id)
        self.process_job()
        self.log.debug('job completed', id=self._id)
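

# A minimal usage sketch, not part of the original module. It assumes a
# sqlite3-backed nference_jobs_data table and a structured logger whose
# debug/info/error methods accept keyword arguments (structlog behaves this
# way); every concrete value below is a placeholder.
if __name__ == '__main__':
    import sqlite3
    import structlog  # assumption: any logger with debug(msg, **kw) works

    db = sqlite3.connect('/path/to/jobs.db')
    cursor = db.cursor()

    job = Crawling(
        _id=1,
        db=db,
        cursor=cursor,
        log=structlog.get_logger(),
        download_path='/tmp/crawls',
        text_tool_path='/opt/text_tool',
        token_collection_name='example_collection',
        setup='example_setup',
        user='user@example.com',
        secret_key='REPLACE_ME')
    job.run()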