Skip to content

Commit

Permalink
Merge pull request #15 from dhalbert/python-advertisement-data
Browse files Browse the repository at this point in the history
Python advertisement data
  • Loading branch information
tannewt authored Jul 19, 2019
2 parents 04c84be + 960688c commit f38c77c
Show file tree
Hide file tree
Showing 10 changed files with 754 additions and 145 deletions.
145 changes: 132 additions & 13 deletions adafruit_ble/advertising.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@

import struct

class AdvertisingData:
"""Build up a BLE advertising data packet."""
class AdvertisingPacket:
"""Build up a BLE advertising data or scan response packet."""
# BR/EDR flags not included here, since we don't support BR/EDR.
FLAG_LIMITED_DISCOVERY = 0x01
"""Discoverable only for a limited time period."""
Expand All @@ -53,7 +53,7 @@ class AdvertisingData:
"""Complete list of 128 bit service UUIDs."""
SHORT_LOCAL_NAME = 0x08
"""Short local device name (shortened to fit)."""
COMPLETE_LOCALNAME = 0x09
COMPLETE_LOCAL_NAME = 0x09
"""Complete local device name."""
TX_POWER = 0x0A
"""Transmit power level"""
Expand Down Expand Up @@ -81,32 +81,151 @@ class AdvertisingData:
MAX_DATA_SIZE = 31
"""Data size in a regular BLE packet."""

def __init__(self, flags=(FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY), max_length=MAX_DATA_SIZE):
"""Initalize an advertising packet, with the given flags, no larger than max_length."""
self.data = bytearray((2, self.FLAGS, flags))
def __init__(self, data=None, *, max_length=MAX_DATA_SIZE):
"""Create an advertising packet, no larger than max_length.
:param buf data: if not supplied (None), create an empty packet
if supplied, create a packet with supplied data. This is usually used
to parse an existing packet.
:param int max_length: maximum length of packet
"""
self._packet_bytes = bytearray(data) if data else bytearray()
self._max_length = max_length
self._check_length()

@property
def packet_bytes(self):
"""The raw packet bytes."""
return self._packet_bytes

@packet_bytes.setter
def packet_bytes(self, value):
self._packet_bytes = value

def __getitem__(self, element_type):
"""Return the bytes stored in the advertising packet for the given element type.
:param int element_type: An integer designating an advertising element type.
A number of types are defined in `AdvertisingPacket`,
such as `AdvertisingPacket.TX_POWER`.
:returns: bytes that are the value for the given element type.
If the element type is not present in the packet, raise KeyError.
"""
i = 0
adv_bytes = self.packet_bytes
while i < len(adv_bytes):
item_length = adv_bytes[i]
if element_type != adv_bytes[i+1]:
# Type doesn't match: skip to next item.
i += item_length + 1
else:
return adv_bytes[i + 2:i + 1 + item_length]
raise KeyError

def get(self, element_type, default=None):
"""Return the bytes stored in the advertising packet for the given element type,
returning the default value if not found.
"""
try:
return self.__getitem__(element_type)
except KeyError:
return default

@property
def bytes_remaining(self):
"""Number of bytes still available for use in the packet."""
return self._max_length - len(self._packet_bytes)

def _check_length(self):
if len(self.data) > self._max_length:
raise IndexError("Advertising data exceeds max_length")
if len(self._packet_bytes) > self._max_length:
raise IndexError("Advertising data too long")

def add_field(self, field_type, field_data):
"""Append an advertising data field to the current packet, of the given type.
The length field is calculated from the length of field_data."""
self.data.append(1 + len(field_data))
self.data.append(field_type)
self.data.extend(field_data)
self._packet_bytes.append(1 + len(field_data))
self._packet_bytes.append(field_type)
self._packet_bytes.extend(field_data)
self._check_length()

def add_flags(self, flags=(FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY)):
"""Add default or custom advertising flags."""
self.add_field(self.FLAGS, struct.pack("<B", flags))

