2017年8月28日 星期一

MicroPython on ESP8266 (二十) : 從 ThingSpeak 讀取資料

昨天完成 MicroPython on ESP8266 的太陽能測候站後, 可以在電腦上連線 ThingSpeak 網站遠端觀察 ESP8266 傳遞上來的數據, 也可以在手機上安裝 ThingView 這個 App 來隨時檢視監測數據, 關於 ThingView 參考 :

MicroPython on ESP8266 (十三) : DHT11 溫溼度感測器測試

回高雄的途中突然想到, ThingSpeak 上除了 Write API Key 外, 還有 Read API Key, 亦即可以從遠端設備直接讀取 ThingSpeak 資料庫上儲存的通道數據. 我的想法是用 ESP8266 模組讀取 ThingSpeak 數據後將資料顯示在 1602 LCD 或 SSD1306 OLED 顯示器上, 這樣便毋須開啟電腦或拿出手機檢視了. 在 ThingSpeak 網站的 Channels/API Keys 裡可查得通道的 Channel ID 與 Read API Keys (可自行建立多個讀取 API Key) :




ThingSpeak 的寫入週期免費用戶限制最快 15 秒一次, 付費用戶則是每秒一次, 超過限制會產生錯誤而無法寫入. 不過讀取週期卻對任何用戶而言都沒有限制, 參考 :

https://www.mathworks.com/help/thingspeak/channel-settings.html#keys

"API Rate Limits
A free user can update a ThingSpeak channel every 15 seconds, and a paid user can update every 1 second. Updating more frequently results in an error. The time between read requests is not limited by ThingSpeak for any users. To change settings to meet your application requirements locally, download the source code from GitHub."

以下記錄測試過程, 這次我使用 WeMOS D1 Mini 搭配 SSD1306 OLED 顯示器. 本系列 MicroPython on ESP8266 測試文章參考 :

MicroPython on ESP8266 (一) : 燒錄韌體
MicroPython on ESP8266 (二) : 數值型別測試
MicroPython on ESP8266 (三) : 序列型別測試
MicroPython on ESP8266 (四) : 字典與集合型別測試
MicroPython on ESP8266 (五) : WiFi 連線與 WebREPL 測試
MicroPython on ESP8266 (六) : 檔案系統測試
MicroPython on ESP8266 (七) : 時間日期測試
MicroPython on ESP8266 (八) : GPIO 測試
MicroPython on ESP8266 (九) : PIR 紅外線移動偵測
MicroPython v1.9.1 版韌體測試
MicroPython on ESP8266 (十) : socket 模組測試
MicroPython on ESP8266 (十一) : urllib.urequest 模組測試
MicroPython on ESP8266 (十二) : urequests 模組測試
MicroPython on ESP8266 (十三) : DHT11 溫溼度感測器測試
MicroPython 使用 ampy 突然無法上傳檔案問題
MicroPython on ESP8266 (十四) : 網頁伺服器測試
WeMOS D1 Mini 開發板測試
MicroPython on ESP8266 (十五) : 光敏電阻與 ADC 測試
MicroPython on ESP8266 (十六) : 蜂鳴器測試
MicroPython on ESP8266 (十七) : 液晶顯示器 1602A 測試
MicroPython on ESP8266 (十八) : SSD1306 液晶顯示器測試
MicroPython v1.9.2 版釋出
MicroPython on ESP8266 (十九) : 太陽能測候站與移動偵測控制照明

MicroPython 文件參考 :

MicroPython tutorial for ESP8266  (官方教學)
http://docs.micropython.org/en/latest/micropython-esp8266.pdf
http://docs.micropython.org/en/latest/pyboard/library/usocket.html#class-socket
http://docs.micropython.org/en/v1.8.7/esp8266/library/usocket.html#module-usocket
https://gist.github.com/xyb/9a4c8d7fba92e6e3761a (驅動程式)

實際做法我參考了下面這篇文章 :

How to read data from a Thingspeak channel using a Raspberry Pi | Raspberry Pi | Forum

