From 5540bdc86b78155b9636f7d5354c14c0b28cc2fa Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Tue, 8 Jun 2021 23:17:07 -0400 Subject: [PATCH] Add automatic schema upgrade to nodes Performs an automatic schema upgrade when all nodes are updated to the latest version. Addresses #129 --- daemon-common/migrations/versions/0.json | 2 +- daemon-common/zkhandler.py | 14 ++-- node-daemon/pvcnoded/Daemon.py | 87 +++++++++++++++++++++++- node-daemon/test-schema.py | 43 ------------ 4 files changed, 96 insertions(+), 50 deletions(-) delete mode 100755 node-daemon/test-schema.py diff --git a/daemon-common/migrations/versions/0.json b/daemon-common/migrations/versions/0.json index edab15b2..00be1b59 100644 --- a/daemon-common/migrations/versions/0.json +++ b/daemon-common/migrations/versions/0.json @@ -1 +1 @@ -{"version": "0", "root": "", "base": {"schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "lock": "/locks", "lock.primary_node": "/locks/primary_node", "lock.flush_lock": "/locks/flush_lock", "lock.domain_migrate": "/locks/domain_migrate", "cmd": "/cmd", "cmd.nodes": "/cmd/nodes", "cmd.domains": "/cmd/domains", "cmd.ceph": "/cmd/ceph", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "node": {"keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.static": "/staticdata", "counts.provisioned_domains": "/domainscount", "counts.running_domains": "/runningdomains", "counts.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "vcpu.allocated": "/vcpualloc", 
"memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit"}, "network": {"type": "/nettype", "rules": "/firewall_rules", "nameservers": "/name_servers", "domain": "/domain", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", "ip4.dhcp": "/dhcp4_flag", "ip4.reservations": "/dhcp4_reservations", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "osd": {"node": "/node", "device": "/device", "stats": "/stats"}, "pool": {"pgs": "/pgs", "stats": "/stats"}, "volume": {"stats": "/stats"}, "snapshot": {"stats": "/stats"}} \ No newline at end of file +{"version": "0", "root": "", "base": {"schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "lock": "/locks", "lock.primary_node": "/locks/primary_node", "lock.flush_lock": "/locks/flush_lock", "lock.domain_migrate": "/locks/domain_migrate", "cmd": "/cmd", "cmd.nodes": "/cmd/nodes", "cmd.domains": "/cmd/domains", "cmd.ceph": "/cmd/ceph", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", 
"snapshot": "/ceph/snapshots"}, "node": {"keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "counts.provisioned_domains": "/domainscount", "counts.running_domains": "/runningdomains", "counts.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit"}, "network": {"type": "/nettype", "rules": "/firewall_rules", "nameservers": "/name_servers", "domain": "/domain", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", "ip4.dhcp": "/dhcp4_flag", "ip4.reservations": "/dhcp4_reservations", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "osd": {"node": "/node", "device": "/device", "stats": "/stats"}, "pool": {"pgs": "/pgs", "stats": "/stats"}, "volume": {"stats": "/stats"}, "snapshot": {"stats": "/stats"}} \ No newline at end of file diff --git a/daemon-common/zkhandler.py b/daemon-common/zkhandler.py index 94943789..d7995471 100644 --- a/daemon-common/zkhandler.py +++ b/daemon-common/zkhandler.py @@ -402,6 +402,7 @@ class ZKSchema(object): 'keepalive': '/keepalive', 'mode': '/daemonmode', 'data.active_schema': '/activeschema', + 
'data.latest_schema': '/latestschema', 'data.static': '/staticdata', 'counts.provisioned_domains': '/domainscount', 'counts.running_domains': '/runningdomains', @@ -724,10 +725,6 @@ class ZKSchema(object): print(changes) # Apply those changes self.run_migrate(zkhandler, changes) - # Update the schema version key - zkhandler.write([ - (self.key('base.schema.version'), zkschema_new.version) - ]) # Rollback from newer to older schema def rollback(self, zkhandler, old_version): @@ -812,3 +809,12 @@ class ZKSchema(object): return versions else: return None + + @staticmethod + def find_latest(): + latest_version = 0 + for version in os.listdir('daemon_lib/migrations/versions'): + sequence_id = int(version.split('.')[0]) + if sequence_id > latest_version: + latest_version = sequence_id + return latest_version diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py index cc9ca360..90602d97 100644 --- a/node-daemon/pvcnoded/Daemon.py +++ b/node-daemon/pvcnoded/Daemon.py @@ -21,7 +21,6 @@ import kazoo.client import libvirt -import sys import os import signal import psutil @@ -74,6 +73,9 @@ version = '0.9.19' # Daemon functions ############################################################################### +# Ensure the update_timer is None until it's set for real +update_timer = None + # Create timer to update this node in Zookeeper def startKeepaliveTimer(): @@ -542,8 +544,15 @@ except Exception: (zkschema.path('node.data.active_schema', myhostname), node_schema_version) ]) +# Load in the current node schema version zkschema.load(node_schema_version) +# Record the latest installed schema version +latest_schema_version = ZKSchema.find_latest() +zkhandler.write([ + (zkschema.path('node.data.latest_schema', myhostname), latest_schema_version) +]) + # Validate our schema against that version if not zkschema.validate(zkhandler, logger): logger.out('Found schema violations, applying', state='i') @@ -552,6 +561,80 @@ else: logger.out('Schema successfully 
validated', state='o') +# Watch for a global schema update and fire +# This will only change by the API when triggered after seeing all nodes can update +@zkhandler.zk_conn.DataWatch(zkschema.path('base.schema.version')) +def update_schema(new_schema_version, stat, event=''): + global zkschema, zkhandler, update_timer + + new_schema_version = int(new_schema_version.decode('ascii')) + + if new_schema_version == node_schema_version: + return + + logger.out('Hot update of schema version started', state='s') + + # Prevent any keepalive updates while this happens + if update_timer is not None: + stopKeepaliveTimer() + + # Perform the migration (primary only) + if zkhandler.read(zkschema.path('base.config.primary_node')) == myhostname: + logger.out('Primary node acquiring exclusive lock', state='s') + # Wait for things to settle + time.sleep(0.5) + # Acquire a write lock on the root key + with zkhandler.exclusivelock('/'): + # Perform the schema migration tasks + logger.out('Performing schema update', state='s') + zkschema.migrate(zkhandler, latest_schema_version) + # Wait for the exclusive lock to be lifted + else: + logger.out('Non-primary node acquiring read lock', state='s') + # Wait for things to settle + time.sleep(1) + # Wait for a read lock + lock = zkhandler.readlock('/') + lock.acquire() + # Wait a bit more for the primary to return to normal + time.sleep(1) + + # Update the local schema version + logger.out('Updating local schema', state='s') + zkschema.load(new_schema_version) + zkhandler.write([ + (zkschema.path('node.data.active_schema', myhostname), new_schema_version) + ]) + + # Restart the zookeeper connection + logger.out('Restarting Zookeeper connection', state='s') + zkhandler.disconnect() + time.sleep(1) + zkhandler.connect(persistent=True) + + # Restart the update timer + if update_timer is not None: + update_timer = startKeepaliveTimer() + + # Restart the API daemons if applicable + if zkhandler.read(zkschema.path('base.config.primary_node')) == 
myhostname: + common.run_os_command('systemctl start pvcapid.service') + common.run_os_command('systemctl start pvcapid-worker.service') + + +# If we are the last node to get a schema update, fire the master update +if latest_schema_version > node_schema_version: + node_latest_schema_version = list() + for node in zkhandler.children(zkschema.path('base.node')): + node_latest_schema_version.append(zkhandler.read(zkschema.path('node.data.latest_schema', node))) + + # This is true if all elements of the latest schema version are identical to the latest version, + # i.e. they have all had the latest schema installed and ready to load. + if node_latest_schema_version.count(latest_schema_version) == len(node_latest_schema_version): + zkhandler.write([ + (zkschema.path('base.schema.version'), latest_schema_version) + ]) + ############################################################################### # PHASE 5 - Gracefully handle termination ############################################################################### @@ -624,7 +707,7 @@ def cleanup(): pass logger.out('Terminated pvc daemon', state='s') - sys.exit(0) + os._exit(0) # Termination function diff --git a/node-daemon/test-schema.py b/node-daemon/test-schema.py deleted file mode 100755 index 977e13e5..00000000 --- a/node-daemon/test-schema.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python3 - -# flake8: noqa - -import sys -import datetime -from daemon_lib.zkhandler import ZKHandler, ZKSchema - -ZKSchema.write() - -sys.exit(0) - -print(datetime.datetime.now()) -zkhandler = ZKHandler({'coordinators': ['hv1.tc', 'hv2.tc', 'hv3.tc']}) -zkhandler.connect() -print(datetime.datetime.now()) - -zkschema = ZKSchema.load_current(zkhandler) - -#print(zkschema.path('base.schema.version')) -#print(zkschema.path('node.state.daemon', 'hv1')) -#print(zkschema.path('domain.state', 'test1')) -#print(zkschema.keys('base')) -#print(zkschema.keys('node')) - - -zkschema.validate(zkhandler) -zkschema.apply(zkhandler) - 
-zkschema_latest = ZKSchema() -#if zkschema < zkschema_latest: -# print("I'm older") -#elif zkschema == zkschema_latest: -# print("I'm the same") -#elif zkschema > zkschema_latest: -# print("I'm newer") - -#diff = ZKSchema.key_diff(zkschema, zkschema_latest) -zkschema.migrate(zkhandler, zkschema_latest.version) - -#zkschema_earliest = ZKSchema() -#zkschema_earliest.load(0) -#zkschema.rollback(zkhandler, zkschema_earliest.version)