· 6 years ago · Sep 18, 2019, 07:40 PM
#!/usr/bin/python2.7
#
# Interface for the assignment
#
5
6import psycopg2
7
# Name of the PostgreSQL database used for this assignment.
DATABASE_NAME = "dds_assgn1"
9
10
def getopenconnection(user='postgres', password='1234', dbname='postgres', host='localhost'):
    """Open and return a new psycopg2 connection to a PostgreSQL database.

    :param user: role to authenticate as.
    :param password: password for ``user``.
    :param dbname: database to connect to.
    :param host: server host name; defaults to ``'localhost'`` as before.
    :return: a new psycopg2 connection; the caller is responsible for closing it.
    """
    # Passing keyword arguments lets psycopg2 quote/escape each value itself.
    # The previous hand-built "dbname='...' password='...'" string broke on
    # values containing quotes, backslashes or spaces.
    return psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
13
14
def loadratings(ratingstablename, ratingsfilepath, openconnection):
    """Create ``ratingstablename`` and fill it from a ``::``-separated ratings file.

    Each non-blank line of ``ratingsfilepath`` is split on ``::``. Its first
    three fields are inserted as (UserID, MovieID, Rating); any further fields
    on the line are ignored.

    :param ratingstablename: name of the table to create (plain CREATE TABLE,
        so it must not already exist).
    :param ratingsfilepath: path of the input file.
    :param openconnection: open psycopg2 connection; it is switched to autocommit.
    """
    openconnection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = openconnection.cursor()
    try:
        cur.execute("CREATE TABLE " + ratingstablename + " (UserID int, MovieID int, Rating float);")
        insert_query = "INSERT INTO " + ratingstablename + " VALUES(%s,%s,%s)"
        # Stream the file rather than buffering every line in a list first,
        # and let ``with`` close the file handle (it used to be leaked).
        with open(ratingsfilepath, 'r') as ratings_file:
            for each_line in ratings_file:
                row = each_line.rstrip()
                if not row:
                    # A blank line would split into a single empty field and
                    # make the three-placeholder INSERT fail.
                    continue
                cur.execute(insert_query, row.split('::')[:3])
    finally:
        cur.close()
34
35
def rangepartition(ratingstablename, numberofpartitions, openconnection):
    """Split ``ratingstablename`` into ``numberofpartitions`` tables by Rating.

    The rating domain [0, 5] is cut into equal-width intervals of width
    ``w = 5.0 / numberofpartitions``. Table ``range_part<i>`` receives the
    rows with ``i*w < Rating <= (i+1)*w``. ``range_part0`` also includes its
    lower bound, so ratings of exactly 0 are kept. Rows rated outside [0, 5]
    are not copied to any partition. An existing ``range_part<i>`` table with
    the same index is dropped and recreated.

    :param ratingstablename: source table with UserID, MovieID, Rating columns.
    :param numberofpartitions: number of partitions to create; must be >= 1.
    :param openconnection: open psycopg2 connection; it is switched to autocommit.
    :raises ValueError: if ``numberofpartitions`` is less than 1.
    """
    if numberofpartitions < 1:
        raise ValueError("numberofpartitions must be at least 1")
    openconnection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = openconnection.cursor()
    pref = "range_part"
    max_rating = 5.0
    # Interval width. The old code used numberofpartitions / 5.0, which is
    # only right when numberofpartitions == 5.
    width = max_rating / numberofpartitions
    try:
        for i in range(numberofpartitions):
            table_name = pref + str(i)
            # low of partition i+1 and high of partition i come from the same
            # expression, so adjacent intervals share an identical float bound
            # and no rating can fall into a gap between them.
            low = i * width
            # Pin the last upper bound to exactly max_rating so rounding in
            # (i + 1) * width can never drop rows rated max_rating.
            high = max_rating if i == numberofpartitions - 1 else (i + 1) * width
            cur.execute("DROP TABLE IF EXISTS " + table_name + "; "
                        "CREATE TABLE " + table_name + " (UserID int, MovieID int, Rating float);")
            lower_op = ">=" if i == 0 else ">"
            # Bounds are bound as parameters so psycopg2 sends them at full
            # precision (str() of a float keeps only 12 digits on Python 2).
            cur.execute("INSERT INTO " + table_name + " (UserID, MovieID, Rating) "
                        "SELECT UserID, MovieID, Rating FROM " + ratingstablename +
                        " WHERE Rating " + lower_op + " %s AND Rating <= %s;",
                        (low, high))
    finally:
        cur.close()
    openconnection.commit()
