2025年10月19日 星期日

Mapleboard MP510-50 測試 (四十三) : HTTP API 函式執行平台 (7)

這幾天為了給市圖爬蟲程式改版 (爬取結果儲存於資料庫), 將架在 Mapleboard 與 render.com 上面 serverless 平台添加了 SQLite 資料庫線上管理功能, 本篇旨在紀錄此新增功能. 關於 SQLite 資料庫操作參考 :


在之前的 serverless 專案中, 利用一個 SQLite 資料庫 serverless.db 來儲存各個非系統函式模組被呼叫的次數, 資料表名稱為 call_stats. 現在想用這資料庫來儲存應用函式的資料, 例如圖書館爬蟲之爬取結果, 所以需要一組系統函式來進行資料表的線上管理, 為此我在首頁函式列表底下添加了一個 "資料表列表" 超連結至 /function/list_tables : 



所需模組如下 :
  1. list_tables.py :
    資料表管理之首頁, 以表格顯示非系統資料表, 利用超連結進行顯示 schema, 紀錄, 執行 SQL, 以及刪除資料表等操作. 
  2. add_table.py :
    根據 Schema 新增一個資料表.
  3. drop_table.py :
    刪除指定之資料表.
  4. view_table.py :
    以分頁方式顯示指定之資料表內容.
  5. show_shema.py :
    顯示指定資料表結構.
  6. export_table.py :
    輸出指定資料表內容為 csv 檔. 
  7. execute_sql.py :
    執行 SQL 指令. 
我透過 ChatGPT 協助快速生成上述 7 個資料庫操作的程式碼, 然後依據需求進行微幅調整. 在這過程中也發現了 edit_function.py 的臭蟲, 由於線上操作需要顯示較複雜之 HTML 碼, 原本的 edit_function.py 頁面會出現如下版面崩潰現象 : 




解決之道是使用 html.escape 函式來跳脫 HTML 碼 (例如將 < 字元轉成 &lt;). 注意, 以前 Flask 有實作 escape() 函式, 但新版因 Python 標準模組 html 模組已內建而移除. 修改後的 edit_function.py 程式碼如下 : 

# edit_function.py
# 模組名稱也可更改 -> 相當於新增模組
from flask import render_template_string
import os
from html import escape

def main(request, **kwargs):
    module_name=request.args.get('module_name', '')  # 取得模組名稱
    if not module_name:  # 檢查有無傳入模組名稱
        return '請指定要編輯的模組名稱,格式 ?module_name=hello'
    filename=f'./functions/{module_name}.py'
    if not os.path.isfile(filename):
        return f'找不到函式檔案:{module_name}'
    with open(filename, 'r', encoding='utf-8') as f: 
        content=f.read()  # 讀取模組內容
        content=escape(content)
    html=f'''
    <h2>編輯函式模組:/functions/{module_name}.py</h2>
    <form method="POST" action="/function/update_function">
        <label>模組名稱(請用英數字,不含副檔名 .py):</label><br>    
        <input type="text" name="module_name" size="50" value="{module_name}"><br><br>
        <label>模組內容(請輸入合語法的 Python 程式碼):</label><br>        
        <textarea id="code" name="code" rows="20" cols="100" style="font-family:monospace;">{content}</textarea><br><br>
        <button type="button" onclick="location.href='/function/list_functions'">取消</button>
        <button type="button" onclick="document.getElementById('code').value = '';">清除</button>
        <button type="submit">更新</button>
    </form>
    '''
    return render_template_string(html)

此處改用 flask.render_template_string() 函式來渲染 HTML 程式碼.

以下是經過修正後的程式碼 :


1. list_tables.py :

此模組為資料表管理之首頁, 以表格顯示所有非系統資料表, 利用超連結進行顯示 schema, 紀錄, 執行 SQL, 以及刪除資料表等操作.

# list_tables.py
import os
import sqlite3

