如何使用python检测firebase子级中的更改?

2024-09-27 09:33:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我对这个应用程序有一些问题。我需要的是,如果我检测到数据库(FIREBASE)的变化,特别是在'sala'和'ventilacion'节点中,函数会做它必须做的。如果数据库没有任何变化,它也不会做任何事情。我正在使用python和pyresebase库。这是密码。非常感谢你的帮助。在

            import pyrebase
            import serial
            import time
            config = {
                        #firebase configurations
                 }


            firebase = pyrebase.initialize_app(config)


            db = firebase.database()
            def ReconfiguracionFabrica():
                    ser.write('AT')
                    time.sleep(0.2)
                    ser.write('AT+RENEW')
                    time.sleep(0.3)

            def ConfiguracionMaster():
                    time.sleep(0.5)
                    ser.write('AT+IMME1')
                    time.sleep(0.350)
                    ser.write('AT+ROLE1')
                    time.sleep(0.2)     

            ser=serial.Serial(port="/dev/ttyAMA0", baudrate=9600, timeout=1)
            ReconfiguracionFabrica()
            time.sleep(0.1)
            ConfiguracionMaster()
            time.sleep(0.1)

            print "**********   INICIO  *************"

            ser.flushInput()
            contador=0
            prender= ''
            ventilacion1= ''
            checkeo= ''

            while True:
                #if db.child("sala").: # It is the line where would be the conditional that allows me to detect any change only in the sala's node.
                            salidaLed1 = db.child("sala").get()
                            ser.write('AT')
                            time.sleep(0.1)
                            ser.write('AT+CON508CB16A7014')
                            time.sleep(0.1)
                            if salidaLed1.val()== True:
                                    prender=";"
                            if salidaLed1.val()== False:
                                    prender=","

                            ser.write('luz: %s \n' %(prender))
                            print ('luz: %s \n' %(prender))
                            time.sleep(1)
                            ser.read(checkeo)
                            if checkeo== 'j':
                                    ReconfiguracionFabrica()
                                    time.sleep(0.1)
                                    ConfiguracionMaster()

Tags: theimportdbiftimesleepatser
2条回答

我知道这篇文章已经有2年了,但希望这篇文章能有所帮助。尝试使用firebase_管理模块。在

使用以下命令-pip install firebase admin

我也有一个要求,我需要检查对Firebase数据库所做的更改。我提到了here

下面是一个基于您的问题的示例代码,您可以参考并尝试它。在

import firebase_admin
from firebase_admin import credentials
from firebase_admin import db


cred = credentials.Certificate("path/to/serviceAccountKey.json")
firebase_admin.initialize_app(cred, {
    'databaseURL': 'https://example.firebaseio.com',
    'databaseAuthVariableOverride': None
})


def ignore_first_call(fn):
    called = False

    def wrapper(*args, **kwargs):
        nonlocal called
        if called:
            return fn(*args, **kwargs)
        else:
            called = True
            return None

    return wrapper


@ignore_first_call
def listener(event):
    print(event.event_type)  # can be 'put' or 'patch'
    print(event.path)  # relative to the reference, it seems
    print(event.data)  # new data at /reference/event.path. None if deleted

    node = str(event.path).split('/')[-2] #you can slice the path according to your requirement
    property = str(event.path).split('/')[-1] 
    value = event.data
    if (node=='sala'):
        #do something
    elif (node=='ventilacion'):
        #do something
    else:
        #do something else


db.reference('/').listen(listener)

Question: How to detect changes in firebase child


Note: All Examples use Public Access

  1. 设置示例数据并验证其可读性。
    这个阶段要完成一次!在

    enter image description here

    temperature_c = 30
    data = {'date':time.strftime('%Y-%m-%d'), 
            'time':time.strftime('%H:%M:%S'), 
            'temperature':temperature_c}
    db.child('public').child('Device_1').set(data)
    
    response = db.child('public').child('Device_1').get()
    print(response.val())
    
  2. 创建第一个执行更新的脚本:

    for t in [25, 26, 27, 28, 29, 30, 31, 32, 33, 35]:
        temperature_c = t
        data = {'date':time.strftime('%Y-%m-%d'), 
                'time':time.strftime('%H:%M:%S'), 
                'temperature':temperature_c}
        db.child('public').child('Device_1').update(data)
        time.sleep(60)
    
  3. 使用流处理程序创建第二个脚本

    ^{3美元
  4. 运行流处理程序脚本:

    启动后def stream_handler的响应输出(初始数据):

    event="put"; path=/;  data={'Device_1': {'temperature': 30, 'time': '13:34:24', 'date': '2017-07-20'}}
    
  5. 运行更新程序脚本:

  6. 监视流处理程序脚本的输出

    在第一次更新数据之后,def stream_handler的响应输出:

    event=patch; path=/Device_1;  data={'temperature': 25, 'time': '13:49:12'}
    

用Python:3.4.2测试


Pyrebase
streaming

You can listen to live changes to your data with the stream() method.

def stream_handler(message):
    print(message["event"]) # put
    print(message["path"]) # /-K7yGTTEp7O549EzTYtI
    print(message["data"]) # {'title': 'Pyrebase', "body": "etc..."}

my_stream = db.child("posts").stream(stream_handler)

You should at least handle put and patch events. Refer to "Streaming from the REST API" for details.

相关问题 更多 >

    热门问题