Skip to content

Commit

Permalink
support multi-dbserver
Browse files Browse the repository at this point in the history
  • Loading branch information
ChickenSellerRED committed Oct 26, 2023
1 parent bc180f9 commit 9a035b1
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 67 deletions.
1 change: 0 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ tools
.idea/

# Don't state conf.py
conf/
tools/
config.yaml

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
depends_on:
- db
db:
image: postgres:15-alpine
image: postgres/postgres:16-3.4-alpine
container_name: postgres-db
environment:
POSTGRES_DB: admin
Expand Down
9 changes: 8 additions & 1 deletion inituser_and_start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,11 @@ python init_user.py &

python stream_sim.py &

streamlit run viz.py
streamlit run viz.py


wget
lsb_release
gnupg
service postgresql start
/etc/postgresql/{version}/main/pg_hba.conf
11 changes: 7 additions & 4 deletions markdown/setting_up.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# Welcome to w4h setting up toturial!

## Prerequisites
Ensure you have your database service in somewhere, and you know the
db's host,username,password,database name, and the port it is listening to.
W4h contains a default postgre db which has been setting up in you server, uses port 5432.
If you want to add a new or change the current db server, ensure you have your database service in somewhere,
and you know the db's host,username,password,database name, and the port it is listening to.

## Setup
1. create your config.yaml to setup your db, according to the example file:
[config.yaml.example](../app/static/config.yaml.example)
1. Change your config.yaml to setup your db, according to the example file:
[config.yaml.example](../app/static/config.yaml.example)
2. Set the "database_number", Create new database server fields and set the index correctly.
3. set up your DB server's config in the following field, and save file:
- nickname: your db's showing name in w4h
- dbms: 'postgresql' or 'mysql'
- host: your db's host
- port: your db's port
Expand Down
14 changes: 10 additions & 4 deletions script/import_hub_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from script.utils import load_config

from script.w4h_db_utils import create_w4h_instance, get_existing_databases, populate_tables, populate_subject_table
from script.w4h_db_utils import create_w4h_instance, get_existing_databases, populate_tables, populate_subject_table,get_existing_database_server


CONFIG_FILE = 'conf/config.yaml'
Expand Down Expand Up @@ -106,12 +106,18 @@ def import_page():


elif database_option == db_selection_options[1]:
new_db_name = st.text_input("Enter new w4h database instance name")

col1, col2 = st.columns([2,5])
with col1:
selected_db_server = st.selectbox("**Select a database server**", get_existing_database_server()).split(' (')[0]
with col2:
new_db_name = st.text_input("Enter new w4h database instance name")
if st.button("Create"):
# Here, implement logic to create the new database with the name new_db_name.
create_w4h_instance(new_db_name, config_path) # This function needs to be implemented.
print(selected_db_server)
create_w4h_instance(selected_db_server, new_db_name, config_path) # This function needs to be implemented.
st.success(f"Database '{new_db_name}' created!")
selected_db = new_db_name
selected_db = "[" + selected_db_server + "] " + new_db_name


uploaded_file = st.file_uploader("Choose a CSV file", type="csv")
Expand Down
32 changes: 24 additions & 8 deletions script/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,14 @@ def load_config(config_file: str) -> dict:
except yaml.YAMLError as exc:
print(exc)


