· 6 years ago · Jul 26, 2019, 10:28 AM
1import re
2import sqlite3
3import logging
4from urllib.parse import urlparse
5
# Module-level logger; process_item reports inserted items and insert errors here.
logger = logging.getLogger(name=__name__)
7
8
class SqlitePipeline:
    """Item pipeline that normalises scraped leads and stores them in SQLite.

    Uses the Scrapy pipeline hooks ``open_spider`` / ``process_item`` /
    ``close_spider``. Each item is cleaned in place and then written as one
    row of ``table_name`` in the database file ``db_path``. Cleaning does the
    following:

    * Non-breaking spaces are replaced and whitespace is stripped.
    * The phone number is reduced to digits.
    * Each website is reduced to its host name.
    """

    # Path of the SQLite database file. Override on a subclass or instance
    # (e.g. ':memory:' in tests).
    db_path = 'Leads.db'
    # Table that rows are created in and inserted into.
    table_name = 'data'

    def open_spider(self, spider):
        """Connect to ``db_path`` and create ``table_name`` if it does not exist.

        Args:
            spider: The spider being opened (unused).
        """
        self.conn = sqlite3.connect(self.db_path)
        self.cursor = self.conn.cursor()

        create_source_table = (
            'CREATE TABLE IF NOT EXISTS {table} ('
            'business_name VARCHAR(255),'
            'owner VARCHAR(255),'
            'phone VARCHAR(255),'
            'email VARCHAR(255),'
            'website VARCHAR(255),'
            'state VARCHAR(255),'
            'address VARCHAR (255),'
            'zip_code VARCHAR (32),'
            'directory VARCHAR (255),'
            'directory_business_link VARCHAR (255),'
            'keyword VARCHAR (255))'
        ).format(table=self.table_name)
        self.cursor.execute(create_source_table)
        self.conn.commit()

    @staticmethod
    def clean_phone(phone):
        """Return *phone* reduced to its digits, minus a leading ``+1``.

        Args:
            phone: Raw phone text; any non-``str`` value yields ``''``.

        Returns:
            str: The digits only, e.g. ``'+1 (555) 123-4561'`` -> ``'5551234561'``.
        """
        if not isinstance(phone, str):
            return ''

        cleaned_phone = phone.strip()
        if cleaned_phone.startswith('+1'):
            # Remove exactly the two-character prefix. str.strip('+1') would
            # also eat trailing '1' digits and any repeated leading '1's.
            cleaned_phone = cleaned_phone[2:]

        return re.sub(r'\D', '', cleaned_phone)

    @staticmethod
    def clean_website(website):
        """Return the host (network-location) part of *website*.

        Args:
            website: URL text; any non-``str`` value yields ``''``.

        Returns:
            str: e.g. ``'https://www.example.com/about'`` and
            ``'www.example.com/about'`` both give ``'www.example.com'``.
        """
        if not isinstance(website, str):
            return ''

        website = website.strip()
        parsed = urlparse(website)
        if not parsed.scheme and not parsed.netloc:
            # urlparse only recognises a host after '//'. A scheme-less value
            # like 'www.example.com' would otherwise be treated as a bare path
            # and the host would be lost.
            parsed = urlparse('//' + website)
        return parsed.netloc

    def process_item(self, item, spider):
        """Clean *item* in place, store it as a row, and return it.

        Storage errors are logged and swallowed, so one bad item does not stop
        the crawl. The (cleaned) item is returned either way.

        Args:
            item: Mapping of column name -> value (dict or Item-like).
            spider: The spider that produced the item (unused).

        Returns:
            The same *item* object, after cleaning.
        """
        self._clean_item(item)
        self._insert_item(item)
        return item

    def _clean_item(self, item):
        """Normalise text, ``phone`` and ``website`` values of *item* in place."""
        # Replace all \xa0 with a space and strip surrounding whitespace.
        # Only existing keys are reassigned, so iterating is safe.
        for key in item:
            value = item[key]
            if isinstance(value, str):
                item[key] = value.replace('\xa0', ' ').strip()

        item['phone'] = self.clean_phone(item.get('phone', ''))

        if 'website' in item:
            website = item['website']
            if isinstance(website, (set, list, tuple)):
                hosts = {self.clean_website(w) for w in website}
                # Drop unparseable entries so the joined value has no empty
                # segments.
                hosts.discard('')
                # Sort so the stored string is deterministic (set order is not).
                item['website'] = ','.join(sorted(hosts))
            elif isinstance(website, str):
                item['website'] = self.clean_website(website)

    def _insert_item(self, item):
        """Insert *item* as one row of ``table_name``; log the outcome."""
        columns = list(item.keys())
        # Column names are interpolated, not bound; this assumes item keys come
        # from the spider's own field names (trusted). Values are bound via '?'.
        # NOTE(review): the table created in open_spider has no PRIMARY KEY or
        # UNIQUE constraint, so 'OR REPLACE' never replaces and duplicate
        # rows accumulate.
        statement = 'INSERT OR REPLACE INTO {table} ({columns}) VALUES ({values})'.format(
            table=self.table_name,
            columns=','.join(columns),
            values=', '.join(['?'] * len(columns)),
        )
        try:
            self.cursor.execute(statement, [item[column] for column in columns])
            self.conn.commit()
        except Exception as e:
            # Deliberately best-effort: log with traceback and keep crawling.
            logger.exception('Error %s', e)
        else:
            # Only claim success once the row is actually committed.
            logger.info('Item: %s inserted to %s', item, self.table_name)

    def close_spider(self, spider):
        """Scrapy teardown hook: close the cursor and the connection."""
        self.close(spider)

    def close(self, spider):
        """Close the cursor and the database connection (safe to call twice).

        Kept for backward compatibility. Scrapy calls ``close_spider``, which
        delegates here.

        Args:
            spider: The spider being closed (unused).
        """
        conn = getattr(self, 'conn', None)
        if conn is None:
            # Never opened, or already closed.
            return
        self.cursor.close()
        conn.close()
        self.conn = None
        self.cursor = None