2024年10月30日 星期三

MicroPython 學習筆記 : xtools for ESP32

昨天借助 ChatGPT 對 ESP8266 版的 xtools.py 進行了記憶體耗用上的優化, 參考 :


雖然 ESP32 開發板基本上有較大的 SRAM (520KB), 但寫程式不浪費記憶體仍是基本要求, 所以今天把 ESP32 版的 xtools.py 也丟給 ChatGPT 去檢視看看哪裡可以改進, 前一篇 ESP8266 重疊的部分就不列了, ChatGPT 修改建議如下 :

1. 將 set_ap() 中不再使用的變數 (例如 data, request 等) 在使用完後手動設定為 None 以協助垃圾回收進行內存釋放.   

def set_ap(led=2):
    html='''
    <!DOCTYPE html>
    <html>
      <head>
        <meta charset="utf-8">
        <meta name="viewport" content="width=device-width,initial-scale=1">
      </head>
      <body>
        %s
      </body>
    </html>
    '''
    form='''
        <form method='get' action='/update_ap' 
        style='width: max-content; margin: 10px auto'>
          <h2 style='text-align: center; font-size: 20px'>設定 WiFi 基地台</h2>
          <div style='display: block; margin-bottom: 20px'>
            <label for='ssid' style='display: block; font-size: 16px'>SSID</label>
            <input type='text' id='ssid' name='ssid' 
            style='padding: 10px 8px; width: 100%; font-size: 16px'>
          </div>
          <div style='display: block; margin-bottom: 20px'>
            <label for='pwd' style='display: block; font-size: 16px'>Password</label>
            <input type='text' id='pwd' name='pwd' 
            style='padding: 10px 8px; width: 100%; font-size: 16px'>
          </div>
          <button type="submit" style='width:100%;font-size: 16px'>連線</button>
        </form>
    '''
    ok='''
       <h2>WiFi 連線成功<br>IP : <a href={0}>{0}</a></h2>
       <a href=192.168.4.1>
         <button style="width:100%;font-size: 16px">重新設定</button>
       </a>              
    '''
    ng='''
       <h2 style="text-align: center;">WiFi 基地台連線失敗<br> 
       按 Reset 鈕後重新設定</h2>
       <a href="192.168.4.1">
         <button style="width:100%;font-size: 16px">重新設定</button>
       </a>   
    '''
    wifi_led=Pin(led, Pin.OUT, value=1)  # 預設熄滅板上 LED
    ap=network.WLAN(network.AP_IF)       # 開啟 AP 模式
    ap.active(True)
    sta=network.WLAN(network.STA_IF)     # 開啟 STA 模式
    sta.active(True)
    import socket
    addr=socket.getaddrinfo('192.168.4.1', 80)[0][-1] # 傳回 (ip, port)
    s=socket.socket()  # 建立伺服端 TCP socket
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 網址可重複請求
    s.bind(addr)  # 綁定 192.168.4.1 的 80 埠
    s.listen(5)   # 最多同時 5 個連線
    print('網頁伺服器正在監聽 : ', addr)
    while True:   # 監聽 192.168.4.1 的 80 埠
        cs, addr=s.accept()  
        print('發現來自客戶端的連線 : ', addr)
        data=cs.recv(1024)      
        request=str(data, 'utf8')   
        print(request, end='\n')
        del data, addr   
        if request.find('update_ap?') == 5:  # 檢查是否為更新之 URL 
            # 擷取請求參數中的 SSID 與密碼
            para=request[request.find('ssid='):request.find(' HTTP/')]
            ssid=para.split('&')[0].split('=')[1] 
            pwd=para.split('&')[1].split('=')[1]
            sta.connect(ssid, pwd)       # 連線 WiFi 基地台
            start_time=time.time()       # 紀錄起始時間  
            while not sta.isconnected(): # 連線 WiFi (15 秒)
                wifi_led.value(0)  # 讓板載 LED 閃爍
                time.sleep_ms(300)
                wifi_led.value(1)
                time.sleep_ms(300)                
                if time.time()-start_time > 15: # 是否超過連線秒數
                    print('WiFi 連線逾時!')
                    break  # 逾時跳出無限迴圈
            # 確認是否連線成功
            if sta.isconnected():     # WiFi 連線成功
                print('WiFi 連線成功 : ', sta.ifconfig())
                ip=sta.ifconfig()[0]  # 取得 ip
                print('取得 IP : ' + ip)
                with open('config.py', 'w', encoding='utf-8') as f:
                    f.write(f'SSID="{ssid}"\nPASSWORD="{pwd}"') # 更新設定檔
                cs.send(html % ok.format(ip))  # 回應連線成功頁面
                for i in range(25):   # 連線成功 : 快閃 5 秒
                    wifi_led.value(0)
                    time.sleep_ms(100)
                    wifi_led.value(1)
                    time.sleep_ms(100)
                cs.close()
                s.close()
                return ip
            else:
                print('WiFi 連線失敗 : 請按 Reset 鈕後重設.')
                wifi_led.value(1)     # 連線失敗 : 熄滅 LED
                cs.send(html % ng)    # 回應連線失敗頁面
                cs.close()
                s.close()
                return None
        else:  # 顯示設定 WiFi 頁面
            cs.send(html % form)  # 回應設定 WiFi 頁面
        cs.close()
        del cs, request   

