Add network functions to API client

This commit is contained in:
2019-12-29 16:13:32 -05:00
parent d0b6bb4cc3
commit d2f27cc8fe
2 changed files with 470 additions and 105 deletions

View File

@ -23,12 +23,22 @@
import difflib
import colorama
import click
import requests
import cli_lib.ansiprint as ansiprint
#
# Cluster search functions
#
def get_request_uri(config, endpoint):
"""
Return the fully-formed URI for {endpoint}
"""
uri = '{}://{}{}{}'.format(
config['api_scheme'],
config['api_host'],
config['api_prefix'],
endpoint
)
return uri
def isValidMAC(macaddr):
allowed = re.compile(r"""
(
@ -56,7 +66,399 @@ def isValidIP(ipaddr):
return False
#
# Direct functions
# Primary functions
#
def net_info(config, net):
"""
Get information about network
API endpoint: GET /api/v1/network/{net}
API arguments:
API schema: {json_data_object}
"""
request_uri = get_request_uri(config, '/network/{net}'.format(net=net))
response = requests.get(
request_uri
)
if config['debug']:
print('API endpoint: POST {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
return True, response.json()
else:
return False, response.json()['message']
def net_list(config, limit):
"""
Get list information about networks (limited by {limit})
API endpoint: GET /api/v1/network
API arguments: limit={limit}
API schema: [{json_data_object},{json_data_object},etc.]
"""
params = dict()
if limit:
params['limit'] = limit
request_uri = get_request_uri(config, '/network')
response = requests.get(
request_uri,
params=params
)
if config['debug']:
print('API endpoint: POST {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
return True, response.json()
else:
return False, response.json()['message']
def net_add(config, vni, description, nettype, domain, name_servers, ip4_network, ip4_gateway, ip6_network, ip6_gateway, dhcp4_flag, dhcp4_start, dhcp4_end):
"""
Add new network
API endpoint: POST /api/v1/network
API arguments: lots
API schema: {"message":"{data}"}
"""
request_uri = get_request_uri(config, '/network')
response = requests.post(
request_uri,
params={
'vni': vni,
'description': description,
'nettype': nettype,
'domain': domain,
'name_servers': name_servers,
'ip4_network': ip4_network,
'ip4_gateway': ip4_gateway,
'ip6_network': ip6_network,
'ip6_gateway': ip6_gateway,
'dhcp4': dhcp4_flag,
'dhcp4_start': dhcp4_start,
'dhcp4_end': dhcp4_end
}
)
if config['debug']:
print('API endpoint: POST {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
retstatus = True
else:
retstatus = False
return retstatus, response.json()['message']
def net_modify(config, net, description, domain, name_servers, ip4_network, ip4_gateway, ip6_network, ip6_gateway, dhcp4_flag, dhcp4_start, dhcp4_end):
"""
Modify a network
API endpoint: POST /api/v1/network/{net}
API arguments: lots
API schema: {"message":"{data}"}
"""
request_uri = get_request_uri(config, '/network/{net}'.format(net=net))
params = dict()
if description is not None:
params['description'] = description
if domain is not None:
params['domain'] = domain
if name_servers is not None:
params['name_servers'] = name_servers
if ip4_network is not None:
params['ip4_network'] = ip4_network
if ip4_gateway is not None:
params['ip4_gateway'] = ip4_gateway
if ip6_network is not None:
params['ip6_network'] = ip6_network
if ip6_gateway is not None:
params['ip6_gateway'] = ip6_gateway
if dhcp4_flag is not None:
params['dhcp4_flag'] = dhcp4_flag
if dhcp4_start is not None:
params['dhcp4_start'] = dhcp4_start
if dhcp4_end is not None:
params['dhcp4_end'] = dhcp4_end
response = requests.put(
request_uri,
params=params
)
if config['debug']:
print('API endpoint: POST {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
retstatus = True
else:
retstatus = False
return retstatus, response.json()['message']
def net_remove(config, net):
"""
Remove a network
API endpoint: DELETE /api/v1/network/{net}
API arguments:
API schema: {"message":"{data}"}
"""
request_uri = get_request_uri(config, '/network/{net}'.format(net=net))
response = requests.delete(
request_uri
)
if config['debug']:
print('API endpoint: DELETE {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
retstatus = True
else:
retstatus = False
return retstatus, response.json()['message']
#
# DHCP lease functions
#
def net_dhcp_info(config, net, mac):
"""A
Get information about network DHCP lease
API endpoint: GET /api/v1/network/{net}/lease/{mac}
API arguments:
API schema: {json_data_object}
"""
request_uri = get_request_uri(config, '/network/{net}/lease/{mac}'.format(net=net, mac=mac))
response = requests.get(
request_uri
)
if config['debug']:
print('API endpoint: POST {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
return True, response.json()
else:
return False, response.json()['message']
def net_dhcp_list(config, net, limit, only_static=False):
"""
Get list information about leases (limited by {limit})
API endpoint: GET /api/v1/network/{net}/lease
API arguments: limit={limit}, static={only_static}
API schema: [{json_data_object},{json_data_object},etc.]
"""
params = dict()
if limit:
params['limit'] = limit
if only_static:
params['static'] = True
request_uri = get_request_uri(config, '/network/{net}/lease'.format(net=net))
response = requests.get(
request_uri,
params=params
)
if config['debug']:
print('API endpoint: POST {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
return True, response.json()
else:
return False, response.json()['message']
def net_dhcp_add(config, net, ipaddr, macaddr, hostname):
"""
Add new network DHCP lease
API endpoint: POST /api/v1/network/{net}/lease
API arguments: macaddress=macaddr, ipaddress=ipaddr, hostname=hostname
API schema: {"message":"{data}"}
"""
request_uri = get_request_uri(config, '/network/{net}/lease'.format(net=net))
response = requests.post(
request_uri,
params={
'macaddress': macaddr,
'ipaddress': ipaddr,
'hostname': hostname
}
)
if config['debug']:
print('API endpoint: POST {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
retstatus = True
else:
retstatus = False
return retstatus, response.json()['message']
def net_dhcp_remove(config, net, mac):
"""
Remove a network DHCP lease
API endpoint: DELETE /api/v1/network/{vni}/lease/{mac}
API arguments:
API schema: {"message":"{data}"}
"""
request_uri = get_request_uri(config, '/network/{net}/lease/{mac}'.format(net=net, mac=mac))
response = requests.delete(
request_uri
)
if config['debug']:
print('API endpoint: DELETE {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
retstatus = True
else:
retstatus = False
return retstatus, response.json()['message']
#
# ACL functions
#
def net_acl_info(config, net, description):
"""
Get information about network ACL
API endpoint: GET /api/v1/network/{net}/acl/{description}
API arguments:
API schema: {json_data_object}
"""
request_uri = get_request_uri(config, '/network/{net}/acl/{description}'.format(net=net, description=description))
response = requests.get(
request_uri
)
if config['debug']:
print('API endpoint: POST {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
return True, response.json()
else:
return False, response.json()['message']
def net_acl_list(config, net, limit, direction):
"""
Get list information about ACLs (limited by {limit})
API endpoint: GET /api/v1/network/{net}/acl
API arguments: limit={limit}, direction={direction}
API schema: [{json_data_object},{json_data_object},etc.]
"""
params = dict()
if limit:
params['limit'] = limit
if direction is not None:
params['direction'] = direction
request_uri = get_request_uri(config, '/network/{net}/acl'.format(net=net))
response = requests.get(
request_uri,
params=params
)
if config['debug']:
print('API endpoint: POST {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
return True, response.json()
else:
return False, response.json()['message']
def net_acl_add(config, net, direction, description, rule, order):
"""
Add new network acl
API endpoint: POST /api/v1/network/{net}/acl
API arguments: description=description, direction=direction, order=order, rule=rule
API schema: {"message":"{data}"}
"""
params = dict()
params['description'] = description
params['direction'] = direction
params['rule'] = rule
if order is not None:
params['order'] = order
request_uri = get_request_uri(config, '/network/{net}/acl'.format(net=net))
response = requests.post(
request_uri,
params=params
)
if config['debug']:
print('API endpoint: POST {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
retstatus = True
else:
retstatus = False
return retstatus, response.json()['message']
def net_acl_remove(config, net, description):
"""
Remove a network ACL
API endpoint: DELETE /api/v1/network/{vni}/acl/{description}
API arguments:
API schema: {"message":"{data}"}
"""
request_uri = get_request_uri(config, '/network/{net}/acl/{description}'.format(net=net, description=description))
response = requests.delete(
request_uri
)
if config['debug']:
print('API endpoint: DELETE {}'.format(request_uri))
print('Response code: {}'.format(response.status_code))
print('Response headers: {}'.format(response.headers))
if response.status_code == 200:
retstatus = True
else:
retstatus = False
return retstatus, response.json()['message']
#
# Output display functions
#
def getOutputColours(network_information):
if network_information['ip6']['network'] != "None":
@ -79,7 +481,7 @@ def getOutputColours(network_information):
return v6_flag_colour, v4_flag_colour, dhcp6_flag_colour, dhcp4_flag_colour
def format_info(network_information, long_output):
def format_info(config, network_information, long_output):
if not network_information:
click.echo("No network found")
return
@ -131,7 +533,7 @@ def format_info(network_information, long_output):
# Join it all together
click.echo('\n'.join(ainformation))
def format_list(network_list):
def format_list(config, network_list):
if not network_list:
click.echo("No network found")
return
@ -314,6 +716,10 @@ def format_list_dhcp(dhcp_lease_list):
click.echo('\n'.join(sorted(dhcp_lease_list_output)))
def format_list_acl(acl_list):
# Handle when we get a single entry
if isinstance(acl_list, dict):
acl_list = [ acl_list ]
acl_list_output = []
# Determine optimal column widths
@ -376,4 +782,3 @@ def format_list_acl(acl_list):
)
click.echo('\n'.join(sorted(acl_list_output)))