Compare commits

..

52 Commits

Author SHA1 Message Date
f1df1cfe93 Bump version to 0.9.55 2022-10-04 13:21:40 -04:00
5942aa50fc Avoid raise/handle deadlocks
Can cause log flooding in some edge cases and isn't really needed any
longer. Use a proper conditional followed by an actual error handler.
2022-10-03 14:04:12 -04:00
096bcdfd75 Try a literal eval first
This is a breakage between the older version of Celery (Deb10) and
newer. The hard removal broke Deb10 instances.

So try that first, and on failure, assume newer Celery format.
2022-09-06 10:34:50 -04:00
239c392892 Bump version to 0.9.54 2022-08-23 11:01:05 -04:00
172d0a86e4 Use proper SSLContext and enable TLSv1
It's bad, but sometimes you need to access the API from a very old
software version. So just enable it for now and clean it up later.
2022-08-23 10:58:47 -04:00
d8e57a26c5 Fix bad variable name 2022-08-18 11:37:57 -04:00
9b499b9f48 Bump version to 0.9.53 2022-08-12 17:47:11 -04:00
881550b610 Actually fix VM sorting
Due to the executor the previous attempt did not work.
2022-08-12 17:46:29 -04:00
2a21d48128 Bump version to 0.9.52 2022-08-12 11:09:25 -04:00
8d0f26ff7a Add additional kb_ values to OSD stats
Allows for easier parsing later to get e.g. % values and more details on
the used amounts.
2022-08-11 11:06:36 -04:00
bcabd7d079 Always sort VM list
Same justification as previous commit.
2022-08-09 12:05:40 -04:00
05a316cdd6 Ensure the node list is sorted
Otherwise the node entries could come back in an arbitrary order; since
this is an ordered list of dictionaries that might not be expected by
the API consumers, so ensure it's always sorted.
2022-08-09 12:03:49 -04:00
4b36753f27 Add reference to bootstrap in index 2022-08-03 20:22:16 -04:00
171f6ac9ed Add missing cluster_req for vm modify 2022-08-02 10:02:26 -04:00
645b525ad7 Bump version to 0.9.51 2022-07-25 23:25:41 -04:00
ec559aec0d Remove pvc-flush service
This service caused more headaches than it was worth, so remove it.

The original goal was to cleanly flush nodes on shutdown and unflush
them on startup, but this is tightly controlled by Ansible playbooks at
this point, and this is something best left to the Administrator and
their particular situation anyways.
2022-07-25 23:21:34 -04:00
71ffd5a191 Add confirmation to disable command 2022-07-21 16:43:37 -04:00
2739c27299 Remove faulty literal_eval 2022-07-18 13:35:15 -04:00
56129a3636 Fix bad changelog entries 2022-07-06 16:57:55 -04:00
932b3c55a3 Bump version to 0.9.50 2022-07-06 16:01:14 -04:00
92e2ff7449 Fix bug with space-containing detect strings 2022-07-06 15:58:57 -04:00
d8d3feee22 Add selector help and adjust flag name
1. Add documentation on the node selector flags. In the API, reference
the daemon configuration manual which now includes details in this
section; in the CLI, provide the help in "pvc vm define" in detail and
then reference that command's help in the other commands that use this
field.

2. Ensure the naming is consistent in the CLI, using the flag name
"--node-selector" everywhere (was "--selector" for "pvc vm" commands and
"--node-selector" for "pvc provisioner" commands).
2022-06-10 02:42:06 -04:00
b1357cafdb Add memfree to selector and use proper defaults 2022-06-10 02:03:12 -04:00
f8cdcb30ba Add migration selector via free memory
Closes #152
2022-05-18 03:47:16 -04:00
51ad2058ed Bump version to 0.9.49 2022-05-06 15:49:39 -04:00
c401a1f655 Use consistent language for primary mode
I didn't call it "router" anywhere else, but the state in the list is
called "coordinator" so, call it "coordinator mode".
2022-05-06 15:40:52 -04:00
7a40c7a55b Add support for replacing/refreshing OSDs
Adds commands to both replace an OSD disk, and refresh (reimport) an
existing OSD disk on a new node. This handles the cases where an OSD
disk should be replaced (either due to upgrades or failures) or where a
node is rebuilt in-place and an existing OSD must be re-imported to it.

This should avoid the need to do a full remove/add sequence for either
case.

Also cleans up some aspects of OSD removal that are identical between
methods (e.g. using safe-to-destroy and sleeping after stopping) and
fixes a bug if an OSD does not truly exist when the daemon starts up.
2022-05-06 15:32:06 -04:00
8027a6efdc Improve handling of rounded values 2022-05-02 15:29:30 -04:00
3801fcc07b Fix bug with initial JSON for stats 2022-05-02 13:28:19 -04:00
c741900baf Refactor OSD removal to use new ZK data
With the OSD LVM information stored in Zookeeper, we can use this to
determine the actual block device to zap rather than relying on runtime
determination and guestimation.
2022-05-02 12:52:22 -04:00
464f0e0356 Store additional OSD information in ZK
Ensures that information like the FSIDs and the OSD LVM volume are
stored in Zookeeper at creation time and updated at daemon start time
(to ensure the data is populated at least once, or if the /dev/sdX
path changes).

This will allow safer operation of OSD removals and the potential
implementation of re-activation after node replacements.
2022-05-02 12:11:39 -04:00
cea8832f90 Ensure initial OSD stats is populated
Values are all invalid but this ensures the client won't error out when
trying to show an OSD that has never checked in yet.
2022-04-29 16:50:30 -04:00
5807351405 Bump version to 0.9.48 2022-04-29 15:03:52 -04:00
d6ca74376a Fix bugs with forced removal 2022-04-29 14:03:07 -04:00
413100a147 Ensure unresponsive OSDs still display in list
It is still useful to see such dead OSDs even if they've never checked
in or have not checked in for quite some time.
2022-04-29 12:11:52 -04:00
4d698be34b Add OSD removal force option
Ensures a removal can continue even in situations where some step(s)
might fail, for instance removing an obsolete OSD from a replaced node.
2022-04-29 11:16:33 -04:00
53aed0a735 Use a singular configured cluster by default
If there is...
  1. No '--cluster' passed, and
  2. No 'local' cluster, and
  3. There is exactly one cluster configured
...then use that cluster by default in the CLI.
2022-01-13 18:36:20 -05:00
ea709f573f Bump version to 0.9.47 2021-12-28 22:03:08 -05:00
1142454934 Add pool PGs count modification
Allows an administrator to adjust the PG count of a given pool. This can
be used to increase the PGs (for example after adding more OSDs) or
decrease it (to remove OSDs, reduce CPU load, etc.).
2021-12-28 21:53:29 -05:00
bbfad340a1 Add PGs count to pool list 2021-12-28 21:12:02 -05:00
c73939e1c5 Fix issue if pool stats have not updated yet 2021-12-28 21:03:10 -05:00
25fe45dd28 Add device class tiers to Ceph pools
Allows specifying a particular device class ("tier") for a given pool,
for instance SSD-only or NVMe-only. This is implemented with Crush
rules on the Ceph side, and via an additional new key in the pool
Zookeeper schema which is defaulted to "default".
2021-12-28 20:58:15 -05:00
58d57d7037 Bump version to 0.9.46 2021-12-28 15:02:14 -05:00
00d2c67c41 Allow single-node clusters to restart and timeout
Prevents a daemon from waiting forever to terminate if it is primary,
and avoids this entirely if there is only a single node in the cluster.
2021-12-28 03:06:03 -05:00
67131de4f6 Fix bug when removing OSDs
Ensure the OSD is down as well as out or purge might fail.
2021-12-28 03:05:34 -05:00
abc23ebb18 Handle detect strings as arguments for blockdevs
Allows specifying blockdevs in the OSD and OSD-DB addition commands as
detect strings rather than actual block device paths. This provides
greater flexibility for automation with pvcbootstrapd (which originates
the concept of detect strings) and in general usage as well.
2021-12-28 02:53:02 -05:00
9f122e916f Allow bypassing confirm message for benchmarks 2021-12-23 21:00:42 -05:00
3ce4d90693 Add auditing to local syslog from PVC client
This ensures that any client command is logged by the local system.
Helps ensure Accounting for users of the CLI. Currently logs the full
command executed along with the $USER environment variable contents.
2021-12-10 16:17:33 -05:00
6ccd19e636 Standardize fuzzy matching and use fullmatch
Solves two problems:

1. How match fuzziness was used was very inconsistent; make them all the
same, i.e. "if is_fuzzy and limit, apply .* to both sides".

2. Use re.fullmatch instead of re.match to ensure exact matching of the
regex to the value. Without fuzziness, this would sometimes cause
inconsistent behavior, for instance if a limit was non-fuzzy "vm",
expecting to match the actual "vm", but also matching "vm1" too.
2021-12-06 16:35:29 -05:00
d8689e6eaa Remove "and started" from message text
This is not necessarily the case.
2021-11-29 16:42:26 -05:00
bc49b5eca2 Fix bug with cloned image sizes 2021-11-29 14:56:50 -05:00
8470dfaa29 Fix bugs with legacy benchmark format 2021-11-26 11:42:35 -05:00
30 changed files with 2045 additions and 320 deletions

View File

@ -1 +1 @@
0.9.45
0.9.55

View File

@ -1,5 +1,70 @@
## PVC Changelog
###### [v0.9.55](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.55)
* Fixes a problem with the literal eval handler in the provisioner (again)
* Fixes a potential log deadlock in Zookeeper-lost situations when doing keepalives
###### [v0.9.54](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.54)
[CLI Client] Fixes a bad variable reference from the previous change
[API Daemon] Enables TLSv1 with an SSLContext object for maximum compatibility
###### [v0.9.53](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.53)
* [API] Fixes sort order of VM list (for real this time)
###### [v0.9.52](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.52)
* [CLI] Fixes a bug with vm modify not requiring a cluster
* [Docs] Adds a reference to the bootstrap daemon
* [API] Adds sorting to node and VM lists for consistency
* [Node Daemon/API] Adds kb_ stats values for OSD stats
###### [v0.9.51](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.51)
* [CLI Client] Fixes a faulty literal_eval when viewing task status
* [CLI Client] Adds a confirmation flag to the vm disable command
* [Node Daemon] Removes the pvc-flush service
###### [v0.9.50](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.50)
* [Node Daemon/API/CLI] Adds free memory node selector
* [Node Daemon] Fixes bug sending space-containing detect disk strings
###### [v0.9.49](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.49)
* [Node Daemon] Fixes bugs with OSD stat population on creation
* [Node Daemon/API] Adds additional information to Zookeeper about OSDs
* [Node Daemon] Refactors OSD removal for improved safety
* [Node Daemon/API/CLI] Adds explicit support for replacing and refreshing (reimporting) OSDs
* [API/CLI] Fixes a language inconsistency about "router mode"
###### [v0.9.48](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.48)
* [CLI] Fixes situation where only a single cluster is available
* [CLI/API/Daemon] Allows forcing of OSD removal ignoring errors
* [CLI] Fixes bug where down OSDs are not displayed
###### [v0.9.47](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.47)
* [Node Daemon/API/CLI] Adds Ceph pool device class/tier support
* [API] Fixes a bug returning values if a Ceph pool has not yet reported stats
* [API/CLI] Adds PGs count to the pool list output
* [API/CLI] Adds Ceph pool PGs count adjustment support
###### [v0.9.46](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.46)
* [API] Fixes bugs with legacy benchmark display
* [API] Fixes a bug around cloned image sizes
* [API] Removes extraneous message text in provisioner create command
* [API] Corrects bugs around fuzzy matching
* [CLI] Adds auditing for PVC CLI to local syslog
* [CLI] Adds --yes bypass for benchmark command
* [Node Daemon/API/CLI] Adds support for "detect" strings when specifying OSD or OSDDB devices
* [Node Daemon] Fixes a bug when removing OSDs
* [Node Daemon] Fixes a single-node cluster shutdown deadlock
###### [v0.9.45](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.45)
* [Node Daemon] Fixes an ordering issue with pvcnoded.service

View File

