Examples

An example can often speed things up when you are trying to get started with a library, so there are a few below. There is also a Getting Started workshop using a Raspberry Pi and a BBC micro:bit.

Adapter

This example prints out the status of the Bluetooth device on your Linux computer. It also checks to see if it is enabled (powered) before scanning for nearby Bluetooth devices:

import logging

from bluezero import adapter
from bluezero import tools


def main():
    """Print the status of the first Bluetooth adapter, then scan.

    The adapter properties are printed one per line. If the adapter is
    not powered it is switched on before ``nearby_discovery()`` is called.
    """
    available = adapter.list_adapters()
    print('dongles available: ', available)
    dongle = adapter.Adapter(available[0])

    # (label, attribute name) pairs, read and printed in this order.
    status_fields = (
        ('address: ', 'address'),
        ('name: ', 'name'),
        ('alias: ', 'alias'),
        ('powered: ', 'powered'),
        ('pairable: ', 'pairable'),
        ('pairable timeout: ', 'pairabletimeout'),
        ('discoverable: ', 'discoverable'),
        ('discoverable timeout: ', 'discoverabletimeout'),
        ('discovering: ', 'discovering'),
        ('Powered: ', 'powered'),
    )
    for label, attr_name in status_fields:
        print(label, getattr(dongle, attr_name))

    if not dongle.powered:
        dongle.powered = True
        print('Now powered: ', dongle.powered)

    print('Start discovering')
    dongle.nearby_discovery()
    # To switch the adapter off again afterwards: dongle.powered = False


if __name__ == '__main__':
    # Script entry point: echo the module name, raise the 'adapter' logger
    # to DEBUG level and then run the example.
    print(__name__)
    # Logger is obtained through bluezero's tools helper for the name
    # 'adapter' (presumably the bluezero adapter module's logger - verify).
    logger = tools.create_module_logger('adapter')
    logger.setLevel(logging.DEBUG)
    main()

Central Role

This example uses the micro:bit API that has been written in Bluezero to interact with the micro:bit

import time
from bluezero import microbit


def main():
    """Exercise the micro:bit API: display, buttons, sensors.

    Connects to the micro:bit, checks the pixels currently shown, scrolls
    two messages, flashes one image until button A reads true and another
    until button B reads true, then scrolls the temperature and
    accelerometer readings before disconnecting.
    """
    ubit = microbit.Microbit(adapter_addr='00:01:02:03:04:05',
                             device_addr='E9:06:4D:45:FC:8D')
    ubit.connect()

    # Pixel rows the display is expected to show right after connecting.
    expected_rows = [0b01110, 0b10000, 0b10000, 0b10000, 0b01110]
    assert ubit.pixels == expected_rows

    ubit.scroll_delay = 40
    ubit.text = f'Scroll speed {ubit.scroll_delay}'
    time.sleep(5)
    ubit.text = 'This is a really long string '
    time.sleep(5)

    def flash_until(button_attr, rows):
        """Flash ``rows`` on the display until ``button_attr`` reads true."""
        while not getattr(ubit, button_attr):
            ubit.pixels = list(rows)
            time.sleep(0.5)
            ubit.clear_display()

    flash_until('button_a', (0b00000, 0b01000, 0b11111, 0b01000, 0b00000))
    flash_until('button_b', (0b00000, 0b00010, 0b11111, 0b00010, 0b00000))

    ubit.clear_display()
    ubit.scroll_delay = 120
    ubit.text = f'{ubit.temperature}'
    time.sleep(5)

    ubit.text = f'{ubit.accelerometer}'
    time.sleep(5)

    ubit.disconnect()


if __name__ == '__main__':
    # Run the micro:bit demo only when executed as a script.
    main()

Scanner: Eddystone

This example scans for beacons in the Eddystone URL format. It will report on URL beacons <https://github.com/google/eddystone/tree/master/eddystone-url>.

import logging
from bluezero import observer


