# Paste metadata: 7 years ago · Jan 18, 2019, 02:54 PM
import json
import logging
from datetime import datetime, timezone

import pandas as pd
from cassandra import ConsistencyLevel, WriteTimeout
from cassandra.cluster import BatchStatement, Cluster
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args
from cassandra.query import SimpleStatement
10
11
class PythonCassandraExample:
    """Demo that bulk-loads the rows of a CSV file into a Cassandra table.

    Expected call order: ``createsession`` -> ``setlogger`` ->
    ``createkeyspace`` -> ``create_table`` -> ``insert_data``.
    """

    # CSV file read by insert_data() when no explicit path is given.
    DEFAULT_CSV_PATH = "/Users/aviralsrivastava/dev/learning_dask/10M_rows.csv"

    # One handler shared by every instance, so calling setlogger() repeatedly
    # never stacks duplicate handlers (and duplicate lines) on the root logger.
    _log_handler = None

    def __init__(self):
        self.cluster = None
        self.session = None
        self.keyspace = None
        self.table_name = None
        # Always a usable logger; setlogger() replaces it with the configured
        # root logger.
        self.log = logging.getLogger(__name__)

    def __del__(self):
        self.close()

    def close(self):
        """Shut down the cluster connection, if one was ever opened."""
        # getattr: __del__ may run on an instance whose __init__ did not finish.
        cluster = getattr(self, "cluster", None)
        self.cluster = None
        self.session = None
        if cluster is not None:
            cluster.shutdown()

    def createsession(self, contact_points=('localhost',), connect_timeout=50):
        """Connect to the cluster and open a session on ``self.keyspace``.

        :param contact_points: Host names/addresses used to reach the cluster
        :param connect_timeout: Value passed to ``Cluster(connect_timeout=...)``
        """
        # Release a previously opened connection instead of leaking it.
        self.close()
        self.cluster = Cluster(
            list(contact_points), connect_timeout=connect_timeout)
        self.session = self.cluster.connect(self.keyspace)

    def getsession(self):
        """Return the current session (``None`` before ``createsession``)."""
        return self.session

    def setlogger(self):
        """Route INFO-and-above records of the root logger to a stream handler.

        Safe to call more than once: the same handler object is reused, so
        it is attached to the root logger only once.
        """
        log = logging.getLogger()
        log.setLevel('INFO')
        if PythonCassandraExample._log_handler is None:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                "%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
            PythonCassandraExample._log_handler = handler
        if PythonCassandraExample._log_handler not in log.handlers:
            log.addHandler(PythonCassandraExample._log_handler)
        self.log = log

    def handle_error(self, exception):
        """Log a failure reported for a single statement.

        :param exception: The error returned for the failed statement
        """
        self.log.error("Failed to execute statement: %s", exception)

    def createkeyspace(self, keyspace, replication_factor=2):
        """Drop the keyspace if it already exists, recreate it and switch to it.

        The name is interpolated into the CQL text unquoted, so it must be a
        trusted, valid CQL identifier.

        :param keyspace: The Name of Keyspace to be created
        :param replication_factor: Replication factor for ``SimpleStrategy``
        :return: None
        """
        # Unquoted CQL identifiers are case-insensitive and stored lower-cased,
        # so normalise once; otherwise a mixed-case name would never match the
        # catalogue entry below and the CREATE would hit an existing keyspace.
        keyspace = keyspace.lower()

        rows = self.session.execute(
            "SELECT keyspace_name FROM system_schema.keyspaces")
        if keyspace in {row[0] for row in rows}:
            self.log.info("dropping existing keyspace...")
            self.session.execute("DROP KEYSPACE " + keyspace)

        self.log.info("creating keyspace...")
        self.session.execute(
            "CREATE KEYSPACE %s "
            "WITH replication = "
            "{ 'class': 'SimpleStrategy', 'replication_factor': '%d' }"
            % (keyspace, int(replication_factor)))

        self.log.info("setting keyspace...")
        self.session.set_keyspace(keyspace)
        self.keyspace = keyspace

    def create_table(self, table_name):
        """Create the ``(id, version, row)`` table unless it already exists.

        The name is remembered in ``self.table_name`` for ``insert_data``.

        :param table_name: Name of the table (interpolated unquoted)
        """
        self.table_name = table_name
        c_sql = "CREATE TABLE IF NOT EXISTS {} (id varchar, version varchar, row varchar, PRIMARY KEY(id, version));".format(
            self.table_name)
        print("Query for creating table is: {}".format(c_sql))
        self.session.execute(c_sql)
        self.log.info("DP Table Created !!!")

    @staticmethod
    def _row_params(record, version, key_column="0"):
        """Build the ``(id, version, row)`` values bound to one INSERT."""
        return (
            str(record[key_column]),
            str(version),
            json.dumps(record, default=str),
        )

    def _flush(self, statements, concurrency):
        """Execute the queued statements concurrently and log every failure."""
        results = execute_concurrent(
            self.session, statements, concurrency=concurrency,
            raise_on_first_error=False)
        for success, result in results:
            if not success:
                self.handle_error(result)  # result is the exception here

    def insert_data(self, destination=None, chunksize=1000, version="1",
                    key_column="0", batch_size=10000, concurrency=10000):
        """Load a CSV file into ``self.table_name`` in concurrent batches.

        Each CSV record becomes one row: ``id`` is the record's
        ``key_column`` value, ``version`` is the ``version`` argument and
        ``row`` is the whole record serialised as JSON.

        :param destination: CSV path; defaults to ``DEFAULT_CSV_PATH``
        :param chunksize: Rows read from the CSV per pandas chunk
        :param version: Value stored in the ``version`` column of every row
        :param key_column: CSV column whose value becomes the ``id``
        :param batch_size: Queued statements that trigger a flush
        :param concurrency: ``concurrency`` passed to ``execute_concurrent``
        :raises RuntimeError: if ``create_table`` has not been called yet
        """
        if self.table_name is None:
            raise RuntimeError(
                "create_table() must be called before insert_data()")
        if destination is None:
            destination = self.DEFAULT_CSV_PATH

        start_time = datetime.now(timezone.utc)
        insert_sql = self.session.prepare(
            "INSERT INTO {} ({}, {}, {}) VALUES (?,?,?)".format(
                self.table_name, "id", "version", "row"))

        pending = []
        chunks = pd.read_csv(destination, chunksize=chunksize)
        for chunk_counter, frame in enumerate(chunks, start=1):
            if chunk_counter % 100 == 0:
                print(chunk_counter)
            for record in frame.to_dict(orient='records'):
                # All three placeholders must be bound, in column order.
                pending.append(
                    (insert_sql,
                     self._row_params(record, version, key_column)))
            if len(pending) >= batch_size:
                self._flush(pending, concurrency)
                pending = []
        if pending:  # leftover rows that never reached a full batch
            self._flush(pending, concurrency)

        print("Complete task's duration is: {}".format(
            datetime.now(timezone.utc) - start_time))
121
122
def main():
    """Run the demo: connect, set up logging, create keyspace and table, load the CSV."""
    loader = PythonCassandraExample()
    loader.createsession()
    loader.setlogger()
    loader.createkeyspace('thu_athena_five')
    loader.create_table('TenMillion')
    loader.insert_data()


if __name__ == '__main__':
    main()