2022年11月19日 星期六

MicroPython 學習筆記 : ESP8266/ESP32 網路存取測試 (一)

在上一篇文章中已將網路存取要用到的自訂模組上傳到開發板, 接下來就可以進行網路存取測試了. 本系列之前的文章參考 :


比較舊的 ESP8266/ESP32 測試文章參考 :



1. 連線 WiFi 基地台 :

以下以 ESP8266 開發板利用 xtools 函式庫的 connect_wifi_led() 函式連線 WiFi 熱點 : 

MicroPython v1.19.1 on 2022-06-18; ESP module with ESP8266
Type "help()" for more information.
>>> import xtools    
>>> import config     
>>> config.SSID  
'TonyNote8'   
>>> config.PASSWORD      
'a123456789'    
>>> ip=xtools.connect_wifi_led(config.SSID, config.PASSWORD)    
#5 ets_task(4020f560, 28, 3fff9ed8, 10)    
Connecting to network...   
network config: ('192.168.43.54', '255.255.255.0', '192.168.43.1', '192.168.43.1')   
>>> print(ip)    
'192.168.43.54'    
>>> id=xtools.get_id()   
>>> id   
b'14f5a800'          # MAC 位址
>>> type(id)     
<class 'bytes'>      # 型態為 bytes 

可見板載 LED 閃爍幾下後就連上基地台, connect_wifi_led() 函式會傳回開發板所取得之 ip; 呼叫 get_id() 則可取得 ESP8266 的 MAC 位址, 其傳回值為 bytes 串流物件, 可呼叫 encode('utf8') 方法轉成字串, 參考 :


>>> id.decode("utf-8")  
'14f5a800'

可見 MAC 已轉成字串. 

以上是在 Thonny 的互動環境測試的結果, 可將上面的程式碼寫成 main.py 上傳到板子, MicroPython 韌體相當於是一個微小版的作業系統, 每次板子重開機或 reset 時會先執行 boot.py, 然後會自動搜尋根目錄下是否有 main.py, 有的話就執行它, 執行程序如下 :




因此如果將連網的程式碼寫成如下的 main.py, 則開機後就會自動連上 WiFi 基地台了 :

# main,py 
import xtools    
import config   

ip=xtools.connect_wifi_led(config.SSID, config.PASSWORD)
mac=xtools.get_id().decode("utf-8")
print("ip : ", ip)
print("mac : ", mac)

上傳到開發板根目錄後按 reset 鈕 :

MicroPython v1.19.1 on 2022-06-18; ESP module with ESP8266
Type "help()" for more information.
Connecting to network...
network config: ('192.168.43.54', '255.255.255.0', '192.168.43.1', '192.168.43.1')  
ip :  192.168.43.54
mac :  14f5a800


如果專案需要視情況執行不同的 App, 則可以將 App 寫成含有 main() 函式的單獨程式, 例如 app1.py, 在 main.py 中判斷要執行哪一個 App, 然後於 main.py 內呼叫 app1.main() 即可將執行控制權交給 app1.py, 基本架構如下 :




範例程式如下, 首先是 App 程式 app1.py, 裡面要定義一個 main() 函式, 然後把 App 的邏輯寫在 main() 裡面, 然後用 if 判斷此程式是否是被呼叫的 (這時程式的內建隱含變數 __name__ 的值就是 '__main__', 如果是用 import 匯入就是 None, 避免 import 時 main() 就被執行一遍), 是的話就執行 main() : 

# app1.py
def main():
    #application codes are placed here  
    print('app1 執行中 ...')

if __name__ == "__main__":  
    main()  

其次是主程式 main.py, 裡面要先用 import 匯入會用到的全部 App (app1, app2, ...), 然後在符合條件時呼叫特定 App 的 main() 函式將執行控制權交給該 App, 例如 :

# main.py
import xtools    
import config
import app1    

ip=xtools.connect_wifi_led(config.SSID, config.PASSWORD)
mac=xtools.get_id()
print("ip : ", ip)
print("mac : ", mac)
if not ip:      # 連線成功會傳回 ip 字串, 否則為 None
    print('無法連線 WiFi 基地台')
else:
    app1.main()    # 連線成功才執行 App 程式


2. 掃描附近基地台 :

我將之前自己寫的掃瞄基地台函式添加到 xtools.py 中, 取名為 scan_ssid(), 只要呼叫它就會整齊地列出附近所有基地台的 ssid 與其功率強度, 例如 : 

