Micropython学习交流群 学习QQ群:786510434 提供多种固件下载和学习交流。

Micropython-扇贝物联 QQ群:31324057 扇贝物联是一个让你与智能设备沟通更方便的物联网云平台

Micropython学习交流群 学习QQ群:468985481 学习交流ESP8266、ESP32、ESP8285、wifi模块开发交流、物联网。

Micropython老哥俩的IT农场分享QQ群:929132891 为喜欢科创制作的小白们分享一些自制的计算机软硬件免费公益课程,由两位多年从事IT研发的中年大叔发起。

Micropython ESP频道

esp32 micropython的ble终于更新配对功能啦!


esp32 micropython的ble终于更新配对功能啦!

其实这个ISSUE在mpy库被提了好久了,不知道为啥,大约五天前突然被解决,也就是说,最新的主分支已经可以用gap_pair功能了,貌似还有个gap_passkey,现在mpy的ble功能终于补上最后一块了,遗憾的是目前还不能进行功率的控制。
github上有个mpy ble hid设备的库,也挺好的,用01studio的手柄我改了程序,能用,但是不能二次连接重连,自己想了半天也没明白,最后鼓起勇气提了ISSUE,平生第一次提ISSUE,没想到过了一天被答复了,还让我有空测试下功能,感谢伟大的开源精神!
说到开源,其实在国内前途很不好,有文化的原因,也有现实制度原因,国内的玩家更喜欢白嫖,不是批判谁也不是装清高,我是觉得大家是该有点贡献精神的,我虽然大多拿钱办事,但是也不忘贡献点力量,写写博客,偶尔提交点pr,国内主要是生存压力比较大,没有太多自己的时间玩爱好,都喜欢要现成的。我个人觉得01studio真的不如把库放在GITEE,说是开源项目,实际上也就是微信群聊,都搞了4个开源项目了,贡献者好像还是只有我一个,当然主要是老板送我板子我不好意思不贡献哈哈,能在群里唠唠项目提提意见就不错了,能会git操作的、有足够水平的、愿意贡献代码的,层层筛选下来太少了,当然还是有很多活跃用户在群里贡献了代码,只是不习惯GIT操作而已。
当ISSUE被解决时竟然有一丝丝感动,地球的另外一个角落的某个嵌入式工程师不嫌弃我的蹩脚塑料英语,不限国家、肤色、生活习性方式,只为了解决同一个问题展开讨论,感觉真好,该死的是那些总夹带私货别有用心的人,污染了环境,想起列侬的imagine了,理想全世界的人无隔阂在一起,太过于美好,虽然知道不可能实现,也忍不住去想想那样一幅画面。
拉回正题,搬运了一个ble hid的库,很有用:

# MicroPython Human Interface Device library
# Copyright (C) 2021 H. Groefsema
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


from micropython import const
import struct
import bluetooth
import json
import binascii
from bluetooth import UUID

# Shorthands for the characteristic flags used in the service descriptions
# (DIS, BAS and HIDS) further down in this file
F_READ = bluetooth.FLAG_READ
F_WRITE = bluetooth.FLAG_WRITE
F_READ_WRITE = bluetooth.FLAG_READ | bluetooth.FLAG_WRITE
F_READ_NOTIFY = bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY

# Flags used for descriptor entries in the service descriptions; ATT_F_READ is
# applied to the 0x2908 descriptor of the HID report characteristic
ATT_F_READ = 0x01
ATT_F_WRITE = 0x02

# Advertising payloads are repeated packets of the following form:
#   1 byte data length (N + 1)
#   1 byte type (see constants below)
#   N bytes type-specific data
# The constants below are the type bytes used by the Advertiser class when it
# builds and decodes payloads. The *_MORE variants are defined for completeness
# and are not referenced in the visible part of this file.
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)

# IRQ event codes, compared against the `event` argument of
# HumanInterfaceDevice.ble_irq. Only a subset is handled there; any other
# code ends up in its "Unhandled IRQ event" branch.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
_IRQ_GATTS_INDICATE_DONE = const(20)
_IRQ_MTU_EXCHANGED = const(21)
_IRQ_L2CAP_ACCEPT = const(22)
_IRQ_L2CAP_CONNECT = const(23)
_IRQ_L2CAP_DISCONNECT = const(24)
_IRQ_L2CAP_RECV = const(25)
_IRQ_L2CAP_SEND_READY = const(26)
_IRQ_CONNECTION_UPDATE = const(27)
_IRQ_ENCRYPTION_UPDATE = const(28)
_IRQ_GET_SECRET = const(29)
_IRQ_SET_SECRET = const(30)
_IRQ_PASSKEY_ACTION = const(31)

# Input/output capability values; the selected one is handed to
# BLE.config(io=...) in HumanInterfaceDevice.start() when LE secure pairing
# is enabled. The default chosen in HumanInterfaceDevice.__init__ is
# _IO_CAPABILITY_NO_INPUT_OUTPUT.
_IO_CAPABILITY_DISPLAY_ONLY = const(0)
_IO_CAPABILITY_DISPLAY_YESNO = const(1)
_IO_CAPABILITY_KEYBOARD_ONLY = const(2)
_IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
_IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)

# Action codes delivered with _IRQ_PASSKEY_ACTION; ble_irq answers each of
# them with a gap_passkey() call
_PASSKEY_ACTION_INPUT = const(2)
_PASSKEY_ACTION_DISP = const(3)
_PASSKEY_ACTION_NUMCMP = const(4)

