plugin,etc: Rewrite to get state from Pod annotations #1163
base: main
Conversation
No changes to the coverage.
Probably an inadvertent merge conflict between #1090 and #989, which meant we accidentally weren't using go-chef for neonvm-daemon. Noticed this while working on #1163 locally and saw that it was re-downloading all of the dependencies for neonvm-daemon every time, even though I was making changes in the scheduler and the dependencies hadn't changed.
It wasn't correct; the separator is '/', not ':'. (I think once upon a time, we used to format it with ':', but that's no longer the case). Noticed this as part of #1163.
Partial review, got to the middle of handle_pod.
	return extractFromAnnotation[VirtualMachineResources](pod, VirtualMachineResourcesAnnotation)
}

func extractFromAnnotation[T any](pod *corev1.Pod, annotation string) (*T, error) {
Is this the same as extractAnnotationJSON? Let's maybe deduplicate.
Hm, if we put it in pkg/util, we get an import cycle, but I don't think it belongs in either pkg/api or neonvm/apis/neonvm/v1...
I'm inclined to leave it as-is for now, but LMK what you think
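For context, here's a minimal sketch of how a generic helper like this could look, assuming the annotation value is JSON-encoded; the body below is illustrative, not the PR's actual implementation:

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// extractFromAnnotation decodes a JSON-encoded annotation into a value of type T.
// (Sketch only: the error wording and exact behavior are assumptions.)
func extractFromAnnotation[T any](pod *corev1.Pod, annotation string) (*T, error) {
	value, ok := pod.Annotations[annotation]
	if !ok {
		return nil, fmt.Errorf("pod is missing annotation %q", annotation)
	}
	var result T
	if err := json.Unmarshal([]byte(value), &result); err != nil {
		return nil, fmt.Errorf("could not unmarshal annotation %q: %w", annotation, err)
	}
	return &result, nil
}

If it did get deduplicated with extractAnnotationJSON, the main question would just be which package can host it without creating the import cycle mentioned above.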
// EventKindEphemeral represents the combination when an addition was not handled before the
// object was deleted, yet we still may want to process either of these.
Why might we need to handle EventKindEphemeral?
requeuePod  func(uid types.UID) error
requeueNode func(nodeName string) error
Why do we have different types of IDs for pods and nodes? And how are those related to NamespacedName?
To answer what these IDs are, in short:
Each object in your cluster has a Name that is unique for that type of resource. Every Kubernetes object also has a UID that is unique across your whole cluster.
Some objects are not namespaced (in which case their name is just a string); some objects are (in which case their name is a NamespacedName).
As for why these two requeueFoo functions take two different IDs:
- Pods can be trivially recreated with the same name. So in this PR, we always include the UID whenever dealing with a Pod object, and only ever store maps of them keyed by UID.
- While Nodes can have duplicate names at different times, we also have to deal with the fact that Pods only refer to nodes by name, not UID (see the .spec.nodeName field). We already need some way of doing node name → node state lookups to handle pod events, so for consistency this PR only ever refers to nodes by name rather than by UID.
pkg/plugin/handle_pod.go (outdated)
var ok bool
ns, ok = s.nodes[nodeName]
if !ok {
	return fmt.Errorf("pod's node %q is not present in local state", nodeName)
As discussed, maybe there is no reason to fail here; we can still accept the pod even if we have not yet seen the node.
TBH that seems like it'd add a lot of complexity that I'm not sure is worth it — adding a pod potentially has to add a placeholder node but do nothing else, and deleting a pod potentially has to also delete the node it's on, if the node was already deleted and the pod is the last one.
I'm OK with complexity encapsulated into the state package because that's easy to unit-test, but I don't currently see a clean interface boundary we can draw here without unit-testing the entire global state (which is maybe fine, but a lot of effort! that's what the state package is meant to help with)
Maybe I'm missing something? WDYT?
The first 7 commits look quite isolated from the big "stateless scheduler" commit. They also LGTM, so you can create a PR out of these commits and merge them to main, if you want.
	Call(*zap.Logger, MiddlewareParams, MiddlewareHandlerFunc) (Result, error)
}

// MiddlewareHandlerFunc is an enriched version of HandlerFunc that accepts more parameters, so that
Do we really need this distinction between HandlerFunc and MiddlewareHandlerFunc? Why not have MiddlewareParams be named something like ObjectParams?
Mostly, I figured it was cleaner to have a simpler HandlerFunc — MiddlewareParams exists because it's easier for all the middleware to have access to common object attributes (e.g., GVK) than trying fallible extraction in each one, whereas the end handler probably doesn't need those, so it'd just be extra unused parameters.
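A rough illustration of that split (the only grounded detail here is that MiddlewareParams carries the GVK; the rest of the shapes are assumed):

import (
	"go.uber.org/zap"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// HandlerFunc is the plain per-object handler: it only needs the object itself.
// (Object and Result are the reconcile package's own types; signatures here are illustrative.)
type HandlerFunc func(logger *zap.Logger, obj Object) (Result, error)

// MiddlewareHandlerFunc additionally receives MiddlewareParams, so middleware
// doesn't have to re-derive common attributes like the GVK for every object.
type MiddlewareHandlerFunc func(logger *zap.Logger, params MiddlewareParams) (Result, error)

// MiddlewareParams bundles the attributes every middleware wants access to.
type MiddlewareParams struct {
	GVK    schema.GroupVersionKind
	Object Object
}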
// Middleware wraps a reconcile operation to insert its own logic
type Middleware interface {
	Call(*zap.Logger, MiddlewareParams, MiddlewareHandlerFunc) (Result, error)
Why is the handler func passed here? It could instead be passed when a particular middleware is constructed, so that applyMiddleware is not needed.
The reason I didn't go with that approach is because storing the handler in the middleware requires either:
- Putting together all the middleware with an explicit ordering of how we construct them, which risks making the code harder to refactor (because you can't "just" move things around, due to the web of dependencies); OR
- Collecting the set of middleware as a set of func(handler) Middleware to be assembled at the end
I figured that providing the handler func in the call to middleware allows it to be more isolated, which -- for me, at least -- reduces the number of things I need to hold in my head when thinking about how it works.
Does that make sense? If so, WDYT?
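To make the trade-off concrete, here's a hypothetical sketch of the call-time chaining approach (this is not the PR's applyMiddleware, just an illustration of the idea, reusing the Middleware.Call signature from the diff above):

// chainMiddleware wraps the final handler with each middleware, from the
// innermost outwards, so that mws[0] runs first. Each middleware receives the
// next step as an argument to Call.
func chainMiddleware(mws []Middleware, final MiddlewareHandlerFunc) MiddlewareHandlerFunc {
	wrapped := final
	for i := len(mws) - 1; i >= 0; i-- {
		mw := mws[i]
		next := wrapped
		wrapped = func(logger *zap.Logger, params MiddlewareParams) (Result, error) {
			return mw.Call(logger, params, next)
		}
	}
	return wrapped
}

With construction-time wiring, the same chain would instead be fixed when each middleware is built (e.g. NewPanicMiddleware(NewLogMiddleware(handler))), which is what the bullet points above are weighing against.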
Also worth noting that the approach you mentioned requires a single handler, rather than the distinct handler for each type that the queue currently receives as input -- discussed that here: #1163 (comment)
// NewQueue builds and returns a new Queue with the provided middleware and handlers for various
// types.
func NewQueue(handlers map[Object]HandlerFunc, opts ...QueueOption) (*Queue, error) {
Why is Queue initialized with a map of handlers per object type, instead of just one handler that would do the dispatch by itself?
I did it that way because it allows greater validation, in a few places:
- We always know that we're seeing all possible types during construction, so if e.g. there's an issue extracting the GVK from the object (potentially due to unregistered/misregistered scheme), that'll fail on startup, rather than later on when we try to handle it
- If we don't have handling for a particular object type, that'll fail at the point where it's added to the queue, rather than in the handler (which may just repeat endlessly)
- Having the set of types known at construction allows builtin middleware to be aware of that — in particular, the ErrorBackoffMiddleware uses this information to pre-create separate maps for each type, instead of needing to lock a global object each time.
// Check that this isn't a duplicate
if _, ok := handlersByType[gvk]; ok {
	return nil, fmt.Errorf("Duplicate handler for object type %T with GVK %q", obj, fmtGVK(gvk))
Capitalized error string. Why doesn't the linter complain?
Presumably staticcheck style lints aren't enabled? The one you're referring to seems to be ST1005.
Across the rest of the codebase, of the calls to fmt.Errorf that start with a letter, about 1/3 are capitalized and 2/3 are lowercase. I'm happy to make that change separately, but it's broader than just this PR.
	apply func(*queueSettings)
}

// queueSettings is the internal, temporary structure that we use to hold the results of applying
Instead of supplying middleware-related callbacks through options, did you consider instantiating the middlewares before initializing the queue, providing those callbacks to the middleware directly? Something like:
logMiddleware := reconcile.NewLogMiddleware(handler, func(..) {
// This is the callback
})
panicMiddleware := reconcile.NewPanicMiddleware(logMiddleware, func(...) {})
finalHandler := panicMiddleware
queue := reconcile.NewQueue(finalHandler, ...)
...
I considered doing the construction of the default middleware outside reconcile.NewQueue, but ultimately decided not to, because:
- These particular pieces of middleware (logging, error backoff, panic handling) should always be included — it would be an error to not include them, because the behavior of the queue would be open to easily preventable risks (busy-looping on an error, panicking taking down the process, etc.). So, it's probably better from an API perspective to make sure that they must always be included!
- The functions that they can be configured with are optional — e.g., they may only be used for incrementing metrics, so IMO it's more natural to have the API express that these are optional, even if in practice we always use them
Does that make sense? WDYT?
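A small sketch of how that "always-installed middleware, optional callbacks" shape tends to look with functional options (the option name and settings field below are hypothetical; only the apply func(*queueSettings) field comes from the diff above):

// Hypothetical option: a callback that the always-installed error-backoff
// middleware will invoke, e.g. for metrics. Omitting the option leaves the
// middleware in place with no callback.
func WithErrorStatsCallback(cb ErrorStatsCallback) QueueOption {
	return QueueOption{apply: func(s *queueSettings) {
		s.errorStatsCallback = cb
	}}
}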
Finished my first reading, overall the code looks very good. Will come back for a second iteration next year :)
Made some small refactorings along the way, mostly about moving metrics-related code to a new package: https://github.com/neondatabase/autoscaling/pull/1182/commits
The changes are described in commit messages, feel free to accept/reject.
	maxErrorWait = time.Minute
)

func NewErrorBackoffMiddleware(typs []schema.GroupVersionKind, callback ErrorStatsCallback) *ErrorBackoffMiddleware {
suggestion: This middleware is quite complicated. Can you please add a comment explaining what it does and why?
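For readers of this thread, a very rough sketch of the general pattern such a middleware implements: track a per-object wait, grow it on consecutive errors up to maxErrorWait, and reset it on success. This is an illustration of the idea, not the PR's implementation (the real one also keeps separate maps per GVK, as discussed above):

import (
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/types"
)

// backoff tracks how long to wait before retrying each failing object.
type backoff struct {
	mu    sync.Mutex
	waits map[types.UID]time.Duration
}

// nextWait doubles the per-object wait on failure (capped at one minute,
// matching maxErrorWait in the diff above) and clears it on success.
func (b *backoff) nextWait(uid types.UID, failed bool) time.Duration {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !failed {
		delete(b.waits, uid)
		return 0
	}
	wait := b.waits[uid] * 2
	if wait == 0 {
		wait = time.Second // assumed initial wait
	}
	if wait > time.Minute {
		wait = time.Minute
	}
	b.waits[uid] = wait
	return wait
}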
Extracted from #1163, where `gci` incorrectly groups the "iter" import away from other stdlib packages, because it was only added in Go 1.23.
From now on, during the migration we will have:
- Symmetrical owner refs: both VirtualMachine and VirtualMachineMigration will own both source and target pods
- Asymmetrical controller refs: VirtualMachine controls the source pod, VirtualMachineMigration controls the target pod.
This allows us to figure out the pod's role in the migration by looking at its references.
Co-authored-by: Oleg Vasilev <[email protected]>
We had some existing, similar helpers in 'pkg/util', but I figured it makes more sense for those to be defined in 'neonvm/apis/...'. This change originated while working on the stateless scheduler prototype, where I found myself wanting a reliable way to determine what role a pod has in a live migration from only the pod's metadata.
We weren't doing this previously, and it means that propagation of labels/annotations can be arbitrarily delayed while the VM starts.
Also, change handleEvent[T any, P ~*T]() from a function to a method on *Store[T], now that we have the handlers translated from P -> *T. This opens up a lot of ways to make the code cleaner, and the handlers part in particular is required to implement (*Store[T]).NopUpdate() in a later commit.
There's some info about this in the added comment. tl;dr: We need it for the stateless scheduler work to be able to re-inject items into the reconcile queue while maintaining the watch as the source of truth for what each object is.
The K8s API server and client-go together have the behavior that objects returned from List() calls do not have TypeMeta set. For one-off List() requests this is fine because you already know the type! But this interacts poorly with the generated implementations of objects' .GetObjectKind().GroupVersionKind(), as those just directly read from the TypeMeta fields (which, again, are not set). So this commit works around this behavior by getting the GVK at the start of the Watch() call and explicitly setting it on all incoming objects.
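A sketch of that workaround using standard apimachinery/client-go calls (the surrounding watch plumbing is omitted, and the function itself is illustrative rather than the commit's actual code):

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
)

// setPodGVKs resolves the Pod GVK from the scheme once, then stamps it onto each
// object so that .GetObjectKind().GroupVersionKind() returns something useful.
func setPodGVKs(pods []corev1.Pod) error {
	gvks, _, err := scheme.Scheme.ObjectKinds(&corev1.Pod{})
	if err != nil {
		return err
	}
	if len(gvks) == 0 {
		return fmt.Errorf("no registered kinds for %T", &corev1.Pod{})
	}
	for i := range pods {
		pods[i].GetObjectKind().SetGroupVersionKind(gvks[0])
	}
	return nil
}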
The Listen() method returns a util.BroadcastReceiver that will be updated whenever the object is modified or deleted. This is required *for now* for the stateless scheduler work, so that we can be separately notified when there's changes to an object without hooking deeper into the internal state. We can probably remove this once the scheduler plugin's agent request handler server is removed, at the end of the stateless scheduler work.
a.k.a. "Stateless Scheduler". This is effectively a full rewrite of the scheduler plugin.
At a high level, the existing external interfaces are preserved:
- The scheduler plugin still exposes an HTTP server for the autoscaler-agent (for now); and
- The scheduler plugin is still a plugin.
However, instead of storing the state for approved resources in-memory, in the scheduler plugin, we now treat annotations on the Pod as the source of truth for requested/approved resources.
A brief overview of the internal changes to make this happen:
1. The state of resource reservations can be constructed entirely from Node and Pod objects. We *do* store that, and update as objects change, but it's only for convenience and not a strict requirement. One tricky piece is with scheduling. For that, we store a set of pods that have gone through the plugin methods but haven't actually had the spec.nodeName field set. For more info, the 'pkg/plugin/state' package contains all the pure logic for manipulating resources.
2. Each watch event on Node/Pod objects is now placed into a "reconcile" queue, similar to the controller framework. Reconcile operations are a tuple of (object, event type, desired reconcile time) and are retried with backoff on error/panic. For a detailed look, the 'pkg/plugin/reconcile' package defines the reconcile queue and all its related machinery.
3. The handler for autoscaler-agent requests no longer accesses the internal state and instead directly patches the VirtualMachine object to set the annotation for requested resources, and then waits for that object to be updated. Once the autoscaler-agent is converted to read and write those annotations directly, we will remove the HTTP server.
4. 'pkg/util/watch' was changed to allow asking to be notified when there's changes to an object, via the new (*Store[T]).Listen() API. This was required to implement (3), and can be removed once (3) is no longer needed, if it doesn't become used in the autoscaler-agent.
5. 'pkg/util/watch' was changed to allow triggering no-op update events, which - for our usage - will trigger requeuing the object. This solves two problems:
   a. During initial startup, we need to defer resource approval until all Pods on the Node have been processed -- otherwise, we may end up unintentionally overcommitting resources based on partial information. So during startup, we track the set of Pods with deferred approvals, and then requeue them all once startup is over by triggering no-op update events in the watch store.
   b. Whenever we handle changes for some Pod, it's worthwhile to handle certain operations on the Node -- e.g., triggering live migration if the reserved resources are too high. While we *could* do this as part of the Pod reconciling, we get more fair behavior (and, better balancing under load) by instead triggering re-reconciling the Pod's Node.
   Why can't this be done elsewhere? In short, consistency. Fundamentally we need to use a consistent view of the object that we're reconciling (else, it might not be no-op), and the source of truth for the current value of an object *within the scheduler plugin* is the watch store.
Co-authored-by: Arthur Petukhovsky <[email protected]>
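As a concrete anchor for point (2), a reconcile operation as described there is roughly a tuple like the following (field and type names are assumed, apart from EventKind, which the diffs above reference via EventKindEphemeral; Object is the reconcile package's own type):

import "time"

// reconcileOp sketches the queued unit of work: (object, event type, desired reconcile time).
type reconcileOp struct {
	obj   Object    // the Node or Pod, as last seen by the watch store
	kind  EventKind // e.g. added / modified / deleted / ephemeral
	runAt time.Time // earliest time this operation should be handled; pushed back on error/panic
}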
Found a couple of small issues. I can approve everything except framework_methods.go, will take a look at it again tomorrow.
UPD: accidentally pressed approve instead of comment, but no big difference :)
@@ -25,6 +27,7 @@ import (
func main() {
	logConfig := zap.NewProductionConfig()
	logConfig.Sampling = nil // Disable sampling, which the production config enables by default.
	logConfig.DisableStacktrace = true
question (non-blocking): Why have you disabled stacktraces?
"minUsageScore": 0.5, | ||
"maxUsageScore": 0, | ||
"scorePeak": 0.8 | ||
"scorePeak": 0.8, | ||
"randomize": false |
question (non-blocking): Why is randomize false now? I think it was true before?
@@ -0,0 +1,70 @@
package plugin
suggestion (non-blocking): Let's rename this file to reconcile.go
	newState.CPU.Watermark != n.CPU.Watermark || newState.Mem.Watermark != n.Mem.Watermark

// Propagate changes to labels:
for label, value := range n.Labels.Entries() {
issue (blocking): Looks like a typo, we should iterate over newState.Labels here.
suggestion:
- for label, value := range n.Labels.Entries() {
+ for label, value := range newState.Labels.Entries() {
if !ok {
	msg := "Node not found in local state"
	logger.Error(msg)
	return framework.NewStatus(framework.Error, fmt.Sprintf("%s: %s", msg, err.Error()))
issue (blocking): err is incorrectly used and is always nil here
suggestion:
- return framework.NewStatus(framework.Error, fmt.Sprintf("%s: %s", msg, err.Error()))
+ return framework.NewStatus(framework.Error, msg)
) (_ int64, status *framework.Status) {
	ignored := e.state.config.ignoredNamespace(pod.Namespace)

	e.metrics.IncMethodCall("NormalizeScore", pod, ignored)
issue (blocking): NormalizeScore here looks like a copy-paste typo, I guess it should be Score?
suggestion: replace NormalizeScore with Score on lines 287, 289, 293
if !ok {
	msg := "Node not found in local state"
	logger.Error(msg)
	return framework.NewStatus(framework.Error, fmt.Sprintf("%s: %s", msg, err.Error()))
issue (blocking): err is incorrectly used and is always nil here
suggestion:
- return framework.NewStatus(framework.Error, fmt.Sprintf("%s: %s", msg, err.Error()))
+ return framework.NewStatus(framework.Error, msg)
thought: This is the second case of this issue in the file; I think we should enable a linter to catch similar issues. ChatGPT said that staticcheck should catch issues like this, not sure why it didn't see any issue here (we already have it enabled).
if !ok {
	msg := "Node not found in local state"
	logger.Error(msg)
	status := framework.NewStatus(framework.Error, fmt.Sprintf("%s: %s", msg, err.Error()))
issue (blocking): err is incorrectly used and is always nil here
suggestion:
- status := framework.NewStatus(framework.Error, fmt.Sprintf("%s: %s", msg, err.Error()))
+ status := framework.NewStatus(framework.Error, msg)
ns, ok := e.state.nodes[nodeName]
if !ok {
	logger.Error("Node not found in local state", logFieldForNodeName(nodeName))
suggestion:
  logger.Error("Node not found in local state", logFieldForNodeName(nodeName))
+ return
Note
Hey! This is a big PR — sorry! There's a lot of smaller commits (see below), plus one GIANT commit that does the bulk of the rewrite, building on those smaller commits.
There's an overview of the changes in the commit message for the full rewrite. That might be a useful place to start to get your bearings on the changes.
Commits broken down by theme:
Background work: neonvm
- neonvm-controller: Use non-controller owner ref for migration source (d6fa7d0)
- neonvm: Add helpers to get pod ownership (e77ac86)
- neonvm-controller: Update runner pod metadata while pending (a1b55f7)
Related changes in util/watch
- util/watch: Store HandlerFuncs[*T] in Store[T] (ad2340f)
- util/watch: Add (*Store[T]).NopUpdate() method (1f43c44)
- util/watch: Override GVK on incoming objects (2580422)
- util/watch: Add (*Store[T]).Listen() method (341c0b5)
The one big commit doing the rewrite
- plugin: Rewrite to get state from Pod annotations (d04ce1b)
Remaining TODOs:
Open questions: