2025年4月22日 星期二

MicroPython 學習筆記 : xtools.py 改版為 esptools.py (使用 Telegram)

春假因應 LINE Notify 終止服務, 開始測試 Telegram API, 順利將爬蟲通訊平台轉換至 Telegram, 今天輪到 MicroPython on ESP32/ESP8266 的物聯網函式庫 xtools 改版, 去年改版時我加入利用 LINE Notify 傳訊息與圖片, 以及串接 OpenAI API 的功能, 在 ESP32 上都能運作. 今天修改 xtools.py 函式庫, 將其中使用 LINE Notify 部分全部改用 Telegram. 


完整程式碼如下 : 

# esptools.py for ESP32, 2025-04-21 updated (adapted from xtools.py)
from machine import Pin, RTC, unique_id
import urandom, time, network, urequests, ubinascii
import ntptime, ujson
import uos

def get_id():
    return ubinascii.hexlify(unique_id()).decode('utf8')

def get_mac():
    sta=network.WLAN(network.STA_IF)
    mac=sta.config('mac')
    return ubinascii.hexlify(mac, ':').decode('utf8')

def get_num(x):
    return float(''.join(filter(lambda c: c.isdigit() or c == ".", x)))

def random_in_range(low=0, high=1000):
    return urandom.getrandbits(32) % (high - low) + low

def map_range(x, in_min, in_max, out_min, out_max):
    return int((x-in_min) * (out_max-out_min) / (in_max-in_min) + out_min)
   
def connect_wifi(ssid, password, led=2, timeout=20):
    wifi_led=Pin(led, Pin.OUT, value=1)
    sta=network.WLAN(network.STA_IF)
    if not sta.active():
        sta.active(True)   # 確保已啟動 WiFi
    start_time=time.time() # 記錄時間判斷是否超時
    if not sta.isconnected():
        print("Connecting to network...")
        sta.connect(ssid, password)
        while not sta.isconnected() and time.time() - start_time <= timeout:
            wifi_led.value(0)
            time.sleep_ms(300)
            wifi_led.value(1)
            time.sleep_ms(300)
        if not sta.isconnected():
            print("Wifi connecting timeout!")
            return None
    if sta.isconnected():
        for _ in range(25):  # 連線成功 : 快閃 5 秒
            wifi_led.value(0)
            time.sleep_ms(100)
            wifi_led.value(1)
            time.sleep_ms(100)
        print("network config:", sta.ifconfig())
        return sta.ifconfig()[0] 

def scan_ssid():
    sta=network.WLAN(network.STA_IF)
    sta.active(True)
    aps=sta.scan()
    for ap in aps:
        ssid=ap[0].decode()
        mac=ubinascii.hexlify(ap[1], ':').decode()
        rssi=str(ap[3]) + 'dBm'
        print(f'{ssid} {mac} {rssi}')

def show_error(led=2, final_state=0):
    led=Pin(led, Pin.OUT)   # D1 mini built-in D4=LED 2
    for i in range(3):
        led.value(1)
        time.sleep(0.5)
        led.value(0)
        time.sleep(0.5)
    led.value(final_state)    

def webhook_post(url, value, led=2):
    try:
        r=urequests.post(url, data=value)
        if r.status_code == 200:
            print("Webhook invoked")
        else:
            print("Webhook failed")
            show_error(led)
    finally:
        r.close()  # 釋放資源
    return r

def webhook_get(url, led=2):
    try:
        r=urequests.get(url)
        if r.status_code == 200:
            print("Webhook invoked")
        else:
            print("Webhook failed")
            show_error(led)
    finally:
        r.close()  # 釋放資源
    return r

def urlencode(params):
    # 將字典的鍵值對轉換為 URL 編碼的字串 (k=v) 並以 & 連接多個鍵值對
    kv=['{}={}'.format(k, v) for k, v in params.items()]
    return '&'.join(kv)

def get_headers(service, token=None, api_key=None):
    headers={
        "Content-Type": "application/x-www-form-urlencoded"
        } # 須有初始值 
    if service == 'telegram' and token: # 根據服務類型返回標頭
        headers["Authorization"]="Bearer " + token
    elif service == 'openai' and api_key:
        headers["Authorization"]="Bearer " + api_key
        headers["Content-Type"]="application/json"
    return headers