# Builds the advertising payload for a device and starts/stops advertising
# it through the BLE interface object handed to the constructor.
class Advertiser:

    # Generate a payload to be passed to gap_advertise(adv_data=...).
    #   limited_disc, br_edr: select the bits of the flags field
    #   name: device name, str or bytes (optional)
    #   services: iterable of UUID objects to list (optional)
    #   appearance: 16-bit appearance value, 0 to leave the field out
    # Returns the payload as a bytearray.
    def advertising_payload(self, limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
        payload = bytearray()

        def _append(adv_type, value):
            nonlocal payload
            payload += struct.pack("BB", len(value) + 1, adv_type) + value

        _append(
            _ADV_TYPE_FLAGS,
            struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)),
        )

        if name:
            # Encode first so that the length byte counts bytes rather than
            # characters (they differ for non-ASCII names)
            if isinstance(name, str):
                name = name.encode("utf-8")
            _append(_ADV_TYPE_NAME, name)

        if services:
            for uuid in services:
                b = bytes(uuid)
                if len(b) == 2:
                    _append(_ADV_TYPE_UUID16_COMPLETE, b)
                elif len(b) == 4:
                    _append(_ADV_TYPE_UUID32_COMPLETE, b)
                elif len(b) == 16:
                    _append(_ADV_TYPE_UUID128_COMPLETE, b)

        # See org.bluetooth.characteristic.gap.appearance.xml
        if appearance:
            # Unsigned: a signed format would reject values above 0x7FFF
            _append(_ADV_TYPE_APPEARANCE, struct.pack("<H", appearance))

        return payload

    # Return the data part of every field in payload whose type byte equals
    # adv_type, in payload order (empty list when there is none).
    def decode_field(self, payload, adv_type):
        i = 0
        result = []
        while i + 1 < len(payload):
            if payload[i + 1] == adv_type:
                result.append(payload[i + 2 : i + payload[i] + 1])
            i += 1 + payload[i]
        return result

    # Return the first name field of payload as a str, or "" if absent.
    def decode_name(self, payload):
        n = self.decode_field(payload, _ADV_TYPE_NAME)
        return str(n[0], "utf-8") if n else ""

    # Return the service UUIDs listed in payload as bluetooth.UUID objects.
    def decode_services(self, payload):
        services = []
        # 16 and 32 bit UUIDs are unsigned little-endian integers; the signed
        # "<h" and the 8-byte float "<d" formats used before could not decode
        # UUIDs >= 0x8000 and 4-byte fields respectively
        for u in self.decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
            services.append(bluetooth.UUID(struct.unpack("<H", u)[0]))
        for u in self.decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
            services.append(bluetooth.UUID(struct.unpack("<I", u)[0]))
        for u in self.decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
            services.append(bluetooth.UUID(u))
        return services

    # Init as generic HID device (960 = generic HID appearance value)
    #   ble: BLE interface object used for gap_advertise()
    #   services: UUIDs to advertise; None selects the HID service (0x1812),
    #             pass an empty list to advertise no service
    #   appearance, name: forwarded to advertising_payload()
    def __init__(self, ble, services=None, appearance=960, name="Generic HID Device"):
        if services is None:
            # Built per call instead of sharing one mutable default list
            services = [UUID(0x1812)]

        self._ble = ble
        self._payload = self.advertising_payload(name=name, services=services, appearance=appearance)

        # True between a start_advertising() and the next stop_advertising()
        self.advertising = False
        print("Advertiser created: ", self.decode_name(self._payload), " with services: ", self.decode_services(self._payload))

    # Start advertising at 100000 interval
    def start_advertising(self):
        # The request is always (re)issued, exactly as before this flag was
        # maintained: the flag is not reset when a central connects, so it
        # must not be used to suppress a later restart
        # NOTE(review): assumes the stack stops advertising on connect - confirm
        self._ble.gap_advertise(100000, adv_data=self._payload)
        self.advertising = True
        print("Started advertising")

    # Stop advertising (an interval of None is the documented way to stop)
    def stop_advertising(self):
        if self.advertising:
            self._ble.gap_advertise(None)
            self.advertising = False
            print("Stopped advertising")