57 openconnection.commit()
58
59
60
61
def roundrobinpartition(ratingstablename, numberofpartitions, openconnection):
    """Split ``ratingstablename`` into ``numberofpartitions`` tables round-robin.

    Rows are numbered 1, 2, 3, ... in physical (``ctid``) order. Row ``r`` is
    copied into ``rrobin_part<(r - 1) % numberofpartitions>``. An existing
    ``rrobin_part<i>`` table with the same index is dropped and recreated.

    :param ratingstablename: source table with UserID, MovieID, Rating columns.
    :param numberofpartitions: number of partitions to create; must be >= 1.
    :param openconnection: open psycopg2 connection; it is switched to autocommit.
    :raises ValueError: if ``numberofpartitions`` is less than 1.
    """
    if numberofpartitions < 1:
        raise ValueError("numberofpartitions must be at least 1")
    openconnection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = openconnection.cursor()
    pref = "rrobin_part"
    try:
        for i in range(numberofpartitions):
            table_name = pref + str(i)
            cur.execute("DROP TABLE IF EXISTS " + table_name + "; "
                        "CREATE TABLE " + table_name + " (UserID int, MovieID int, Rating float);")
            # Every partition is filled by its own query, so all of them must
            # number the rows identically. ROW_NUMBER() OVER () leaves the order
            # unspecified, and two scans could then disagree (duplicating or
            # losing rows). Ordering by ctid makes the numbering repeatable while
            # the source table is not being modified.
            cur.execute("INSERT INTO " + table_name + " (UserID, MovieID, Rating) "
                        "SELECT UserID, MovieID, Rating FROM "
                        "(SELECT UserID, MovieID, Rating, "
                        "ROW_NUMBER() OVER (ORDER BY ctid) AS ROW_NUM "
                        "FROM " + ratingstablename + ") AS numbered "
                        "WHERE MOD(numbered.ROW_NUM - 1, %s) = %s;",
                        (numberofpartitions, i))
    finally:
        cur.close()
    openconnection.commit()
78
def roundrobininsert(ratingstablename, userid, itemid, rating, openconnection):
    """Insert one rating into ``ratingstablename`` and its round-robin partition.

    ``roundrobinpartition`` sends the row numbered ``r`` (1-based) to
    ``rrobin_part<(r - 1) % N>``. The new row becomes row number
    ``row_count + 1``, so it goes to ``rrobin_part<row_count % N>``. Here
    ``row_count`` is the number of rows in ``ratingstablename`` before the
    insert, and ``N`` is the number of tables in the current schema named
    ``rrobin_part`` followed by digits.

    NOTE(review): this assumes every existing row of ``ratingstablename`` has
    already been distributed. It also assumes no stale ``rrobin_part<i>``
    tables remain from an earlier partitioning with more partitions.

    :param ratingstablename: main ratings table.
    :param userid: UserID value of the new row.
    :param itemid: MovieID value of the new row.
    :param rating: Rating value of the new row.
    :param openconnection: open psycopg2 connection; it is switched to autocommit.
    :raises ValueError: if no round-robin partition tables exist (nothing is inserted).
    """
    openconnection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = openconnection.cursor()
    pref = "rrobin_part"
    try:
        cur.execute("SELECT COUNT(*) FROM information_schema.tables "
                    "WHERE table_schema = current_schema() AND table_name ~ %s;",
                    ('^' + pref + '[0-9]+$',))
        numberofpartitions = cur.fetchone()[0]
        if numberofpartitions == 0:
            raise ValueError("no " + pref + " tables found; run roundrobinpartition first")
        # Count before inserting so the target index is known up front.
        cur.execute("SELECT COUNT(*) FROM " + ratingstablename + ";")
        row_count = cur.fetchone()[0]
        values = (userid, itemid, rating)
        cur.execute("INSERT INTO " + ratingstablename +
                    " (UserID, MovieID, Rating) VALUES (%s, %s, %s);", values)
        target = pref + str(row_count % numberofpartitions)
        cur.execute("INSERT INTO " + target +
                    " (UserID, MovieID, Rating) VALUES (%s, %s, %s);", values)
    finally:
        cur.close()
    openconnection.commit()
