Update for API usage

This commit is contained in:
Benjamin Collet 2025-04-16 16:54:27 +02:00
parent f125c39ba7
commit 014df3493a
Signed by: bcollet
SSH key fingerprint: SHA256:8UJspOIcCOS+MtSOcnuq2HjKFube4ox1s/+A62ixov4
2 changed files with 72 additions and 65 deletions

View file

@ -1,8 +1,6 @@
#base_url: 'http://sandbox.alt.tf/whois/glanet'
base_url: 'https://rest.db.ripe.net/ripe'
# https://apps.db.ripe.net/db-web-ui/api-keys
username: 'username'
password: 'password'
params:
password:
- 'mnt1-password'
- 'mnt2-password'
- 'mnt3-password'
unfiltered: True # Needed to get all attributes

View file

@ -9,32 +9,40 @@ import os
import hashlib
from subprocess import call
config = os.path.join(os.path.dirname(os.path.realpath(__file__)),"config.yml")
config = os.path.join(os.path.dirname(os.path.realpath(__file__)), "config.yml")
# Read configuration
with open(config, 'r') as ymlfile:
with open(config, "r") as ymlfile:
cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
# Functions
def call_api(method, url, object_data=None):
headers = { 'Content-Type': 'application/json',
'Accept': 'application/json' }
response = requests.request(method, url, json=object_data, headers=headers,
params=cfg['params'])
headers = {"Content-Type": "application/json", "Accept": "application/json"}
auth_basic = requests.auth.HTTPBasicAuth(cfg["username"], cfg["password"])
response = requests.request(
method,
url,
json=object_data,
headers=headers,
params=cfg["params"],
auth=auth_basic,
)
if not response.text:
print("No response received (error %i)" % response.status_code)
sys.exit(1)
json_response=json.loads(response.text)
json_response = json.loads(response.text)
if "errormessages" in json_response:
for err in json_response['errormessages']['errormessage']:
if 'args' in err:
print(err['text'].replace('\n', ' ').replace('\r', '') \
% tuple([d['value'] for d in err['args']]))
for err in json_response["errormessages"]["errormessage"]:
if "args" in err:
print(
err["text"].replace("\n", " ").replace("\r", "")
% tuple([d["value"] for d in err["args"]])
)
else:
print(err['text'])
print(err["text"])
return json_response
@ -42,9 +50,8 @@ def call_api(method, url, object_data=None):
# Print formatted output from JSON
def print_output(json, object_file):
if "objects" in json:
for attr in json['objects']['object'][0]['attributes']['attribute']:
object_file.write("%-20s %s\n" % (attr['name']+':',
attr['value']))
for attr in json["objects"]["object"][0]["attributes"]["attribute"]:
object_file.write("%-20s %s\n" % (attr["name"] + ":", attr["value"]))
if object_file:
object_file.close()
@ -52,29 +59,24 @@ def print_output(json, object_file):
def read_input(object_file):
attr = []
for line in object_file.readlines():
if not line: continue
attr.append({'name':line.split(':', 1)[0].strip(),
'value':line.split(':', 1)[1].strip() })
if not line:
continue
attr.append(
{
"name": line.split(":", 1)[0].strip(),
"value": line.split(":", 1)[1].strip(),
}
)
object_data = {
"objects": {
"object": [
{
"attributes": {
"attribute": attr
}
}
]
}
}
object_data = {"objects": {"object": [{"attributes": {"attribute": attr}}]}}
return object_data
# Get record
def get(args):
print("Getting %s object %s" % (args.type, args.key))
url = '/'.join((cfg['base_url'],args.type,args.key))
json_response=call_api("GET", url)
url = "/".join((cfg["base_url"], args.type, args.key))
json_response = call_api("GET", url)
print_output(json_response, args.file)
return
@ -82,17 +84,17 @@ def get(args):
# Delete record
def delete(args):
print("Deleting %s object %s" % (args.type, args.key))
url = '/'.join((cfg['base_url'],args.type,args.key))
json_response=call_api("DELETE", url)
url = "/".join((cfg["base_url"], args.type, args.key))
json_response = call_api("DELETE", url)
return
# Create record
def create(args):
print("Creating %s object" % (args.type))
url = '/'.join((cfg['base_url'],args.type))
url = "/".join((cfg["base_url"], args.type))
object_data = read_input(args.file)
json_response=call_api("POST", url, object_data)
json_response = call_api("POST", url, object_data)
print_output(json_response, sys.stdout)
return
@ -100,9 +102,9 @@ def create(args):
# Update record
def update(args):
print("Updating %s object %s" % (args.type, args.key))
url = '/'.join((cfg['base_url'],args.type,args.key))
url = "/".join((cfg["base_url"], args.type, args.key))
object_data = read_input(args.file)
json_response=call_api("PUT", url, object_data)
json_response = call_api("PUT", url, object_data)
print_output(json_response, sys.stdout)
return
@ -111,11 +113,11 @@ def update(args):
def edit(args):
tmp_fd, tmp_name = tempfile.mkstemp()
get(parser.parse_args(["get", args.type, args.key, tmp_name]))
EDITOR = os.environ.get('EDITOR','vim')
EDITOR = os.environ.get("EDITOR", "vim")
# Memory inefficient, but who cares?
hash1 = hashlib.md5(open(tmp_name).read().encode('utf-8')).hexdigest()
hash1 = hashlib.md5(open(tmp_name).read().encode("utf-8")).hexdigest()
call([EDITOR, tmp_name])
hash2 = hashlib.md5(open(tmp_name).read().encode('utf-8')).hexdigest()
hash2 = hashlib.md5(open(tmp_name).read().encode("utf-8")).hexdigest()
if hash1 != hash2:
update(parser.parse_args(["update", args.type, args.key, tmp_name]))
@ -125,7 +127,8 @@ def edit(args):
os.unlink(tmp_name)
return
if __name__ == '__main__':
if __name__ == "__main__":
# Arguments parsing
# USAGE : ./ripe-api.py delete <TYPE> <KEY>
@ -133,30 +136,36 @@ if __name__ == '__main__':
# ./ripe-api.py update <TYPE> <KEY> <FILE>
# ./ripe-api.py get <TYPE> <KEY> <FILE>
# ./ripe-api.py edit <TYPE> <KEY>
parser = argparse.ArgumentParser(description='RIPE API client')
subparsers = parser.add_subparsers(help='Action to perform',dest='action',required=True)
parser = argparse.ArgumentParser(description="RIPE API client")
subparsers = parser.add_subparsers(
help="Action to perform", dest="action", required=True
)
parser_get = subparsers.add_parser('get', help='Get an object')
parser_get.add_argument('type', type=str, help='Object type')
parser_get.add_argument('key', type=str, help='Object identifier')
parser_get.add_argument('file', type=argparse.FileType('w'), help='Output file')
parser_get = subparsers.add_parser("get", help="Get an object")
parser_get.add_argument("type", type=str, help="Object type")
parser_get.add_argument("key", type=str, help="Object identifier")
parser_get.add_argument(
"file", type=argparse.FileType("w"), help="Output file", nargs="?", default="-"
)
parser_delete = subparsers.add_parser('delete', help='Delete an object')
parser_delete.add_argument('type', type=str, help='Object type')
parser_delete.add_argument('key', type=str, help='Object identifier')
parser_delete = subparsers.add_parser("delete", help="Delete an object")
parser_delete.add_argument("type", type=str, help="Object type")
parser_delete.add_argument("key", type=str, help="Object identifier")
parser_post = subparsers.add_parser('create', help='Create an object')
parser_post.add_argument('type', type=str, help='Object type')
parser_post.add_argument('file', type=argparse.FileType('r'), help='Input file')
parser_post = subparsers.add_parser("create", help="Create an object")
parser_post.add_argument("type", type=str, help="Object type")
parser_post.add_argument(
"file", type=argparse.FileType("r"), help="Input file", nargs="?", default="-"
)
parser_put = subparsers.add_parser('update', help='Update an object')
parser_put.add_argument('type', type=str, help='Object type')
parser_put.add_argument('key', type=str, help='Object identifier')
parser_put.add_argument('file', type=argparse.FileType('r'), help='Input file')
parser_put = subparsers.add_parser("update", help="Update an object")
parser_put.add_argument("type", type=str, help="Object type")
parser_put.add_argument("key", type=str, help="Object identifier")
parser_put.add_argument("file", type=argparse.FileType("r"), help="Input file")
parser_edit = subparsers.add_parser('edit', help='Edit an object')
parser_edit.add_argument('type', type=str, help='Object type')
parser_edit.add_argument('key', type=str, help='Object identifier')
parser_edit = subparsers.add_parser("edit", help="Edit an object")
parser_edit.add_argument("type", type=str, help="Object type")
parser_edit.add_argument("key", type=str, help="Object identifier")
args = parser.parse_args()
globals()[args.action](args)