Repository
Munin (contrib)
Last change
2020-10-24
Graph Categories
Family
contrib
Capabilities
Keywords
Language
Python (3.x)
Authors

arris-tg3442

Name

arris - MUNIN Plugin to monitor status of Arris TG3442 / TG2492LG-85 and compatible cable modems

Description

Connect to the web-frontend and get current DOCSIS status of upstream and downstream channels. (Signal Power, SNR, Lock Status)

Requirements

  • BeautifulSoup
  • pycryptodome

Configuration

Example

[arris]
env.url http://192.168.100.1
env.username admin
env.password yourpassword

Parameters

url - URL to web-frontend username - defaults to “admin” password - valid password

References

https://www.arris.com/products/touchstone-tg3442-cable-voice-gateway/

Author

Copyright (c) 2019 Daniel Hiepler d-munin@coderdu.de

Copyright (c) 2004-2009 Nicolas Stransky Nico@stransky.cx

Copyright (c) 2018 Lars Kruse devel@sumpfralle.de

License

Permission to use, copy, and modify this software with or without fee is hereby granted, provided that this entire notice is included in all source code copies of any software which is or includes a copy or modification of this software.

THIS SOFTWARE IS BEING PROVIDED “AS IS”, WITHOUT ANY EXPRESS OR IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR PURPOSE.

Magic Markers

#%# family=contrib
#!/usr/bin/env python3

"""
=head1 NAME

arris - MUNIN Plugin to monitor status of Arris TG3442 / TG2492LG-85
        and compatible cable modems

=head1 DESCRIPTION
Connect to the web-frontend and get current DOCSIS status of upstream and
downstream channels. (Signal Power, SNR, Lock Status)


=head1 REQUIREMENTS

=over 4

=item BeautifulSoup

=item pycryptodome

=back


=head1 CONFIGURATION

=head2 Example

 [arris]
 env.url http://192.168.100.1
 env.username admin
 env.password yourpassword


=head2 Parameters

url      - URL to web-frontend
username - defaults to "admin"
password - valid password


=head1 REFERENCES
https://www.arris.com/products/touchstone-tg3442-cable-voice-gateway/


=head1 AUTHOR

Copyright (c) 2019 Daniel Hiepler <d-munin@coderdu.de>

Copyright (c) 2004-2009 Nicolas Stransky <Nico@stransky.cx>

Copyright (c) 2018 Lars Kruse <devel@sumpfralle.de>


=head1 LICENSE

Permission to use, copy, and modify this software with or without fee
is hereby granted, provided that this entire notice is included in
all source code copies of any software which is or includes a copy or
modification of this software.

THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRESS OR
IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY
REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE
MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR
PURPOSE.


=head1 MAGIC MARKERS

 #%# family=contrib

=cut
"""

import binascii
from bs4 import BeautifulSoup
from Crypto.Cipher import AES
import hashlib
import json
import re
import requests
import sys
import os


"""
The CREDENTIAL_COOKIE below equals the following:
base64.encodebytes(b'{ "unique":"280oaPSLiF", "family":"852", "modelname":"TG2492LG-85", '
                   '"name":"technician", "tech":true, "moca":0, "wifi":5, "conType":"WAN", '
                   '"gwWan":"f", "DefPasswdChanged":"YES" }').decode()
"""
CREDENTIAL_COOKIE = "eyAidW5pcXVlIjoiMjgwb2FQU0xpRiIsICJmYW1pbHkiOiI4NTIiLCAibW9kZWxuYW1lIjoiVEcy"\
    "NDkyTEctODUiLCAibmFtZSI6InRlY2huaWNpYW4iLCAidGVjaCI6dHJ1ZSwgIm1vY2EiOjAsICJ3"\
    "aWZpIjo1LCAiY29uVHlwZSI6IldBTiIsICJnd1dhbiI6ImYiLCAiRGVmUGFzc3dkQ2hhbmdlZCI6"\
    "IllFUyIgfQ=="