def main(request, **kwargs):    
    protected=[]  # 被保護的資料名稱清單 (不顯示)    
    DB_PATH='./serverless.db'  # 資料庫路徑
    # 檢查資料庫檔案是否存在
    if not os.path.exists(DB_PATH):
        html='''
            <p>資料庫檔案 serverless.db 不存在!
            <a href="/function/list_functions">函式列表</a></p>
            '''
        return html
    # 連線資料庫
    try:
        conn=sqlite3.connect(DB_PATH)
        cur=conn.cursor()
        # 取得所有非系統資料表
        cur.execute("""
            SELECT name FROM sqlite_master WHERE type='table'
            AND name NOT LIKE 'sqlite_%' ORDER BY name;
            """)
        rows=cur.fetchall()
        tables=[r[0] for r in rows]
        html='<h2>資料表列表</h2>'
        html += '<table border="1" cellspacing="0" cellpadding="6" '+\
                'style="border-collapse: collapse;">'
        html += '<tr><th>資料表名稱</th><th>記錄筆數</th><th>Schema</th>' +\
                '<th>檢視記錄</th><th>匯出</th><th>刪除</th></tr>'
        if not tables:
            html += '<tr><td colspan="6">目前無任何資料表</td></tr>'
        else:  # 遍歷所有非系統資料表
            for table in tables:
                if table in protected:  # 不顯示被保護資料表                    
                    continue                
                try:  # 取得該資料表的記錄筆數 (若失敗則顯示 '-')
                    cur2=conn.cursor()
                    cur2.execute(f"SELECT COUNT(*) FROM \"{table}\";")
                    cnt=cur2.fetchone()[0]
                except Exception:  # 資料表無紀錄
                    cnt='-'
                html += '<tr>'
                html += f'<td>{table}</td>'
                html += f'<td align="right">{cnt}</td>'
                html += f'<td><a href="/function/show_schema?table={table}">顯示</a></td>'
                html += f'<td><a href="/function/view_table?table={table}">檢視</a></td>'
                html += f'<td><a href="/function/export_table?table={table}">CSV</a></td>'
                html += f'<td><a href="/function/drop_table?table={table}" onclick=' +\
                        f'"return confirm(\'確定要刪除資料表 {table} 嗎?\')">刪除</a></td>'
                html += '</tr>'
        html += '</table>'
        # 管理連結:新增資料表、回到主頁、登出等
        html += '<br><a href="/function/add_table">新增資料表</a> '
        html += '<a href="/function/execute_sql">執行 SQL</a> '
        html += '<a href="/function/list_functions">返回函式列表</a> '
        html += '<a href="/logout">登出</a>'
        conn.close()
        return html
    except sqlite3.Error as e:
        return f'<p>連線資料庫失敗 {str(e)}</p>'
    finally:
        if 'conn' in locals():
            conn.close()    

此處 protected 用來列舉系統資料表, 避免在操作中被誤改, 目前僅統計呼叫次數 call_stats 資料表而已, 為了測試方便先暫時維持空串列, 測試完再把 call_stats 加入串列中. 




所有資料庫操作的功能都在此網頁中. 


2. add_table.py :

此模組根據指定之 Schema 新增一個資料表.

# add_table.py
import os
import sqlite3
from flask import request

def main(request, **kwargs):
    DB_PATH='./serverless.db'
    # 還沒送出表單 : 顯示建立表單頁面
    if request.method == 'GET':
        html=(
            "<h2>新增資料表</h2>"
            "<form method='post' action='/function/add_table'>"
            "<label>資料表名稱:</label><br>"
            "<input type='text' name='table' required><br><br>"
            "<label>欄位定義 (Schema, 以逗號分隔):</label><br>"
            "<textarea name='schema' rows='4' cols='70' "
            "placeholder='例如:id INTEGER PRIMARY KEY, name TEXT, age INTEGER, height REAL' "
            "required></textarea><br><br>"
            "<input type='submit' value='建立資料表'>"
            "</form>"
            "<a href='/function/list_tables'>返回資料表列表</a>"
            )
        return html
    # 處理表單送出
    if request.method == 'POST':
        table=request.form.get('table', '').strip()
        schema=request.form.get('schema', '').strip()
        if not table or not schema:
            return '<p>請輸入資料表名稱與欄位定義!<a href="/function/add_table">返回</a></p>'
        if not os.path.exists(DB_PATH):
            return '''
                <p>資料庫檔案 serverless.db 不存在!
                <a href="/function/list_tables">返回</a></p>
                '''
        try: # 建立資料表
            conn=sqlite3.connect(DB_PATH)
            cur=conn.cursor()
            cur.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({schema});')
            conn.commit()
            conn.close()
            return f'''
                <p>資料表 <b>{table}</b> 建立成功!</p>
                <a href="/function/list_tables">返回資料表列表</a>
                '''
        except sqlite3.Error as e:
            return f'<p>建立資料表失敗:{str(e)} <a href="/function/add_table">返回</a></p>'
        finally:
            if 'conn' in locals():
                conn.close()            

