Skip to content

Commit

Permalink
CP-50475: parallelize device ops during VM lifecycle ops (#6057)
Browse files Browse the repository at this point in the history
Operations on different devices should be independent and therefore can
be
parallelized. This both means parallelizing operations on different
device
types and on devices for the same type.

An atom to serialize action has been introduced because the operation
regarding
a single device must be kept serialized.

Also removes some unneeded parallel atoms, which cause some overhead as
well as polluting the traces

This makes VM_starts to run about ~1 second faster on some tests where
VMs have 5 VIFs. I'm testing it further, but it doesn't look like it's
breaking any tests. I want to push it through some bootstorms and
measure improvements

Please do suggest some further parallelization if you can find it!

BVT+BST are all green
  • Loading branch information
robhoes authored Oct 29, 2024
2 parents df30b80 + 0425b0b commit 2ed1166
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 116 deletions.
236 changes: 124 additions & 112 deletions ocaml/xenopsd/lib/xenops_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ type atomic =
| VM_rename of (Vm.id * Vm.id * rename_when)
| VM_import_metadata of (Vm.id * Metadata.t)
| Parallel of Vm.id * string * atomic list
| Serial of Vm.id * string * atomic list
| Best_effort of atomic
[@@deriving rpcty]

Expand Down Expand Up @@ -271,6 +272,9 @@ let rec name_of_atomic = function
| Parallel (_, _, atomics) ->
Printf.sprintf "Parallel (%s)"
(String.concat " | " (List.map name_of_atomic atomics))
| Serial (_, _, atomics) ->
Printf.sprintf "Serial (%s)"
(String.concat " & " (List.map name_of_atomic atomics))
| Best_effort atomic ->
Printf.sprintf "Best_effort (%s)" (name_of_atomic atomic)

Expand Down Expand Up @@ -1550,6 +1554,23 @@ let dequarantine_ops vgpus =
fun vgpu -> PCI_dequarantine vgpu.physical_pci_address
)

(* Avoid generating list-based atoms with 1 or no actions in them *)
let collect_into apply = function [] -> [] | [op] -> [op] | lst -> apply lst

let parallel name ~id =
collect_into (fun ls -> [Parallel (id, Printf.sprintf "%s VM=%s" name id, ls)])

let serial name ~id =
collect_into (fun ls -> [Serial (id, Printf.sprintf "%s VM=%s" name id, ls)])

let parallel_concat name ~id lst = parallel name ~id (List.concat lst)

let serial_concat name ~id lst = serial name ~id (List.concat lst)

let parallel_map name ~id lst f = parallel name ~id (List.concat_map f lst)

let map_or_empty f x = Option.value ~default:[] (Option.map f x)

