2024年10月29日 星期二

MicroPython 學習筆記 : xtools for ESP8266

自從讀了陳會安老師的 "超簡單 Python/MicroPython 物聯網應用" 這本書後我都改用書中提供的 xtools.py 函式庫, 但我有添加與改寫不分函式, 使得大小從 3KB 增加到 6KB, 上傳到 ESP8266 後發現會在呼叫函式時當機重啟, 已下是用 gc.mem_free() 追蹤記憶體耗用情形的紀錄 :

MicroPython v1.23.0 on 2024-06-02; ESP module with ESP8266
Type "help()" for more information.
>>> import gc  
>>> gc.mem_free()   
33568  

開機後 160KB 的 SRAM 剩下 33.5KB (系統佔掉一大部分). 接

著匯入 6KB 的 xtools 與設定檔 : 

>>> import config  
>>> import xtools  
>>> gc.mem_free()  
24160  

吃掉 9.4KB. 連線 WiFi :

>>> xtools.connect_wifi()   
#4 ets_task(4020f540, 28, 3fffa050, 10)
Connecting to network...
network config: ('192.168.192.208', '255.255.255.0', '192.168.192.92', '192.168.192.92')
'192.168.192.208'
>>> gc.mem_free()      
23920  

低於 24KB 了. 呼叫 tw_now() 查詢 NTP 還可以 :

>>> xtools.tw_now()   
Querying NTP server and set RTC time ... OK.
'2024-10-28 10:00:38'
>>> gc.mem_free()   
23248

但呼叫 line_msg() 就會發生系統重啟動 (Reset) :

>>> xtools.line_msg(line_token, message)     

 ets Jan  8 2013,rst cause:2, boot mode:(3,6)

