Arduino Raspberry Pi,使用DBUS API的蓝牙连接

2024-06-01 06:44:18 发布

您现在位置:Python中文网/ 问答频道 /正文

任务是使用基于python脚本的D-BusAPI,通过蓝牙自动化Arduino和Raspberry Pi之间的配对和连接过程

连接到Arduino的蓝牙模块为:Grove - Serial Bluetooth v3.0.

我能够自动化配对过程。配对脚本按顺序执行以下操作:

  1. 它通过创建适配器对象并使用StartDiscovery方法来查找名为Slave的蓝牙模块(命名在Arduino中完成)
  2. 注册蓝牙代理。
  3. 如果设备尚未配对,则通过配对方法创建设备对象和配对。

执行上述步骤的代码部分如下所示:

register_agent()
start_discovery() 
time.sleep(10) 
for i in range(len(address_list)):
    new_dbus_device = get_device(address_list[i])
    dev_path = new_dbus_device.object_path
    device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
    pair_status = device_properties.Get("org.bluez.Device1", "Paired")
    if not pair_status:
        new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)

以下是注册代理()根据请求所做的操作:

def register_agent():    
    agent = Agent(bus, path)
    capability = "NoInputNoOutput"
    obj = bus.get_object(BUS_NAME, "/org/bluez");
    manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    manager.RegisterAgent(path, capability)

然而,当我尝试调用Bluez的device-api中描述的Connect方法时,它总是失败。我创建了一个自定义的串行端口配置文件,并尝试了连接配置文件方法,但再次失败

如果我使用不推荐使用的rfcomm工具,蓝牙通信可以正常工作,或者如果我使用python套接字模块,蓝牙通信也可以正常工作。但是,由于不推荐使用,我希望避免使用rfcomm。关于使用python套接字库,连接仅在rfcomm通道1中工作,其他通道产生连接拒绝错误

添加MRE,以便更好地澄清问题:

import dbus
import dbus.service
import dbus.mainloop.glib
import sys
import time
import subprocess

from bluezutils import *
from bluetooth import *
from gi.repository import GObject, GLib
from dbus.mainloop.glib import DBusGMainLoop

DBusGMainLoop(set_as_default=True) 

path = "/test/agent"
AGENT_INTERFACE = 'org.bluez.Agent1'
BUS_NAME = 'org.bluez'
bus = dbus.SystemBus() 

device_obj = None
dev_path = None

def set_trusted(path2):
    props = dbus.Interface(bus.get_object("org.bluez", path2),
                    "org.freedesktop.DBus.Properties")
    props.Set("org.bluez.Device1", "Trusted", True)

class Rejected(dbus.DBusException):
    _dbus_error_name = "org.bluez.Error.Rejected"
    
class Agent(dbus.service.Object):
    exit_on_release = True

    def set_exit_on_release(self, exit_on_release):
        self.exit_on_release = exit_on_release

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="", out_signature="")
    def Release(self):
        print("Release")
        if self.exit_on_release:
            mainloop.quit()

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="os", out_signature="")
    def AuthorizeService(self, device, uuid):
        print("AuthorizeService (%s, %s)" % (device, uuid))
        return 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="s")
    def RequestPinCode(self, device):
        set_trusted(device)
        return "0000" 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="u")
    def RequestPasskey(self, device): 
        set_trusted(device)
        return dbus.UInt32("0000") 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="ou", out_signature="")
    def RequestConfirmation(self, device, passkey):
        set_trusted(device)
        return 
        
    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="")
    def RequestAuthorization(self, device):
        return 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="", out_signature="")
    def Cancel(self):
        print("Cancel")

def pair_reply():
    print("Device paired and trusted")
    set_trusted(dev_path) 
    
def pair_error(error):
    err_name = error.get_dbus_name()
    if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj:
        print("Timed out. Cancelling pairing")
        device_obj.CancelPairing()
    else:
        print("Creating device failed: %s" % (error))
    mainloop.quit() 
    
def register_agent():    
    agent = Agent(bus, path)
    capability = "NoInputNoOutput"
    obj = bus.get_object(BUS_NAME, "/org/bluez");
    manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    manager.RegisterAgent(path, capability)
    
def start_discovery():
    global pi_adapter
    pi_adapter = find_adapter() 
    scan_filter = dict({"DuplicateData": False}) 
    pi_adapter.SetDiscoveryFilter(scan_filter)
    pi_adapter.StartDiscovery()
    
def stop_discovery():
    pi_adapter.StopDiscovery()
    
def get_device(dev_str):
    # use [Service] and [Object path]:
    device_proxy_object = bus.get_object("org.bluez","/org/bluez/hci0/dev_"+dev_str)
    # use [Interface]:
    device1 = dbus.Interface(device_proxy_object,"org.bluez.Device1")
    return device1

def char_changer(text):
    text = text.replace(':', r'_')
    return text

def slave_finder(device_name):
    
    global sublist_normal
    sublist_normal = []
    sublist= []
    address = []
    edited_address = None
    sub = subprocess.run(["hcitool scan"], text = True, shell = True, capture_output=True)
    print(sub.stdout) #string type
    sublist = sub.stdout.split()
    for i in range(len(sublist)):
        if sublist[i] == device_name:
            print(sublist[i-1])
            sublist_normal.append(sublist[i-1])
            edited_address = char_changer(sublist[i-1])
            address.append(edited_address)
    return address
    