此文是針對 Raspberry Pi 上的 CPython, 使用 urllib2 模組來處理 HTTP 協定; 我則是偏好使用 requests (在 MicroPython 上為 urequests) 模組, 我只參考了 API 的 URL 格式 :

GET https://api.thingspeak.com/channels/58096/feeds/last.json?api_key=PWEAKSDX9X9QP1V6

這個 Query 字串中的關鍵資訊是黃底部分的 Channel ID 與 Read API Key, 預期 ThingSpeak 伺服器將傳回最近一筆寫入數據. 其他 Feeds 的擷取方法參考 :

https://www.mathworks.com/help/thingspeak/get-a-channel-feed.html


測試 1 : 讀取 ThingSpeak 通道資料

#myapp.py
import urequests
import time

host='http://api.thingspeak.com'
read_api_key='PWEAKSDX9X9QP1V6'
channel_id='58096'

url='%s/channels/%s/feeds/last.json?api_key=%s' \
     %(host, channel_id, read_api_key)

while True:
    try:
        r=urequests.get(url)
        print(r.text)
        print(r.json())
    except:
        print('urequests.get() exception occurred!')
    time.sleep(3)

將此檔案存成 myapp.py, 用 ampy 傳 ESP8266 後開啟 PuTTY 按 Ctrl+D 軟開機即可. 連接 WiFi 工作交由 WiFi 連網程式 main.py 負責, 參考下面這篇文章中的 "3.  SSD1306 OLED 版 AP 設定程式 " :

WeMOS D1 Mini 開發板測試

在上面的程式中, 我以 3 秒週期去 ThingSpeak 讀取指定通道之數據, 分別印出回應物件的 text 屬性與 json() 方法的傳回值, 兩者之差別是 text 屬性為字串, 而 json() 傳回的是 JSON 資料轉成的字典物件, 參考 :

MicroPython on ESP8266 (十二) : urequests 模組測試

REPL 輸出如下 :

PYB: soft reboot
#9 ets_task(40100164, 3, 3fff829c, 4)
WebREPL is not configured, run 'import webrepl_setup'
Connecting to AP ...
Connected:IP= 192.168.43.252
{"created_at":"2017-08-28T03:03:22Z","entry_id":75291,"field1":"30","field2":"86","field3":"39","field4":"4","field5":"0"}
{'created_at': '2017-08-28T03:03:22Z', 'field5': '0', 'entry_id': 75291, 'field3': '39', 'field2': '86', 'field1': '30', 'field4': '4'}
{"created_at":"2017-08-28T03:03:22Z","entry_id":75291,"field1":"30","field2":"86","field3":"39","field4":"4","field5":"0"}
{'created_at': '2017-08-28T03:03:22Z', 'field5': '0', 'entry_id': 75291, 'field3': '39', 'field2': '86', 'field1': '30', 'field4': '4'}
{"created_at":"2017-08-28T03:03:22Z","entry_id":75291,"field1":"30","field2":"86","field3":"39","field4":"4","field5":"0"}
{'created_at': '2017-08-28T03:03:22Z', 'field5': '0', 'entry_id': 75291, 'field3': '39', 'field2': '86', 'field1': '30', 'field4': '4'}
{"created_at":"2017-08-28T03:03:22Z","entry_id":75291,"field1":"30","field2":"86","field3":"39","field4":"4","field5":"0"}
{'created_at': '2017-08-28T03:03:22Z', 'field5': '0', 'entry_id': 75291, 'field3': '39', 'field2': '86', 'field1': '30', 'field4': '4'}

由於在上一篇測試 "太陽能測候站與移動偵測控制照明" 中是以 20 秒的週期將測候站量得之數據寫入 ThingSpeak 資料庫, 而此處卻以 3 秒週期讀取最新一筆寫入資料, 所以會連續讀到好幾筆重複的資料. 同時也可看出 text 屬性因為是字串, 鍵與值都用雙引號; 而 json() 傳回值 (字典) 則以單引號括起來. 傳回的每一筆資料包括下列欄位 :
  1. created_at : 數據寫入日期與時間 (注意, 這是 GMT 時間)
  2. entry_id : 紀錄的 ID
  3. field1 : 攝氏溫度
  4. field2 : 華氏溫度
  5. field3 : 濕度
  6. field4 : 亮度
  7. field5 : PIR 觸發次數


