Examples¶
An example can often speed things up when you are trying to get started with a library, so there are a few below. There is also a Getting Started workshop using a Raspberry Pi and a BBC micro:bit if you are new to Bluetooth.
Adapter¶
This example prints out the status of the Bluetooth device on your Linux computer. It also checks to see if it is enabled (powered) before scanning for nearby Bluetooth devices:
import logging
from bluezero import adapter
from bluezero import tools
def main():
    """Print the properties of the first Bluetooth adapter, then scan.

    The adapter is switched on first if it reports that it is not powered.
    """
    addresses = adapter.list_adapters()
    print('dongles available: ', addresses)
    dongle = adapter.Adapter(addresses[0])

    # (label, attribute) pairs, printed in this order. 'powered' is listed
    # twice on purpose: once in the property dump and once just before the
    # power check below.
    report = (
        ('address: ', 'address'),
        ('name: ', 'name'),
        ('alias: ', 'alias'),
        ('powered: ', 'powered'),
        ('pairable: ', 'pairable'),
        ('pairable timeout: ', 'pairabletimeout'),
        ('discoverable: ', 'discoverable'),
        ('discoverable timeout: ', 'discoverabletimeout'),
        ('discovering: ', 'discovering'),
        ('Powered: ', 'powered'),
    )
    for label, attribute in report:
        print(label, getattr(dongle, attribute))

    if not dongle.powered:
        dongle.powered = True
        print('Now powered: ', dongle.powered)

    print('Start discovering')
    dongle.nearby_discovery()
    # dongle.powered = False


if __name__ == '__main__':
    print(__name__)
    # Turn on debug output from the 'adapter' module logger before running
    logger = tools.create_module_logger('adapter')
    logger.setLevel(logging.DEBUG)
    main()
Central Role¶
This example uses the micro:bit API that has been written in Bluezero to interact with the micro:bit:
import time
from bluezero import microbit
def main():
    """Exercise the Bluezero micro:bit API over a Bluetooth connection.

    Connects to the micro:bit, checks the image currently on the display,
    scrolls some text, shows one image until button A is pressed and another
    until button B is pressed, then scrolls the temperature and
    accelerometer readings.

    The micro:bit is always disconnected before returning, even when one of
    the steps raises (for example when the display check fails).
    """
    # Images are written as one integer per display row
    expected_image = [0b01110,
                      0b10000,
                      0b10000,
                      0b10000,
                      0b01110]
    button_a_image = [0b00000,
                      0b01000,
                      0b11111,
                      0b01000,
                      0b00000]
    button_b_image = [0b00000,
                      0b00010,
                      0b11111,
                      0b00010,
                      0b00000]

    ubit = microbit.Microbit(adapter_addr='00:01:02:03:04:05',
                             device_addr='E9:06:4D:45:FC:8D')
    ubit.connect()
    try:
        assert ubit.pixels == expected_image

        ubit.scroll_delay = 40
        # Read the value back from the device rather than assuming it stuck
        delay = ubit.scroll_delay
        ubit.text = 'Scroll speed {}'.format(delay)
        time.sleep(5)
        ubit.text = 'This is a really long string '
        time.sleep(5)

        # Show each image until the matching button is seen as pressed
        while not ubit.button_a:
            ubit.pixels = button_a_image
            time.sleep(0.5)
            ubit.clear_display()
        while not ubit.button_b:
            ubit.pixels = button_b_image
            time.sleep(0.5)
            ubit.clear_display()
        ubit.clear_display()

        ubit.scroll_delay = 120
        ubit.text = '{0}'.format(ubit.temperature)
        time.sleep(5)
        ubit.text = '{0}'.format(ubit.accelerometer)
        time.sleep(5)
    finally:
        # Release the connection even if a step above failed
        ubit.disconnect()


if __name__ == '__main__':
    main()
Scanner: Eddystone¶
This example scans for beacons in the Eddystone URL format. It will report on URL beacons <https://github.com/google/eddystone/tree/master/eddystone-url>.
import logging
from bluezero import observer
def print_eddystone_url_values(data):
    """
    Callback for scan_eddystone that prints one line per beacon report.

    :param data: object exposing ``url``, ``tx_pwr`` and ``rssi`` attributes
    :return: None
    """
    summary = f'Eddystone URL: {data.url} \u2197 {data.tx_pwr} \u2198 {data.rssi}'
    print(summary)
def main():
    """Scan for Eddystone URL beacons, printing each one that is reported."""
    report = print_eddystone_url_values
    observer.scan_eddystone(on_data=report)


if __name__ == '__main__':
    # Let INFO level messages from the observer module through
    observer.logger.setLevel(logging.INFO)
    main()
Beacon: Eddystone URL¶
This example broadcasts a given URL in a format for the Physical Web:
from bluezero import eddystone_beacon
def main():
    """Create an Eddystone URL beacon for the ukBaz GitHub page."""
    url = 'https://github.com/ukBaz'
    eddystone_beacon.EddystoneURL(url)


