wirechan/app.py

185 lines
No EOL
6.6 KiB
Python

from flask import Flask, url_for, redirect, render_template, request, session, Response
from pymongo import MongoClient
from datetime import datetime
from bson.objectid import ObjectId
from werkzeug.security import generate_password_hash, check_password_hash
app = Flask(__name__)
app.secret_key = 'secret_key'
#initialize the database
client = MongoClient('localhost', 27017)
db = client.flask_db
posts_collection = db.posts_collection
users_collection = db.users_collection
nuke_counter = db.nuke_counter
#app routes
@app.route('/', methods=['GET'])
def index():
return render_template('index.html')
@app.route('/board/<board_name>', methods=['GET'])
def board(board_name):
links = [
{'name': 'b', 'display_name': '/b/ - random'},
{'name': 'g', 'display_name': '/g/ - technology'},
{'name': 'a', 'display_name': '/a/ - anime'},
{'name': 'v', 'display_name': '/v/ - video games'},
{'name': 'w', 'display_name': '/w/ - wallpapers'},
{'name': 'x', 'display_name': '/x/ - paranormal'},
{'name': 't', 'display_name': '/t/ - test'},
{'name': 's', 'display_name': '/s/ - soyjaks'},
{'name': 'pol', 'display_name': '/pol/ - politically incorrect'}
]
posts = posts_collection.find({'board_name': board_name}).sort('timestamp', -1)
display_name = next((link['display_name'] for link in links if link['name'] == board_name), board_name)
admin_user = users_collection.find_one({'username': 'admin'})
if admin_user and session.get('user_id') == str(admin_user['_id']):
return render_template('board.html', title=board_name, header=display_name, links=links, posts=posts, admin=True)
else:
admin=False
return render_template('board.html', title=board_name, header=display_name, links=links, posts=posts, admin=admin)
@app.route('/post', methods=['POST'])
def post():
board_name = request.form['board_name']
content = request.form['content']
if 'image' in request.files:
image = request.files['image']
else:
image = None
timestamp = datetime.now()
if 'user_id' not in session:
username = 'Anonymous'
else:
username = session['username']
#insert the post into MongoDB
post_data = {
'board_name': board_name,
'content': content,
'timestamp': timestamp,
'username' : username
}
if image != None:
post_data['image'] = image.read()
posts_collection.insert_one(post_data)
return redirect(url_for('board', board_name=board_name))
@app.route('/image/<post_id>')
def image(post_id):
post = posts_collection.find_one({'_id': ObjectId(post_id)})
if post and 'image' in post:
return Response(post['image'], mimetype='image/jpeg')
else:
return 'Image not found', 404
@app.route('/login', methods=['GET'])
def login():
return render_template('login.html')
@app.route('/login', methods=['POST'])
def login_post():
username = request.form['username']
password = request.form['password']
user = users_collection.find_one({'username': username})
if user and check_password_hash(user['password'], password):
session['user_id'] = str(user['_id'])
session['username'] = username
return redirect(url_for('index'))
elif user == 'admin' and check_password_hash(user['password'], password):
session['user_id'] = str(user['_id'])
session['username'] = username
return redirect(url_for('admin'))
else:
return redirect(url_for('login'))
@app.route('/register', methods=['GET'])
def register():
regalert = request.args.get('regalert', '')
return render_template('register.html', regalert=regalert)
@app.route('/register', methods=['POST'])
def register_post():
username = request.form['username']
password = request.form['password']
hashed_password = generate_password_hash(password, method='pbkdf2:sha256')
regalert = ''
if users_collection.find_one({'username': username}):
regalert = 'username already exists!'
return redirect(url_for('register', regalert=regalert))
else:
# Insert the user into MongoDB
users_collection.insert_one({
'username': username,
'password': hashed_password
})
return redirect(url_for('login'))
@app.route('/admin', methods=['GET'])
def admin():
admin_user = users_collection.find_one({'username': 'admin'})
users = users_collection.find({})
success1 = request.args.get('success1', '')
success2 = request.args.get('success2', '')
success3 = request.args.get('success3', '')
total_users = users_collection.count_documents({})
total_posts = posts_collection.count_documents({})
nuke_count = nuke_counter.count_documents({})
if admin_user or session['user_id'] != str(admin_user['_id']):
return render_template('admin.html', success1=success1, success2=success2, success3=success3, total_users=total_users, total_posts=total_posts, nuke_count=nuke_count, users=users)
else:
return url_for('index')
@app.route('/deletepost', methods=['POST'])
def deletepost():
admin_user = users_collection.find_one({'username': 'admin'})
if not admin_user or session['user_id'] != str(admin_user['_id']):
return redirect(url_for('index'))
else:
post_id = request.form['post_id']
posts_collection.delete_one({'_id': ObjectId(post_id)})
success = 'post deleted!'
return redirect(url_for('admin', success1=success))
@app.route('/deleteuser', methods=['POST'])
def deleteuser():
admin_user = users_collection.find_one({'username': 'admin'})
if not admin_user or session['user_id'] != str(admin_user['_id']):
return redirect(url_for('index'))
else:
user_id = request.form['user_id']
users_collection.delete_one({'_id': ObjectId(user_id)})
success = 'user deleted!'
return redirect(url_for('admin', success2=success))
@app.route('/nukeboard', methods=['POST'])
def nukeboard():
admin_user = users_collection.find_one({'username': 'admin'})
if not admin_user or session['user_id'] != str(admin_user['_id']):
return redirect(url_for('index'))
else:
board_name = request.form['board_name']
posts_collection.delete_many({'board_name': board_name})
success = 'board nuked!'
nuke_counter.insert_one({'board_name': board_name}, {'date': datetime.now()})
return redirect(url_for('admin', success3=success))
@app.route('/logout')
def logout():
session.pop('user_id', None)
session.pop('username', None)
return redirect(url_for('index'))
if __name__ == '__main__':
app.run(debug=True, host='100.64.0.18', port=5000)