Examples

An example can often speed things up when you are trying to get started with a library, so there are a few below. There is also a Getting Started workshop using a Raspberry Pi and a BBC micro:bit.

Adapter

This example prints out the status of the Bluetooth device on your Linux computer. It also checks to see if it is enabled (powered) before scanning for nearby Bluetooth devices:

from bluezero import adapter
import logging
try:  # logging.NullHandler is available from Python 2.7 onwards
    from logging import NullHandler
except ImportError:
    # Older interpreters: provide an equivalent do-nothing handler.
    class NullHandler(logging.Handler):
        """Logging handler that silently drops every record."""

        def emit(self, record):
            """Discard *record* without writing it anywhere."""


def main():
    """Print the first Bluetooth adapter's properties, then start a scan.

    The adapter is switched on first if it reports that it is not powered.
    """
    dongles = adapter.list_adapters()
    print('dongles available: ', dongles)
    # Only the first adapter in the list is used.
    dongle = adapter.Adapter(dongles[0])

    # (label, attribute name) pairs, printed in exactly this order.
    report = (
        ('address: ', 'address'),
        ('name: ', 'name'),
        ('alias: ', 'alias'),
        ('powered: ', 'powered'),
        ('pairable: ', 'pairable'),
        ('pairable timeout: ', 'pairabletimeout'),
        ('discoverable: ', 'discoverable'),
        ('discoverable timeout: ', 'discoverabletimeout'),
        ('discovering: ', 'discovering'),
        ('Powered: ', 'powered'),
    )
    for label, attribute in report:
        print(label, getattr(dongle, attribute))

    if not dongle.powered:
        dongle.powered = True
        print('Now powered: ', dongle.powered)

    print('Start discovering')
    dongle.nearby_discovery()
    # The adapter is left powered on; set dongle.powered = False here to
    # switch it off again afterwards.


if __name__ == '__main__':
    print(__name__)
    # Put the 'adapter' logger at DEBUG level with only a NullHandler
    # attached, i.e. a handler that writes nothing itself.
    adapter_log = logging.getLogger('adapter')
    adapter_log.addHandler(NullHandler())
    adapter_log.setLevel(logging.DEBUG)
    main()

Central Role

This example uses the micro:bit API that has been written in Bluezero to interact with a micro:bit:

import time
from bluezero import microbit

# Display images: five 5-bit values each (presumably one value per LED row
# -- confirm against the microbit module).
_EXPECTED_IMAGE = (0b01110, 0b10000, 0b10000, 0b10000, 0b01110)
_BUTTON_A_PROMPT = (0b00000, 0b01000, 0b11111, 0b01000, 0b00000)
_BUTTON_B_PROMPT = (0b00000, 0b00010, 0b11111, 0b00010, 0b00000)


def _flash_until(image, pressed):
    """Show *image* for half a second at a time until *pressed()* is true.

    The display is cleared after every half-second showing, and *pressed*
    is re-evaluated before the image is shown again.
    """
    while not pressed():
        ubit.pixels = list(image)
        time.sleep(0.5)
        ubit.clear_display()


# Both Bluetooth addresses are hard-coded: the local adapter to use and the
# micro:bit to connect to.
ubit = microbit.Microbit(adapter_addr='B8:27:EB:22:57:E0',
                         device_addr='E3:AC:D2:F8:EB:B9')
ubit.connect()

# The script only continues if the display currently shows this image.
assert ubit.pixels == list(_EXPECTED_IMAGE)

# Set the scroll delay, read it back and show it on the display.
ubit.scroll_delay = 20
delay = ubit.scroll_delay
ubit.text = 'Scroll speed {}'.format(delay)
time.sleep(5)
ubit.text = 'This is a really long string '
time.sleep(5)

# Wait for button A, then for button B, blinking a prompt each time.
_flash_until(_BUTTON_A_PROMPT, lambda: ubit.button_a)
_flash_until(_BUTTON_B_PROMPT, lambda: ubit.button_b)