def print_eddystone_url_values(data):
    """
    Callback to print data found with scan_eddystone

    :param data: object exposing ``url``, ``tx_pwr`` and ``rssi`` attributes
    :return: None
    """
    url, tx_power, rssi = data.url, data.tx_pwr, data.rssi
    print(f'Eddystone URL: {url} \u2197 {tx_power} \u2198 {rssi}')


def main():
    """Scan for Eddystone beacons, printing each result via the callback."""
    on_beacon = print_eddystone_url_values
    observer.scan_eddystone(on_data=on_beacon)


if __name__ == '__main__':
    # Set the observer module's logger to INFO level before scanning.
    observer.logger.setLevel(logging.INFO)
    main()

Beacon: Eddystone URL

This example broadcasts a given URL in a format for the Physical Web. You will need to put the BlueZ bluetoothd into experimental mode for this one.

from bluezero import eddystone_beacon


def main():
    """Broadcast a fixed URL using bluezero's EddystoneURL helper."""
    beacon_url = 'https://github.com/ukBaz'
    eddystone_beacon.EddystoneURL(beacon_url)


if __name__ == '__main__':
    # Start the beacon only when executed as a script.
    main()

Peripheral Role

This example transmits the temperature of the CPU over a single characteristic. If your hardware does not support the vcgencmd command then change the get_cpu_temperature() function to use the randomly generated temperature. Values are only updated when notifications are switched on. You will need to have BlueZ in experimental mode and have modified the DBus configuration file to open the permissions for ‘ukBaz.bluezero’.

# Standard modules
import os
import random
import dbus

# Bluezero modules
from bluezero import async_tools
from bluezero import constants
from bluezero import adapter
from bluezero import advertisement
from bluezero import localGATT
from bluezero import GATT

# constants
CPU_TMP_SRVC = '12341000-1234-1234-1234-123456789abc'
CPU_TMP_CHRC = '2A6E'
CPU_FMT_DSCP = '2904'


def get_cpu_temperature():
    """Return a simulated CPU temperature.

    The value is a random multiple of 0.1 taken from 32.0 up to 53.0
    inclusive.

    To report a real reading on hardware that provides ``vcgencmd``,
    replace the body with:

        cpu_temp = os.popen('vcgencmd measure_temp').readline()
        return float(cpu_temp.replace('temp=', '').replace("'C\\n", ''))
    """
    hundredths = random.randrange(3200, 5310, 10)
    return hundredths / 100


def sint16(value):
    """Encode ``value`` as a little-endian signed 16-bit count of hundredths.

    The value is scaled by 100 and rounded to the nearest integer.
    Rounding (rather than truncating with ``int()``) matters because
    binary floating point cannot represent most decimal fractions exactly:
    ``0.29 * 100`` evaluates to ``28.999999999999996``, which truncation
    would encode as 28 instead of 29.

    :param value: number to encode (e.g. a temperature in degrees)
    :return: ``bytes`` of length 2, little-endian, two's complement
    :raises OverflowError: if the scaled value does not fit in 16 bits
    """
    return round(value * 100).to_bytes(2, byteorder='little', signed=True)


def cpu_temp_sint16(value):
    """Convert a one-element reading list into a list of ``dbus.Byte``.

    :param value: sequence whose first item is the temperature reading
    :return: list of ``dbus.Byte``, one per byte produced by ``sint16``
    """
    return [dbus.Byte(octet) for octet in sint16(value[0])]