既然 json() 傳回值是 dict 物件, 那麼就可以透過鍵來取得其值, 如下列測試 2 所示 :

測試 2 : 讀取 ThingSpeak 通道資料 (取得字典內各欄位之值)

#myapp.py
import urequests
import time

host='http://api.thingspeak.com'
read_api_key='PWEAKSDX9X9QP1V6'
channel_id='58096'

url='%s/channels/%s/feeds/last.json?api_key=%s' \
     %(host, channel_id, read_api_key)

while True:
    try:
        r=urequests.get(url)
        d=r.json()
        print(d['created_at'],d['entry_id'],d['field1'],d['field2'],
              d['field3'],d['field4'],d['field5'])
    except:
        print('urequests.get() exception occurred!')
    time.sleep(10)

此處我將讀取週期改為 10 秒, 而數據來源端寫入週期為 20 秒, 因此將能大幅降低讀到多筆重複資料情形, 預期頂多重複一次, REPL 輸出如下 :

PYB: soft reboot
#9 ets_task(40100164, 3, 3fff829c, 4)
WebREPL is not configured, run 'import webrepl_setup'
Connecting to AP ...
Connected:IP= 192.168.43.252
2017-08-28T06:56:33Z 75985 32 90 35 3 0
2017-08-28T06:56:54Z 75986 32 90 35 4 0
2017-08-28T06:56:54Z 75986 32 90 35 4 0
2017-08-28T06:57:14Z 75987 32 90 35 4 0
2017-08-28T06:57:34Z 75988 32 90 38 3 0
2017-08-28T06:57:34Z 75988 32 90 38 3 0
2017-08-28T06:57:54Z 75989 32 90 35 4 0
2017-08-28T06:57:54Z 75989 32 90 35 4 0
2017-08-28T06:58:14Z 75990 32 90 35 4 0
2017-08-28T06:58:14Z 75990 32 90 35 4 0
2017-08-28T06:58:34Z 75991 32 90 36 4 0
2017-08-28T06:58:34Z 75991 32 90 36 4 0
2017-08-28T06:58:54Z 75992 33 91 34 3 0
2017-08-28T06:58:54Z 75992 33 91 34 3 0
2017-08-28T06:59:14Z 75993 32 90 38 4 0
2017-08-28T06:59:14Z 75993 32 90 38 4 0
2017-08-28T06:59:35Z 75994 32 90 35 3 0
2017-08-28T06:59:55Z 75995 32 90 35 3 0
2017-08-28T06:59:55Z 75995 32 90 35 3 0
2017-08-28T07:00:15Z 75996 32 90 35 3 0
2017-08-28T07:00:15Z 75996 32 90 35 3 0
2017-08-28T07:00:35Z 75997 32 90 36 3 0

沒錯, 重複次數最多一次.

接下來要將自 ThingSpeak 取得的數據顯示在 SSD1306 OLED 顯示器, 作法參考下面這篇 :

MicroPython on ESP8266 (十八) : SSD1306 液晶顯示器測試

D1 Mini 或 ESP-12 模組與 SSD1306 的連接方式如下 :


SSD1306 使用 I2C 介面, D1 Mini 使用 D1 接 SCL, D2 接 SDA, 但在建立 I2C 物件時必須傳入 D1 與 D2 腳相對應的 GPIO5 與 GPIO4.

使用 SSD1306 需先下載其 MicroPython 驅動程式 ssd1306.py 並用 ampy 上傳 ESP8266 :

micropython/drivers/display/ssd1306.py  (MicroPython 官網)

下面程式是參考 "SSD1306 液晶顯示器測試" 修改測試 2 而得 :


測試 3 : 讀取 ThingSpeak 通道資料並顯示於 OLED 顯示器 (GMT 時間)

import urequests
import time
import ssd1306  
from machine import Pin, I2C

def fill_blank(n):      
    if n<10:
        return ' ' + str(n)
    else:
        return str(n)