此模組集輸入表單與建立資料表兩個動作於一身, 依據 HTTP 方法來判定要做哪個動作, 剛進入 /function/add_table 時是 GET 方法, 這時會顯示輸入表單頁面 : 




此處以建立市圖借書預約資訊的資料表 ksml_books 為例, 輸入如下 Schema :

account PRIMARY KEY, borrow_books TEXT, reserve_books TEXT, updated_at TEXT 

按建立資料表按鈕會以 POST 方法再次請求 /function/add_table, 這時會執行 CREATE TABLE 指令建立資料表 : 




按 ksml_books 資料表 Schema 欄的顯示鈕會請求 /function/show_schema 模組, 其程式碼如下 :

# show_schema.py
import os
import sqlite3
from urllib.parse import parse_qs

def main(request, **kwargs):
    # 解析 URL 參數取得資料表名稱
    query=request.query_string.decode('utf-8')
    params=parse_qs(query)
    table=params.get('table', [''])[0]
    DB_PATH='./serverless.db'
    # 檢查資料庫是否存在
    if not os.path.exists(DB_PATH):
        return '''
        <p>資料庫檔案 serverless.db 不存在!
        <a href="/function/list_tables">返回資料表列表</a></p>
        '''
    # 檢查 URL 參數中是否有 table 名稱
    if not table:
        return '''
        <p>缺少 table 參數!
        <a href="/function/list_tables">返回資料表列表</a></p>
        '''
    try:
        conn=sqlite3.connect(DB_PATH)
        cur=conn.cursor()
        # 檢查資料表是否存在
        cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?;", (table,))
        exists=cur.fetchone()
        if not exists:
            conn.close()
            return f'''
            <p>資料表 <b>{table}</b> 不存在!
            <a href="/function/list_tables">返回資料表列表</a></p>
            '''
        # 取得欄位資訊
        cur.execute(f"PRAGMA table_info('{table}');")
        columns=cur.fetchall()
        # 取得 Schema 的 SQL
        cur.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name=?;", (table,))
        create_sql=cur.fetchone()[0]
        conn.close()
        # 產生 HTML
        html=f'<h2>資料表 {table} 結構 (Schema)</h2>'
        html += '<table border="1" cellspacing="0" cellpadding="6" style="border-collapse: collapse;">'
        html += '<tr><th>#</th><th>欄位名稱</th><th>資料型別</th><th>非空</th><th>預設值</th><th>主鍵</th></tr>'
        for col in columns:
            cid, name, ctype, notnull, dflt_value, pk=col
            html += f'<tr>'
            html += f'<td>{cid}</td>'
            html += f'<td>{name}</td>'
            html += f'<td>{ctype}</td>'
            html += f'<td>{"✓" if notnull else ""}</td>'
            html += f'<td>{dflt_value if dflt_value is not None else ""}</td>'
            html += f'<td>{"✓" if pk else ""}</td>'
            html += '</tr>'
        html += '</table>'
        # 顯示 CREATE TABLE 語法
        html += '<h3>CREATE TABLE SQL 語法</h3>'
        html += f'<pre style="background-color:#f4f4f4;padding:10px;">{create_sql}</pre>'
        # 導覽連結
        html += '<a href="/function/list_tables">返回資料表列表</a>'
        return html
    except sqlite3.Error as e:
        return f'<p>讀取資料表結構失敗:{str(e)}</p>'
    finally:
        if 'conn' in locals():
            conn.close()