class TemperatureChrc(localGATT.Characteristic):
    """Local GATT characteristic that serves the CPU temperature.

    The reading is stored in the characteristic's property table as a
    one-element list and is sent to clients encoded by
    ``cpu_temp_sint16``.
    """

    def __init__(self, service):
        """Create the characteristic with 'read' and 'notify' flags.

        :param service: the service object this characteristic belongs to
        """
        initial_value = [get_cpu_temperature()]
        flags = ['read', 'notify']
        localGATT.Characteristic.__init__(
            self, 1, CPU_TMP_CHRC, service, initial_value, False, flags)

    def _chrc_prop(self, key):
        """Read one entry of this characteristic's GATT property table."""
        return self.props[constants.GATT_CHRC_IFACE][key]

    def _set_chrc_prop(self, key, value):
        """Write one entry of this characteristic's GATT property table."""
        self.props[constants.GATT_CHRC_IFACE][key] = value

    def temperature_cb(self):
        """Take a new reading, store it and emit ``PropertiesChanged``.

        :return: the current 'Notifying' flag (presumably used by the
                 timer to decide whether to fire again - verify against
                 ``async_tools.add_timer_ms``)
        """
        reading = [get_cpu_temperature()]
        print('Getting new temperature',
              reading,
              self._chrc_prop('Notifying'))
        self._set_chrc_prop('Value', reading)

        changed = {'Value': dbus.Array(cpu_temp_sint16(reading))}
        self.PropertiesChanged(constants.GATT_CHRC_IFACE, changed, [])
        print('Array value: ', cpu_temp_sint16(reading))
        return self._chrc_prop('Notifying')

    def _update_temp_value(self):
        """Schedule ``temperature_cb`` on a 500 ms timer while notifying."""
        if self._chrc_prop('Notifying'):
            print('Starting timer event')
            async_tools.add_timer_ms(500, self.temperature_cb)

    def ReadValue(self, options):
        """Take a fresh reading, store it and return it encoded.

        :param options: read options supplied by the caller (unused here)
        :return: ``dbus.Array`` holding the encoded reading
        """
        self._set_chrc_prop('Value', [get_cpu_temperature()])
        return dbus.Array(cpu_temp_sint16(self._chrc_prop('Value')))

    def StartNotify(self):
        """Switch notifications on and start the periodic update."""
        if not self._chrc_prop('Notifying'):
            print('Notifying on')
            self._set_chrc_prop('Notifying', True)
            self._update_temp_value()
        else:
            print('Already notifying, nothing to do')

    def StopNotify(self):
        """Switch notifications off."""
        if self._chrc_prop('Notifying'):
            print('Notifying off')
            self._set_chrc_prop('Notifying', False)
            self._update_temp_value()
        else:
            print('Not notifying, nothing to do')


class ble:
    """Peripheral that exposes the CPU temperature service and advertises it.

    Builds a local GATT application (one service, one temperature
    characteristic, one format descriptor), registers it with a GATT
    manager and registers an advertisement, all on the same adapter.
    """

    def __init__(self, adapter_address=None):
        """Build and register the GATT application and advertisement.

        :param adapter_address: address of the Bluetooth adapter to use;
            when falsy the first adapter returned by
            ``adapter.list_adapters()`` is used
        """
        self.mainloop = async_tools.EventLoop()

        self.app = localGATT.Application()
        self.srv = localGATT.Service(1, CPU_TMP_SRVC, True)

        self.charc = TemperatureChrc(self.srv)

        self.charc.service = self.srv.path

        # Presentation-format descriptor value (7 bytes).
        # NOTE(review): 0x0E looks like the sint16 format code, 0xFE an
        # exponent of -2 and 0x272F the Celsius unit - confirm against the
        # Bluetooth assigned numbers.
        cpu_format_value = dbus.Array([dbus.Byte(0x0E),
                                       dbus.Byte(0xFE),
                                       dbus.Byte(0x2F),
                                       dbus.Byte(0x27),
                                       dbus.Byte(0x01),
                                       dbus.Byte(0x00),
                                       dbus.Byte(0x00)])
        self.cpu_format = localGATT.Descriptor(4,
                                               CPU_FMT_DSCP,
                                               self.charc,
                                               cpu_format_value,
                                               ['read'])

        self.app.add_managed_object(self.srv)
        self.app.add_managed_object(self.charc)
        self.app.add_managed_object(self.cpu_format)

        # Resolve the adapter BEFORE creating the GATT manager so that the
        # application is registered on the adapter the caller asked for,
        # i.e. the same one the advertisement below is registered on.
        if adapter_address:
            self.dongle = adapter.Adapter(adapter_address)
        else:
            self.dongle = adapter.Adapter(adapter.list_adapters()[0])

        self.srv_mng = GATT.GattManager(self.dongle.address)
        self.srv_mng.register_application(self.app, {})

        advert = advertisement.Advertisement(1, 'peripheral')
        advert.service_UUIDs = [CPU_TMP_SRVC]
        advert.local_name = 'CPU Temp'
        advert.appearance = 1344

        # The adapter is switched on, if needed, before the advertisement
        # is registered.
        if not self.dongle.powered:
            self.dongle.powered = True
        ad_manager = advertisement.AdvertisingManager(self.dongle.address)
        ad_manager.register_advertisement(advert, {})

    def add_call_back(self, callback):
        """Replace the characteristic's ``PropertiesChanged`` with callback.

        :param callback: callable invoked in place of ``PropertiesChanged``
        """
        self.charc.PropertiesChanged = callback

    def start_bt(self):
        """Run the event loop until interrupted with Ctrl-C."""
        try:
            self.mainloop.run()
        except KeyboardInterrupt:
            self.mainloop.quit()


