Repository
Munin (contrib)
Last change
2021-09-23
Graph Categories
Family
contrib
Capabilities
Keywords
Language
Python (3.x)
License
GPL-2.0-only

switchbotmeterbt

Example graph: 1

Name

switchbotmeterbt - Munin plugin to monitor temperature/humidity with SwitchBot Meter BLE

Configuration

Python 3 and bluepy are necessary.

Example for Ubuntu:
  $ sudo apt install python3-pip
  $ sudo pip3 install bluepy

Environment Variables

user            : root privilege is necessary for BLE scan
env.macaddr     : Mac Address(es) of SwitchBot Meter(s) separated by white-space (Required)
env.hcidev      : HCI device index. (Optional, default is 0)
env.tempunit    : Temperature unit. (Optional, default is C)
env.scantimeout : Timeout for BLE scan. (Optional, default is 5.0 seconds)

Example:
  [switchbotmeterbt]
    user  root
    env.macaddr  aa:aa:aa:aa:aa:aa bb:bb:bb:bb:bb:bb cc:cc:cc:cc:cc:cc

Notes

For more details about SwitchBot Meter, see https://www.switch-bot.com/products/switchbot-meter

Author

K.Cima https://github.com/shakemid

License

GPLv2
SPDX-License-Identifier: GPL-2.0-only

Magic Markers

#%# family=contrib
#%# capabilities=
#!/usr/bin/env python3

"""
=head1 NAME

  switchbotmeterbt - Munin plugin to monitor temperature/humidity with SwitchBot Meter BLE

=head1 CONFIGURATION

  Python 3 and bluepy are necessary.

  Example for Ubuntu:
    $ sudo apt install python3-pip
    $ sudo pip3 install bluepy

=head1 ENVIRONMENT VARIABLES

  user            : root privilege is necessary for BLE scan
  env.macaddr     : Mac Address(es) of SwitchBot Meter(s) separated by white-space (Required)
  env.hcidev      : HCI device index. (Optional, default is 0)
  env.tempunit    : Temperature unit. (Optional, default is C)
  env.scantimeout : Timeout for BLE scan. (Optional, default is 5.0 seconds)

  Example:
    [switchbotmeterbt]
      user  root
      env.macaddr  aa:aa:aa:aa:aa:aa bb:bb:bb:bb:bb:bb cc:cc:cc:cc:cc:cc

=head1 NOTES

  For more details about SwitchBot Meter, see https://www.switch-bot.com/products/switchbot-meter

=head1 AUTHOR

  K.Cima https://github.com/shakemid

=head1 LICENSE

  GPLv2
  SPDX-License-Identifier: GPL-2.0-only

=head1 Magic markers

  #%# family=contrib
  #%# capabilities=

=cut
"""

import sys
import os
import subprocess
from bluepy.btle import Scanner, DefaultDelegate


class SwitchbotScanDelegate(DefaultDelegate):
    def __init__(self, macaddrs):
        super().__init__()
        self.sensorValues = {}
        self.macaddrs = macaddrs

    # Called when advertising data is received from an LE device
    def handleDiscovery(self, dev, isNewDev, isNewData):
        if dev.addr in self.macaddrs:
            for (adtype, desc, value) in dev.getScanData():
                if desc == '16b Service Data':
                    munin_debug(value)
                    self._decodeSensorData(dev, value)

    def _decodeSensorData(self, dev, value):
        # Extract lower 6 octets
        valueBinary = bytes.fromhex(value[4:16])

        # Refer to Meter BLE open API
        # https://github.com/OpenWonderLabs/python-host/wiki/Meter-BLE-open-API
        deviceType = chr(valueBinary[0] & 0b01111111)
        battery = valueBinary[2] & 0b01111111
        tint = valueBinary[4] & 0b01111111
        tdec = valueBinary[3] & 0b00001111
        temperature = tint + tdec / 10
        isTemperatureAboveFreezing = valueBinary[4] & 0b10000000
        if not isTemperatureAboveFreezing:
            temperature = -temperature
        humidity = valueBinary[5] & 0b01111111

        self.sensorValues[dev.addr] = {
            'DeviceType':  deviceType,
            'Temperature': temperature,
            'Humidity':    humidity,
            'Battery':     battery,
            'RSSI':        dev.rssi
        }
        munin_debug(str(self.sensorValues))