def add_16_bit_uuids(self, uuids):
"""Add a complete list of 16 bit service UUIDs."""
self.add_field(self.ALL_16_BIT_SERVICE_UUIDS, bytes(uuid.uuid16 for uuid in uuids))
for uuid in uuids:
self.add_field(self.ALL_16_BIT_SERVICE_UUIDS, struct.pack("<H", uuid.uuid16))

def add_128_bit_uuids(self, uuids):
"""Add a complete list of 128 bit service UUIDs."""
self.add_field(self.ALL_128_BIT_SERVICE_UUIDS, bytes(uuid.uuid128 for uuid in uuids))
for uuid in uuids:
self.add_field(self.ALL_128_BIT_SERVICE_UUIDS, uuid.uuid128)

def add_mfr_specific_data(self, mfr_id, data):
"""Add manufacturer-specific data bytes."""
self.add_field(self.MANUFACTURER_SPECIFIC_DATA, struct.pack('<H', mfr_id) + data)


class ServerAdvertisement:
"""
Data to advertise a peripheral's services.
The advertisement consists of an advertising data packet and an optional scan response packet,
The scan response packet is created only if there is not room in the
advertising data packet for the complete peripheral name.
:param peripheral Peripheral the Peripheral to advertise. Use its services and name
:param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm
"""

def __init__(self, peripheral, *, tx_power=0):
self._peripheral = peripheral

packet = AdvertisingPacket()
packet.add_flags()
self._scan_response_packet = None

# Need to check service.secondary
uuids_16_bits = [service.uuid for service in peripheral.services
if service.uuid.size == 16 and not service.secondary]
if uuids_16_bits:
packet.add_16_bit_uuids(uuids_16_bits)

uuids_128_bits = [service.uuid for service in peripheral.services
if service.uuid.size == 128 and not service.secondary]
if uuids_128_bits:
packet.add_128_bit_uuids(uuids_128_bits)

packet.add_field(AdvertisingPacket.TX_POWER, struct.pack("<b", tx_power))

# 2 bytes needed for field length and type.
bytes_available = packet.bytes_remaining - 2
if bytes_available <= 0:
raise IndexError("No room for name")

name_bytes = bytes(peripheral.name, 'utf-8')
if bytes_available >= len(name_bytes):
packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, name_bytes)
else:
packet.add_field(AdvertisingPacket.SHORT_LOCAL_NAME, name_bytes[:bytes_available])
self._scan_response_packet = AdvertisingPacket()
try:
self._scan_response_packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME,
name_bytes)
except IndexError:
raise IndexError("Name too long")

self._advertising_data_packet = packet

@property
def advertising_data_bytes(self):
"""The raw bytes for the initial advertising data packet."""
return self._advertising_data_packet.packet_bytes

@property
def scan_response_bytes(self):
"""The raw bytes for the scan response packet. None if there is no response packet."""
if self._scan_response_packet:
return self._scan_response_packet.packet_bytes
return None
57 changes: 30 additions & 27 deletions adafruit_ble/beacon.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,29 @@
import struct
import bleio

from .advertising import AdvertisingData
from .advertising import AdvertisingPacket

