from flask import Flask
from flask_sqlalchemy import SQLAlchemy
"""
These objects and definitions are used throughout the Jupyter Notebook.
"""
# Create the Flask application object shared by the rest of this file.
app = Flask(__name__)

# SQLAlchemy URI of the SQLite database (path and filename).
database = 'sqlite:///sqlitebbal.db'

# Apply all application settings in one call.
# NOTE(review): SECRET_KEY is a hard-coded placeholder value.
app.config.update(
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    SQLALCHEMY_DATABASE_URI=database,
    SECRET_KEY='SECRET_KEY',
)

# Create the SQLAlchemy handle (db) and bind it to the app.
# The binding belongs in a place where it runs once per project.
db = SQLAlchemy()
db.init_app(app)
import datetime
from datetime import datetime
import json
from sqlalchemy.exc import IntegrityError
from werkzeug.security import generate_password_hash, check_password_hash
class User(db.Model):
    """A task row stored in the ``users`` table.

    Each row carries a task name and the time the task is expected to take.
    The values live in the ``_task`` / ``_timeExpected`` columns and are
    exposed through the ``task`` / ``timeExpected`` properties.
    """

    __tablename__ = 'users'  # table name is plural, class name is singular

    id = db.Column(db.Integer, primary_key=True)
    _task = db.Column(db.String(255), unique=False, nullable=False)
    # unique=True: two rows cannot share the same expected time; create()
    # returns None when this constraint is violated.
    _timeExpected = db.Column(db.String(255), unique=True, nullable=False)

    def __init__(self, task, timeExpected):
        """Store the task name and its expected time on the new object."""
        self._task = task
        self._timeExpected = timeExpected

    @property
    def task(self):
        """Task name."""
        return self._task

    @task.setter
    def task(self, task):
        self._task = task

    @property
    def timeExpected(self):
        """Expected time for the task."""
        return self._timeExpected

    # The setter must carry the property's own name; it was previously named
    # ``uid``, which left ``timeExpected`` read-only.
    @timeExpected.setter
    def timeExpected(self, timeExpected):
        self._timeExpected = timeExpected

    # Backward-compatible alias: the misnamed setter used to expose the
    # expected time as ``uid``, and existing callers read that attribute.
    @property
    def uid(self):
        """Alias of ``timeExpected`` kept for existing callers."""
        return self._timeExpected

    @uid.setter
    def uid(self, timeExpected):
        self._timeExpected = timeExpected

    def __str__(self):
        """Return the row as a JSON string."""
        return json.dumps(self.read())

    def create(self):
        """CRUD create: add this object to the table and commit.

        Returns:
            self on success, or None when the commit raises IntegrityError.
        """
        try:
            db.session.add(self)
            db.session.commit()  # the unit-of-work pattern needs a manual commit
            return self
        except IntegrityError:
            # Roll the failed transaction back so the session stays usable.
            db.session.rollback()
            return None

    def read(self):
        """CRUD read: return the row as a dictionary."""
        return {
            "id": self.id,
            "task": self.task,
            "timeExpected": self.timeExpected,
        }

    def update(self, task="", timeExpected=""):
        """CRUD update: change the task name and/or expected time, then commit.

        Empty arguments leave the corresponding field untouched, so calling
        with only one keyword does not blank out the other.

        Returns:
            self
        """
        if task:
            self.task = task
        if timeExpected:
            self.timeExpected = timeExpected
        db.session.commit()
        return self

    def delete(self):
        """CRUD delete: remove this row and commit. Returns None."""
        db.session.delete(self)
        db.session.commit()
        return None
def initUsers():
    """Create the database tables and insert the sample task rows.

    Each sample row is created independently; a row that cannot be stored
    (User.create() returns None or raises) is reported and skipped.
    """
    with app.app_context():
        db.create_all()
        users = [
            User(task='Basketball', timeExpected="13 minutes"),
            User(task='Track', timeExpected="51 minutes"),
            User(task='math hw', timeExpected="16 minutes"),
            User(task='CSP hw', timeExpected="17 minutes"),
            User(task='Swim', timeExpected="15 minutes"),
            User(task='Drawing', timeExpected="55 minutes"),
        ]
        for user in users:
            try:
                created = user.create()
            except Exception:
                # Seeding is best effort: report the row below and move on.
                created = None
            # The value printed as "uid" is the row's expected time.
            if created is None:
                print(f"Records exist uid {user.timeExpected}, or error.")
            else:
                print(f"Created new uid {created.timeExpected}")


initUsers()
def read():
    """Return every row of the users table as a list of dictionaries."""
    with app.app_context():
        # One User.read() dictionary per row returned by the query.
        return [row.read() for row in User.query.all()]


read()
def find_by_task(task):
    """Return the first User whose task name equals ``task``.

    The result of ``.first()`` is returned unchanged; callers check it
    against None before using it.
    """
    with app.app_context():
        matches = User.query.filter_by(_task=task)
        return matches.first()
def deleteuser():
    """Prompt for a task name and delete the first row that matches it."""
    task_name = input("enter task")
    match = find_by_task(task_name)
    print(match)

    # Guard clause: nothing to delete when the lookup found no row.
    if match is None:
        print("User not found.")
        return

    with app.app_context():
        db.session.delete(match)
        db.session.commit()
        print("User deleted.")


deleteuser()
print("new DB")
read()
def updateuser():
    """Prompt for a row id and a new task name, then rename that row.

    Prints "User not found." instead of raising when no row has the
    requested id.
    """
    row_id = input("id")
    new_task = input("new task")
    with app.app_context():
        user = User.query.filter_by(id=row_id).first()
        if user is None:
            print("User not found.")
            return
        # User.update() commits the change itself, so no extra commit is needed.
        user.update(task=new_task)
def read():
    """Collect the dictionary form of every row in the users table."""
    json_ready = []
    with app.app_context():
        for user in User.query.all():
            json_ready.append(user.read())
    return json_ready


read()
updateuser()
def create():
    """Prompt for a task and its expected time and store them as a new row.

    If a row with the entered task name already exists, it is printed and
    nothing is created. A failed insert is reported with a message rather
    than raising.
    """
    task = input("Enter your task")

    # Skip the second prompt when the task is already stored.
    existing = find_by_task(task)
    if existing is not None:
        print("Found\n", existing.read())
        return

    time_expected = input("Enter Time Expected")
    user = User(task=task, timeExpected=time_expected)

    with app.app_context():
        try:
            created = user.create()
        except Exception:
            # Keep the interactive prompt from crashing; reported below.
            created = None
        if created is None:
            # User.create() returns None when the commit raises IntegrityError.
            # The value printed as "uid" is the entered expected time.
            print(f"Unknown error uid {time_expected}")
        else:
            print("Created\n", created.read())


create()
def process():
    """Prompt for a one-letter command and run the matching CRUD action.

    Returns the list of row dictionaries for "r"; returns None for every
    other choice.
    """
    choice = input("r for read, d for delete, u for update, c for create")

    if choice == "r":
        with app.app_context():
            return [row.read() for row in User.query.all()]

    # The remaining commands map straight onto a handler function.
    handlers = {"d": deleteuser, "u": updateuser, "c": create}
    handler = handlers.get(choice)
    if handler is None:
        print('function not found')
        return None

    handler()
    return None


process()