def get_db_engine(config_file: str='conf/config.yaml', db_name=None) -> sqlalchemy.engine.base.Engine:
def getServerIdByNickname(config_file: str='conf/config.yaml', nickname='local db'):
config = load_config(config_file)
server_number = config['database_number']
for i in range(1,server_number+1):
if(config["database"+str(i)]['nickname'] == nickname):
return i
raise Exception("No such nickname: \""+nickname+"\"")
def get_db_engine(config_file: str='conf/config.yaml',db_server_id = 1, db_server_nickname = None, db_name=None,mixed_db_name=None) -> sqlalchemy.engine.base.Engine:
"""Create a SQLAlchemy Engine instance based on the config file
Args:
Expand All @@ -62,19 +68,29 @@ def get_db_engine(config_file: str='conf/config.yaml', db_name=None) -> sqlalche
# load the configurations
config = load_config(config_file=config_file)
# Database connection configuration
dbms = config['database']['dbms']
db_host = config['database']['host']
db_port = config['database']['port']
db_user = config['database']['user']
db_pass = config['database']['password']
if mixed_db_name != None:
db_server_nickname = mixed_db_name.split("] ")[0][1:]
db_name = mixed_db_name.split("] ")[1]
print(mixed_db_name,"!")
print("server: ", db_server_nickname,"!")
print("db_name: ", db_name, "!")
if db_server_nickname != None:
db_server_id = getServerIdByNickname(nickname=db_server_nickname)
db_server = 'database'+str(db_server_id)
dbms = config[db_server]['dbms']
db_host = config[db_server]['host']
db_port = config[db_server]['port']
db_user = config[db_server]['user']
db_pass = config[db_server]['password']
db_name = db_name if db_name else ''

db_user_encoded = urllib.parse.quote_plus(db_user)
db_pass_encoded = urllib.parse.quote_plus(db_pass)

# creating SQLAlchemy Engine instance
con_str = f'postgresql://{db_user_encoded}:{db_pass_encoded}@{db_host}:{db_port}/{db_name}'
db_engine = create_engine(con_str, echo=True, future=True)
print(con_str)
db_engine = create_engine(con_str, echo=True)

return db_engine

52 changes: 32 additions & 20 deletions script/w4h_db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from script.utils import load_config, get_db_engine


def create_tables(db_name: str, config_file='conf/config.yaml'):
def create_tables(db_server_nickname:str, db_name: str, config_file='conf/config.yaml'):
"""Create the W4H tables in the database with the given name based on the config file
Args:
Expand All @@ -23,7 +23,7 @@ def create_tables(db_name: str, config_file='conf/config.yaml'):
"""
metadata = MetaData()
config = load_config(config_file=config_file)
db_engine = get_db_engine(config_file, db_name=db_name)
db_engine = get_db_engine(config_file, db_server_nickname=db_server_nickname, db_name=db_name)
try:
columns_config = config["mapping"]["columns"]

Expand Down Expand Up @@ -56,14 +56,14 @@ def create_tables(db_name: str, config_file='conf/config.yaml'):



def create_w4h_instance(db_name: str, config_file='conf/config.yaml'):
def create_w4h_instance(db_server:str, db_name: str, config_file='conf/config.yaml'):
"""Create a new W4H database instance with the given name and initialize the tables based on the config file
Args:
db_name (str): Name of the database to create
config_file (str, optional): Path to the config file. Defaults to 'conf/config.yaml'.
"""
db_engine_tmp = get_db_engine(config_file)
db_engine_tmp = get_db_engine(config_file,db_server_nickname=db_server)
try:
logger.info('Database engine created!')
# Execute the SQL command to create the database if it doesn't exist
Expand All @@ -78,20 +78,19 @@ def create_w4h_instance(db_name: str, config_file='conf/config.yaml'):
except Exception as err:
logger.error(err)
db_engine_tmp.dispose()
db_engine = get_db_engine(config_file, db_name=db_name)
db_engine = get_db_engine(config_file,db_server_nickname=db_server, db_name=db_name)
try:
# Enable PostGIS extension
with db_engine.connect() as connection:
connection.execute(text(f"CREATE EXTENSION postgis;"))
logger.success(f"PostGIS extension enabled for {db_name}!")
connection.commit()
db_engine.dispose()
except Exception as err:
logger.error(err)
db_engine.dispose()
return
# Create the W4H tables
create_tables(config_file=config_file, db_name=db_name)
create_tables(config_file=config_file, db_name=db_name, db_server_nickname=db_server)
logger.success(f"W4H tables initialized!")


Expand All @@ -104,18 +103,31 @@ def get_existing_databases(config_file='conf/config.yaml') -> list:
Returns:
list: List of all existing databases (strings)
"""
db_list = []
config = load_config(config_file=config_file)
db_engine = get_db_engine(config_file)
try:
with db_engine.connect() as connection:
result = connection.execute(text("SELECT datname FROM pg_database WHERE datistemplate = false;"))
databases = [row[0] for row in result]
db_engine.dispose()
return databases
except Exception as err:
logger.error(err)
db_engine.dispose()
return []
database_number = config['database_number']
for i in range(1,database_number+1):
db_engine = get_db_engine(config_file,db_server_id=i)
try:
with db_engine.connect() as connection:
result = connection.execute(text("SELECT datname FROM pg_database WHERE datistemplate = false;"))
db_list += [ '[' + config['database'+str(i)]['nickname'] + '] ' + row[0] for row in result]
db_engine.dispose()
except Exception as err:
logger.error(err)
db_engine.dispose()
return db_list
return db_list