i2c=I2C(scl=Pin(5), sda=Pin(4))              
oled=ssd1306.SSD1306_I2C(128, 32, i2c)

host='http://api.thingspeak.com'
read_api_key='PWEAKSDX9X9QP1V6'
channel_id='58096'

url='%s/channels/%s/feeds/last.json?api_key=%s' \
     %(host, channel_id, read_api_key)

while True:
    try:
        r=urequests.get(url)
    except:
        print('urequests.get() exception occurred!')
    j=r.json()
    print(j['created_at'],j['entry_id'],j['field1'],j['field2'],
          j['field3'],j['field4'],j['field5'])

    YMD,HmS=j['created_at'].split('T')      #以 'T' 拆分日期時間
    HmS=HmS.replace('Z','')                       #去除時間結尾的 'Z'
    T='%sC' % (fill_blank(int(j['field1'])))  
    H='%sH' % (fill_blank(int(j['field3'])))  
    L='%s%%' % (fill_blank(int(j['field4'])))  

    line1='%s' % (YMD)  
    line2='%s' % (HmS)  
    line3='%s %s %s' % (T, H, L)  
    line4='Trigger:%s' % (j['field5'])  
    oled.fill(0)                
    oled.text(line1, 0, 0)    
    oled.text(line2, 0, 8)    
    oled.text(line3, 0, 16)  
    oled.text(line4, 0, 24)  
    oled.show()    
    time.sleep(10)

上面程式中我直接將 'created_at' 欄位裡的日期時間字串以 'T' 為界拆出日期與時間, 然後將溫溼度亮度字串轉成整數後丟給 fill_blank() 製作成固定兩字元字串, 以免個位數時顯示位置位移. 注意, 上面的日期時間是 GMT 時間, 亦即數據來源寫入 ThingSpeak 資料庫時的 GMT 時間. 如果要改為台灣時間可參考 "SSD1306 液晶顯示器測試" 的測試 4 作法 :


測試 4 : 讀取 ThingSpeak 通道資料並顯示於 OLED 顯示器 (台灣時間)

import urequests
import time
import ssd1306
from machine import Pin, I2C

def fill_zero(n):  
    if n < 10:  
        return '0' + str(n)
    else:  
        return str(n)

def fill_blank(n):    
    if n<10:
        return ' ' + str(n)
    else:
        return str(n)

week={0:'Mon',1:'Tue',2:'Wed',3:'Thu',4:'Fri',5:'Sat',6:'Sun'}

i2c=I2C(scl=Pin(5), sda=Pin(4))              
oled=ssd1306.SSD1306_I2C(128, 32, i2c)

host='http://api.thingspeak.com'
read_api_key='PWEAKSDX9X9QP1V6'
channel_id='58096'

url='%s/channels/%s/feeds/last.json?api_key=%s' \
     %(host, channel_id, read_api_key)

while True:
    try:
        r=urequests.get(url)
    except:
        print('urequests.get() exception occurred!')
    j=r.json()
    print(j['created_at'],j['entry_id'],j['field1'],j['field2'],
          j['field3'],j['field4'],j['field5'])

    DT=j['created_at'].replace('T','-')  
    DT=DT.replace('Z','')     #去除時間結尾的 'Z'
    DT=DT.replace(':','-')     #將時間的 ':' 全改為 '-' 以利 split()
    DT=DT + '-0-0'              #因 time.mktime() 需 8 個整數參數故補兩個 0
    DT=DT.split('-')              #拆成 6 元素的串列
    DT=[int(i) for i in DT]   #因 time.mktime() 需 8 個整數參數
    Y,M,D,H,m,S,W,DY=time.localtime(time.mktime(DT) + 28800)    #轉成台灣時間
    YMD='%s-%s-%s' % (str(Y),fill_zero(M),fill_zero(D))  
    WD=week[W]      
    HmS='%s:%s:%s' % (fill_zero(H),fill_zero(m),fill_zero(S))  
    T='%sC' % (fill_blank(int(j['field1'])))
    H='%sH' % (fill_blank(int(j['field3'])))  
    L='%s%%' % (fill_blank(int(j['field4'])))  

    line1='%s %s' % (YMD, WD)
    line2=HmS
    line3='%s %s %s' % (T, H, L)
    line4='Trigger:%s' % (j['field5'])
    oled.fill(0)              
    oled.text(line1, 0, 0)  
    oled.text(line2, 0, 8)  
    oled.text(line3, 0, 16)
    oled.text(line4, 0, 24)
    oled.show()
    time.sleep(10)