def telegram_text(token, chat_id, text):
    url=f'https://api.telegram.org/bot{token}/sendMessage'
    headers=get_headers('telegram', token=token)
    params={'chat_id': chat_id, 'text': text}  # 參數字典
    # 將參數字典轉成 URL 字串, 再轉成 utf-8 編碼的 bytes 
    data=urlencode(params).encode('utf-8')
    # 用編碼後的 payload 傳給 data 參數發送 POST 請求
    r=None
    try:  # 發送 POST 請求
        r=urequests.post(url, headers=headers, data=data)
        # 判斷是否成功
        if r.status_code == 200:
            print('The message has been sent.')
            return True
        else:
            print('Error! Failed to send the message.')
            return False
    except Exception as e:
        print(f'Exception occurred : {e}')
        return False
    finally:
        if r:
            r.close() 

def telegram_photo_url(token, chat_id, photo_url, caption=None):
    # 透過 Telegram 發送網路圖片
    url=f'https://api.telegram.org/bot{token}/sendPhoto'
    headers=get_headers('telegram', token=token)
    # 構造請求的數據,包含圖片的 URL
    if caption:
        params={
            'chat_id': chat_id,
            'photo': photo_url,
            'caption': caption,
            'parse_mode': 'Markdown'
            }
    else:
        params={'chat_id': chat_id, 'photo': photo_url}
    # 轉成 URL 字串並用 utf-8 編碼為 bytes 
    data=urlencode(params).encode('utf-8')
    r=None
    try:  # 發送 POST 請求
        r=urequests.post(url, headers=headers, data=data)
        # 判斷是否成功
        if r.status_code == 200:
            print('The image has been sent.')
            return True
        else:
            print('Error! Failed to send the image.')
            return False
    except Exception as e:
        print(f'Exception occurred : {e}')
        return False
    finally:
        if r:
            r.close()    

def telegram_photo_file(token, chat_id, photo_path, caption=None):
    url=f'https://api.telegram.org/bot{token}/sendPhoto'
    boundary='----WebKitFormBoundary7MA4YWxkTrZu0gW'
    try:
        with open(photo_path, 'rb') as f:   # 讀取圖片
            photo_data=f.read()
    except Exception as e:
        print(f"Failed to read file: {e}")
        return False
    # 建構 multipart body
    payload=[]
    payload.append('--' + boundary)
    payload.append(f'Content-Disposition: form-data; name="chat_id"\r\n')
    payload.append(str(chat_id))
    if caption:
        payload.append('--' + boundary)
        payload.append(f'Content-Disposition: form-data; name="caption"\r\n')
        payload.append(caption)
    payload.append('--' + boundary)
    payload.append(f'Content-Disposition: form-data; name="photo"; filename="{photo_path}"')
    payload.append('Content-Type: image/jpeg\r\n')
    # 組裝 payload
    body=b''  
    for part in payload:
        if isinstance(part, str):
            body += part.encode('utf-8') + b'\r\n'
        else:
            body += part + b'\r\n'
    body += photo_data + b'\r\n'
    body += ('--' + boundary + '--\r\n').encode('utf-8')
    headers={
        'Content-Type': f'multipart/form-data; boundary={boundary}',
        'Content-Length': str(len(body))
        }
    r=None
    try:
        r=urequests.post(url, data=body, headers=headers)
        print("Status:", r.status_code)
        print("Response:", r.text)
        r.close()
        return True
    except Exception as e:
        print("Request error:", e)
        return False
    finally:
        if r:
            r.close()    

def ask_gpt(prompt, api_key, model='gpt-3.5-turbo'):
    url='https://api.openai.com/v1/chat/completions'
    headers=get_headers('openai', api_key=api_key)
    # 建立 data 參數字典
    data={
        'model': model,
        'messages': [{'role': 'user', 'content': prompt}]
        }
    # 將字典轉成字串後再編碼成 UTF-8
    payload=ujson.dumps(data).encode('utf-8')
    # 發送 POST 請求
    response=urequests.post(url, headers=headers, data=payload)
    if response.status_code == 200:
        reply=response.json() # 轉成字典
        return reply['choices'][0]['message']['content']
    else:
        return response.json()  # 返回錯誤信息