def remove_all_paired():
    for i in range(len(sublist_normal)):
        sub = subprocess.run(["bluetoothctl remove " + sublist_normal[i]], text = True, shell = True, capture_output=True)
        time.sleep(1)
    
    
if __name__ == '__main__':
    
    
    pair_status = None
    address_list = slave_finder('Slave') #Arduino bluetooth module named as "Slave", here we are finding it.
    time.sleep(2)
    remove_all_paired() #rfcomm requires repairing after release
    print(sublist_normal)
    if address_list: 
        register_agent()
        start_discovery() 
        time.sleep(10) 
        for i in range(len(address_list)):
            new_dbus_device = get_device(address_list[i])
            dev_path = new_dbus_device.object_path
            device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
            pair_status = device_properties.Get("org.bluez.Device1", "Paired")
            if not pair_status:
                new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)
    
    
    mainloop = GLib.MainLoop()
    mainloop.run()

sudo btmon输出:

Bluetooth monitor ver 5.50
= Note: Linux version 5.4.83-v7l+ (armv7l)                             0.832473
= Note: Bluetooth subsystem version 2.22                               0.832478
= New Index: DC:A6:32:58:FE:13 (Primary,UART,hci0)              [hci0] 0.832481
= Open Index: DC:A6:32:58:FE:13                                 [hci0] 0.832484
= Index Info: DC:A6:32:5.. (Cypress Semiconductor Corporation)  [hci0] 0.832487
@ MGMT Open: bluetoothd (privileged) version 1.14             {0x0001} 0.832490
@ MGMT Open: btmon (privileged) version 1.14                  {0x0002} 0.832540

所以问题是为什么ConnectConnectProfile方法总是失败,我需要做什么来建立Arduino和Raspberry Pi之间基于D-BUS API的蓝牙通信


Tags: pathinorgimportselfaddressdevicedef
1条回答
网友
1楼 · 发布于 2024-06-01 06:44:18

我想你已经回答了你自己的问题。一个Connect不起作用的原因是您没有在Raspberry Pi上运行串行端口配置文件(SPP)。如果运行btmon,您将看到客户端断开连接,因为没有与Arduino提供的配置文件相匹配的配置文件

Python套接字连接中使用的端口号需要与Arduino蓝牙模块上的端口号匹配。这可能就是为什么只有1在工作

作为参考,我测试了一个树莓皮和HC-06我躺在附近。我删除了所有的扫描代码,试图得到最小的可运行代码。以下是我使用的代码:

import socket
from time import sleep
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib

BUS_NAME = 'org.bluez'
AGENT_IFACE = 'org.bluez.Agent1'
AGNT_MNGR_IFACE = 'org.bluez.AgentManager1'
ADAPTER_IFACE = 'org.bluez.Adapter1'
AGENT_PATH = '/ukBaz/bluezero/agent'
AGNT_MNGR_PATH = '/org/bluez'
DEVICE_IFACE = 'org.bluez.Device1'
CAPABILITY = 'KeyboardDisplay'
my_adapter_address = '11:22:33:44:55:66'
my_device_path = '/org/bluez/hci0/dev_00_00_12_34_56_78'
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()


class Agent(dbus.service.Object):

    @dbus.service.method(AGENT_IFACE,
                         in_signature='o', out_signature='s')
    def RequestPinCode(self, device):
        print(f'RequestPinCode {device}')
        return '0000'


class Device:
    def __init__(self, device_path):
        dev_obj = bus.get_object(BUS_NAME, device_path)
        self.methods = dbus.Interface(dev_obj, DEVICE_IFACE)
        self.props = dbus.Interface(dev_obj, dbus.PROPERTIES_IFACE)
        self._port = 1
        self._client_sock = socket.socket(socket.AF_BLUETOOTH,
                                          socket.SOCK_STREAM,
                                          socket.BTPROTO_RFCOMM)

    def connect(self):
        # self.methods.Connect()
        self._client_sock.bind((my_adapter_address, self._port))
        self._client_sock.connect((self.address, self._port))

    def disconnect(self):
        self.methods.Disconnect()

    def pair(self):
        self.methods.Pair(reply_handler=self._pair_reply,
                          error_handler=self._pair_error)

    def _pair_reply(self):
        print(f'Device trusted={self.trusted}, connected={self.connected}')
        self.trusted = True
        print(f'Device trusted={self.trusted}, connected={self.connected}')
        while self.connected:
            sleep(0.5)
        self.connect()
        print('Successfully paired and connected')

    def _pair_error(self, error):
        err_name = error.get_dbus_name()
        print(f'Creating device failed: {err_name}')

    @property
    def trusted(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Trusted'))

    @trusted.setter
    def trusted(self, value):
        self.props.Set(DEVICE_IFACE, 'Trusted', bool(value))

    @property
    def paired(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Paired'))

    @property
    def connected(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Connected'))

    @property
    def address(self):
        return str(self.props.Get(DEVICE_IFACE, 'Address'))


if __name__ == '__main__':
    agent = Agent(bus, AGENT_PATH)

    agnt_mngr = dbus.Interface(bus.get_object(BUS_NAME, AGNT_MNGR_PATH),
                               AGNT_MNGR_IFACE)
    agnt_mngr.RegisterAgent(AGENT_PATH, CAPABILITY)

    device = Device(my_device_path)
    if device.paired:
        device.connect()
    else:
        device.pair()


    mainloop = GLib.MainLoop()
    try:
        mainloop.run()
    except KeyboardInterrupt:
        agnt_mngr.UnregisterAgent(AGENT_PATH)
        mainloop.quit()

相关问题 更多 >