# File viewer metadata: 182 lines, no EOL, 6.7 KiB, Python
import os
from datetime import datetime

from bson.objectid import ObjectId
from flask import Flask, url_for, redirect, render_template, request, session, Response
from pymongo import MongoClient
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
app = Flask(__name__)
# Flask signs the session cookie with this key, and the admin checks in this
# file trust session['username']. Set SECRET_KEY in the environment to a long
# random value for any real deployment; the literal fallback is kept only so
# existing local setups keep working unchanged.
app.secret_key = os.environ.get('SECRET_KEY', 'secret_key')

#initialize the databases
client = MongoClient('localhost', 27017) #change this if you are using a different host/port
db = client.flask_db
posts_collection = db.posts_collection
users_collection = db.users_collection
nuke_counter = db.nuke_counter

# Process-wide admin flag; starts out False.
isAdmin = False
|
|
|
|
#app routes
|
|
@app.route('/', methods=['GET'])
def index():
    """Serve the landing page rendered from ``index.html``."""
    landing_page = render_template('index.html')
    return landing_page
|
|
|
|
@app.route('/board/<board_name>', methods=['GET'])
def board(board_name):
    """Render one board together with its posts, newest first.

    Args:
        board_name: Short board identifier taken from the URL (e.g. ``'g'``).

    Returns:
        The rendered ``board.html`` page. The template receives ``admin=True``
        when the session belongs to the ``admin`` user and ``admin=None``
        otherwise.
    """
    links = [
        {'name': 'b', 'display_name': '/b/ - random'},
        {'name': 'g', 'display_name': '/g/ - technology'},
        {'name': 'a', 'display_name': '/a/ - anime'},
        {'name': 'v', 'display_name': '/v/ - video games'},
        {'name': 'w', 'display_name': '/w/ - wallpapers'},
        {'name': 'x', 'display_name': '/x/ - paranormal'},
        {'name': 't', 'display_name': '/t/ - test'},
        {'name': 's', 'display_name': '/s/ - soyjaks'},
        {'name': 'pol', 'display_name': '/pol/ - politically incorrect'},
    ]

    # -1 sorts by timestamp in descending order, i.e. newest post first.
    posts = posts_collection.find({'board_name': board_name}).sort('timestamp', -1)

    # Boards missing from the list above fall back to their raw name as header.
    display_name = next(
        (link['display_name'] for link in links if link['name'] == board_name),
        board_name,
    )

    # Admin status comes from the signed session only; a process-wide flag
    # would apply to every visitor at once. Pass an explicit boolean rather
    # than whatever object the name ``admin`` happens to be bound to.
    is_admin = session.get('username') == 'admin'

    return render_template(
        'board.html',
        title=board_name,
        header=display_name,
        links=links,
        posts=posts,
        admin=True if is_admin else None,
    )
|
|
#posting API
|
|
@app.route('/post', methods=['POST'])
def post():
    """Create a post from the submitted form and redirect back to its board.

    Reads ``board_name`` and ``content`` from the form and an optional
    ``image`` file upload. The post is attributed to the session's username
    when a user is logged in and to ``'Anonymous'`` otherwise.

    Returns:
        A redirect to the board the post was submitted to.
    """
    board_name = request.form['board_name']
    content = request.form['content']

    # Logged-in users post under their session name; everyone else is anonymous.
    username = session['username'] if 'user_id' in session else 'Anonymous'

    #insert the post into MongoDB
    post_data = {
        'board_name': board_name,
        'content': content,
        'timestamp': datetime.now(),
        'username': username,
    }

    # A form submitted without choosing a file still carries an (empty)
    # 'image' part. Storing those zero bytes would make the image route serve
    # an empty picture for a text-only post, so only keep non-empty uploads.
    image = request.files.get('image')
    image_bytes = image.read() if image is not None else b''
    if image_bytes:
        post_data['image'] = image_bytes

    posts_collection.insert_one(post_data)

    return redirect(url_for('board', board_name=board_name))
|
|
#image API
|
|
@app.route('/image/<post_id>')
def image(post_id):
    """Serve the image stored on a post.

    Args:
        post_id: String form of the post's MongoDB ``_id``, taken from the URL.

    Returns:
        The stored image bytes, or ``('Image not found', 404)`` when the id is
        malformed, the post does not exist, or the post has no image.
    """
    # ObjectId() raises on strings that are not valid ids; answer those with
    # the same 404 as a missing image instead of an unhandled server error.
    if not ObjectId.is_valid(post_id):
        return 'Image not found', 404

    post = posts_collection.find_one({'_id': ObjectId(post_id)})
    if post and 'image' in post:
        # NOTE(review): the mimetype is always image/jpeg, whatever format
        # was actually uploaded.
        return Response(post['image'], mimetype='image/jpeg')

    return 'Image not found', 404
|
|
|
|
@app.route('/login', methods=['GET'])
def login():
    """Show the login form rendered from ``login.html``."""
    login_page = render_template('login.html')
    return login_page
|
|
#login API
|
|
@app.route('/login', methods=['POST'])
def login_post():
    """Authenticate the submitted credentials and start a session.

    Reads ``username`` and ``password`` from the form and checks the password
    against the hash stored for that user.

    Returns:
        A redirect to the admin dashboard for the ``admin`` user, to the index
        page for any other authenticated user, or back to the login form when
        the credentials do not match.
    """
    username = request.form['username']
    password = request.form['password']

    user = users_collection.find_one({'username': username})
    if not user or not check_password_hash(user['password'], password):
        return redirect(url_for('login'))

    session['user_id'] = str(user['_id'])
    session['username'] = username

    # ``user`` is a document, so the admin case has to be decided by the
    # username. Admin status lives in the session only: no process-wide flag
    # and no flag in the redirect URL, both of which a visitor could share or
    # forge.
    if username == 'admin':
        return redirect(url_for('admin'))
    return redirect(url_for('index'))
