· 5 years ago · Jun 22, 2020, 12:28 PM
1import datetime
2
3from flask import Flask
4from flask import g
5from flask import redirect
6from flask import request
7from flask import session
8from flask import url_for, abort, render_template, flash
9from functools import wraps
10from hashlib import md5
11from peewee import *
12
# module-level settings; they are handed to Flask by the
# app.config.from_object(__name__) call below
# database- tweepee
DATABASE = 'tweepee.db'
# NOTE(review): debug mode is switched on unconditionally -- confirm this
# module is never used for a production deployment
DEBUG = True
# NOTE(review): the secret key is hard-coded in source -- consider loading it
# from the environment instead
SECRET_KEY = 'hin6bab8ge25*r=x&+5$0kn=-#log$pt^#@vrqjld!^2ci@g*b'

# creating the app
app = Flask(__name__)
app.config.from_object(__name__)

# creating a database
database = SqliteDatabase(DATABASE)
24
# model definitions -- a base model class specifies which database to use, so
# every subclass (User, Relationship, Message) is bound to the same storage
class BaseModel(Model):
    """Common base for all models; ties them to the module-level database."""

    class Meta:
        database = database
27
# the user model declares its columns as class-level field attributes
class User(BaseModel):
    """An account that can post messages and follow other accounts."""

    username = CharField(unique=True)
    # holds the hex digest written by the join view, not the clear text
    password = CharField()
    email = CharField()
    join_date = DateTimeField()

    # convenience queries live on the model so views can simply ask a user
    # for the accounts it is related to

    def following(self):
        """Return a query of the users this user follows, ordered by username."""
        followed = User.select().join(Relationship, on=Relationship.to_user)
        followed = followed.where(Relationship.from_user == self)
        return followed.order_by(User.username)

    def followers(self):
        """Return a query of the users who follow this user, ordered by username."""
        fans = User.select().join(Relationship, on=Relationship.from_user)
        fans = fans.where(Relationship.to_user == self)
        return fans.order_by(User.username)

    def is_following(self, user):
        """Return True when a relationship from this user to *user* exists."""
        same_pair = (
            (Relationship.from_user == self) &
            (Relationship.to_user == user))
        return Relationship.select().where(same_pair).exists()

    def gravatar_url(self, size=80):
        """Return the gravatar image URL for this user's email at *size* pixels."""
        # the avatar is addressed by the md5 of the trimmed, lower-cased email
        normalized_email = self.email.strip().lower()
        email_digest = md5(normalized_email.encode('utf-8')).hexdigest()
        return 'http://www.gravatar.com/avatar/%s?d=identicon&s=%d' % \
            (email_digest, size)
63
64
class Relationship(BaseModel):
    """A directed "follows" link: ``from_user`` follows ``to_user``."""

    from_user = ForeignKeyField(User, backref='relationships')
    to_user = ForeignKeyField(User, backref='related_to')

    class Meta:
        # a single unique multi-column index over (from_user, to_user), so the
        # same pair cannot be stored twice
        indexes = ((('from_user', 'to_user'), True),)
74
75
76
class Message(BaseModel):
    """A piece of content published by a user."""

    # author of the message; reachable from the user side as `user.messages`
    user = ForeignKeyField(User, backref='messages')
    # body text of the message
    content = TextField()
    # publication timestamp; the timeline views order on this column
    pub_date = DateTimeField()
81
82
# helper that creates the schema for every model defined in this module
def create_tables():
    """Create the User, Relationship and Message tables."""
    models = [User, Relationship, Message]
    with database:
        database.create_tables(models)
87
88
def auth_user(user):
    """Mark *user* as logged in on the session and flash a confirmation."""
    session.update(
        logged_in=True,
        user_id=user.id,
        username=user.username)
    flash('You are logged in as %s' % (user.username))
94
# resolve the session's user id to a User row
def get_current_user():
    """Return the logged-in User, or None when the session is not logged in."""
    if not session.get('logged_in'):
        return None
    return User.get(User.id == session['user_id'])
