# Standard-library modules used by ZKSchema (JSON schema dumps, directory listing).
import json
import os


#
# Schema classes
#
class ZKSchema(object):
    # Current version
    _version = 0

    # Root for doing nested keys
    _schema_root = ''

    # Directory holding one "<version>.json" schema dump per schema version.
    # The path is relative, so it is resolved against the process's working directory.
    _schema_directory = 'daemon_lib/migrations/versions'

    # Primary schema definition for the current version
    # NOTE: the f-strings below are evaluated once, when the class body runs, so
    # assigning to `schema_root` afterwards does not rewrite these paths.
    _schema = {
        'version': f'{_version}',
        'root': f'{_schema_root}',
        # Base schema defining core keys; this is all that is initialized on cluster init()
        'base': {
            'schema': f'{_schema_root}/schema',
            'schema.version': f'{_schema_root}/schema/version',
            'config': f'{_schema_root}/config',
            'config.maintenance': f'{_schema_root}/config/maintenance',
            'config.primary_node': f'{_schema_root}/config/primary_node',
            'config.upstream_ip': f'{_schema_root}/config/upstream_ip',
            'config.migration_target_selector': f'{_schema_root}/config/migration_target_selector',
            'lock': f'{_schema_root}/locks',
            'lock.primary_node': f'{_schema_root}/locks/primary_node',
            'lock.flush_lock': f'{_schema_root}/locks/flush_lock',
            'lock.domain_migrate': f'{_schema_root}/locks/domain_migrate',
            'cmd': f'{_schema_root}/cmd',
            'cmd.nodes': f'{_schema_root}/cmd/nodes',
            'cmd.domains': f'{_schema_root}/cmd/domains',
            'cmd.ceph': f'{_schema_root}/cmd/ceph',
            'node': f'{_schema_root}/nodes',
            'domain': f'{_schema_root}/domains',
            'network': f'{_schema_root}/networks',
            'storage': f'{_schema_root}/ceph',
            'storage.util': f'{_schema_root}/ceph/util',
            'osd': f'{_schema_root}/ceph/osds',
            'pool': f'{_schema_root}/ceph/pools',
            'volume': f'{_schema_root}/ceph/volumes',
            'snapshot': f'{_schema_root}/ceph/snapshots',
        },
        # The schema of an individual node entry (/nodes/{node_name})
        'node': {
            'keepalive': '/keepalive',
            'mode': '/daemonmode',
            'staticdata': '/static_data',
            'data.kernel': '/oskernelversion',
            'data.os': '/ostype',
            'data.arch': '/osarch',
            'counts.provisioned_domains': '/domainscount',
            'counts.running_domains': '/runningdomains',
            'counts.networks': '/networkscount',
            'state.daemon': '/daemonstate',
            'state.router': '/routerstate',
            'state.domain': '/domainstate',
            'vcpu.total': '/vcputotal',
            'vcpu.allocated': '/vcpualloc',
            'memory.total': '/memtotal',
            'memory.used': '/memused',
            'memory.free': '/memfree',
            'memory.allocated': '/memalloc',
            'memory.provisioned': '/memprov',
            'ipmi.hostname': '/ipmihostname',
            'ipmi.username': '/ipmiusername',
            'ipmi.password': '/ipmipassword'
        },
        # The schema of an individual domain entry (/domains/{domain_uuid})
        'domain': {
            'name': '',  # The root key
            'xml': '/xml',
            'state': '/state',
            'profile': '/profile',
            'stats': '/stats',
            'node': '/node',
            'last_node': '/lastnode',
            'failed_reason': '/failedreason',
            'console.log': '/consolelog',
            'console.vnc': '/vnc',
            'meta.autostart': '/node_autostart',
            'meta.migrate_method': '/migration_method',
            'meta.node_selector': '/node_selector',
            'meta.node_limit': '/node_limit'
        },
        # The schema of an individual network entry (/networks/{vni})
        'network': {
            'type': '/nettype',
            'rules': '/firewall_rules',
            'nameservers': '/name_servers',
            'domain': '/domain',
            'ip4.gateway': '/ip4_gateway',
            'ip4.network': '/ip4_network',
            'ip4.dhcp': '/dhcp4_flag',
            'ip4.reservations': '/dhcp4_reservations',
            'ip4.dhcp_start': '/dhcp4_start',
            'ip4.dhcp_end': '/dhcp4_end',
            'ip6.gateway': '/ip6_gateway',
            'ip6.network': '/ip6_network',
            'ip6.dhcp': '/dhcp6_flag'
        },
        # The schema of an individual OSD entry (/ceph/osds/{osd_id})
        'osd': {
            'node': '/node',
            'device': '/device',
            'stats': '/stats'
        },
        # The schema of an individual pool entry (/ceph/pools/{pool_name})
        'pool': {
            'pgs': '/pgs',
            'stats': '/stats'
        },
        # The schema of an individual volume entry (/ceph/volumes/{pool_name}/{volume_name})
        'volume': {
            'stats': '/stats'
        },
        # The schema of an individual snapshot entry (/ceph/volumes/{pool_name}/{volume_name}/{snapshot_name})
        'snapshot': {
            'stats': '/stats'
        }
    }

    # Properties
    # Each setter stores on the instance, shadowing the class-level default, so
    # loading another version into one instance leaves the class definition intact.
    @property
    def schema_root(self):
        return self._schema_root

    @schema_root.setter
    def schema_root(self, schema_root):
        self._schema_root = schema_root

    @property
    def version(self):
        return int(self._version)

    @version.setter
    def version(self, version):
        self._version = int(version)

    @property
    def schema(self):
        return self._schema

    @schema.setter
    def schema(self, schema):
        self._schema = schema

    def __init__(self):
        pass

    def __repr__(self):
        return f'ZKSchema({self.version})'

    # Schemas are ordered and compared by their integer version only. Comparing
    # against anything that is not a ZKSchema defers to Python (NotImplemented)
    # rather than failing on a missing `version` attribute.
    def __lt__(self, other):
        if not isinstance(other, ZKSchema):
            return NotImplemented
        return self.version < other.version

    def __le__(self, other):
        if not isinstance(other, ZKSchema):
            return NotImplemented
        return self.version <= other.version

    def __gt__(self, other):
        if not isinstance(other, ZKSchema):
            return NotImplemented
        return self.version > other.version

    def __ge__(self, other):
        if not isinstance(other, ZKSchema):
            return NotImplemented
        return self.version >= other.version

    def __eq__(self, other):
        if not isinstance(other, ZKSchema):
            return NotImplemented
        return self.version == other.version

    # Defining __eq__ already makes instances unhashable; spelled out because the
    # version (the only thing equality looks at) is mutable.
    __hash__ = None

    # Load the schema of a given version from a file
    #   version: the version whose "<version>.json" dump should be read
    # Replaces this instance's schema and version with the file's contents.
    def load(self, version):
        print(f'Loading schema version {version}')
        schema_file = os.path.join(self._schema_directory, f'{version}.json')
        with open(schema_file, 'r') as sfh:
            self.schema = json.load(sfh)
        self.version = self.schema.get('version')

    # Get key paths
    #   ipath: dotted lookup "<section>.<key>", e.g. 'base.schema.version' or 'node.state.daemon'
    #   item:  optional entry name (node name, domain UUID, ...)
    # Without an item, returns the path stored under <key> in <section>.
    # With an item, returns "<base path of section>/<item><sub-path of key>".
    # Returns None when the section or key is not part of this schema.
    def path(self, ipath, item=None):
        itype, *subkeys = ipath.split('.')

        section = self.schema.get(itype)
        if not isinstance(section, dict):
            return None
        sub_path = section.get('.'.join(subkeys))

        if item is None:
            return sub_path

        base_path = self.schema.get('base', {}).get(itype)
        if base_path is None or sub_path is None:
            # Never hand back a path with a literal "None" embedded in it
            return None
        return f'{base_path}/{item}{sub_path}'

    # Get keys of a schema location
    #   itype: section name; None means the 'base' section
    # Returns a list of key names, empty when the section does not exist.
    def keys(self, itype=None):
        section = self.schema.get('base' if itype is None else itype)
        if not isinstance(section, dict):
            return []
        return list(section)

    # Get the active version of a cluster's schema
    # Returns whatever zkhandler.read() yields for the schema version key, or 0
    # when that key does not exist.
    # NOTE(review): the type of the value returned by read() is not visible here;
    # callers that need an integer should convert it.
    def get_version(self, zkhandler):
        # kazoo is a third-party package; importing it where it is used keeps the
        # class importable in environments that do not have it installed.
        from kazoo.exceptions import NoNodeError

        try:
            current_version = zkhandler.read(self.path('base.schema.version'))
        except NoNodeError:
            current_version = 0
        return current_version

    # Validate an active schema against a Zookeeper cluster
    # Returns False (after reporting the first missing key) or True if all keys exist.
    def validate(self, zkhandler, path='base'):
        for key in self.keys(path):
            key_path = self.path(path + '.' + key)
            if not zkhandler.exists(key_path):
                print(f"Key not found: {key_path}")
                return False
        return True

    # Apply the current schema to the cluster
    # Creates every missing key of the section with an empty value, then records
    # this schema's version in the schema version key.
    def apply(self, zkhandler, path='base'):
        for key in self.keys(path):
            key_path = self.path(path + '.' + key)
            if not zkhandler.exists(key_path):
                zkhandler.write([
                    (key_path, '')
                ])

        zkhandler.write([
            (self.path('base.schema.version'), self.version)
        ])

    # Migrate key diffs
    #   changes: a dictionary as produced by key_diff()
    # Adds are written with empty values, then removals are deleted, then renames run.
    def run_migrate(self, zkhandler, changes):
        add_tasks = [(key_path, '') for key_path in changes['add'].values()]
        remove_tasks = list(changes['remove'].values())
        rename_tasks = [(move['from'], move['to']) for move in changes['rename'].values()]

        print(add_tasks)
        print(remove_tasks)
        print(rename_tasks)
        zkhandler.write(add_tasks)
        zkhandler.delete(remove_tasks)
        zkhandler.rename(rename_tasks)

    # Move the cluster, and this object, one step to the schema of `version`
    def _transition(self, zkhandler, version):
        # Create a new schema at that version
        zkschema_target = ZKSchema()
        zkschema_target.load(version)
        # Get a list of changes
        changes = ZKSchema.key_diff(self, zkschema_target)
        print(changes)
        # Apply those changes
        self.run_migrate(zkhandler, changes)
        # Update the schema version key, at the location the target schema defines
        zkhandler.write([
            (zkschema_target.path('base.schema.version'), zkschema_target.version)
        ])
        # Adopt the target so that the next step is diffed against what is now in
        # the cluster rather than against the schema we started from
        self.schema = zkschema_target.schema
        self.version = zkschema_target.version

    # Migrate from older to newer schema
    # Steps through every stored version above the current one, up to and
    # including new_version, in ascending order. Does nothing if there are none.
    def migrate(self, zkhandler, new_version):
        # Determine the versions in between
        versions = ZKSchema.find_all(start=self.version, end=new_version)
        print(versions)
        if versions is None:
            return

        for version in versions:
            self._transition(zkhandler, version)

    # Rollback from newer to older schema
    # Steps through every stored version below the current one, down to and
    # including old_version, in descending order. Does nothing if there are none.
    def rollback(self, zkhandler, old_version):
        # Determine the versions in between
        versions = ZKSchema.find_all(start=old_version - 1, end=self.version - 1)
        if versions is None:
            print(versions)
            return
        versions.reverse()
        print(versions)

        for version in versions:
            self._transition(zkhandler, version)

    # Compute the key changes needed to get from schema_a (current) to schema_b (new)
    # Returns {'add': {...}, 'remove': {...}, 'rename': {...}}, each keyed by
    # "<section>.<key>"; add/remove map to a path, rename maps to {'from', 'to'}.
    # NOTE(review): paths are resolved without an item, so for per-entry sections
    # (node, domain, ...) they are the bare sub-paths, not full cluster paths.
    @classmethod
    def key_diff(cls, schema_a, schema_b):
        diff_add = dict()
        diff_remove = dict()
        diff_rename = dict()

        # Parse through each core element
        for elem in ('base', 'node', 'domain', 'network', 'osd', 'pool', 'volume', 'snapshot'):
            keys_a = set(schema_a.keys(elem))
            keys_b = set(schema_b.keys(elem))

            # Only in the new schema: to be added
            for item in sorted(keys_b - keys_a):
                elem_item = elem + '.' + item
                diff_add[elem_item] = schema_b.path(elem_item)

            # Only in the current schema: to be removed
            for item in sorted(keys_a - keys_b):
                elem_item = elem + '.' + item
                diff_remove[elem_item] = schema_a.path(elem_item)

            # In both, but stored at a different path: to be renamed
            for item in sorted(keys_a & keys_b):
                elem_item = elem + '.' + item
                path_a = schema_a.path(elem_item)
                path_b = schema_b.path(elem_item)
                if path_a is not None and path_b is not None and path_a != path_b:
                    diff_rename[elem_item] = {'from': path_a, 'to': path_b}

        return {'add': diff_add, 'remove': diff_remove, 'rename': diff_rename}

    # Load in the schema of the current cluster
    @classmethod
    def load_current(cls, zkhandler):
        new_instance = cls()
        version = new_instance.get_version(zkhandler)
        new_instance.load(version)
        return new_instance

    # Write the latest schema to a file
    @classmethod
    def write(cls):
        schema_file = os.path.join(cls._schema_directory, '{}.json'.format(cls._version))
        with open(schema_file, 'w') as sfh:
            json.dump(cls._schema, sfh)

    # Static methods for reading information from the files
    # Returns the stored version numbers greater than `start` and, when `end` is
    # given, no greater than `end`, sorted ascending; None when there are none.
    @staticmethod
    def find_all(start=0, end=None):
        versions = list()
        for filename in os.listdir(ZKSchema._schema_directory):
            try:
                sequence_id = int(filename.split('.')[0])
            except ValueError:
                # Not a "<version>.json" dump; ignore stray files
                continue
            if sequence_id > start and (end is None or sequence_id <= end):
                versions.append(sequence_id)

        # os.listdir() order is arbitrary, but migrations must run in sequence
        versions.sort()
        if len(versions) > 0:
            return versions
        else:
            return None