Reformat code with Black code formatter
Unify the code style along PEP 8 and Black principles using the tool.
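The commit does not record the exact invocation, so the following is a hypothetical sketch of how such a tree-wide reformat is typically produced and verified with Black before committing:

    pip install black       # install the formatter
    black .                 # rewrite every Python file in place, normalizing quotes and line wrapping
    black --check --diff .  # verify: exits non-zero and prints a diff if anything would still change

Black is deterministic, so re-running it on an already-formatted tree is a no-op; the `--check` pass is a convenient CI guard against future style drift.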
@@ -38,63 +38,73 @@ class CephOSDInstance(object):
         self.size = None
         self.stats = dict()

-        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.node', self.osd_id))
-        def watch_osd_node(data, stat, event=''):
-            if event and event.type == 'DELETED':
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("osd.node", self.osd_id)
+        )
+        def watch_osd_node(data, stat, event=""):
+            if event and event.type == "DELETED":
                 # The key has been deleted after existing before; terminate this watcher
                 # because this class instance is about to be reaped in Daemon.py
                 return False

             try:
-                data = data.decode('ascii')
+                data = data.decode("ascii")
             except AttributeError:
-                data = ''
+                data = ""

             if data and data != self.node:
                 self.node = data

-        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.stats', self.osd_id))
-        def watch_osd_stats(data, stat, event=''):
-            if event and event.type == 'DELETED':
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("osd.stats", self.osd_id)
+        )
+        def watch_osd_stats(data, stat, event=""):
+            if event and event.type == "DELETED":
                 # The key has been deleted after existing before; terminate this watcher
                 # because this class instance is about to be reaped in Daemon.py
                 return False

             try:
-                data = data.decode('ascii')
+                data = data.decode("ascii")
             except AttributeError:
-                data = ''
+                data = ""

             if data and data != self.stats:
                 self.stats = json.loads(data)

     @staticmethod
-    def add_osd(zkhandler, logger, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05):
+    def add_osd(
+        zkhandler, logger, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05
+    ):
         # We are ready to create a new OSD on this node
-        logger.out('Creating new OSD disk on block device {}'.format(device), state='i')
+        logger.out("Creating new OSD disk on block device {}".format(device), state="i")
         try:
             # 1. Create an OSD; we do this so we know what ID will be gen'd
-            retcode, stdout, stderr = common.run_os_command('ceph osd create')
+            retcode, stdout, stderr = common.run_os_command("ceph osd create")
             if retcode:
-                print('ceph osd create')
+                print("ceph osd create")
                 print(stdout)
                 print(stderr)
                 raise Exception
             osd_id = stdout.rstrip()

             # 2. Remove that newly-created OSD
-            retcode, stdout, stderr = common.run_os_command('ceph osd rm {}'.format(osd_id))
+            retcode, stdout, stderr = common.run_os_command(
+                "ceph osd rm {}".format(osd_id)
+            )
             if retcode:
-                print('ceph osd rm')
+                print("ceph osd rm")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # 3a. Zap the disk to ensure it is ready to go
-            logger.out('Zapping disk {}'.format(device), state='i')
-            retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(device))
+            logger.out("Zapping disk {}".format(device), state="i")
+            retcode, stdout, stderr = common.run_os_command(
+                "ceph-volume lvm zap --destroy {}".format(device)
+            )
             if retcode:
-                print('ceph-volume lvm zap')
+                print("ceph-volume lvm zap")
                 print(stdout)
                 print(stderr)
                 raise Exception
@@ -103,9 +113,13 @@ class CephOSDInstance(object):

             # 3b. Prepare the logical volume if ext_db_flag
             if ext_db_flag:
-                _, osd_size_bytes, _ = common.run_os_command('blockdev --getsize64 {}'.format(device))
+                _, osd_size_bytes, _ = common.run_os_command(
+                    "blockdev --getsize64 {}".format(device)
+                )
                 osd_size_bytes = int(osd_size_bytes)
-                result = CephOSDInstance.create_osd_db_lv(zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes)
+                result = CephOSDInstance.create_osd_db_lv(
+                    zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes
+                )
                 if not result:
                     raise Exception
                 db_device = "osd-db/osd-{}".format(osd_id)
@@ -114,63 +128,67 @@ class CephOSDInstance(object):
                 db_device = ""

             # 3c. Create the OSD for real
-            logger.out('Preparing LVM for new OSD disk with ID {} on {}'.format(osd_id, device), state='i')
+            logger.out(
+                "Preparing LVM for new OSD disk with ID {} on {}".format(
+                    osd_id, device
+                ),
+                state="i",
+            )
             retcode, stdout, stderr = common.run_os_command(
-                'ceph-volume lvm prepare --bluestore {devices}'.format(
-                    osdid=osd_id,
-                    devices=dev_flags
+                "ceph-volume lvm prepare --bluestore {devices}".format(
+                    osdid=osd_id, devices=dev_flags
                 )
             )
             if retcode:
-                print('ceph-volume lvm prepare')
+                print("ceph-volume lvm prepare")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # 4a. Get OSD FSID
-            logger.out('Getting OSD FSID for ID {} on {}'.format(osd_id, device), state='i')
-            retcode, stdout, stderr = common.run_os_command(
-                'ceph-volume lvm list {device}'.format(
-                    osdid=osd_id,
-                    device=device
-                )
-            )
-            for line in stdout.split('\n'):
-                if 'osd fsid' in line:
+            logger.out(
+                "Getting OSD FSID for ID {} on {}".format(osd_id, device), state="i"
+            )
+            retcode, stdout, stderr = common.run_os_command(
+                "ceph-volume lvm list {device}".format(osdid=osd_id, device=device)
+            )
+            for line in stdout.split("\n"):
+                if "osd fsid" in line:
                     osd_fsid = line.split()[-1]

             if not osd_fsid:
-                print('ceph-volume lvm list')
-                print('Could not find OSD fsid in data:')
+                print("ceph-volume lvm list")
+                print("Could not find OSD fsid in data:")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # 4b. Activate the OSD
-            logger.out('Activating new OSD disk with ID {}'.format(osd_id, device), state='i')
+            logger.out(
+                "Activating new OSD disk with ID {}".format(osd_id, device), state="i"
+            )
             retcode, stdout, stderr = common.run_os_command(
-                'ceph-volume lvm activate --bluestore {osdid} {osdfsid}'.format(
-                    osdid=osd_id,
-                    osdfsid=osd_fsid
+                "ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format(
+                    osdid=osd_id, osdfsid=osd_fsid
                 )
             )
             if retcode:
-                print('ceph-volume lvm activate')
+                print("ceph-volume lvm activate")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # 5. Add it to the crush map
-            logger.out('Adding new OSD disk with ID {} to CRUSH map'.format(osd_id), state='i')
+            logger.out(
+                "Adding new OSD disk with ID {} to CRUSH map".format(osd_id), state="i"
+            )
             retcode, stdout, stderr = common.run_os_command(
-                'ceph osd crush add osd.{osdid} {weight} root=default host={node}'.format(
-                    osdid=osd_id,
-                    weight=weight,
-                    node=node
+                "ceph osd crush add osd.{osdid} {weight} root=default host={node}".format(
+                    osdid=osd_id, weight=weight, node=node
                 )
             )
             if retcode:
-                print('ceph osd crush add')
+                print("ceph osd crush add")
                 print(stdout)
                 print(stderr)
                 raise Exception
@@ -178,65 +196,73 @@ class CephOSDInstance(object):

             # 6. Verify it started
             retcode, stdout, stderr = common.run_os_command(
-                'systemctl status ceph-osd@{osdid}'.format(
-                    osdid=osd_id
-                )
+                "systemctl status ceph-osd@{osdid}".format(osdid=osd_id)
             )
             if retcode:
-                print('systemctl status')
+                print("systemctl status")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # 7. Add the new OSD to the list
-            logger.out('Adding new OSD disk with ID {} to Zookeeper'.format(osd_id), state='i')
-            zkhandler.write([
-                (('osd', osd_id), ''),
-                (('osd.node', osd_id), node),
-                (('osd.device', osd_id), device),
-                (('osd.db_device', osd_id), db_device),
-                (('osd.stats', osd_id), '{}'),
-            ])
+            logger.out(
+                "Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
+            )
+            zkhandler.write(
+                [
+                    (("osd", osd_id), ""),
+                    (("osd.node", osd_id), node),
+                    (("osd.device", osd_id), device),
+                    (("osd.db_device", osd_id), db_device),
+                    (("osd.stats", osd_id), "{}"),
+                ]
+            )

             # Log it
-            logger.out('Created new OSD disk with ID {}'.format(osd_id), state='o')
+            logger.out("Created new OSD disk with ID {}".format(osd_id), state="o")
             return True
         except Exception as e:
             # Log it
-            logger.out('Failed to create new OSD disk: {}'.format(e), state='e')
+            logger.out("Failed to create new OSD disk: {}".format(e), state="e")
             return False

     @staticmethod
     def remove_osd(zkhandler, logger, osd_id, osd_obj):
-        logger.out('Removing OSD disk {}'.format(osd_id), state='i')
+        logger.out("Removing OSD disk {}".format(osd_id), state="i")
         try:
             # 1. Verify the OSD is present
-            retcode, stdout, stderr = common.run_os_command('ceph osd ls')
-            osd_list = stdout.split('\n')
+            retcode, stdout, stderr = common.run_os_command("ceph osd ls")
+            osd_list = stdout.split("\n")
             if osd_id not in osd_list:
-                logger.out('Could not find OSD {} in the cluster'.format(osd_id), state='e')
+                logger.out(
+                    "Could not find OSD {} in the cluster".format(osd_id), state="e"
+                )
                 return True

             # 1. Set the OSD out so it will flush
-            logger.out('Setting out OSD disk with ID {}'.format(osd_id), state='i')
-            retcode, stdout, stderr = common.run_os_command('ceph osd out {}'.format(osd_id))
+            logger.out("Setting out OSD disk with ID {}".format(osd_id), state="i")
+            retcode, stdout, stderr = common.run_os_command(
+                "ceph osd out {}".format(osd_id)
+            )
             if retcode:
-                print('ceph osd out')
+                print("ceph osd out")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # 2. Wait for the OSD to flush
-            logger.out('Flushing OSD disk with ID {}'.format(osd_id), state='i')
+            logger.out("Flushing OSD disk with ID {}".format(osd_id), state="i")
             osd_string = str()
             while True:
                 try:
-                    retcode, stdout, stderr = common.run_os_command('ceph pg dump osds --format json')
+                    retcode, stdout, stderr = common.run_os_command(
+                        "ceph pg dump osds --format json"
+                    )
                     dump_string = json.loads(stdout)
                     for osd in dump_string:
-                        if str(osd['osd']) == osd_id:
+                        if str(osd["osd"]) == osd_id:
                             osd_string = osd
-                    num_pgs = osd_string['num_pgs']
+                    num_pgs = osd_string["num_pgs"]
                     if num_pgs > 0:
                         time.sleep(5)
                     else:
@@ -245,10 +271,12 @@ class CephOSDInstance(object):
                    break

             # 3. Stop the OSD process and wait for it to be terminated
-            logger.out('Stopping OSD disk with ID {}'.format(osd_id), state='i')
-            retcode, stdout, stderr = common.run_os_command('systemctl stop ceph-osd@{}'.format(osd_id))
+            logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
+            retcode, stdout, stderr = common.run_os_command(
+                "systemctl stop ceph-osd@{}".format(osd_id)
+            )
             if retcode:
-                print('systemctl stop')
+                print("systemctl stop")
                 print(stdout)
                 print(stderr)
                 raise Exception
@@ -257,161 +285,213 @@ class CephOSDInstance(object):
             while True:
                 is_osd_up = False
                 # Find if there is a process named ceph-osd with arg '--id {id}'
-                for p in psutil.process_iter(attrs=['name', 'cmdline']):
-                    if 'ceph-osd' == p.info['name'] and '--id {}'.format(osd_id) in ' '.join(p.info['cmdline']):
+                for p in psutil.process_iter(attrs=["name", "cmdline"]):
+                    if "ceph-osd" == p.info["name"] and "--id {}".format(
+                        osd_id
+                    ) in " ".join(p.info["cmdline"]):
                         is_osd_up = True
                 # If there isn't, continue
                 if not is_osd_up:
                     break

             # 4. Determine the block devices
-            retcode, stdout, stderr = common.run_os_command('readlink /var/lib/ceph/osd/ceph-{}/block'.format(osd_id))
-            vg_name = stdout.split('/')[-2]  # e.g. /dev/ceph-<uuid>/osd-block-<uuid>
-            retcode, stdout, stderr = common.run_os_command('vgs --separator , --noheadings -o pv_name {}'.format(vg_name))
+            retcode, stdout, stderr = common.run_os_command(
+                "readlink /var/lib/ceph/osd/ceph-{}/block".format(osd_id)
+            )
+            vg_name = stdout.split("/")[-2]  # e.g. /dev/ceph-<uuid>/osd-block-<uuid>
+            retcode, stdout, stderr = common.run_os_command(
+                "vgs --separator , --noheadings -o pv_name {}".format(vg_name)
+            )
             pv_block = stdout.strip()

             # 5. Zap the volumes
-            logger.out('Zapping OSD disk with ID {} on {}'.format(osd_id, pv_block), state='i')
-            retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(pv_block))
+            logger.out(
+                "Zapping OSD disk with ID {} on {}".format(osd_id, pv_block), state="i"
+            )
+            retcode, stdout, stderr = common.run_os_command(
+                "ceph-volume lvm zap --destroy {}".format(pv_block)
+            )
             if retcode:
-                print('ceph-volume lvm zap')
+                print("ceph-volume lvm zap")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # 6. Purge the OSD from Ceph
-            logger.out('Purging OSD disk with ID {}'.format(osd_id), state='i')
-            retcode, stdout, stderr = common.run_os_command('ceph osd purge {} --yes-i-really-mean-it'.format(osd_id))
+            logger.out("Purging OSD disk with ID {}".format(osd_id), state="i")
+            retcode, stdout, stderr = common.run_os_command(
+                "ceph osd purge {} --yes-i-really-mean-it".format(osd_id)
+            )
             if retcode:
-                print('ceph osd purge')
+                print("ceph osd purge")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # 7. Remove the DB device
-            if zkhandler.exists(('osd.db_device', osd_id)):
-                db_device = zkhandler.read(('osd.db_device', osd_id))
-                logger.out('Removing OSD DB logical volume "{}"'.format(db_device), state='i')
-                retcode, stdout, stderr = common.run_os_command('lvremove --yes --force {}'.format(db_device))
+            if zkhandler.exists(("osd.db_device", osd_id)):
+                db_device = zkhandler.read(("osd.db_device", osd_id))
+                logger.out(
+                    'Removing OSD DB logical volume "{}"'.format(db_device), state="i"
+                )
+                retcode, stdout, stderr = common.run_os_command(
+                    "lvremove --yes --force {}".format(db_device)
+                )

             # 8. Delete OSD from ZK
-            logger.out('Deleting OSD disk with ID {} from Zookeeper'.format(osd_id), state='i')
-            zkhandler.delete(('osd', osd_id), recursive=True)
+            logger.out(
+                "Deleting OSD disk with ID {} from Zookeeper".format(osd_id), state="i"
+            )
+            zkhandler.delete(("osd", osd_id), recursive=True)

             # Log it
-            logger.out('Removed OSD disk with ID {}'.format(osd_id), state='o')
+            logger.out("Removed OSD disk with ID {}".format(osd_id), state="o")
             return True
         except Exception as e:
             # Log it
-            logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
+            logger.out(
+                "Failed to purge OSD disk with ID {}: {}".format(osd_id, e), state="e"
+            )
             return False

     @staticmethod
     def add_db_vg(zkhandler, logger, device):
-        logger.out('Creating new OSD database volume group on block device {}'.format(device), state='i')
+        logger.out(
+            "Creating new OSD database volume group on block device {}".format(device),
+            state="i",
+        )
         try:
             # 0. Check if an existsing volume group exists
-            retcode, stdout, stderr = common.run_os_command(
-                'vgdisplay osd-db'
-            )
+            retcode, stdout, stderr = common.run_os_command("vgdisplay osd-db")
             if retcode != 5:
-                logger.out('Ceph OSD database VG "osd-db" already exists', state='e')
+                logger.out('Ceph OSD database VG "osd-db" already exists', state="e")
                 return False

             # 1. Create an empty partition table
-            logger.out('Creating partitions on block device {}'.format(device), state='i')
+            logger.out(
+                "Creating partitions on block device {}".format(device), state="i"
+            )
             retcode, stdout, stderr = common.run_os_command(
-                'sgdisk --clear {}'.format(device)
+                "sgdisk --clear {}".format(device)
             )
             if retcode:
-                print('sgdisk create partition table')
+                print("sgdisk create partition table")
                 print(stdout)
                 print(stderr)
                 raise Exception

             retcode, stdout, stderr = common.run_os_command(
-                'sgdisk --new 1:: --typecode 1:8e00 {}'.format(device)
+                "sgdisk --new 1:: --typecode 1:8e00 {}".format(device)
             )
             if retcode:
-                print('sgdisk create pv partition')
+                print("sgdisk create pv partition")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # Handle the partition ID portion
-            if search(r'by-path', device) or search(r'by-id', device):
+            if search(r"by-path", device) or search(r"by-id", device):
                 # /dev/disk/by-path/pci-0000:03:00.0-scsi-0:1:0:0 -> pci-0000:03:00.0-scsi-0:1:0:0-part1
-                partition = '{}-part1'.format(device)
-            elif search(r'nvme', device):
+                partition = "{}-part1".format(device)
+            elif search(r"nvme", device):
                 # /dev/nvme0n1 -> nvme0n1p1
-                partition = '{}p1'.format(device)
+                partition = "{}p1".format(device)
             else:
                 # /dev/sda -> sda1
                 # No other '/dev/disk/by-*' types are valid for raw block devices anyways
-                partition = '{}1'.format(device)
+                partition = "{}1".format(device)

             # 2. Create the PV
-            logger.out('Creating PV on block device {}'.format(partition), state='i')
+            logger.out("Creating PV on block device {}".format(partition), state="i")
             retcode, stdout, stderr = common.run_os_command(
-                'pvcreate --force {}'.format(partition)
+                "pvcreate --force {}".format(partition)
             )
             if retcode:
-                print('pv creation')
+                print("pv creation")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # 2. Create the VG (named 'osd-db')
-            logger.out('Creating VG "osd-db" on block device {}'.format(partition), state='i')
+            logger.out(
+                'Creating VG "osd-db" on block device {}'.format(partition), state="i"
+            )
             retcode, stdout, stderr = common.run_os_command(
-                'vgcreate --force osd-db {}'.format(partition)
+                "vgcreate --force osd-db {}".format(partition)
             )
             if retcode:
-                print('vg creation')
+                print("vg creation")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # Log it
-            logger.out('Created new OSD database volume group on block device {}'.format(device), state='o')
+            logger.out(
+                "Created new OSD database volume group on block device {}".format(
+                    device
+                ),
+                state="o",
+            )
             return True
         except Exception as e:
             # Log it
-            logger.out('Failed to create OSD database volume group: {}'.format(e), state='e')
+            logger.out(
+                "Failed to create OSD database volume group: {}".format(e), state="e"
+            )
             return False

     @staticmethod
     def create_osd_db_lv(zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes):
-        logger.out('Creating new OSD database logical volume for OSD ID {}'.format(osd_id), state='i')
+        logger.out(
+            "Creating new OSD database logical volume for OSD ID {}".format(osd_id),
+            state="i",
+        )
         try:
             # 0. Check if an existsing logical volume exists
             retcode, stdout, stderr = common.run_os_command(
-                'lvdisplay osd-db/osd{}'.format(osd_id)
+                "lvdisplay osd-db/osd{}".format(osd_id)
             )
             if retcode != 5:
-                logger.out('Ceph OSD database LV "osd-db/osd{}" already exists'.format(osd_id), state='e')
+                logger.out(
+                    'Ceph OSD database LV "osd-db/osd{}" already exists'.format(osd_id),
+                    state="e",
+                )
                 return False

             # 1. Determine LV sizing
             osd_db_size = int(osd_size_bytes * ext_db_ratio / 1024 / 1024)

             # 2. Create the LV
-            logger.out('Creating DB LV "osd-db/osd-{}" of {}M ({} * {})'.format(osd_id, osd_db_size, osd_size_bytes, ext_db_ratio), state='i')
+            logger.out(
+                'Creating DB LV "osd-db/osd-{}" of {}M ({} * {})'.format(
+                    osd_id, osd_db_size, osd_size_bytes, ext_db_ratio
+                ),
+                state="i",
+            )
             retcode, stdout, stderr = common.run_os_command(
-                'lvcreate --yes --name osd-{} --size {} osd-db'.format(osd_id, osd_db_size)
+                "lvcreate --yes --name osd-{} --size {} osd-db".format(
+                    osd_id, osd_db_size
+                )
             )
             if retcode:
-                print('db lv creation')
+                print("db lv creation")
                 print(stdout)
                 print(stderr)
                 raise Exception

             # Log it
-            logger.out('Created new OSD database logical volume "osd-db/osd-{}"'.format(osd_id), state='o')
+            logger.out(
+                'Created new OSD database logical volume "osd-db/osd-{}"'.format(
+                    osd_id
+                ),
+                state="o",
+            )
             return True
         except Exception as e:
             # Log it
-            logger.out('Failed to create OSD database logical volume: {}'.format(e), state='e')
+            logger.out(
+                "Failed to create OSD database logical volume: {}".format(e), state="e"
+            )
             return False


@@ -420,35 +500,39 @@ class CephPoolInstance(object):
         self.zkhandler = zkhandler
         self.this_node = this_node
         self.name = name
-        self.pgs = ''
+        self.pgs = ""
         self.stats = dict()

-        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.pgs', self.name))
-        def watch_pool_node(data, stat, event=''):
-            if event and event.type == 'DELETED':
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("pool.pgs", self.name)
+        )
+        def watch_pool_node(data, stat, event=""):
+            if event and event.type == "DELETED":
                 # The key has been deleted after existing before; terminate this watcher
                 # because this class instance is about to be reaped in Daemon.py
                 return False

             try:
-                data = data.decode('ascii')
+                data = data.decode("ascii")
             except AttributeError:
-                data = ''
+                data = ""

             if data and data != self.pgs:
                 self.pgs = data

-        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.stats', self.name))
-        def watch_pool_stats(data, stat, event=''):
-            if event and event.type == 'DELETED':
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("pool.stats", self.name)
+        )
+        def watch_pool_stats(data, stat, event=""):
+            if event and event.type == "DELETED":
                 # The key has been deleted after existing before; terminate this watcher
                 # because this class instance is about to be reaped in Daemon.py
                 return False

             try:
-                data = data.decode('ascii')
+                data = data.decode("ascii")
             except AttributeError:
-                data = ''
+                data = ""

             if data and data != self.stats:
                 self.stats = json.loads(data)
@@ -462,17 +546,19 @@ class CephVolumeInstance(object):
         self.name = name
         self.stats = dict()

-        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('volume.stats', f'{self.pool}/{self.name}'))
-        def watch_volume_stats(data, stat, event=''):
-            if event and event.type == 'DELETED':
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("volume.stats", f"{self.pool}/{self.name}")
+        )
+        def watch_volume_stats(data, stat, event=""):
+            if event and event.type == "DELETED":
                 # The key has been deleted after existing before; terminate this watcher
                 # because this class instance is about to be reaped in Daemon.py
                 return False

             try:
-                data = data.decode('ascii')
+                data = data.decode("ascii")
             except AttributeError:
-                data = ''
+                data = ""

             if data and data != self.stats:
                 self.stats = json.loads(data)
@@ -487,17 +573,21 @@ class CephSnapshotInstance(object):
         self.name = name
         self.stats = dict()

-        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('snapshot.stats', f'{self.pool}/{self.volume}/{self.name}'))
-        def watch_snapshot_stats(data, stat, event=''):
-            if event and event.type == 'DELETED':
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path(
+                "snapshot.stats", f"{self.pool}/{self.volume}/{self.name}"
+            )
+        )
+        def watch_snapshot_stats(data, stat, event=""):
+            if event and event.type == "DELETED":
                 # The key has been deleted after existing before; terminate this watcher
                 # because this class instance is about to be reaped in Daemon.py
                 return False

             try:
-                data = data.decode('ascii')
+                data = data.decode("ascii")
             except AttributeError:
-                data = ''
+                data = ""

             if data and data != self.stats:
                 self.stats = json.loads(data)
@@ -510,77 +600,69 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):
     command, args = data.split()

     # Adding a new OSD
-    if command == 'osd_add':
-        node, device, weight, ext_db_flag, ext_db_ratio = args.split(',')
+    if command == "osd_add":
+        node, device, weight, ext_db_flag, ext_db_ratio = args.split(",")
         ext_db_flag = bool(strtobool(ext_db_flag))
         ext_db_ratio = float(ext_db_ratio)
         if node == this_node.name:
             # Lock the command queue
-            zk_lock = zkhandler.writelock('base.cmd.ceph')
+            zk_lock = zkhandler.writelock("base.cmd.ceph")
             with zk_lock:
                 # Add the OSD
-                result = CephOSDInstance.add_osd(zkhandler, logger, node, device, weight, ext_db_flag, ext_db_ratio)
+                result = CephOSDInstance.add_osd(
+                    zkhandler, logger, node, device, weight, ext_db_flag, ext_db_ratio
+                )
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.write([
-                        ('base.cmd.ceph', 'success-{}'.format(data))
-                    ])
+                    zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.write([
-                        ('base.cmd.ceph', 'failure-{}'.format(data))
-                    ])
+                    zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))])
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)

     # Removing an OSD
-    elif command == 'osd_remove':
+    elif command == "osd_remove":
         osd_id = args

         # Verify osd_id is in the list
         if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
             # Lock the command queue
-            zk_lock = zkhandler.writelock('base.cmd.ceph')
+            zk_lock = zkhandler.writelock("base.cmd.ceph")
             with zk_lock:
                 # Remove the OSD
-                result = CephOSDInstance.remove_osd(zkhandler, logger, osd_id, d_osd[osd_id])
+                result = CephOSDInstance.remove_osd(
+                    zkhandler, logger, osd_id, d_osd[osd_id]
+                )
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.write([
-                        ('base.cmd.ceph', 'success-{}'.format(data))
-                    ])
+                    zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.write([
-                        ('base.cmd.ceph', 'failure-{}'.format(data))
-                    ])
+                    zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))])
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)

     # Adding a new DB VG
-    elif command == 'db_vg_add':
-        node, device = args.split(',')
+    elif command == "db_vg_add":
+        node, device = args.split(",")
         if node == this_node.name:
             # Lock the command queue
-            zk_lock = zkhandler.writelock('base.cmd.ceph')
+            zk_lock = zkhandler.writelock("base.cmd.ceph")
             with zk_lock:
                 # Add the VG
                 result = CephOSDInstance.add_db_vg(zkhandler, logger, device)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.write([
-                        ('base.cmd.ceph', 'success-{}'.format(data))
-                    ])
+                    zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.write([
-                        ('base.cmd.ceph', 'failure={}'.format(data))
-                    ])
+                    zkhandler.write([("base.cmd.ceph", "failure={}".format(data))])
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
@@ -74,69 +74,71 @@ class PowerDNSInstance(object):
         self.dns_server_daemon = None

         # Floating upstreams
-        self.cluster_floatingipaddr, self.cluster_cidrnetmask = self.config['cluster_floating_ip'].split('/')
-        self.upstream_floatingipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
+        self.cluster_floatingipaddr, self.cluster_cidrnetmask = self.config[
+            "cluster_floating_ip"
+        ].split("/")
+        self.upstream_floatingipaddr, self.upstream_cidrnetmask = self.config[
+            "upstream_floating_ip"
+        ].split("/")

     def start(self):
-        self.logger.out(
-            'Starting PowerDNS zone aggregator',
-            state='i'
-        )
+        self.logger.out("Starting PowerDNS zone aggregator", state="i")
         # Define the PowerDNS config
         dns_configuration = [
             # Option  # Explanation
-            '--no-config',
-            '--daemon=no',  # Start directly
-            '--guardian=yes',  # Use a guardian
-            '--disable-syslog=yes',  # Log only to stdout (which is then captured)
-            '--disable-axfr=no',  # Allow AXFRs
-            '--allow-axfr-ips=0.0.0.0/0',  # Allow AXFRs to anywhere
-            '--local-address={},{}'.format(self.cluster_floatingipaddr, self.upstream_floatingipaddr),  # Listen on floating IPs
-            '--local-port=53',  # On port 53
-            '--log-dns-details=on',  # Log details
-            '--loglevel=3',  # Log info
-            '--master=yes',  # Enable master mode
-            '--slave=yes',  # Enable slave mode
-            '--slave-renotify=yes',  # Renotify out for our slaved zones
-            '--version-string=powerdns',  # Set the version string
-            '--default-soa-name=dns.pvc.local',  # Override dnsmasq's invalid name
-            '--socket-dir={}'.format(self.config['pdns_dynamic_directory']),  # Standard socket directory
-            '--launch=gpgsql',  # Use the PostgreSQL backend
-            '--gpgsql-host={}'.format(self.config['pdns_postgresql_host']),  # PostgreSQL instance
-            '--gpgsql-port={}'.format(self.config['pdns_postgresql_port']),  # Default port
-            '--gpgsql-dbname={}'.format(self.config['pdns_postgresql_dbname']),  # Database name
-            '--gpgsql-user={}'.format(self.config['pdns_postgresql_user']),  # User name
-            '--gpgsql-password={}'.format(self.config['pdns_postgresql_password']),  # User password
-            '--gpgsql-dnssec=no',  # Do DNSSEC elsewhere
+            "--no-config",
+            "--daemon=no",  # Start directly
+            "--guardian=yes",  # Use a guardian
+            "--disable-syslog=yes",  # Log only to stdout (which is then captured)
+            "--disable-axfr=no",  # Allow AXFRs
+            "--allow-axfr-ips=0.0.0.0/0",  # Allow AXFRs to anywhere
+            "--local-address={},{}".format(
+                self.cluster_floatingipaddr, self.upstream_floatingipaddr
+            ),  # Listen on floating IPs
+            "--local-port=53",  # On port 53
+            "--log-dns-details=on",  # Log details
+            "--loglevel=3",  # Log info
+            "--master=yes",  # Enable master mode
+            "--slave=yes",  # Enable slave mode
+            "--slave-renotify=yes",  # Renotify out for our slaved zones
+            "--version-string=powerdns",  # Set the version string
+            "--default-soa-name=dns.pvc.local",  # Override dnsmasq's invalid name
+            "--socket-dir={}".format(
+                self.config["pdns_dynamic_directory"]
+            ),  # Standard socket directory
+            "--launch=gpgsql",  # Use the PostgreSQL backend
+            "--gpgsql-host={}".format(
+                self.config["pdns_postgresql_host"]
+            ),  # PostgreSQL instance
+            "--gpgsql-port={}".format(
+                self.config["pdns_postgresql_port"]
+            ),  # Default port
+            "--gpgsql-dbname={}".format(
+                self.config["pdns_postgresql_dbname"]
+            ),  # Database name
+            "--gpgsql-user={}".format(self.config["pdns_postgresql_user"]),  # User name
+            "--gpgsql-password={}".format(
+                self.config["pdns_postgresql_password"]
+            ),  # User password
+            "--gpgsql-dnssec=no",  # Do DNSSEC elsewhere
         ]
         # Start the pdns process in a thread
         self.dns_server_daemon = common.run_os_daemon(
-            '/usr/sbin/pdns_server {}'.format(
-                ' '.join(dns_configuration)
-            ),
+            "/usr/sbin/pdns_server {}".format(" ".join(dns_configuration)),
             environment=None,
-            logfile='{}/pdns-aggregator.log'.format(self.config['pdns_log_directory'])
+            logfile="{}/pdns-aggregator.log".format(self.config["pdns_log_directory"]),
         )
         if self.dns_server_daemon:
-            self.logger.out(
-                'Successfully started PowerDNS zone aggregator',
-                state='o'
-            )
+            self.logger.out("Successfully started PowerDNS zone aggregator", state="o")

     def stop(self):
         if self.dns_server_daemon:
-            self.logger.out(
-                'Stopping PowerDNS zone aggregator',
-                state='i'
-            )
+            self.logger.out("Stopping PowerDNS zone aggregator", state="i")
             # Terminate, then kill
-            self.dns_server_daemon.signal('term')
+            self.dns_server_daemon.signal("term")
             time.sleep(0.2)
-            self.dns_server_daemon.signal('kill')
-            self.logger.out(
-                'Successfully stopped PowerDNS zone aggregator',
-                state='o'
-            )
+            self.dns_server_daemon.signal("kill")
+            self.logger.out("Successfully stopped PowerDNS zone aggregator", state="o")


 class DNSNetworkInstance(object):
@@ -153,29 +155,24 @@ class DNSNetworkInstance(object):
         network_domain = self.network.domain

         self.logger.out(
-            'Adding entry for client domain {}'.format(
-                network_domain
-            ),
-            prefix='DNS aggregator',
-            state='o'
+            "Adding entry for client domain {}".format(network_domain),
+            prefix="DNS aggregator",
+            state="o",
         )

         # Connect to the database
         self.sql_conn = psycopg2.connect(
             "host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
-                self.config['pdns_postgresql_host'],
-                self.config['pdns_postgresql_port'],
-                self.config['pdns_postgresql_dbname'],
-                self.config['pdns_postgresql_user'],
-                self.config['pdns_postgresql_password']
+                self.config["pdns_postgresql_host"],
+                self.config["pdns_postgresql_port"],
+                self.config["pdns_postgresql_dbname"],
+                self.config["pdns_postgresql_user"],
+                self.config["pdns_postgresql_password"],
             )
         )
         sql_curs = self.sql_conn.cursor()
         # Try to access the domains entry
-        sql_curs.execute(
-            "SELECT * FROM domains WHERE name=%s",
-            (network_domain,)
-        )
+        sql_curs.execute("SELECT * FROM domains WHERE name=%s", (network_domain,))
         results = sql_curs.fetchone()

         # If we got back a result, don't try to add the domain to the DB
@@ -188,14 +185,11 @@ class DNSNetworkInstance(object):
         if self.aggregator.is_active and write_domain:
             sql_curs.execute(
                 "INSERT INTO domains (name, type, account, notified_serial) VALUES (%s, 'MASTER', 'internal', 0)",
-                (network_domain,)
+                (network_domain,),
             )
             self.sql_conn.commit()

-            sql_curs.execute(
-                "SELECT id FROM domains WHERE name=%s",
-                (network_domain,)
-            )
+            sql_curs.execute("SELECT id FROM domains WHERE name=%s", (network_domain,))
             domain_id = sql_curs.fetchone()

             sql_curs.execute(
@@ -203,13 +197,22 @@ class DNSNetworkInstance(object):
                 INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES
                 (%s, %s, %s, %s, %s, %s)
                 """,
-                (domain_id, network_domain, 'nsX.{d} root.{d} 1 10800 1800 86400 86400'.format(d=self.config['upstream_domain']), 'SOA', 86400, 0)
+                (
+                    domain_id,
+                    network_domain,
+                    "nsX.{d} root.{d} 1 10800 1800 86400 86400".format(
+                        d=self.config["upstream_domain"]
+                    ),
+                    "SOA",
+                    86400,
+                    0,
+                ),
             )

             if self.network.name_servers:
                 ns_servers = self.network.name_servers
             else:
-                ns_servers = ['pvc-dns.{}'.format(self.config['upstream_domain'])]
+                ns_servers = ["pvc-dns.{}".format(self.config["upstream_domain"])]

             for ns_server in ns_servers:
                 sql_curs.execute(
@@ -217,7 +220,7 @@ class DNSNetworkInstance(object):
                     INSERT INTO records (domain_id, name, content, type, ttl, prio) VALUES
                     (%s, %s, %s, %s, %s, %s)
                     """,
-                    (domain_id, network_domain, ns_server, 'NS', 86400, 0)
+                    (domain_id, network_domain, ns_server, "NS", 86400, 0),
                 )

         self.sql_conn.commit()
@@ -229,42 +232,31 @@ class DNSNetworkInstance(object):
         network_domain = self.network.domain

         self.logger.out(
-            'Removing entry for client domain {}'.format(
-                network_domain
-            ),
-            prefix='DNS aggregator',
-            state='o'
+            "Removing entry for client domain {}".format(network_domain),
+            prefix="DNS aggregator",
+            state="o",
         )

         # Connect to the database
         self.sql_conn = psycopg2.connect(
             "host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
-                self.config['pdns_postgresql_host'],
-                self.config['pdns_postgresql_port'],
-                self.config['pdns_postgresql_dbname'],
-                self.config['pdns_postgresql_user'],
-                self.config['pdns_postgresql_password']
+                self.config["pdns_postgresql_host"],
+                self.config["pdns_postgresql_port"],
+                self.config["pdns_postgresql_dbname"],
+                self.config["pdns_postgresql_user"],
+                self.config["pdns_postgresql_password"],
             )
         )
         sql_curs = self.sql_conn.cursor()

         # Get the domain ID
-        sql_curs.execute(
-            "SELECT id FROM domains WHERE name=%s",
-            (network_domain,)
-        )
+        sql_curs.execute("SELECT id FROM domains WHERE name=%s", (network_domain,))
        domain_id = sql_curs.fetchone()

         # Delete the domain from the database if we're active
         if self.aggregator.is_active and domain_id:
-            sql_curs.execute(
-                "DELETE FROM domains WHERE id=%s",
-                (domain_id,)
-            )
-            sql_curs.execute(
-                "DELETE FROM records WHERE domain_id=%s",
-                (domain_id,)
-            )
+            sql_curs.execute("DELETE FROM domains WHERE id=%s", (domain_id,))
+            sql_curs.execute("DELETE FROM records WHERE domain_id=%s", (domain_id,))

         self.sql_conn.commit()
         self.sql_conn.close()
@@ -295,11 +287,11 @@ class AXFRDaemonInstance(object):
         # after the leader transitions
         self.sql_conn = psycopg2.connect(
             "host='{}' port='{}' dbname='{}' user='{}' password='{}' sslmode='disable'".format(
-                self.config['pdns_postgresql_host'],
-                self.config['pdns_postgresql_port'],
-                self.config['pdns_postgresql_dbname'],
-                self.config['pdns_postgresql_user'],
-                self.config['pdns_postgresql_password']
+                self.config["pdns_postgresql_host"],
+                self.config["pdns_postgresql_port"],
+                self.config["pdns_postgresql_dbname"],
+                self.config["pdns_postgresql_user"],
+                self.config["pdns_postgresql_password"],
             )
         )

@@ -328,7 +320,7 @@ class AXFRDaemonInstance(object):

                 # Set up our basic variables
                 domain = network.domain
-                if network.ip4_gateway != 'None':
+                if network.ip4_gateway != "None":
                     dnsmasq_ip = network.ip4_gateway
                 else:
                     dnsmasq_ip = network.ip6_gateway
@@ -341,53 +333,67 @@ class AXFRDaemonInstance(object):
                     z = dns.zone.from_xfr(axfr)
                     records_raw = [z[n].to_text(n) for n in z.nodes.keys()]
                 except Exception as e:
-                    if self.config['debug']:
-                        self.logger.out('{} {} ({})'.format(e, dnsmasq_ip, domain), state='d', prefix='dns-aggregator')
+                    if self.config["debug"]:
+                        self.logger.out(
+                            "{} {} ({})".format(e, dnsmasq_ip, domain),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                     continue

                 # Fix the formatting because it's useless
                 # reference: ['@ 600 IN SOA . . 4 1200 180 1209600 600\n@ 600 IN NS .', 'test3 600 IN A 10.1.1.203\ntest3 600 IN AAAA 2001:b23e:1113:0:5054:ff:fe5c:f131', etc.]
                 # We don't really care about dnsmasq's terrible SOA or NS records which are in [0]
-                string_records = '\n'.join(records_raw[1:])
+                string_records = "\n".join(records_raw[1:])
                 # Split into individual records
                 records_new = list()
-                for element in string_records.split('\n'):
+                for element in string_records.split("\n"):
                     if element:
                         record = element.split()
                         # Handle space-containing data elements
                         if domain not in record[0]:
-                            name = '{}.{}'.format(record[0], domain)
+                            name = "{}.{}".format(record[0], domain)
                         else:
                             name = record[0]
-                        entry = '{} {} IN {} {}'.format(name, record[1], record[3], ' '.join(record[4:]))
+                        entry = "{} {} IN {} {}".format(
+                            name, record[1], record[3], " ".join(record[4:])
+                        )
                         records_new.append(entry)

                 #
                 # Get the current zone from the database
                 #
                 try:
-                    sql_curs.execute(
-                        "SELECT id FROM domains WHERE name=%s",
-                        (domain,)
-                    )
+                    sql_curs.execute("SELECT id FROM domains WHERE name=%s", (domain,))
                     domain_id = sql_curs.fetchone()
                     sql_curs.execute(
-                        "SELECT * FROM records WHERE domain_id=%s",
-                        (domain_id,)
+                        "SELECT * FROM records WHERE domain_id=%s", (domain_id,)
                     )
                     results = list(sql_curs.fetchall())
-                    if self.config['debug']:
-                        self.logger.out('SQL query results: {}'.format(results), state='d', prefix='dns-aggregator')
+                    if self.config["debug"]:
+                        self.logger.out(
+                            "SQL query results: {}".format(results),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                 except Exception as e:
-                    self.logger.out('ERROR: Failed to obtain DNS records from database: {}'.format(e))
+                    self.logger.out(
+                        "ERROR: Failed to obtain DNS records from database: {}".format(
+                            e
+                        )
+                    )

                 # Fix the formatting because it's useless for comparison
                 # reference: ((10, 28, 'testnet01.i.bonilan.net', 'SOA', 'nsX.pvc.local root.pvc.local 1 10800 1800 86400 86400', 86400, 0, None, 0, None, 1), etc.)
                 records_old = list()
                 records_old_ids = list()
                 if not results:
-                    if self.config['debug']:
-                        self.logger.out('No results found, skipping.', state='d', prefix='dns-aggregator')
+                    if self.config["debug"]:
+                        self.logger.out(
+                            "No results found, skipping.",
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                     continue
                 for record in results:
                     # Skip the non-A
@@ -397,14 +403,24 @@ class AXFRDaemonInstance(object):
                     r_type = record[3]
                     r_data = record[4]
                     # Assemble a list element in the same format as the AXFR data
-                    entry = '{} {} IN {} {}'.format(r_name, r_ttl, r_type, r_data)
-                    if self.config['debug']:
-                        self.logger.out('Found record: {}'.format(entry), state='d', prefix='dns-aggregator')
+                    entry = "{} {} IN {} {}".format(r_name, r_ttl, r_type, r_data)
+                    if self.config["debug"]:
+                        self.logger.out(
+                            "Found record: {}".format(entry),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )

                     # Skip non-A or AAAA records
-                    if r_type != 'A' and r_type != 'AAAA':
-                        if self.config['debug']:
-                            self.logger.out('Skipping record {}, not A or AAAA: "{}"'.format(entry, r_type), state='d', prefix='dns-aggregator')
+                    if r_type != "A" and r_type != "AAAA":
+                        if self.config["debug"]:
+                            self.logger.out(
+                                'Skipping record {}, not A or AAAA: "{}"'.format(
+                                    entry, r_type
+                                ),
+                                state="d",
+                                prefix="dns-aggregator",
+                            )
                         continue

                     records_old.append(entry)
@@ -413,9 +429,17 @@ class AXFRDaemonInstance(object):
                 records_new.sort()
                 records_old.sort()

-                if self.config['debug']:
-                    self.logger.out('New: {}'.format(records_new), state='d', prefix='dns-aggregator')
-                    self.logger.out('Old: {}'.format(records_old), state='d', prefix='dns-aggregator')
+                if self.config["debug"]:
+                    self.logger.out(
+                        "New: {}".format(records_new),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )
+                    self.logger.out(
+                        "Old: {}".format(records_old),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )

                 # Find the differences between the lists
                 # Basic check one: are they completely equal
@@ -426,9 +450,17 @@ class AXFRDaemonInstance(object):
                 in_new_not_in_old = in_new - in_old
                 in_old_not_in_new = in_old - in_new

-                if self.config['debug']:
-                    self.logger.out('New but not old: {}'.format(in_new_not_in_old), state='d', prefix='dns-aggregator')
-                    self.logger.out('Old but not new: {}'.format(in_old_not_in_new), state='d', prefix='dns-aggregator')
+                if self.config["debug"]:
+                    self.logger.out(
+                        "New but not old: {}".format(in_new_not_in_old),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )
+                    self.logger.out(
+                        "Old but not new: {}".format(in_old_not_in_new),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )

                 # Go through the old list
                 remove_records = list()  # list of database IDs
@@ -445,18 +477,24 @@ class AXFRDaemonInstance(object):
                     for newrecord in in_new_not_in_old:
                         splitnewrecord = newrecord.split()
                         # If there's a name and type match with different content, remove the old one
-                        if splitrecord[0] == splitnewrecord[0] and splitrecord[3] == splitnewrecord[3]:
+                        if (
+                            splitrecord[0] == splitnewrecord[0]
+                            and splitrecord[3] == splitnewrecord[3]
+                        ):
                             remove_records.append(record_id)

                 changed = False
                 if len(remove_records) > 0:
                     # Remove the invalid old records
                     for record_id in remove_records:
-                        if self.config['debug']:
-                            self.logger.out('Removing record: {}'.format(record_id), state='d', prefix='dns-aggregator')
+                        if self.config["debug"]:
+                            self.logger.out(
+                                "Removing record: {}".format(record_id),
+                                state="d",
+                                prefix="dns-aggregator",
+                            )
                         sql_curs.execute(
-                            "DELETE FROM records WHERE id=%s",
-                            (record_id,)
+                            "DELETE FROM records WHERE id=%s", (record_id,)
                         )
                         changed = True

@@ -469,53 +507,81 @@ class AXFRDaemonInstance(object):
                         r_ttl = record[1]
                         r_type = record[3]
                         r_data = record[4]
-                        if self.config['debug']:
-                            self.logger.out('Add record: {}'.format(name), state='d', prefix='dns-aggregator')
+                        if self.config["debug"]:
+                            self.logger.out(
+                                "Add record: {}".format(name),
+                                state="d",
+                                prefix="dns-aggregator",
+                            )
                         try:
                             sql_curs.execute(
                                 "INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
-                                (domain_id, r_name, r_ttl, r_type, 0, r_data)
+                                (domain_id, r_name, r_ttl, r_type, 0, r_data),
                             )
                             changed = True
                         except psycopg2.IntegrityError as e:
-                            if self.config['debug']:
-                                self.logger.out('Failed to add record due to {}: {}'.format(e, name), state='d', prefix='dns-aggregator')
+                            if self.config["debug"]:
+                                self.logger.out(
+                                    "Failed to add record due to {}: {}".format(
+                                        e, name
+                                    ),
+                                    state="d",
+                                    prefix="dns-aggregator",
+                                )
                         except psycopg2.errors.InFailedSqlTransaction as e:
-                            if self.config['debug']:
-                                self.logger.out('Failed to add record due to {}: {}'.format(e, name), state='d', prefix='dns-aggregator')
+                            if self.config["debug"]:
+                                self.logger.out(
+                                    "Failed to add record due to {}: {}".format(
+                                        e, name
+                                    ),
+                                    state="d",
+                                    prefix="dns-aggregator",
+                                )

                 if changed:
                     # Increase SOA serial
                     sql_curs.execute(
                         "SELECT content FROM records WHERE domain_id=%s AND type='SOA'",
-                        (domain_id,)
+                        (domain_id,),
                     )
                     soa_record = list(sql_curs.fetchone())[0].split()
                     current_serial = int(soa_record[2])
                     new_serial = current_serial + 1
                     soa_record[2] = str(new_serial)
-                    if self.config['debug']:
-                        self.logger.out('Records changed; bumping SOA: {}'.format(new_serial), state='d', prefix='dns-aggregator')
+                    if self.config["debug"]:
+                        self.logger.out(
+                            "Records changed; bumping SOA: {}".format(new_serial),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                     sql_curs.execute(
                         "UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
-                        (' '.join(soa_record), domain_id)
+                        (" ".join(soa_record), domain_id),
                     )

                     # Commit all the previous changes
-                    if self.config['debug']:
-                        self.logger.out('Committing database changes and reloading PDNS', state='d', prefix='dns-aggregator')
+                    if self.config["debug"]:
+                        self.logger.out(
+                            "Committing database changes and reloading PDNS",
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                     try:
                         self.sql_conn.commit()
                     except Exception as e:
-                        self.logger.out('ERROR: Failed to commit DNS aggregator changes: {}'.format(e), state='e')
+                        self.logger.out(
+                            "ERROR: Failed to commit DNS aggregator changes: {}".format(
+                                e
+                            ),
+                            state="e",
+                        )

                     # Reload the domain
                     common.run_os_command(
-                        '/usr/bin/pdns_control --socket-dir={} reload {}'.format(
-                            self.config['pdns_dynamic_directory'],
-                            domain
+                        "/usr/bin/pdns_control --socket-dir={} reload {}".format(
+                            self.config["pdns_dynamic_directory"], domain
                         ),
-                        background=False
+                        background=False,
                     )

             # Wait for 10 seconds
@@ -46,45 +46,52 @@ class MetadataAPIInstance(object):

     # Add flask routes inside our instance
     def add_routes(self):
-        @self.mdapi.route('/', methods=['GET'])
+        @self.mdapi.route("/", methods=["GET"])
         def api_root():
-            return flask.jsonify({"message": "PVC Provisioner Metadata API version 1"}), 209
+            return (
+                flask.jsonify({"message": "PVC Provisioner Metadata API version 1"}),
+                209,
+            )

-        @self.mdapi.route('/<version>/meta-data/', methods=['GET'])
+        @self.mdapi.route("/<version>/meta-data/", methods=["GET"])
         def api_metadata_root(version):
             metadata = """instance-id\nname\nprofile"""
             return metadata, 200

-        @self.mdapi.route('/<version>/meta-data/instance-id', methods=['GET'])
+        @self.mdapi.route("/<version>/meta-data/instance-id", methods=["GET"])
         def api_metadata_instanceid(version):
-            source_address = flask.request.__dict__['environ']['REMOTE_ADDR']
+            source_address = flask.request.__dict__["environ"]["REMOTE_ADDR"]
             vm_details = self.get_vm_details(source_address)
-            instance_id = vm_details.get('uuid', None)
+            instance_id = vm_details.get("uuid", None)
             return instance_id, 200

-        @self.mdapi.route('/<version>/meta-data/name', methods=['GET'])
+        @self.mdapi.route("/<version>/meta-data/name", methods=["GET"])
         def api_metadata_hostname(version):
-            source_address = flask.request.__dict__['environ']['REMOTE_ADDR']
+            source_address = flask.request.__dict__["environ"]["REMOTE_ADDR"]
             vm_details = self.get_vm_details(source_address)
-            vm_name = vm_details.get('name', None)
+            vm_name = vm_details.get("name", None)
             return vm_name, 200

-        @self.mdapi.route('/<version>/meta-data/profile', methods=['GET'])
+        @self.mdapi.route("/<version>/meta-data/profile", methods=["GET"])
         def api_metadata_profile(version):
-            source_address = flask.request.__dict__['environ']['REMOTE_ADDR']
+            source_address = flask.request.__dict__["environ"]["REMOTE_ADDR"]
             vm_details = self.get_vm_details(source_address)
-            vm_profile = vm_details.get('profile', None)
+            vm_profile = vm_details.get("profile", None)
             return vm_profile, 200

-        @self.mdapi.route('/<version>/user-data', methods=['GET'])
+        @self.mdapi.route("/<version>/user-data", methods=["GET"])
         def api_userdata(version):
-            source_address = flask.request.__dict__['environ']['REMOTE_ADDR']
+            source_address = flask.request.__dict__["environ"]["REMOTE_ADDR"]
             vm_details = self.get_vm_details(source_address)
-            vm_profile = vm_details.get('profile', None)
+            vm_profile = vm_details.get("profile", None)
             # Get the userdata
             if vm_profile:
                 userdata = self.get_profile_userdata(vm_profile)
-                self.logger.out("Returning userdata for profile {}".format(vm_profile), state='i', prefix='Metadata API')
+                self.logger.out(
+                    "Returning userdata for profile {}".format(vm_profile),
+                    state="i",
+                    prefix="Metadata API",
+                )
             else:
                 userdata = None
             return flask.Response(userdata)
@@ -92,46 +99,46 @@ class MetadataAPIInstance(object):
     def launch_wsgi(self):
         try:
             self.md_http_server = gevent.pywsgi.WSGIServer(
-                ('169.254.169.254', 80),
+                ("169.254.169.254", 80),
                 self.mdapi,
                 log=sys.stdout,
-                error_log=sys.stdout
+                error_log=sys.stdout,
             )
             self.md_http_server.serve_forever()
         except Exception as e:
-            self.logger.out('Error starting Metadata API: {}'.format(e), state='e')
+            self.logger.out("Error starting Metadata API: {}".format(e), state="e")

     # WSGI start/stop
     def start(self):
         # Launch Metadata API
-        self.logger.out('Starting Metadata API at 169.254.169.254:80', state='i')
+        self.logger.out("Starting Metadata API at 169.254.169.254:80", state="i")
         self.thread = Thread(target=self.launch_wsgi)
         self.thread.start()
-        self.logger.out('Successfully started Metadata API thread', state='o')
+        self.logger.out("Successfully started Metadata API thread", state="o")

     def stop(self):
         if not self.md_http_server:
             return

-        self.logger.out('Stopping Metadata API at 169.254.169.254:80', state='i')
+        self.logger.out("Stopping Metadata API at 169.254.169.254:80", state="i")
         try:
             self.md_http_server.stop()
             time.sleep(0.1)
             self.md_http_server.close()
             time.sleep(0.1)
             self.md_http_server = None
-            self.logger.out('Successfully stopped Metadata API', state='o')
+            self.logger.out("Successfully stopped Metadata API", state="o")
         except Exception as e:
-            self.logger.out('Error stopping Metadata API: {}'.format(e), state='e')
+            self.logger.out("Error stopping Metadata API: {}".format(e), state="e")

     # Helper functions
     def open_database(self):
         conn = psycopg2.connect(
-            host=self.config['metadata_postgresql_host'],
-            port=self.config['metadata_postgresql_port'],
-            dbname=self.config['metadata_postgresql_dbname'],
-            user=self.config['metadata_postgresql_user'],
-            password=self.config['metadata_postgresql_password']
+            host=self.config["metadata_postgresql_host"],
+            port=self.config["metadata_postgresql_port"],
+            dbname=self.config["metadata_postgresql_dbname"],
+            user=self.config["metadata_postgresql_user"],
+            password=self.config["metadata_postgresql_password"],
         )
         cur = conn.cursor(cursor_factory=RealDictCursor)
         return conn, cur
@ -153,7 +160,7 @@ class MetadataAPIInstance(object):
|
||||
data_raw = cur.fetchone()
|
||||
self.close_database(conn, cur)
|
||||
if data_raw is not None:
|
||||
data = data_raw.get('userdata', None)
|
||||
data = data_raw.get("userdata", None)
|
||||
return data
|
||||
else:
|
||||
return None
|
||||
@ -165,27 +172,31 @@ class MetadataAPIInstance(object):
|
||||
|
||||
# Figure out which server this is via the DHCP address
|
||||
host_information = dict()
|
||||
networks_managed = (x for x in networks if x.get('type') == 'managed')
|
||||
networks_managed = (x for x in networks if x.get("type") == "managed")
|
||||
for network in networks_managed:
|
||||
network_leases = pvc_network.getNetworkDHCPLeases(self.zkhandler, network.get('vni'))
|
||||
network_leases = pvc_network.getNetworkDHCPLeases(
|
||||
self.zkhandler, network.get("vni")
|
||||
)
|
||||
for network_lease in network_leases:
|
||||
information = pvc_network.getDHCPLeaseInformation(self.zkhandler, network.get('vni'), network_lease)
|
||||
information = pvc_network.getDHCPLeaseInformation(
|
||||
self.zkhandler, network.get("vni"), network_lease
|
||||
)
|
||||
try:
|
||||
if information.get('ip4_address', None) == source_address:
|
||||
if information.get("ip4_address", None) == source_address:
|
||||
host_information = information
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Get our real information on the host; now we can start querying about it
|
||||
client_macaddr = host_information.get('mac_address', None)
|
||||
client_macaddr = host_information.get("mac_address", None)
|
||||
|
||||
# Find the VM with that MAC address - we can't assume that the hostname is actually right
|
||||
_discard, vm_list = pvc_vm.get_list(self.zkhandler, None, None, None, None)
|
||||
vm_details = dict()
|
||||
for vm in vm_list:
|
||||
try:
|
||||
for network in vm.get('networks'):
|
||||
if network.get('mac', None) == client_macaddr:
|
||||
for network in vm.get("networks"):
|
||||
if network.get("mac", None) == client_macaddr:
|
||||
vm_details = vm
|
||||
except Exception:
|
||||
pass
|
||||
|
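Since the server above binds 169.254.169.254:80 and serves /<version>/meta-data/* and /<version>/user-data, a guest can fetch its own userdata with a plain HTTP request. A minimal sketch from inside a VM; the "latest" version string is an assumption, substitute whatever version path the deployment expects:

    import urllib.request

    # Hypothetical example; assumes this runs inside a VM on a managed
    # network so the link-local metadata address is reachable.
    url = "http://169.254.169.254/latest/user-data"
    with urllib.request.urlopen(url, timeout=5) as resp:
        userdata = resp.read().decode()
    print(userdata)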
File diff suppressed because it is too large
@ -23,10 +23,10 @@ import daemon_lib.common as common


def boolToOnOff(state):
    if state and str(state) == 'True':
        return 'on'
    if state and str(state) == "True":
        return "on"
    else:
        return 'off'
        return "off"


class SRIOVVFInstance(object):
@ -39,12 +39,20 @@ class SRIOVVFInstance(object):
        self.this_node = this_node
        self.myhostname = self.this_node.name

        self.pf = self.zkhandler.read(('node.sriov.vf', self.myhostname, 'sriov_vf.pf', self.vf))
        self.mtu = self.zkhandler.read(('node.sriov.vf', self.myhostname, 'sriov_vf.mtu', self.vf))
        self.vfid = self.vf.replace('{}v'.format(self.pf), '')
        self.pf = self.zkhandler.read(
            ("node.sriov.vf", self.myhostname, "sriov_vf.pf", self.vf)
        )
        self.mtu = self.zkhandler.read(
            ("node.sriov.vf", self.myhostname, "sriov_vf.mtu", self.vf)
        )
        self.vfid = self.vf.replace("{}v".format(self.pf), "")

        self.logger.out('Setting MTU to {}'.format(self.mtu), state='i', prefix='SR-IOV VF {}'.format(self.vf))
        common.run_os_command('ip link set {} mtu {}'.format(self.vf, self.mtu))
        self.logger.out(
            "Setting MTU to {}".format(self.mtu),
            state="i",
            prefix="SR-IOV VF {}".format(self.vf),
        )
        common.run_os_command("ip link set {} mtu {}".format(self.vf, self.mtu))

        # These properties are set via the DataWatch functions, to ensure they are configured on the system
        self.mac = None
@ -58,153 +66,244 @@ class SRIOVVFInstance(object):
        self.query_rss = None

        # Zookeeper handlers for changed configs
        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.mac', self.vf))
        def watch_vf_mac(data, stat, event=''):
            if event and event.type == 'DELETED':
        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("node.sriov.vf", self.myhostname)
            + self.zkhandler.schema.path("sriov_vf.mac", self.vf)
        )
        def watch_vf_mac(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
                data = data.decode("ascii")
            except AttributeError:
                data = '00:00:00:00:00:00'
                data = "00:00:00:00:00:00"

            if data != self.mac:
                self.mac = data

        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.vlan_id', self.vf))
        def watch_vf_vlan_id(data, stat, event=''):
            if event and event.type == 'DELETED':
        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("node.sriov.vf", self.myhostname)
            + self.zkhandler.schema.path("sriov_vf.config.vlan_id", self.vf)
        )
        def watch_vf_vlan_id(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
                data = data.decode("ascii")
            except AttributeError:
                data = '0'
                data = "0"

            if data != self.vlan_id:
                self.vlan_id = data
                self.logger.out('Setting vLAN ID to {}'.format(self.vlan_id), state='i', prefix='SR-IOV VF {}'.format(self.vf))
                common.run_os_command('ip link set {} vf {} vlan {} qos {}'.format(self.pf, self.vfid, self.vlan_id, self.vlan_qos))
                self.logger.out(
                    "Setting vLAN ID to {}".format(self.vlan_id),
                    state="i",
                    prefix="SR-IOV VF {}".format(self.vf),
                )
                common.run_os_command(
                    "ip link set {} vf {} vlan {} qos {}".format(
                        self.pf, self.vfid, self.vlan_id, self.vlan_qos
                    )
                )

        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.vlan_qos', self.vf))
        def watch_vf_vlan_qos(data, stat, event=''):
            if event and event.type == 'DELETED':
        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("node.sriov.vf", self.myhostname)
            + self.zkhandler.schema.path("sriov_vf.config.vlan_qos", self.vf)
        )
        def watch_vf_vlan_qos(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
                data = data.decode("ascii")
            except AttributeError:
                data = '0'
                data = "0"

            if data != self.vlan_qos:
                self.vlan_qos = data
                self.logger.out('Setting vLAN QOS to {}'.format(self.vlan_qos), state='i', prefix='SR-IOV VF {}'.format(self.vf))
                common.run_os_command('ip link set {} vf {} vlan {} qos {}'.format(self.pf, self.vfid, self.vlan_id, self.vlan_qos))
                self.logger.out(
                    "Setting vLAN QOS to {}".format(self.vlan_qos),
                    state="i",
                    prefix="SR-IOV VF {}".format(self.vf),
                )
                common.run_os_command(
                    "ip link set {} vf {} vlan {} qos {}".format(
                        self.pf, self.vfid, self.vlan_id, self.vlan_qos
                    )
                )

        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.tx_rate_min', self.vf))
        def watch_vf_tx_rate_min(data, stat, event=''):
            if event and event.type == 'DELETED':
        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("node.sriov.vf", self.myhostname)
            + self.zkhandler.schema.path("sriov_vf.config.tx_rate_min", self.vf)
        )
        def watch_vf_tx_rate_min(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
                data = data.decode("ascii")
            except AttributeError:
                data = '0'
                data = "0"

            if data != self.tx_rate_min:
                self.tx_rate_min = data
                self.logger.out('Setting minimum TX rate to {}'.format(self.tx_rate_min), state='i', prefix='SR-IOV VF {}'.format(self.vf))
                common.run_os_command('ip link set {} vf {} min_tx_rate {}'.format(self.pf, self.vfid, self.tx_rate_min))
                self.logger.out(
                    "Setting minimum TX rate to {}".format(self.tx_rate_min),
                    state="i",
                    prefix="SR-IOV VF {}".format(self.vf),
                )
                common.run_os_command(
                    "ip link set {} vf {} min_tx_rate {}".format(
                        self.pf, self.vfid, self.tx_rate_min
                    )
                )

        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.tx_rate_max', self.vf))
        def watch_vf_tx_rate_max(data, stat, event=''):
            if event and event.type == 'DELETED':
        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("node.sriov.vf", self.myhostname)
            + self.zkhandler.schema.path("sriov_vf.config.tx_rate_max", self.vf)
        )
        def watch_vf_tx_rate_max(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
                data = data.decode("ascii")
            except AttributeError:
                data = '0'
                data = "0"

            if data != self.tx_rate_max:
                self.tx_rate_max = data
                self.logger.out('Setting maximum TX rate to {}'.format(self.tx_rate_max), state='i', prefix='SR-IOV VF {}'.format(self.vf))
                common.run_os_command('ip link set {} vf {} max_tx_rate {}'.format(self.pf, self.vfid, self.tx_rate_max))
                self.logger.out(
                    "Setting maximum TX rate to {}".format(self.tx_rate_max),
                    state="i",
                    prefix="SR-IOV VF {}".format(self.vf),
                )
                common.run_os_command(
                    "ip link set {} vf {} max_tx_rate {}".format(
                        self.pf, self.vfid, self.tx_rate_max
                    )
                )

        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.spoof_check', self.vf))
        def watch_vf_spoof_check(data, stat, event=''):
            if event and event.type == 'DELETED':
        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("node.sriov.vf", self.myhostname)
            + self.zkhandler.schema.path("sriov_vf.config.spoof_check", self.vf)
        )
        def watch_vf_spoof_check(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
                data = data.decode("ascii")
            except AttributeError:
                data = '0'
                data = "0"

            if data != self.spoof_check:
                self.spoof_check = data
                self.logger.out('Setting spoof checking {}'.format(boolToOnOff(self.spoof_check)), state='i', prefix='SR-IOV VF {}'.format(self.vf))
                common.run_os_command('ip link set {} vf {} spoofchk {}'.format(self.pf, self.vfid, boolToOnOff(self.spoof_check)))
                self.logger.out(
                    "Setting spoof checking {}".format(boolToOnOff(self.spoof_check)),
                    state="i",
                    prefix="SR-IOV VF {}".format(self.vf),
                )
                common.run_os_command(
                    "ip link set {} vf {} spoofchk {}".format(
                        self.pf, self.vfid, boolToOnOff(self.spoof_check)
                    )
                )

        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.link_state', self.vf))
        def watch_vf_link_state(data, stat, event=''):
            if event and event.type == 'DELETED':
        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("node.sriov.vf", self.myhostname)
            + self.zkhandler.schema.path("sriov_vf.config.link_state", self.vf)
        )
        def watch_vf_link_state(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
                data = data.decode("ascii")
            except AttributeError:
                data = 'on'
                data = "on"

            if data != self.link_state:
                self.link_state = data
                self.logger.out('Setting link state to {}'.format(boolToOnOff(self.link_state)), state='i', prefix='SR-IOV VF {}'.format(self.vf))
                common.run_os_command('ip link set {} vf {} state {}'.format(self.pf, self.vfid, self.link_state))
                self.logger.out(
                    "Setting link state to {}".format(boolToOnOff(self.link_state)),
                    state="i",
                    prefix="SR-IOV VF {}".format(self.vf),
                )
                common.run_os_command(
                    "ip link set {} vf {} state {}".format(
                        self.pf, self.vfid, self.link_state
                    )
                )

        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.trust', self.vf))
        def watch_vf_trust(data, stat, event=''):
            if event and event.type == 'DELETED':
        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("node.sriov.vf", self.myhostname)
            + self.zkhandler.schema.path("sriov_vf.config.trust", self.vf)
        )
        def watch_vf_trust(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
                data = data.decode("ascii")
            except AttributeError:
                data = 'off'
                data = "off"

            if data != self.trust:
                self.trust = data
                self.logger.out('Setting trust mode {}'.format(boolToOnOff(self.trust)), state='i', prefix='SR-IOV VF {}'.format(self.vf))
                common.run_os_command('ip link set {} vf {} trust {}'.format(self.pf, self.vfid, boolToOnOff(self.trust)))
                self.logger.out(
                    "Setting trust mode {}".format(boolToOnOff(self.trust)),
                    state="i",
                    prefix="SR-IOV VF {}".format(self.vf),
                )
                common.run_os_command(
                    "ip link set {} vf {} trust {}".format(
                        self.pf, self.vfid, boolToOnOff(self.trust)
                    )
                )

        @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('node.sriov.vf', self.myhostname) + self.zkhandler.schema.path('sriov_vf.config.query_rss', self.vf))
        def watch_vf_query_rss(data, stat, event=''):
            if event and event.type == 'DELETED':
        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("node.sriov.vf", self.myhostname)
            + self.zkhandler.schema.path("sriov_vf.config.query_rss", self.vf)
        )
        def watch_vf_query_rss(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
                data = data.decode("ascii")
            except AttributeError:
                data = 'off'
                data = "off"

            if data != self.query_rss:
                self.query_rss = data
                self.logger.out('Setting RSS query ability {}'.format(boolToOnOff(self.query_rss)), state='i', prefix='SR-IOV VF {}'.format(self.vf))
                common.run_os_command('ip link set {} vf {} query_rss {}'.format(self.pf, self.vfid, boolToOnOff(self.query_rss)))
                self.logger.out(
                    "Setting RSS query ability {}".format(boolToOnOff(self.query_rss)),
                    state="i",
                    prefix="SR-IOV VF {}".format(self.vf),
                )
                common.run_os_command(
                    "ip link set {} vf {} query_rss {}".format(
                        self.pf, self.vfid, boolToOnOff(self.query_rss)
                    )
                )
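All of the watchers above follow the same kazoo DataWatch recipe: decorate a callback on a ZooKeeper path, decode the bytes with a fallback default, apply the value only if it changed, and return False from the callback to cancel the watch once the node is deleted. A minimal standalone sketch of that pattern; the host, path, and default value here are hypothetical, not PVC schema paths:

    from kazoo.client import KazooClient

    zk = KazooClient(hosts="127.0.0.1:2181")  # assumed local ZooKeeper
    zk.start()

    current = {"mtu": None}

    @zk.DataWatch("/config/mtu")  # hypothetical path
    def watch_mtu(data, stat, event=None):
        if event and event.type == "DELETED":
            return False  # node is gone; cancel this watcher
        try:
            value = data.decode("ascii")
        except AttributeError:
            value = "1500"  # fallback when the node holds no data yet
        if value != current["mtu"]:
            current["mtu"] = value  # apply the new value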
@ -33,22 +33,26 @@ class VMConsoleWatcherInstance(object):
        self.domname = domname
        self.zkhandler = zkhandler
        self.config = config
        self.logfile = '{}/{}.log'.format(config['console_log_directory'], self.domname)
        self.console_log_lines = config['console_log_lines']
        self.logfile = "{}/{}.log".format(config["console_log_directory"], self.domname)
        self.console_log_lines = config["console_log_lines"]
        self.logger = logger
        self.this_node = this_node

        # Try to append (create) the logfile and set its permissions
        open(self.logfile, 'a').close()
        open(self.logfile, "a").close()
        os.chmod(self.logfile, 0o600)

        try:
            self.logdeque = deque(open(self.logfile), self.console_log_lines)
        except UnicodeDecodeError:
            # There is corruption in the log file; overwrite it
            self.logger.out('Failed to decode console log file; clearing existing file', state='w', prefix='Domain {}'.format(self.domuuid))
            with open(self.logfile, 'w') as lfh:
                lfh.write('\n')
            self.logger.out(
                "Failed to decode console log file; clearing existing file",
                state="w",
                prefix="Domain {}".format(self.domuuid),
            )
            with open(self.logfile, "w") as lfh:
                lfh.write("\n")
            self.logdeque = deque(open(self.logfile), self.console_log_lines)

        self.stamp = None
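The deque(open(...), N) idiom above is how the watcher keeps only the last console_log_lines lines in memory: collections.deque with a maxlen silently discards the oldest items as newer ones arrive. A small sketch of the same idiom; the file path is hypothetical:

    from collections import deque

    # Keep only the final 100 lines of the file; earlier lines are
    # discarded as the deque fills up to its maxlen.
    tail = deque(open("/tmp/example.log"), 100)
    print("".join(tail))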
@ -66,13 +70,19 @@ class VMConsoleWatcherInstance(object):
    def start(self):
        self.thread_stopper.clear()
        self.thread = Thread(target=self.run, args=(), kwargs={})
        self.logger.out('Starting VM log parser', state='i', prefix='Domain {}'.format(self.domuuid))
        self.logger.out(
            "Starting VM log parser", state="i", prefix="Domain {}".format(self.domuuid)
        )
        self.thread.start()

    # Stop execution thread
    def stop(self):
        if self.thread and self.thread.is_alive():
            self.logger.out('Stopping VM log parser', state='i', prefix='Domain {}'.format(self.domuuid))
            self.logger.out(
                "Stopping VM log parser",
                state="i",
                prefix="Domain {}".format(self.domuuid),
            )
            self.thread_stopper.set()
            # Do one final flush
            self.update()
@ -91,11 +101,11 @@ class VMConsoleWatcherInstance(object):
        self.fetch_lines()
        # Update Zookeeper with the new loglines if they changed
        if self.loglines != self.last_loglines:
            self.zkhandler.write([
                (('domain.console.log', self.domuuid), self.loglines)
            ])
            self.zkhandler.write(
                [(("domain.console.log", self.domuuid), self.loglines)]
            )
            self.last_loglines = self.loglines

    def fetch_lines(self):
        self.logdeque = deque(open(self.logfile), self.console_log_lines)
        self.loglines = ''.join(self.logdeque)
        self.loglines = "".join(self.logdeque)
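A reformat-only commit like this one can be verified mechanically: black --check exits non-zero if any file would still be rewritten, and --diff prints what it would change. One way to run that check from Python, assuming black is installed and on the PATH:

    import subprocess

    # --check: report instead of rewriting; --diff: show the would-be changes.
    result = subprocess.run(
        ["black", "--check", "--diff", "."],
        capture_output=True,
        text=True,
    )
    print(result.stdout or "All files already formatted.")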
File diff suppressed because it is too large
File diff suppressed because it is too large