def login(session, url, username, password):
    """login to """
    # get login page
    r = session.get(f"{url}")
    # parse HTML
    h = BeautifulSoup(r.text, "lxml")
    # get session id from javascript in head
    current_session_id = re.search(r".*var currentSessionId = '(.+)';.*", h.head.text)[1]

    # encrypt password
    salt = os.urandom(8)
    iv = os.urandom(8)
    key = hashlib.pbkdf2_hmac(
        'sha256',
        bytes(password.encode("ascii")),
        salt,
        iterations=1000,
        dklen=128 / 8
    )
    secret = {"Password": password, "Nonce": current_session_id}
    plaintext = bytes(json.dumps(secret).encode("ascii"))
    associated_data = "loginPassword"
    # initialize cipher
    cipher = AES.new(key, AES.MODE_CCM, iv)
    # set associated data
    cipher.update(bytes(associated_data.encode("ascii")))
    # encrypt plaintext
    encrypt_data = cipher.encrypt(plaintext)
    # append digest
    encrypt_data += cipher.digest()
    # return
    login_data = {
        'EncryptData': binascii.hexlify(encrypt_data).decode("ascii"),
        'Name': username,
        'Salt': binascii.hexlify(salt).decode("ascii"),
        'Iv': binascii.hexlify(iv).decode("ascii"),
        'AuthData': associated_data
    }

    # login
    r = session.put(
        f"{url}/php/ajaxSet_Password.php",
        headers={
            "Content-Type": "application/json",
            "csrfNonce": "undefined"
        },
        data=json.dumps(login_data)
    )

    # parse result
    result = json.loads(r.text)
    # success?
    if result['p_status'] == "Fail":
        print("login failure", file=sys.stderr)
        exit(-1)
    # remember CSRF nonce
    csrf_nonce = result['nonce']

    # prepare headers
    session.headers.update({
        "X-Requested-With": "XMLHttpRequest",
        "csrfNonce": csrf_nonce,
        "Origin": f"{url}/",
        "Referer": f"{url}/"
    })
    # set credentials cookie
    session.cookies.set("credential", CREDENTIAL_COOKIE)

    # set session
    r = session.post(f"{url}/php/ajaxSet_Session.php")


def docsis_status(session):
    """get current DOCSIS status page, parse and return channel data"""
    r = session.get(f"{url}/php/status_docsis_data.php")
    # extract json from javascript
    json_downstream_data = re.search(r".*json_dsData = (.+);.*", r.text)[1]
    json_upstream_data = re.search(r".*json_usData = (.+);.*", r.text)[1]
    # parse json
    downstream_data = json.loads(json_downstream_data)
    upstream_data = json.loads(json_upstream_data)
    # convert lock status to numeric values
    for d in [upstream_data, downstream_data]:
        for c in d:
            if c['LockStatus'] == "ACTIVE" or c['LockStatus'] == "Locked":
                c['LockStatus'] = 1
            else:
                c['LockStatus'] = 0
    return downstream_data, upstream_data


# -----------------------------------------------------------------------------
if __name__ == "__main__":
    # get config
    url = os.getenv("url")
    username = os.getenv("username")
    password = os.getenv("password")
    # validate config
    if not url or not username or not password:
        print("Set url, username and password first.", file=sys.stderr)
        exit(1)
    # create session
    session = requests.Session()
    # login with username and password
    login(session, url, username, password)
    # get DOCSIS status
    downstream, upstream = docsis_status(session)
    # prepare munin graph info
    graph_descriptions = [
        {
            "name": "up_signal",
            "title": "DOCSIS Upstream signal strength",
            "vlabel": "dBmV",
            "info": "DOCSIS upstream signal strength by channel",
            "data": upstream,
            "key": "PowerLevel"
        },
        {
            "name": "up_lock",
            "title": "DOCSIS Upstream lock",
            "vlabel": "locked",
            "info": "DOCSIS upstream channel lock status",
            "data": upstream,
            "key": "LockStatus"
        },
        {
            "name": "down_signal",
            "title": "DOCSIS Downstream signal strength",
            "vlabel": "dBmV",
            "info": "DOCSIS downstream signal strength by channel",
            "data": downstream,
            "key": "PowerLevel"
        },
        {
            "name": "down_lock",
            "title": "DOCSIS Downstream lock",
            "vlabel": "locked",
            "info": "DOCSIS downstream channel lock status",
            "data": downstream,
            "key": "LockStatus"
        },
        {
            "name": "down_snr",
            "title": "DOCSIS Downstream signal/noise ratio",
            "vlabel": "dB",
            "info": "SNR/MER",
            "data": downstream,
            "key": "SNRLevel"
        }
    ]

    # configure ?
    if len(sys.argv) > 1 and "config" == sys.argv[1]:
        # process all graphs
        for g in graph_descriptions:
            # graph config
            print(f"multigraph docsis_{g['name']}")
            print(f"graph_title {g['title']}")
            print("graph_category network")
            print(f"graph_vlabel {g['vlabel']}")
            print(f"graph_info {g['info']}")
            print("graph_scale no")

            # channels
            for c in g['data']:
                # only use channels with PowerLevel
                if not c['PowerLevel']:
                    continue
                info_text = f"Channel type: {c['ChannelType']}, Modulation: {c['Modulation']}"
                print(f"channel_{c['ChannelID']}.label {c['ChannelID']} ({c['Frequency']} MHz)")
                print(f"channel_{c['ChannelID']}.info {info_text}")

    # output values ?
    else:
        # process all graphs
        for g in graph_descriptions:
            print(f"multigraph docsis_{g['name']}")
            # channels
            for c in g['data']:
                # only use channels with PowerLevel
                if not c['PowerLevel']:
                    continue
                print(f"channel_{c['ChannelID']}.value {c[g['key']]}")