ubit.clear_display()
ubit.scroll_delay = 120

# Show the temperature and then the accelerometer reading as text.
ubit.text = '{0}'.format(ubit.temperature)
time.sleep(5)

ubit.text = '{0}'.format(ubit.accelerometer)
time.sleep(5)

ubit.disconnect()

Scanner: Eddystone

This example scans for beacons in the Eddystone format. It will report on UID beacons <https://github.com/google/eddystone/tree/master/eddystone-uid> and URL beacons <https://github.com/google/eddystone/tree/master/eddystone-url>.

This uses the aioblescan Python library, which requires your code to be run with sudo.

from bluezero import observer


def print_eddystone_values(data):
    """
    Print the recognised fields of one Eddystone report.

    A header line is printed first, then one tab-indented line per
    recognised key, in the order the keys appear in ``data``. Keys that
    are not recognised are skipped silently.

    :param data: mapping of field name to value. 'name space' and
        'instance' are read as big-endian bytes and shown in hex; 'url'
        and 'mac address' are shown unchanged; 'tx_power' and 'rssi' are
        passed through int().
    :return: None
    """
    field_kinds = {'name space': 'hex_format',
                   'instance': 'hex_format',
                   'url': 'string_format',
                   'mac address': 'string_format',
                   'tx_power': 'int_format',
                   'rssi': 'int_format'}
    byte_order = 'big'

    print('New Eddystone data:')
    for field in data:
        if field not in field_kinds:
            continue
        kind = field_kinds[field]
        if kind == 'string_format':
            print('\t{} = {}'.format(field, data[field]))
        elif kind == 'hex_format':
            as_number = int.from_bytes(data[field], byte_order)
            print('\t{} = 0x{:X}'.format(field, as_number))
        elif kind == 'int_format':
            print('\t{} = {}'.format(field, int(data[field])))


if __name__ == '__main__':
    # Scan for Eddystone beacons with print_eddystone_values registered as
    # the on_data callback.
    observer.scan_eddystone(on_data=print_eddystone_values)

Beacon: Eddystone URL

This example broadcasts a given URL in a format for the Physical Web. You will need to put the BlueZ bluetoothd into experimental mode for this one.

from bluezero import eddystone_beacon

# Create an Eddystone URL beacon for this address; the result is not kept.
eddystone_beacon.EddystoneURL('https://github.com/ukBaz')

Peripheral Role

This example transmits the temperature of the CPU over a single characteristic. If your hardware does not support vcgencmd, change the get_cpu_temperature() function to use the randomly generated temperature instead (this also needs `import random`). Values are only updated when notifications are switched on. You will need to have BlueZ in experimental mode and to have modified the DBus configuration file to open the permissions for ‘ukBaz.bluezero’.

# Standard modules
import os
import dbus
try:
    from gi.repository import GObject
except ImportError:
    import gobject as GObject

# Bluezero modules
from bluezero import constants
from bluezero import adapter
from bluezero import advertisement
from bluezero import localGATT
from bluezero import GATT

# constants
# 128-bit UUID of the custom service that carries the CPU temperature.
CPU_TMP_SRVC = '12341000-1234-1234-1234-123456789abc'
# 16-bit UUID of the temperature characteristic. NOTE(review): 0x2A6E looks
# like the Bluetooth SIG "Temperature" characteristic -- confirm against the
# assigned numbers list.
CPU_TMP_CHRC = '2A6E'
# 16-bit UUID of the descriptor attached to that characteristic.
# NOTE(review): 0x2904 looks like "Characteristic Presentation Format" --
# confirm against the assigned numbers list.
CPU_FMT_DSCP = '2904'