99
100
def login_required(f):
    """Decorate view *f* so anonymous visitors are redirected to the login view."""
    @wraps(f)
    def inner(*args, **kwargs):
        if session.get('logged_in'):
            return f(*args, **kwargs)
        return redirect(url_for('login'))
    return inner
108
def object_list(template_name, qr, var_name='object_list', **kwargs):
    """Render *template_name* with one page (20 rows) of the query *qr*.

    The template context receives ``page`` (the 1-based page number taken from
    the ``?page=`` query argument), ``pages`` (the total number of pages, at
    least 1) and the paginated query under *var_name*, in addition to any
    extra keyword arguments supplied by the caller.
    """
    per_page = 20

    # the typed getter falls back to the default when ?page= is not an integer,
    # instead of raising ValueError and producing a 500
    page = request.args.get('page', 1, type=int)
    if page < 1:
        page = 1

    # ceiling division, kept as an int: 20 rows are one page, 21 rows are two;
    # an empty result still reports a single page
    total = qr.count()
    pages = max(1, (total + per_page - 1) // per_page)

    kwargs.update(page=page, pages=pages)
    kwargs[var_name] = qr.paginate(page, per_page)
    return render_template(template_name, **kwargs)
115
116
def get_object_or_404(model, *expressions):
    """Return the *model* row matching *expressions*, aborting with 404 if none exists."""
    try:
        found = model.get(*expressions)
    except model.DoesNotExist:
        abort(404)
    else:
        return found
122
123
@app.template_filter('is_following')
def is_following(from_user, to_user):
    """Template filter: report whether *from_user* follows *to_user*."""
    follows = from_user.is_following(to_user)
    return follows
127
128
@app.before_request
def before_request():
    """Expose the database on ``g`` and open its connection for this request."""
    g.db = database
    # reuse_if_open keeps a connection that an earlier, failed request left
    # open from turning into an "already opened" error here
    g.db.connect(reuse_if_open=True)


@app.after_request
def after_request(response):
    """Close the request's database connection and pass *response* through."""
    g.db.close()
    return response


@app.teardown_request
def _close_database(exc):
    """Close the connection when the view raised and after_request was skipped."""
    if not database.is_closed():
        database.close()
138
139# views -- these are the actual mappings of url to view function
@app.route('/')
def homepage():
    """Serve the private timeline to logged-in visitors, the public one otherwise."""
    if session.get('logged_in'):
        timeline_view = private_timeline
    else:
        timeline_view = public_timeline
    return timeline_view()
148
@app.route('/private/')
@login_required
def private_timeline():
    """Show messages written by the users the current user follows, newest first.

    Anonymous visitors are redirected to the login view; without that guard
    get_current_user() returns None and the query below fails.
    """
    user = get_current_user()
    # subquery: keep only messages whose author is in the set of users the
    # current user is following
    messages = (Message
                .select()
                .where(Message.user << user.following())
                .order_by(Message.pub_date.desc()))
    return object_list('private_messages.html', messages, 'message_list')
160
@app.route('/public/')
def public_timeline():
    """Show every message, newest first."""
    newest_first = Message.pub_date.desc()
    all_messages = Message.select().order_by(newest_first)
    return object_list('public_messages.html', all_messages, 'message_list')
166
@app.route('/join/', methods=['GET', 'POST'])
def join():
    """Show the sign-up form and, on POST, create the account and log it in."""
    if not (request.method == 'POST' and request.form['username']):
        return render_template('join.html')

    try:
        with database.atomic():
            # the unique constraint on username makes the database raise
            # IntegrityError when the name is already taken
            # NOTE(review): unsalted md5 is a weak password hash; changing it
            # has to be coordinated with the login view, which compares the
            # same digest
            user = User.create(
                username=request.form['username'],
                password=md5((request.form['password']).encode('utf-8')).hexdigest(),
                email=request.form['email'],
                join_date=datetime.datetime.now())
    except IntegrityError:
        flash('That username is already taken')
        return render_template('join.html')

    # the account exists now: set the session vars that mark it authenticated
    auth_user(user)
    return redirect(url_for('homepage'))
188
@app.route('/login/', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, authenticate by username and password digest."""
    if not (request.method == 'POST' and request.form['username']):
        return render_template('login.html')

    # passwords are stored as md5 hex digests, so compare digests
    pw_hash = md5(request.form['password'].encode('utf-8')).hexdigest()
    credentials = (
        (User.username == request.form['username']) &
        (User.password == pw_hash))
    try:
        user = User.get(credentials)
    except User.DoesNotExist:
        # no row matches this username/digest pair
        flash('The password entered is incorrect')
        return render_template('login.html')

    auth_user(user)
    return redirect(url_for('homepage'))
204
@app.route('/logout/')
def logout():
    """Log the visitor out and redirect to the homepage."""
    # drop every key auth_user() sets, so no stale identity stays in the
    # session after logout
    for key in ('logged_in', 'user_id', 'username'):
        session.pop(key, None)
    flash('You were logged out')
    return redirect(url_for('homepage'))
210
@app.route('/following/')
@login_required
def following():
    """List the users the current user is following."""
    followed_users = get_current_user().following()
    return object_list('user_following.html', followed_users, 'user_list')
216
@app.route('/followers/')
@login_required
def followers():
    """List the users who follow the current user."""
    follower_users = get_current_user().followers()
    return object_list('user_followers.html', follower_users, 'user_list')
222
@app.route('/users/')
def user_list():
    """List every user, ordered by username."""
    by_name = User.select().order_by(User.username)
    return object_list('user_list.html', by_name, 'user_list')
227
@app.route('/users/<username>/')
def user_detail(username):
    """Show the messages of the user named *username*, newest first (404 if unknown)."""
    # short-circuits with a 404 when no user has this username
    profile = get_object_or_404(User, User.username == username)

    # `messages` is the backref declared on Message.user
    newest_first = Message.pub_date.desc()
    messages = profile.messages.order_by(newest_first)
    return object_list('user_detail.html', messages, 'message_list', user=profile)
238
@app.route('/users/<username>/follow/', methods=['POST'])
@login_required
def user_follow(username):
    """Make the current user follow *username*, then return to that profile."""
    target = get_object_or_404(User, User.username == username)
    try:
        with database.atomic():
            Relationship.create(from_user=get_current_user(), to_user=target)
    except IntegrityError:
        # the unique (from_user, to_user) index rejected a duplicate: the
        # relationship already exists, so there is nothing to do
        pass

    flash('You are following %s' % target.username)
    return redirect(url_for('user_detail', username=target.username))
253
@app.route('/users/<username>/unfollow/', methods=['POST'])
@login_required
def user_unfollow(username):
    """Remove the current user's follow of *username*, then return to that profile."""
    target = get_object_or_404(User, User.username == username)
    this_pair = (
        (Relationship.from_user == get_current_user()) &
        (Relationship.to_user == target))
    Relationship.delete().where(this_pair).execute()
    flash('You are no longer following %s' % target.username)
    return redirect(url_for('user_detail', username=target.username))
266
@app.route('/create/', methods=['GET', 'POST'])
@login_required
def create():
    """Show the new-message form and, on POST with content, store the message."""
    user = get_current_user()
    if not (request.method == 'POST' and request.form['content']):
        return render_template('create.html')

    Message.create(
        user=user,
        content=request.form['content'],
        pub_date=datetime.datetime.now())
    flash('Your message has been created')
    return redirect(url_for('user_detail', username=user.username))
280
@app.context_processor
def _inject_user():
    """Make the logged-in user available to every template as ``current_user``."""
    return dict(current_user=get_current_user())
284
# allow running from the command line: create the tables first, then start
# the Flask server
if __name__ == '__main__':
    create_tables()
    app.run()