树莓皮GPIO到Xively python的例子?

2024-06-16 13:47:34 发布

您现在位置:Python中文网/ 问答频道 /正文

我是一个非程序员,通过运行python的Raspberry Pi来体验Xively。我已经成功地完成了在xively网站上给出的教程:https://xively.com/dev/tutorials/pi/

我项目的下一步是不断地从一个GPIO管脚上获取读数(不管它是高还是低),做一些计算,每分钟提供一个数字给Xively提要。在

我有一个单独的程序运行的GPIO部分。但是,当我试图将它整合到我运行的Xively教程示例程序中时,我遇到了一些问题。在

我现在在想,我应该让GPIO程序和Xively程序同时运行,GPIO程序将数据写入文件,Xively程序从该文件读取并将数据上载到提要。所以我的问题是:这是可能的并且相当容易执行吗?在

或者,如果有人可以给我指出一个例子,其中Xively教程示例被修改为接受GPIO输入,那也会有帮助。在

如果有更好/更简单的方法来完成我的目标,我愿意接受建议。。。在


Tags: 文件数据https程序com示例gpio网站
1条回答
网友
1楼 · 发布于 2024-06-16 13:47:34

您需要:

先下载并安装Xively Python库。 下载并安装httplib2模块。在

您可以将“GPIO Sensor”脚本和“Send Data to Xively”脚本结合起来。在

下面是一个示例代码(适用于Raspberry Pi和DHT 11):

#!/usr/bin/python

# Import required Python libraries
import RPi.GPIO as GPIO
import time
import random
import datetime
import json
import sys
import Adafruit_DHT
# Custom HTTP Library
import httplib2

# Use BCM GPIO references
# instead of physical pin numbers
#GPIO.setmode(GPIO.BCM)

# Define GPIO to use on Pi

# Set pin as input
#GPIO.setup(GPIO_PIR,GPIO.IN)

# Define sensor state (1 by default)
SENSOR_STATE = 1

DHT_TYPE = Adafruit_DHT.DHT11
DHT_PIN  = 22
# Define URLs and API key for sending and receiving requests
sensorURL = ""
temperatureURL = ""
humidityURL = ""
API_KEY = ""

# Retrieve latest sensor state from Xively via GET request
def retrieve_sensor_state():
  try:

    global SENSOR_STATE  
    print "\nRetrieving from Xively"

    h = httplib2.Http(disable_ssl_certificate_validation=True)
    (resp_headers, content) = h.request(sensorURL, "GET", headers={'X-ApiKey':API_KEY})
    jsonData = json.loads(content)
    SENSOR_STATE = jsonData["current_value"]
    print "Sensor state: " + SENSOR_STATE
    if (SENSOR_STATE == str(1)):
       upload_to_xively()
       print "continuing!"
    else:
       print "sensor is disabled!"  

  except:
    print "Error occurred!"

# Upload sensor data to Xively via PUT request
def upload_to_xively():
  try:

    print "Uploading to Xively"

    now = str(datetime.datetime.now())
    humidity, value = Adafruit_DHT.read(DHT_TYPE, DHT_PIN)
    if humidity is not None and value is not None:
        print "temperature value :" + str(value) + "    |    " + str(now)
        print "humidity value :" + str(humidity) + "    |    " + str(now)

        jsonString = {"id":"temperature","tags":[],"unit":{},"current_value":value}
        publishString = json.dumps(jsonString)

        h = httplib2.Http(disable_ssl_certificate_validation=True)
        (resp, content) = h.request(temperatureURL, "PUT", body=publishString, headers={'X-ApiKey':API_KEY})

        jsonString1 = {"id":"humidity","tags":[],"unit":{},"current_value":humidity}
        publishString1 = json.dumps(jsonString1)
        g = httplib2.Http(disable_ssl_certificate_validation=True)
        (resp, content) = g.request(humidityURL, "PUT", body=publishString1, headers={'X-    ApiKey':API_KEY})
    else:
    print "failed to read data! Trying again"

  except:
    print "Error occurred!"

# Callback function from event handler when motion is detected
#def motion_callback(GPIO_PIR):

#  global SENSOR_STATE
  # Log the sensor data down only if sensor state = 1 (enabled)
#  if SENSOR_STATE == str(1):
#    if GPIO.input(GPIO_PIR):
#        print "\nMotion Detected! @ " + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#        upload_to_xively()
#        time.sleep(1)
#    else:
#        print "\nBack to Normal"

# Main program
print "DHT Module Test (Ctrl+C to exit)"
time.sleep(2)
print "Ready"

try:
  while True: 
    retrieve_sensor_state()
    time.sleep(10)
    continue

except KeyboardInterrupt:
  print "Quit"
  GPIO.cleanup()

相关问题 更多 >