def main(adapter_address=None, test_mode=False):
    """Print a sample reading, build the peripheral and optionally run it.

    :param adapter_address: adapter address passed through to ``ble``
    :param test_mode: when true the peripheral is built but the event
        loop is not started
    """
    print(f'CPU temperature is {get_cpu_temperature()}C')
    print(cpu_temp_sint16([get_cpu_temperature()]))
    pi_cpu_monitor = ble(adapter_address)
    if test_mode:
        return
    pi_cpu_monitor.start_bt()


if __name__ == '__main__':
    # The adapter address is hard-coded for this example; replace it with
    # the address of the Bluetooth adapter on your own machine.
    main('00:AA:01:01:00:24')

Peripheral - Nordic UART Service

This service simulates a basic UART connection over two lines, TXD and RXD.

It is based on a proprietary UART service specification by Nordic Semiconductors. Data sent to and from this service can be viewed using the nRF UART apps from Nordic Semiconductors for Android and iOS.

It uses the Bluezero peripheral file (level 10) so should be easier than the previous CPU Temperature example that was a level 100.

from gi.repository import GLib
from evdev import InputDevice, categorize, ecodes

# Bluezero modules
from bluezero import peripheral

# constants
UART_SERVICE = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
RX_CHARACTERISTIC = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
TX_CHARACTERISTIC = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'


class UartService:
    """Peripheral application exposing an RX and a TX UART characteristic.

    Data written to the RX characteristic is printed; the TX
    characteristic is set up for notifications.
    """

    def __init__(self):
        """Create the application, service and both characteristics."""
        self.app = peripheral.Application()
        self.ble_uart = peripheral.Service(UART_SERVICE, True)

        rx_flags = ['write', 'write-without-response']
        self.rx_uart = peripheral.Characteristic(
            RX_CHARACTERISTIC, rx_flags, self.ble_uart)
        self.tx_uart = peripheral.Characteristic(
            TX_CHARACTERISTIC, ['notify'], self.ble_uart)

        self.rx_uart.add_write_event(self.uart_print)
        self.tx_uart.add_notify_event(print)
        self.tx_uart.StartNotify()

        for chrc in (self.rx_uart, self.tx_uart):
            self.ble_uart.add_characteristic(chrc)
        self.app.add_service(self.ble_uart)

    @staticmethod
    def _from_bytes(data):
        """Return the string made of one character per integer in data."""
        return ''.join(map(chr, data))

    @staticmethod
    def _to_bytes(word):
        """Return the list of code points of the characters in word."""
        return list(map(ord, word))

    def uart_print(self, value):
        """Print a received value after converting it to a string."""
        text = self._from_bytes(value)
        print(text)

    def start(self):
        """Start the peripheral application."""
        self.app.start()

    def stop(self):
        """Stop the peripheral application."""
        self.app.stop()


