- Repository
- Munin (contrib)
- Last change
- 2020-10-24
- Graph Categories
- Family
- contrib
- Capabilities
- Keywords
- Language
- Python (3.x)
- Authors
arris-tg3442
Name
arris - MUNIN Plugin to monitor status of Arris TG3442 / TG2492LG-85 and compatible cable modems
Description
Connect to the web-frontend and get current DOCSIS status of upstream and downstream channels. (Signal Power, SNR, Lock Status)
Requirements
- BeautifulSoup
- pycryptodome
Configuration
Example
[arris]
env.url http://192.168.100.1
env.username admin
env.password yourpassword
Parameters
url - URL to web-frontend username - defaults to “admin” password - valid password
References
https://www.arris.com/products/touchstone-tg3442-cable-voice-gateway/
Author
Copyright (c) 2019 Daniel Hiepler d-munin@coderdu.de
Copyright (c) 2004-2009 Nicolas Stransky Nico@stransky.cx
Copyright (c) 2018 Lars Kruse devel@sumpfralle.de
License
Permission to use, copy, and modify this software with or without fee is hereby granted, provided that this entire notice is included in all source code copies of any software which is or includes a copy or modification of this software.
THIS SOFTWARE IS BEING PROVIDED “AS IS”, WITHOUT ANY EXPRESS OR IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR PURPOSE.
Magic Markers
#%# family=contrib
#!/usr/bin/env python3
"""
=head1 NAME
arris - MUNIN Plugin to monitor status of Arris TG3442 / TG2492LG-85
and compatible cable modems
=head1 DESCRIPTION
Connect to the web-frontend and get current DOCSIS status of upstream and
downstream channels. (Signal Power, SNR, Lock Status)
=head1 REQUIREMENTS
=over 4
=item BeautifulSoup
=item pycryptodome
=back
=head1 CONFIGURATION
=head2 Example
[arris]
env.url http://192.168.100.1
env.username admin
env.password yourpassword
=head2 Parameters
url - URL to web-frontend
username - defaults to "admin"
password - valid password
=head1 REFERENCES
https://www.arris.com/products/touchstone-tg3442-cable-voice-gateway/
=head1 AUTHOR
Copyright (c) 2019 Daniel Hiepler <d-munin@coderdu.de>
Copyright (c) 2004-2009 Nicolas Stransky <Nico@stransky.cx>
Copyright (c) 2018 Lars Kruse <devel@sumpfralle.de>
=head1 LICENSE
Permission to use, copy, and modify this software with or without fee
is hereby granted, provided that this entire notice is included in
all source code copies of any software which is or includes a copy or
modification of this software.
THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRESS OR
IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY
REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE
MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR
PURPOSE.
=head1 MAGIC MARKERS
#%# family=contrib
=cut
"""
import binascii
from bs4 import BeautifulSoup
from Crypto.Cipher import AES
import hashlib
import json
import re
import requests
import sys
import os
"""
The CREDENTIAL_COOKIE below equals the following:
base64.encodebytes(b'{ "unique":"280oaPSLiF", "family":"852", "modelname":"TG2492LG-85", '
'"name":"technician", "tech":true, "moca":0, "wifi":5, "conType":"WAN", '
'"gwWan":"f", "DefPasswdChanged":"YES" }').decode()
"""
CREDENTIAL_COOKIE = "eyAidW5pcXVlIjoiMjgwb2FQU0xpRiIsICJmYW1pbHkiOiI4NTIiLCAibW9kZWxuYW1lIjoiVEcy"\
"NDkyTEctODUiLCAibmFtZSI6InRlY2huaWNpYW4iLCAidGVjaCI6dHJ1ZSwgIm1vY2EiOjAsICJ3"\
"aWZpIjo1LCAiY29uVHlwZSI6IldBTiIsICJnd1dhbiI6ImYiLCAiRGVmUGFzc3dkQ2hhbmdlZCI6"\
"IllFUyIgfQ=="
def login(session, url, username, password):
"""login to """
# get login page
r = session.get(f"{url}")
# parse HTML
h = BeautifulSoup(r.text, "lxml")
# get session id from javascript in head
current_session_id = re.search(r".*var currentSessionId = '(.+)';.*", h.head.text)[1]
# encrypt password
salt = os.urandom(8)
iv = os.urandom(8)
key = hashlib.pbkdf2_hmac(
'sha256',
bytes(password.encode("ascii")),
salt,
iterations=1000,
dklen=128 / 8
)
secret = {"Password": password, "Nonce": current_session_id}
plaintext = bytes(json.dumps(secret).encode("ascii"))
associated_data = "loginPassword"
# initialize cipher
cipher = AES.new(key, AES.MODE_CCM, iv)
# set associated data
cipher.update(bytes(associated_data.encode("ascii")))
# encrypt plaintext
encrypt_data = cipher.encrypt(plaintext)
# append digest
encrypt_data += cipher.digest()
# return
login_data = {
'EncryptData': binascii.hexlify(encrypt_data).decode("ascii"),
'Name': username,
'Salt': binascii.hexlify(salt).decode("ascii"),
'Iv': binascii.hexlify(iv).decode("ascii"),
'AuthData': associated_data
}
# login
r = session.put(
f"{url}/php/ajaxSet_Password.php",
headers={
"Content-Type": "application/json",
"csrfNonce": "undefined"
},
data=json.dumps(login_data)
)
# parse result
result = json.loads(r.text)
# success?
if result['p_status'] == "Fail":
print("login failure", file=sys.stderr)
exit(-1)
# remember CSRF nonce
csrf_nonce = result['nonce']
# prepare headers
session.headers.update({
"X-Requested-With": "XMLHttpRequest",
"csrfNonce": csrf_nonce,
"Origin": f"{url}/",
"Referer": f"{url}/"
})
# set credentials cookie
session.cookies.set("credential", CREDENTIAL_COOKIE)
# set session
r = session.post(f"{url}/php/ajaxSet_Session.php")
def docsis_status(session):
"""get current DOCSIS status page, parse and return channel data"""
r = session.get(f"{url}/php/status_docsis_data.php")
# extract json from javascript
json_downstream_data = re.search(r".*json_dsData = (.+);.*", r.text)[1]
json_upstream_data = re.search(r".*json_usData = (.+);.*", r.text)[1]
# parse json
downstream_data = json.loads(json_downstream_data)
upstream_data = json.loads(json_upstream_data)
# convert lock status to numeric values
for d in [upstream_data, downstream_data]:
for c in d:
if c['LockStatus'] == "ACTIVE" or c['LockStatus'] == "Locked":
c['LockStatus'] = 1
else:
c['LockStatus'] = 0
return downstream_data, upstream_data
# -----------------------------------------------------------------------------
if __name__ == "__main__":
# get config
url = os.getenv("url")
username = os.getenv("username")
password = os.getenv("password")
# validate config
if not url or not username or not password:
print("Set url, username and password first.", file=sys.stderr)
exit(1)
# create session
session = requests.Session()
# login with username and password
login(session, url, username, password)
# get DOCSIS status
downstream, upstream = docsis_status(session)
# prepare munin graph info
graph_descriptions = [
{
"name": "up_signal",
"title": "DOCSIS Upstream signal strength",
"vlabel": "dBmV",
"info": "DOCSIS upstream signal strength by channel",
"data": upstream,
"key": "PowerLevel"
},
{
"name": "up_lock",
"title": "DOCSIS Upstream lock",
"vlabel": "locked",
"info": "DOCSIS upstream channel lock status",
"data": upstream,
"key": "LockStatus"
},
{
"name": "down_signal",
"title": "DOCSIS Downstream signal strength",
"vlabel": "dBmV",
"info": "DOCSIS downstream signal strength by channel",
"data": downstream,
"key": "PowerLevel"
},
{
"name": "down_lock",
"title": "DOCSIS Downstream lock",
"vlabel": "locked",
"info": "DOCSIS downstream channel lock status",
"data": downstream,
"key": "LockStatus"
},
{
"name": "down_snr",
"title": "DOCSIS Downstream signal/noise ratio",
"vlabel": "dB",
"info": "SNR/MER",
"data": downstream,
"key": "SNRLevel"
}
]
# configure ?
if len(sys.argv) > 1 and "config" == sys.argv[1]:
# process all graphs
for g in graph_descriptions:
# graph config
print(f"multigraph docsis_{g['name']}")
print(f"graph_title {g['title']}")
print("graph_category network")
print(f"graph_vlabel {g['vlabel']}")
print(f"graph_info {g['info']}")
print("graph_scale no")
# channels
for c in g['data']:
# only use channels with PowerLevel
if not c['PowerLevel']:
continue
info_text = f"Channel type: {c['ChannelType']}, Modulation: {c['Modulation']}"
print(f"channel_{c['ChannelID']}.label {c['ChannelID']} ({c['Frequency']} MHz)")
print(f"channel_{c['ChannelID']}.info {info_text}")
# output values ?
else:
# process all graphs
for g in graph_descriptions:
print(f"multigraph docsis_{g['name']}")
# channels
for c in g['data']:
# only use channels with PowerLevel
if not c['PowerLevel']:
continue
print(f"channel_{c['ChannelID']}.value {c[g['key']]}")