def tw_now():
    try: # 從 NTP 取得 UTC 時戳加 8 為台灣時間, 若成功設定 RTC
        print('Querying NTP server and set RTC time ... ', end='')
        utc=ntptime.time() # 取得 UTC 時戳
        print('OK.')
        t=time.localtime(utc + 28800) # 傳回台灣時間的元組
        RTC().datetime(t[0:3] + (0,) + t[3:6] + (0,))
    except Exception as e:  # 加入例外處理
        print(f'Failed. {e}')
    return strftime()  # 傳回目前之日期時間字串 YYYY-mm-dd HH:MM:SS 

def set_ap(led=2):
    html='''
    <!DOCTYPE html>
    <html>
      <head>
        <meta charset="utf-8">
        <meta name="viewport" content="width=device-width,initial-scale=1">
      </head>
      <body>
        %s
      </body>
    </html>
    '''
    form='''
        <form method='get' action='/update_ap' 
        style='width: max-content; margin: 10px auto'>
          <h2 style='text-align: center; font-size: 20px'>設定 WiFi 基地台</h2>
          <div style='display: block; margin-bottom: 20px'>
            <label for='ssid' style='display: block; font-size: 16px'>SSID</label>
            <input type='text' id='ssid' name='ssid' 
            style='padding: 10px 8px; width: 100%; font-size: 16px'>
          </div>
          <div style='display: block; margin-bottom: 20px'>
            <label for='pwd' style='display: block; font-size: 16px'>Password</label>
            <input type='text' id='pwd' name='pwd' 
            style='padding: 10px 8px; width: 100%; font-size: 16px'>
          </div>
          <button type="submit" style='width:100%;font-size: 16px'>連線</button>
        </form>
    '''
    ok='''
       <h2>WiFi 連線成功<br>IP : <a href={0}>{0}</a></h2>
       <a href=192.168.4.1>
         <button style="width:100%;font-size: 16px">重新設定</button>
       </a>              
    '''
    ng='''
       <h2 style="text-align: center;">WiFi 基地台連線失敗<br> 
       按 Reset 鈕後重新設定</h2>
       <a href="192.168.4.1">
         <button style="width:100%;font-size: 16px">重新設定</button>
       </a>   
    '''
    wifi_led=Pin(led, Pin.OUT, value=1)  # 預設熄滅板上 LED
    ap=network.WLAN(network.AP_IF)       # 開啟 AP 模式
    ap.active(True)
    sta=network.WLAN(network.STA_IF)     # 開啟 STA 模式
    sta.active(True)
    import socket
    addr=socket.getaddrinfo('192.168.4.1', 80)[0][-1] # 傳回 (ip, port)
    s=socket.socket()  # 建立伺服端 TCP socket
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 網址可重複請求
    s.bind(addr)  # 綁定 192.168.4.1 的 80 埠
    s.listen(5)   # 最多同時 5 個連線
    print('網頁伺服器正在監聽 : ', addr)
    while True:   # 監聽 192.168.4.1 的 80 埠
        cs, addr=s.accept()  
        print('發現來自客戶端的連線 : ', addr)
        data=cs.recv(1024)      
        request=str(data, 'utf8')   
        print(request, end='\n')
        del data, addr
        if request.find('update_ap?') == 5:  # 檢查是否為更新之 URL 
            # 擷取請求參數中的 SSID 與密碼
            para=request[request.find('ssid='):request.find(' HTTP/')]
            ssid=para.split('&')[0].split('=')[1] 
            pwd=para.split('&')[1].split('=')[1]
            sta.connect(ssid, pwd)       # 連線 WiFi 基地台
            start_time=time.time()       # 紀錄起始時間  
            while not sta.isconnected(): # 連線 WiFi (15 秒)
                wifi_led.value(0)  # 讓板載 LED 閃爍
                time.sleep_ms(300)
                wifi_led.value(1)
                time.sleep_ms(300)                
                if time.time()-start_time > 15: # 是否超過連線秒數
                    print('WiFi 連線逾時!')
                    break  # 逾時跳出無限迴圈
            # 確認是否連線成功
            if sta.isconnected():     # WiFi 連線成功
                print('WiFi 連線成功 : ', sta.ifconfig())
                ip=sta.ifconfig()[0]  # 取得 ip
                print('取得 IP : ' + ip)
                with open('config.py', 'w', encoding='utf-8') as f:
                    f.write(f'SSID="{ssid}"\nPASSWORD="{pwd}"') # 更新設定檔
                cs.send(html % ok.format(ip))  # 回應連線成功頁面
                for i in range(25):   # 連線成功 : 快閃 5 秒
                    wifi_led.value(0)
                    time.sleep_ms(100)
                    wifi_led.value(1)
                    time.sleep_ms(100)
                cs.close()
                s.close()
                return ip
            else:
                print('WiFi 連線失敗 : 請按 Reset 鈕後重設.')
                wifi_led.value(1)     # 連線失敗 : 熄滅 LED
                cs.send(html % ng)    # 回應連線失敗頁面
                cs.close()
                s.close()
                return None
        else:  # 顯示設定 WiFi 頁面
            cs.send(html % form)  # 回應設定 WiFi 頁面
        cs.close()
        del cs, request