def get_cpu_temperature():
    """Return the CPU temperature reported by ``vcgencmd measure_temp``.

    The first line of the command's output is expected to look like
    ``temp=47.2'C``. The ``temp=`` prefix and ``'C`` suffix are removed and
    the remainder is converted to a float.

    :return: the temperature as a float
    :raises ValueError: if the output cannot be parsed as a number, for
        example when ``vcgencmd`` is not installed and the output is empty
    """
    # On hardware without vcgencmd, return a random reading instead
    # (this needs ``import random`` at the top of the file):
    # return random.randrange(3200, 5310, 10) / 100

    # The with-block closes the pipe once the line has been read, so the
    # file object is not left open.
    with os.popen('vcgencmd measure_temp') as pipe:
        cpu_temp = pipe.readline()
    # strip() first so a missing or '\r\n' line ending cannot break parsing.
    return float(cpu_temp.strip().replace('temp=', '').replace("'C", ''))


def sint16(value):
    """Encode *value* as hundredths in a little-endian signed 16-bit field.

    The value is multiplied by 100, rounded to the nearest integer and
    packed into two bytes.

    :param value: number to encode, e.g. a temperature such as 47.2
    :return: two ``bytes``, least significant byte first
    :raises OverflowError: if the scaled value does not fit in a signed
        16-bit integer (-32768 to 32767)
    """
    # round() rather than bare int(): binary floating point puts e.g.
    # 32.3 * 100 just below 3230, and truncating would encode 3229.
    hundredths = int(round(value * 100))
    return hundredths.to_bytes(2, byteorder='little', signed=True)


def cpu_temp_sint16(value):
    """Turn a one-element temperature list into a list of dbus.Byte.

    Only ``value[0]`` is used: it is encoded with sint16() and every byte
    of the result is wrapped in ``dbus.Byte``.

    :param value: sequence whose first item is the temperature
    :return: list of ``dbus.Byte``
    """
    return [dbus.Byte(octet) for octet in sint16(value[0])]


class TemperatureChrc(localGATT.Characteristic):
    """Characteristic serving the CPU temperature for read and notify.

    Readings are stored as a one-element list in the characteristic's
    ``Value`` property and are sent out as the bytes produced by
    cpu_temp_sint16().
    """

    def __init__(self, service):
        """Create the characteristic under *service* with a first reading."""
        first_reading = [get_cpu_temperature()]
        # NOTE(review): positional arguments are presumably (id, uuid,
        # service, value, notifying, flags) -- confirm against
        # localGATT.Characteristic.
        localGATT.Characteristic.__init__(
            self, 1, CPU_TMP_CHRC, service, first_reading, False,
            ['read', 'notify'])

    def temperature_cb(self):
        """Take a new reading and publish it through PropertiesChanged.

        Used as the timer callback set up by _update_temp_value().

        :return: the current ``Notifying`` flag. NOTE(review): presumably
            the timer keeps firing only while this is true -- confirm
            against the GObject timeout_add documentation.
        """
        reading = [get_cpu_temperature()]
        chrc_props = self.props[constants.GATT_CHRC_IFACE]
        print('Getting new temperature', reading, chrc_props['Notifying'])
        chrc_props['Value'] = reading

        encoded = cpu_temp_sint16(reading)
        self.PropertiesChanged(constants.GATT_CHRC_IFACE,
                               {'Value': dbus.Array(encoded)},
                               [])
        print('Array value: ', encoded)
        return chrc_props['Notifying']

    def _update_temp_value(self):
        """Schedule temperature_cb every 500 ms if notifications are on."""
        if self.props[constants.GATT_CHRC_IFACE]['Notifying']:
            print('Starting timer event')
            GObject.timeout_add(500, self.temperature_cb)

    def ReadValue(self, options):
        """Take a fresh reading, store it and return it as a dbus.Array.

        :param options: not used by this implementation
        """
        chrc_props = self.props[constants.GATT_CHRC_IFACE]
        chrc_props['Value'] = [get_cpu_temperature()]
        return dbus.Array(cpu_temp_sint16(chrc_props['Value']))

    def StartNotify(self):
        """Switch notifications on and start the update timer."""
        chrc_props = self.props[constants.GATT_CHRC_IFACE]
        if chrc_props['Notifying']:
            print('Already notifying, nothing to do')
            return

        print('Notifying on')
        chrc_props['Notifying'] = True
        self._update_temp_value()

    def StopNotify(self):
        """Switch notifications off.

        No timer is cancelled here; temperature_cb returns the cleared
        flag on its next run.
        """
        chrc_props = self.props[constants.GATT_CHRC_IFACE]
        if not chrc_props['Notifying']:
            print('Not notifying, nothing to do')
            return

        print('Notifying off')
        chrc_props['Notifying'] = False
        # With the flag cleared this call returns without scheduling.
        self._update_temp_value()