此程式使用 urllib.parse.parse_qs() 函式來解析請求之 URL 字串以取得 table 參數, 先用 PRAGMA table_info('{table}') 指令取得資料表的欄位資訊, 然後從 SQLite 的系統資料表 sqlite_master 的 sql 欄位中取得建立此資料表之 SQL 指令, 最後利用迴圈顯示欄位資訊與 Schema (SQL 指令) :




接下來實作 /function/execute_sql 並利用它來寫入資料表 :

# execute_sql.py
import os
import sqlite3
from flask import request
from html import escape   

def main(request, **kwargs):
    DB_PATH='./serverless.db'
    # 還沒送出表單 (GET):顯示 SQL 輸入頁面
    if request.method == 'GET':
        html=(
            "<h2>執行 SQL 指令</h2>"
            "<form method='post' action='/function/execute_sql'>"
            "<label>輸入 SQL 指令:</label><br>"
            "<textarea name='sql' rows='6' cols='80' "
            "placeholder='例如:SELECT * FROM call_stats;' required></textarea><br><br>"
            "<input type='submit' value='執行'>"
            "</form>"
            "<a href='/function/list_tables'>返回資料表列表</a>"
        )
        return html  # Removed conn.close() since conn is not defined
    # 處理表單送出 (POST):執行 SQL 指令
    if request.method == 'POST':
        sql=request.form.get('sql', '').strip()  # 取出 sql 欄位
        if not sql:
            return '<p>請輸入 SQL 指令!<a href="/function/execute_sql">返回</a></p>'
        if not os.path.exists(DB_PATH):
            return (
                '<p>資料庫檔案 serverless.db 不存在!'
                '<a href="/function/list_tables">返回</a></p>'
            )
        try:  # 執行 SQL 指令 
            conn=sqlite3.connect(DB_PATH)  # 建立連線與 cursor
            cur=conn.cursor()
            # SELECT 查詢 : 執行查詢後取出所有結果
            if sql.lower().startswith('select'):
                cur.execute(sql)
                rows=cur.fetchall()
                columns=[desc[0] for desc in cur.description]  # 取得欄位名稱
                # 將查詢結果轉成 HTML 表格
                if not rows:
                    result="<p>查無資料。</p>"
                else:
                    result='<table border="1" cellspacing="0" cellpadding="6" style="border-collapse: collapse;">'
                    result += "<tr>" + "".join(f"<th>{escape(col)}</th>" for col in columns) + "</tr>"
                    for row in rows: 
                        result += "<tr>" + "".join(f"<td>{escape(str(v))}</td>" for v in row) + "</tr>"
                    result += "</table>"
                conn.close()  # 關閉資料庫連線
                return (
                    f"<h2>SQL 查詢結果</h2>"
                    f"<p><b>指令:</b> {escape(sql)}</p>"
                    f"{result}"
                    "<br><a href='/function/execute_sql'>返回</a>"
                )
            else:  # 其他非 SELECT 指令 (INSERT, UPDATE, DELETE, DROP/CREATE TABLE) 
                cur.execute(sql)
                conn.commit()
                affected=cur.rowcount
                conn.close()
                return (
                    f"<p>SQL 指令執行成功!(影響 {affected} 筆紀錄)</p>"
                    f"<p><b>指令:</b> {escape(sql)}</p>"
                    "<a href='/function/execute_sql'>返回</a>"
                )
        except sqlite3.Error as e:
            conn.close()  # 關閉資料庫連線
            return f"<p>SQL 執行錯誤:{escape(str(e))}</p><a href='/function/execute_sql'>返回</a>"

此程式與 create_table.py 結構相同, 都是利用判別 HTTP 請求是 GET 或 POST 方法來決定回應方式, 若是 GET 方法就回應一個 SQL 輸入頁面; 若是 POST 方法就執行表單傳來的 SQL 指令 (分成 SELECT 與非 SELECT 指令兩類). 另外, 與上面修改 edit_function.py 的原因一樣, 為了避免網頁版面崩掉與 XSS 注入, 此程式使用 html.escape() 來跳脫 HTML 字元. 