更動的部分是將 del data, addr 向上移入 while 迴圈內而已. 

2. 將 set_ap() 函式中的 html、form、ok 和 ng 的 HTML 内容拆出來成為網頁模板, 然後於 set_ap() 中呼叫新增的 load_template() 載入網頁內容來用 :

例如:
    (1). template.html:包含主要的 HTML 框架
    (2). form_template.html:包含 WiFi 設定的表單
    (3). ok_template.html:包含 WiFi 連線成功的內容
    (4). ng_template.html:包含 WiFi 連線失敗的內容

set_ap() 修改如下 :

def load_template(filename):
    """從文件讀取 HTML 模板"""
    with open(filename, 'r', encoding='utf-8') as file:
        return file.read()

def set_ap(led=2):
    # 載入 HTML 模板文件
    html_template = load_template('template.html')
    form_template = load_template('form_template.html')
    ok_template = load_template('ok_template.html')
    ng_template = load_template('ng_template.html')

    wifi_led = Pin(led, Pin.OUT, value=1)  # 預設熄滅板上 LED
    ap = network.WLAN(network.AP_IF)       # 開啟 AP 模式
    ap.active(True)
    sta = network.WLAN(network.STA_IF)     # 開啟 STA 模式
    sta.active(True)

    import socket
    addr = socket.getaddrinfo('192.168.4.1', 80)[0][-1]  # 傳回 (ip, port)
    s = socket.socket()  # 建立伺服端 TCP socket
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 網址可重複請求
    s.bind(addr)  # 綁定 192.168.4.1 的 80 埠
    s.listen(5)   # 最多同時 5 個連線
    print('網頁伺服器正在監聽 : ', addr)

    while True:
        cs, addr = s.accept()
        print('發現來自客戶端的連線 : ', addr)
        data = cs.recv(1024)
        request = str(data, 'utf8')
        print(request, end='\n')
        del data, addr 

        if request.find('update_ap?') == 5:  # 檢查是否為更新之 URL
            para = request[request.find('ssid='):request.find(' HTTP/')]
            ssid = para.split('&')[0].split('=')[1]
            pwd = para.split('&')[1].split('=')[1]
            sta.connect(ssid, pwd)       # 連線 WiFi 基地台
            start_time = time.time()     # 紀錄起始時間

            while not sta.isconnected():  # 連線 WiFi (15 秒)
                wifi_led.value(0)  # 讓板載 LED 閃爍
                time.sleep_ms(300)
                wifi_led.value(1)
                time.sleep_ms(300)
                if time.time() - start_time > 15:  # 是否超過連線秒數
                    print('WiFi 連線逾時!')
                    break  # 逾時跳出無限迴圈

            if sta.isconnected():  # WiFi 連線成功
                print('WiFi 連線成功 : ', sta.ifconfig())
                ip = sta.ifconfig()[0]
                print('取得 IP : ' + ip)
                with open('config.py', 'w', encoding='utf-8') as f:
                    f.write(f'SSID="{ssid}"\nPASSWORD="{pwd}"')  # 更新設定檔
                # 回應連線成功頁面
                cs.send(html_template % ok_template.format(ip))
                for i in range(25):  # 快閃 5 秒
                    wifi_led.value(0)
                    time.sleep_ms(100)
                    wifi_led.value(1)
                    time.sleep_ms(100)
                cs.close()
                s.close()
                return ip
            else:  # WiFi 連線失敗
                print('WiFi 連線失敗 : 請按 Reset 鈕後重設.')
                wifi_led.value(1)  # 熄滅 LED
                cs.send(html_template % ng_template)  # 回應連線失敗頁面
                cs.close()
                s.close()
                return None
        else:  # 顯示設定 WiFi 頁面
            cs.send(html_template % form_template)  # 回應設定 WiFi 頁面
        cs.close()
        del cs, request

    不過這個建議我沒採納, 因為這樣就多出四個網頁檔出來, 還不如內建在 xtools 中方便


3. 在line_msg(), line_sticker(), line_image_url(), 和 ask_gpt() 等函式中, 重複的 HTTP header 可以提取到共用函數中統一處理.  

修改後的程式碼如下 : 

def get_headers(service, token=None, api_key=None):
    headers={
        "Content-Type": "application/x-www-form-urlencoded"
        } # 須有初始值 
    if service == 'line' and token: # 根據服務類型返回標頭
        headers["Authorization"]="Bearer " + token
    elif service == 'openai' and api_key:
        headers["Authorization"]="Bearer " + api_key
        headers["Content-Type"]="application/json"
    return headers

