From f42f80752b68fcc943a9cbc47b04d70d4cdf4646 Mon Sep 17 00:00:00 2001 From: alexander34ro Date: Wed, 5 Feb 2020 16:56:02 +0100 Subject: [PATCH] 2to3 -w minitwit.py --- remote_code/minitwit.py | 4 +- remote_code/minitwit.py.bak | 251 ++++++++++++++++++++++++++++++++++++ 2 files changed, 253 insertions(+), 2 deletions(-) create mode 100644 remote_code/minitwit.py.bak diff --git a/remote_code/minitwit.py b/remote_code/minitwit.py index b69ab41..072d149 100644 --- a/remote_code/minitwit.py +++ b/remote_code/minitwit.py @@ -8,7 +8,7 @@ :copyright: (c) 2010 by Armin Ronacher. :license: BSD, see LICENSE for more details. """ -from __future__ import with_statement + import re import time import sqlite3 @@ -94,7 +94,7 @@ def timeline(): redirect to the public timeline. This timeline shows the user's messages as well as all the messages of followed users. """ - print "We got a visitor from: " + str(request.remote_addr) + print("We got a visitor from: " + str(request.remote_addr)) if not g.user: return redirect(url_for('public_timeline')) offset = request.args.get('offset', type=int) diff --git a/remote_code/minitwit.py.bak b/remote_code/minitwit.py.bak new file mode 100644 index 0000000..b69ab41 --- /dev/null +++ b/remote_code/minitwit.py.bak @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +""" + MiniTwit + ~~~~~~~~ + + A microblogging application written with Flask and sqlite3. + + :copyright: (c) 2010 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +from __future__ import with_statement +import re +import time +import sqlite3 +from hashlib import md5 +from datetime import datetime +from contextlib import closing +from flask import Flask, request, session, url_for, redirect, \ + render_template, abort, g, flash +from werkzeug import check_password_hash, generate_password_hash + + +# configuration +DATABASE = '/tmp/minitwit.db' +PER_PAGE = 30 +DEBUG = True +SECRET_KEY = 'development key' + +# create our little application :) +app = Flask(__name__) + + +def connect_db(): + """Returns a new connection to the database.""" + return sqlite3.connect(DATABASE) + + +def init_db(): + """Creates the database tables.""" + with closing(connect_db()) as db: + with app.open_resource('schema.sql') as f: + db.cursor().executescript(f.read()) + db.commit() + + +def query_db(query, args=(), one=False): + """Queries the database and returns a list of dictionaries.""" + cur = g.db.execute(query, args) + rv = [dict((cur.description[idx][0], value) + for idx, value in enumerate(row)) for row in cur.fetchall()] + return (rv[0] if rv else None) if one else rv + + +def get_user_id(username): + """Convenience method to look up the id for a username.""" + rv = g.db.execute('select user_id from user where username = ?', + [username]).fetchone() + return rv[0] if rv else None + + +def format_datetime(timestamp): + """Format a timestamp for display.""" + return datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d @ %H:%M') + + +def gravatar_url(email, size=80): + """Return the gravatar image for the given email address.""" + return 'http://www.gravatar.com/avatar/%s?d=identicon&s=%d' % \ + (md5(email.strip().lower().encode('utf-8')).hexdigest(), size) + + +@app.before_request +def before_request(): + """Make sure we are connected to the database each request and look + up the current user so that we know he's there. + """ + g.db = connect_db() + g.user = None + if 'user_id' in session: + g.user = query_db('select * from user where user_id = ?', + [session['user_id']], one=True) + + +@app.after_request +def after_request(response): + """Closes the database again at the end of the request.""" + g.db.close() + return response + + +@app.route('/') +def timeline(): + """Shows a users timeline or if no user is logged in it will + redirect to the public timeline. This timeline shows the user's + messages as well as all the messages of followed users. + """ + print "We got a visitor from: " + str(request.remote_addr) + if not g.user: + return redirect(url_for('public_timeline')) + offset = request.args.get('offset', type=int) + return render_template('timeline.html', messages=query_db(''' + select message.*, user.* from message, user + where message.author_id = user.user_id and ( + user.user_id = ? or + user.user_id in (select whom_id from follower + where who_id = ?)) + order by message.pub_date desc limit ?''', + [session['user_id'], session['user_id'], PER_PAGE])) + + +@app.route('/public') +def public_timeline(): + """Displays the latest messages of all users.""" + return render_template('timeline.html', messages=query_db(''' + select message.*, user.* from message, user + where message.author_id = user.user_id + order by message.pub_date desc limit ?''', [PER_PAGE])) + + +@app.route('/') +def user_timeline(username): + """Display's a users tweets.""" + profile_user = query_db('select * from user where username = ?', + [username], one=True) + if profile_user is None: + abort(404) + followd = False + if g.user: + followed = query_db('''select 1 from follower where + follower.who_id = ? and follower.whom_id = ?''', + [session['user_id'], profile_user['user_id']], one=True) is not None + return render_template('timeline.html', messages=query_db(''' + select message.*, user.* from message, user where + user.user_id = message.author_id and user.user_id = ? + order by message.pub_date desc limit ?''', + [profile_user['user_id'], PER_PAGE]), followed=followed, + profile_user=profile_user) + + +@app.route('//follow') +def follow_user(username): + """Adds the current user as follower of the given user.""" + if not g.user: + abort(401) + whom_id = get_user_id(username) + if whom_id is None: + abort(404) + g.db.execute('insert into follower (who_id, whom_id) values (?, ?)', + [session['user_id'], whom_id]) + g.db.commit() + flash('You are now following "%s"' % username) + return redirect(url_for('user_timeline', username=username)) + + +@app.route('//unfollow') +def unfollow_user(username): + """Removes the current user as follower of the given user.""" + if not g.user: + abort(401) + whom_id = get_user_id(username) + if whom_id is None: + abort(404) + g.db.execute('delete from follower where who_id=? and whom_id=?', + [session['user_id'], whom_id]) + g.db.commit() + flash('You are no longer following "%s"' % username) + return redirect(url_for('user_timeline', username=username)) + + +@app.route('/add_message', methods=['POST']) +def add_message(): + """Registers a new message for the user.""" + if 'user_id' not in session: + abort(401) + if request.form['text']: + g.db.execute('''insert into message (author_id, text, pub_date) + values (?, ?, ?)''', (session['user_id'], request.form['text'], + int(time.time()))) + g.db.commit() + flash('Your message was recorded') + return redirect(url_for('timeline')) + + +@app.route('/login', methods=['GET', 'POST']) +def login(): + """Logs the user in.""" + if g.user: + return redirect(url_for('timeline')) + error = None + if request.method == 'POST': + user = query_db('''select * from user where + username = ?''', [request.form['username']], one=True) + if user is None: + error = 'Invalid username' + elif not check_password_hash(user['pw_hash'], + request.form['password']): + error = 'Invalid password' + else: + flash('You were logged in') + session['user_id'] = user['user_id'] + return redirect(url_for('timeline')) + return render_template('login.html', error=error) + + +@app.route('/register', methods=['GET', 'POST']) +def register(): + """Registers the user.""" + if g.user: + return redirect(url_for('timeline')) + error = None + if request.method == 'POST': + if not request.form['username']: + error = 'You have to enter a username' + elif not request.form['email'] or \ + '@' not in request.form['email']: + error = 'You have to enter a valid email address' + elif not request.form['password']: + error = 'You have to enter a password' + elif request.form['password'] != request.form['password2']: + error = 'The two passwords do not match' + elif get_user_id(request.form['username']) is not None: + error = 'The username is already taken' + else: + g.db.execute('''insert into user ( + username, email, pw_hash) values (?, ?, ?)''', + [request.form['username'], request.form['email'], + generate_password_hash(request.form['password'])]) + g.db.commit() + flash('You were successfully registered and can login now') + return redirect(url_for('login')) + return render_template('register.html', error=error) + + +@app.route('/logout') +def logout(): + """Logs the user out""" + flash('You were logged out') + session.pop('user_id', None) + return redirect(url_for('public_timeline')) + + +# add some filters to jinja and set the secret key and debug mode +# from the configuration. +app.jinja_env.filters['datetimeformat'] = format_datetime +app.jinja_env.filters['gravatar'] = gravatar_url +app.secret_key = SECRET_KEY +app.debug = DEBUG + + +if __name__ == '__main__': + app.run(host="0.0.0.0")