# Class that represents a general HID device services
class HumanInterfaceDevice(object):
    DEVICE_STOPPED = const(0)
    DEVICE_IDLE = const(1)
    DEVICE_ADVERTISING = const(2)
    DEVICE_CONNECTED = const(3)

    def __init__(self, device_name="Generic HID Device"):
        self._ble = bluetooth.BLE()
        self.adv = None
        self.device_state = HumanInterfaceDevice.DEVICE_STOPPED
        self.conn_handle = None
        self.state_change_callback = None
        self.io_capability = _IO_CAPABILITY_NO_INPUT_OUTPUT
        self.bond = False
        self.le_secure = False

        print("Server created")

        self.device_name = device_name
        self.service_uuids = [UUID(0x180A), UUID(0x180F), UUID(0x1812)]  # Service UUIDs: DIS, BAS, HIDS
        self.device_appearance = 960                                     # Generic HID Appearance
        self.battery_level = 100

        self.model_number = "1"
        self.serial_number = "1"
        self.firmware_revision = "1"
        self.hardware_revision = "1"
        self.software_revision = "1"
        self.manufacture_name = "Homebrew"
        self.pnp_manufacturer_source = 0x01     # Bluetooth uuid list
        self.pnp_manufacturer_uuid = 0xFE61     # 0xFEB2 for Microsoft, 0xFE61 for Logitech, 0xFD65 for Razer
        self.pnp_product_id = 0x01              # ID 1
        self.pnp_product_version = 0x0123       # Version 1.2.3

        self.DIS = (                            # Device Information Service description
            UUID(0x180A),                       # Device Information
            (
                (UUID(0x2A24), F_READ),         # Model number string
                (UUID(0x2A25), F_READ),         # Serial number string
                (UUID(0x2A26), F_READ),         # Firmware revision string
                (UUID(0x2A27), F_READ),         # Hardware revision string
                (UUID(0x2A28), F_READ),         # Software revision string
                (UUID(0x2A29), F_READ),         # Manufacturer name string
                (UUID(0x2A50), F_READ),         # PnP ID
            ),
        )
        self.BAS = (                            # Battery Service description
            UUID(0x180F),                       # Device Information
            (
                (UUID(0x2A19), F_READ_NOTIFY),  # Battery level
            ),
        )

        self.services = [self.DIS, self.BAS]    # List of service descriptions, append HIDS

        self.HID_INPUT_REPORT = None

        # Passkey for pairing
        # Only used when io capability allows so
        self.passkey = 1234

        # Key store for bonding
        self.keys = {}

        # Load known keys
        self.load_secrets()

    # Interrupt request callback function
    def ble_irq(self, event, data):
        if event == _IRQ_CENTRAL_CONNECT:              # Central connected
            self.conn_handle, _, _ = data              # Save the handle
            print("Central connected: ", self.conn_handle)
            self.set_state(HumanInterfaceDevice.DEVICE_CONNECTED)     # (HIDS specification only allow one central to be connected)
        elif event == _IRQ_CENTRAL_DISCONNECT:         # Central disconnected
            self.conn_handle = None                    # Discard old handle
            conn_handle, addr_type, addr = data
            print("Central disconnected: ", conn_handle)
            self.set_state(HumanInterfaceDevice.DEVICE_IDLE)
        elif event == _IRQ_MTU_EXCHANGED:              # MTU was set
            conn_handle, mtu = data
            print("MTU exchanged: ", mtu)
        elif event == _IRQ_CONNECTION_UPDATE:          # Connection parameters were updated
            self.conn_handle, _, _, _, _ = data        # The new parameters
            print("Connection update")
        elif event == _IRQ_ENCRYPTION_UPDATE:          # Encryption updated
            conn_handle, encrypted, authenticated, bonded, key_size = data
            print("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)
        elif event == _IRQ_PASSKEY_ACTION:             # Passkey actions: accept connection or show/enter passkey
            conn_handle, action, passkey = data
            print("passkey action", conn_handle, action, passkey)
            if action == _PASSKEY_ACTION_NUMCMP:       # Do we accept this connection?
                accept = False
                if self.passkey_callback is not None:  # Is callback function set?
                    accept = self.passkey_callback()   # Call callback for input
                self._ble.gap_passkey(conn_handle, action, accept)
            elif action == _PASSKEY_ACTION_DISP:       # Show our passkey
                print("displaying passkey")
                self._ble.gap_passkey(conn_handle, action, self.passkey)
            elif action == _PASSKEY_ACTION_INPUT:      # Enter passkey
                print("prompting for passkey")
                pk = None
                if self.passkey_callback is not None:  # Is callback function set?
                    pk = self.passkey_callback()       # Call callback for input
                self._ble.gap_passkey(conn_handle, action, pk)
            else:
                print("unknown action")
        elif event == _IRQ_GATTS_INDICATE_DONE:
            conn_handle, value_handle, status = data
            print("gatts done: ", conn_handle)
        elif event == _IRQ_SET_SECRET:                 # Set secret for bonding
            sec_type, key, value = data
            key = sec_type, bytes(key)
            value = bytes(value) if value else None
            print("set secret: ", key, value)
            if value is None:                          # If value is empty, and
                if key in self.keys:                   # If key is known then
                    del self.keys[key]                 # Forget key
                    self.save_secrets()                # Save bonding information
                    return True
                else:
                    return False
            else:
                self.keys[key] = value                 # Remember key/value
                self.save_secrets()                    # Save bonding information
            return True
        elif event == _IRQ_GET_SECRET:                 # Get secret for bonding
            sec_type, index, key = data
            print("get secret: ", sec_type, index, bytes(key) if key else None)
            if key is None:
                i = 0
                for (t, _key), value in self.keys.items():
                    if t == sec_type:
                        if i == index:
                            return value
                        i += 1
                return None
            else:
                key = sec_type, bytes(key)
                return self.keys.get(key, None)
        else:
            print("Unhandled IRQ event: ", event)

    # Start the service
    # Must be overwritten by subclass, and called in
    # the overwritten function by using super(Subclass, self).start()
    # io_capability determines whether and how passkeys are used
    def start(self):
        if self.device_state is HumanInterfaceDevice.DEVICE_STOPPED:
            # Set interrupt request callback function
            self._ble.irq(self.ble_irq)

            # Turn on BLE radio
            self._ble.active(1)

            # Configure BLE interface
            # Set GAP device name
            self._ble.config(gap_name=self.device_name)

            # Configure MTU
            self._ble.config(mtu=23)

            # Allow bonding
            if self.bond:  # calling this on ESP32 is unsupported
                self._ble.config(bond=True)

            if self.le_secure:  # calling these on ESP32 is unsupported
                # Require secure pairing
                self._ble.config(le_secure=True)
                # Require man in the middle protection
                self._ble.config(mitm=True)
                # Set our input/output capabilities
                self._ble.config(io=self.io_capability)

            self.set_state(HumanInterfaceDevice.DEVICE_IDLE)
            print("BLE on")

    # After registering the DIS and BAS services, write their characteristic values
    # Must be overwritten by subclass, and called in
    # the overwritten function by using
    # super(Subclass, self).write_service_characteristics(handles)
    def write_service_characteristics(self, handles):
        print("Writing service characteristics")

        # Get handles to service characteristics
        # These correspond directly to self.DIS and sel.BAS
        (h_mod, h_ser, h_fwr, h_hwr, h_swr, h_man, h_pnp) = handles[0]
        (self.h_bat,) = handles[1]

        def string_pack(in_str):
            return struct.pack(str(len(in_str))+"s", in_str.encode('UTF-8'))

        # Write service characteristics
        print("Writing device information service characteristics")

        self._ble.gatts_write(h_mod, string_pack(self.model_number))
        self._ble.gatts_write(h_ser, string_pack(self.serial_number))
        self._ble.gatts_write(h_fwr, string_pack(self.firmware_revision))
        self._ble.gatts_write(h_hwr, string_pack(self.hardware_revision))
        self._ble.gatts_write(h_swr, string_pack(self.software_revision))
        self._ble.gatts_write(h_man, string_pack(self.manufacture_name))
        # "<B" is now "<BHHH" basis https://www.bluetooth.com/wp-content/uploads/Sitecore-Media-Library/Gatt/Xml/Characteristics/org.bluetooth.characteristic.pnp_id.xml 
        self._ble.gatts_write(h_pnp, struct.pack("<BHHH", self.pnp_manufacturer_source, self.pnp_manufacturer_uuid, self.pnp_product_id, self.pnp_product_version))

        print("Writing battery service characteristics")
        # Battery level
        self._ble.gatts_write(self.h_bat, struct.pack("<B", self.battery_level))

    # Stop the service
    def stop(self):
        if self.device_state is not HumanInterfaceDevice.DEVICE_STOPPED:
            if self.device_state is HumanInterfaceDevice.DEVICE_ADVERTISING:
                self.adv.stop_advertising()

            if self.conn_handle is not None:
                self._ble.gap_disconnect(self.conn_handle)
                self.conn_handle = None

            self._ble.active(0)

            self.set_state(HumanInterfaceDevice.DEVICE_STOPPED)
            print("Server stopped")

    # Load bonding keys from json file
    def load_secrets(self):
        try:
            with open("keys.json", "r") as file:
                entries = json.load(file)
                for sec_type, key, value in entries:
                    self.keys[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
        except:
            print("no secrets available")

    # Save bonding keys from json file
    def save_secrets(self):
        try:
            with open("keys.json", "w") as file:
                json_secrets = [
                    (sec_type, binascii.b2a_base64(key), binascii.b2a_base64(value))
                    for (sec_type, key), value in self.keys.items()
                ]
                json.dump(json_secrets, file)
        except:
            print("failed to save secrets")

    def is_running(self):
        return self.device_state is not HumanInterfaceDevice.DEVICE_STOPPED

    def is_connected(self):
        return self.device_state is HumanInterfaceDevice.DEVICE_CONNECTED

    def is_advertising(self):
        return self.device_state is HumanInterfaceDevice.DEVICE_ADVERTISING

    # Set a new state and notify the user's callback function
    def set_state(self, state):
        self.device_state = state
        if self.state_change_callback is not None:
            self.state_change_callback()

    def get_state(self):
        return self.device_state

    # Set a callback function to get notifications of state changes, i.e.
    # - Device stopped
    # - Device idle
    # - Device advertising
    # - Device connected
    def set_state_change_callback(self, callback):
        self.state_change_callback = callback

    def start_advertising(self):
        if self.device_state is not HumanInterfaceDevice.DEVICE_STOPPED and self.device_state is not HumanInterfaceDevice.DEVICE_ADVERTISING:
            self.adv.start_advertising()
            self.set_state(HumanInterfaceDevice.DEVICE_ADVERTISING)

    def stop_advertising(self):
        if self.device_state is not HumanInterfaceDevice.DEVICE_STOPPED:
            self.adv.stop_advertising()
            if self.device_state is not HumanInterfaceDevice.DEVICE_CONNECTED:
                self.set_state(HumanInterfaceDevice.DEVICE_IDLE)

    # Return the device name passed to the constructor
    def get_device_name(self):
        return self.device_name

    # Return the list of service UUIDs (DIS, BAS and HIDS as set in __init__)
    def get_services_uuids(self):
        return self.service_uuids

    # Return the appearance value (960 here; the subclasses in this file set their own)
    def get_appearance(self):
        return self.device_appearance

    # Return the stored battery level (initially 100)
    def get_battery_level(self):
        return self.battery_level

    # Sets the value for the battery level
    def set_battery_level(self, level):
        if level > 100:
            self.battery_level = 100
        elif level < 0:
            self.battery_level = 0
        else:
            self.battery_level = level

    # Set device information
    # Must be called before calling Start()
    # Variables must be Strings
    def set_device_information(self, manufacture_name="Homebrew", model_number="1", serial_number="1"):
        self.manufacture_name = manufacture_name
        self.model_number = model_number
        self.serial_number = serial_number

    # Set device revision
    # Must be called before calling Start()
    # Variables must be Strings
    def set_device_revision(self, firmware_revision="1", hardware_revision="1", software_revision="1"):
        self.firmware_revision = firmware_revision
        self.hardware_revision = hardware_revision
        self.software_revision = software_revision

    # Set device pnp information
    # Must be called before calling Start()
    # Must use the following format:
    #   pnp_manufacturer_source: 0x01 for manufacturers uuid from the Bluetooth uuid list OR 0x02 from the USBs id list
    #   pnp_manufacturer_uuid: 0xFEB2 for Microsoft, 0xFE61 for Logitech, 0xFD65 for Razer with source 0x01
    #   pnp_product_id: One byte, user defined
    #   pnp_product_version: Two bytes, user defined, format as 0xJJMN which corresponds to version JJ.M.N
    def set_device_pnp_information(self, pnp_manufacturer_source=0x01, pnp_manufacturer_uuid=0xFE61, pnp_product_id=0x01, pnp_product_version=0x0123):
        self.pnp_manufacturer_source = pnp_manufacturer_source
        self.pnp_manufacturer_uuid = pnp_manufacturer_uuid
        self.pnp_product_id = pnp_product_id
        self.pnp_product_version = pnp_product_version

    # Set whether to use Bluetooth bonding
    # Read by start(), which then calls config(bond=True); set it before starting
    def set_bonding(self, bond):
        self.bond = bond

    # Set whether to use LE secure pairing
    # Read by start(), which then also enables man in the middle protection
    # and applies the io capability; set it before starting
    def set_le_secure(self, le_secure):
        self.le_secure = le_secure

    # Set input/output capability of this device
    # Determines the pairing procedure, e.g., accept connection/passkey entry/just works
    # Must be called before calling Start()
    # Must use the following values:
    #   _IO_CAPABILITY_DISPLAY_ONLY
    #   _IO_CAPABILITY_DISPLAY_YESNO
    #   _IO_CAPABILITY_KEYBOARD_ONLY
    #   _IO_CAPABILITY_NO_INPUT_OUTPUT
    #   _IO_CAPABILITY_KEYBOARD_DISPLAY
    def set_io_capability(self, io_capability):
        self.io_capability = io_capability

    # Set callback function for pairing events
    # The callback is called without arguments from ble_irq
    # Depending on the I/O capability used, the callback function should return either a
    # - boolean to accept or deny a connection, or a
    # - passkey that was displayed by the central
    def set_passkey_callback(self, passkey_callback):
        self.passkey_callback = passkey_callback

    # Set the passkey that ble_irq hands to gap_passkey() when the stack asks
    # this device to display its passkey (to be entered at the central)
    def set_passkey(self, passkey):
        self.passkey = passkey

    # Notifies the central by writing to the battery level handle
    def notify_battery_level(self):
        if self.is_connected():
            print("Notify battery level: ", self.battery_level)
            self._ble.gatts_notify(self.conn_handle, self.h_bat, struct.pack("<B", self.battery_level))

    # Notifies the central of the HID state
    # Must be overwritten by subclass
    def notify_hid_report(self):
        return

# Class that represents the Joystick service
class Joystick(HumanInterfaceDevice):
    # Set up a joystick: the general HID state from the base class, the HID
    # service description, the report map and an all-zero initial state
    # (two axes, eight buttons).
    #   name: device name handed to the base class
    def __init__(self, name="Bluetooth Joystick"):
        super(Joystick, self).__init__(name)  # Set up the general HID services in super
        self.device_appearance = 963          # Device appearance ID, 963 = joystick

        # Five characteristics; the report characteristic carries one extra
        # descriptor (0x2908), so write_service_characteristics() unpacks six
        # handles for this service
        self.HIDS = (                         # Service description: describes the service and how we communicate
            UUID(0x1812),                     # Human Interface Device
            (
                (UUID(0x2A4A), F_READ),       # HID information
                (UUID(0x2A4B), F_READ),       # HID report map
                (UUID(0x2A4C), F_WRITE),      # HID control point
                (UUID(0x2A4D), F_READ_NOTIFY, ((UUID(0x2908), ATT_F_READ),)),  # HID report / reference
                (UUID(0x2A4E), F_READ_WRITE), # HID protocol mode
            ),
        )

        # Written to the report map characteristic (0x2A4B) by
        # write_service_characteristics(). It declares report id 1 with two
        # signed 8-bit axes followed by eight 1-bit buttons, which is the
        # layout produced by the "bbB" struct format used for the state.
        # fmt: off
        self.HID_INPUT_REPORT = bytes([    # Report Description: describes what we communicate
            0x05, 0x01,                    # USAGE_PAGE (Generic Desktop)
            0x09, 0x04,                    # USAGE (Joystick)
            0xa1, 0x01,                    # COLLECTION (Application)
            0x85, 0x01,                    #   REPORT_ID (1)
            0xa1, 0x00,                    #   COLLECTION (Physical)
            0x09, 0x30,                    #     USAGE (X)
            0x09, 0x31,                    #     USAGE (Y)
            0x15, 0x81,                    #     LOGICAL_MINIMUM (-127)
            0x25, 0x7f,                    #     LOGICAL_MAXIMUM (127)
            0x75, 0x08,                    #     REPORT_SIZE (8)
            0x95, 0x02,                    #     REPORT_COUNT (2)
            0x81, 0x02,                    #     INPUT (Data,Var,Abs)
            0x05, 0x09,                    #     USAGE_PAGE (Button)
            0x29, 0x08,                    #     USAGE_MAXIMUM (Button 8)
            0x19, 0x01,                    #     USAGE_MINIMUM (Button 1)
            0x95, 0x08,                    #     REPORT_COUNT (8)
            0x75, 0x01,                    #     REPORT_SIZE (1)
            0x25, 0x01,                    #     LOGICAL_MAXIMUM (1)
            0x15, 0x00,                    #     LOGICAL_MINIMUM (0)
            0x81, 0x02,                    #     Input (Data, Variable, Absolute)
            0xc0,                          #   END_COLLECTION
            0xc0                           # END_COLLECTION
        ])
        # fmt: on

        # Define the initial joystick state
        self.x = 0
        self.y = 0

        # One attribute per button; button N becomes bit N-1 of the report's
        # button byte
        self.button1 = 0
        self.button2 = 0
        self.button3 = 0
        self.button4 = 0
        self.button5 = 0
        self.button6 = 0
        self.button7 = 0
        self.button8 = 0

        self.services = [self.DIS, self.BAS, self.HIDS]  # List of service descriptions

    # Overwrite super to register HID specific service
    # Call super to register DIS and BAS services
    def start(self):
        super(Joystick, self).start()  # Start super

        print("Registering services")
        # Register services and get read/write handles for all services
        handles = self._ble.gatts_register_services(self.services)
        # Write the values for the characteristics
        self.write_service_characteristics(handles)

        # Create an Advertiser
        # Only advertise the top level service, i.e., the HIDS
        self.adv = Advertiser(self._ble, [UUID(0x1812)], self.device_appearance, self.device_name)

        print("Server started")

    # Overwrite super to write HID specific characteristics
    # Call super to write DIS and BAS characteristics
    def write_service_characteristics(self, handles):
        super(Joystick, self).write_service_characteristics(handles)

        # Get the handles from the hids, the third service after DIS and BAS
        # These correspond directly to self.HIDS
        (h_info, h_hid, _, self.h_rep, h_d1, h_proto,) = handles[2]

        # Pack the initial joystick state as described by the input report
        b = self.button1 + self.button2 * 2 + self.button3 * 4 + self.button4 * 8 + self.button5 * 16 + self.button6 * 32 + self.button7 * 64 + self.button8 * 128
        state = struct.pack("bbB", self.x, self.y, b)

        print("Writing hid service characteristics")
        # Write service characteristics
        self._ble.gatts_write(h_info, b"\x01\x01\x00\x02")     # HID info: ver=1.1, country=0, flags=normal
        self._ble.gatts_write(h_hid, self.HID_INPUT_REPORT)    # HID input report map
        self._ble.gatts_write(self.h_rep, state)               # HID report
        self._ble.gatts_write(h_d1, struct.pack("<BB", 1, 1))  # HID reference: id=1, type=input
        self._ble.gatts_write(h_proto, b"\x01")                # HID protocol mode: report

    # Overwrite super to notify central of a hid report
    def notify_hid_report(self):
        if self.is_connected():
            # Pack the joystick state as described by the input report
            b = self.button1 + self.button2 * 2 + self.button3 * 4 + self.button4 * 8 + self.button5 * 16 + self.button6 * 32 + self.button7 * 64 + self.button8 * 128
            state = struct.pack("bbB", self.x, self.y, b)

            print("Notify with report: ", struct.unpack("bbB", state))
            # Notify central by writing to the report handle
            self._ble.gatts_notify(self.conn_handle, self.h_rep, state)

    def set_axes(self, x=0, y=0):
        if x > 127:
            x = 127
        elif x < -127:
            x = -127

        if y > 127:
            y = 127
        elif y < -127:
            y = -127

        self.x = x
        self.y = y

    def set_buttons(self, b1=0, b2=0, b3=0, b4=0, b5=0, b6=0, b7=0, b8=0):
        self.button1 = b1
        self.button2 = b2
        self.button3 = b3
        self.button4 = b4
        self.button5 = b5
        self.button6 = b6
        self.button7 = b7
        self.button8 = b8

# Class that represents the Mouse service
class Mouse(HumanInterfaceDevice):
    # Set up a mouse: the general HID state from the base class, the HID
    # service description, the report map and an all-zero initial state
    # (x, y, wheel and three buttons).
    #   name: device name handed to the base class
    def __init__(self, name="Bluetooth Mouse"):
        super(Mouse, self).__init__(name)     # Set up the general HID services in super
        self.device_appearance = 962          # Device appearance ID, 962 = mouse

        # Five characteristics; the report characteristic carries one extra
        # descriptor (0x2908), so write_service_characteristics() unpacks six
        # handles for this service
        self.HIDS = (                         # Service description: describes the service and how we communicate
            UUID(0x1812),                     # Human Interface Device
            (
                (UUID(0x2A4A), F_READ),       # HID information
                (UUID(0x2A4B), F_READ),       # HID report map
                (UUID(0x2A4C), F_WRITE),      # HID control point
                (UUID(0x2A4D), F_READ_NOTIFY, ((UUID(0x2908), ATT_F_READ),)),  # HID report / reference
                (UUID(0x2A4E), F_READ_WRITE), # HID protocol mode
            ),
        )

        # Declares report id 1 with one byte holding three button bits plus
        # five padding bits, followed by three signed 8-bit relative values
        # (X, Y, wheel); this is the layout produced by the "Bbbb" struct
        # format used when the state is packed.
        # fmt: off
        self.HID_INPUT_REPORT = bytes([    # Report Description: describes what we communicate
            0x05, 0x01,                    # USAGE_PAGE (Generic Desktop)
            0x09, 0x02,                    # USAGE (Mouse)
            0xa1, 0x01,                    # COLLECTION (Application)
            0x85, 0x01,                    #   REPORT_ID (1)
            0x09, 0x01,                    #   USAGE (Pointer)
            0xa1, 0x00,                    #   COLLECTION (Physical)
            0x05, 0x09,                    #         Usage Page (Buttons)
            0x19, 0x01,                    #         Usage Minimum (1)
            0x29, 0x03,                    #         Usage Maximum (3)
            0x15, 0x00,                    #         Logical Minimum (0)
            0x25, 0x01,                    #         Logical Maximum (1)
            0x95, 0x03,                    #         Report Count (3)
            0x75, 0x01,                    #         Report Size (1)
            0x81, 0x02,                    #         Input(Data, Variable, Absolute); 3 button bits
            0x95, 0x01,                    #         Report Count(1)
            0x75, 0x05,                    #         Report Size(5)
            0x81, 0x03,                    #         Input(Constant);                 5 bit padding
            0x05, 0x01,                    #         Usage Page (Generic Desktop)
            0x09, 0x30,                    #         Usage (X)
            0x09, 0x31,                    #         Usage (Y)
            0x09, 0x38,                    #         Usage (Wheel)
            0x15, 0x81,                    #         Logical Minimum (-127)
            0x25, 0x7F,                    #         Logical Maximum (127)
            0x75, 0x08,                    #         Report Size (8)
            0x95, 0x03,                    #         Report Count (3)
            0x81, 0x06,                    #         Input(Data, Variable, Relative); 3 position bytes (X,Y,Wheel)
            0xc0,                          #   END_COLLECTION
            0xc0                           # END_COLLECTION
        ])
        # fmt: on

        # Define the initial mouse state
        self.x = 0
        self.y = 0
        self.w = 0

        # One attribute per button; button N becomes bit N-1 of the report's
        # button byte
        self.button1 = 0
        self.button2 = 0
        self.button3 = 0

        self.services = [self.DIS, self.BAS, self.HIDS]  # List of service descriptions

    # Overwrite super to register HID specific service
    # Call super to register DIS and BAS services
    def start(self):
        super(Mouse, self).start()  # Start super

        print("Registering services")
        # Register services and get read/write handles for all services
        handles = self._ble.gatts_register_services(self.services)
        # Write the values for the characteristics
        self.write_service_characteristics(handles)

        # Create an Advertiser
        # Only advertise the top level service, i.e., the HIDS
        self.adv = Advertiser(self._ble, [UUID(0x1812)], self.device_appearance, self.device_name)

        print("Server started")

    # Overwrite super to write HID specific characteristics
    # Call super to write DIS and BAS characteristics
    def write_service_characteristics(self, handles):
        super(Mouse, self).write_service_characteristics(handles)

        # Get the handles from the hids, the third service after DIS and BAS
        # These correspond directly to self.HIDS
        (h_info, h_hid, _, self.h_rep, h_d1, h_proto,) = handles[2]

        # Pack the initial mouse state as described by the input report
        b = self.button1 + self.button2 * 2 + self.button3 * 4
        state = struct.pack("Bbbb", b, self.x, self.y, self.w)

        print("Writing hid service characteristics")
        # Write service characteristics
        self._ble.gatts_write(h_info, b"\x01\x01\x00\x02")     # HID info: ver=1.1, country=0, flags=normal
        self._ble.gatts_write(h_hid, self.HID_INPUT_REPORT)    # HID input report map
        self._ble.gatts_write(self.h_rep, state)               # HID report
        self._ble.gatts_write(h_d1, struct.pack("<BB", 1, 1))  # HID reference: id=1, type=input
        self._ble.gatts_write(h_proto, b"\x01")                # HID protocol mode: report

    # Overwrite super to notify central of a hid report
    def notify_hid_report(self):
        if self.is_connected():
            # Pack the mouse state as described by the input report
            b = self.button1 + self.button2 * 2 + self.button3
            state = struct.pack("Bbbb", b, self.x, self.y, self.w)

            print("Notify with report: ", struct.unpack("Bbbb", state))
            # Notify central by writing to the report handle
            self._ble.gatts_notify(self.conn_handle, self.h_rep, state)

    def set_axes(self, x=0, y=0):
        if x > 127:
            x = 127
        elif x < -127:
            x = -127

        if y > 127:
            y = 127
        elif y < -127:
            y = -127

        self.x = x
        self.y = y

    def set_wheel(self, w=0):
        if w > 127:
            w = 127
        elif w < -127:
            w = -127

        self.w = w

    def set_buttons(self, b1=0, b2=0, b3=0):
        self.button1 = b1
        self.button2 = b2
        self.button3 = b3

# Class that represents the Keyboard service
class Keyboard(HumanInterfaceDevice):
    # Set up a keyboard with the given device name.
    # The HID service description, the report map and the initial (idle) state
    # are defined here; nothing is registered until start() is called.
    def __init__(self, name="Bluetooth Keyboard"):
        super(Keyboard, self).__init__(name)  # Set up the general HID services in super
        self.device_appearance = 961          # Device appearance ID, 961 = keyboard

        self.HIDS = (                         # Service description: describes the service and how we communicate
            UUID(0x1812),                     # Human Interface Device
            (
                (UUID(0x2A4A), F_READ),       # HID information
                (UUID(0x2A4B), F_READ),       # HID report map
                (UUID(0x2A4C), F_WRITE),      # HID control point
                (UUID(0x2A4D), F_READ_NOTIFY, ((UUID(0x2908), ATT_F_READ),)),  # HID input report / reference
                (UUID(0x2A4D), F_READ_WRITE, ((UUID(0x2908), ATT_F_READ),)),   # HID output report / reference
                (UUID(0x2A4E), F_READ_WRITE), # HID protocol mode
            ),
        )

        # fmt: off
        self.HID_INPUT_REPORT = bytes([    # Report Description: describes what we communicate
            0x05, 0x01,                    # USAGE_PAGE (Generic Desktop)
            0x09, 0x06,                    # USAGE (Keyboard)
            0xa1, 0x01,                    # COLLECTION (Application)
            0x85, 0x01,                    #     REPORT_ID (1)
            0x75, 0x01,                    #     Report Size (1)
            0x95, 0x08,                    #     Report Count (8)
            0x05, 0x07,                    #     Usage Page (Key Codes)
            0x19, 0xE0,                    #     Usage Minimum (224)
            0x29, 0xE7,                    #     Usage Maximum (231)
            0x15, 0x00,                    #     Logical Minimum (0)
            0x25, 0x01,                    #     Logical Maximum (1)
            0x81, 0x02,                    #     Input (Data, Variable, Absolute); Modifier byte
            0x95, 0x01,                    #     Report Count (1)
            0x75, 0x08,                    #     Report Size (8)
            0x81, 0x01,                    #     Input (Constant); Reserved byte
            0x95, 0x05,                    #     Report Count (5)
            0x75, 0x01,                    #     Report Size (1)
            0x05, 0x08,                    #     Usage Page (LEDs)
            0x19, 0x01,                    #     Usage Minimum (1)
            0x29, 0x05,                    #     Usage Maximum (5)
            0x91, 0x02,                    #     Output (Data, Variable, Absolute); LED report
            0x95, 0x01,                    #     Report Count (1)
            0x75, 0x03,                    #     Report Size (3)
            0x91, 0x01,                    #     Output (Constant); LED report padding
            0x95, 0x06,                    #     Report Count (6)
            0x75, 0x08,                    #     Report Size (8)
            0x15, 0x00,                    #     Logical Minimum (0)
            0x25, 0x65,                    #     Logical Maximum (101)
            0x05, 0x07,                    #     Usage Page (Key Codes)
            0x19, 0x00,                    #     Usage Minimum (0)
            0x29, 0x65,                    #     Usage Maximum (101)
            0x81, 0x00,                    #     Input (Data, Array); Key array (6 bytes)
            0xc0                           # END_COLLECTION
        ])
        # fmt: on

        # Define the initial keyboard state
        self.modifiers = 0             # 8 bits signifying Right GUI(Win/Command), Right ALT/Option, Right Shift, Right Control, Left GUI, Left ALT, Left Shift, Left Control
        self.keypresses = [0x00] * 6   # 6 keys to hold

        # Callback function for keyboard messages from central
        self.kb_callback = None

        # Value handles of the input and output report characteristics.
        # They are only known after registration (see write_service_characteristics).
        self.h_rep = None
        self.h_repout = None

        self.services = [self.DIS, self.BAS, self.HIDS]  # List of service descriptions

    # Pack the current keyboard state as described by the input report:
    # modifier byte, reserved byte (always 0), then the 6 held key codes
    def _pack_state(self):
        return struct.pack("8B", self.modifiers, 0, *self.keypresses[:6])

    # Interrupt request callback function
    # Overwrite super to catch keyboard report write events by the central
    def ble_irq(self, event, data):
        if event == _IRQ_GATTS_WRITE:                   # A client has written to a characteristic or descriptor
            conn_handle, attr_handle = data             # Get the handle to the characteristic that was written
            # Only a write to the output report is a keyboard (LED) report.
            # Writes to other attributes (e.g. control point, protocol mode)
            # are not keyboard reports and are left to super below.
            if attr_handle == self.h_repout:
                print("Keyboard changed by Central")
                report = self._ble.gatts_read(attr_handle)  # Read the report
                if self.kb_callback is not None:            # Call the callback function
                    # Hand over the report bytes as a tuple of ints; this does
                    # not depend on the report being exactly one byte long
                    self.kb_callback(tuple(report))
                return
        # Let super handle every other event
        super(Keyboard, self).ble_irq(event, data)

    # Overwrite super to register HID specific service
    # Call super to register DIS and BAS services
    def start(self):
        super(Keyboard, self).start()  # Start super

        print("Registering services")
        # Register services and get read/write handles for all services
        handles = self._ble.gatts_register_services(self.services)
        # Write the values for the characteristics
        self.write_service_characteristics(handles)

        # Create an Advertiser
        # Only advertise the top level service, i.e., the HIDS
        self.adv = Advertiser(self._ble, [UUID(0x1812)], self.device_appearance, self.device_name)

        print("Server started")

    # Overwrite super to write HID specific characteristics
    # Call super to write DIS and BAS characteristics
    def write_service_characteristics(self, handles):
        super(Keyboard, self).write_service_characteristics(handles)

        # Get the handles from the hids, the third service after DIS and BAS
        # These correspond directly to self.HIDS
        (h_info, h_hid, _, self.h_rep, h_d1, self.h_repout, h_d2, h_proto,) = handles[2]

        print("Writing hid service characteristics")
        # Write service characteristics
        self._ble.gatts_write(h_info, b"\x01\x01\x00\x02")     # HID info: ver=1.1, country=0, flags=normal
        self._ble.gatts_write(h_hid, self.HID_INPUT_REPORT)    # HID input report map
        # Give the input report an initial value (as the Mouse class does), so a
        # read before the first notification returns a well-formed report
        self._ble.gatts_write(self.h_rep, self._pack_state())  # HID report
        self._ble.gatts_write(h_d1, struct.pack("<BB", 1, 1))  # HID reference: id=1, type=input
        self._ble.gatts_write(h_d2, struct.pack("<BB", 1, 2))  # HID reference: id=1, type=output
        self._ble.gatts_write(h_proto, b"\x01")                # HID protocol mode: report

    # Overwrite super to notify central of a hid report
    def notify_hid_report(self):
        if self.is_connected():
            # Pack the Keyboard state as described by the input report
            state = self._pack_state()

            print("Notify with report: ", struct.unpack("8B", state))
            # Notify central by writing to the report handle
            self._ble.gatts_notify(self.conn_handle, self.h_rep, state)

    # Set the modifier bits, notify to send the modifiers to central
    # Each argument is expected to be 0 or 1; right_gui is the highest bit (7)
    # and left_control the lowest bit (0)
    def set_modifiers(self, right_gui=0, right_alt=0, right_shift=0, right_control=0, left_gui=0, left_alt=0, left_shift=0, left_control=0):
        self.modifiers = (right_gui << 7) + (right_alt << 6) + (right_shift << 5) + (right_control << 4) + (left_gui << 3) + (left_alt << 2) + (left_shift << 1) + left_control

    # Press keys, notify to send the keys to central
    # This will hold down the keys, call set_keys() without arguments and notify again to release
    def set_keys(self, k0=0x00, k1=0x00, k2=0x00, k3=0x00, k4=0x00, k5=0x00):
        self.keypresses = [k0, k1, k2, k3, k4, k5]

    # Set a callback function that gets notified on keyboard changes
    # Should take a tuple with the report bytes
    def set_kb_callback(self, kb_callback):
        self.kb_callback = kb_callback


来源:https://blog.csdn.net/jd3096/article/details/126000918?spm=1001.2014.3001.5502



推荐分享
图文皆来源于网络,内容仅做公益性分享,版权归原作者所有,如有侵权请告知删除!
 

Copyright © 2014 ESP56.com All Rights Reserved

执行时间: 0.01007604598999 seconds