|
|
|
|
@app.route('/register', methods=['GET'])
def register():
    """Show the registration form, passing along any ``regalert`` query message."""
    return render_template(
        'register.html',
        regalert=request.args.get('regalert', ''),
    )
|
|
#registration API
|
|
@app.route('/register', methods=['POST'])
def register_post():
    """Create a user account from the submitted form.

    Reads ``username`` and ``password`` from the form. The password is stored
    as a ``pbkdf2:sha256`` hash, never in plain text.

    Returns:
        A redirect to the login page on success, or back to the registration
        form with a ``regalert`` message when a field is empty or the username
        is already taken.
    """
    username = request.form['username']
    password = request.form['password']

    # An empty username or password would create an unusable, guessable account.
    if not username or not password:
        return redirect(url_for('register', regalert='username and password are required!'))

    if users_collection.find_one({'username': username}):
        return redirect(url_for('register', regalert='username already exists!'))

    # Hash only once the account is known to be creatable; hashing is the
    # expensive step and is wasted on rejected requests.
    users_collection.insert_one({
        'username': username,
        'password': generate_password_hash(password, method='pbkdf2:sha256'),
    })

    return redirect(url_for('login'))
|
|
#admin dashboard
|
|
@app.route('/admin', methods=['GET'])
def admin():
    """Render the admin dashboard for the ``admin`` user.

    The optional query parameters ``success1``, ``success2`` and ``success3``
    are passed through to the template as status messages.

    Returns:
        The rendered ``admin.html`` page with user, post and nuke counts and
        the user list, or a redirect to the index page for anyone who is not
        logged in as ``admin``.
    """
    # Authorize before touching the database. Only the signed session is
    # trusted; a query-string flag could be supplied by any visitor.
    if session.get('username') != 'admin':
        return redirect(url_for('index'))

    return render_template(
        'admin.html',
        success1=request.args.get('success1', ''),
        success2=request.args.get('success2', ''),
        success3=request.args.get('success3', ''),
        total_users=users_collection.count_documents({}),
        total_posts=posts_collection.count_documents({}),
        nuke_count=nuke_counter.count_documents({}),
        users=users_collection.find({}),
    )
|
|
#admin functions
|
|
@app.route('/deletepost', methods=['POST'])
def deletepost():
    """Delete a single post (admin only).

    Reads ``post_id`` from the form.

    Returns:
        A redirect to the admin dashboard with a success message, a redirect
        to the index page for non-admins, or ``('Invalid post id', 400)`` when
        ``post_id`` is not a valid ObjectId string.
    """
    # Admin status comes from the signed session only; a process-wide flag
    # would apply to every visitor at once.
    if session.get('username') != 'admin':
        return redirect(url_for('index'))

    post_id = request.form['post_id']
    # ObjectId() raises on malformed ids; reject them explicitly instead of
    # failing with an unhandled server error.
    if not ObjectId.is_valid(post_id):
        return 'Invalid post id', 400

    posts_collection.delete_one({'_id': ObjectId(post_id)})
    return redirect(url_for('admin', success1='post deleted!'))
|
|
|
|
@app.route('/deleteuser', methods=['POST'])
def deleteuser():
    """Delete a single user account (admin only).

    Reads ``user_id`` from the form.

    Returns:
        A redirect to the admin dashboard with a success message, a redirect
        to the index page for non-admins, or ``('Invalid user id', 400)`` when
        ``user_id`` is not a valid ObjectId string.
    """
    # Admin status comes from the signed session only; a process-wide flag
    # would apply to every visitor at once.
    if session.get('username') != 'admin':
        return redirect(url_for('index'))

    user_id = request.form['user_id']
    # ObjectId() raises on malformed ids; reject them explicitly instead of
    # failing with an unhandled server error.
    if not ObjectId.is_valid(user_id):
        return 'Invalid user id', 400

    users_collection.delete_one({'_id': ObjectId(user_id)})
    return redirect(url_for('admin', success2='user deleted!'))
|
|
|
|
@app.route('/nukeboard', methods=['POST'])
def nukeboard():
    """Delete every post on a board and record the nuke (admin only).

    Reads ``board_name`` from the form.

    Returns:
        A redirect to the admin dashboard with a success message, or a
        redirect to the index page for non-admins.
    """
    # Admin status comes from the signed session only; a process-wide flag
    # would apply to every visitor at once.
    if session.get('username') != 'admin':
        return redirect(url_for('index'))

    board_name = request.form['board_name']
    posts_collection.delete_many({'board_name': board_name})

    # insert_one takes ONE document; a second positional argument is read as
    # an option, not as more fields. The date therefore has to live in the
    # same dict as the board name to be stored at all.
    nuke_counter.insert_one({'board_name': board_name, 'date': datetime.now()})

    return redirect(url_for('admin', success3='board nuked!'))
|
|
|
|
#logout API
|
|
@app.route('/logout')
def logout():
    """Drop the login keys from the session and return to the index page."""
    # pop() with a default is a no-op when the visitor was not logged in.
    for session_key in ('user_id', 'username'):
        session.pop(session_key, None)
    return redirect(url_for('index'))
|
|
|
|
#start the server
|
|
if __name__ == '__main__':
    # NOTE(review): debug=True turns on Flask's debug mode while the server
    # listens on a non-loopback address; confirm this is only ever run on a
    # trusted network.
    app.run(host='100.64.0.18', port=5000, debug=True)