第一次請求 /function/execute_sql 時是 GET 方法, 這時會顯示 SQL 輸入頁面, 測試用的 SQL 寫入指令例如 : 

INSERT OR REPLACE INTO ksml_books (account, borrow_books, reserve_books, updated_at)
VALUES ('tony', '["Python入門", "資料庫設計"]', '["AI導論"]', '2025-10-19 14:39:00')

此處因為 account 是不能重複的主鍵, 所以我們使用 INSERT OR REPLACE 指令, 當然也可以第一次用 INSERT 後續用 UPDATE 指令, 但 用 INSERT OR REPLACE 較單純 :





新增紀錄成功後, 再新增下一筆紀錄 : 

INSERT OR REPLACE INTO ksml_books (account, borrow_books, reserve_books, updated_at)
VALUES ('amy', '["京都時光"]', '["關西地圖"]', '2025-10-19 14:45:00')

這樣回到資料表列表就會看到 ksml_stats 有兩筆紀錄了 :




按檢視紀錄欄中的檢視超連結就會呼叫 /function/view_table 顯示資料表內容, 程式碼如下 :

# view_table.py
import os
import sqlite3
import html
from urllib.parse import parse_qs

def main(request, **kwargs):
    # 解析 URL 查詢參數 (?table=xxx&page=1)
    query=request.query_string.decode('utf-8')
    params=parse_qs(query)
    table=params.get('table', [''])[0]
    try:
        page=int(params.get('page', ['1'])[0])
        if page < 1:
            page=1
    except ValueError:
        page=1    
    DB_PATH='./serverless.db'
    ROWS_PER_PAGE=50  # 每頁顯示筆數
    # 檢查資料庫是否存在
    if not os.path.exists(DB_PATH):
        return '''
        <p>資料庫檔案 serverless.db 不存在!
        <a href="/function/list_tables">返回資料表列表</a></p>
        '''
    # 檢查 URL 參數中是否有 table 名稱
    if not table:
        return '''
        <p>缺少 table 參數!
        <a href="/function/list_tables">返回資料表列表</a></p>
        '''
    try:
        conn=sqlite3.connect(DB_PATH)
        cur=conn.cursor()
        # 安全檢查 (避免惡意表名注入)
        cur.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name=?
        """, (table,))
        if not cur.fetchone():
            conn.close()
            return f'<p>資料表 "{html.escape(table)}" 不存在!</p>'
        # 取得欄位名稱
        cur.execute(f'SELECT * FROM "{table}" LIMIT 1;')
        colnames=[d[0] for d in cur.description] if cur.description else []
        # 計算總筆數
        cur.execute(f'SELECT COUNT(*) FROM "{table}";')
        total_rows=cur.fetchone()[0]
        total_pages=(total_rows + ROWS_PER_PAGE - 1) // ROWS_PER_PAGE
        # 取得當前頁的資料
        offset=(page - 1) * ROWS_PER_PAGE
        cur.execute(f'SELECT * FROM "{table}" LIMIT ? OFFSET ?;', (ROWS_PER_PAGE, offset))
        rows=cur.fetchall()
        conn.close()
        # 組成 HTML
        text=f'<h2>資料表 {html.escape(table)} 內容</h2>'
        text += '<table border="1" cellspacing="0" cellpadding="6" style="border-collapse: collapse;">'
        # 欄位列
        if colnames:
            text += '<tr>' + ''.join(f'<th>{html.escape(c)}</th>' for c in colnames) + '</tr>'
        else:
            text += '<tr><td colspan="99">(無欄位)</td></tr>'
        # 資料列
        if rows:
            for row in rows:
                text += '<tr>' + ''.join(f'<td>{html.escape(str(v))}</td>' for v in row) + '</tr>'
        else:
            text += '<tr><td colspan="99">(本頁無資料)</td></tr>'
        text += '</table>'
        # 分頁列
        text += f'<p>共 {total_rows} 筆資料,頁 {page}/{total_pages}</p>'
        text += '<div style="margin:10px 0;">'
        if page > 1:
            text += f'<a href="/function/view_table?table={html.escape(table)}&page={page-1}">上一頁</a> '
        if page < total_pages:
            text += f'<a href="/function/view_table?table={html.escape(table)}&page={page+1}">下一頁</a>'
        text += '</div>'
        text += '<a href="/function/list_tables">返回資料表列表</a>'
        return text
    except Exception as e:
        return f'<p>讀取資料時發生錯誤:{html.escape(str(e))}</p>'
    finally:
        if 'conn' in locals():
            conn.close()

此程式使用分頁方式來顯示資料表內的全部紀錄 (預設每頁 50 筆), 首先用 urllib.parse.parse_qs() 函式來解析請求之 URL 字串以取得 table 與 page 參數, 然後用 SELECT COUNT 指令取得紀錄總筆數以便計算要分幾頁來顯示, 結果如下 :




按資料表列表匯出欄的 CSV 超連結會呼叫 /function/export_table 來匯出該資料表內容為 CSV 檔, 程式碼如下 :

# export_table.py
import os
import sqlite3
import csv
from io import StringIO
from flask import Response, request

def main(request, **kwargs):
    # 解析 URL 查詢參數 (?table=xxx)
    table=request.args.get('table')
    if not table:
        return '<p>未指定資料表名稱!<a href="/function/list_tables">返回列表</a></p>'
    DB_PATH='./serverless.db'
    # 檢查資料庫是否存在   
    if not os.path.exists(DB_PATH):
        return '<p>資料庫檔案 serverless.db 不存在!<a href="/function/list_tables">返回列表</a></p>'
    try:
        conn=sqlite3.connect(DB_PATH)
        cur=conn.cursor()
        # 取得欄位名稱
        try:
            cur.execute(f'PRAGMA table_info("{table}")')
            columns_info=cur.fetchall()
            if not columns_info:
                return f'<p>資料表 {table} 不存在!<a href="/function/list_tables">返回列表</a></p>'
            columns=[col[1] for col in columns_info]  # col[1] 是欄位名稱
        except Exception as e:
            return f'<p>取得欄位失敗: {str(e)}</p>'
        # 取得所有資料
        try:
            cur.execute(f'SELECT * FROM "{table}"')
            rows=cur.fetchall()
        except Exception as e:
            return f'<p>讀取資料失敗: {str(e)}</p>'
        # 將資料寫入 CSV
        output=StringIO()
        writer=csv.writer(output)
        writer.writerow(columns)
        writer.writerows(rows)
        csv_data=output.getvalue()
        output.close()
        # 回傳 CSV 給瀏覽器下載
        csv_bytes=csv_data.encode('utf-8-sig')        
        return Response(
            csv_bytes,
            mimetype='text/csv',
            headers={'Content-Disposition': f'attachment; filename="{table}.csv"'}
            )   
    except sqlite3.Error as e:
        return f'<p>連線資料庫失敗 {str(e)}</p>'
    finally:
        if 'conn' in locals():
            conn.close()

此程式使用 io.StringIO 類別來將資料表內容轉成 CSV 資料, 注意, 此處為了解決 Excel 在 Windows 版的 BOM 中文亂碼問題, 此 CSV 資料還呼叫了 encode() 來指定 'utf-8-sig' 編碼格式, 最後將編碼後的 CSV 資料利用 flask.Response() 函式傳給瀏覽器下載 : 




開啟 ksml_books.csv 結果如下 :




最後一個模組是用來刪除資料表的 /function/drop_table, 程式碼如下 :

# drop_table.py
import os
import sqlite3
from flask import request, redirect

def main(request, **kwargs):
    # 解析 URL 參數取得資料表名稱
    query=request.query_string.decode('utf-8')
    params=parse_qs(query)
    table=params.get('table', [''])[0]
    DB_PATH='./serverless.db'
    # 檢查資料庫是否存在
    if not os.path.exists(DB_PATH):
        return '''
            <p>資料庫檔案 serverless.db 不存在!
            <a href="/function/list_tables">返回資料表列表</a></p>
            '''
    # 設定保護清單,避免刪除重要系統表
    protected=['call_stats']
    if table in protected:
        return f'''
            <p>資料表 {table} 為保護表,無法刪除!
            <a href="/function/list_tables">返回列表</a></p>
            '''
    try:
        conn=sqlite3.connect(DB_PATH)
        cur=conn.cursor()
        # 執行刪除資料表
        cur.execute(f'DROP TABLE IF EXISTS "{table}"')
        conn.commit()
        conn.close()
        # 刪除成功後返回資料表列表
        return f'''
            <p>資料表 {table} 已刪除成功!
            <a href="/function/list_tables">返回列表</a></p>
            '''
    except sqlite3.Error as e:
        return f'<p>刪除資料表 {table} 失敗: {str(e)}</p>'
    finally:
        if 'conn' in locals():
            conn.close()

此程式使用 DROP TABLE 指令來刪除指定資料表, 在資料表列表的刪除欄位中按刪除鈕, 經過確認即可刪除資料表 :






可見 ksml_books 資料表已被刪除. 

OK, 這樣就報完工啦!


2025-10-19 補充 : 

剛剛發現似乎缺了刪除紀錄的功能, 所以我又修改了 view_table.py, 在表格最後添加 "刪除" 欄, 按下超連結菁確認後會呼叫 delete_record.py, 攜帶 table 與 pk (帳號主鍵), 刪除完回 view_table.py :

# view_table.py
import os
import sqlite3
import html
from urllib.parse import parse_qs, quote

def main(request, **kwargs):
    # 解析 URL 查詢參數 (?table=xxx&page=1)
    query=request.query_string.decode('utf-8')
    params=parse_qs(query)
    table=params.get('table', [''])[0]
    try:
        page=int(params.get('page', ['1'])[0])
        if page < 1:
            page=1
    except ValueError:
        page=1    
    DB_PATH='./serverless.db'
    ROWS_PER_PAGE=50  # 每頁顯示筆數
    # 檢查資料庫是否存在
    if not os.path.exists(DB_PATH):
        return '''
        <p>資料庫檔案 serverless.db 不存在!<a href="/function/list_tables">返回資料表列表</a></p>
        '''
    if not table:
        return '''
        <p>缺少 table 參數!<a href="/function/list_tables">返回資料表列表</a></p>
        '''
    try:
        conn=sqlite3.connect(DB_PATH)
        cur=conn.cursor()
        # 安全檢查 (避免惡意表名注入)
        cur.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name=?
        """, (table,))
        if not cur.fetchone():
            conn.close()
            return f'<p>資料表 "{html.escape(table)}" 不存在!</p>'
        # 取得欄位名稱
        cur.execute(f'SELECT * FROM "{table}" LIMIT 1;')
        colnames=[d[0] for d in cur.description] if cur.description else []
        # 計算總筆數
        cur.execute(f'SELECT COUNT(*) FROM "{table}";')
        total_rows=cur.fetchone()[0]
        total_pages=(total_rows + ROWS_PER_PAGE - 1) // ROWS_PER_PAGE
        # 取得當前頁的資料
        offset=(page - 1) * ROWS_PER_PAGE
        cur.execute(f'SELECT * FROM "{table}" LIMIT ? OFFSET ?;', (ROWS_PER_PAGE, offset))
        rows=cur.fetchall()
        conn.close()

        # 組成 HTML
        text=f'<h2>資料表 {html.escape(table)} 內容</h2>'
        text += '<table border="1" cellspacing="0" cellpadding="6" style="border-collapse: collapse;">'
        # 欄位列
        if colnames:
            text += '<tr>'
            text += ''.join(f'<th>{html.escape(c)}</th>' for c in colnames)
            text += '<th>刪除</th>'  # 新增刪除欄位
            text += '</tr>'
        else:
            text += '<tr><td colspan="99">(無欄位)</td></tr>'
        # 資料列
        if rows:
            for row in rows:
                text += '<tr>'
                for v in row:
                    text += f'<td>{html.escape(str(v))}</td>'
                # 假設第一個欄位是主鍵
                pk_value = quote(str(row[0]))
                text += f'<td><a href="/function/delete_record?table={quote(table)}&pk={pk_value}" ' \
                        f'onclick="return confirm(\'確定要刪除此筆紀錄嗎?\')">刪除</a></td>'
                text += '</tr>'
        else:
            text += '<tr><td colspan="99">(本頁無資料)</td></tr>'
        text += '</table>'
        # 分頁列
        text += f'<p>共 {total_rows} 筆資料,頁 {page}/{total_pages}</p>'
        text += '<div style="margin:10px 0;">'
        if page > 1:
            text += f'<a href="/function/view_table?table={quote(table)}&page={page-1}">上一頁</a> '
        if page < total_pages:
            text += f'<a href="/function/view_table?table={quote(table)}&page={page+1}">下一頁</a>'
        text += '</div>'
        text += '<a href="/function/list_tables">返回資料表列表</a>'
        return text
    except Exception as e:
        return f'<p>讀取資料時發生錯誤:{html.escape(str(e))}</p>'
    finally:
        if 'conn' in locals():
            conn.close()