if __name__ == '__main__':
    main()
Peripheral Role¶
This example transmits a randomly generated value to represent the temperature of the CPU over a single characteristic. Values are only updated when notifications are switched on.
"""Example of how to create a Peripheral device/GATT Server"""
# Standard modules
import logging
import random
# Bluezero modules
from bluezero import async_tools
from bluezero import adapter
from bluezero import peripheral
# UUIDs used by this example
# Service: a custom (non Bluetooth SIG) UUID
CPU_TMP_SRVC = '12341000-1234-1234-1234-123456789abc'
# Characteristic: Bluetooth SIG adopted UUID for Temperature, see
# https://www.bluetooth.com/specifications/assigned-numbers/
CPU_TMP_CHRC = '2A6E'
# Descriptor: Bluetooth SIG adopted UUID for Characteristic Presentation
# Format
CPU_FMT_DSCP = '2904'
def read_value():
    """
    Example read callback returning a mock CPU temperature.

    A random temperature from 32.00 to 53.00 degrees is picked in steps of
    0.10 and returned as hundredths of a degree. Bluetooth expects the
    temperature characteristic to be an sint16 (signed & 2 octets) in little
    endian format, and that is what dictates the arguments to the
    int.to_bytes method call.

    :return: list of two uint8 values
    """
    # Stay in integer hundredths of a degree. Dividing by 100 and then
    # truncating ``value * 100`` back to an int can lose a count because of
    # floating point rounding (32.8 * 100 evaluates to slightly less than
    # 3280, so int() would give 3279).
    centi_degrees = random.randrange(3200, 5310, 10)
    return list(centi_degrees.to_bytes(2, byteorder='little', signed=True))
def update_value(characteristic):
    """
    Timer callback that pushes a fresh reading out as a notification.

    :param characteristic: the characteristic object to update
    :return: boolean to indicate if timer should continue
    """
    # Setting the value is what updates the characteristic and sends the
    # notification
    characteristic.set_value(read_value())
    # The characteristic itself knows whether it is still notifying; a False
    # here stops further notifications
    return characteristic.is_notifying
def notify_callback(notifying, characteristic):
    """
    Notification callback example. When notifications are switched on it
    registers a timer that calls the update callback every 2 seconds.

    :param notifying: boolean for start or stop of notifications
    :param characteristic: The python object for this characteristic
    """
    if not notifying:
        # Nothing to set up when notifications are being switched off
        return
    async_tools.add_timer_seconds(2, update_value, characteristic)
def main(adapter_address):
    """Build the CPU Monitor peripheral and publish it.

    :param adapter_address: address of the Bluetooth adapter to use
    """
    logger = logging.getLogger('localGATT')
    logger.setLevel(logging.DEBUG)

    # Show what a reading looks like once decoded back from its byte form
    sample = int.from_bytes(read_value(), byteorder='little', signed=True)
    print('CPU temperature is {}\u00B0C'.format(sample/100))

    # Peripheral
    cpu_monitor = peripheral.Peripheral(
        adapter_address,
        local_name='CPU Monitor',
        appearance=1344,
    )

    # Service
    cpu_monitor.add_service(srv_id=1, uuid=CPU_TMP_SRVC, primary=True)

    # Characteristic: readable, and able to notify
    cpu_monitor.add_characteristic(
        srv_id=1,
        chr_id=1,
        uuid=CPU_TMP_CHRC,
        value=[],
        notifying=False,
        flags=['read', 'notify'],
        read_callback=read_value,
        write_callback=None,
        notify_callback=notify_callback,
    )

    # Descriptor attached to the characteristic above
    cpu_monitor.add_descriptor(
        srv_id=1,
        chr_id=1,
        dsc_id=1,
        uuid=CPU_FMT_DSCP,
        value=[0x0E, 0xFE, 0x2F, 0x27, 0x01, 0x00, 0x00],
        flags=['read'],
    )

    # Publish peripheral and start event loop
    cpu_monitor.publish()


if __name__ == '__main__':
    # Use the address of the first adapter that is available
    default_dongle = list(adapter.Adapter.available())[0]
    main(default_dongle.address)