def strptime(dt_str, format_str="%Y-%m-%d %H:%M:%S"):
    if format_str=="%Y-%m-%d %H:%M:%S":
        year=int(dt_str[0:4])
        month=int(dt_str[5:7])
        day=int(dt_str[8:10])
        hour=int(dt_str[11:13])
        minute=int(dt_str[14:16])
        second=int(dt_str[17:19])
        return (year, month, day, hour, minute, second, 0, 0, 0)
    else:
        raise ValueError("Unsupported format string")
    
def strftime(dt=None, format_str="%Y-%m-%d %H:%M:%S"):
    if dt is None:
        dt=time.localtime()
    return format_str.replace("%Y", str(dt[0])) \
                     .replace("%m", "{:02d}".format(dt[1])) \
                     .replace("%d", "{:02d}".format(dt[2])) \
                     .replace("%H", "{:02d}".format(dt[3])) \
                     .replace("%M", "{:02d}".format(dt[4])) \
                     .replace("%S", "{:02d}".format(dt[5]))

主要的修改部分是 telegram_text(), telegram_photo_url(), 以及 telegram_photo_file() 這三個函式, 分別用來傳送訊息, 網路圖片, 與本機圖片到 Telegram 的聊天室中. 除此之外其餘函式基本上一樣, 僅部分函式的參數傳遞方式有更改 (因為撤銷 config.py 的匯入). 

由於 python-telegram-bot 套件無法在 MicroPython 中安裝, 所以必須使用 urequests 模組呼叫 Telegram 的 HTTP API 才行, 做法與在 PC 上有點差異, 首先是必須傳送 headers 參數; 其次是 payload 必須利用 urlencode() 將字典的鍵值對轉換為 URL 編碼的字串才能傳給 urequests.post() 的 data 參數. 

另外, 因為 urequests 模組不支援傳送檔案的 files 參數, 我向 ChatGPT 尋求解決方案, 它建議了 telegram_photo_file() 這個函式, 它模仿瀏覽器做法, 利用不可能出現的邊界字串, 將讀取的圖檔內容封裝到 HTTP 訊息本體 (body) 的 multipart/form-data 裡面, 經過編碼後傳給 urequests.post() 的 data 參數傳送. 

函式用法說明如下表 : 


 esptools.py 函式 說明
 get_id() 傳回開發板的 MAC 位址字串
 get_mac() 傳回開發板的 MAC 位址字串
 get_num(x) 傳回字串 x 中的數字 (含浮點數)
 random_in_range(low, high) 傳回 low~high 之間的整數亂數 (預設 0~1000)
 map_range(x, in_min, in_max,  out_min, out_max) 將數值 x 從 [in_min, in_max] 映射至 [out_min, out_max] 區間
 conect_wifi(ssid, passwd, timeout=15) 以密碼 passwd 連線 ssid 基地台 (15 秒內閃 LED), 成功 LED 恆亮並傳回所獲取之 IP 
 show_error(final_state=0) 閃爍板載 LED 3 秒以表示錯誤 (預設結束後恆亮)
 webhook_post(url, value) 利用 urequests.py 模組送出 POST 請求, 失敗呼叫 show_error()
 webhook_get(value) 利用 urequests.py 模組送出 GET 請求, 失敗呼叫 show_error()
 telegram_text(token, chat_id, text) 使用 urequests.post() 傳送文字訊息到 Telegram 聊天室
 telegram_photo_url(token, chat_id, photo_url, caption=None) 使用 urequests.post() 傳送網路圖片到 Telegram 聊天室
 telegram_photo_file(token, chat_id, photo_path, caption=None) 使用 urequests.post() 傳送本機圖片到 Telegram 聊天室
 format_datetime(localtime) 將元組 localtime 轉成日期時間字串 'YYYY-MM-DD HH:mm:SS'
 scan_ssid() 掃描附近之熱點 (基地台), 印出其 SSID 與信號強度
 tw_now() 傳回現在台灣的日期與時間
 set_ap() 開啟 AP 模式並建立伺服器 192.168.4.1 以便設定要連線之熱點