@ -19,7 +19,7 @@ As a consequence of its features, PVC makes administrating very high-uptime VMs
PVC also features an optional, fully customizable VM provisioning framework, designed to automate and simplify VM deployments using custom provisioning profiles, scripts, and CloudInit userdata API support.
Installation of PVC is accomplished by two main components: a [Node installer ISO](https://github.com/parallelvirtualcluster/pvc-installer) which creates on-demand installer ISOs, and an [Ansible role framework](https://github.com/parallelvirtualcluster/pvc-ansible) to configure, bootstrap, and administrate the nodes. Once up, the cluster is managed via an HTTP REST API, accessible via a Python Click CLI client or WebUI.
Installation of PVC is accomplished by two main components: a [Node installer ISO](https://github.com/parallelvirtualcluster/pvc-installer) which creates on-demand installer ISOs, and an [Ansible role framework](https://github.com/parallelvirtualcluster/pvc-ansible) to configure, bootstrap, and administrate the nodes. Installation can also be fully automated with a companion [cluster bootstrapping system](https://github.com/parallelvirtualcluster/pvc-bootstrap). Once up, the cluster is managed via an HTTP REST API, accessible via a Python Click CLI client or WebUI.
Just give it physical servers, and it will run your VMs without you having to think about it, all in just an hour or two of setup time.

View File

@ -22,10 +22,12 @@
import os
import yaml
from ssl import SSLContext, TLSVersion
from distutils.util import strtobool as dustrtobool
# Daemon version
version = "0.9.45"
version = "0.9.55"
# API version
API_VERSION = 1.0
@ -123,7 +125,10 @@ def entrypoint():
import pvcapid.flaskapi as pvc_api # noqa: E402
if config["ssl_enabled"]:
context = (config["ssl_cert_file"], config["ssl_key_file"])
context = SSLContext()
context.minimum_version = TLSVersion.TLSv1
context.get_ca_certs()
context.load_cert_chain(config["ssl_cert_file"], keyfile=config["ssl_key_file"])
else:
context = None

View File

@ -1002,7 +1002,7 @@ class API_VM_Root(Resource):
type: string
node_selector:
type: string
description: The selector used to determine candidate nodes during migration
description: The selector used to determine candidate nodes during migration; see 'target_selector' in the node daemon configuration reference
node_autostart:
type: boolean
description: Whether to autostart the VM when its node returns to ready domain state
@ -1252,7 +1252,7 @@ class API_VM_Root(Resource):
{"name": "node"},
{
"name": "selector",
"choices": ("mem", "vcpus", "load", "vms", "none"),
"choices": ("mem", "memfree", "vcpus", "load", "vms", "none"),
"helptext": "A valid selector must be specified",
},
{"name": "autostart"},
@ -1297,13 +1297,15 @@ class API_VM_Root(Resource):
name: selector
type: string
required: false
description: The selector used to determine candidate nodes during migration
default: mem
description: The selector used to determine candidate nodes during migration; see 'target_selector' in the node daemon configuration reference
default: none
enum:
- mem
- memfree
- vcpus
- load
- vms
- none (cluster default)
- in: query
name: autostart
type: boolean
@ -1397,7 +1399,7 @@ class API_VM_Element(Resource):
{"name": "node"},
{
"name": "selector",
"choices": ("mem", "vcpus", "load", "vms", "none"),
"choices": ("mem", "memfree", "vcpus", "load", "vms", "none"),
"helptext": "A valid selector must be specified",
},
{"name": "autostart"},
@ -1444,10 +1446,11 @@ class API_VM_Element(Resource):
name: selector
type: string
required: false
description: The selector used to determine candidate nodes during migration
default: mem
description: The selector used to determine candidate nodes during migration; see 'target_selector' in the node daemon configuration reference
default: none
enum:
- mem
- memfree
- vcpus
- load
- vms
@ -1626,7 +1629,7 @@ class API_VM_Metadata(Resource):
type: string
node_selector:
type: string
description: The selector used to determine candidate nodes during migration
description: The selector used to determine candidate nodes during migration; see 'target_selector' in the node daemon configuration reference
node_autostart:
type: string
description: Whether to autostart the VM when its node returns to ready domain state
@ -1646,7 +1649,7 @@ class API_VM_Metadata(Resource):
{"name": "limit"},
{
"name": "selector",
"choices": ("mem", "vcpus", "load", "vms", "none"),
"choices": ("mem", "memfree", "vcpus", "load", "vms", "none"),
"helptext": "A valid selector must be specified",
},
{"name": "autostart"},
@ -1675,12 +1678,14 @@ class API_VM_Metadata(Resource):
name: selector
type: string
required: false
description: The selector used to determine candidate nodes during migration
description: The selector used to determine candidate nodes during migration; see 'target_selector' in the node daemon configuration reference
enum:
- mem
- memfree
- vcpus
- load
- vms
- none (cluster default)
- in: query
name: autostart
type: boolean
@ -3849,7 +3854,7 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
{
"name": "device",
"required": True,
"helptext": "A valid device must be specified.",
"helptext": "A valid device or detect string must be specified.",
},
]
)
@ -3871,7 +3876,7 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
name: device
type: string
required: true
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) to create the OSD DB volume group on
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to create the OSD DB volume group on
responses:
200:
description: OK
@ -4003,7 +4008,7 @@ class API_Storage_Ceph_OSD_Root(Resource):
{
"name": "device",
"required": True,
"helptext": "A valid device must be specified.",
"helptext": "A valid device or detect string must be specified.",
},
{
"name": "weight",
@ -4040,7 +4045,7 @@ class API_Storage_Ceph_OSD_Root(Resource):
name: device
type: string
required: true
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) to create the OSD on
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to create the OSD on
- in: query
name: weight
type: number
@ -4099,11 +4104,112 @@ class API_Storage_Ceph_OSD_Element(Resource):
@RequestParser(
[
{
"name": "device",
"required": True,
"helptext": "A valid device or detect string must be specified.",
},
{
"name": "weight",
"required": True,
"helptext": "An OSD weight must be specified.",
},
{
"name": "yes-i-really-mean-it",
"required": True,
"helptext": "Please confirm that 'yes-i-really-mean-it'.",
}
},
]
)
@Authenticator
def post(self, osdid, reqargs):
"""
Replace a Ceph OSD in the cluster
Note: This task may take up to 30s to complete and return
---
tags:
- storage / ceph
parameters:
- in: query
name: device
type: string
required: true
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to replace the OSD onto
- in: query
name: weight
type: number
required: true
description: The Ceph CRUSH weight for the replaced OSD
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Bad request
schema:
type: object
id: Message
"""
return api_helper.ceph_osd_replace(
osdid,
reqargs.get("device", None),
reqargs.get("weight", None),
)
@RequestParser(
[
{
"name": "device",
"required": True,
"helptext": "A valid device or detect string must be specified.",
},
]
)
@Authenticator
def put(self, osdid, reqargs):
"""
Refresh (reimport) a Ceph OSD in the cluster
Note: This task may take up to 30s to complete and return
---
tags:
- storage / ceph
parameters:
- in: query
name: device
type: string
required: true
description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") that the OSD should be using
responses:
200:
description: OK
schema:
type: object
id: Message
400:
description: Bad request
schema:
type: object
id: Message
"""
return api_helper.ceph_osd_refresh(
osdid,
reqargs.get("device", None),
)
@RequestParser(
[
{
"name": "force",
"required": False,
"helptext": "Force removal even if steps fail.",
},
{
"name": "yes-i-really-mean-it",
"required": True,
"helptext": "Please confirm that 'yes-i-really-mean-it'.",
},
]
)
@Authenticator
@ -4116,6 +4222,11 @@ class API_Storage_Ceph_OSD_Element(Resource):
tags:
- storage / ceph
parameters:
- in: query
name: force
type: boolean
required: flase
description: Force removal even if some step(s) fail
- in: query
name: yes-i-really-mean-it
type: string
@ -4138,7 +4249,7 @@ class API_Storage_Ceph_OSD_Element(Resource):
type: object
id: Message
"""
return api_helper.ceph_osd_remove(osdid)
return api_helper.ceph_osd_remove(osdid, reqargs.get("force", False))
api.add_resource(API_Storage_Ceph_OSD_Element, "/storage/ceph/osd/<osdid>")
@ -4226,6 +4337,12 @@ class API_Storage_Ceph_Pool_Root(Resource):
volume_count:
type: integer
description: The number of volumes in the pool
tier:
type: string
description: The device class/tier of the pool
pgs:
type: integer
description: The number of PGs (placement groups) for the pool
stats:
type: object
properties:
@ -4307,6 +4424,12 @@ class API_Storage_Ceph_Pool_Root(Resource):
"required": True,
"helptext": "A valid replication configuration must be specified.",
},
{
"name": "tier",
"required": False,
"choices": ("hdd", "ssd", "nvme", "default"),
"helptext": "A valid tier must be specified",
},
]
)
@Authenticator
@ -4332,6 +4455,10 @@ class API_Storage_Ceph_Pool_Root(Resource):
type: string
required: true
description: The replication configuration (e.g. "copies=3,mincopies=2") for the pool
- in: query
name: tier
required: false
description: The device tier for the pool (hdd, ssd, nvme, or default)
responses:
200:
description: OK
@ -4348,6 +4475,7 @@ class API_Storage_Ceph_Pool_Root(Resource):
reqargs.get("pool", None),
reqargs.get("pgs", None),
reqargs.get("replcfg", None),
reqargs.get("tier", None),
)
@ -4388,6 +4516,12 @@ class API_Storage_Ceph_Pool_Element(Resource):
"required": True,
"helptext": "A valid replication configuration must be specified.",
},
{
"name": "tier",
"required": False,
"choices": ("hdd", "ssd", "nvme", "default"),
"helptext": "A valid tier must be specified",
},
]
)
@Authenticator
@ -4408,6 +4542,10 @@ class API_Storage_Ceph_Pool_Element(Resource):
type: string
required: true
description: The replication configuration (e.g. "copies=3,mincopies=2") for the pool
- in: query
name: tier
required: false
description: The device tier for the pool (hdd, ssd, nvme, or default)
responses:
200:
description: OK
@ -4426,7 +4564,54 @@ class API_Storage_Ceph_Pool_Element(Resource):
id: Message
"""
return api_helper.ceph_pool_add(
pool, reqargs.get("pgs", None), reqargs.get("replcfg", None)
pool,
reqargs.get("pgs", None),
reqargs.get("replcfg", None),
reqargs.get("tier", None),
)
@RequestParser(
[
{
"name": "pgs",
"required": True,
"helptext": "A placement group count must be specified.",
},
]
)
@Authenticator
def put(self, pool, reqargs):
"""
Adjust Ceph pool {pool}'s placement group count
---
tags:
- storage / ceph
parameters:
- in: query
name: pgs
type: integer
required: true
description: The new number of placement groups (PGs) for the pool
responses:
200:
description: OK
schema:
type: object
id: Message
404:
description: Not found
schema:
type: object
id: Message
400:
description: Bad request
schema:
type: object
id: Message
"""
return api_helper.ceph_pool_set_pgs(
pool,
reqargs.get("pgs", 0),
)
@RequestParser(

View File

@ -224,7 +224,7 @@ def node_domain_state(zkhandler, node):
@ZKConnection(config)
def node_secondary(zkhandler, node):
"""
Take NODE out of primary router mode.
Take NODE out of primary coordinator mode.
"""
retflag, retdata = pvc_node.secondary_node(zkhandler, node)
@ -240,7 +240,7 @@ def node_secondary(zkhandler, node):
@ZKConnection(config)
def node_primary(zkhandler, node):
"""
Set NODE to primary router mode.
Set NODE to primary coordinator mode.
"""
retflag, retdata = pvc_node.primary_node(zkhandler, node)
@ -1302,11 +1302,43 @@ def ceph_osd_add(zkhandler, node, device, weight, ext_db_flag=False, ext_db_rati
@ZKConnection(config)
def ceph_osd_remove(zkhandler, osd_id):
def ceph_osd_replace(zkhandler, osd_id, device, weight):
"""
Replace a Ceph OSD in the PVC Ceph storage cluster.
"""
retflag, retdata = pvc_ceph.replace_osd(zkhandler, osd_id, device, weight)
if retflag:
retcode = 200
else:
retcode = 400
output = {"message": retdata.replace('"', "'")}
return output, retcode
@ZKConnection(config)
def ceph_osd_refresh(zkhandler, osd_id, device):
"""
Refresh (reimport) a Ceph OSD in the PVC Ceph storage cluster.
"""
retflag, retdata = pvc_ceph.refresh_osd(zkhandler, osd_id, device)
if retflag:
retcode = 200
else:
retcode = 400
output = {"message": retdata.replace('"', "'")}
return output, retcode
@ZKConnection(config)
def ceph_osd_remove(zkhandler, osd_id, force_flag):
"""
Remove a Ceph OSD from the PVC Ceph storage cluster.
"""
retflag, retdata = pvc_ceph.remove_osd(zkhandler, osd_id)
retflag, retdata = pvc_ceph.remove_osd(zkhandler, osd_id, force_flag)
if retflag:
retcode = 200
@ -1403,11 +1435,11 @@ def ceph_pool_list(zkhandler, limit=None, is_fuzzy=True):
@ZKConnection(config)
def ceph_pool_add(zkhandler, name, pgs, replcfg):
def ceph_pool_add(zkhandler, name, pgs, replcfg, tier=None):
"""
Add a Ceph RBD pool to the PVC Ceph storage cluster.
"""
retflag, retdata = pvc_ceph.add_pool(zkhandler, name, pgs, replcfg)
retflag, retdata = pvc_ceph.add_pool(zkhandler, name, pgs, replcfg, tier)
if retflag:
retcode = 200
@ -1434,6 +1466,22 @@ def ceph_pool_remove(zkhandler, name):
return output, retcode
@ZKConnection(config)
def ceph_pool_set_pgs(zkhandler, name, pgs):
"""
Set the PGs of a ceph RBD pool.
"""
retflag, retdata = pvc_ceph.set_pgs_pool(zkhandler, name, pgs)
if retflag:
retcode = 200
else:
retcode = 400
output = {"message": retdata.replace('"', "'")}
return output, retcode
@pvc_common.Profiler(config)
@ZKConnection(config)
def ceph_volume_list(zkhandler, pool=None, limit=None, is_fuzzy=True):

View File

@ -1442,11 +1442,17 @@ def create_vm(
)
if not volume["pool"] in pools:
pools[volume["pool"]] = int(
volume_data["stats"]["size"].replace("G", "")
pvc_ceph.format_bytes_fromhuman(volume_data["stats"]["size"])
/ 1024
/ 1024
/ 1024
)
else:
pools[volume["pool"]] += int(
volume_data["stats"]["size"].replace("G", "")
pvc_ceph.format_bytes_fromhuman(volume_data["stats"]["size"])
/ 1024
/ 1024
/ 1024
)
else:
if not volume["pool"] in pools:
@ -2080,7 +2086,7 @@ def create_vm(
del zkhandler
return {
"status": 'VM "{}" with profile "{}" has been provisioned and started successfully'.format(
"status": 'VM "{}" with profile "{}" has been provisioned successfully'.format(
vm_name, vm_profile
),
"current": 10,

View File

@ -21,7 +21,7 @@
import math
from json import dumps
from json import dumps, loads
from requests_toolbelt.multipart.encoder import (
MultipartEncoder,
MultipartEncoderMonitor,
@ -255,7 +255,47 @@ def ceph_osd_add(config, node, device, weight, ext_db_flag, ext_db_ratio):
return retstatus, response.json().get("message", "")
def ceph_osd_remove(config, osdid):
def ceph_osd_replace(config, osdid, device, weight):
"""
Replace an existing Ceph OSD with a new device
API endpoint: POST /api/v1/storage/ceph/osd/{osdid}
API arguments: device={device}, weight={weight}
API schema: {"message":"{data}"}
"""
params = {"device": device, "weight": weight, "yes-i-really-mean-it": "yes"}
response = call_api(config, "post", f"/storage/ceph/osd/{osdid}", params=params)
if response.status_code == 200:
retstatus = True
else:
retstatus = False
return retstatus, response.json().get("message", "")
def ceph_osd_refresh(config, osdid, device):
"""
Refresh (reimport) an existing Ceph OSD with device {device}
API endpoint: PUT /api/v1/storage/ceph/osd/{osdid}
API arguments: device={device}
API schema: {"message":"{data}"}
"""
params = {
"device": device,
}
response = call_api(config, "put", f"/storage/ceph/osd/{osdid}", params=params)
if response.status_code == 200:
retstatus = True
else:
retstatus = False
return retstatus, response.json().get("message", "")
def ceph_osd_remove(config, osdid, force_flag):
"""
Remove Ceph OSD
@ -263,7 +303,7 @@ def ceph_osd_remove(config, osdid):
API arguments:
API schema: {"message":"{data}"}
"""
params = {"yes-i-really-mean-it": "yes"}
params = {"force": force_flag, "yes-i-really-mean-it": "yes"}
response = call_api(
config, "delete", "/storage/ceph/osd/{osdid}".format(osdid=osdid), params=params
)
@ -367,10 +407,29 @@ def format_list_osd(osd_list):
for osd_information in osd_list:
try:
# If this happens, the node hasn't checked in fully yet, so just ignore it
# If this happens, the node hasn't checked in fully yet, so use some dummy data
if osd_information["stats"]["node"] == "|":
continue
for key in osd_information["stats"].keys():
if (
osd_information["stats"][key] == "|"
or osd_information["stats"][key] is None
):
osd_information["stats"][key] = "N/A"
for key in osd_information.keys():
if osd_information[key] is None:
osd_information[key] = "N/A"
else:
for key in osd_information["stats"].keys():
if key in ["utilization", "var"] and isinstance(
osd_information["stats"][key], float
):
osd_information["stats"][key] = round(
osd_information["stats"][key], 2
)
except KeyError:
print(
f"Details for OSD {osd_information['id']} missing required keys, skipping."
)
continue
# Deal with the size to human readable
@ -396,7 +455,7 @@ def format_list_osd(osd_list):
osd_id_length = _osd_id_length
# Set the OSD node length
_osd_node_length = len(osd_information["stats"]["node"]) + 1
_osd_node_length = len(osd_information["node"]) + 1
if _osd_node_length > osd_node_length:
osd_node_length = _osd_node_length
@ -439,13 +498,11 @@ def format_list_osd(osd_list):
if _osd_free_length > osd_free_length:
osd_free_length = _osd_free_length
osd_util = round(osd_information["stats"]["utilization"], 2)
_osd_util_length = len(str(osd_util)) + 1
_osd_util_length = len(str(osd_information["stats"]["utilization"])) + 1
if _osd_util_length > osd_util_length:
osd_util_length = _osd_util_length
osd_var = round(osd_information["stats"]["var"], 2)
_osd_var_length = len(str(osd_var)) + 1
_osd_var_length = len(str(osd_information["stats"]["var"])) + 1
if _osd_var_length > osd_var_length:
osd_var_length = _osd_var_length
@ -592,18 +649,9 @@ def format_list_osd(osd_list):
)
for osd_information in sorted(osd_list, key=lambda x: int(x["id"])):
try:
# If this happens, the node hasn't checked in fully yet, so just ignore it
if osd_information["stats"]["node"] == "|":
continue
except KeyError:
continue
osd_up_flag, osd_up_colour, osd_in_flag, osd_in_colour = getOutputColoursOSD(
osd_information
)
osd_util = round(osd_information["stats"]["utilization"], 2)
osd_var = round(osd_information["stats"]["var"], 2)
osd_db_device = osd_information["db_device"]
if not osd_db_device:
@ -653,7 +701,7 @@ def format_list_osd(osd_list):
osd_rdops_length=osd_rdops_length,
osd_rddata_length=osd_rddata_length,
osd_id=osd_information["id"],
osd_node=osd_information["stats"]["node"],
osd_node=osd_information["node"],
osd_device=osd_information["device"],
osd_db_device=osd_db_device,
osd_up_colour=osd_up_colour,
@ -666,8 +714,8 @@ def format_list_osd(osd_list):
osd_reweight=osd_information["stats"]["reweight"],
osd_used=osd_information["stats"]["used"],
osd_free=osd_information["stats"]["avail"],
osd_util=osd_util,
osd_var=osd_var,
osd_util=osd_information["stats"]["utilization"],
osd_var=osd_information["stats"]["var"],
osd_wrops=osd_information["stats"]["wr_ops"],
osd_wrdata=osd_information["stats"]["wr_data"],
osd_rdops=osd_information["stats"]["rd_ops"],
@ -708,7 +756,7 @@ def ceph_pool_info(config, pool):
def ceph_pool_list(config, limit):
"""
Get list information about Ceph OSDs (limited by {limit})
Get list information about Ceph pools (limited by {limit})
API endpoint: GET /api/v1/storage/ceph/pool
API arguments: limit={limit}
@ -726,15 +774,15 @@ def ceph_pool_list(config, limit):
return False, response.json().get("message", "")
def ceph_pool_add(config, pool, pgs, replcfg):
def ceph_pool_add(config, pool, pgs, replcfg, tier):
"""
Add new Ceph OSD
Add new Ceph pool
API endpoint: POST /api/v1/storage/ceph/pool
API arguments: pool={pool}, pgs={pgs}, replcfg={replcfg}
API arguments: pool={pool}, pgs={pgs}, replcfg={replcfg}, tier={tier}
API schema: {"message":"{data}"}
"""
params = {"pool": pool, "pgs": pgs, "replcfg": replcfg}
params = {"pool": pool, "pgs": pgs, "replcfg": replcfg, "tier": tier}
response = call_api(config, "post", "/storage/ceph/pool", params=params)
if response.status_code == 200:
@ -747,7 +795,7 @@ def ceph_pool_add(config, pool, pgs, replcfg):
def ceph_pool_remove(config, pool):
"""
Remove Ceph OSD
Remove Ceph pool
API endpoint: DELETE /api/v1/storage/ceph/pool/{pool}
API arguments:
@ -766,6 +814,27 @@ def ceph_pool_remove(config, pool):
return retstatus, response.json().get("message", "")
def ceph_pool_set_pgs(config, pool, pgs):
"""
Set the PGs of a Ceph pool
API endpoint: PUT /api/v1/storage/ceph/pool/{pool}
API arguments: {"pgs": "{pgs}"}
API schema: {"message":"{data}"}
"""
params = {"pgs": pgs}
response = call_api(
config, "put", "/storage/ceph/pool/{pool}".format(pool=pool), params=params
)
if response.status_code == 200:
retstatus = True
else:
retstatus = False
return retstatus, response.json().get("message", "")
def format_list_pool(pool_list):
# Handle empty list
if not pool_list:
@ -775,6 +844,8 @@ def format_list_pool(pool_list):
pool_name_length = 5
pool_id_length = 3
pool_tier_length = 5
pool_pgs_length = 4
pool_used_length = 5
pool_usedpct_length = 6
pool_free_length = 5
@ -812,6 +883,16 @@ def format_list_pool(pool_list):
if _pool_id_length > pool_id_length:
pool_id_length = _pool_id_length
# Set the tier and length
_pool_tier_length = len(str(pool_information["tier"])) + 1
if _pool_tier_length > pool_tier_length:
pool_tier_length = _pool_tier_length
# Set the pgs and length
_pool_pgs_length = len(str(pool_information["pgs"])) + 1
if _pool_pgs_length > pool_pgs_length:
pool_pgs_length = _pool_pgs_length
# Set the used and length
_pool_used_length = len(str(pool_information["stats"]["used_bytes"])) + 1
if _pool_used_length > pool_used_length:
@ -879,10 +960,12 @@ def format_list_pool(pool_list):
end_bold=ansiprint.end(),
pool_header_length=pool_id_length
+ pool_name_length
+ pool_tier_length
+ pool_pgs_length
+ pool_used_length
+ pool_usedpct_length
+ pool_free_length
+ 4,
+ 6,
objects_header_length=pool_num_objects_length
+ pool_num_clones_length
+ pool_num_copies_length
@ -898,10 +981,12 @@ def format_list_pool(pool_list):
6,
pool_id_length
+ pool_name_length
+ pool_tier_length
+ pool_pgs_length
+ pool_used_length
+ pool_usedpct_length
+ pool_free_length
+ 3,
+ 5,
)
]
),
@ -934,6 +1019,8 @@ def format_list_pool(pool_list):
"{bold}\
{pool_id: <{pool_id_length}} \
{pool_name: <{pool_name_length}} \
{pool_tier: <{pool_tier_length}} \
{pool_pgs: <{pool_pgs_length}} \
{pool_used: <{pool_used_length}} \
{pool_usedpct: <{pool_usedpct_length}} \
{pool_free: <{pool_free_length}} \
@ -950,6 +1037,8 @@ def format_list_pool(pool_list):
end_bold=ansiprint.end(),
pool_id_length=pool_id_length,
pool_name_length=pool_name_length,
pool_tier_length=pool_tier_length,
pool_pgs_length=pool_pgs_length,
pool_used_length=pool_used_length,
pool_usedpct_length=pool_usedpct_length,
pool_free_length=pool_free_length,
@ -963,6 +1052,8 @@ def format_list_pool(pool_list):
pool_read_data_length=pool_read_data_length,
pool_id="ID",
pool_name="Name",
pool_tier="Tier",
pool_pgs="PGs",
pool_used="Used",
pool_usedpct="Used%",
pool_free="Free",
@ -983,6 +1074,8 @@ def format_list_pool(pool_list):
"{bold}\
{pool_id: <{pool_id_length}} \
{pool_name: <{pool_name_length}} \
{pool_tier: <{pool_tier_length}} \
{pool_pgs: <{pool_pgs_length}} \
{pool_used: <{pool_used_length}} \
{pool_usedpct: <{pool_usedpct_length}} \
{pool_free: <{pool_free_length}} \
@ -999,6 +1092,8 @@ def format_list_pool(pool_list):
end_bold="",
pool_id_length=pool_id_length,
pool_name_length=pool_name_length,
pool_tier_length=pool_tier_length,
pool_pgs_length=pool_pgs_length,
pool_used_length=pool_used_length,
pool_usedpct_length=pool_usedpct_length,
pool_free_length=pool_free_length,
@ -1012,6 +1107,8 @@ def format_list_pool(pool_list):
pool_read_data_length=pool_read_data_length,
pool_id=pool_information["stats"]["id"],
pool_name=pool_information["name"],
pool_tier=pool_information["tier"],
pool_pgs=pool_information["pgs"],
pool_used=pool_information["stats"]["used_bytes"],
pool_usedpct=pool_information["stats"]["used_percent"],
pool_free=pool_information["stats"]["free_bytes"],
@ -1648,6 +1745,8 @@ def ceph_benchmark_list(config, job):
def get_benchmark_list_results_legacy(benchmark_data):
if isinstance(benchmark_data, str):
benchmark_data = loads(benchmark_data)
benchmark_bandwidth = dict()
benchmark_iops = dict()
for test in ["seq_read", "seq_write", "rand_read_4K", "rand_write_4K"]:
@ -1732,7 +1831,7 @@ def format_list_benchmark(config, benchmark_information):
for benchmark in benchmark_information:
benchmark_job = benchmark["job"]
benchmark_format = benchmark["test_format"] # noqa: F841
benchmark_format = benchmark.get("test_format", 0) # noqa: F841
_benchmark_job_length = len(benchmark_job)
if _benchmark_job_length > benchmark_job_length:
@ -1837,7 +1936,7 @@ def format_list_benchmark(config, benchmark_information):
for benchmark in benchmark_information:
benchmark_job = benchmark["job"]
benchmark_format = benchmark["test_format"] # noqa: F841
benchmark_format = benchmark.get("test_format", 0) # noqa: F841
if benchmark["benchmark_result"] == "Running":
seq_benchmark_bandwidth = "Running"

View File

@ -23,10 +23,10 @@ from requests_toolbelt.multipart.encoder import (
MultipartEncoder,
MultipartEncoderMonitor,
)
from ast import literal_eval
import pvc.cli_lib.ansiprint as ansiprint
from pvc.cli_lib.common import UploadProgressBar, call_api
from ast import literal_eval
#
@ -793,10 +793,16 @@ def task_status(config, task_id=None, is_watching=False):
task["type"] = task_type
task["worker"] = task_host
task["id"] = task_job.get("id")
task_args = literal_eval(task_job.get("args"))
try:
task_args = literal_eval(task_job.get("args"))
except Exception:
task_args = task_job.get("args")
task["vm_name"] = task_args[0]
task["vm_profile"] = task_args[1]
task_kwargs = literal_eval(task_job.get("kwargs"))
try:
task_kwargs = literal_eval(task_job.get("kwargs"))
except Exception:
task_kwargs = task_job.get("kwargs")
task["vm_define"] = str(bool(task_kwargs["define_vm"]))
task["vm_start"] = str(bool(task_kwargs["start_vm"]))
task_data.append(task)

View File

@ -28,8 +28,11 @@ import time
import colorama
import yaml
import json
import syslog
import lxml.etree as etree
from sys import argv
from distutils.util import strtobool
from functools import wraps
@ -51,6 +54,22 @@ default_store_data = {"cfgfile": "/etc/pvc/pvcapid.yaml"}
config = dict()
#
# Audit function
#
def audit():
args = argv
args[0] = "pvc"
syslog.openlog(facility=syslog.LOG_AUTH)
syslog.syslog(
'client audit: command "{}" by user "{}"'.format(
" ".join(args),
os.environ.get("USER", None),
)
)
syslog.closelog()
#
# Version function
#
@ -467,7 +486,7 @@ def cli_node():
@cluster_req
def node_secondary(node, wait):
"""
Take NODE out of primary router mode.
Take NODE out of primary coordinator mode.
"""
task_retcode, task_retdata = pvc_provisioner.task_status(config, None)
@ -520,7 +539,7 @@ def node_secondary(node, wait):
@cluster_req
def node_primary(node, wait):
"""
Put NODE into primary router mode.
Put NODE into primary coordinator mode.
"""
task_retcode, task_retdata = pvc_provisioner.task_status(config, None)
@ -784,11 +803,11 @@ def cli_vm():
)
@click.option(
"-s",
"--selector",
"--node-selector",
"node_selector",
default="mem",
default="none",
show_default=True,
type=click.Choice(["mem", "load", "vcpus", "vms", "none"]),
type=click.Choice(["mem", "memfree", "load", "vcpus", "vms", "none"]),
help='Method to determine optimal target node during autoselect; "none" will use the default for the cluster.',
)
@click.option(
@ -838,6 +857,18 @@ def vm_define(
):
"""
Define a new virtual machine from Libvirt XML configuration file VMCONFIG.
The target node selector ("--node-selector"/"-s") can be "none" to use the cluster default, or one of the following values:
* "mem": choose the node with the least provisioned VM memory
* "memfree": choose the node with the most (real) free memory
* "vcpus": choose the node with the least allocated VM vCPUs
* "load": choose the node with the lowest current load average
* "vms": choose the node with the least number of provisioned VMs
For most clusters, "mem" should be sufficient, but others may be used based on the cluster workload and available resources. The following caveats should be considered:
* "mem" looks at the provisioned memory, not the allocated memory; thus, stopped or disabled VMs are counted towards a node's memory for this selector, even though their memory is not actively in use.
* "memfree" looks at the free memory of the node in general, ignoring the amount provisioned to VMs; if any VM's internal memory usage changes, this value would be affected. This might be preferable to "mem" on clusters with very high memory utilization versus total capacity or if many VMs are stopped/disabled.
* "load" looks at the system load of the node in general, ignoring load in any particular VMs; if any VM's CPU usage changes, this value would be affected. This might be preferable on clusters with some very CPU intensive VMs.
"""
# Open the XML file
@ -879,11 +910,11 @@ def vm_define(
)
@click.option(
"-s",
"--selector",
"--node-selector",
"node_selector",
default=None,
show_default=False,
type=click.Choice(["mem", "load", "vcpus", "vms", "none"]),
type=click.Choice(["mem", "memfree", "load", "vcpus", "vms", "none"]),
help='Method to determine optimal target node during autoselect; "none" will use the default for the cluster.',
)
@click.option(
@ -923,6 +954,8 @@ def vm_meta(
):
"""
Modify the PVC metadata of existing virtual machine DOMAIN. At least one option to update must be specified. DOMAIN may be a UUID or name.
For details on the "--node-selector"/"-s" values, please see help for the command "pvc vm define".
"""
if (
@ -990,6 +1023,7 @@ def vm_meta(
)
@click.argument("domain")
@click.argument("cfgfile", type=click.File(), default=None, required=False)
@cluster_req
def vm_modify(
domain,
cfgfile,
@ -1319,20 +1353,36 @@ def vm_stop(domain, confirm_flag):
@click.argument("domain")
@click.option(
"--force",
"force",
"force_flag",
is_flag=True,
default=False,
help="Forcibly stop the VM instead of waiting for shutdown.",
)
@click.option(
"-y",
"--yes",
"confirm_flag",
is_flag=True,
default=False,
help="Confirm the disable",
)
@cluster_req
def vm_disable(domain, force):
def vm_disable(domain, force_flag, confirm_flag):
"""
Shut down virtual machine DOMAIN and mark it as disabled. DOMAIN may be a UUID or name.
Disabled VMs will not be counted towards a degraded cluster health status, unlike stopped VMs. Use this option for a VM that will remain off for an extended period.
"""
retcode, retmsg = pvc_vm.vm_state(config, domain, "disable", force=force)
if not confirm_flag and not config["unsafe"]:
try:
click.confirm(
"Disable VM {}".format(domain), prompt_suffix="? ", abort=True
)
except Exception:
exit(0)
retcode, retmsg = pvc_vm.vm_state(config, domain, "disable", force=force_flag)
cleanup(retcode, retmsg)
@ -3159,20 +3209,29 @@ def ceph_benchmark():
# pvc storage benchmark run
###############################################################################
@click.command(name="run", short_help="Run a storage benchmark.")
@click.option(
"-y",
"--yes",
"confirm_flag",
is_flag=True,
default=False,
help="Confirm the run",
)
@click.argument("pool")
@cluster_req
def ceph_benchmark_run(pool):
def ceph_benchmark_run(confirm_flag, pool):
"""
Run a storage benchmark on POOL in the background.
"""
try:
click.confirm(
"NOTE: Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue",
prompt_suffix="? ",
abort=True,
)
except Exception:
exit(0)
if not confirm_flag and not config["unsafe"]:
try:
click.confirm(
"NOTE: Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue",
prompt_suffix="? ",
abort=True,
)
except Exception:
exit(0)
retcode, retmsg = pvc_ceph.ceph_benchmark_run(config, pool)
cleanup(retcode, retmsg)
@ -3253,7 +3312,9 @@ def ceph_osd():
@cluster_req
def ceph_osd_create_db_vg(node, device, confirm_flag):
"""
Create a new Ceph OSD database volume group on node NODE with block device DEVICE. DEVICE must be a valid raw block device, one of e.g. '/dev/sda', '/dev/nvme0n1', '/dev/disk/by-path/...', '/dev/disk/by-id/...', etc. Using partitions is not supported.
Create a new Ceph OSD database volume group on node NODE with block device DEVICE. DEVICE must be a valid raw block device (e.g. '/dev/nvme0n1', '/dev/disk/by-path/...') or a "detect" string. Using partitions is not supported.
A "detect" string is a string in the form "detect:<NAME>:<HUMAN-SIZE>:<ID>". Detect strings allow for automatic determination of Linux block device paths from known basic information about disks by leveraging "lsscsi" on the target host. The "NAME" should be some descriptive identifier, for instance the manufacturer (e.g. "INTEL"), the "HUMAN-SIZE" should be the labeled human-readable size of the device (e.g. "480GB", "1.92TB"), and "ID" specifies the Nth 0-indexed device which matches the "NAME" and "HUMAN-SIZE" values (e.g. "2" would match the third device with the corresponding "NAME" and "HUMAN-SIZE"). When matching against sizes, there is +/- 3% flexibility to account for base-1000 vs. base-1024 differences and rounding errors. The "NAME" may contain whitespace but if so the entire detect string should be quoted, and is case-insensitive. More information about detect strings can be found in the pvcbootstrapd manual.
This volume group will be used for Ceph OSD database and WAL functionality if the '--ext-db' flag is passed to newly-created OSDs during 'pvc storage osd add'. DEVICE should be an extremely fast SSD device (NVMe, Intel Optane, etc.) which is significantly faster than the normal OSD disks and with very high write endurance. Only one OSD database volume group on a single physical device is supported per node, so it must be fast and large enough to act as an effective OSD database device for all OSDs on the node. Attempting to add additional database volume groups after the first will fail.
"""
@ -3315,7 +3376,9 @@ def ceph_osd_create_db_vg(node, device, confirm_flag):
@cluster_req
def ceph_osd_add(node, device, weight, ext_db_flag, ext_db_ratio, confirm_flag):
"""
Add a new Ceph OSD on node NODE with block device DEVICE. DEVICE must be a valid raw block device, one of e.g. '/dev/sda', '/dev/nvme0n1', '/dev/disk/by-path/...', '/dev/disk/by-id/...', etc. Using partitions is not supported.
Add a new Ceph OSD on node NODE with block device DEVICE. DEVICE must be a valid raw block device (e.g. '/dev/sda', '/dev/nvme0n1', '/dev/disk/by-path/...', '/dev/disk/by-id/...') or a "detect" string. Using partitions is not supported.
A "detect" string is a string in the form "detect:<NAME>:<HUMAN-SIZE>:<ID>". Detect strings allow for automatic determination of Linux block device paths from known basic information about disks by leveraging "lsscsi" on the target host. The "NAME" should be some descriptive identifier, for instance the manufacturer (e.g. "INTEL"), the "HUMAN-SIZE" should be the labeled human-readable size of the device (e.g. "480GB", "1.92TB"), and "ID" specifies the Nth 0-indexed device which matches the "NAME" and "HUMAN-SIZE" values (e.g. "2" would match the third device with the corresponding "NAME" and "HUMAN-SIZE"). When matching against sizes, there is +/- 3% flexibility to account for base-1000 vs. base-1024 differences and rounding errors. The "NAME" may contain whitespace but if so the entire detect string should be quoted, and is case-insensitive. More information about detect strings can be found in the pvcbootstrapd manual.
The weight of an OSD should reflect the ratio of the OSD to other OSDs in the storage cluster. For example, if all OSDs are the same size as recommended for PVC, 1 (the default) is a valid weight so that all are treated identically. If a new OSD is added later which is 4x the size of the existing OSDs, the new OSD's weight should then be 4 to tell the cluster that 4x the data can be stored on the OSD. Weights can also be tweaked for performance reasons, since OSDs with more data will incur more I/O load. For more information about CRUSH weights, please see the Ceph documentation.
@ -3340,10 +3403,19 @@ def ceph_osd_add(node, device, weight, ext_db_flag, ext_db_ratio, confirm_flag):
###############################################################################
# pvc storage osd remove
# pvc storage osd replace
###############################################################################
@click.command(name="remove", short_help="Remove OSD.")
@click.command(name="replace", short_help="Replace OSD block device.")
@click.argument("osdid")
@click.argument("device")
@click.option(
"-w",
"--weight",
"weight",
default=1.0,
show_default=True,
help="New weight of the OSD within the CRUSH map.",
)
@click.option(
"-y",
"--yes",
@ -3353,11 +3425,80 @@ def ceph_osd_add(node, device, weight, ext_db_flag, ext_db_ratio, confirm_flag):
help="Confirm the removal",
)
@cluster_req
def ceph_osd_remove(osdid, confirm_flag):
def ceph_osd_replace(osdid, device, weight, confirm_flag):
"""
Replace the block device of an existing OSD with ID OSDID with DEVICE. Use this command to replace a failed or smaller OSD block device with a new one.
DEVICE must be a valid raw block device (e.g. '/dev/sda', '/dev/nvme0n1', '/dev/disk/by-path/...', '/dev/disk/by-id/...') or a "detect" string. Using partitions is not supported. A "detect" string is a string in the form "detect:<NAME>:<HUMAN-SIZE>:<ID>". For details, see 'pvc storage osd add --help'.
The weight of an OSD should reflect the ratio of the OSD to other OSDs in the storage cluster. For details, see 'pvc storage osd add --help'. Note that the current weight must be explicitly specified if it differs from the default.
Existing IDs, external DB devices, etc. of the OSD will be preserved; data will be lost and rebuilt from the remaining healthy OSDs.
"""
if not confirm_flag and not config["unsafe"]:
try:
click.confirm(
"Replace OSD {} with block device {}".format(osdid, device),
prompt_suffix="? ",
abort=True,
)
except Exception:
exit(0)
retcode, retmsg = pvc_ceph.ceph_osd_replace(config, osdid, device, weight)
cleanup(retcode, retmsg)
###############################################################################
# pvc storage osd refresh
###############################################################################
@click.command(name="refresh", short_help="Refresh (reimport) OSD device.")
@click.argument("osdid")
@click.argument("device")
@cluster_req
def ceph_osd_refresh(osdid, device):
"""
Refresh (reimport) the block DEVICE of an existing OSD with ID OSDID. Use this command to reimport a working OSD into a rebuilt/replaced node.
DEVICE must be a valid raw block device (e.g. '/dev/sda', '/dev/nvme0n1', '/dev/disk/by-path/...', '/dev/disk/by-id/...') or a "detect" string. Using partitions is not supported. A "detect" string is a string in the form "detect:<NAME>:<HUMAN-SIZE>:<ID>". For details, see 'pvc storage osd add --help'.
Existing data, IDs, weights, etc. of the OSD will be preserved.
NOTE: If a device had an external DB device, this is not automatically handled at this time. It is best to remove and re-add the OSD instead.
"""
retcode, retmsg = pvc_ceph.ceph_osd_refresh(config, osdid, device)
cleanup(retcode, retmsg)
###############################################################################
# pvc storage osd remove
###############################################################################
@click.command(name="remove", short_help="Remove OSD.")
@click.argument("osdid")
@click.option(
"-f",
"--force",
"force_flag",
is_flag=True,
default=False,
help="Force removal even if steps fail",
)
@click.option(
"-y",
"--yes",
"confirm_flag",
is_flag=True,
default=False,
help="Confirm the removal",
)
@cluster_req
def ceph_osd_remove(osdid, force_flag, confirm_flag):
"""
Remove a Ceph OSD with ID OSDID.
DANGER: This will completely remove the OSD from the cluster. OSDs will rebalance which will negatively affect performance and available space. It is STRONGLY RECOMMENDED to set an OSD out (using 'pvc storage osd out') and allow the cluster to fully rebalance (verified with 'pvc storage status') before removing an OSD.
NOTE: The "-f"/"--force" option is useful after replacing a failed node, to ensure the OSD is removed even if the OSD in question does not properly exist on the node after a rebuild.
"""
if not confirm_flag and not config["unsafe"]:
try:
@ -3365,7 +3506,7 @@ def ceph_osd_remove(osdid, confirm_flag):
except Exception:
exit(0)
retcode, retmsg = pvc_ceph.ceph_osd_remove(config, osdid)
retcode, retmsg = pvc_ceph.ceph_osd_remove(config, osdid, force_flag)
cleanup(retcode, retmsg)
@ -3475,6 +3616,17 @@ def ceph_pool():
@click.command(name="add", short_help="Add new RBD pool.")
@click.argument("name")
@click.argument("pgs")
@click.option(
"-t",
"--tier",
"tier",
default="default",
show_default=True,
type=click.Choice(["default", "hdd", "ssd", "nvme"]),
help="""
The device tier to limit the pool to. Default is all OSD tiers, and specific tiers can be specified instead. At least one full set of OSDs for a given tier must be present for the tier to be specified, or the pool creation will fail.
""",
)
@click.option(
"--replcfg",
"replcfg",
@ -3482,20 +3634,18 @@ def ceph_pool():
show_default=True,
required=False,
help="""
The replication configuration, specifying both a "copies" and "mincopies" value, separated by a
comma, e.g. "copies=3,mincopies=2". The "copies" value specifies the total number of replicas
and should not exceed the total number of nodes; the "mincopies" value specifies the minimum
number of available copies to allow writes. For additional details please see the Cluster
Architecture documentation.
The replication configuration, specifying both a "copies" and "mincopies" value, separated by a comma, e.g. "copies=3,mincopies=2". The "copies" value specifies the total number of replicas and should not exceed the total number of nodes; the "mincopies" value specifies the minimum number of available copies to allow writes. For additional details please see the Cluster Architecture documentation.
""",
)
@cluster_req
def ceph_pool_add(name, pgs, replcfg):
def ceph_pool_add(name, pgs, tier, replcfg):
"""
Add a new Ceph RBD pool with name NAME and PGS placement groups.
The placement group count must be a non-zero power of 2.
"""
retcode, retmsg = pvc_ceph.ceph_pool_add(config, name, pgs, replcfg)
retcode, retmsg = pvc_ceph.ceph_pool_add(config, name, pgs, replcfg, tier)
cleanup(retcode, retmsg)
@ -3531,6 +3681,26 @@ def ceph_pool_remove(name, confirm_flag):
cleanup(retcode, retmsg)
###############################################################################
# pvc storage pool set-pgs
###############################################################################
@click.command(name="set-pgs", short_help="Set PGs of an RBD pool.")
@click.argument("name")
@click.argument("pgs")
@cluster_req
def ceph_pool_set_pgs(name, pgs):
"""
Set the placement groups (PGs) count for the pool NAME to PGS.
The placement group count must be a non-zero power of 2.
Placement group counts may be increased or decreased as required though frequent alteration is not recommended.
"""
retcode, retmsg = pvc_ceph.ceph_pool_set_pgs(config, name, pgs)
cleanup(retcode, retmsg)
###############################################################################
# pvc storage pool list
###############################################################################
@ -3963,7 +4133,9 @@ def provisioner_template_system_list(limit):
@click.option(
"--node-selector",
"node_selector",
type=click.Choice(["mem", "vcpus", "vms", "load", "none"], case_sensitive=False),
type=click.Choice(
["mem", "memfree", "vcpus", "vms", "load", "none"], case_sensitive=False
),
default="none",
help='Method to determine optimal target node during autoselect; "none" will use the default for the cluster.',
)
@ -3996,6 +4168,8 @@ def provisioner_template_system_add(
):
"""
Add a new system template NAME to the PVC cluster provisioner.
For details on the possible "--node-selector" values, please see help for the command "pvc vm define".
"""
params = dict()
params["name"] = name
@ -4055,7 +4229,9 @@ def provisioner_template_system_add(
@click.option(
"--node-selector",
"node_selector",
type=click.Choice(["mem", "vcpus", "vms", "load", "none"], case_sensitive=False),
type=click.Choice(
["mem", "memfree", "vcpus", "vms", "load", "none"], case_sensitive=False
),
help='Method to determine optimal target node during autoselect; "none" will use the default for the cluster.',
)
@click.option(
@ -4087,6 +4263,8 @@ def provisioner_template_system_modify(
):
"""
Add a new system template NAME to the PVC cluster provisioner.
For details on the possible "--node-selector" values, please see help for the command "pvc vm define".
"""
params = dict()
params["vcpus"] = vcpus
@ -5680,6 +5858,11 @@ def cli(_cluster, _debug, _quiet, _unsafe, _colour):
global config
store_data = get_store(store_path)
config = get_config(store_data, _cluster)
# There is only one cluster and no local cluster, so even if nothing was passed, use it
if len(store_data) == 1 and _cluster is None and config.get("badcfg", None):
config = get_config(store_data, list(store_data.keys())[0])
if not config.get("badcfg", None):
config["debug"] = _debug
config["unsafe"] = _unsafe
@ -5702,6 +5885,8 @@ def cli(_cluster, _debug, _quiet, _unsafe, _colour):
)
echo("", err=True)
audit()
#
# Click command tree
@ -5794,6 +5979,8 @@ ceph_benchmark.add_command(ceph_benchmark_list)
ceph_osd.add_command(ceph_osd_create_db_vg)
ceph_osd.add_command(ceph_osd_add)
ceph_osd.add_command(ceph_osd_replace)
ceph_osd.add_command(ceph_osd_refresh)
ceph_osd.add_command(ceph_osd_remove)
ceph_osd.add_command(ceph_osd_in)
ceph_osd.add_command(ceph_osd_out)
@ -5803,6 +5990,7 @@ ceph_osd.add_command(ceph_osd_list)
ceph_pool.add_command(ceph_pool_add)
ceph_pool.add_command(ceph_pool_remove)
ceph_pool.add_command(ceph_pool_set_pgs)
ceph_pool.add_command(ceph_pool_list)
ceph_volume.add_command(ceph_volume_add)

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name="pvc",
version="0.9.45",
version="0.9.55",
packages=["pvc", "pvc.cli_lib"],
install_requires=[
"Click",

View File

@ -181,6 +181,7 @@ def getClusterOSDList(zkhandler):
def getOSDInformation(zkhandler, osd_id):
# Get the devices
osd_node = zkhandler.read(("osd.node", osd_id))
osd_device = zkhandler.read(("osd.device", osd_id))
osd_db_device = zkhandler.read(("osd.db_device", osd_id))
# Parse the stats data
@ -189,6 +190,7 @@ def getOSDInformation(zkhandler, osd_id):
osd_information = {
"id": osd_id,
"node": osd_node,
"device": osd_device,
"db_device": osd_db_device,
"stats": osd_stats,
@ -234,7 +236,7 @@ def add_osd_db_vg(zkhandler, node, device):
return success, message
# OSD addition and removal uses the /cmd/ceph pipe
# OSD actions use the /cmd/ceph pipe
# These actions must occur on the specific node they reference
def add_osd(zkhandler, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05):
# Verify the target node exists
@ -286,14 +288,111 @@ def add_osd(zkhandler, node, device, weight, ext_db_flag=False, ext_db_ratio=0.0
return success, message
def remove_osd(zkhandler, osd_id):
def replace_osd(zkhandler, osd_id, new_device, weight):
# Get current OSD information
osd_information = getOSDInformation(zkhandler, osd_id)
node = osd_information["node"]
old_device = osd_information["device"]
ext_db_flag = True if osd_information["db_device"] else False
# Verify target block device isn't in use
block_osd = verifyOSDBlock(zkhandler, node, new_device)
if block_osd and block_osd != osd_id:
return (
False,
'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format(
new_device, node, block_osd
),
)
# Tell the cluster to create a new OSD for the host
replace_osd_string = "osd_replace {},{},{},{},{},{}".format(
node, osd_id, old_device, new_device, weight, ext_db_flag
)
zkhandler.write([("base.cmd.ceph", replace_osd_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-osd_replace":
message = 'Replaced OSD {} with block device "{}" on node "{}".'.format(
osd_id, new_device, node
)
success = True
else:
message = "ERROR: Failed to replace OSD; check node logs for details."
success = False
except Exception:
message = "ERROR: Command ignored by node."
success = False
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
def refresh_osd(zkhandler, osd_id, device):
# Get current OSD information
osd_information = getOSDInformation(zkhandler, osd_id)
node = osd_information["node"]
ext_db_flag = True if osd_information["db_device"] else False
# Verify target block device isn't in use
block_osd = verifyOSDBlock(zkhandler, node, device)
if not block_osd or block_osd != osd_id:
return (
False,
'ERROR: Block device "{}" on node "{}" is not used by OSD "{}"; use replace instead'.format(
device, node, osd_id
),
)
# Tell the cluster to create a new OSD for the host
refresh_osd_string = "osd_refresh {},{},{},{}".format(
node, osd_id, device, ext_db_flag
)
zkhandler.write([("base.cmd.ceph", refresh_osd_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-osd_refresh":
message = (
'Refreshed OSD {} with block device "{}" on node "{}".'.format(
osd_id, device, node
)
)
success = True
else:
message = "ERROR: Failed to refresh OSD; check node logs for details."
success = False
except Exception:
message = "ERROR: Command ignored by node."
success = False
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
def remove_osd(zkhandler, osd_id, force_flag):
if not verifyOSD(zkhandler, osd_id):
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
osd_id
)
# Tell the cluster to remove an OSD
remove_osd_string = "osd_remove {}".format(osd_id)
remove_osd_string = "osd_remove {},{}".format(osd_id, str(force_flag))
zkhandler.write([("base.cmd.ceph", remove_osd_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
@ -375,7 +474,7 @@ def get_list_osd(zkhandler, limit, is_fuzzy=True):
for osd in full_osd_list:
if limit:
try:
if re.match(limit, osd):
if re.fullmatch(limit, osd):
osd_list.append(getOSDInformation(zkhandler, osd))
except Exception as e:
return False, "Regex Error: {}".format(e)
@ -393,12 +492,22 @@ def getPoolInformation(zkhandler, pool):
pool_stats_raw = zkhandler.read(("pool.stats", pool))
pool_stats = dict(json.loads(pool_stats_raw))
volume_count = len(getCephVolumes(zkhandler, pool))
tier = zkhandler.read(("pool.tier", pool))
if tier is None:
tier = "default"
pgs = zkhandler.read(("pool.pgs", pool))
pool_information = {"name": pool, "volume_count": volume_count, "stats": pool_stats}
pool_information = {
"name": pool,
"volume_count": volume_count,
"tier": tier,
"pgs": pgs,
"stats": pool_stats,
}
return pool_information
def add_pool(zkhandler, name, pgs, replcfg):
def add_pool(zkhandler, name, pgs, replcfg, tier=None):
# Prepare the copies/mincopies variables
try:
copies, mincopies = replcfg.split(",")
@ -408,60 +517,70 @@ def add_pool(zkhandler, name, pgs, replcfg):
copies = None
mincopies = None
if not copies or not mincopies:
return False, 'ERROR: Replication configuration "{}" is not valid.'.format(
replcfg
)
return False, f'ERROR: Replication configuration "{replcfg}" is not valid.'
# 1. Create the pool
# Prepare the tiers if applicable
if tier is not None and tier in ["hdd", "ssd", "nvme"]:
crush_rule = f"{tier}_tier"
# Create a CRUSH rule for the relevant tier
retcode, stdout, stderr = common.run_os_command(
f"ceph osd crush rule create-replicated {crush_rule} default host {tier}"
)
if retcode:
return (
False,
f"ERROR: Failed to create CRUSH rule {tier} for pool {name}: {stderr}",
)
else:
tier = "default"
crush_rule = "replicated"
# Create the pool
retcode, stdout, stderr = common.run_os_command(
"ceph osd pool create {} {} replicated".format(name, pgs)
f"ceph osd pool create {name} {pgs} {pgs} {crush_rule}"
)
if retcode:
return False, 'ERROR: Failed to create pool "{}" with {} PGs: {}'.format(
name, pgs, stderr
)
return False, f'ERROR: Failed to create pool "{name}" with {pgs} PGs: {stderr}'
# 2. Set the size and minsize
# Set the size and minsize
retcode, stdout, stderr = common.run_os_command(
"ceph osd pool set {} size {}".format(name, copies)
f"ceph osd pool set {name} size {copies}"
)
if retcode:
return False, 'ERROR: Failed to set pool "{}" size of {}: {}'.format(
name, copies, stderr
)
return False, f'ERROR: Failed to set pool "{name}" size of {copies}: {stderr}'
retcode, stdout, stderr = common.run_os_command(
"ceph osd pool set {} min_size {}".format(name, mincopies)
)
if retcode:
return False, 'ERROR: Failed to set pool "{}" minimum size of {}: {}'.format(
name, mincopies, stderr
)
# 3. Enable RBD application
retcode, stdout, stderr = common.run_os_command(
"ceph osd pool application enable {} rbd".format(name)
f"ceph osd pool set {name} min_size {mincopies}"
)
if retcode:
return (
False,
'ERROR: Failed to enable RBD application on pool "{}" : {}'.format(
name, stderr
),
f'ERROR: Failed to set pool "{name}" minimum size of {mincopies}: {stderr}',
)
# 4. Add the new pool to Zookeeper
# Enable RBD application
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool application enable {name} rbd"
)
if retcode:
return (
False,
f'ERROR: Failed to enable RBD application on pool "{name}" : {stderr}',
)
# Add the new pool to Zookeeper
zkhandler.write(
[
(("pool", name), ""),
(("pool.pgs", name), pgs),
(("pool.tier", name), tier),
(("pool.stats", name), "{}"),
(("volume", name), ""),
(("snapshot", name), ""),
]
)
return True, 'Created RBD pool "{}" with {} PGs'.format(name, pgs)
return True, f'Created RBD pool "{name}" with {pgs} PGs'
def remove_pool(zkhandler, name):
@ -493,19 +612,63 @@ def remove_pool(zkhandler, name):
return True, 'Removed RBD pool "{}" and all volumes.'.format(name)
def set_pgs_pool(zkhandler, name, pgs):
if not verifyPool(zkhandler, name):
return False, f'ERROR: No pool with name "{name}" is present in the cluster.'
# Validate new PGs count
pgs = int(pgs)
if (pgs == 0) or (pgs & (pgs - 1) != 0):
return (
False,
f'ERROR: Invalid PGs number "{pgs}": must be a non-zero power of 2.',
)
# Set the new pgs number
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool set {name} pg_num {pgs}"
)
if retcode:
return False, f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}"
# Set the new pgps number if increasing
current_pgs = int(zkhandler.read(("pool.pgs", name)))
if current_pgs >= pgs:
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool set {name} pgp_num {pgs}"
)
if retcode:
return (
False,
f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}",
)
# Update Zookeeper count
zkhandler.write(
[
(("pool.pgs", name), pgs),
]
)
return True, f'Set PGs count to {pgs} for RBD pool "{name}".'
def get_list_pool(zkhandler, limit, is_fuzzy=True):
full_pool_list = zkhandler.children("base.pool")
if limit:
if not is_fuzzy:
limit = "^" + limit + "$"
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
get_pool_info = dict()
for pool in full_pool_list:
is_limit_match = False
if limit:
try:
if re.match(limit, pool):
if re.fullmatch(limit, pool):
is_limit_match = True
except Exception as e:
return False, "Regex Error: {}".format(e)
@ -523,7 +686,7 @@ def get_list_pool(zkhandler, limit, is_fuzzy=True):
for future in futures:
pool_data_list.append(future.result())
return True, sorted(pool_data_list, key=lambda x: int(x["stats"]["id"]))
return True, sorted(pool_data_list, key=lambda x: int(x["stats"].get("id", 0)))
#
@ -848,15 +1011,12 @@ def get_list_volume(zkhandler, pool, limit, is_fuzzy=True):
full_volume_list = getCephVolumes(zkhandler, pool)
if limit:
if not is_fuzzy:
limit = "^" + limit + "$"
else:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
get_volume_info = dict()
for volume in full_volume_list:
@ -867,7 +1027,7 @@ def get_list_volume(zkhandler, pool, limit, is_fuzzy=True):
if limit:
# Try to match the limit against the volume name
try:
if re.match(limit, volume_name):
if re.fullmatch(limit, volume_name):
is_limit_match = True
except Exception as e:
return False, "Regex Error: {}".format(e)
@ -1073,7 +1233,7 @@ def get_list_snapshot(zkhandler, pool, volume, limit, is_fuzzy=True):
pool_name, volume_name = volume.split("/")
if limit:
try:
if re.match(limit, snapshot_name):
if re.fullmatch(limit, snapshot_name):
snapshot_list.append(
{
"pool": pool_name,

View File

@ -639,6 +639,8 @@ def findTargetNode(zkhandler, dom_uuid):
# Execute the search
if search_field == "mem":
return findTargetNodeMem(zkhandler, node_limit, dom_uuid)
if search_field == "memfree":
return findTargetNodeMemFree(zkhandler, node_limit, dom_uuid)
if search_field == "load":
return findTargetNodeLoad(zkhandler, node_limit, dom_uuid)
if search_field == "vcpus":
@ -677,7 +679,7 @@ def getNodes(zkhandler, node_limit, dom_uuid):
#
# via free memory (relative to allocated memory)
# via provisioned memory
#
def findTargetNodeMem(zkhandler, node_limit, dom_uuid):
most_provfree = 0
@ -698,6 +700,24 @@ def findTargetNodeMem(zkhandler, node_limit, dom_uuid):
return target_node
#
# via free memory
#
def findTargetNodeMemFree(zkhandler, node_limit, dom_uuid):
most_memfree = 0
target_node = None
node_list = getNodes(zkhandler, node_limit, dom_uuid)
for node in node_list:
memfree = int(zkhandler.read(("node.memory.free", node)))
if memfree > most_memfree:
most_memfree = memfree
target_node = node
return target_node
#
# via load average
#

View File

@ -0,0 +1 @@
{"version": "7", "root": "", "base": {"root": "", "schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.primary_node.sync_lock": "/config/primary_node/sync_lock", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "cmd": "/cmd", "cmd.node": "/cmd/nodes", "cmd.domain": "/cmd/domains", "cmd.ceph": "/cmd/ceph", "logs": "/logs", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "logs": {"node": "", "messages": "/messages"}, "node": {"name": "", "keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "data.pvc_version": "/pvcversion", "running_domains": "/runningdomains", "count.provisioned_domains": "/domainscount", "count.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "cpu.load": "/cpuload", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword", "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf"}, "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, "sriov_vf": {"phy": "", "pf": "/pf", "mtu": "/mtu", "mac": "/mac", "phy_mac": "/phy_mac", "config": "/config", "config.vlan_id": "/config/vlan_id", "config.vlan_qos": "/config/vlan_qos", "config.tx_rate_min": "/config/tx_rate_min", "config.tx_rate_max": "/config/tx_rate_max", "config.spoof_check": "/config/spoof_check", "config.link_state": "/config/link_state", "config.trust": "/config/trust", "config.query_rss": "/config/query_rss", "pci": "/pci", "pci.domain": "/pci/domain", "pci.bus": "/pci/bus", "pci.slot": "/pci/slot", "pci.function": "/pci/function", "used": "/used", "used_by": "/used_by"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "storage.volumes": "/rbdlist", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit", "meta.tags": "/tags", "migrate.sync_lock": "/migrate_sync_lock"}, "tag": {"name": "", "type": "/type", "protected": "/protected"}, "network": {"vni": "", "type": "/nettype", "mtu": "/mtu", "rule": "/firewall_rules", "rule.in": "/firewall_rules/in", "rule.out": "/firewall_rules/out", "nameservers": "/name_servers", "domain": "/domain", "reservation": "/dhcp4_reservations", "lease": "/dhcp4_leases", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", "ip4.dhcp": "/dhcp4_flag", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "reservation": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname"}, "lease": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname", "expiry": "/expiry", "client_id": "/clientid"}, "rule": {"description": "", "rule": "/rule", "order": "/order"}, "osd": {"id": "", "node": "/node", "device": "/device", "db_device": "/db_device", "stats": "/stats"}, "pool": {"name": "", "pgs": "/pgs", "tier": "/tier", "stats": "/stats"}, "volume": {"name": "", "stats": "/stats"}, "snapshot": {"name": "", "stats": "/stats"}}

View File

@ -0,0 +1 @@
{"version": "8", "root": "", "base": {"root": "", "schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.primary_node.sync_lock": "/config/primary_node/sync_lock", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "cmd": "/cmd", "cmd.node": "/cmd/nodes", "cmd.domain": "/cmd/domains", "cmd.ceph": "/cmd/ceph", "logs": "/logs", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "logs": {"node": "", "messages": "/messages"}, "node": {"name": "", "keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "data.pvc_version": "/pvcversion", "running_domains": "/runningdomains", "count.provisioned_domains": "/domainscount", "count.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "cpu.load": "/cpuload", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword", "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf"}, "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, "sriov_vf": {"phy": "", "pf": "/pf", "mtu": "/mtu", "mac": "/mac", "phy_mac": "/phy_mac", "config": "/config", "config.vlan_id": "/config/vlan_id", "config.vlan_qos": "/config/vlan_qos", "config.tx_rate_min": "/config/tx_rate_min", "config.tx_rate_max": "/config/tx_rate_max", "config.spoof_check": "/config/spoof_check", "config.link_state": "/config/link_state", "config.trust": "/config/trust", "config.query_rss": "/config/query_rss", "pci": "/pci", "pci.domain": "/pci/domain", "pci.bus": "/pci/bus", "pci.slot": "/pci/slot", "pci.function": "/pci/function", "used": "/used", "used_by": "/used_by"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "storage.volumes": "/rbdlist", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit", "meta.tags": "/tags", "migrate.sync_lock": "/migrate_sync_lock"}, "tag": {"name": "", "type": "/type", "protected": "/protected"}, "network": {"vni": "", "type": "/nettype", "mtu": "/mtu", "rule": "/firewall_rules", "rule.in": "/firewall_rules/in", "rule.out": "/firewall_rules/out", "nameservers": "/name_servers", "domain": "/domain", "reservation": "/dhcp4_reservations", "lease": "/dhcp4_leases", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", "ip4.dhcp": "/dhcp4_flag", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "reservation": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname"}, "lease": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname", "expiry": "/expiry", "client_id": "/clientid"}, "rule": {"description": "", "rule": "/rule", "order": "/order"}, "osd": {"id": "", "node": "/node", "device": "/device", "db_device": "/db_device", "fsid": "/fsid", "ofsid": "/fsid/osd", "cfsid": "/fsid/cluster", "lvm": "/lvm", "vg": "/lvm/vg", "lv": "/lvm/lv", "stats": "/stats"}, "pool": {"name": "", "pgs": "/pgs", "tier": "/tier", "stats": "/stats"}, "volume": {"name": "", "stats": "/stats"}, "snapshot": {"name": "", "stats": "/stats"}}

View File

@ -665,16 +665,20 @@ def get_list(zkhandler, limit, is_fuzzy=True):
net_list = []
full_net_list = zkhandler.children("base.network")
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
for net in full_net_list:
description = zkhandler.read(("network", net))
if limit:
try:
if not is_fuzzy:
limit = "^" + limit + "$"
if re.match(limit, net):
if re.fullmatch(limit, net):
net_list.append(getNetworkInformation(zkhandler, net))
if re.match(limit, description):
if re.fullmatch(limit, description):
net_list.append(getNetworkInformation(zkhandler, net))
except Exception as e:
return False, "Regex Error: {}".format(e)
@ -700,25 +704,19 @@ def get_list_dhcp(zkhandler, network, limit, only_static=False, is_fuzzy=True):
full_dhcp_list = getNetworkDHCPReservations(zkhandler, net_vni)
full_dhcp_list += getNetworkDHCPLeases(zkhandler, net_vni)
if limit:
try:
if not is_fuzzy:
limit = "^" + limit + "$"
# Implcitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
except Exception as e:
return False, "Regex Error: {}".format(e)
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
for lease in full_dhcp_list:
valid_lease = False
if limit:
if re.match(limit, lease):
if re.fullmatch(limit, lease):
valid_lease = True
if re.match(limit, lease):
if re.fullmatch(limit, lease):
valid_lease = True
else:
valid_lease = True
@ -748,23 +746,17 @@ def get_list_acl(zkhandler, network, limit, direction, is_fuzzy=True):
acl_list = []
full_acl_list = getNetworkACLs(zkhandler, net_vni, direction)
if limit:
try:
if not is_fuzzy:
limit = "^" + limit + "$"
# Implcitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
except Exception as e:
return False, "Regex Error: {}".format(e)
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
for acl in full_acl_list:
valid_acl = False
if limit:
if re.match(limit, acl["description"]):
if re.fullmatch(limit, acl["description"]):
valid_acl = True
else:
valid_acl = True

View File

@ -91,7 +91,7 @@ def secondary_node(zkhandler, node):
if daemon_mode == "hypervisor":
return (
False,
'ERROR: Cannot change router mode on non-coordinator node "{}"'.format(
'ERROR: Cannot change coordinator mode on non-coordinator node "{}"'.format(
node
),
)
@ -104,9 +104,9 @@ def secondary_node(zkhandler, node):
# Get current state
current_state = zkhandler.read(("node.state.router", node))
if current_state == "secondary":
return True, 'Node "{}" is already in secondary router mode.'.format(node)
return True, 'Node "{}" is already in secondary coordinator mode.'.format(node)
retmsg = "Setting node {} in secondary router mode.".format(node)
retmsg = "Setting node {} in secondary coordinator mode.".format(node)
zkhandler.write([("base.config.primary_node", "none")])
return True, retmsg
@ -124,7 +124,7 @@ def primary_node(zkhandler, node):
if daemon_mode == "hypervisor":
return (
False,
'ERROR: Cannot change router mode on non-coordinator node "{}"'.format(
'ERROR: Cannot change coordinator mode on non-coordinator node "{}"'.format(
node
),
)
@ -137,9 +137,9 @@ def primary_node(zkhandler, node):
# Get current state
current_state = zkhandler.read(("node.state.router", node))
if current_state == "primary":
return True, 'Node "{}" is already in primary router mode.'.format(node)
return True, 'Node "{}" is already in primary coordinator mode.'.format(node)
retmsg = "Setting node {} in primary router mode.".format(node)
retmsg = "Setting node {} in primary coordinator mode.".format(node)
zkhandler.write([("base.config.primary_node", node)])
return True, retmsg
@ -236,14 +236,19 @@ def get_list(
):
node_list = []
full_node_list = zkhandler.children("base.node")
full_node_list.sort()
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
for node in full_node_list:
if limit:
try:
if not is_fuzzy:
limit = "^" + limit + "$"
if re.match(limit, node):
if re.fullmatch(limit, node):
node_list.append(getNodeInformation(zkhandler, node))
except Exception as e:
return False, "Regex Error: {}".format(e)

View File

@ -1193,6 +1193,7 @@ def get_list(zkhandler, node, state, tag, limit, is_fuzzy=True, negate=False):
return False, 'VM state "{}" is not valid.'.format(state)
full_vm_list = zkhandler.children("base.domain")
full_vm_list.sort()
# Set our limit to a sensible regex
if limit:
@ -1227,9 +1228,9 @@ def get_list(zkhandler, node, state, tag, limit, is_fuzzy=True, negate=False):
if limit:
# Try to match the limit against the UUID (if applicable) and name
try:
if is_limit_uuid and re.match(limit, vm):
if is_limit_uuid and re.fullmatch(limit, vm):
is_limit_match = True
if re.match(limit, name):
if re.fullmatch(limit, name):
is_limit_match = True
except Exception as e:
return False, "Regex Error: {}".format(e)
@ -1291,4 +1292,4 @@ def get_list(zkhandler, node, state, tag, limit, is_fuzzy=True, negate=False):
except Exception:
pass
return True, vm_data_list
return True, sorted(vm_data_list, key=lambda d: d["name"])

View File

@ -540,7 +540,7 @@ class ZKHandler(object):
#
class ZKSchema(object):
# Current version
_version = 6
_version = 8
# Root for doing nested keys
_schema_root = ""
@ -700,10 +700,21 @@ class ZKSchema(object):
"node": "/node",
"device": "/device",
"db_device": "/db_device",
"fsid": "/fsid",
"ofsid": "/fsid/osd",
"cfsid": "/fsid/cluster",
"lvm": "/lvm",
"vg": "/lvm/vg",
"lv": "/lvm/lv",
"stats": "/stats",
},
# The schema of an individual pool entry (/ceph/pools/{pool_name})
"pool": {"name": "", "pgs": "/pgs", "stats": "/stats"}, # The root key
"pool": {
"name": "",
"pgs": "/pgs",
"tier": "/tier",
"stats": "/stats",
}, # The root key
# The schema of an individual volume entry (/ceph/volumes/{pool_name}/{volume_name})
"volume": {"name": "", "stats": "/stats"}, # The root key
# The schema of an individual snapshot entry (/ceph/volumes/{pool_name}/{volume_name}/{snapshot_name})
@ -938,8 +949,13 @@ class ZKSchema(object):
kpath = f"{elem}.{ikey}"
# Validate that the key exists for that child
if not zkhandler.zk_conn.exists(self.path(kpath, child)):
if elem == "pool" and ikey == "tier":
default_data = "default"
else:
default_data = ""
zkhandler.zk_conn.create(
self.path(kpath, child), "".encode(zkhandler.encoding)
self.path(kpath, child),
default_data.encode(zkhandler.encoding),
)
# Continue for child keys under network (reservation, acl)

85
debian/changelog vendored
View File

@ -1,3 +1,88 @@
pvc (0.9.55-0) unstable; urgency=high
* Fixes a problem with the literal eval handler in the provisioner (again)
* Fixes a potential log deadlock in Zookeeper-lost situations when doing keepalives
-- Joshua M. Boniface <joshua@boniface.me> Tue, 04 Oct 2022 13:21:40 -0400
pvc (0.9.54-0) unstable; urgency=high
[CLI Client] Fixes a bad variable reference from the previous change
[API Daemon] Enables TLSv1 with an SSLContext object for maximum compatibility
-- Joshua M. Boniface <joshua@boniface.me> Tue, 23 Aug 2022 11:01:05 -0400
pvc (0.9.53-0) unstable; urgency=high
* [API] Fixes sort order of VM list (for real this time)
-- Joshua M. Boniface <joshua@boniface.me> Fri, 12 Aug 2022 17:47:11 -0400
pvc (0.9.52-0) unstable; urgency=high
* [CLI] Fixes a bug with vm modify not requiring a cluster
* [Docs] Adds a reference to the bootstrap daemon
* [API] Adds sorting to node and VM lists for consistency
* [Node Daemon/API] Adds kb_ stats values for OSD stats
-- Joshua M. Boniface <joshua@boniface.me> Fri, 12 Aug 2022 11:09:25 -0400
pvc (0.9.51-0) unstable; urgency=high
* [CLI Client] Fixes a faulty literal_eval when viewing task status
* [CLI Client] Adds a confirmation flag to the vm disable command
* [Node Daemon] Removes the pvc-flush service
-- Joshua M. Boniface <joshua@boniface.me> Mon, 25 Jul 2022 23:25:41 -0400
pvc (0.9.50-0) unstable; urgency=high
* [Node Daemon/API/CLI] Adds free memory node selector
* [Node Daemon] Fixes bug sending space-containing detect disk strings
-- Joshua M. Boniface <joshua@boniface.me> Wed, 06 Jul 2022 16:01:14 -0400
pvc (0.9.49-0) unstable; urgency=high
* [Node Daemon] Fixes bugs with OSD stat population on creation
* [Node Daemon/API] Adds additional information to Zookeeper about OSDs
* [Node Daemon] Refactors OSD removal for improved safety
* [Node Daemon/API/CLI] Adds explicit support for replacing and refreshing (reimporting) OSDs
* [API/CLI] Fixes a language inconsistency about "router mode"
-- Joshua M. Boniface <joshua@boniface.me> Fri, 06 May 2022 15:49:39 -0400
pvc (0.9.48-0) unstable; urgency=high
* [CLI] Fixes situation where only a single cluster is available
* [CLI/API/Daemon] Allows forcing of OSD removal ignoring errors
* [CLI] Fixes bug where down OSDs are not displayed
-- Joshua M. Boniface <joshua@boniface.me> Fri, 29 Apr 2022 12:13:58 -0400
pvc (0.9.47-0) unstable; urgency=high
* [Node Daemon/API/CLI] Adds Ceph pool device class/tier support
* [API] Fixes a bug returning values if a Ceph pool has not yet reported stats
* [API/CLI] Adds PGs count to the pool list output
* [API/CLI] Adds Ceph pool PGs count adjustment support
-- Joshua M. Boniface <joshua@boniface.me> Tue, 28 Dec 2021 22:01:22 -0500
pvc (0.9.46-0) unstable; urgency=high
* [API] Fixes bugs with legacy benchmark display
* [API] Fixes a bug around cloned image sizes
* [API] Removes extraneous message text in provisioner create command
* [API] Corrects bugs around fuzzy matching
* [CLI] Adds auditing for PVC CLI to local syslog
* [CLI] Adds --yes bypass for benchmark command
* [Node Daemon/API/CLI] Adds support for "detect" strings when specifying OSD or OSDDB devices
* [Node Daemon] Fixes a bug when removing OSDs
* [Node Daemon] Fixes a single-node cluster shutdown deadlock
-- Joshua M. Boniface <joshua@boniface.me> Tue, 28 Dec 2021 15:02:14 -0500
pvc (0.9.45-0) unstable; urgency=high
* [Node Daemon] Fixes an ordering issue with pvcnoded.service

View File

@ -3,5 +3,4 @@ node-daemon/pvcnoded.sample.yaml etc/pvc
node-daemon/pvcnoded usr/share/pvc
node-daemon/pvcnoded.service lib/systemd/system
node-daemon/pvc.target lib/systemd/system
node-daemon/pvc-flush.service lib/systemd/system
node-daemon/monitoring usr/share/pvc

View File

@ -7,11 +7,6 @@ systemctl daemon-reload
systemctl enable /lib/systemd/system/pvcnoded.service
systemctl enable /lib/systemd/system/pvc.target
# Inform administrator of the autoflush daemon if it is not enabled
if ! systemctl is-active --quiet pvc-flush.service; then
echo "NOTE: The PVC autoflush daemon (pvc-flush.service) is not enabled by default; enable it to perform automatic flush/unflush actions on host shutdown/startup."
fi
# Inform administrator of the service restart/startup not occurring automatically
if systemctl is-active --quiet pvcnoded.service; then
echo "NOTE: The PVC node daemon (pvcnoded.service) has not been restarted; this is up to the administrator."

View File

@ -18,7 +18,7 @@ As a consequence of its features, PVC makes administrating very high-uptime VMs
PVC also features an optional, fully customizable VM provisioning framework, designed to automate and simplify VM deployments using custom provisioning profiles, scripts, and CloudInit userdata API support.
Installation of PVC is accomplished by two main components: a [Node installer ISO](https://github.com/parallelvirtualcluster/pvc-installer) which creates on-demand installer ISOs, and an [Ansible role framework](https://github.com/parallelvirtualcluster/pvc-ansible) to configure, bootstrap, and administrate the nodes. Once up, the cluster is managed via an HTTP REST API, accessible via a Python Click CLI client or WebUI.
Installation of PVC is accomplished by two main components: a [Node installer ISO](https://github.com/parallelvirtualcluster/pvc-installer) which creates on-demand installer ISOs, and an [Ansible role framework](https://github.com/parallelvirtualcluster/pvc-ansible) to configure, bootstrap, and administrate the nodes. Installation can also be fully automated with a companion [cluster bootstrapping system](https://github.com/parallelvirtualcluster/pvc-bootstrap). Once up, the cluster is managed via an HTTP REST API, accessible via a Python Click CLI client or WebUI.
Just give it physical servers, and it will run your VMs without you having to think about it, all in just an hour or two of setup time.

View File

@ -353,7 +353,19 @@ The password for the PVC node daemon to log in to the IPMI interface.
* *required*
The selector algorithm to use when migrating hosts away from the node. Valid `selector` values are: `mem`: the node with the least allocated VM memory; `vcpus`: the node with the least allocated VM vCPUs; `load`: the node with the least current load average; `vms`: the node with the least number of provisioned VMs.
The default selector algorithm to use when migrating VMs away from a node; individual VMs can override this default.
Valid `target_selector` values are:
* `mem`: choose the node with the least provisioned VM memory
* `memfree`: choose the node with the most (real) free memory
* `vcpus`: choose the node with the least allocated VM vCPUs
* `load`: choose the node with the lowest current load average
* `vms`: choose the node with the least number of provisioned VMs
For most clusters, `mem` should be sufficient, but others may be used based on the cluster workload and available resources. The following caveats should be considered:
* `mem` looks at the provisioned memory, not the allocated memory; thus, stopped or disabled VMs are counted towards a node's memory for this selector, even though their memory is not actively in use.
* `memfree` looks at the free memory of the node in general, ignoring the amount provisioned to VMs; if any VM's internal memory usage changes, this value would be affected. This might be preferable to `mem` on clusters with very high memory utilization versus total capacity or if many VMs are stopped/disabled.
* `load` looks at the system load of the node in general, ignoring load in any particular VMs; if any VM's CPU usage changes, this value would be affected. This might be preferable on clusters with some very CPU intensive VMs.
#### `system` → `configuration` → `directories` → `dynamic_directory`

View File

@ -192,7 +192,7 @@
"type": "array"
},
"node_selector": {
"description": "The selector used to determine candidate nodes during migration",
"description": "The selector used to determine candidate nodes during migration; see 'target_selector' in the node daemon configuration reference",
"type": "string"
}
},
@ -664,6 +664,10 @@
"description": "The name of the pool",
"type": "string"
},
"pgs": {
"description": "The number of PGs (placement groups) for the pool",
"type": "integer"
},
"stats": {
"properties": {
"free_bytes": {
@ -729,6 +733,10 @@
},
"type": "object"
},
"tier": {
"description": "The device class/tier of the pool",
"type": "string"
},
"volume_count": {
"description": "The number of volumes in the pool",
"type": "integer"
@ -1406,7 +1414,7 @@
"type": "array"
},
"node_selector": {
"description": "The selector used to determine candidate nodes during migration",
"description": "The selector used to determine candidate nodes during migration; see 'target_selector' in the node daemon configuration reference",
"type": "string"
},
"profile": {
@ -5034,7 +5042,7 @@
"type": "string"
},
{
"description": "The block device (e.g. \"/dev/sdb\", \"/dev/disk/by-path/...\", etc.) to create the OSD on",
"description": "The block device (e.g. \"/dev/sdb\", \"/dev/disk/by-path/...\", etc.) or detect string (\"detect:NAME:SIZE:ID\") to create the OSD on",
"in": "query",
"name": "device",
"required": true,
@ -5086,6 +5094,13 @@
"delete": {
"description": "Note: This task may take up to 30s to complete and return<br/>Warning: This operation may have unintended consequences for the storage cluster; ensure the cluster can support removing the OSD before proceeding",
"parameters": [
{
"description": "Force removal even if some step(s) fail",
"in": "query",
"name": "force",
"required": "flase",
"type": "boolean"
},
{
"description": "A confirmation string to ensure that the API consumer really means it",
"in": "query",
@ -5133,6 +5148,73 @@
"tags": [
"storage / ceph"
]
},
"post": {
"description": "Note: This task may take up to 30s to complete and return",
"parameters": [
{
"description": "The block device (e.g. \"/dev/sdb\", \"/dev/disk/by-path/...\", etc.) or detect string (\"detect:NAME:SIZE:ID\") to replace the OSD onto",
"in": "query",
"name": "device",
"required": true,
"type": "string"
},
{
"description": "The Ceph CRUSH weight for the replaced OSD",
"in": "query",
"name": "weight",
"required": true,
"type": "number"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/Message"
}
},
"400": {
"description": "Bad request",
"schema": {
"$ref": "#/definitions/Message"
}
}
},
"summary": "Replace a Ceph OSD in the cluster",
"tags": [
"storage / ceph"
]
},
"put": {
"description": "Note: This task may take up to 30s to complete and return",
"parameters": [
{
"description": "The block device (e.g. \"/dev/sdb\", \"/dev/disk/by-path/...\", etc.) or detect string (\"detect:NAME:SIZE:ID\") that the OSD should be using",
"in": "query",
"name": "device",
"required": true,
"type": "string"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/Message"
}
},
"400": {
"description": "Bad request",
"schema": {
"$ref": "#/definitions/Message"
}
}
},
"summary": "Refresh (reimport) a Ceph OSD in the cluster",
"tags": [
"storage / ceph"
]
}
},
"/api/v1/storage/ceph/osd/{osdid}/state": {
@ -5194,7 +5276,7 @@
"type": "string"
},
{
"description": "The block device (e.g. \"/dev/sdb\", \"/dev/disk/by-path/...\", etc.) to create the OSD DB volume group on",
"description": "The block device (e.g. \"/dev/sdb\", \"/dev/disk/by-path/...\", etc.) or detect string (\"detect:NAME:SIZE:ID\") to create the OSD DB volume group on",
"in": "query",
"name": "device",
"required": true,
@ -5272,6 +5354,12 @@
"name": "replcfg",
"required": true,
"type": "string"
},
{
"description": "The device tier for the pool (hdd, ssd, nvme, or default)",
"in": "query",
"name": "tier",
"required": false
}
],
"responses": {
@ -5368,6 +5456,12 @@
"name": "replcfg",
"required": true,
"type": "string"
},
{
"description": "The device tier for the pool (hdd, ssd, nvme, or default)",
"in": "query",
"name": "tier",
"required": false
}
],
"responses": {
@ -5394,6 +5488,42 @@
"tags": [
"storage / ceph"
]
},
"put": {
"description": "",
"parameters": [
{
"description": "The new number of placement groups (PGs) for the pool",
"in": "query",
"name": "pgs",
"required": true,
"type": "integer"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/Message"
}
},
"400": {
"description": "Bad request",
"schema": {
"$ref": "#/definitions/Message"
}
},
"404": {
"description": "Not found",
"schema": {
"$ref": "#/definitions/Message"
}
}
},
"summary": "Adjust Ceph pool {pool}'s placement group count",
"tags": [
"storage / ceph"
]
}
},
"/api/v1/storage/ceph/snapshot": {
@ -6043,13 +6173,15 @@
"type": "string"
},
{
"default": "mem",
"description": "The selector used to determine candidate nodes during migration",
"default": "none",
"description": "The selector used to determine candidate nodes during migration; see 'target_selector' in the node daemon configuration reference",
"enum": [
"mem",
"memfree",
"vcpus",
"load",
"vms"
"vms",
"none (cluster default)"
],
"in": "query",
"name": "selector",
@ -6200,10 +6332,11 @@
"type": "string"
},
{
"default": "mem",
"description": "The selector used to determine candidate nodes during migration",
"default": "none",
"description": "The selector used to determine candidate nodes during migration; see 'target_selector' in the node daemon configuration reference",
"enum": [
"mem",
"memfree",
"vcpus",
"load",
"vms",
@ -6461,12 +6594,14 @@
"type": "string"
},
{
"description": "The selector used to determine candidate nodes during migration",
"description": "The selector used to determine candidate nodes during migration; see 'target_selector' in the node daemon configuration reference",
"enum": [
"mem",
"memfree",
"vcpus",
"load",
"vms"
"vms",
"none (cluster default)"
],
"in": "query",
"name": "selector",

View File

@ -1,20 +0,0 @@
# Parallel Virtual Cluster autoflush daemon
[Unit]
Description = Parallel Virtual Cluster autoflush daemon
After = pvcnoded.service pvcapid.service zookeeper.service libvirtd.service ssh.service ceph.target network-online.target
Wants = pvcnoded.service
PartOf = pvc.target
[Service]
Type = oneshot
RemainAfterExit = true
WorkingDirectory = /usr/share/pvc
TimeoutSec = 30min
ExecStartPre = /bin/sleep 30
ExecStart = /usr/bin/pvc -c local node unflush --wait
ExecStop = /usr/bin/pvc -c local node flush --wait
ExecStopPost = /bin/sleep 5
[Install]
WantedBy = pvc.target

View File

@ -122,7 +122,7 @@ pvc:
pass: Passw0rd
# migration: Migration option configuration
migration:
# target_selector: Criteria to select the ideal migration target, options: mem, load, vcpus, vms
# target_selector: Criteria to select the ideal migration target, options: mem, memfree, load, vcpus, vms
target_selector: mem
# configuration: Local system configurations
configuration:

View File

@ -48,7 +48,7 @@ import re
import json
# Daemon version
version = "0.9.45"
version = "0.9.55"
##########################################################
@ -233,11 +233,14 @@ def entrypoint():
# Force into secondary coordinator state if needed
try:
if this_node.router_state == "primary":
if this_node.router_state == "primary" and len(d_node) > 1:
zkhandler.write([("base.config.primary_node", "none")])
logger.out("Waiting for primary migration", state="s")
while this_node.router_state != "secondary":
timeout = 240
count = 0
while this_node.router_state != "secondary" and count < timeout:
sleep(0.5)
count += 1
except Exception:
pass
@ -940,7 +943,9 @@ def entrypoint():
# Add any missing OSDs to the list
for osd in [osd for osd in new_osd_list if osd not in osd_list]:
d_osd[osd] = CephInstance.CephOSDInstance(zkhandler, this_node, osd)
d_osd[osd] = CephInstance.CephOSDInstance(
zkhandler, logger, this_node, osd
)
# Remove any deleted OSDs from the list
for osd in [osd for osd in osd_list if osd not in new_osd_list]:
@ -960,7 +965,9 @@ def entrypoint():
# Add any missing pools to the list
for pool in [pool for pool in new_pool_list if pool not in pool_list]:
d_pool[pool] = CephInstance.CephPoolInstance(zkhandler, this_node, pool)
d_pool[pool] = CephInstance.CephPoolInstance(
zkhandler, logger, this_node, pool
)
# Prepare the volume components for this pool
volume_list[pool] = list()
d_volume[pool] = dict()
@ -990,7 +997,7 @@ def entrypoint():
if volume not in volume_list[pool]
]:
d_volume[pool][volume] = CephInstance.CephVolumeInstance(
zkhandler, this_node, pool, volume
zkhandler, logger, this_node, pool, volume
)
# Remove any deleted volumes from the list

View File

@ -21,21 +21,92 @@
import time
import json
import psutil
import daemon_lib.common as common
from distutils.util import strtobool
from re import search
from re import search, match, sub
def get_detect_device(detect_string):
"""
Parses a "detect:" string into a normalized block device path using lsscsi.
A detect string is formatted "detect:<NAME>:<SIZE>:<ID>", where
NAME is some unique identifier in lsscsi, SIZE is a human-readable
size value to within +/- 3% of the real size of the device, and
ID is the Nth (0-indexed) matching entry of that NAME and SIZE.
"""
_, name, size, idd = detect_string.split(":")
if _ != "detect":
return None
retcode, stdout, stderr = common.run_os_command("lsscsi -s")
if retcode:
print(f"Failed to run lsscsi: {stderr}")
return None
# Get valid lines
lsscsi_lines_raw = stdout.split("\n")
lsscsi_lines = list()
for line in lsscsi_lines_raw:
if not line:
continue
split_line = line.split()
if split_line[1] != "disk":
continue
lsscsi_lines.append(line)
# Handle size determination (+/- 3%)
lsscsi_sizes = set()
for line in lsscsi_lines:
lsscsi_sizes.add(split_line[-1])
for l_size in lsscsi_sizes:
b_size = float(sub(r"\D.", "", size))
t_size = float(sub(r"\D.", "", l_size))
plusthreepct = t_size * 1.03
minusthreepct = t_size * 0.97
if b_size > minusthreepct and b_size < plusthreepct:
size = l_size
break
blockdev = None
matches = list()
for idx, line in enumerate(lsscsi_lines):
# Skip non-disk entries
if line.split()[1] != "disk":
continue
# Skip if name is not contained in the line (case-insensitive)
if name.lower() not in line.lower():
continue
# Skip if the size does not match
if size != line.split()[-1]:
continue
# Get our blockdev and append to the list
matches.append(line.split()[-2])
blockdev = None
# Find the blockdev at index {idd}
for idx, _blockdev in enumerate(matches):
if int(idx) == int(idd):
blockdev = _blockdev
break
return blockdev
class CephOSDInstance(object):
def __init__(self, zkhandler, this_node, osd_id):
def __init__(self, zkhandler, logger, this_node, osd_id):
self.zkhandler = zkhandler
self.logger = logger
self.this_node = this_node
self.osd_id = osd_id
self.node = None
self.size = None
self.device = None
self.vg = None
self.lv = None
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(
@ -72,10 +143,141 @@ class CephOSDInstance(object):
if data and data != self.stats:
self.stats = json.loads(data)
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("osd.device", self.osd_id)
)
def watch_osd_device(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = ""
if data and data != self.device:
self.device = data
# Exception conditional for migration from schema v7 to schema v8
try:
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("osd.vg", self.osd_id)
)
def watch_osd_vg(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = ""
if data and data != self.vg:
self.vg = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("osd.lv", self.osd_id)
)
def watch_osd_lv(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = ""
if data and data != self.lv:
self.lv = data
if self.node == self.this_node.name:
self.update_information()
except TypeError:
return
def update_information(self):
if self.vg is not None and self.lv is not None:
find_device = f"/dev/{self.vg}/{self.lv}"
else:
find_device = self.device
self.logger.out(
f"Updating stored disk information for OSD {self.osd_id}",
state="i",
)
retcode, stdout, stderr = common.run_os_command(
f"ceph-volume lvm list {find_device}"
)
osd_blockdev = None
osd_fsid = None
osd_clusterfsid = None
osd_device = None
for line in stdout.split("\n"):
if "block device" in line:
osd_blockdev = line.split()[-1]
if "osd fsid" in line:
osd_fsid = line.split()[-1]
if "cluster fsid" in line:
osd_clusterfsid = line.split()[-1]
if "devices" in line:
osd_device = line.split()[-1]
if not osd_blockdev or not osd_fsid or not osd_clusterfsid or not osd_device:
self.logger.out(
f"Failed to find updated OSD information via ceph-volume for {find_device}",
state="e",
)
return
# Split OSD blockdev into VG and LV components
# osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
_, _, osd_vg, osd_lv = osd_blockdev.split("/")
# Except for potentially the "osd.device", this should never change, but this ensures
# that the data is added at lease once on initialization for existing OSDs.
self.zkhandler.write(
[
(("osd.device", self.osd_id), osd_device),
(("osd.fsid", self.osd_id), ""),
(("osd.ofsid", self.osd_id), osd_fsid),
(("osd.cfsid", self.osd_id), osd_clusterfsid),
(("osd.lvm", self.osd_id), ""),
(("osd.vg", self.osd_id), osd_vg),
(("osd.lv", self.osd_id), osd_lv),
]
)
self.device = osd_device
self.vg = osd_vg
self.lv = osd_lv
@staticmethod
def add_osd(
zkhandler, logger, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05
):
# Handle a detect device if that is passed
if match(r"detect:", device):
ddevice = get_detect_device(device)
if ddevice is None:
logger.out(
f"Failed to determine block device from detect string {device}",
state="e",
)
return False
else:
logger.out(
f"Determined block device {ddevice} from detect string {device}",
state="i",
)
device = ddevice
# We are ready to create a new OSD on this node
logger.out("Creating new OSD disk on block device {}".format(device), state="i")
try:
@ -145,24 +347,39 @@ class CephOSDInstance(object):
print(stderr)
raise Exception
# 4a. Get OSD FSID
# 4a. Get OSD information
logger.out(
"Getting OSD FSID for ID {} on {}".format(osd_id, device), state="i"
"Getting OSD information for ID {} on {}".format(osd_id, device),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm list {device}".format(device=device)
)
for line in stdout.split("\n"):
if "block device" in line:
osd_blockdev = line.split()[-1]
if "osd fsid" in line:
osd_fsid = line.split()[-1]
if "cluster fsid" in line:
osd_clusterfsid = line.split()[-1]
if "devices" in line:
osd_device = line.split()[-1]
if not osd_fsid:
print("ceph-volume lvm list")
print("Could not find OSD fsid in data:")
print("Could not find OSD information in data:")
print(stdout)
print(stderr)
raise Exception
# Split OSD blockdev into VG and LV components
# osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
_, _, osd_vg, osd_lv = osd_blockdev.split("/")
# Reset whatever we were given to Ceph's /dev/xdX naming
if device != osd_device:
device = osd_device
# 4b. Activate the OSD
logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
@ -190,6 +407,7 @@ class CephOSDInstance(object):
print(stdout)
print(stderr)
raise Exception
time.sleep(0.5)
# 6. Verify it started
@ -212,7 +430,16 @@ class CephOSDInstance(object):
(("osd.node", osd_id), node),
(("osd.device", osd_id), device),
(("osd.db_device", osd_id), db_device),
(("osd.stats", osd_id), "{}"),
(("osd.fsid", osd_id), ""),
(("osd.ofsid", osd_id), osd_fsid),
(("osd.cfsid", osd_id), osd_clusterfsid),
(("osd.lvm", osd_id), ""),
(("osd.vg", osd_id), osd_vg),
(("osd.lv", osd_id), osd_lv),
(
("osd.stats", osd_id),
'{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
),
]
)
@ -225,10 +452,39 @@ class CephOSDInstance(object):
return False
@staticmethod
def remove_osd(zkhandler, logger, osd_id, osd_obj):
logger.out("Removing OSD disk {}".format(osd_id), state="i")
def replace_osd(
zkhandler,
logger,
node,
osd_id,
old_device,
new_device,
weight,
ext_db_flag=False,
):
# Handle a detect device if that is passed
if match(r"detect:", new_device):
ddevice = get_detect_device(new_device)
if ddevice is None:
logger.out(
f"Failed to determine block device from detect string {new_device}",
state="e",
)
return False
else:
logger.out(
f"Determined block device {ddevice} from detect string {new_device}",
state="i",
)
new_device = ddevice
# We are ready to create a new OSD on this node
logger.out(
"Replacing OSD {} disk with block device {}".format(osd_id, new_device),
state="i",
)
try:
# 1. Verify the OSD is present
# Verify the OSD is present
retcode, stdout, stderr = common.run_os_command("ceph osd ls")
osd_list = stdout.split("\n")
if osd_id not in osd_list:
@ -237,7 +493,17 @@ class CephOSDInstance(object):
)
return True
# 1. Set the OSD out so it will flush
# 1. Set the OSD down and out so it will flush
logger.out("Setting down OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph osd down {}".format(osd_id)
)
if retcode:
print("ceph osd down")
print(stdout)
print(stderr)
raise Exception
logger.out("Setting out OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph osd out {}".format(osd_id)
@ -248,25 +514,371 @@ class CephOSDInstance(object):
print(stderr)
raise Exception
# 2. Wait for the OSD to flush
logger.out("Flushing OSD disk with ID {}".format(osd_id), state="i")
osd_string = str()
# 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
logger.out("Waiting for OSD {osd_id} to be safe to remove", state="i")
while True:
try:
retcode, stdout, stderr = common.run_os_command(
"ceph pg dump osds --format json"
)
dump_string = json.loads(stdout)
for osd in dump_string:
if str(osd["osd"]) == osd_id:
osd_string = osd
num_pgs = osd_string["num_pgs"]
if num_pgs > 0:
time.sleep(5)
else:
raise Exception
except Exception:
retcode, stdout, stderr = common.run_os_command(
f"ceph osd safe-to-destroy osd.{osd_id}"
)
if retcode in [0, 11]:
# Code 0 = success
# Code 11 = "Error EAGAIN: OSD(s) 5 have no reported stats, and not all PGs are active+clean; we cannot draw any conclusions." which means all PGs have been remappped but backfill is still occurring
break
else:
time.sleep(5)
# 3. Stop the OSD process
logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"systemctl stop ceph-osd@{}".format(osd_id)
)
if retcode:
print("systemctl stop")
print(stdout)
print(stderr)
raise Exception
time.sleep(2)
# 4. Destroy the OSD
logger.out("Destroying OSD with ID {osd_id}", state="i")
retcode, stdout, stderr = common.run_os_command(
f"ceph osd destroy {osd_id} --yes-i-really-mean-it"
)
if retcode:
print("ceph osd destroy")
print(stdout)
print(stderr)
raise Exception
# 5. Adjust the weight
logger.out(
"Adjusting weight of OSD disk with ID {} in CRUSH map".format(osd_id),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"ceph osd crush reweight osd.{osdid} {weight}".format(
osdid=osd_id, weight=weight
)
)
if retcode:
print("ceph osd crush reweight")
print(stdout)
print(stderr)
raise Exception
# 6a. Zap the new disk to ensure it is ready to go
logger.out("Zapping disk {}".format(new_device), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm zap --destroy {}".format(new_device)
)
if retcode:
print("ceph-volume lvm zap")
print(stdout)
print(stderr)
raise Exception
dev_flags = "--data {}".format(new_device)
# 6b. Prepare the logical volume if ext_db_flag
if ext_db_flag:
db_device = "osd-db/osd-{}".format(osd_id)
dev_flags += " --block.db {}".format(db_device)
else:
db_device = ""
# 6c. Replace the OSD
logger.out(
"Preparing LVM for replaced OSD {} disk on {}".format(
osd_id, new_device
),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm prepare --osd-id {osdid} --bluestore {devices}".format(
osdid=osd_id, devices=dev_flags
)
)
if retcode:
print("ceph-volume lvm prepare")
print(stdout)
print(stderr)
raise Exception
# 7a. Get OSD information
logger.out(
"Getting OSD information for ID {} on {}".format(osd_id, new_device),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm list {device}".format(device=new_device)
)
for line in stdout.split("\n"):
if "block device" in line:
osd_blockdev = line.split()[-1]
if "osd fsid" in line:
osd_fsid = line.split()[-1]
if "cluster fsid" in line:
osd_clusterfsid = line.split()[-1]
if "devices" in line:
osd_device = line.split()[-1]
if not osd_fsid:
print("ceph-volume lvm list")
print("Could not find OSD information in data:")
print(stdout)
print(stderr)
raise Exception
# Split OSD blockdev into VG and LV components
# osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
_, _, osd_vg, osd_lv = osd_blockdev.split("/")
# Reset whatever we were given to Ceph's /dev/xdX naming
if new_device != osd_device:
new_device = osd_device
# 7b. Activate the OSD
logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format(
osdid=osd_id, osdfsid=osd_fsid
)
)
if retcode:
print("ceph-volume lvm activate")
print(stdout)
print(stderr)
raise Exception
time.sleep(0.5)
# 8. Verify it started
retcode, stdout, stderr = common.run_os_command(
"systemctl status ceph-osd@{osdid}".format(osdid=osd_id)
)
if retcode:
print("systemctl status")
print(stdout)
print(stderr)
raise Exception
# 9. Update Zookeeper information
logger.out(
"Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
)
zkhandler.write(
[
(("osd", osd_id), ""),
(("osd.node", osd_id), node),
(("osd.device", osd_id), new_device),
(("osd.db_device", osd_id), db_device),
(("osd.fsid", osd_id), ""),
(("osd.ofsid", osd_id), osd_fsid),
(("osd.cfsid", osd_id), osd_clusterfsid),
(("osd.lvm", osd_id), ""),
(("osd.vg", osd_id), osd_vg),
(("osd.lv", osd_id), osd_lv),
(
("osd.stats", osd_id),
'{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
),
]
)
# Log it
logger.out(
"Replaced OSD {} disk with device {}".format(osd_id, new_device),
state="o",
)
return True
except Exception as e:
# Log it
logger.out("Failed to replace OSD {} disk: {}".format(osd_id, e), state="e")
return False
@staticmethod
def refresh_osd(zkhandler, logger, node, osd_id, device, ext_db_flag):
# Handle a detect device if that is passed
if match(r"detect:", device):
ddevice = get_detect_device(device)
if ddevice is None:
logger.out(
f"Failed to determine block device from detect string {device}",
state="e",
)
return False
else:
logger.out(
f"Determined block device {ddevice} from detect string {device}",
state="i",
)
device = ddevice
# We are ready to create a new OSD on this node
logger.out(
"Refreshing OSD {} disk on block device {}".format(osd_id, device),
state="i",
)
try:
# 1. Verify the OSD is present
retcode, stdout, stderr = common.run_os_command("ceph osd ls")
osd_list = stdout.split("\n")
if osd_id not in osd_list:
logger.out(
"Could not find OSD {} in the cluster".format(osd_id), state="e"
)
return True
dev_flags = "--data {}".format(device)
if ext_db_flag:
db_device = "osd-db/osd-{}".format(osd_id)
dev_flags += " --block.db {}".format(db_device)
else:
db_device = ""
# 2. Get OSD information
logger.out(
"Getting OSD information for ID {} on {}".format(osd_id, device),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm list {device}".format(device=device)
)
for line in stdout.split("\n"):
if "block device" in line:
osd_blockdev = line.split()[-1]
if "osd fsid" in line:
osd_fsid = line.split()[-1]
if "cluster fsid" in line:
osd_clusterfsid = line.split()[-1]
if "devices" in line:
osd_device = line.split()[-1]
if not osd_fsid:
print("ceph-volume lvm list")
print("Could not find OSD information in data:")
print(stdout)
print(stderr)
raise Exception
# Split OSD blockdev into VG and LV components
# osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
_, _, osd_vg, osd_lv = osd_blockdev.split("/")
# Reset whatever we were given to Ceph's /dev/xdX naming
if device != osd_device:
device = osd_device
# 3. Activate the OSD
logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format(
osdid=osd_id, osdfsid=osd_fsid
)
)
if retcode:
print("ceph-volume lvm activate")
print(stdout)
print(stderr)
raise Exception
time.sleep(0.5)
# 4. Verify it started
retcode, stdout, stderr = common.run_os_command(
"systemctl status ceph-osd@{osdid}".format(osdid=osd_id)
)
if retcode:
print("systemctl status")
print(stdout)
print(stderr)
raise Exception
# 5. Update Zookeeper information
logger.out(
"Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
)
zkhandler.write(
[
(("osd", osd_id), ""),
(("osd.node", osd_id), node),
(("osd.device", osd_id), device),
(("osd.db_device", osd_id), db_device),
(("osd.fsid", osd_id), ""),
(("osd.ofsid", osd_id), osd_fsid),
(("osd.cfsid", osd_id), osd_clusterfsid),
(("osd.lvm", osd_id), ""),
(("osd.vg", osd_id), osd_vg),
(("osd.lv", osd_id), osd_lv),
(
("osd.stats", osd_id),
'{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
),
]
)
# Log it
logger.out("Refreshed OSD {} disk on {}".format(osd_id, device), state="o")
return True
except Exception as e:
# Log it
logger.out("Failed to refresh OSD {} disk: {}".format(osd_id, e), state="e")
return False
@staticmethod
def remove_osd(zkhandler, logger, osd_id, osd_obj, force_flag):
logger.out("Removing OSD disk {}".format(osd_id), state="i")
try:
# Verify the OSD is present
retcode, stdout, stderr = common.run_os_command("ceph osd ls")
osd_list = stdout.split("\n")
if osd_id not in osd_list:
logger.out(
"Could not find OSD {} in the cluster".format(osd_id), state="e"
)
if force_flag:
logger.out("Ignoring error due to force flag", state="i")
else:
return True
# 1. Set the OSD down and out so it will flush
logger.out("Setting down OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph osd down {}".format(osd_id)
)
if retcode:
print("ceph osd down")
print(stdout)
print(stderr)
if force_flag:
logger.out("Ignoring error due to force flag", state="i")
else:
raise Exception
logger.out("Setting out OSD disk with ID {}".format(osd_id), state="i")
retcode, stdout, stderr = common.run_os_command(
"ceph osd out {}".format(osd_id)
)
if retcode:
print("ceph osd out")
print(stdout)
print(stderr)
if force_flag:
logger.out("Ignoring error due to force flag", state="i")
else:
raise Exception
# 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
logger.out("Waiting for OSD {osd_id} to be safe to remove", state="i")
while True:
retcode, stdout, stderr = common.run_os_command(
f"ceph osd safe-to-destroy osd.{osd_id}"
)
if int(retcode) in [0, 11]:
break
else:
time.sleep(5)
# 3. Stop the OSD process and wait for it to be terminated
logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
@ -277,43 +889,55 @@ class CephOSDInstance(object):
print("systemctl stop")
print(stdout)
print(stderr)
raise Exception
# FIXME: There has to be a better way to do this /shrug
while True:
is_osd_up = False
# Find if there is a process named ceph-osd with arg '--id {id}'
for p in psutil.process_iter(attrs=["name", "cmdline"]):
if "ceph-osd" == p.info["name"] and "--id {}".format(
osd_id
) in " ".join(p.info["cmdline"]):
is_osd_up = True
# If there isn't, continue
if not is_osd_up:
break
if force_flag:
logger.out("Ignoring error due to force flag", state="i")
else:
raise Exception
time.sleep(2)
# 4. Determine the block devices
retcode, stdout, stderr = common.run_os_command(
"readlink /var/lib/ceph/osd/ceph-{}/block".format(osd_id)
osd_vg = zkhandler.read(("osd.vg", osd_id))
osd_lv = zkhandler.read(("osd.lv", osd_id))
osd_lvm = f"/dev/{osd_vg}/{osd_lv}"
osd_device = None
logger.out(
f"Getting disk info for OSD {osd_id} LV {osd_lvm}",
state="i",
)
vg_name = stdout.split("/")[-2] # e.g. /dev/ceph-<uuid>/osd-block-<uuid>
retcode, stdout, stderr = common.run_os_command(
"vgs --separator , --noheadings -o pv_name {}".format(vg_name)
f"ceph-volume lvm list {osd_lvm}"
)
pv_block = stdout.strip()
for line in stdout.split("\n"):
if "devices" in line:
osd_device = line.split()[-1]
if not osd_device:
print("ceph-volume lvm list")
print("Could not find OSD information in data:")
print(stdout)
print(stderr)
if force_flag:
logger.out("Ignoring error due to force flag", state="i")
else:
raise Exception
# 5. Zap the volumes
logger.out(
"Zapping OSD disk with ID {} on {}".format(osd_id, pv_block), state="i"
"Zapping OSD {} disk on {}".format(osd_id, osd_device),
state="i",
)
retcode, stdout, stderr = common.run_os_command(
"ceph-volume lvm zap --destroy {}".format(pv_block)
"ceph-volume lvm zap --destroy {}".format(osd_device)
)
if retcode:
print("ceph-volume lvm zap")
print(stdout)
print(stderr)
raise Exception
if force_flag:
logger.out("Ignoring error due to force flag", state="i")
else:
raise Exception
# 6. Purge the OSD from Ceph
logger.out("Purging OSD disk with ID {}".format(osd_id), state="i")
@ -324,7 +948,10 @@ class CephOSDInstance(object):
print("ceph osd purge")
print(stdout)
print(stderr)
raise Exception
if force_flag:
logger.out("Ignoring error due to force flag", state="i")
else:
raise Exception
# 7. Remove the DB device
if zkhandler.exists(("osd.db_device", osd_id)):
@ -354,17 +981,33 @@ class CephOSDInstance(object):
@staticmethod
def add_db_vg(zkhandler, logger, device):
# Check if an existsing volume group exists
retcode, stdout, stderr = common.run_os_command("vgdisplay osd-db")
if retcode != 5:
logger.out('Ceph OSD database VG "osd-db" already exists', state="e")
return False
# Handle a detect device if that is passed
if match(r"detect:", device):
ddevice = get_detect_device(device)
if ddevice is None:
logger.out(
f"Failed to determine block device from detect string {device}",
state="e",
)
return False
else:
logger.out(
f"Determined block device {ddevice} from detect string {device}",
state="i",
)
device = ddevice
logger.out(
"Creating new OSD database volume group on block device {}".format(device),
state="i",
)
try:
# 0. Check if an existsing volume group exists
retcode, stdout, stderr = common.run_os_command("vgdisplay osd-db")
if retcode != 5:
logger.out('Ceph OSD database VG "osd-db" already exists', state="e")
return False
# 1. Create an empty partition table
logger.out(
"Creating partitions on block device {}".format(device), state="i"
@ -494,8 +1137,9 @@ class CephOSDInstance(object):
class CephPoolInstance(object):
def __init__(self, zkhandler, this_node, name):
def __init__(self, zkhandler, logger, this_node, name):
self.zkhandler = zkhandler
self.logger = logger
self.this_node = this_node
self.name = name
self.pgs = ""
@ -537,8 +1181,9 @@ class CephPoolInstance(object):
class CephVolumeInstance(object):
def __init__(self, zkhandler, this_node, pool, name):
def __init__(self, zkhandler, logger, this_node, pool, name):
self.zkhandler = zkhandler
self.logger = logger
self.this_node = this_node
self.pool = pool
self.name = name
@ -594,8 +1239,9 @@ class CephSnapshotInstance(object):
# Primary command function
# This command pipe is only used for OSD adds and removes
def ceph_command(zkhandler, logger, this_node, data, d_osd):
# Get the command and args
command, args = data.split()
# Get the command and args; the * + join ensures arguments with spaces (e.g. detect strings) are recombined right
command, *args = data.split()
args = " ".join(args)
# Adding a new OSD
if command == "osd_add":
@ -621,9 +1267,63 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Replacing an OSD
if command == "osd_replace":
node, osd_id, old_device, new_device, weight, ext_db_flag = args.split(",")
ext_db_flag = bool(strtobool(ext_db_flag))
if node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock("base.cmd.ceph")
with zk_lock:
# Add the OSD
result = CephOSDInstance.replace_osd(
zkhandler,
logger,
node,
osd_id,
old_device,
new_device,
weight,
ext_db_flag,
)
# Command succeeded
if result:
# Update the command queue
zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
# Command failed
else:
# Update the command queue
zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))])
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Refreshing an OSD
if command == "osd_refresh":
node, osd_id, device, ext_db_flag = args.split(",")
ext_db_flag = bool(strtobool(ext_db_flag))
if node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock("base.cmd.ceph")
with zk_lock:
# Add the OSD
result = CephOSDInstance.refresh_osd(
zkhandler, logger, node, osd_id, device, ext_db_flag
)
# Command succeeded
if result:
# Update the command queue
zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
# Command failed
else:
# Update the command queue
zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))])
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Removing an OSD
elif command == "osd_remove":
osd_id = args
osd_id, force = args.split(",")
force_flag = bool(strtobool(force))
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
@ -632,7 +1332,7 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd):
with zk_lock:
# Remove the OSD
result = CephOSDInstance.remove_osd(
zkhandler, logger, osd_id, d_osd[osd_id]
zkhandler, logger, osd_id, d_osd[osd_id], force_flag
)
# Command succeeded
if result:

View File

@ -307,8 +307,14 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
"var": osd["var"],
"pgs": osd["pgs"],
"kb": osd["kb"],
"kb_used": osd["kb_used"],
"kb_used_data": osd["kb_used_data"],
"kb_used_omap": osd["kb_used_omap"],
"kb_used_meta": osd["kb_used_meta"],
"kb_avail": osd["kb_avail"],
"weight": osd["crush_weight"],
"reweight": osd["reweight"],
"class": osd["device_class"],
}
}
)
@ -655,15 +661,19 @@ def node_keepalive(logger, config, zkhandler, this_node):
zkhandler.read("base.config.migration_target_selector")
!= config["migration_target_selector"]
):
raise
zkhandler.write(
[
(
"base.config.migration_target_selector",
config["migration_target_selector"],
)
]
)
except Exception:
zkhandler.write(
[
(
"base.config.migration_target_selector",
config["migration_target_selector"],
)
]
logger.out(
"Failed to set migration target selector in Zookeeper",
state="e",
prefix="main-thread",
)
# Set the upstream IP in Zookeeper for clients to read
@ -674,10 +684,14 @@ def node_keepalive(logger, config, zkhandler, this_node):
zkhandler.read("base.config.upstream_ip")
!= config["upstream_floating_ip"]
):
raise
zkhandler.write(
[("base.config.upstream_ip", config["upstream_floating_ip"])]
)
except Exception:
zkhandler.write(
[("base.config.upstream_ip", config["upstream_floating_ip"])]
logger.out(
"Failed to set upstream floating IP in Zookeeper",
state="e",
prefix="main-thread",
)
# Get past state and update if needed