這幾天為了給市圖爬蟲程式改版 (爬取結果儲存於資料庫), 將架在 Mapleboard 與 render.com 上面 serverless 平台添加了 SQLite 資料庫線上管理功能, 本篇旨在紀錄此新增功能. 關於 SQLite 資料庫操作參考 :
在之前的 serverless 專案中, 利用一個 SQLite 資料庫 serverless.db 來儲存各個非系統函式模組被呼叫的次數, 資料表名稱為 call_stats. 現在想用這資料庫來儲存應用函式的資料, 例如圖書館爬蟲之爬取結果, 所以需要一組系統函式來進行資料表的線上管理, 為此我在首頁函式列表底下添加了一個 "資料表列表" 超連結至 /function/list_tables :
所需模組如下 :
- list_tables.py :
資料表管理之首頁, 以表格顯示非系統資料表, 利用超連結進行顯示 schema, 紀錄, 執行 SQL, 以及刪除資料表等操作. - add_table.py :
根據 Schema 新增一個資料表. - drop_table.py :
刪除指定之資料表. - view_table.py :
以分頁方式顯示指定之資料表內容. - show_shema.py :
顯示指定資料表結構. - export_table.py :
輸出指定資料表內容為 csv 檔. - execute_sql.py :
執行 SQL 指令.
我透過 ChatGPT 協助快速生成上述 7 個資料庫操作的程式碼, 然後依據需求進行微幅調整. 在這過程中也發現了 edit_function.py 的臭蟲, 由於線上操作需要顯示較複雜之 HTML 碼, 原本的 edit_function.py 頁面會出現如下版面崩潰現象 :
解決之道是使用 html.escape 函式來跳脫 HTML 碼 (例如將 < 字元轉成 <). 注意, 以前 Flask 有實作 escape() 函式, 但新版因 Python 標準模組 html 模組已內建而移除. 修改後的 edit_function.py 程式碼如下 :
# edit_function.py
# 模組名稱也可更改 -> 相當於新增模組
from flask import render_template_string
import os
from html import escape
def main(request, **kwargs):
module_name=request.args.get('module_name', '') # 取得模組名稱
if not module_name: # 檢查有無傳入模組名稱
return '請指定要編輯的模組名稱,格式 ?module_name=hello'
filename=f'./functions/{module_name}.py'
if not os.path.isfile(filename):
return f'找不到函式檔案:{module_name}'
with open(filename, 'r', encoding='utf-8') as f:
content=f.read() # 讀取模組內容
content=escape(content)
html=f'''
<h2>編輯函式模組:/functions/{module_name}.py</h2>
<form method="POST" action="/function/update_function">
<label>模組名稱(請用英數字,不含副檔名 .py):</label><br>
<input type="text" name="module_name" size="50" value="{module_name}"><br><br>
<label>模組內容(請輸入合語法的 Python 程式碼):</label><br>
<textarea id="code" name="code" rows="20" cols="100" style="font-family:monospace;">{content}</textarea><br><br>
<button type="button" onclick="location.href='/function/list_functions'">取消</button>
<button type="button" onclick="document.getElementById('code').value = '';">清除</button>
<button type="submit">更新</button>
</form>
'''
return render_template_string(html)
此處改用 flask.render_template_string() 函式來渲染 HTML 程式碼.
以下是經過修正後的程式碼 :
1. list_tables.py :
此模組為資料表管理之首頁, 以表格顯示所有非系統資料表, 利用超連結進行顯示 schema, 紀錄, 執行 SQL, 以及刪除資料表等操作.
# list_tables.py
import os
import sqlite3
def main(request, **kwargs):
protected=[] # 被保護的資料名稱清單 (不顯示)
DB_PATH='./serverless.db' # 資料庫路徑
# 檢查資料庫檔案是否存在
if not os.path.exists(DB_PATH):
html='''
<p>資料庫檔案 serverless.db 不存在!
<a href="/function/list_functions">函式列表</a></p>
'''
return html
# 連線資料庫
try:
conn=sqlite3.connect(DB_PATH)
cur=conn.cursor()
# 取得所有非系統資料表
cur.execute("""
SELECT name FROM sqlite_master WHERE type='table'
AND name NOT LIKE 'sqlite_%' ORDER BY name;
""")
rows=cur.fetchall()
tables=[r[0] for r in rows]
html='<h2>資料表列表</h2>'
html += '<table border="1" cellspacing="0" cellpadding="6" '+\
'style="border-collapse: collapse;">'
html += '<tr><th>資料表名稱</th><th>記錄筆數</th><th>Schema</th>' +\
'<th>檢視記錄</th><th>匯出</th><th>刪除</th></tr>'
if not tables:
html += '<tr><td colspan="6">目前無任何資料表</td></tr>'
else: # 遍歷所有非系統資料表
for table in tables:
if table in protected: # 不顯示被保護資料表
continue
try: # 取得該資料表的記錄筆數 (若失敗則顯示 '-')
cur2=conn.cursor()
cur2.execute(f"SELECT COUNT(*) FROM \"{table}\";")
cnt=cur2.fetchone()[0]
except Exception: # 資料表無紀錄
cnt='-'
html += '<tr>'
html += f'<td>{table}</td>'
html += f'<td align="right">{cnt}</td>'
html += f'<td><a href="/function/show_schema?table={table}">顯示</a></td>'
html += f'<td><a href="/function/view_table?table={table}">檢視</a></td>'
html += f'<td><a href="/function/export_table?table={table}">CSV</a></td>'
html += f'<td><a href="/function/drop_table?table={table}" onclick=' +\
f'"return confirm(\'確定要刪除資料表 {table} 嗎?\')">刪除</a></td>'
html += '</tr>'
html += '</table>'
# 管理連結:新增資料表、回到主頁、登出等
html += '<br><a href="/function/add_table">新增資料表</a> '
html += '<a href="/function/execute_sql">執行 SQL</a> '
html += '<a href="/function/list_functions">返回函式列表</a> '
html += '<a href="/logout">登出</a>'
conn.close()
return html
except sqlite3.Error as e:
return f'<p>連線資料庫失敗 {str(e)}</p>'
finally:
if 'conn' in locals():
conn.close()
此處 protected 用來列舉系統資料表, 避免在操作中被誤改, 目前僅統計呼叫次數 call_stats 資料表而已, 為了測試方便先暫時維持空串列, 測試完再把 call_stats 加入串列中.
所有資料庫操作的功能都在此網頁中.
2. add_table.py :
此模組根據指定之 Schema 新增一個資料表.
# add_table.py
import os
import sqlite3
from flask import request
def main(request, **kwargs):
DB_PATH='./serverless.db'
# 還沒送出表單 : 顯示建立表單頁面
if request.method == 'GET':
html=(
"<h2>新增資料表</h2>"
"<form method='post' action='/function/add_table'>"
"<label>資料表名稱:</label><br>"
"<input type='text' name='table' required><br><br>"
"<label>欄位定義 (Schema, 以逗號分隔):</label><br>"
"<textarea name='schema' rows='4' cols='70' "
"placeholder='例如:id INTEGER PRIMARY KEY, name TEXT, age INTEGER, height REAL' "
"required></textarea><br><br>"
"<input type='submit' value='建立資料表'>"
"</form>"
"<a href='/function/list_tables'>返回資料表列表</a>"
)
return html
# 處理表單送出
if request.method == 'POST':
table=request.form.get('table', '').strip()
schema=request.form.get('schema', '').strip()
if not table or not schema:
return '<p>請輸入資料表名稱與欄位定義!<a href="/function/add_table">返回</a></p>'
if not os.path.exists(DB_PATH):
return '''
<p>資料庫檔案 serverless.db 不存在!
<a href="/function/list_tables">返回</a></p>
'''
try: # 建立資料表
conn=sqlite3.connect(DB_PATH)
cur=conn.cursor()
cur.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({schema});')
conn.commit()
conn.close()
return f'''
<p>資料表 <b>{table}</b> 建立成功!</p>
<a href="/function/list_tables">返回資料表列表</a>
'''
except sqlite3.Error as e:
return f'<p>建立資料表失敗:{str(e)} <a href="/function/add_table">返回</a></p>'
finally:
if 'conn' in locals():
conn.close()
此模組集輸入表單與建立資料表兩個動作於一身, 依據 HTTP 方法來判定要做哪個動作, 剛進入 /function/add_table 時是 GET 方法, 這時會顯示輸入表單頁面 :
此處以建立市圖借書預約資訊的資料表 ksml_books 為例, 輸入如下 Schema :
account PRIMARY KEY, borrow_books TEXT, reserve_books TEXT, updated_at TEXT
按建立資料表按鈕會以 POST 方法再次請求 /function/add_table, 這時會執行 CREATE TABLE 指令建立資料表 :
按 ksml_books 資料表 Schema 欄的顯示鈕會請求 /function/show_schema 模組, 其程式碼如下 :
# show_schema.py
import os
import sqlite3
from urllib.parse import parse_qs
def main(request, **kwargs):
# 解析 URL 參數取得資料表名稱
query=request.query_string.decode('utf-8')
params=parse_qs(query)
table=params.get('table', [''])[0]
DB_PATH='./serverless.db'
# 檢查資料庫是否存在
if not os.path.exists(DB_PATH):
return '''
<p>資料庫檔案 serverless.db 不存在!
<a href="/function/list_tables">返回資料表列表</a></p>
'''
# 檢查 URL 參數中是否有 table 名稱
if not table:
return '''
<p>缺少 table 參數!
<a href="/function/list_tables">返回資料表列表</a></p>
'''
try:
conn=sqlite3.connect(DB_PATH)
cur=conn.cursor()
# 檢查資料表是否存在
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?;", (table,))
exists=cur.fetchone()
if not exists:
conn.close()
return f'''
<p>資料表 <b>{table}</b> 不存在!
<a href="/function/list_tables">返回資料表列表</a></p>
'''
# 取得欄位資訊
cur.execute(f"PRAGMA table_info('{table}');")
columns=cur.fetchall()
# 取得 Schema 的 SQL
cur.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name=?;", (table,))
create_sql=cur.fetchone()[0]
conn.close()
# 產生 HTML
html=f'<h2>資料表 {table} 結構 (Schema)</h2>'
html += '<table border="1" cellspacing="0" cellpadding="6" style="border-collapse: collapse;">'
html += '<tr><th>#</th><th>欄位名稱</th><th>資料型別</th><th>非空</th><th>預設值</th><th>主鍵</th></tr>'
for col in columns:
cid, name, ctype, notnull, dflt_value, pk=col
html += f'<tr>'
html += f'<td>{cid}</td>'
html += f'<td>{name}</td>'
html += f'<td>{ctype}</td>'
html += f'<td>{"✓" if notnull else ""}</td>'
html += f'<td>{dflt_value if dflt_value is not None else ""}</td>'
html += f'<td>{"✓" if pk else ""}</td>'
html += '</tr>'
html += '</table>'
# 顯示 CREATE TABLE 語法
html += '<h3>CREATE TABLE SQL 語法</h3>'
html += f'<pre style="background-color:#f4f4f4;padding:10px;">{create_sql}</pre>'
# 導覽連結
html += '<a href="/function/list_tables">返回資料表列表</a>'
return html
except sqlite3.Error as e:
return f'<p>讀取資料表結構失敗:{str(e)}</p>'
finally:
if 'conn' in locals():
conn.close()
此程式使用 urllib.parse.parse_qs() 函式來解析請求之 URL 字串以取得 table 參數, 先用 PRAGMA table_info('{table}') 指令取得資料表的欄位資訊, 然後從 SQLite 的系統資料表 sqlite_master 的 sql 欄位中取得建立此資料表之 SQL 指令, 最後利用迴圈顯示欄位資訊與 Schema (SQL 指令) :
接下來實作 /function/execute_sql 並利用它來寫入資料表 :
# execute_sql.py
import os
import sqlite3
from flask import request
from html import escape
def main(request, **kwargs):
DB_PATH='./serverless.db'
# 還沒送出表單 (GET):顯示 SQL 輸入頁面
if request.method == 'GET':
html=(
"<h2>執行 SQL 指令</h2>"
"<form method='post' action='/function/execute_sql'>"
"<label>輸入 SQL 指令:</label><br>"
"<textarea name='sql' rows='6' cols='80' "
"placeholder='例如:SELECT * FROM call_stats;' required></textarea><br><br>"
"<input type='submit' value='執行'>"
"</form>"
"<a href='/function/list_tables'>返回資料表列表</a>"
)
return html # Removed conn.close() since conn is not defined
# 處理表單送出 (POST):執行 SQL 指令
if request.method == 'POST':
sql=request.form.get('sql', '').strip() # 取出 sql 欄位
if not sql:
return '<p>請輸入 SQL 指令!<a href="/function/execute_sql">返回</a></p>'
if not os.path.exists(DB_PATH):
return (
'<p>資料庫檔案 serverless.db 不存在!'
'<a href="/function/list_tables">返回</a></p>'
)
try: # 執行 SQL 指令
conn=sqlite3.connect(DB_PATH) # 建立連線與 cursor
cur=conn.cursor()
# SELECT 查詢 : 執行查詢後取出所有結果
if sql.lower().startswith('select'):
cur.execute(sql)
rows=cur.fetchall()
columns=[desc[0] for desc in cur.description] # 取得欄位名稱
# 將查詢結果轉成 HTML 表格
if not rows:
result="<p>查無資料。</p>"
else:
result='<table border="1" cellspacing="0" cellpadding="6" style="border-collapse: collapse;">'
result += "<tr>" + "".join(f"<th>{escape(col)}</th>" for col in columns) + "</tr>"
for row in rows:
result += "<tr>" + "".join(f"<td>{escape(str(v))}</td>" for v in row) + "</tr>"
result += "</table>"
conn.close() # 關閉資料庫連線
return (
f"<h2>SQL 查詢結果</h2>"
f"<p><b>指令:</b> {escape(sql)}</p>"
f"{result}"
"<br><a href='/function/execute_sql'>返回</a>"
)
else: # 其他非 SELECT 指令 (INSERT, UPDATE, DELETE, DROP/CREATE TABLE)
cur.execute(sql)
conn.commit()
affected=cur.rowcount
conn.close()
return (
f"<p>SQL 指令執行成功!(影響 {affected} 筆紀錄)</p>"
f"<p><b>指令:</b> {escape(sql)}</p>"
"<a href='/function/execute_sql'>返回</a>"
)
except sqlite3.Error as e:
conn.close() # 關閉資料庫連線
return f"<p>SQL 執行錯誤:{escape(str(e))}</p><a href='/function/execute_sql'>返回</a>"
此程式與 create_table.py 結構相同, 都是利用判別 HTTP 請求是 GET 或 POST 方法來決定回應方式, 若是 GET 方法就回應一個 SQL 輸入頁面; 若是 POST 方法就執行表單傳來的 SQL 指令 (分成 SELECT 與非 SELECT 指令兩類). 另外, 與上面修改 edit_function.py 的原因一樣, 為了避免網頁版面崩掉與 XSS 注入, 此程式使用 html.escape() 來跳脫 HTML 字元.
第一次請求 /function/execute_sql 時是 GET 方法, 這時會顯示 SQL 輸入頁面, 測試用的 SQL 寫入指令例如 :
INSERT OR REPLACE INTO ksml_books (account, borrow_books, reserve_books, updated_at)
VALUES ('tony', '["Python入門", "資料庫設計"]', '["AI導論"]', '2025-10-19 14:39:00')
此處因為 account 是不能重複的主鍵, 所以我們使用 INSERT OR REPLACE 指令, 當然也可以第一次用 INSERT 後續用 UPDATE 指令, 但 用 INSERT OR REPLACE 較單純 :
新增紀錄成功後, 再新增下一筆紀錄 :
INSERT OR REPLACE INTO ksml_books (account, borrow_books, reserve_books, updated_at)
VALUES ('amy', '["京都時光"]', '["關西地圖"]', '2025-10-19 14:45:00')
這樣回到資料表列表就會看到 ksml_stats 有兩筆紀錄了 :
按檢視紀錄欄中的檢視超連結就會呼叫 /function/view_table 顯示資料表內容, 程式碼如下 :
# view_table.py
import os
import sqlite3
import html
from urllib.parse import parse_qs
def main(request, **kwargs):
# 解析 URL 查詢參數 (?table=xxx&page=1)
query=request.query_string.decode('utf-8')
params=parse_qs(query)
table=params.get('table', [''])[0]
try:
page=int(params.get('page', ['1'])[0])
if page < 1:
page=1
except ValueError:
page=1
DB_PATH='./serverless.db'
ROWS_PER_PAGE=50 # 每頁顯示筆數
# 檢查資料庫是否存在
if not os.path.exists(DB_PATH):
return '''
<p>資料庫檔案 serverless.db 不存在!
<a href="/function/list_tables">返回資料表列表</a></p>
'''
# 檢查 URL 參數中是否有 table 名稱
if not table:
return '''
<p>缺少 table 參數!
<a href="/function/list_tables">返回資料表列表</a></p>
'''
try:
conn=sqlite3.connect(DB_PATH)
cur=conn.cursor()
# 安全檢查 (避免惡意表名注入)
cur.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name=?
""", (table,))
if not cur.fetchone():
conn.close()
return f'<p>資料表 "{html.escape(table)}" 不存在!</p>'
# 取得欄位名稱
cur.execute(f'SELECT * FROM "{table}" LIMIT 1;')
colnames=[d[0] for d in cur.description] if cur.description else []
# 計算總筆數
cur.execute(f'SELECT COUNT(*) FROM "{table}";')
total_rows=cur.fetchone()[0]
total_pages=(total_rows + ROWS_PER_PAGE - 1) // ROWS_PER_PAGE
# 取得當前頁的資料
offset=(page - 1) * ROWS_PER_PAGE
cur.execute(f'SELECT * FROM "{table}" LIMIT ? OFFSET ?;', (ROWS_PER_PAGE, offset))
rows=cur.fetchall()
conn.close()
# 組成 HTML
text=f'<h2>資料表 {html.escape(table)} 內容</h2>'
text += '<table border="1" cellspacing="0" cellpadding="6" style="border-collapse: collapse;">'
# 欄位列
if colnames:
text += '<tr>' + ''.join(f'<th>{html.escape(c)}</th>' for c in colnames) + '</tr>'
else:
text += '<tr><td colspan="99">(無欄位)</td></tr>'
# 資料列
if rows:
for row in rows:
text += '<tr>' + ''.join(f'<td>{html.escape(str(v))}</td>' for v in row) + '</tr>'
else:
text += '<tr><td colspan="99">(本頁無資料)</td></tr>'
text += '</table>'
# 分頁列
text += f'<p>共 {total_rows} 筆資料,頁 {page}/{total_pages}</p>'
text += '<div style="margin:10px 0;">'
if page > 1:
text += f'<a href="/function/view_table?table={html.escape(table)}&page={page-1}">上一頁</a> '
if page < total_pages:
text += f'<a href="/function/view_table?table={html.escape(table)}&page={page+1}">下一頁</a>'
text += '</div>'
text += '<a href="/function/list_tables">返回資料表列表</a>'
return text
except Exception as e:
return f'<p>讀取資料時發生錯誤:{html.escape(str(e))}</p>'
finally:
if 'conn' in locals():
conn.close()
此程式使用分頁方式來顯示資料表內的全部紀錄 (預設每頁 50 筆), 首先用 urllib.parse.parse_qs() 函式來解析請求之 URL 字串以取得 table 與 page 參數, 然後用 SELECT COUNT 指令取得紀錄總筆數以便計算要分幾頁來顯示, 結果如下 :
按資料表列表匯出欄的 CSV 超連結會呼叫 /function/export_table 來匯出該資料表內容為 CSV 檔, 程式碼如下 :
# export_table.py
import os
import sqlite3
import csv
from io import StringIO
from flask import Response, request
def main(request, **kwargs):
# 解析 URL 查詢參數 (?table=xxx)
table=request.args.get('table')
if not table:
return '<p>未指定資料表名稱!<a href="/function/list_tables">返回列表</a></p>'
DB_PATH='./serverless.db'
# 檢查資料庫是否存在
if not os.path.exists(DB_PATH):
return '<p>資料庫檔案 serverless.db 不存在!<a href="/function/list_tables">返回列表</a></p>'
try:
conn=sqlite3.connect(DB_PATH)
cur=conn.cursor()
# 取得欄位名稱
try:
cur.execute(f'PRAGMA table_info("{table}")')
columns_info=cur.fetchall()
if not columns_info:
return f'<p>資料表 {table} 不存在!<a href="/function/list_tables">返回列表</a></p>'
columns=[col[1] for col in columns_info] # col[1] 是欄位名稱
except Exception as e:
return f'<p>取得欄位失敗: {str(e)}</p>'
# 取得所有資料
try:
cur.execute(f'SELECT * FROM "{table}"')
rows=cur.fetchall()
except Exception as e:
return f'<p>讀取資料失敗: {str(e)}</p>'
# 將資料寫入 CSV
output=StringIO()
writer=csv.writer(output)
writer.writerow(columns)
writer.writerows(rows)
csv_data=output.getvalue()
output.close()
# 回傳 CSV 給瀏覽器下載
csv_bytes=csv_data.encode('utf-8-sig')
return Response(
csv_bytes,
mimetype='text/csv',
headers={'Content-Disposition': f'attachment; filename="{table}.csv"'}
)
except sqlite3.Error as e:
return f'<p>連線資料庫失敗 {str(e)}</p>'
finally:
if 'conn' in locals():
conn.close()
此程式使用 io.StringIO 類別來將資料表內容轉成 CSV 資料, 注意, 此處為了解決 Excel 在 Windows 版的 BOM 中文亂碼問題, 此 CSV 資料還呼叫了 encode() 來指定 'utf-8-sig' 編碼格式, 最後將編碼後的 CSV 資料利用 flask.Response() 函式傳給瀏覽器下載 :
開啟 ksml_books.csv 結果如下 :
最後一個模組是用來刪除資料表的 /function/drop_table, 程式碼如下 :
# drop_table.py
import os
import sqlite3
from flask import request, redirect
def main(request, **kwargs):
# 解析 URL 參數取得資料表名稱
query=request.query_string.decode('utf-8')
params=parse_qs(query)
table=params.get('table', [''])[0]
DB_PATH='./serverless.db'
# 檢查資料庫是否存在
if not os.path.exists(DB_PATH):
return '''
<p>資料庫檔案 serverless.db 不存在!
<a href="/function/list_tables">返回資料表列表</a></p>
'''
# 設定保護清單,避免刪除重要系統表
protected=['call_stats']
if table in protected:
return f'''
<p>資料表 {table} 為保護表,無法刪除!
<a href="/function/list_tables">返回列表</a></p>
'''
try:
conn=sqlite3.connect(DB_PATH)
cur=conn.cursor()
# 執行刪除資料表
cur.execute(f'DROP TABLE IF EXISTS "{table}"')
conn.commit()
conn.close()
# 刪除成功後返回資料表列表
return f'''
<p>資料表 {table} 已刪除成功!
<a href="/function/list_tables">返回列表</a></p>
'''
except sqlite3.Error as e:
return f'<p>刪除資料表 {table} 失敗: {str(e)}</p>'
finally:
if 'conn' in locals():
conn.close()
此程式使用 DROP TABLE 指令來刪除指定資料表, 在資料表列表的刪除欄位中按刪除鈕, 經過確認即可刪除資料表 :
可見 ksml_books 資料表已被刪除.
OK, 這樣就報完工啦!
2025-10-19 補充 :
剛剛發現似乎缺了刪除紀錄的功能, 所以我又修改了 view_table.py, 在表格最後添加 "刪除" 欄, 按下超連結菁確認後會呼叫 delete_record.py, 攜帶 table 與 pk (帳號主鍵), 刪除完回 view_table.py :
# view_table.py
import os
import sqlite3
import html
from urllib.parse import parse_qs, quote
def main(request, **kwargs):
# 解析 URL 查詢參數 (?table=xxx&page=1)
query=request.query_string.decode('utf-8')
params=parse_qs(query)
table=params.get('table', [''])[0]
try:
page=int(params.get('page', ['1'])[0])
if page < 1:
page=1
except ValueError:
page=1
DB_PATH='./serverless.db'
ROWS_PER_PAGE=50 # 每頁顯示筆數
# 檢查資料庫是否存在
if not os.path.exists(DB_PATH):
return '''
<p>資料庫檔案 serverless.db 不存在!<a href="/function/list_tables">返回資料表列表</a></p>
'''
if not table:
return '''
<p>缺少 table 參數!<a href="/function/list_tables">返回資料表列表</a></p>
'''
try:
conn=sqlite3.connect(DB_PATH)
cur=conn.cursor()
# 安全檢查 (避免惡意表名注入)
cur.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name=?
""", (table,))
if not cur.fetchone():
conn.close()
return f'<p>資料表 "{html.escape(table)}" 不存在!</p>'
# 取得欄位名稱
cur.execute(f'SELECT * FROM "{table}" LIMIT 1;')
colnames=[d[0] for d in cur.description] if cur.description else []
# 計算總筆數
cur.execute(f'SELECT COUNT(*) FROM "{table}";')
total_rows=cur.fetchone()[0]
total_pages=(total_rows + ROWS_PER_PAGE - 1) // ROWS_PER_PAGE
# 取得當前頁的資料
offset=(page - 1) * ROWS_PER_PAGE
cur.execute(f'SELECT * FROM "{table}" LIMIT ? OFFSET ?;', (ROWS_PER_PAGE, offset))
rows=cur.fetchall()
conn.close()
# 組成 HTML
text=f'<h2>資料表 {html.escape(table)} 內容</h2>'
text += '<table border="1" cellspacing="0" cellpadding="6" style="border-collapse: collapse;">'
# 欄位列
if colnames:
text += '<tr>'
text += ''.join(f'<th>{html.escape(c)}</th>' for c in colnames)
text += '<th>刪除</th>' # 新增刪除欄位
text += '</tr>'
else:
text += '<tr><td colspan="99">(無欄位)</td></tr>'
# 資料列
if rows:
for row in rows:
text += '<tr>'
for v in row:
text += f'<td>{html.escape(str(v))}</td>'
# 假設第一個欄位是主鍵
pk_value = quote(str(row[0]))
text += f'<td><a href="/function/delete_record?table={quote(table)}&pk={pk_value}" ' \
f'onclick="return confirm(\'確定要刪除此筆紀錄嗎?\')">刪除</a></td>'
text += '</tr>'
else:
text += '<tr><td colspan="99">(本頁無資料)</td></tr>'
text += '</table>'
# 分頁列
text += f'<p>共 {total_rows} 筆資料,頁 {page}/{total_pages}</p>'
text += '<div style="margin:10px 0;">'
if page > 1:
text += f'<a href="/function/view_table?table={quote(table)}&page={page-1}">上一頁</a> '
if page < total_pages:
text += f'<a href="/function/view_table?table={quote(table)}&page={page+1}">下一頁</a>'
text += '</div>'
text += '<a href="/function/list_tables">返回資料表列表</a>'
return text
except Exception as e:
return f'<p>讀取資料時發生錯誤:{html.escape(str(e))}</p>'
finally:
if 'conn' in locals():
conn.close()
確認刪除會呼叫 delete_record.py :
# delete_record.py
import os
import sqlite3
from flask import redirect, request
from html import escape
def main(request, **kwargs):
table=request.args.get('table', '').strip()
pk=request.args.get('pk', '').strip()
DB_PATH='./serverless.db'
# 檢查參數
if not table or not pk:
return f'<p>缺少 table 或 pk 參數!<a href="/function/list_tables">返回列表</a></p>'
# 檢查資料庫
if not os.path.exists(DB_PATH):
return f'<p>資料庫檔案 serverless.db 不存在!<a href="/function/list_tables">返回列表</a></p>'
try: # 刪除紀錄
conn=sqlite3.connect(DB_PATH)
cur=conn.cursor()
# 安全檢查:確認 table 存在
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table,))
if not cur.fetchone():
return f'<p>資料表 "{escape(table)}" 不存在!<a href="/function/list_tables">返回列表</a></p>'
# 執行刪除
cur.execute('DELETE FROM ksml_books WHERE account=?', (pk,))
conn.commit()
return redirect(f'/function/view_table?table={table}')
except sqlite3.Error as e:
return f'<p>刪除紀錄失敗:{escape(str(e))}</p><a href="/function/view_table?table={escape(table)}">返回</a>'
finally:
if 'conn' in locals():
conn.close()
刪除完返回 view_table.py :
雖然手動刪除使用機會不大, 但系統功能還是要完備.
最後修改 serverless.py, 將上面 8 個與資料庫管理相關的函式列入被保護的系統模組避免被誤改誤刪 :
# 需要驗證的函式列表
PROTECTED_FUNCTIONS=['list_functions',
'add_function',
'save_function',
'edit_function',
'update_function',
'delete_function',
'show_stats',
'clear_stats',
'list_tables',
'add_table',
'drop_table',
'view_table',
'show_schema',
'export_table',
'execute_sql',
'delete_record'
]
我把此次 v5 改版 zip 放在 GitHub :
而持續小幅修改的則放在 render.com 部署用的 repo :
2025-10-21 補充 :
今天修改了 view_table.py 中顯示資料表內容的程式碼, 將欄位內容中的 \n 跳行改為 <br> 以便在網頁中呈現跳行效果 :
# 資料列
if rows:
for row in rows:
text += '<tr>'
for v in row:
html_nl2br=html.escape(str(v)).replace("\n", "<br>")
text += f'<td>{html_nl2br}</td>'
它原本只是 html.escape(str(v)) 而已, 這樣 \n 就沒有跳行效果. 注意, 如果在 f 字串中處理 \n 字元會出錯, 必須先處理完再放進 f 字串裡.



















沒有留言 :
張貼留言