>>> import xtools   
>>> xtools.scan_ssid()    
    EDIMAX-tony-Plus    5c:27:d4:f3:82:22     -75dBm
                JANE    cc:2d:21:42:9a:61     -84dBm
         EDIMAX-tony    80:1f:02:2d:5a:9e     -76dBm
          45N5F-2.4G    20:6a:94:3b:c9:7e     -93dBm


3. 日期時間測試 :

MicroPyhton 只支援 time (或 utime) 模組, 不支援 datetime 與 calendar, 所以用到日期時間時必須從 time 模組著手, 參考 : 


為了節省空間考量, MicroPython 的 utime 模組並未實作 ctime() 函式, 只能用 time(), maketime() 與 localtiome() 函式, time.time() 會傳回自 2000/1/1 起至今的累計秒數 (稱為 epoch, 但與 Linux 從 1970/1/1 起算日期不同), 而 time.localtime() 則傳回日期時間元組 (年, 月, 日, 時, 分, 秒, 星期, 天), 其中星期值為 0~6 (0 為星期天), 天值為 1~366 (一年中的第幾天), 例如 : 

>>> import time    
>>> time.time()    
78
>>> time.localtime()   
(2000, 1, 1, 0, 1, 25, 5, 1) 

time.maketime() 函式功能與 time.localtime() 相反, 它可以將 6 元素的日期時間元組轉回起自 2000/1/1 的 epoch 秒數, 例如 :

>>> time.mktime(time.localtime())    # 轉成起自 2000/1/1 的 epoch 秒數
368

因為板子內的 RTC 時鐘目前是 2000/1/1, 所以秒數僅 368 秒. 

事實上 time.localtime() 是透過 machine 模組內的 RTC 類別去查詢 ESP8266/ESP32 內部的即時時鐘 (RTC) 得到的日期時間, 這也可以直接用 machine 模組的 RTC 物件查詢, 參考 :  


>>> import machine     
>>> rtc=machine.RTC()               # 建立 RTC 物件
>>> rtc.datetime()                         # 取得 RTC 的日期時間
(2000, 1, 1, 5, 0, 4, 18, 586130)      # (年, 月, 日, 星期, 時, 分, 秒, 毫秒) 

但此 RTC 預設初始值是 2000/01/01 00:00:00, 必須透過 WiFi 連網後利用 ntptime 模組的 settime() 函式從 NTP 伺服器取得網路時間來同步, 這樣呼叫 time.localtime() 才會得到準確的時間. 

MicroPython 有內建 ntptime 模組以便與 NTP 伺服器連線取得 UTC 時間, 

>>> import ntptime   
>>> dir(ntptime)  
['__class__', '__name__', 'socket', 'struct', 'time', 'settime', 'NTP_DELTA', 'host']
>>> help(ntptime)    
object <module 'ntptime'> is of type module
  socket -- <module 'lwip'>
  NTP_DELTA -- 3155673600
  __name__ -- ntptime
  struct -- <module 'ustruct'>
  time -- <function time at 0x3ffeff50>
  host -- pool.ntp.org
  settime -- <function settime at 0x3fff0250> 

其中 host 變數用來儲存 NTP 伺服器網址, 預設為 pool.ntp.org, 亦可改用如下伺服器 : 
  • tock.stdtime.gov.tw
  • watch.stdtime.gov.tw
  • time.stdtime.gov.tw
  • clock.stdtime.gov.tw  
  • tick.stdtime.gov.tw 
  • time.nist.gov
NTP_DELTA 常數則是用來設定 NTP 與 Python 的基準時間差距, 因 NTP 時戳是從1900/1/1 起算的秒數; 但 Python 時戳卻是從 2000/1/1 起算的, 兩者差了 100 年, 所以要補上 36524 天 (含閏年)*24*60*60=3155673600 秒, 其值已預設到 NTP_DELTA 屬性 (勿改). 

ntp.time() 函式會傳回目前 NTP 伺服器的 epoch 秒數 (UTC/GMT 時區), 呼叫 ntp.settime() 函式會用 ntp.time() 來設定 ESP8266/ESP32 內部的 RTC 時鐘, 這樣就可以讓板子與網路時間同步了, 例如 :

同步前先查詢板子上的 RTC 時鐘 : 

>>> time.time()         
2172
>>> rtc.datetime()      
(2000, 1, 1, 5, 0, 36, 15, 615714)

可見此時 RTC 時鐘仍是預設值. 接著查詢 NTP 時間並呼叫 ntp.settime() 來同步 :

>>> ntptime.time()       
722416034
>>> ntptime.settime()    

注意, 以前 ntptime.settime() 會傳回所取得之時間, 現在新版的不會. 同步後查詢板子上的 RTC 時鐘 :