確認刪除會呼叫 delete_record.py :

# delete_record.py
import os
import sqlite3
from flask import redirect, request
from html import escape

def main(request, **kwargs):
    table=request.args.get('table', '').strip()
    pk=request.args.get('pk', '').strip()
    DB_PATH='./serverless.db'
    # 檢查參數
    if not table or not pk:
        return f'<p>缺少 table 或 pk 參數!<a href="/function/list_tables">返回列表</a></p>'
    # 檢查資料庫
    if not os.path.exists(DB_PATH):
        return f'<p>資料庫檔案 serverless.db 不存在!<a href="/function/list_tables">返回列表</a></p>'
    try:  # 刪除紀錄
        conn=sqlite3.connect(DB_PATH)
        cur=conn.cursor()
        # 安全檢查:確認 table 存在
        cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table,))
        if not cur.fetchone():
            return f'<p>資料表 "{escape(table)}" 不存在!<a href="/function/list_tables">返回列表</a></p>'
        # 執行刪除
        cur.execute('DELETE FROM ksml_books WHERE account=?', (pk,))
        conn.commit()
        return redirect(f'/function/view_table?table={table}')
    except sqlite3.Error as e:
        return f'<p>刪除紀錄失敗:{escape(str(e))}</p><a href="/function/view_table?table={escape(table)}">返回</a>'
    finally:
        if 'conn' in locals():
            conn.close()

