Lint: E302 expected 2 blank lines, found X

2020-11-07 14:45:24 -05:00
parent e9643651f7
commit 260b39ebf2
36 changed files with 694 additions and 19 deletions
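
E302 is the pycodestyle/flake8 check that requires two blank lines before every top-level function or class definition; the fix throughout this commit is simply inserting the missing blank lines. A minimal sketch of the pattern on a hypothetical module (not from this repository):

import json


class Example(object):
    def __init__(self, data):
        self.stats = json.loads(data)


# With fewer than two blank lines above this definition, flake8 would
# report "E302 expected 2 blank lines, found 0" (or "found 1").
def load_example(data):
    return Example(data)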

View File

@@ -27,6 +27,7 @@ import psutil
import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common

class CephOSDInstance(object):
    def __init__(self, zk_conn, this_node, osd_id):
        self.zk_conn = zk_conn

@@ -66,6 +67,7 @@ class CephOSDInstance(object):
            if data and data != self.stats:
                self.stats = json.loads(data)

def add_osd(zk_conn, logger, node, device, weight):
    # We are ready to create a new OSD on this node
    logger.out('Creating new OSD disk on block device {}'.format(device), state='i')

@@ -188,6 +190,7 @@ def add_osd(zk_conn, logger, node, device, weight):
        logger.out('Failed to create new OSD disk: {}'.format(e), state='e')
        return False

def remove_osd(zk_conn, logger, osd_id, osd_obj):
    logger.out('Removing OSD disk {}'.format(osd_id), state='i')
    try:

@@ -281,6 +284,7 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj):
        logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
        return False

class CephPoolInstance(object):
    def __init__(self, zk_conn, this_node, name):
        self.zk_conn = zk_conn

@@ -319,6 +323,7 @@ class CephPoolInstance(object):
            if data and data != self.stats:
                self.stats = json.loads(data)

class CephVolumeInstance(object):
    def __init__(self, zk_conn, this_node, pool, name):
        self.zk_conn = zk_conn

@@ -342,6 +347,7 @@ class CephVolumeInstance(object):
            if data and data != self.stats:
                self.stats = json.loads(data)

class CephSnapshotInstance(object):
    def __init__(self, zk_conn, this_node, pool, volume, name):
        self.zk_conn = zk_conn

@@ -366,6 +372,7 @@ class CephSnapshotInstance(object):
            if data and data != self.stats:
                self.stats = json.loads(data)

# Primary command function
# This command pipe is only used for OSD adds and removes
def run_command(zk_conn, logger, this_node, data, d_osd):

View File

@@ -29,6 +29,7 @@ from threading import Thread, Event
import pvcnoded.common as common

class DNSAggregatorInstance(object):
    # Initialization function
    def __init__(self, zk_conn, config, logger):

@@ -65,6 +66,7 @@ class DNSAggregatorInstance(object):
            del self.dns_networks[network]
            self.dns_axfr_daemon.update_networks(self.dns_networks)

class PowerDNSInstance(object):
    # Initialization function
    def __init__(self, aggregator):

@@ -138,6 +140,7 @@ class PowerDNSInstance(object):
            state='o'
        )

class DNSNetworkInstance(object):
    # Initialization function
    def __init__(self, aggregator, network):

View File

@@ -74,6 +74,7 @@ version = '0.9.1'
# Daemon functions
###############################################################################

# Create timer to update this node in Zookeeper
def startKeepaliveTimer():
    # Create our timer object

@@ -85,6 +86,7 @@ def startKeepaliveTimer():
    node_keepalive()
    return update_timer

def stopKeepaliveTimer():
    global update_timer
    try:

@@ -125,6 +127,7 @@ staticdata.append(subprocess.run(['uname', '-r'], stdout=subprocess.PIPE).stdout
staticdata.append(subprocess.run(['uname', '-o'], stdout=subprocess.PIPE).stdout.decode('ascii').strip())
staticdata.append(subprocess.run(['uname', '-m'], stdout=subprocess.PIPE).stdout.decode('ascii').strip())

# Read and parse the config file
def readConfig(pvcnoded_config_file, myhostname):
    print('Loading configuration from file "{}"'.format(pvcnoded_config_file))

@@ -512,6 +515,7 @@ except Exception as e:
    logger.out('ERROR: Failed to connect to Zookeeper cluster: {}'.format(e), state='e')
    exit(1)

# Handle zookeeper failures
def zk_listener(state):
    global zk_conn, update_timer

@@ -551,6 +555,7 @@ zk_conn.add_listener(zk_listener)
# PHASE 5 - Gracefully handle termination
###############################################################################

# Cleanup function
def cleanup():
    global zk_conn, update_timer, d_domain

@@ -615,10 +620,12 @@ def cleanup():
    logger.out('Terminated pvc daemon', state='s')
    sys.exit(0)

# Termination function
def term(signum='', frame=''):
    cleanup()

# Hangup (logrotate) function
def hup(signum='', frame=''):
    if config['file_logging']:

@@ -796,6 +803,7 @@ else:
    dns_aggregator = None
    metadata_api = None

# Node objects
@zk_conn.ChildrenWatch('/nodes')
def update_nodes(new_node_list):

@@ -824,6 +832,7 @@ def update_nodes(new_node_list):
# Alias for our local node (passed to network and domain objects)
this_node = d_node[myhostname]

# Maintenance mode
@zk_conn.DataWatch('/maintenance')
def set_maintenance(_maintenance, stat, event=''):

@@ -833,6 +842,7 @@ def set_maintenance(_maintenance, stat, event=''):
    except Exception:
        maintenance = False

# Primary node
@zk_conn.DataWatch('/primary_node')
def update_primary(new_primary, stat, event=''):

@@ -1023,6 +1033,7 @@ if enable_storage:
            volume_list[pool] = new_volume_list
            logger.out('{}Volume list [{pool}]:{} {plist}'.format(fmt_blue, fmt_end, pool=pool, plist=' '.join(volume_list[pool])), state='i')

###############################################################################
# PHASE 9 - Run the daemon
###############################################################################

@@ -1311,6 +1322,7 @@ libvirt_vm_states = {
    7: "PMSUSPENDED"
}

# VM stats update function
def collect_vm_stats(queue):
    if debug:

@@ -1454,6 +1466,7 @@ def collect_vm_stats(queue):
    if debug:
        logger.out("Thread finished", state='d', prefix='vm-thread')

# Keepalive update function
def node_keepalive():
    if debug:

View File

@@ -32,6 +32,7 @@ from psycopg2.extras import RealDictCursor
import daemon_lib.vm as pvc_vm
import daemon_lib.network as pvc_network

class MetadataAPIInstance(object):
    mdapi = flask.Flask(__name__)

View File

@@ -27,6 +27,7 @@ from threading import Thread
import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common

class NodeInstance(object):
    # Initialization function
    def __init__(self, name, this_node, zk_conn, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api):

View File

@@ -28,6 +28,7 @@ from collections import deque
import pvcnoded.zkhandler as zkhandler

class VMConsoleWatcherInstance(object):
    # Initialization function
    def __init__(self, domuuid, domname, zk_conn, config, logger, this_node):

View File

@@ -34,6 +34,7 @@ import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance
import daemon_lib.common as daemon_common

def flush_locks(zk_conn, logger, dom_uuid):
    logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i')
    # Get the list of RBD images

@@ -64,6 +65,7 @@ def flush_locks(zk_conn, logger, dom_uuid):
    return True

# Primary command function
def run_command(zk_conn, logger, this_node, data):
    # Get the command and args

@@ -92,6 +94,7 @@ def run_command(zk_conn, logger, this_node, data):
        # Wait 1 seconds before we free the lock, to ensure the client hits the lock
        time.sleep(1)

class VMInstance(object):
    # Initialization function
    def __init__(self, domuuid, zk_conn, config, logger, this_node):

View File

@@ -28,6 +28,7 @@ from textwrap import dedent
import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common

class VXNetworkInstance(object):
    # Initialization function
    def __init__(self, vni, zk_conn, config, logger, this_node, dns_aggregator):

View File

@@ -28,6 +28,7 @@ from shlex import split as shlex_split
import pvcnoded.zkhandler as zkhandler

class OSDaemon(object):
    def __init__(self, command_string, environment, logfile):
        command = shlex_split(command_string)

@@ -55,10 +56,12 @@ class OSDaemon(object):
        }
        self.proc.send_signal(signal_map[sent_signal])

def run_os_daemon(command_string, environment=None, logfile=None):
    daemon = OSDaemon(command_string, environment, logfile)
    return daemon

# Run a oneshot command, optionally without blocking
def run_os_command(command_string, background=False, environment=None, timeout=None):
    command = shlex_split(command_string)

@@ -100,6 +103,7 @@ def run_os_command(command_string, background=False, environment=None, timeout=N
        stderr = ''
    return retcode, stdout, stderr

# Reload the firewall rules of the system
def reload_firewall_rules(logger, rules_file):
    logger.out('Reloading firewall configuration', state='o')

@@ -107,6 +111,7 @@ def reload_firewall_rules(logger, rules_file):
    if retcode != 0:
        logger.out('Failed to reload configuration: {}'.format(stderr), state='e')

# Create IP address
def createIPAddress(ipaddr, cidrnetmask, dev):
    run_os_command(

@@ -123,6 +128,7 @@ def createIPAddress(ipaddr, cidrnetmask, dev):
        )
    )

# Remove IP address
def removeIPAddress(ipaddr, cidrnetmask, dev):
    run_os_command(

@@ -133,6 +139,7 @@ def removeIPAddress(ipaddr, cidrnetmask, dev):
        )
    )

#
# Find a migration target
#

@@ -173,6 +180,7 @@ def findTargetNode(zk_conn, config, logger, dom_uuid):
    # Nothing was found
    return None

# Get the list of valid target nodes
def getNodes(zk_conn, node_limit, dom_uuid):
    valid_node_list = []

@@ -196,6 +204,7 @@ def getNodes(zk_conn, node_limit, dom_uuid):
    return valid_node_list

# via free memory (relative to allocated memory)
def findTargetNodeMem(zk_conn, config, logger, node_limit, dom_uuid):
    most_provfree = 0

@@ -222,6 +231,7 @@ def findTargetNodeMem(zk_conn, config, logger, node_limit, dom_uuid):
    logger.out('Selected node {}'.format(target_node), state='d', prefix='node-flush')
    return target_node

# via load average
def findTargetNodeLoad(zk_conn, config, logger, node_limit, dom_uuid):
    least_load = 9999.0

@@ -244,6 +254,7 @@ def findTargetNodeLoad(zk_conn, config, logger, node_limit, dom_uuid):
    logger.out('Selected node {}'.format(target_node), state='d', prefix='node-flush')
    return target_node

# via total vCPUs
def findTargetNodeVCPUs(zk_conn, config, logger, node_limit, dom_uuid):
    least_vcpus = 9999

@@ -266,6 +277,7 @@ def findTargetNodeVCPUs(zk_conn, config, logger, node_limit, dom_uuid):
    logger.out('Selected node {}'.format(target_node), state='d', prefix='node-flush')
    return target_node

# via total VMs
def findTargetNodeVMs(zk_conn, config, logger, node_limit, dom_uuid):
    least_vms = 9999

View File

@@ -26,6 +26,7 @@ import kazoo.client
import re
import yaml

#
# Variables
#

@@ -45,6 +46,7 @@ def get_zookeeper_key():
    zookeeper_key = '/networks/{}/dhcp4_leases'.format(network_vni)
    return zookeeper_key

def get_lease_expiry():
    try:
        expiry = os.environ['DNSMASQ_LEASE_EXPIRES']

@@ -52,6 +54,7 @@ def get_lease_expiry():
        expiry = '0'
    return expiry

def get_client_id():
    try:
        client_id = os.environ['DNSMASQ_CLIENT_ID']

@@ -59,6 +62,7 @@ def get_client_id():
        client_id = '*'
    return client_id

def connect_zookeeper():
    # We expect the environ to contain the config file
    try:

@@ -83,9 +87,11 @@ def connect_zookeeper():
    return zk_conn

def read_data(zk_conn, key):
    return zk_conn.get(key)[0].decode('ascii')

def get_lease(zk_conn, zk_leases_key, macaddr):
    expiry = read_data(zk_conn, '{}/{}/expiry'.format(zk_leases_key, macaddr))
    ipaddr = read_data(zk_conn, '{}/{}/ipaddr'.format(zk_leases_key, macaddr))

@@ -93,6 +99,7 @@ def get_lease(zk_conn, zk_leases_key, macaddr):
    clientid = read_data(zk_conn, '{}/{}/clientid'.format(zk_leases_key, macaddr))
    return expiry, ipaddr, hostname, clientid

#
# Command Functions
#

@@ -108,6 +115,7 @@ def read_lease_database(zk_conn, zk_leases_key):
    # Output list
    print('\n'.join(output_list))

def add_lease(zk_conn, zk_leases_key, expiry, macaddr, ipaddr, hostname, clientid):
    if not hostname:
        hostname = ''

@@ -119,6 +127,7 @@ def add_lease(zk_conn, zk_leases_key, expiry, macaddr, ipaddr, hostname, clienti
    transaction.create('{}/{}/clientid'.format(zk_leases_key, macaddr), clientid.encode('ascii'))
    transaction.commit()

def del_lease(zk_conn, zk_leases_key, macaddr, expiry):
    zk_conn.delete('{}/{}'.format(zk_leases_key, macaddr), recursive=True)

View File

@@ -26,6 +26,7 @@ import pvcnoded.zkhandler as zkhandler
import pvcnoded.common as common
import pvcnoded.VMInstance as VMInstance

#
# Fence thread entry function
#

@@ -74,6 +75,7 @@ def fenceNode(node_name, zk_conn, config, logger):
    if not fence_status and config['failed_fence'] == 'migrate' and config['suicide_intervals'] != '0':
        migrateFromFencedNode(zk_conn, node_name, config, logger)

# Migrate hosts away from a fenced node
def migrateFromFencedNode(zk_conn, node_name, config, logger):
    logger.out('Migrating VMs from dead node "{}" to new hosts'.format(node_name), state='i')

@@ -111,6 +113,7 @@ def migrateFromFencedNode(zk_conn, node_name, config, logger):
    # Set node in flushed state for easy remigrating when it comes back
    zkhandler.writedata(zk_conn, {'/nodes/{}/domainstate'.format(node_name): 'flushed'})

#
# Perform an IPMI fence
#

@@ -145,6 +148,7 @@ def rebootViaIPMI(ipmi_hostname, ipmi_user, ipmi_password, logger):
        print(ipmi_reset_stderr)
        return False

#
# Verify that IPMI connectivity to this host exists (used during node init)
#

View File

@@ -22,6 +22,7 @@
import datetime

class Logger(object):
    # Define a logger class for a daemon instance
    # Keeps record of where to log, and is passed messages which are

View File

@@ -22,6 +22,7 @@
import uuid

# Child list function
def listchildren(zk_conn, key):
    try:

@@ -30,6 +31,7 @@ def listchildren(zk_conn, key):
    except Exception:
        return None

# Key deletion function
def deletekey(zk_conn, key, recursive=True):
    try:

@@ -38,6 +40,7 @@ def deletekey(zk_conn, key, recursive=True):
    except Exception:
        return False

# Data read function
def readdata(zk_conn, key):
    try:

@@ -47,6 +50,7 @@ def readdata(zk_conn, key):
    except Exception:
        return None

# Data write function
def writedata(zk_conn, kv):
    # Commit the transaction

@@ -87,6 +91,7 @@ def writedata(zk_conn, kv):
    except Exception:
        return False

# Key rename function
def renamekey(zk_conn, kv):
    # This one is not transactional because, inexplicably, transactions don't

@@ -133,6 +138,7 @@ def renamekey(zk_conn, kv):
    except Exception:
        return False

# Write lock function
def writelock(zk_conn, key):
    count = 1

@@ -149,6 +155,7 @@ def writelock(zk_conn, key):
            continue
    return lock

# Read lock function
def readlock(zk_conn, key):
    count = 1

@@ -165,6 +172,7 @@ def readlock(zk_conn, key):
            continue
    return lock

# Exclusive lock function
def exclusivelock(zk_conn, key):
    count = 1