237 lines
6.6 KiB
Python
237 lines
6.6 KiB
Python
import os
|
|
import time
|
|
import flask
|
|
import flask_sqlalchemy
|
|
import flask_praetorian
|
|
import flask_cors
|
|
import sys
|
|
|
|
# Extension objects are created unbound here; each one is attached to the
# Flask application further down through its init_app() method.
db = flask_sqlalchemy.SQLAlchemy()  # database models and session
guard = flask_praetorian.Praetorian()  # authentication, password hashing, JWTs
cors = flask_cors.CORS()  # cross-origin request handling
|
|
|
|
|
|
# A generic user model that might be used by an app powered by flask-praetorian
|
|
class User(db.Model):
    """User account model that is registered with the flask-praetorian guard.

    The class is handed to ``guard.init_app`` as the user class; ``lookup``,
    ``identify``, ``identity``, ``rolenames``, ``password`` and ``is_valid``
    follow the interface flask-praetorian documents for user classes.
    In addition to the credentials, the model stores the answers written by
    the survey endpoint (``age``, ``gender``, ``education``, ``skills``).
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.Text, unique=True)
    password = db.Column(db.Text)  # hashed via guard.hash_password() on registration
    roles = db.Column(db.Text)  # comma-separated role names, may be NULL
    is_active = db.Column(db.Boolean, default=True, server_default='true')

    # Survey answers; empty until the user submits the survey.
    age = db.Column(db.Integer)
    gender = db.Column(db.Text)
    education = db.Column(db.Text)
    skills = db.Column(db.Text)

    @property
    def rolenames(self):
        """Return the user's role names as a list of strings.

        ``roles`` holds a comma-separated string. A missing or empty value
        yields an empty list; surrounding whitespace is stripped and blank
        segments are dropped, so ``"admin, operator,"`` gives
        ``["admin", "operator"]``.
        """
        if not self.roles:
            return []
        stripped = (name.strip() for name in self.roles.split(','))
        return [name for name in stripped if name]

    @classmethod
    def lookup(cls, username):
        """Return the user with the given username, or None if there is none."""
        return cls.query.filter_by(username=username).one_or_none()

    @classmethod
    def identify(cls, id):
        """Return the user with the given primary key, or None if there is none."""
        # The parameter name ``id`` is kept for interface compatibility.
        return cls.query.get(id)

    @property
    def identity(self):
        """Return the primary key that identifies this user."""
        return self.id

    def is_valid(self):
        """Return whether the account is active."""
        return self.is_active
|
|
|
|
|
|
# Initialize flask app for the example
app = flask.Flask(__name__)
# Take the secret key from the SECRET_KEY environment variable so that a
# deployment does not have to run with the value that is published in this
# source file; the old value remains the fallback for local runs.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'top secret')
# Token lifetimes: access tokens last one day, refreshing is possible for 30 days.
app.config['JWT_ACCESS_LIFESPAN'] = {'hours': 24}
app.config['JWT_REFRESH_LIFESPAN'] = {'days': 30}
|
|
# Read the runtime settings from environment variables, falling back to the
# defaults when a variable is not set.
debug = os.environ.get("DEBUG") == 'yes'  # only the exact value 'yes' enables debug
host = os.environ.get("HOST", '0.0.0.0')
port = int(os.environ.get("PORT", 5000))
|
|
|
|
# Attach the flask-praetorian guard to the app, with User as the user class
guard.init_app(app, User)

# Use a local SQLite file named database.db in the current working directory
app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite:///" + os.path.join(os.getcwd(), 'database.db')
db.init_app(app)

# Enable CORS so that the api_tool can talk to the example app
cors.init_app(app)
|
|
|
|
|
|
@app.route('/api/login', methods=['POST'])
def login():
    """
    Logs a user in by parsing a POST request containing user credentials and
    issuing a JWT token.

    The request body must be a JSON object with the string fields
    ``username`` and ``password``.

    Returns ``({'access_token': <token>}, 200)`` on success and
    ``({'message': <text>}, 401)`` when the credentials are missing,
    malformed or rejected by the guard.

    .. example::
       $ curl http://localhost:5000/api/login -X POST \
         -d '{"username":"Yasoob","password":"strongpassword"}'
    """
    failure = (
        {'message': "Benutzername und Passwort stimmen nicht überein oder das Konto existiert noch nicht."},
        401,
    )

    req = flask.request.get_json(force=True)
    # A JSON body that is not an object (list, string, number) has no .get().
    if not isinstance(req, dict):
        return failure

    username = req.get('username')
    password = req.get('password')
    # Missing or non-string credentials can never match an account; answer
    # with the regular failure instead of handing them to the guard.
    if not isinstance(username, str) or not isinstance(password, str):
        return failure

    try:
        user = guard.authenticate(username, password)
        ret = {'access_token': guard.encode_jwt_token(user)}
        return ret, 200
    except flask_praetorian.exceptions.AuthenticationError:
        return failure
|
|
|
|
|
|
@app.route('/api/refresh', methods=['POST'])
def refresh():
    """
    Refreshes an existing JWT by creating a new one that is a copy of the old
    except that it has a refreshed access expiration.

    The old token is read from the raw request body, not from the
    ``Authorization`` header.

    Returns ``({'access_token': <new token>}, 200)``.

    .. example::
       $ curl http://localhost:5000/api/refresh -X POST \
         -d "<your_token>"
    """
    print("refresh request")
    # Only the ``flask`` module itself is imported in this file, so the
    # request object has to be reached through it.
    old_token = flask.request.get_data()
    new_token = guard.refresh_jwt_token(old_token)
    ret = {'access_token': new_token}
    return ret, 200
|
|
|
|
|
|
@app.route('/api/protected')
@flask_praetorian.auth_required
def protected():
    """
    A protected endpoint. The auth_required decorator will require a header
    containing a valid JWT.

    Returns a JSON message that names the authenticated user.

    .. example::
       $ curl http://localhost:5000/api/protected -X GET \
         -H "Authorization: Bearer <your_token>"
    """
    current = flask_praetorian.current_user()
    return {'message': f'protected endpoint (allowed user {current.username})'}
|
|
|
|
|
|
@app.route('/api/register', methods=['POST'])
def register():
    """
    Creates a new user account from a POST request.

    The request body must be a JSON object with the non-empty string fields
    ``username`` and ``password``. The password is stored hashed.

    Returns ``({'message': <text>}, status)`` where status is

    * 200 when the account was created,
    * 400 when username or password is missing, empty or not a string,
    * 409 when the username is already taken.
    """
    req = flask.request.get_json(force=True)
    # A JSON body that is not an object carries no credentials at all.
    payload = req if isinstance(req, dict) else {}
    username = payload.get('username')
    password = payload.get('password')

    # Validate before hashing: the hasher cannot handle a missing password,
    # and an account without a name or password must not be created.
    credentials_ok = (
        isinstance(username, str) and isinstance(password, str)
        and username != '' and password != ''
    )
    if not credentials_ok:
        return {'message': 'Benutzername und Passwort dürfen nicht leer sein.'}, 400

    existing_user = User.query.filter_by(username=username).first()
    if existing_user is not None:
        ret = {'message': 'Benutzername {} existiert bereits. Bitte wähle einen anderen '.format(
            username
        )}
        return ret, 409

    new_user = User(
        username=username,
        password=guard.hash_password(password)
    )
    db.session.add(new_user)
    db.session.commit()
    ret = {'message': 'Account erstellt für den Account {}. Du kannst dich nun einloggen'.format(
        new_user.username
    )}
    return ret, 200
|
|
|
|
|
|
# Answer codes accepted by the survey endpoint. Tuples are used so that a
# membership test also works for unhashable request values such as lists.
_SURVEY_GENDERS = (
    "male",
    "female",
    "divers",
    "sex_na",
)
_SURVEY_EDUCATION = (
    "edu_lehre",
    "edu_gymnasium",
    "edu_berufsmatura",
    "edu_bachelor",
    "edu_Master",
    "edu_na",
)
_SURVEY_SKILLS = (
    "skills_sehr_gut",
    "skills_gut",
    "skills_mittel",
    "skills_nicht_so_good",
    "skills_garnicht",
    "skills_na",
)


def _parse_survey_age(raw_age):
    """Return the submitted age as a non-negative int, or None if invalid.

    Accepts a JSON integer or a string made up of decimal digits only.
    """
    # bool is a subclass of int; true/false is not an age.
    if isinstance(raw_age, bool):
        return None
    if isinstance(raw_age, int):
        return raw_age if raw_age >= 0 else None
    if isinstance(raw_age, str) and raw_age.isdecimal():
        return int(raw_age)
    return None


@app.route('/api/protected/umfrage', methods=['POST'])
@flask_praetorian.auth_required
def survey():
    """
    Stores the survey answers of the authenticated user.

    The request body must be a JSON object with the fields ``age`` (integer
    or decimal-digit string), ``gender``, ``education`` and ``skills`` (each
    one of the accepted answer codes). The user is identified by the ``id``
    claim of the JWT in the request header.

    Returns ``({'message': <text>}, status)`` where status is

    * 200 when the answers were saved,
    * 400 when the body is not an object or any field is invalid,
    * 404 when no user row exists for the id in the token.
    """
    invalid = (
        {'message': 'Einige der Felder stimmen nicht überein und müssen angepasst werden.'},
        400,
    )

    req = flask.request.get_json(force=True)
    token = guard.read_token_from_header()
    extracted_token = guard.extract_jwt_token(token)
    id_req = extracted_token['id']

    # A JSON body that is not an object cannot contain the survey fields.
    if not isinstance(req, dict):
        return invalid

    age = _parse_survey_age(req.get('age'))
    gender = req.get('gender')
    education = req.get('education')
    skills = req.get('skills')

    is_survey_valid = (
        age is not None
        and gender in _SURVEY_GENDERS
        and education in _SURVEY_EDUCATION
        and skills in _SURVEY_SKILLS
    )
    if not is_survey_valid:
        return invalid

    user_db = User.query.filter_by(id=id_req).first()
    if user_db is None:
        return {'message': 'Benutzer wurde nicht gefunden.'}, 404

    # Write the validated answers to the user's row.
    user_db.age = age
    user_db.gender = gender
    user_db.education = education
    user_db.skills = skills
    db.session.commit()

    ret = {'message': 'Vielen Dank für das Ausfüllen der Umfrage.'}
    return ret, 200
|
|
|
|
|
|
@app.route('/api/protected/behavior', methods=['POST'])
@flask_praetorian.auth_required
def check_password_username():
    """
    Placeholder endpoint behind authentication: writes a marker line to
    stderr and answers with a fixed message and status 200.
    """
    print("hallooo POST", file=sys.stderr)
    return {'message': 'Top'}, 200
|
|
|
|
|
|
# Run the example
if __name__ == '__main__':
    # Start the server with the settings read from the environment above.
    app.run(host=host, port=port, debug=debug)
|