def detect_keys(dev_loop):
    """Poll the input device once and forward arrow-key presses over UART.

    Reads a single pending event from the module-level ``dev`` input
    device. A press of an arrow key sends the matching word as a
    notification on ``dev_loop.tx_uart``; a press of Enter stops
    ``dev_loop``.

    :param dev_loop: object with a ``tx_uart`` characteristic and a
        ``stop()`` method (the ``UartService`` instance in this example)
    :return: always True
    """
    event = dev.read_one()
    # Ignore "nothing pending", non-key events (codes are only meaningful
    # within their event type) and anything other than a key press.
    if event is None or event.type != ecodes.EV_KEY or event.value != 1:
        return True

    arrow_words = {
        ecodes.KEY_DOWN: 'down',
        ecodes.KEY_UP: 'up',
        ecodes.KEY_LEFT: 'left',
        ecodes.KEY_RIGHT: 'right',
    }
    if event.code in arrow_words:
        dev_loop.tx_uart.send_notify_event(arrow_words[event.code])
    elif event.code == ecodes.KEY_ENTER:
        # UartService defines no disconnect(); calling it unconditionally
        # raised AttributeError and prevented stop() from running. Only
        # call it when the object actually provides one.
        disconnect = getattr(dev_loop, 'disconnect', None)
        if callable(disconnect):
            disconnect()
        dev_loop.stop()
    return True


if __name__ == '__main__':
    # 'dev' is read as a module-level global by detect_keys(); the device
    # path is hard-coded and may need changing for your keyboard.
    dev = InputDevice('/dev/input/event0')
    uart = UartService()
    uart.app.dongle.alias = 'RPi_UART'
    # Poll the keyboard from the GLib main loop whenever it is idle.
    GLib.idle_add(detect_keys, uart)
    uart.start()

Control Media Player over Bluetooth

This script displays information about the current track being played by the connected media player.

#  When using your Linux computer as a Bluetooth speaker the MediaPlayer
#  interface allows you to interact with the media player on the other end
#  of the Bluetooth connection.
#  e.g. the music player on your phone.
#  This script displays information about the current track.
#  Before you can run this script you have to pair and connect your audio
#  source. For simplicity we can do this on the command line with the
#  bluetoothctl tool
#     pi@RPi3:~ $ bluetoothctl
#     [bluetooth]# agent NoInputNoOutput
#     Agent registered
#     [bluetooth]# discoverable on
#     Changing discoverable on succeeded
#     [CHG] Controller B8:27:EB:22:57:E0 Discoverable: yes
#
#  Now we have made the Raspberry Pi discoverable we can pair to it from the
#  mobile phone. Once it has paired you can tell the Raspberry Pi that it is a
#  trusted device
#
#     [Nexus 5X]# trust 64:BC:0C:F6:22:F8
#
#  Now the phone is connected you can run this script to find which track is
#  playing
#
#     pi@RPi3:~ $ python3 examples/control_media_player.py

from bluezero import dbus_tools
from bluezero import media_player

# Find the mac address of the first media player connected over Bluetooth.
# Stop at the first matching object path; without the break the loop kept
# going and ended up with the last match instead of the first.
mac_addr = None
for dbus_path in dbus_tools.get_managed_objects():
    if dbus_path.endswith('player0'):
        mac_addr = dbus_tools.get_device_address_from_dbus_path(dbus_path)
        break

if mac_addr:
    mp = media_player.MediaPlayer(mac_addr)

    # Print every field of the current track as "name : value".
    track_details = mp.track
    for detail in track_details:
        print(f'{detail} : {track_details[detail]}')
else:
    print('Error: No media player connected')