Peripheral - Nordic UART Service¶
This service simulates a basic UART connection over two lines, TXD and RXD.
It is based on a proprietary UART service specification by Nordic Semiconductor. Data sent to and from this service can be viewed using the nRF UART apps from Nordic Semiconductor for Android and iOS.
from gi.repository import GLib
# Bluezero modules
from bluezero import adapter
from bluezero import peripheral
from bluezero import device
# UUIDs of the UART service and its two characteristics
UART_SERVICE = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
# Written by the remote device (see the 'write' flags in main)
RX_CHARACTERISTIC = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
# Notified to the remote device (see the 'notify' flag in main)
TX_CHARACTERISTIC = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
class UARTDevice:
    """Callbacks for the UART example; received data is echoed back.

    Everything is kept at class level so the callbacks can be handed over
    without creating an instance.
    """

    # Characteristic to send on; set while notifications are on, else None
    tx_obj = None

    @classmethod
    def on_connect(cls, ble_device: 'device.Device'):
        """Report the address of the device that has connected."""
        print("Connected to " + str(ble_device.address))

    @classmethod
    def on_disconnect(cls, adapter_address, device_address):
        """Report the address of the device that has disconnected."""
        # str() keeps this in line with on_connect and avoids a TypeError
        # should the address not be a plain string
        print("Disconnected from " + str(device_address))

    @classmethod
    def uart_notify(cls, notifying, characteristic):
        """
        Notify callback: remember or forget the characteristic to send on.

        :param notifying: boolean for start or stop of notifications
        :param characteristic: The python object for this characteristic
        """
        cls.tx_obj = characteristic if notifying else None

    @classmethod
    def update_tx(cls, value):
        """
        Send value on the remembered characteristic.

        Nothing is sent while notifications are switched off.

        :param value: list of bytes/integers to send
        """
        if cls.tx_obj:
            print("Sending")
            cls.tx_obj.set_value(value)

    @classmethod
    def uart_write(cls, value, options):
        """
        Write callback: print what was received and echo it back.

        :param value: the bytes/integers that were written
        :param options: the options supplied with the write
        """
        print('raw bytes:', value)
        print('With options:', options)
        # The remote side can send arbitrary bytes. Replace anything that is
        # not valid UTF-8 instead of raising, so the echo below still runs.
        text = bytes(value).decode('utf-8', errors='replace')
        print('Text value:', text)
        cls.update_tx(value)
def main(adapter_address):
    """Build the BLE UART peripheral and publish it.

    :param adapter_address: address of the Bluetooth adapter to use
    """
    ble_uart = peripheral.Peripheral(adapter_address, local_name='BLE UART')
    ble_uart.add_service(srv_id=1, uuid=UART_SERVICE, primary=True)

    # RX: the remote device writes here
    ble_uart.add_characteristic(
        srv_id=1,
        chr_id=1,
        uuid=RX_CHARACTERISTIC,
        value=[],
        notifying=False,
        flags=['write', 'write-without-response'],
        read_callback=None,
        write_callback=UARTDevice.uart_write,
        notify_callback=None,
    )

    # TX: the remote device subscribes to notifications here
    ble_uart.add_characteristic(
        srv_id=1,
        chr_id=2,
        uuid=TX_CHARACTERISTIC,
        value=[],
        notifying=False,
        flags=['notify'],
        read_callback=None,
        write_callback=None,
        notify_callback=UARTDevice.uart_notify,
    )

    # Connection callbacks
    ble_uart.on_connect = UARTDevice.on_connect
    ble_uart.on_disconnect = UARTDevice.on_disconnect

    ble_uart.publish()


if __name__ == '__main__':
    # Use the address of the first adapter that is available
    default_dongle = list(adapter.Adapter.available())[0]
    main(default_dongle.address)
Control Media Player over Bluetooth¶
This script displays information about the current track being played by the connected media player:
# When using your Linux computer as a Bluetooth speaker the MediaPlayer
# interface allows you to interact with the media player on the other end of
# the Bluetooth connection.
# e.g. the music player on your phone.
# This script displays information about the current track.
# Before you can run this script you have to pair and connect your audio
# source. For simplicity we can do this on the command line with the
# bluetoothctl tool
# pi@RPi3:~ $ bluetoothctl
# [bluetooth]# agent NoInputNoOutput
# Agent registered
# [bluetooth]# discoverable on
# Changing discoverable on succeeded
# [CHG] Controller B8:27:EB:22:57:E0 Discoverable: yes
#
# Now we have made the Raspberry Pi discoverable we can pair to it from the
# mobile phone. Once it has paired you can tell the Raspberry Pi that it is a
# trusted device
#
# [Nexus 5X]# trust 64:BC:0C:F6:22:F8
#
# Now the phone is connected you can run this script to find which track is
# playing
#
# pi@RPi3:~ $ python3 examples/control_media_player.py
from bluezero import dbus_tools
from bluezero import media_player
# Find the MAC address of the first media player connected over Bluetooth
mac_addr = None
for dbus_path in dbus_tools.get_managed_objects():
    if dbus_path.endswith('player0'):
        mac_addr = dbus_tools.get_device_address_from_dbus_path(dbus_path)
        # Stop at the first match; without this a later match would
        # overwrite it and the last player would be used instead
        break

if mac_addr:
    mp = media_player.MediaPlayer(mac_addr)
    # Print every detail reported for the current track, one per line
    track_details = mp.track
    for detail, value in track_details.items():
        print(f'{detail} : {value}')
else:
    print('Error: No media player connected')