class ble:
    """Build, register and advertise the CPU temperature GATT application."""

    def __init__(self):
        """Create the GATT objects, register them and register an advert.

        All set-up happens here; start_bt() only starts the application.
        """
        self.bus = dbus.SystemBus()
        self.app = localGATT.Application()
        self.srv = localGATT.Service(1, CPU_TMP_SRVC, True)

        self.charc = TemperatureChrc(self.srv)
        self.charc.service = self.srv.path

        # Value served by the format descriptor. NOTE(review): presumably a
        # Characteristic Presentation Format record (sint16, exponent -2,
        # degrees Celsius) -- confirm against the Bluetooth specification.
        format_octets = (0x0E, 0xFE, 0x2F, 0x27, 0x01, 0x00, 0x00)
        self.cpu_format = localGATT.Descriptor(
            4,
            CPU_FMT_DSCP,
            self.charc,
            dbus.Array([dbus.Byte(octet) for octet in format_octets]),
            ['read'])

        # Service, characteristic and descriptor are added in that order.
        for managed in (self.srv, self.charc, self.cpu_format):
            self.app.add_managed_object(managed)

        # The first adapter listed is used for both the GATT manager and
        # the advertisement.
        self.srv_mng = GATT.GattManager(adapter.list_adapters()[0])
        self.srv_mng.register_application(self.app, {})

        self.dongle = adapter.Adapter(adapter.list_adapters()[0])
        temperature_advert = advertisement.Advertisement(1, 'peripheral')
        temperature_advert.service_UUIDs = [CPU_TMP_SRVC]

        # The adapter is powered on before the advertisement is registered.
        if not self.dongle.powered:
            self.dongle.powered = True
        advert_manager = advertisement.AdvertisingManager(self.dongle.address)
        advert_manager.register_advertisement(temperature_advert, {})

    def add_call_back(self, callback):
        """Replace the characteristic's PropertiesChanged with *callback*."""
        self.charc.PropertiesChanged = callback

    def start_bt(self):
        """Start the GATT application by calling ``app.start()``."""
        self.app.start()


if __name__ == '__main__':
    # Print one reading as a number and another as its sint16() byte
    # encoding before any Bluetooth object is created.
    print('CPU temperature is {}C'.format(get_cpu_temperature()))
    print(sint16(get_cpu_temperature()))
    # ble() does all of its registration work in the constructor;
    # start_bt() then calls app.start().
    pi_cpu_monitor = ble()
    pi_cpu_monitor.start_bt()

Peripheral - Nordic UART Service

This service simulates a basic UART connection over two lines, TXD and RXD.

It is based on a proprietary UART service specification by Nordic Semiconductor. Data sent to and from this service can be viewed using the nRF UART apps from Nordic Semiconductor for Android and iOS.

It uses the Bluezero peripheral file (level 10), so it should be easier to follow than the previous CPU Temperature example, which was level 100.

from gi.repository import GLib
from evdev import InputDevice, categorize, ecodes

# Bluezero modules
from bluezero import peripheral