def line_msg(token, message):
    url="https://notify-api.line.me/api/notify"
    headers=get_headers('line', token=token)    
    params={"message": message}  # 參數字典
    # 將參數字典轉成 URL 字串, 再轉成 utf-8 編碼的 bytes 
    payload=urlencode(params).encode('utf-8')
    # 用編碼後的 payload 傳給 data 參數發送 POST 請求
    r=urequests.post(url, headers=headers, data=payload)  
    if r is not None and r.status_code == 200:
        print("Message has been sent.")
    else:
        print("Error! Failed to send notification message.")  
    r.close()  # 關閉連線

def line_sticker(token, message, stickerPackageId, stickerId):
    url="https://notify-api.line.me/api/notify"
    headers=get_headers('line', token=token)
    # 設定正確的 payload
    params={
        "message": message,
        "stickerPackageId": stickerPackageId,
        "stickerId": stickerId
        }
    # 使用自訂的 urlencode 函數將參數編碼,並轉換成 UTF-8 的字節串
    payload=urlencode(params).encode('utf-8')
    # 發送 POST 請求
    r=urequests.post(url, headers=headers, data=payload)
    # 判斷是否成功
    if r is not None and r.status_code == 200:
        return "The sticker has been sent."
    else:
        return "Error! Failed to send the sticker."

def line_image_url(token, message, image_url):
    # 透過 LINE Notify 發送雲端圖片
    url="https://notify-api.line.me/api/notify"
    headers=get_headers('line', token=token)
    # 構造請求的數據,包含圖片的 URL
    params={
        "message": message,
        "imageFullsize": image_url,  # 完整圖片的 URL
        "imageThumbnail": image_url  # 縮略圖圖片 URL,可與完整圖片相同
        }
    # 轉成 URL 字串並用 utf-8 編碼為 bytes 
    payload=urlencode(params).encode('utf-8')
    # 發送 POST 請求
    r=urequests.post(url, headers=headers, data=payload)
    # 判斷是否成功
    if r is not None and r.status_code == 200:
        return "The image URL has been sent."
    else:
        return "Error! Failed to send the image URL."

def ask_gpt(prompt, api_key, model='gpt-4o-mini'):
    url='https://api.openai.com/v1/chat/completions'
    headers=get_headers('openai', api_key=api_key)
    # 建立 data 參數字典
    data={
        'model': model,
        'messages': [{'role': 'user', 'content': prompt}]
        }
    # 將字典轉成字串後再編碼成 UTF-8
    payload=ujson.dumps(data).encode('utf-8')
    # 發送 POST 請求
    response=urequests.post(url, headers=headers, data=payload)
    if response.status_code == 200:
        reply=response.json() # 轉成字典
        return reply['choices'][0]['message']['content']
    else:
        return response.json()  # 返回錯誤信息

測試結果如下 :

MicroPython v1.23.0 on 2024-06-02; Generic ESP32 module with ESP32
Type "help()" for more information.

>>> import config   
>>> import xtools     
>>> ip=xtools.connect_wifi(led=5)   
network config: ('192.168.192.189', '255.255.255.0', '192.168.192.92', '192.168.192.92')
>>> line_token=config.LINE_NOTIFY_TOKEN 
>>> openai_api_key=config.OPENAI_API_KEY    
>>> message='test'   
>>> image_url='https://cdn.pixabay.com/photo/2024/03/15/17/50/dogs-8635461_1280.jpg'    
>>> xtools.line_msg(line_token, message)    
Message has been sent.   
>>> xtools.line_sticker(line_token, message, 1, 4)      
'The sticker has been sent.'    
>>> xtools.line_image_url(line_token, message, image_url)      
'The image URL has been sent.'
>>> prompt='Who are you'         
>>> print(xtools.ask_gpt(prompt, openai_api_key))     
I am an AI language model created by OpenAI, designed to assist with a wide range of questions and topics by providing information and generating text-based responses. How can I help you today?

結果如下 : 




程式碼 :

import config   
import xtools     
ip=xtools.connect_wifi(led=5)   
line_token=config.LINE_NOTIFY_TOKEN 
openai_api_key=config.OPENAI_API_KEY    
message='test'   
image_url='https://cdn.pixabay.com/photo/2024/03/15/17/50/dogs-8635461_1280.jpg'    
xtools.line_msg(line_token, message)    
xtools.line_sticker(line_token, message, 1, 4)      
xtools.line_image_url(line_token, message, image_url)      
prompt='Who are you'         
print(xtools.ask_gpt(prompt, openai_api_key))

優化後的 xtools (for ESP32) 已上傳 GitHub 更新 :


沒有留言:

張貼留言