class Beacon:
"""Base class for Beacon advertisers."""
def __init__(self, advertising_data, interval=1.0):
"""Set up a beacon with the given AdvertisingData.
def __init__(self, advertising_packet):
"""Set up a beacon with the given AdvertisingPacket.
:param AdvertisingData advertising_data: The advertising packet
:param float interval: Advertising interval in seconds
:param AdvertisingPacket advertising_packet
"""
self.broadcaster = bleio.Broadcaster(interval)
self.advertising_data = advertising_data
self._broadcaster = bleio.Peripheral(name=None)
self._advertising_packet = advertising_packet

def start(self, interval=1.0):
"""Turn on beacon.
def start(self):
"""Turn on beacon."""
self.broadcaster.start_advertising(self.advertising_data.data)
:param float interval: Advertising interval in seconds
"""
self._broadcaster.start_advertising(self._advertising_packet.packet_bytes,
interval=interval)

def stop(self):
"""Turn off beacon."""
self.broadcaster.stop_advertising()
self._broadcaster.stop_advertising()



Expand All @@ -60,7 +63,7 @@ class LocationBeacon(Beacon):
Used for Apple iBeacon, Nordic nRF Beacon, etc.
"""
# pylint: disable=too-many-arguments
def __init__(self, company_id, uuid, major, minor, rssi, interval=1.0):
def __init__(self, company_id, uuid, major, minor, rssi):
"""Create a beacon with the given values.
:param int company_id: 16-bit company id designating beacon specification owner
Expand All @@ -69,7 +72,6 @@ def __init__(self, company_id, uuid, major, minor, rssi, interval=1.0):
:param int major: 16-bit major number, such as a store number
:param int minor: 16-bit minor number, such as a location within a store
:param int rssi: Signal strength in dBm at 1m (signed 8-bit value)
:param float interval: Advertising interval in seconds
Example::
Expand All @@ -81,8 +83,9 @@ def __init__(self, company_id, uuid, major, minor, rssi, interval=1.0):
b.start()
"""

adv_data = AdvertisingData()
adv_data.add_mfr_specific_data(
adv = AdvertisingPacket()
adv.add_flags()
adv.add_mfr_specific_data(
company_id,
b''.join((
# 0x02 means a beacon. 0x15 (=21) is length (16 + 2 + 2 + 1)
Expand All @@ -91,8 +94,8 @@ def __init__(self, company_id, uuid, major, minor, rssi, interval=1.0):
# iBeacon and similar expect big-endian UUIDS. Usually they are little-endian.
bytes(reversed(uuid.uuid128)),
# major and minor are big-endian.
struct.pack(">HHB", major, minor, rssi))))
super().__init__(adv_data, interval=interval)
struct.pack(">HHb", major, minor, rssi))))
super().__init__(adv)


class EddystoneURLBeacon(Beacon):
Expand Down Expand Up @@ -126,16 +129,16 @@ class EddystoneURLBeacon(Beacon):
'.gov',
)

def __init__(self, url, tx_power=0, interval=1.0):
def __init__(self, url, tx_power=0):
"""Create a URL beacon with an encoded version of the url and a transmit power.
:param url URL to encode. Must be short enough to fit after encoding.
:param int tx_power: transmit power in dBm at 0 meters (8 bit signed value)
:param float interval: Advertising interval in seconds
"""

adv_data = AdvertisingData()
adv_data.add_field(AdvertisingData.ALL_16_BIT_SERVICE_UUIDS, self._EDDYSTONE_ID)
adv = AdvertisingPacket()
adv.add_flags()
adv.add_field(AdvertisingPacket.ALL_16_BIT_SERVICE_UUIDS, self._EDDYSTONE_ID)
short_url = None
for idx, prefix in enumerate(self._URL_SCHEMES):
if url.startswith(prefix):
Expand All @@ -148,9 +151,9 @@ def __init__(self, url, tx_power=0, interval=1.0):
short_url = short_url.replace(subst + '/', chr(code))
for code, subst in enumerate(self._SUBSTITUTIONS, 7):
short_url = short_url.replace(subst, chr(code))
adv_data.add_field(AdvertisingData.SERVICE_DATA_16_BIT_UUID,
b''.join((self._EDDYSTONE_ID,
b'\x10',
struct.pack("<BB", tx_power, url_scheme_num),
bytes(short_url, 'ascii'))))
super().__init__(adv_data, interval)
adv.add_field(AdvertisingPacket.SERVICE_DATA_16_BIT_UUID,
b''.join((self._EDDYSTONE_ID,
b'\x10',
struct.pack("<bB", tx_power, url_scheme_num),
bytes(short_url, 'ascii'))))
super().__init__(adv)
Loading

0 comments on commit f38c77c

Please sign in to comment.