load 0x40100000, len 31212, room 16 
tail 12
chksum 0xa6
ho 0 tail 12 room 4
load 0x3ffe8000, len 1060, room 12 
tail 8
chksum 0x80
load 0x3ffe8430, len 1124, room 0 
tail 4
chksum 0xcb
csum 0xcb
����n�r��n|� l lll`b� �|r�l�n� �n�

研判是記憶體溢位造成, 可用的記憶體低於 24KB 會有此問題. 

於是我將 xtools.py 中大部分我自行添加的函式刪除, 包含 set_ap(), line_sticker(), line_img_url(), 以及 ask_gpt(), 但留下 line_msg(), 盡量回歸到接近陳會安老師原著中的原始樣貌, 大小從 13KB 縮小到 5KB 左右 (書中原版約 3KB), 瘦身後的 ESP8266 版 xtools 如下 :

# xtools.py for ESP8266
from machine import Pin, RTC, unique_id
import urandom, math
import time, network, urequests
import ubinascii
import config
import ntptime
import ujson

def get_id():
    return ubinascii.hexlify(unique_id()).decode('utf8')

def get_mac():
    sta=network.WLAN(network.STA_IF)
    mac=sta.config('mac')
    return ubinascii.hexlify(mac, ':').decode('utf8')

def get_num(x):
    return float("".join(ele for ele in x if ele.isdigit() or ele =="."))

def random_in_range(low=0, high=1000):
    r1 = urandom.getrandbits(32)
    r2 = r1 % (high-low) + low
    return math.floor(r2)

def map_range(x, in_min, in_max, out_min, out_max):
   return int((x-in_min) * (out_max-out_min) / (in_max-in_min) + out_min)
   
def connect_wifi(ssid=config.SSID, passwd=config.PASSWORD, led=2, timeout=20):
    wifi_led=Pin(led, Pin.OUT, value=1)
    sta=network.WLAN(network.STA_IF)
    sta.active(True)
    start_time=time.time() # 記錄時間判斷是否超時
    if not sta.isconnected():
        print("Connecting to network...")
        sta.connect(ssid, passwd)
        while not sta.isconnected():
            wifi_led.value(0)
            time.sleep_ms(300)
            wifi_led.value(1)
            time.sleep_ms(300)
            # 判斷是否超過timeout秒數
            if time.time()-start_time > timeout:
                print("Wifi connecting timeout!")
                break
    if sta.isconnected():
        for i in range(25):   # 連線成功 : 快閃 5 秒
            wifi_led.value(0)
            time.sleep_ms(100)
            wifi_led.value(1)
            time.sleep_ms(100)
        print("network config:", sta.ifconfig())
        return sta.ifconfig()[0] 

def scan_ssid():
    sta=network.WLAN(network.STA_IF)
    sta.active(True)
    aps=sta.scan()
    for ap in aps:
        ssid=ap[0].decode()
        mac=ubinascii.hexlify(ap[1], ':').decode()
        rssi=str(ap[3]) + 'dBm'
        print('{:>20} {:>20} {:>10}'.format(ssid, mac, rssi))

def show_error(final_state=0):
    led = Pin(2, Pin.OUT)   # Built-in D4
    for i in range(3):
        led.value(1)
        time.sleep(0.5)
        led.value(0)
        time.sleep(0.5)
    led.value(final_state)    

def webhook_post(url, value):
    print("invoking webhook")
    r = urequests.post(url, data=value)
    if r is not None and r.status_code == 200:
        print("Webhook invoked")
    else:
        print("Webhook failed")
        show_error()
    return r

def webhook_get(url):
    print("invoking webhook")
    r = urequests.get(url)
    if r is not None and r.status_code == 200:
        print("Webhook invoked")
    else:
        print("Webhook failed")
        show_error()
    return r

def urlencode(params):
    # 將字典的鍵值對轉換為 URL 編碼的字串 (k=v) 並以 & 連接多個鍵值對
    kv=['{}={}'.format(k, v) for k, v in params.items()]
    return '&'.join(kv)

def line_msg(token, message):
    url="https://notify-api.line.me/api/notify"
    headers={
        "Authorization": "Bearer " + token,
        "Content-Type": "application/x-www-form-urlencoded"
        }     
    params={"message": message}  # 參數字典
    # 呼叫自訂的 URL 編碼函式將字典轉成 URL 字串, 再轉成 utf-8 編碼的 bytes 
    payload=urlencode(params).encode('utf-8')
    # 用編碼後的 payload 傳給 data 參數發送 POST 請求
    r=urequests.post(url, headers=headers, data=payload)  
    if r is not None and r.status_code == 200:
        print("Message has been sent.")
    else:
        print("Error! Failed to send notification message.")  
    r.close()  # 關閉連線

def tw_now():
    try: # 從 NTP 取得 UTC 時戳加 8 為台灣時間, 若成功設定 RTC
        print('Querying NTP server and set RTC time ... ', end='')
        utc=ntptime.time() # 取得 UTC 時戳
        print('OK.')
        t=time.localtime(utc + 28800) # 傳回台灣時間的元組
        rtc=RTC() # RTC 物件
        rtc.datetime(t[0:3] + (0,) + t[3:6] + (0,)) # 設定 RTC
    except:  # 查詢 NTP 失敗不設定 RTC 
        print('Failed.')
    return strftime()  # 傳回目前之日期時間字串 YYYY-mm-dd HH:MM:SS 

def strptime(dt_str, format_str="%Y-%m-%d %H:%M:%S"):
    if format_str=="%Y-%m-%d %H:%M:%S":
        year=int(dt_str[0:4])
        month=int(dt_str[5:7])
        day=int(dt_str[8:10])
        hour=int(dt_str[11:13])
        minute=int(dt_str[14:16])
        second=int(dt_str[17:19])
        return (year, month, day, hour, minute, second, 0, 0, 0)
    else:
        raise ValueError("Unsupported format string")
    
def strftime(dt=None, format_str="%Y-%m-%d %H:%M:%S"):
    if dt is None:
        dt=time.localtime()
    return format_str.replace("%Y", str(dt[0])) \
                     .replace("%m", "{:02d}".format(dt[1])) \
                     .replace("%d", "{:02d}".format(dt[2])) \
                     .replace("%H", "{:02d}".format(dt[3])) \
                     .replace("%M", "{:02d}".format(dt[4])) \
                     .replace("%S", "{:02d}".format(dt[5]))

原始碼放在 GitHub :


將此 xtools.py 上傳 ESP8266 開發板重新測試追蹤記憶體耗用情形 : 

MicroPython v1.23.0 on 2024-06-02; ESP module with ESP8266
Type "help()" for more information.
>>> import gc  
>>> gc.mem_free()     
35152
>>> import config  
>>> import xtools  
>>> gc.mem_free()     
28640

瘦身下來多出 4KB 容量, 連線 WiFi : 

>>> xtools.connect_wifi()   
network config: ('192.168.192.208', '255.255.255.0', '192.168.192.92', '192.168.192.92')
'192.168.192.208'
>>> gc.mem_free()      
28592

呼叫 tw_now() :

>>> xtools.tw_now()   
Querying NTP server and set RTC time ... OK.
'2024-10-28 11:13:12'
>>> gc.mem_free()    
29872

呼叫 line_msg() :

>>> line_token=config.LINE_NOTIFY_TOKEN     
>>> message='test'   
>>> gc.mem_free()     
28400  
>>> xtools.line_msg(line_token, message)      
Message has been sent.
>>> gc.mem_free()      
27424

可見減肥 3KB 是有效果的, 只要 SRAM 可用容量保持在 25KB 以上應該就沒問題. 


2024-10-29 補充 :

雖然經過上面一番操作刪除一些函式將 xtools 瘦身後消除了記憶體溢位造成的當機問題, 但是函式庫寫法可能存在浪費記憶體的情形, 於是我將瘦身後的 xtools.py 貼給 ChatGPT 請它幫我檢查函式寫法是否有改進之處 : 

"你是精通 MicroPython 的物聯網大師, 請問下列這個 xtools.py 函式庫是否有浪費記憶體的寫法? 有的話是哪個函式, 請分別指出改正寫法."

結果它指出下列優化建議 :

1. random_in_range() 可以用 urandom 來處理, 無須匯入 math, 改寫如下 :

def random_in_range(low=0, high=1000):
    return urandom.getrandbits(32) % (high - low) + low

2. connect_wifi() 寫法改為如下 :

def connect_wifi(ssid=config.SSID, passwd=config.PASSWORD, led=2, timeout=20):
    wifi_led=Pin(led, Pin.OUT, value=1)
    sta=network.WLAN(network.STA_IF)
    if not sta.active():
        sta.active(True)   # 確保已啟動 WiFi
    start_time=time.time() # 記錄時間判斷是否超時
    if not sta.isconnected():
        print("Connecting to network...")
        sta.connect(ssid, passwd)
        while not sta.isconnected() and time.time() - start_time <= timeout:
            wifi_led.value(0)
            time.sleep_ms(300)
            wifi_led.value(1)
            time.sleep_ms(300)
        if not sta.isconnected():
            print("Wifi connecting timeout!")
            return None
    if sta.isconnected():
        for _ in range(25):  # 連線成功 : 快閃 5 秒
            wifi_led.value(0)
            time.sleep_ms(100)
            wifi_led.value(1)
            time.sleep_ms(100)
        print("network config:", sta.ifconfig())
        return sta.ifconfig()[0]

主要是迴圈改用 _ 當虛變數, 無法連線時傳回 None. 


3. webhook_post() 與 webhook_get() 加入 finally 釋放資源 :

def webhook_post(url, value):
    try:
        r=urequests.post(url, data=value)
        if r.status_code == 200:
            print("Webhook invoked")
        else:
            print("Webhook failed")
            show_error()
    finally:
        r.close()  # 釋放資源
    return r

def webhook_get(url):
    try:
        r=urequests.get(url)
        if r.status_code == 200:
            print("Webhook invoked")
        else:
            print("Webhook failed")
            show_error()
    finally:
        r.close()  # 釋放資源
    return r


4. tw_now() 移除 rtc 變數直接用 RTC() :

def tw_now():
    try: # 從 NTP 取得 UTC 時戳加 8 為台灣時間, 若成功設定 RTC
        print('Querying NTP server and set RTC time ... ', end='')
        utc=ntptime.time() # 取得 UTC 時戳
        print('OK.')
        t=time.localtime(utc + 28800) # 傳回台灣時間的元組
        RTC().datetime(t[0:3] + (0,) + t[3:6] + (0,))
    except Exception as e:  # 加入例外處理
        print(f'Failed. {e}')
    return strftime()  # 傳回目前之日期時間字串 YYYY-mm-dd HH:MM:SS


5. scan_ssid() 傳回值改用 f 字串嵌入 : 

def scan_ssid():
    sta=network.WLAN(network.STA_IF)
    sta.active(True)
    aps=sta.scan()
    for ap in aps:
        ssid=ap[0].decode()
        mac=ubinascii.hexlify(ap[1], ':').decode()
        rssi=str(ap[3]) + 'dBm'
        print(f'{ssid} {mac} {rssi}')


優化後的完整 xtools for ESP8266 如下 : 

# xtools.py for ESP8266 (optimized)
from machine import Pin, RTC, unique_id
import urandom, time, network, urequests, ubinascii, ntptime
import config

def get_id():
    return ubinascii.hexlify(unique_id()).decode('utf8')

def get_mac():
    sta=network.WLAN(network.STA_IF)
    mac=sta.config('mac')
    return ubinascii.hexlify(mac, ':').decode('utf8')

def get_num(x):
    return float(''.join(filter(lambda c: c.isdigit() or c == ".", x)))

def random_in_range(low=0, high=1000):
    return urandom.getrandbits(32) % (high - low) + low

def map_range(x, in_min, in_max, out_min, out_max):
    return int((x-in_min) * (out_max-out_min) / (in_max-in_min) + out_min)
   
def connect_wifi(ssid=config.SSID, passwd=config.PASSWORD, led=2, timeout=20):
    wifi_led=Pin(led, Pin.OUT, value=1)
    sta=network.WLAN(network.STA_IF)
    if not sta.active():
        sta.active(True)   # 確保已啟動 WiFi
    start_time=time.time() # 記錄時間判斷是否超時
    if not sta.isconnected():
        print("Connecting to network...")
        sta.connect(ssid, passwd)
        while not sta.isconnected() and time.time() - start_time <= timeout:
            wifi_led.value(0)
            time.sleep_ms(300)
            wifi_led.value(1)
            time.sleep_ms(300)
        if not sta.isconnected():
            print("Wifi connecting timeout!")
            return None
    if sta.isconnected():
        for _ in range(25):  # 連線成功 : 快閃 5 秒
            wifi_led.value(0)
            time.sleep_ms(100)
            wifi_led.value(1)
            time.sleep_ms(100)
        print("network config:", sta.ifconfig())
        return sta.ifconfig()[0]

def scan_ssid():
    sta=network.WLAN(network.STA_IF)
    sta.active(True)
    aps=sta.scan()
    for ap in aps:
        ssid=ap[0].decode()
        mac=ubinascii.hexlify(ap[1], ':').decode()
        rssi=str(ap[3]) + 'dBm'
        print(f'{ssid} {mac} {rssi}')

def show_error(final_state=0):
    led = Pin(2, Pin.OUT)   # Built-in D4
    for i in range(3):
        led.value(1)
        time.sleep(0.5)
        led.value(0)
        time.sleep(0.5)
    led.value(final_state)    

def webhook_post(url, value):
    try:
        r=urequests.post(url, data=value)
        if r.status_code == 200:
            print("Webhook invoked")
        else:
            print("Webhook failed")
            show_error()
    finally:
        r.close()  # 釋放資源
    return r

def webhook_get(url):
    try:
        r=urequests.get(url)
        if r.status_code == 200:
            print("Webhook invoked")
        else:
            print("Webhook failed")
            show_error()
    finally:
        r.close()  # 釋放資源
    return r

def urlencode(params):
    # 將字典的鍵值對轉換為 URL 編碼的字串 (k=v) 並以 & 連接多個鍵值對
    kv=['{}={}'.format(k, v) for k, v in params.items()]
    return '&'.join(kv)

def line_msg(token, message):
    url="https://notify-api.line.me/api/notify"
    headers={
        "Authorization": "Bearer " + token,
        "Content-Type": "application/x-www-form-urlencoded"
        }     
    params={"message": message}  # 參數字典
    # 呼叫自訂的 URL 編碼函式將字典轉成 URL 字串, 再轉成 utf-8 編碼的 bytes 
    payload=urlencode(params).encode('utf-8')
    # 用編碼後的 payload 傳給 data 參數發送 POST 請求
    r=urequests.post(url, headers=headers, data=payload)  
    if r is not None and r.status_code == 200:
        print("Message has been sent.")
    else:
        print("Error! Failed to send notification message.")  
    r.close()  # 關閉連線

def tw_now():
    try: # 從 NTP 取得 UTC 時戳加 8 為台灣時間, 若成功設定 RTC
        print('Querying NTP server and set RTC time ... ', end='')
        utc=ntptime.time() # 取得 UTC 時戳
        print('OK.')
        t=time.localtime(utc + 28800) # 傳回台灣時間的元組
        RTC().datetime(t[0:3] + (0,) + t[3:6] + (0,))
    except Exception as e:  # 加入例外處理
        print(f'Failed. {e}')
    return strftime()  # 傳回目前之日期時間字串 YYYY-mm-dd HH:MM:SS 

def strptime(dt_str, format_str="%Y-%m-%d %H:%M:%S"):
    if format_str=="%Y-%m-%d %H:%M:%S":
        year=int(dt_str[0:4])
        month=int(dt_str[5:7])
        day=int(dt_str[8:10])
        hour=int(dt_str[11:13])
        minute=int(dt_str[14:16])
        second=int(dt_str[17:19])
        return (year, month, day, hour, minute, second, 0, 0, 0)
    else:
        raise ValueError("Unsupported format string")
    
def strftime(dt=None, format_str="%Y-%m-%d %H:%M:%S"):
    if dt is None:
        dt=time.localtime()
    return format_str.replace("%Y", str(dt[0])) \
                     .replace("%m", "{:02d}".format(dt[1])) \
                     .replace("%d", "{:02d}".format(dt[2])) \
                     .replace("%H", "{:02d}".format(dt[3])) \
                     .replace("%M", "{:02d}".format(dt[4])) \
                     .replace("%S", "{:02d}".format(dt[5]))

沒有留言:

張貼留言