刪除完返回 view_table.py :




雖然手動刪除使用機會不大, 但系統功能還是要完備. 

最後修改 serverless.py, 將上面 8 個與資料庫管理相關的函式列入被保護的系統模組避免被誤改誤刪 : 

# 需要驗證的函式列表
PROTECTED_FUNCTIONS=['list_functions',
                     'add_function',
                     'save_function',
                     'edit_function',
                     'update_function',
                     'delete_function',
                     'show_stats',
                     'clear_stats',
                     'list_tables',
                     'add_table',
                     'drop_table',
                     'view_table',
                     'show_schema',
                     'export_table',
                     'execute_sql',
                     'delete_record'
                     ]

我把此次 v5 改版 zip 放在 GitHub :


而持續小幅修改的則放在 render.com 部署用的 repo :



2025-10-21 補充 :

今天修改了 view_table.py 中顯示資料表內容的程式碼, 將欄位內容中的 \n 跳行改為 <br> 以便在網頁中呈現跳行效果 : 

        # 資料列
        if rows:
            for row in rows:
                text += '<tr>'
                for v in row:
                    html_nl2br=html.escape(str(v)).replace("\n", "<br>")
                    text += f'<td>{html_nl2br}</td>'

它原本只是 html.escape(str(v)) 而已, 這樣 \n 就沒有跳行效果. 注意, 如果在 f 字串中處理 \n 字元會出錯, 必須先處理完再放進 f 字串裡. 

沒有留言 :