class SwitchbotMeterPlugin():
    def __init__(self):
        self.params = {}
        self.params['pluginname'] = os.path.basename(__file__)
        self.params['macaddrs'] = [str.lower(macaddr)
                                   for macaddr in os.getenv('macaddr').split()]
        self.params['hcidev'] = int(os.getenv('hcidev', 0))
        self.params['tempunit'] = os.getenv('tempunit', 'C')
        self.params['scantimeout'] = float(os.getenv('scantimeout', 5.0))

        self.graphs = {
            'temp': {
                'name': 'Temperature',
                'attrs': [
                    'graph_title SwitchBot Meter Temperature',
                    'graph_category sensors',
                    'graph_vlabel Temp ' + self.params['tempunit'],
                    'graph_scale no',
                    'graph_args --base 1000'
                ]
            },
            'humid': {
                'name': 'Humidity',
                'attrs': [
                    'graph_title SwitchBot Meter Humidity',
                    'graph_category sensors',
                    'graph_vlabel Humid %',
                    'graph_scale no',
                    'graph_args --base 1000'
                ]
            },
            'batt': {
                'name': 'Battery',
                'attrs': [
                    'graph_title SwitchBot Meter Battery',
                    'graph_category sensors',
                    'graph_vlabel %',
                    'graph_scale no',
                    'graph_args --base 1000 --lower-limit 0 --upper-limit 100'
                ]
            },
            'rssi': {
                'name': 'RSSI',
                'attrs': [
                    'graph_title SwitchBot Meter RSSI',
                    'graph_category sensors',
                    'graph_vlabel dB',
                    'graph_scale no',
                    'graph_args --base 1000'
                ]
            }
        }

    def config(self):
        # print config
        for k in self.graphs.keys():
            print('multigraph ' + self.params['pluginname'] + '_' + k)
            for line in self.graphs[k]['attrs']:
                print(line)
            for macaddr in self.params['macaddrs']:
                field = macaddr.replace(':', '')
                print(field + '.label ' + macaddr)

    def fetch(self):
        # scan to fetch data
        scanner = Scanner(self.params['hcidev']).withDelegate(SwitchbotScanDelegate(self.params['macaddrs']))

        try:
            # sometimes it might fail
            scanner.scan(self.params['scantimeout'])
            for macaddr in self.params['macaddrs']:
                check = scanner.delegate.sensorValues[macaddr]
        except KeyError as e:
            munin_error('retry scan for exception: ' + str(type(e)))
            scanner.scan(self.params['scantimeout'])
        except Exception as e:
            munin_error('reset hci and retry scan for exception: ' + str(type(e)))
            subprocess.call(f'hciconfig hci{self.params["hcidev"]} down && hciconfig hci{self.params["hcidev"]} up', shell=True)
            scanner.scan(self.params['scantimeout'])

        # print value
        for k in self.graphs.keys():
            print('multigraph ' + self.params['pluginname'] + '_' + k)
            for macaddr in self.params['macaddrs']:
                field = macaddr.replace(':', '')
                try:
                    print(field + '.value ' +
                          str(scanner.delegate.sensorValues[macaddr][self.graphs[k]['name']]))
                except KeyError:
                    pass


def munin_error(message):
    print(message, file=sys.stderr)


def munin_debug(message):
    if os.getenv('MUNIN_DEBUG') == '1':
        print('# ' + message)


def main():
    plugin = SwitchbotMeterPlugin()

    if len(sys.argv) > 1 and sys.argv[1] == 'config':
        plugin.config()
        if os.getenv('MUNIN_CAP_DIRTYCONFIG') == '1':
            plugin.fetch()
    else:
        plugin.fetch()


if __name__ == '__main__':
    main()