>>> time.time()      
722416046
>>> rtc.datetime()     
(2022, 11, 22, 1, 7, 7, 31, 572148)
>>> time.localtime()      
(2022, 11, 22, 7, 7, 42, 1, 326)

現在時間是下午 3 點多, 可見從 NTP 取得的是 UTC 時間, 必需再加上 28800 秒才是台灣時區的秒數 (UTC+8). xtools.py 工具函式庫的 format_datetime() 函式可用來製作 'YYYY-MM-DD HH:mm:SS' 格式的日期時間字串, 只要將 time.localtiome() 傳入 xtools.format_datetime() 即可, 例如 : 

>>> xtools.format_datetime(time.localtime())     
'2022-11-22 07:15:05'

但是此 format_datetime() 函式並沒有處理台灣時區秒差問題, 所以我參考自己之前寫的函式修改為 tw_now() 加到 xtools 裡面來彌補此缺陷 (需匯入 ntptime) :

import ntptime

def tw_now():
    try:
        ntptime.settime()
    except:
        pass
    utc_epoch=time.mktime(time.localtime())
    Y,M,D,H,m,S,ms,W=time.localtime(utc_epoch + 28800)
    t='%s-%s-%s %s:%s:%s' % \
    (Y, pad_zero(M), pad_zero(D), pad_zero(H), pad_zero(m), pad_zero(S))
    return t

此函式會先呼叫 ntptime.settime() 來同步內部 RTC 時鐘, 因為跑一段時間後 RTC 會與 NTP 伺服器有秒差, 但只要每次呼叫 tw_now() 就會同步一次, 可避免秒差越來越大. 將新版 xtools.py 上傳到板子, 然後在 Thonny 按 "執行/停止重新值行後端程式" 重設板子, 再次匯入 xtools 呼叫 tw_now() 即可得到台灣時間了 (此處使用 try except 避免 NTP 伺服器連線失敗) :

MicroPython v1.19.1 on 2022-06-18; ESP32 module with ESP32

Type "help()" for more information.
>>> import xtools   
>>> xtools.tw_now()    
'2022-11-22 15:27:30'   

這樣就得到台灣時間而非 UTC/GMT 時間了. 


4. 用 urequests 擷取網頁 :

較舊版的 MicroPython 韌體中有實作 urllib 模組, 但目前 (v1.19.1) 已經沒有了 : 

>>> import urllib    
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ImportError: no module named 'urllib'

但 MicroPython 內建了 CPython 第三方模組 requests 的子集 urequest 模組可用來擷取網頁, 我五年前曾經測試過, 參考 :


先檢視 urequests 模組內容 : 

>>> import urequests  
>>> dir(urequests)  
['__class__', '__name__', 'get', '__file__', 'put', 'usocket', 'Response', 'request', 'head', 'post', 'patch', 'delete']
>>> help(urequests)    
object <module 'urequests' from 'urequests.py'> is of type module
  put -- <function put at 0x3ffe5b30>
  post -- <function post at 0x3ffe5b20>
  usocket -- <module 'usocket'>
  patch -- <function patch at 0x3ffe5b40>
  request -- <function request at 0x3ffe5960>
  __file__ -- urequests.py
  __name__ -- urequests
  delete -- <function delete at 0x3ffe5b50>
  head -- <function head at 0x3ffe5890>
  Response -- <class 'Response'>
  get -- <function get at 0x3ffe58a0>

與以前相比可知新版韌體多了一個 Response 類別, 此類別用來放伺服器對 HTTP 請求之回應. 其實最常用的是 get() 與 post() 方法, 分別用來向伺服器提出 GET 與 POST 請求, 說明文件參考 :


首先用 urequest 擷取 MicroPython 官網測試網頁的範例 : 

>>> import urequests      
>>> r=urequests.get("http://micropython.org/ks/test.html")    
>>> r    
<Response object at 3fff3870>   

可見 get() 函式會傳回一個 Response 物件, 裡面存放伺服器對 HTTP 請求的回應, 先用 help() 檢視 Response 物件內容 :

>>> dir(r)   
['__class__', '__init__', '__module__', '__qualname__', 'close', '__dict__', 'encoding', 'text', 'json', 'status_code', 'reason', 'raw', '_cached', 'content']
>>> help(r)    
object <Response object at 3fff3870> is of type Response
  text -- <property>
  __init__ -- <function __init__ at 0x3ffe58d0>
  __qualname__ -- Response
  close -- <function close at 0x3ffe58c0>
  content -- <property>
  json -- <function json at 0x3ffe5950>
  __module__ -- urequests