此函式庫源自陳會安老師寫的 "超簡單 Python/MicroPython 物聯網應用 (博碩, 2021)" 這本書裡面的 xtools 函式庫, 因為被我加寫與改動之處甚多, 故此次改版我將函式庫改名為 esptools.py. 函式庫中需要用到的 WiFi 連線帳密, Telegram 之權杖與聊天室 ID, 與 OpenAI API key 等均放在 config.py 檔案中 : 




使用時必須將 config.py 與 esptools.py 一起用 Thonny 上傳到 ESP32/ESP8266 開發板匯入 (另外我也上傳了一張 jpg 圖檔測試傳送本機圖片), 測試結果如下 :

MicroPython v1.25.0 on 2025-04-15; Generic ESP32S3 module with Octal-SPIRAM with ESP32S3

Type "help()" for more information.

>>> import config    
>>> import esptools   
>>> ssid=config.SSID     
>>> password=config.PASSWORD   
>>> token=config.TELEGRAM_TOKEN   
>>> chat_id=config.TELEGRAM_CHAT_ID    

呼叫 connect_wifi() 連線 WiFi 基地台 : 

>>> ip=esptools.connect_wifi(ssid, password)    
network config: ('192.168.77.242', '255.255.255.0', '192.168.77.150', '192.168.77.150')

測試傳送文字訊息 : 

>>> text='Hello! 你好!'     
>>> esptools.telegram_text(token, chat_id, text)    
The message has been sent.   
True

傳送網路圖片 :

>>> photo_url='https://cdn.pixabay.com/photo/2024/03/15/17/50/dogs-8635461_1280.jpg'   
>>> caption='可愛的狗狗'   
>>> esptools.telegram_photo_url(token, chat_id, photo_url, caption)    
The image has been sent.
True

傳送本機圖片 : 

>>> photo_path='kitten.jpg'    
>>> caption='三隻小貓'   
>>> esptools.telegram_photo_file(token, chat_id, photo_path, caption)   
Status: 200
Response: {"ok":true,"result":{"message_id":600,"from":{"id":7938146214,"is_bot":true,"first_name":"twstock","username":"twstock168_bot"},"chat":{"id":<chat_id>,"first_name":"\u9ec3\u8000\u714c","username":"<my username>","type":"private"},"date":1745333787,"photo":[{"file_id":"AgACAgUAAxkDAAICRmgHSLi8Y6f7DBRLV2fvW9MlgRDiAAK5xjEbCeY5VJtqu9sNyti7AQADAgADcwADNgQ","file_unique_id":"AQADucYxGwnmOVR4","file_size":1072,"width":90,"height":44},{"file_id":"AgACAgUAAxkDAAICRmgHSLi8Y6f7DBRLV2fvW9MlgRDiAAK5xjEbCeY5VJtqu9sNyti7AQADAgADbQADNgQ","file_unique_id":"AQADucYxGwnmOVRy","file_size":12374,"width":320,"height":156}],"caption":"\u4e09\u96bb\u5c0f\u8c93"}}
True

檢查 Telegram App 有收到訊息與圖片 : 




以上測試在 ESP32 S3 (8MB SPI RAM) 與 ESP32 WROVER (4MB SPI RAM) 兩種內建 SPI RAM 的開發板均能順利傳送訊息與圖片至 Telegram. 一般沒有 SPI RAM 的 ESP32 WROOM 開發板傳送網路圖片沒有問題, 但傳送本機圖片就會因為記憶體不足而失敗 : 

>>> photo_path='kitten.jpg'     
>>> caption='三隻小貓'     
>>> esptools.telegram_photo_file(token, chat_id, photo_path, caption)    
Request error: [Errno 12] ENOMEM   
False

NOMEM : Error NO MEMory 

大功告成, 收工囉! 

沒有留言 :