observium_api_client/client.py

102 lines
4 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
2017-11-13 10:20:29 +00:00
import requests
import yaml
import argparse
2018-12-10 14:12:07 +00:00
import os
import shutil
2017-11-13 10:20:29 +00:00
2018-12-10 14:12:07 +00:00
config = os.path.join(os.path.dirname(os.path.realpath(__file__)),"config.yml")
with open(config, "r") as ymlfile:
2017-11-13 10:20:29 +00:00
cfg = yaml.load(ymlfile)
term_cols = getattr(shutil.get_terminal_size((80, 20)), 'columns')
2017-11-13 20:17:35 +00:00
def call_api(params, path):
url = '/'.join((params['base_url'],params['api_path'],path))
try:
response = requests.get(url, auth=(params['username'],
params['password']))
2017-11-13 20:17:35 +00:00
except:
return
2017-11-14 09:51:58 +00:00
if response.status_code is not 200:
return
2017-11-13 10:20:29 +00:00
return response.json()
def print_data(key, value, suffix = None):
if suffix is not None:
2019-01-17 16:39:24 +00:00
value_cols = term_cols - 26 - len(value)
print("(0x(B %-18s (0x(B %s %-*.*s (0x(B" %
(key, value, value_cols, value_cols, suffix))
2017-11-13 10:20:29 +00:00
else:
2019-01-17 16:39:24 +00:00
value_cols = term_cols - 25
print("(0x(B %-18s (0x(B %-*.*s (0x(B" %
(key, value_cols, value_cols, value))
2017-11-13 10:20:29 +00:00
2017-11-13 20:17:35 +00:00
# Test instances
for instance in list(cfg['instances']):
2017-11-13 20:17:35 +00:00
test = call_api(cfg['instances'][instance], 'devices')
if test is None:
print("Instance %s is not working, disabling" % instance)
2017-11-13 20:17:35 +00:00
cfg['instances'].pop(instance)
else:
print("Instance %s is working" % instance)
2017-11-13 20:17:35 +00:00
2017-11-13 10:20:29 +00:00
def search_ports(args):
for instance, params in cfg['instances'].items():
2017-11-13 20:17:35 +00:00
data = call_api(params, 'ports/?ifAlias=%s' % args.string)
2017-11-13 10:20:29 +00:00
2017-11-13 20:17:35 +00:00
if data['count'] < 1:
print("No port found for description %s on instance %s" %
(args.string, instance))
2017-11-13 20:17:35 +00:00
return
2017-11-13 10:20:29 +00:00
for port in data['ports'].values():
2017-11-13 20:17:35 +00:00
device = call_api(params, 'devices/%s' % port['device_id'])
address = call_api(params, 'address/?device_id=%s&interface=%s' % (port['device_id'], port['port_label_short']))
address6 = call_api(params, 'address/?af=ipv6&device_id=%s&interface=%s' % (port['device_id'], port['port_label_short']))
if device['device']['disabled'] == "1": continue
if port['disabled'] == "1": continue
2019-01-16 10:43:54 +00:00
if port['deleted'] == "1": continue
print("(0l" + "q" * 20 + "w" + "q" * (term_cols - 23) + "k(B")
2017-11-13 20:17:35 +00:00
print_data("Device", device['device']['hostname'])
print_data("", "%s/device/device=%s" %
(params['base_url'], port['device_id']))
print_data("Hardware", device['device']['hardware'])
2017-11-13 20:17:35 +00:00
print_data("Port", port['port_label_short'])
print_data("", "%s/device/device=%s/tab=port/port=%s/" %
(params['base_url'], port['device_id'], port['port_id']))
print_data("Description", port['ifAlias'])
print_data("Port status", "%s (Admin) / %s (Oper)" %
(port['ifAdminStatus'], port['ifOperStatus']))
print_data("Last change", port['ifLastChange'])
2017-11-13 20:17:35 +00:00
print_data("Speed", port['ifHighSpeed'], "Mbps")
print_data("Duplex", port['ifDuplex'])
print_data("MTU", port['ifMtu'])
for ip in address['addresses']:
print_data("IP address", "%s/%s" % (ip['ipv4_address'],ip['ipv4_prefixlen']))
for ip6 in address6['addresses']:
print_data("IPv6 address", "%s/%s" % (ip6['ipv6_compressed'],ip6['ipv6_prefixlen']))
2017-11-13 20:17:35 +00:00
print_data("Input rate", port['ifInOctets_rate'], "octets")
print_data("Output rate", port['ifOutOctets_rate'], "octets")
print_data("Input errors rate", port['ifInErrors_rate'])
print_data("Output errors rate", port['ifOutErrors_rate'])
2017-11-13 10:20:29 +00:00
print("(0m" + "q" * 20 + "v" + "q" * (term_cols - 23) + "j(B")
2017-11-13 10:20:29 +00:00
# Argument parsing
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(help='Action to perform',dest='action')
parser_search_port_by_descr = subparsers.add_parser('search_ports', help="Search ports by description")
parser_search_port_by_descr.add_argument('string', type=str, help="String to search")
args = parser.parse_args()
globals()[args.action](args)