其中 ststus_code 屬性存放 HTTP 請求的回應狀態, 正常為 200. encoding 是回應內容之編碼, 例如 'utf-8' 等. text 與 content 屬性用來存放回應訊息, text 是純文字內容, 而 content 則是原始 bytes 串流, 可以呼叫 decode('utf8') 轉成純文字. json() 方法用來將 json 格式的回應資料轉換成 dict 型態, 但若回應不是 json 格式會出現錯誤, 例如 : 

>>> r.status_code    
200
>>> r.encoding   
'utf-8'
>>> r.text          # 型態是字串
'<!DOCTYPE html>\n<html lang="en">\n    <head>\n        <title>Test</title>\n    </head>\n    <body>\n        <h1>Test</h1>\n        It\'s working if you can read this!\n    </body>\n</html>\n'
>>> r.content    # 型態是 bytes 串流 
b'<!DOCTYPE html>\n<html lang="en">\n    <head>\n        <title>Test</title>\n    </head>\n    <body>\n        <h1>Test</h1>\n        It\'s working if you can read this!\n    </body>\n</html>\n'
>>> type(r.content)   
<class 'bytes'>   
>>> r.content.decode('utf8')      # 將 bytes 串流轉成字串
'<!DOCTYPE html>\n<html lang="en">\n    <head>\n        <title>Test</title>\n    </head>\n    <body>\n        <h1>Test</h1>\n        It\'s working if you can read this!\n    </body>\n</html>\n'

可見 r.text 與 r.content 內容其實是一樣的, 只是資料型態不同而已. 對於非 json 格式資料若呼叫 json() 方法會出現 json 語法錯誤, 例如 : 

>>> r.json()    
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "urequests.py", line 33, in json
ValueError: syntax error in JSON

回應 json 格式資料的伺服器可用 http://ip-api.com/json/ 來測試, 例如 : 

>>> r=urequests.get("http://ip-api.com/json/")     
>>> r.text   
'{"status":"success","country":"Taiwan","countryCode":"TW","region":"TNN","regionName":"Tainan","city":"Tainan City","zip":"","lat":22.9917,"lon":120.2148,"timezone":"Asia/Taipei","isp":"Chunghwa Telecom Co., Ltd.","org":"Chunghwa Telecom Co. Ltd.","as":"AS3462 Data Communication Business Group","query":"218.166.10.110"}'
>>> r.content   
b'{"status":"success","country":"Taiwan","countryCode":"TW","region":"TNN","regionName":"Tainan","city":"Tainan City","zip":"","lat":22.9917,"lon":120.2148,"timezone":"Asia/Taipei","isp":"Chunghwa Telecom Co., Ltd.","org":"Chunghwa Telecom Co. Ltd.","as":"AS3462 Data Communication Business Group","query":"218.166.10.110"}'
>>> r.json()   
{'countryCode': 'TW', 'lon': 120.2148, 'timezone': 'Asia/Taipei', 'city': 'Tainan City', 'status': 'success', 'country': 'Taiwan', 'org': 'Chunghwa Telecom Co. Ltd.', 'query': '218.166.10.110', 'region': 'TNN', 'lat': 22.9917, 'zip': '', 'isp': 'Chunghwa Telecom Co., Ltd.', 'as': 'AS3462 Data Communication Business Group', 'regionName': 'Tainan'}
>>> type(r.json())   
<class 'dict'>

可見 json() 方法已將 json 格式的字串資料轉成 dict 型態資料. 

傳入 get() 方法的 URL 字串後面可以用 ? 與 & 攜帶查詢參數, 這些參數會放在網址中傳遞, 安全性低且可攜帶的參數量較少, 可以使用 httpbin.org 提供的 echo 服務來測試所傳遞之參數, 此網站會將 HTTP 請求的訊息放在回應中傳回來, 例如 : 

>>> r=urequests.get('http://httpbin.org/get?a=1&b=2')    
>>> print(r.text)      
{
  "args": {
    "a": "1"
    "b": "2"
  }, 
  "headers": {
    "Host": "httpbin.org", 
    "X-Amzn-Trace-Id": "Root=1-637ce676-76373b6349b758384140f662"
  }, 
  "origin": "218.166.10.110", 
  "url": "http://httpbin.org/get?a=1&b=2"
}

可見查詢參數 a 與 b 被放在 args 屬性中傳回來. 如果有多個參數就用 & 串接起來即可. 不過 r.text 是字串, 如果要用字典取得 args 等屬性, 可以利用 json 模組在 MicroPython 的內建子集模組 ujson 的 loads() 函式轉換成 dict 型態, 例如 :