此程式將 ThingSpeak 紀錄的 GMT 日期時間字串修整後, 在後面補上兩個 0 拆成 8 元素的串列後傳給 time.maketime() 轉成自 2000/1/1 以來的時戳 (秒), 再加上 28800 秒 (8 小時) 即為台灣時間之時戳, 傳給 time.localtime() 會傳回該時戳對應之日期與時間串列. 由於 MicroPython 沒有實作 strptime() 函數, 所以只好用上面這個方法來轉換.

以上是在 SSD1306 OLED 上顯示從 ThingSpeak 讀取之資料, 如果要在 1602 LCD 上顯示, 可參考 "液晶顯示器 1602A 測試" 這篇的作法. 使用 1602 前須先下載 esp8266_i2c_lcd.py 與 lcd_api.py 這兩個 Python 驅動程式, 下載後用 ampy 上傳到 ESP8266 即可.

將上面測試 4 程式修改為 1602 版如下 :

測試 5 : 讀取 ThingSpeak 通道資料並顯示於 1602 LCD 顯示器 (台灣時間)

import urequests
import time
from machine import Pin, I2C
from esp8266_i2c_lcd import I2cLcd

def fill_zero(n):
    if n < 10:
        return '0' + str(n)
    else:
        return str(n)

def fill_blank(n):    
    if n<10:
        return ' ' + str(n)
    else:
        return str(n)

i2c=I2C(scl=Pin(5),sda=Pin(4),freq=400000)
lcd=I2cLcd(i2c, 0x27, 2, 16)  
lcd.custom_char(0, bytearray([0x1C,0x14,0x1C,0x00,0x00,0x00,0x00,0x00]))    

host='http://api.thingspeak.com'
read_api_key='PWEAKSDX9X9QP1V6'
channel_id='58096'

url='%s/channels/%s/feeds/last.json?api_key=%s' \
     %(host, channel_id, read_api_key)

while True:
    try:
        r=urequests.get(url)
    except:
        print('urequests.get() exception occurred!')
    j=r.json()
    print(j['created_at'],j['entry_id'],j['field1'],j['field2'],
          j['field3'],j['field4'],j['field5'])

    DT=j['created_at'].replace('T','-')
    DT=DT.replace('Z','')
    DT=DT.replace(':','-')
    DT=DT + '-0-0'
    DT=DT.split('-')
    DT=[int(i) for i in DT]
    Y,M,D,H,m,S,W,DY=time.localtime(time.mktime(DT) + 28800)
    YMD='%s-%s-%s' % (str(Y),fill_zero(M),fill_zero(D))
    HmS='%s:%s:%s' % (fill_zero(H),fill_zero(m),fill_zero(S))
    T='%s%s' % (fill_blank(int(j['field1'])),chr(0))  
    H='%sH' % (fill_blank(int(j['field3'])))
    L='%s%%' % (fill_blank(int(j['field4'])))
    Trigger=j['field5']

    lcd.move_to(0, 0)
    lcd.putstr(YMD)
    lcd.move_to(11, 0)    
    lcd.putstr(L)    
    lcd.move_to(15, 0)  
    lcd.putstr(Trigger)  
    lcd.move_to(0, 1)    
    lcd.putstr(HmS)
    lcd.move_to(9, 1)  
    lcd.putstr(T)
    lcd.move_to(13, 1)
    lcd.putstr(H)
    time.sleep(10)




注意, 由於 1602 僅能顯示 16*2=32 個字元, 所以我將顯示資訊的位置重新做了規劃, 完整的時間 HmS 改在第二列開頭顯示, 溫度放在其後; 亮度改至日期後面, 而觸發次數放在第一列的最後一個字元.

沒有留言 :