let rec atomics_of_operation = function
| VM_start (id, force) ->
let vbds_rw, vbds_ro = VBD_DB.vbds id |> vbd_plug_sets in
Expand All @@ -1561,6 +1582,23 @@ let rec atomics_of_operation = function
List.partition (is_nvidia_sriov vgpus) pcis
in
let no_sharept = List.exists is_no_sharept vgpus in
let plug_vbds typ vbds =
let pf = Printf.sprintf in
let name_multi = pf "VBDs.activate_epoch_and_plug %s" typ in
let name_one = pf "VBD.activate_epoch_and_plug %s" typ in
parallel_map name_multi ~id vbds (fun vbd ->
serial_concat name_one ~id
[
[VBD_set_active (vbd.Vbd.id, true)]
; map_or_empty
(fun x ->
[VBD_epoch_begin (vbd.Vbd.id, x, vbd.Vbd.persistent)]
)
vbd.Vbd.backend
; [VBD_plug vbd.Vbd.id]
]
)
in
[
dequarantine_ops vgpus
; [
Expand All @@ -1569,50 +1607,35 @@ let rec atomics_of_operation = function
; VM_create (id, None, None, no_sharept)
; VM_build (id, force)
]
; List.map
(fun vbd -> VBD_set_active (vbd.Vbd.id, true))
(vbds_rw @ vbds_ro)
(* keeping behaviour of vbd_plug_order: rw vbds must be plugged before
ro vbds, see vbd_plug_sets *)
; List.map
(fun (ty, vbds) ->
Parallel
( id
, Printf.sprintf "VBD.epoch_begin %s vm=%s" ty id
, List.filter_map
(fun vbd ->
Option.map
(fun x ->
VBD_epoch_begin (vbd.Vbd.id, x, vbd.Vbd.persistent)
)
vbd.Vbd.backend
)
vbds
; parallel_concat "Devices.plug (no qemu)" ~id
[
(* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
serial_concat "VBDs.acticate_epoch_and_plug RW+RO" ~id
[plug_vbds "RW" vbds_rw; plug_vbds "RO" vbds_ro]
; List.concat_map
(fun vif ->
serial "VIF.activate_and_plug" ~id
[VIF_set_active (vif.Vif.id, true); VIF_plug vif.Vif.id]
)
)
[("RW", vbds_rw); ("RO", vbds_ro)]
; [
(* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
Parallel
( id
, Printf.sprintf "VBD.plug RW vm=%s" id
, List.map (fun vbd -> VBD_plug vbd.Vbd.id) vbds_rw
)
; Parallel
( id
, Printf.sprintf "VBD.plug RO vm=%s" id
, List.map (fun vbd -> VBD_plug vbd.Vbd.id) vbds_ro
)
]
; List.map (fun vif -> VIF_set_active (vif.Vif.id, true)) vifs
; List.map (fun vif -> VIF_plug vif.Vif.id) vifs
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, true)) vgpus
; List.map (fun pci -> PCI_plug (pci.Pci.id, false)) pcis_sriov
vifs
; serial_concat "VGPUs.activate & PCI.plug (SRIOV)" ~id
[
parallel_map "VGPUs.activate" ~id vgpus (fun vgpu ->
[VGPU_set_active (vgpu.Vgpu.id, true)]
)
; parallel_map "PCIs.plug (SRIOV)" ~id pcis_sriov (fun pci ->
[PCI_plug (pci.Pci.id, false)]
)
]
]
; [VM_create_device_model (id, false)]
(* PCI and USB devices are hot-plugged into HVM guests via QEMU, so the
following operations occur after creating the device models *)
; List.map (fun pci -> PCI_plug (pci.Pci.id, true)) pcis_other
; List.map (fun vusb -> VUSB_plug vusb.Vusb.id) vusbs
; parallel_concat "Devices.plug (qemu)" ~id
[
List.map (fun pci -> PCI_plug (pci.Pci.id, true)) pcis_other
; List.map (fun vusb -> VUSB_plug vusb.Vusb.id) vusbs
]
(* At this point the domain is considered survivable. *)
; [VM_set_domain_action_request (id, None)]
]
Expand All @@ -1623,65 +1646,62 @@ let rec atomics_of_operation = function
let pcis = PCI_DB.pcis id in
let vusbs = VUSB_DB.vusbs id in
[
Option.value ~default:[]
(Option.map (fun x -> [VM_shutdown_domain (id, PowerOff, x)]) timeout)
map_or_empty (fun x -> [VM_shutdown_domain (id, PowerOff, x)]) timeout
(* Before shutting down a VM, we need to unplug its VUSBs. *)
; List.map (fun vusb -> VUSB_unplug vusb.Vusb.id) vusbs
; parallel_map "VUSBs.unplug" ~id vusbs (fun vusb ->
[VUSB_unplug vusb.Vusb.id]
)
; [
(* CA-315450: in a hard shutdown or snapshot revert, timeout=None and
VM_shutdown_domain is not called. To avoid any interference, we
pause the domain before destroying the device model. *)
Best_effort (VM_pause id)
; VM_destroy_device_model id
; Parallel
( id
, Printf.sprintf "VBD.unplug vm=%s" id
, List.map (fun vbd -> VBD_unplug (vbd.Vbd.id, true)) vbds
)
]
; List.map (fun vif -> VIF_unplug (vif.Vif.id, true)) vifs
; List.map (fun pci -> PCI_unplug pci.Pci.id) pcis
; parallel_concat "Devices.unplug" ~id
[
List.map (fun vbd -> VBD_unplug (vbd.Vbd.id, true)) vbds
; List.map (fun vif -> VIF_unplug (vif.Vif.id, true)) vifs
; List.map (fun pci -> PCI_unplug pci.Pci.id) pcis
]
; [VM_destroy id]
]
|> List.concat
| VM_restore_vifs id ->
let vifs = VIF_DB.vifs id in
[
List.map (fun vif -> VIF_set_active (vif.Vif.id, true)) vifs
; List.map (fun vif -> VIF_plug vif.Vif.id) vifs
]
|> List.concat
parallel_map "VIFs.activate_and_plug" ~id vifs (fun vif ->
serial "VIF.activate_and_plug" ~id
[VIF_set_active (vif.Vif.id, true); VIF_plug vif.Vif.id]
)
| VM_restore_devices (id, restore_vifs) ->
let vbds_rw, vbds_ro = VBD_DB.vbds id |> vbd_plug_sets in
let vgpus = VGPU_DB.vgpus id in
let pcis = PCI_DB.pcis id |> pci_plug_order in
let pcis_other = List.filter (is_not_nvidia_sriov vgpus) pcis in
let plug_vbds typ vbds =
let pf = Printf.sprintf in
let name_multi = pf "VBDs.activate_and_plug %s" typ in
let name_one = pf "VBD.activate_and_plug %s" typ in
parallel_map name_multi ~id vbds (fun vbd ->
serial name_one ~id
[VBD_set_active (vbd.Vbd.id, true); VBD_plug vbd.Vbd.id]
)
in
[
List.map
(fun vbd -> VBD_set_active (vbd.Vbd.id, true))
(vbds_rw @ vbds_ro)
; [
(* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
Parallel
( id
, Printf.sprintf "VBD.plug RW vm=%s" id
, List.map (fun vbd -> VBD_plug vbd.Vbd.id) vbds_rw
)
; Parallel
( id
, Printf.sprintf "VBD.plug RO vm=%s" id
, List.map (fun vbd -> VBD_plug vbd.Vbd.id) vbds_ro
)
]
(* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
plug_vbds "RW" vbds_rw
; plug_vbds "RO" vbds_ro
; (if restore_vifs then atomics_of_operation (VM_restore_vifs id) else [])
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, true)) vgpus
(* Nvidia SRIOV PCI devices have been already been plugged *)
; [
VM_create_device_model (id, true)
(* PCI and USB devices are hot-plugged into HVM guests via QEMU, so
the following operations occur after creating the device models *)
]
; List.map (fun pci -> PCI_plug (pci.Pci.id, true)) pcis_other
; (* Nvidia SRIOV PCI devices have been already been plugged *)
parallel_map "VGPUs.activate" ~id vgpus (fun vgpu ->
[VGPU_set_active (vgpu.Vgpu.id, true)]
)
; [VM_create_device_model (id, true)]
(* PCI and USB devices are hot-plugged into HVM guests via QEMU, so
the following operations occur after creating the device models *)
; parallel_map "PCIs.plug" ~id pcis_other (fun pci ->
[PCI_plug (pci.Pci.id, true)]
)
]
|> List.concat
| VM_poweroff (id, timeout) ->
Expand All @@ -1694,25 +1714,24 @@ let rec atomics_of_operation = function
else
Xenops_hooks.reason__clean_shutdown
in
let unplug_vbd vbd =
serial_concat "VBD.epoch_and_deactivate" ~id
[
map_or_empty
(fun x -> [VBD_epoch_end (vbd.Vbd.id, x)])
vbd.Vbd.backend
; [VBD_set_active (vbd.Vbd.id, false)]
]
in
[
[VM_hook_script (id, Xenops_hooks.VM_pre_destroy, reason)]
; atomics_of_operation (VM_shutdown (id, timeout))
; [
Parallel
( id
, Printf.sprintf "VBD.epoch_end vm=%s" id
, List.filter_map
(fun vbd ->
Option.map
(fun x -> VBD_epoch_end (vbd.Vbd.id, x))
vbd.Vbd.backend
)
vbds
)
]
; List.map (fun vbd -> VBD_set_active (vbd.Vbd.id, false)) vbds
; List.map (fun vif -> VIF_set_active (vif.Vif.id, false)) vifs
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, false)) vgpus
; parallel_concat "Devices.deactivate" ~id
[
List.concat_map unplug_vbd vbds
; List.map (fun vif -> VIF_set_active (vif.Vif.id, false)) vifs
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, false)) vgpus
]
; [VM_hook_script (id, Xenops_hooks.VM_post_destroy, reason)]
]
|> List.concat
Expand All @@ -1725,23 +1744,14 @@ let rec atomics_of_operation = function
Xenops_hooks.reason__clean_reboot
in
[
Option.value ~default:[]
(Option.map (fun x -> [VM_shutdown_domain (id, Reboot, x)]) timeout)
map_or_empty (fun x -> [VM_shutdown_domain (id, Reboot, x)]) timeout
; [VM_hook_script (id, Xenops_hooks.VM_pre_destroy, reason)]
; atomics_of_operation (VM_shutdown (id, None))
; [
Parallel
( id
, Printf.sprintf "VBD.epoch_end vm=%s" id
, List.filter_map
(fun vbd ->
Option.map
(fun x -> VBD_epoch_end (vbd.Vbd.id, x))
vbd.Vbd.backend
)
vbds
)
]
; parallel_map "VBD.epoch_end" ~id vbds (fun vbd ->
map_or_empty
(fun x -> [VBD_epoch_end (vbd.Vbd.id, x)])
vbd.Vbd.backend
)
; [
VM_hook_script (id, Xenops_hooks.VM_post_destroy, reason)
; VM_hook_script
Expand Down Expand Up @@ -1858,7 +1868,7 @@ let rec perform_atomic ~progress_callback ?subtask:_ ?result (op : atomic)
(Xenops_task.id_of_handle t)
(List.length atoms) description
in
let with_tracing = parallel_id_with_tracing parallel_id t in
let with_tracing = id_with_tracing parallel_id t in
debug "begin_%s" parallel_id ;
let task_list =
queue_atomics_and_wait ~progress_callback ~max_parallel_atoms:10
Expand Down Expand Up @@ -1902,6 +1912,8 @@ let rec perform_atomic ~progress_callback ?subtask:_ ?result (op : atomic)
List.iter
(fun err -> match err with None -> () | Some e -> raise e)
errors
| Serial (_, _, atoms) ->
List.iter (Fun.flip (perform_atomic ~progress_callback) t) atoms
| VIF_plug id ->
debug "VIF.plug %s" (VIF_DB.string_of_id id) ;
B.VIF.plug t (VIF_DB.vm_of id) (VIF_DB.read_exn id) ;
Expand Down Expand Up @@ -2501,7 +2513,7 @@ and trigger_cleanup_after_failure_atom op t =
immediate_operation dbg id (VM_check_state id)
| Best_effort op ->
trigger_cleanup_after_failure_atom op t
| Parallel (_id, _description, ops) ->
| Parallel (_id, _description, ops) | Serial (_id, _description, ops) ->
List.iter (fun op -> trigger_cleanup_after_failure_atom op t) ops
| VM_rename (id1, id2, _) ->
immediate_operation dbg id1 (VM_check_state id1) ;
Expand Down
4 changes: 2 additions & 2 deletions ocaml/xenopsd/lib/xenops_task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ let is_task task = function
| _ ->
None

let parallel_id_with_tracing parallel_id t =
Debug_info.make ~log:parallel_id ~tracing:(Xenops_task.tracing t)
let id_with_tracing id t =
Debug_info.make ~log:id ~tracing:(Xenops_task.tracing t)
|> Debug_info.to_string

let dbg_with_traceparent_of_task t =
Expand Down
3 changes: 1 addition & 2 deletions ocaml/xenopsd/lib/xenops_utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,7 @@ let chunks size lst =
[op] :: xs :: xss
)
[] lst
|> List.map (fun xs -> List.rev xs)
|> List.rev
|> List.rev_map (fun xs -> List.rev xs)

let really_kill pid =
try Unixext.kill_and_wait pid
Expand Down

0 comments on commit 2ed1166

Please sign in to comment.