98
99
def rangeinsert(ratingstablename, userid, itemid, rating, openconnection):
    """Insert one rating into ``ratingstablename`` and its range partition.

    The target partition assumes the ``range_part<i>`` tables split [0, 5]
    into ``N`` equal intervals of width ``w = 5.0 / N``. ``range_part0``
    covers ``[0, w]`` and ``range_part<i>`` covers ``(i*w, (i+1)*w]``.
    ``N`` is the number of tables in the current schema named ``range_part``
    followed by digits.

    NOTE(review): assumes no stale ``range_part<i>`` tables remain from an
    earlier partitioning with more partitions.

    :param ratingstablename: main ratings table.
    :param userid: UserID value of the new row.
    :param itemid: MovieID value of the new row.
    :param rating: Rating value of the new row; must lie in [0, 5].
    :param openconnection: open psycopg2 connection; it is switched to autocommit.
    :raises ValueError: if ``rating`` is not a number in [0, 5], or no range
        partition tables exist. Nothing is inserted in either case.
    """
    max_rating = 5.0
    rating_value = float(rating)
    # Validate before touching the database; the comparison also rejects NaN.
    if not (0.0 <= rating_value <= max_rating):
        raise ValueError("rating must be within [0, 5], got %r" % (rating,))
    openconnection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = openconnection.cursor()
    pref = "range_part"
    try:
        cur.execute("SELECT COUNT(*) FROM information_schema.tables "
                    "WHERE table_schema = current_schema() AND table_name ~ %s;",
                    ('^' + pref + '[0-9]+$',))
        numberofpartitions = cur.fetchone()[0]
        if numberofpartitions == 0:
            raise ValueError("no " + pref + " tables found; run rangepartition first")
        width = max_rating / numberofpartitions
        # The first interval whose upper bound (i + 1) * width is >= the rating
        # wins. Upper bounds are inclusive, and anything above the last inner
        # bound belongs to the final partition.
        index = numberofpartitions - 1
        for i in range(numberofpartitions - 1):
            if rating_value <= (i + 1) * width:
                index = i
                break
        values = (userid, itemid, rating_value)
        cur.execute("INSERT INTO " + ratingstablename +
                    " (UserID, MovieID, Rating) VALUES (%s, %s, %s);", values)
        cur.execute("INSERT INTO " + pref + str(index) +
                    " (UserID, MovieID, Rating) VALUES (%s, %s, %s);", values)
    finally:
        cur.close()
    openconnection.commit()
102
def create_db(dbname):
    """Create database ``dbname`` unless one with that name already exists.

    Connects to the default ``postgres`` database with ``getopenconnection``'s
    default credentials, since the target database may not exist yet. If the
    database already exists, a message is printed and nothing is created.

    :param dbname: name of the database to create.
    :return: None
    """
    # Connect to the default database
    con = getopenconnection(dbname='postgres')
    try:
        # CREATE DATABASE cannot run inside a transaction block.
        con.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cur = con.cursor()
        try:
            # Bind the name as a query parameter instead of splicing it into
            # the SQL text, so quotes in dbname cannot break the statement.
            cur.execute('SELECT COUNT(*) FROM pg_catalog.pg_database WHERE datname=%s', (dbname,))
            count = cur.fetchone()[0]
            if count == 0:
                # Identifiers cannot be bound as parameters, so the name is
                # interpolated here.
                cur.execute('CREATE DATABASE %s' % (dbname,))  # Create the database
            else:
                print('A database named {0} already exists'.format(dbname))
        finally:
            cur.close()
    finally:
        # Clean up even if a statement above raised.
        con.close()
125
if __name__ == '__main__':
    # Driver: ensure the assignment database exists, connect to it and run a
    # partitioning step against the "test12345" ratings table.
    create_db(DATABASE_NAME)
    print("db created")
    con = getopenconnection(dbname=DATABASE_NAME)
    try:
        con.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        print("connection established")
        # loadratings("test12345", '/home/user/Downloads/ratings.dat', con)
        rangepartition("test12345", 5, con)
        # roundrobinpartition("test12345", 5, con)
    finally:
        # The connection used to be left open; always release it.
        con.close()