>>> import ujson   
>>> ujson.loads(r.text)['args']    
{'a': '1', 'b': '2'}

或者直接呼叫 r.json() 取得 dict 型態資料更快, 不需要匯入 ujson, 例如 :

>>> r.json()['args']    
{'a': '1', 'b': '2'}

post() 方法是將查詢參數放在 HTTP 訊息的 body 中傳送, 安全性較高且可傳遞之參數量較大. 查詢參數是以 dict 方式表示, 傳給 post() 的 json 參數, 例如 : 

>>> data={'a':'1','b':'2'}    
>>> r=urequests.post("http://httpbin.org/post", json=data)   
>>> print(r.text)    
{
  "args": {}, 
  "data": "{\"a\": \"1\", \"b\": \"2\"}", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Content-Length": "20", 
    "Content-Type": "application/json", 
    "Host": "httpbin.org", 
    "X-Amzn-Trace-Id": "Root=1-637cea54-73807f7b4c9e23e56076edac"
  }, 
  "json": {
    "a": "1", 
    "b": "2"
  }, 
  "origin": "218.166.10.110", 
  "url": "http://httpbin.org/post"
}
 
也可以傳給 data 參數, 但 dict 資料須用 ujson.dumps() 序列化, 例如 : 

>>> import ujson   
>>> r=urequests.post("http://httpbin.org/post", data=ujson.dumps(data))   
>>> print(r.text)   
{
  "args": {}, 
  "data": "{\"a\": \"1\", \"b\": \"2\"}", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Content-Length": "20", 
    "Host": "httpbin.org", 
    "X-Amzn-Trace-Id": "Root=1-637cf4ab-6d1cd6a83e9c5e4e0f489370"
  }, 
  "json": {
    "a": "1", 
    "b": "2"
  }, 
  "origin": "218.166.10.110", 
  "url": "http://httpbin.org/post"
}

結果與使用 json 參數是一樣的. 

如果要用 data 參數傳遞 post 字典變數, 則必須將其變成字串型態 (這也是一種序列化), 例如 :

>>> import urequests   
>>> data="{'a':'1','b':'2'}"     # 以字串表示的字典變數 
>>> r=urequests.post("http://httpbin.org/post", data=data)    
>>> print(r.text)    
{
  "args": {}, 
  "data": "{'a':'1','b':'2'}"
  "files": {}, 
  "form": {}, 
  "headers": {
    "Content-Length": "17", 
    "Host": "httpbin.org", 
    "X-Amzn-Trace-Id": "Root=1-63903aaa-7b1086f156a7abf327d2eb8c"
  }, 
  "json": null, 
  "origin": "42.74.198.20", 
  "url": "http://httpbin.org/post"
}

如果 data 不是字面值而是變數, 則可以傳給 str() 轉成字串, 例如 : 

>>> import urequests   
>>> data={'a':'1','b':'2'}   
>>> r=urequests.post("http://httpbin.org/post", data=str(data)   
>>> print(r.text)    
{
  "args": {}, 
  "data": "{'a': '1', 'b': '2'}"
  "files": {}, 
  "form": {}, 
  "headers": {
    "Content-Length": "20", 
    "Host": "httpbin.org", 
    "X-Amzn-Trace-Id": "Root=1-63903bc9-5472953e51e26ee85ab8012a"
  }, 
  "json": null, 
  "origin": "42.74.198.20", 
  "url": "http://httpbin.org/post"
}

除了 urequests 外, 也可以使用從 urequest 修改而來的自訂模組 xrequest (添加 params 參數並對 data 套用 URL 編碼) 來擷取網頁, 用法跟 urequests 相同, 下面改用 xrequest 擷取 :

>>> import xrequests    
>>> r=xrequests.get("http://micropython.org/ks/test.html")    
>>> r.text   
'<!DOCTYPE html>\n<html lang="en">\n    <head>\n        <title>Test</title>\n    </head>\n    <body>\n        <h1>Test</h1>\n        It\'s working if you can read this!\n    </body>\n</html>\n'

網路存取具有不確定性, 如果沒有回應 (timeout) 會出現例外, 所以最好是將 HTTP 請求放在 try except 區塊中, 例如: 

try:
    r=requests.get(url, timeout=3)
    r.raise_for_status()
except:
    print ("HTTP Error!")


2022-11-22 補充 :

今天改寫了連線 NTP 伺服器以同步板子內部 RTC 時鐘以取得目前日期時間部分, 同時為 xtools.py 添加了 tw_now() 函式. 

沒有留言:

張貼留言