def get_existing_database_server(config_file='conf/config.yaml') -> list:
db_list_server = []
config = load_config(config_file=config_file)
database_number = config['database_number']
for i in range(1, database_number + 1):
db_list_server += [config['database'+str(i)]['nickname'] + ' (' + config['database'+str(i)]['host'] + ')']
return db_list_server




def populate_tables(df: pd.DataFrame, db_name: str, mappings: dict, config_path='conf/config.yaml'):
Expand All @@ -138,7 +150,7 @@ def populate_tables(df: pd.DataFrame, db_name: str, mappings: dict, config_path=
user_table_name = config['mapping']['tables']['user_table']['name']

# Create a session
engine = get_db_engine(config_path, db_name=db_name)
engine = get_db_engine(config_path, mixed_db_name=db_name)
Session = sessionmaker(bind=engine)
session = Session()

Expand Down Expand Up @@ -203,7 +215,7 @@ def populate_subject_table(df: pd.DataFrame, db_name: str, config_path='conf/con
config = load_config(config_path)

# Create a session
engine = get_db_engine(config_path, db_name=db_name)
engine = get_db_engine(config_path, mixed_db_name=db_name)

# populate the user table (directly push df to table), if already exists, append new users
df.to_sql(user_tbl_name, engine, if_exists='append', index=False)
Expand Down
24 changes: 18 additions & 6 deletions static/config.yaml.example
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
database:
dbms: 'postgresql' or 'mysql'
host: <hostname> # Replace with your DB host
port: <port> # Replace with your DB port
user: <username> # Replace with your DB username
password: <password> # Replace with your DB password
database_number: 1

database1:
nickname: 'local db' # will shows in the selectbox
dbms: 'postgresql'
host: 'db' # Replace with your DB host
port: 5432 # Replace with your DB port
user: 'admin' # Replace with your DB username
password: 'admin'

#database2:
# nickname: <your nick name>
# dbms: <db's system>
# host: <db host>
# port: <db port>
# user: <db username>
# password: <password>



mapping:
Expand Down
28 changes: 14 additions & 14 deletions stream_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import urllib.parse

from script.conf import *
from script.utils import Singleton, load_config
from script.utils import Singleton, load_config, get_db_engine

data_loader_inited = False

Expand Down Expand Up @@ -84,18 +84,18 @@ def init_start_stream_index(self, start_time):



def get_db_engine(db_name=None):
"""
Returns a SQLAlchemy engine for connecting to the database.
Returns:
engine (sqlalchemy.engine.base.Engine): SQLAlchemy engine object.
"""
config = load_config("conf/config.yaml")["database"]
db_user_enc = urllib.parse.quote_plus(config["user"])
db_pass_enc = urllib.parse.quote_plus(config["password"])
# traceback.print_exc()
return create_engine(f'{config["dbms"]}://{db_user_enc}:{db_pass_enc}@{config["host"]}:{config["port"]}/{db_name}')
# def get_db_engine(db_name=None):
# """
# Returns a SQLAlchemy engine for connecting to the database.
#
# Returns:
# engine (sqlalchemy.engine.base.Engine): SQLAlchemy engine object.
# """
# config = load_config("conf/config.yaml")["database"]
# db_user_enc = urllib.parse.quote_plus(config["user"])
# db_pass_enc = urllib.parse.quote_plus(config["password"])
# # traceback.print_exc()
# return create_engine(f'{config["dbms"]}://{db_user_enc}:{db_pass_enc}@{config["host"]}:{config["port"]}/{db_name}')


def get_query_result(query, db_conn, params=[]):
Expand Down Expand Up @@ -179,7 +179,7 @@ def get_data(config):
raise ValueError('DATA_TYPE must be either DATABASE or CSV.')
if data_type == 'DATABASE':
# Fetching data from database table
db_conn = get_db_engine(db_name)
db_conn = get_db_engine(mixed_db_name=db_name)
res = (get_series_from_db(db_conn, table_name=db_table, ids=ids, id_column='id', start_time=start_time),
get_series_from_db(db_conn, table_name=db_calories_table, ids=ids, id_column='id', start_time=start_time),
get_series_from_db(db_conn, table_name=db_coordinates_table, ids=ids, id_column='id', start_time=start_time))
Expand Down
Binary file modified user.db
Binary file not shown.
16 changes: 8 additions & 8 deletions viz.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from script.import_hub_main import import_page
import geopandas as gpd
from shapely import wkb
from script.utils import get_db_engine

import os

Expand All @@ -42,11 +43,11 @@


# get db engine
def get_db_engine():
config = load_config("conf/config.yaml")
db_user_enc = urllib.parse.quote_plus(config["database"]["user"])
db_pass_enc = urllib.parse.quote_plus(config["database"]["password"])
return create_engine(f'postgresql://{db_user_enc}:{db_pass_enc}@{config["database"]["host"]}:{config["database"]["port"]}/{st.session_state["current_db"]}')
# def get_db_engine():
# config = load_config("conf/config.yaml")
# db_user_enc = urllib.parse.quote_plus(config["database"]["user"])
# db_pass_enc = urllib.parse.quote_plus(config["database"]["password"])
# return create_engine(f'postgresql://{db_user_enc}:{db_pass_enc}@{config["database"]["host"]}:{config["database"]["port"]}/{st.session_state["current_db"]}')

# get user ids
def get_garmin_user_id(db_conn, pattern=None):
Expand Down Expand Up @@ -118,7 +119,7 @@ def get_data(session=None, real_time=False) -> pd.DataFrame:
else:
start_date = session.get('start_date')
end_date = session.get('end_date')
db_conn = get_db_engine()
db_conn = get_db_engine(mixed_db_name=session["current_db"])
# query heart rate
df_hrate = pd.read_sql(f"SELECT * FROM {DB_TABLE} WHERE Date(timestamp) >= Date(%s) AND Date(timestamp) <= Date(%s)", db_conn, params=[start_date, end_date])
df_hrate.sort_values(by=['timestamp'], inplace=True)
Expand Down Expand Up @@ -1132,7 +1133,6 @@ def tutorial_page():
elif page == "How to start":
with open('markdown/how_to_start.md', 'r', encoding='utf-8') as markdown_file:
markdown_text = markdown_file.read()
# 显示Markdown内容
st.markdown(markdown_text, unsafe_allow_html=True)
if page == "Setting up":
config_file = st.file_uploader("Upload config file", type=['yaml', 'example','txt'])
Expand Down Expand Up @@ -1180,7 +1180,7 @@ def main():
st.experimental_rerun()

if(session["current_db"] != ""):
garmin_df = get_garmin_df(get_db_engine())
garmin_df = get_garmin_df(get_db_engine(mixed_db_name=session["current_db"]))
garmin_df.age = garmin_df.age.astype(int)
garmin_df.weight = garmin_df.weight.astype(int)
garmin_df.height = garmin_df.height.astype(int)
Expand Down

0 comments on commit 9a035b1

Please sign in to comment.