1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
| import time, sys, os import numpy as np import pandas as pd
import sqlite3 import streamlit as st
def create_connection(db_file):
conn = None try: conn = sqlite3.connect(db_file) except Exception as e: st.write(e)
return conn
def create_database():
st.write("# PAGE1 : 데이터베이스 만들기") st.write("---")
db_filename = st.text_input("데이터베이스 이름을 입력하세요.") create_db = st.button('데이터베이스 생성')
if create_db: if db_filename.endswith('.db'): if db_filename not in [file for file in os.listdir(os.getcwd())]: conn = create_connection(db_filename) st.success('데이터베이스 생성 완료.') else: st.error('이미 존재하는 데이터베이스입니다. 다시 입력하세요.') else: st.error('파일이름은 .db로 끝나야 합니다. 다시 입력하세요.')
def upload_data():
st.write("# PAGE2 : 파일 업로드하기") st.write("---")
sqlite_dbs = [file for file in os.listdir('.') if file.endswith('.db')] db_filename = st.selectbox('데이터베이스를 선택하세요.', sqlite_dbs) if db_filename is not None: conn = create_connection(db_filename) cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") tables = cursor.fetchall() tables = [table[0] for table in tables] table_name = st.text_input('테이블 이름을 입력하세요.') if len(table_name) > 0: if table_name not in tables: uploaded_file = st.file_uploader('업로드할 파일을 첨부하세요.') if uploaded_file is not None: try: df = pd.read_csv(uploaded_file, encoding='cp949') st.write('**Data loaded successfully. These are the first 3 rows.**') st.dataframe(df.head(3), use_container_width=True) col1, col2 = st.columns([8,1]) is_apply = col2.button('Upload', use_container_width=True) if is_apply: pg_bar = col1.progress(0, text="⏩Progress") for percent_complete in range(100): time.sleep(0.01) pg_bar.progress(percent_complete + 1, text="Progress") df.to_sql(name=table_name, con=conn, if_exists='replace', index=False) time.sleep(0.1) st.success("업로드가 완료되었습니다.") except Exception as e: st.write(e) else: st.error("이미 존재하는 테이블입니다. 다시 입력하세요.") else: st.error('DB 파일이 존재하지 않습니다. DB 파일을 먼저 생성하세요.')
def run_query():
st.write("# PAGE3 : SQL 쿼리로 데이터 조작하기") st.write("---")
sqlite_dbs = [file for file in os.listdir('.') if file.endswith('.db')] db_filename = st.selectbox('DB Filename', sqlite_dbs) query = st.text_area('SQL Query', height=150) if db_filename is not None: if len(query) == 0: is_disabled = True else: is_disabled = False submitted = st.button('Run Query', disabled=is_disabled)
if submitted: conn = create_connection(db_filename) query = conn.execute(query) cols = [column[0] for column in query.description] results_df= pd.DataFrame.from_records( data = query.fetchall(), columns = cols ) st.dataframe(results_df) if st.button('Download'): convert_df(results_df)
else: st.error('DB 파일이 존재하지 않습니다. DB 파일을 먼저 생성하세요.')
def convert_df(df): return df.to_csv(index=False).encode('cp949')
def main():
st.set_page_config( page_title="Sqlite3 DB Connect with Streamlit", page_icon="⚒️", layout="wide", initial_sidebar_state="auto", menu_items={ 'Get Help': 'mailto:donumm64@gmail.com', 'About': "*Made by gonekng*" } )
st.sidebar.subheader("🎈Streamlit으로 Sqlite3 DB 연동하기") st.sidebar.write("---") page_names_to_funcs = { "Create Database": create_database, "Upload Data": upload_data, "Run Query": run_query, }
selected_page = st.sidebar.selectbox("Select a page", page_names_to_funcs.keys(), label_visibility='collapsed') page_names_to_funcs[selected_page]()
if __name__ == "__main__":
main()
|