# constants
# UUIDs for the UART service and its two characteristics; they differ only
# in the fourth hex digit of the first group (1, 2 and 3).
# NOTE(review): UART_SERIVCE is a misspelling of "SERVICE". The name is left
# unchanged because UartService refers to it by this spelling.
UART_SERIVCE = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
# Characteristic created with the write flags in UartService.
RX_CHARACTERISTIC = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
# Characteristic created with the notify flag in UartService.
TX_CHARACTERISTIC = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'


class UartService:
    """UART-style peripheral with one writable and one notifying line.

    Text written to the RX characteristic is printed; the TX characteristic
    is set up for notifications.
    """

    def __init__(self):
        """Create the application, service and both characteristics."""
        self.app = peripheral.Application()
        self.ble_uart = peripheral.Service(UART_SERIVCE, True)
        self.rx_uart = peripheral.Characteristic(
            RX_CHARACTERISTIC,
            ['write', 'write-without-response'],
            self.ble_uart)
        self.tx_uart = peripheral.Characteristic(
            TX_CHARACTERISTIC,
            ['notify'],
            self.ble_uart)

        # Incoming writes are printed as text; notify events go to print().
        self.rx_uart.add_write_event(self.uart_print)
        self.tx_uart.add_notify_event(print)
        self.tx_uart.StartNotify()

        # RX is added before TX, then the service joins the application.
        for chrc in (self.rx_uart, self.tx_uart):
            self.ble_uart.add_characteristic(chrc)
        self.app.add_service(self.ble_uart)

    @staticmethod
    def _from_bytes(data):
        """Return the string made of one character per integer in *data*."""
        return ''.join(map(chr, data))

    @staticmethod
    def _to_bytes(word):
        """Return the list of code points of the characters in *word*."""
        return list(map(ord, word))

    def uart_print(self, value):
        """Print *value* (a sequence of integers) as text."""
        text = self._from_bytes(value)
        print(text)

    def start(self):
        """Start the underlying application."""
        self.app.start()

    def stop(self):
        """Stop the underlying application."""
        self.app.stop()


def detect_keys(dev_loop):
    """Poll the input device once and act on a key press.

    Reads at most one event from the module-level ``dev`` input device.
    An arrow-key press sends the key's name ('down', 'up', 'left' or
    'right') as a notification on ``dev_loop.tx_uart``. An Enter press
    stops ``dev_loop``.

    :param dev_loop: object providing ``tx_uart.send_notify_event()`` and
        ``stop()`` (the UartService instance in this example)
    :return: always True. NOTE(review): presumably this keeps the GLib
        idle callback installed -- confirm against the GLib documentation.
    """
    event = dev.read_one()
    # Only events with value 1 are acted on. NOTE(review): presumably
    # value 1 means "key pressed" -- confirm against the evdev docs.
    if event is None or event.value != 1:
        return True

    arrow_names = {
        ecodes.KEY_DOWN: 'down',
        ecodes.KEY_UP: 'up',
        ecodes.KEY_LEFT: 'left',
        ecodes.KEY_RIGHT: 'right',
    }
    if event.code in arrow_names:
        dev_loop.tx_uart.send_notify_event(arrow_names[event.code])
    elif event.code == ecodes.KEY_ENTER:
        # UartService defines no disconnect(); calling it unconditionally
        # raised AttributeError and stop() was never reached. Call it only
        # when the object actually provides it.
        disconnect = getattr(dev_loop, 'disconnect', None)
        if callable(disconnect):
            disconnect()
        dev_loop.stop()
    return True


if __name__ == '__main__':
    # 'dev' has to stay a module-level name: detect_keys() reads it as a
    # global. The input device path is hard-coded.
    dev = InputDevice('/dev/input/event0')
    uart = UartService()
    uart.app.dongle.alias = 'RPi_UART'
    # Register detect_keys as a GLib idle callback, with the service passed
    # as its argument.
    GLib.idle_add(detect_keys, uart)
    uart.start()