Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify IPSet Setfilter to make it suitable for filtering ipsets dynamically #9669

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions felix/bpf/ipsets/ipsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ type bpfIPSets struct {
opRecorder logutils.OpRecorder

lg *log.Entry

// filterIPSet is set to filter the needed ipsets based on the
// ipset name.
// when not set (nil), all the ipsets will be programmed.
// when the filter returns true, the ipset will be programmed.
// when the filter returns false, the ipset will be skipped.
filterIPSet func(string) bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth a comment saying what this does. In particular:

  • What does nil mean?
  • Is true "filter in" or "filter out"

}

func NewBPFIPSets(
Expand Down Expand Up @@ -133,6 +140,14 @@ func (m *bpfIPSets) deleteIPSetAndReleaseID(ipSet *bpfIPSet) {
// to ApplyUpdates(), the IP sets will be replaced with the new contents and the set's metadata
// will be updated as appropriate.
func (m *bpfIPSets) AddOrReplaceIPSet(setMetadata ipsets.IPSetMetadata, members []string) {
if m.filterIPSet != nil && !m.filterIPSet(setMetadata.SetID) {
ipSet := m.getExistingIPSetString(setMetadata.SetID)
if ipSet != nil {
ipSet.Deleted = true
m.markIPSetDirty(ipSet)
}
return
}
ipSet := m.getOrCreateIPSet(setMetadata.SetID)
ipSet.Type = setMetadata.Type
m.lg.WithFields(log.Fields{"stringID": setMetadata.SetID, "uint64ID": ipSet.ID, "members": members}).Info("IP set added")
Expand Down Expand Up @@ -370,9 +385,33 @@ func (m *bpfIPSets) markIPSetDirty(data *bpfIPSet) {
m.dirtyIPSetIDs.Add(data.ID)
}

func (m *bpfIPSets) SetFilter(ipSetNames set.Set[string]) {
// Not needed for this IP set dataplane. All known IP sets
// are written into the corresponding BPF map.
// SetFilter updates the ipset filter function but does
// not scan the existing ipsets and apply the filter.
func (m *bpfIPSets) SetFilter(fn func(ipSetName string) bool) {
m.filterIPSet = fn
}

func (m *bpfIPSets) isIPSetNeeded(name string) bool {
if m.filterIPSet == nil {
// We're not filtering down to a "needed" set, so all IP sets are needed.
return true
}

// We are filtering down, so compare against the needed set.
return m.filterIPSet(name)
}

// ApplyFilter applies the ipset filter to the existing
// ipsets. The caller should call ApplyFilter after updating
// the filter program to make sure the filter is applied to
// the existing ipsets.
func (m *bpfIPSets) ApplyFilter() {
for _, ipset := range m.ipSets {
if !m.isIPSetNeeded(ipset.OriginalID) {
ipset.Deleted = true
m.markIPSetDirty(ipset)
}
}
}

type bpfIPSet struct {
Expand Down
3 changes: 2 additions & 1 deletion felix/dataplane/ipsets/ipsets_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ type IPSetsDataplane interface {
QueueResync()
ApplyUpdates()
ApplyDeletions() (reschedule bool)
SetFilter(neededIPSets set.Set[string])
SetFilter(fn func(string) bool)
ApplyFilter()
}

// Except for domain IP sets, IPSetsManager simply passes through IP set updates from the datastore
Expand Down
6 changes: 5 additions & 1 deletion felix/dataplane/ipsets/mock_ipsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func (s *MockIPSets) ApplyDeletions() bool {
return false
}

func (s *MockIPSets) SetFilter(ipSetNames set.Set[string]) {
func (s *MockIPSets) SetFilter(fn func(string) bool) {
// Not implemented for UT.
}

func (s *MockIPSets) ApplyFilter() {
// Not implemented for UT.
}
14 changes: 12 additions & 2 deletions felix/dataplane/linux/int_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,12 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
bpfutils.RemoveBPFSpecialDevices()
} else {
// In BPF mode we still use iptables for raw egress policy.
dp.RegisterManager(newRawEgressPolicyManager(rawTableV4, ruleRenderer, 4, ipSetsV4.SetFilter, config.RulesConfig.NFTables))
mgr := newRawEgressPolicyManager(rawTableV4, ruleRenderer, 4, ipSetsV4.ApplyFilter, config.RulesConfig.NFTables)
// When in BPF mode, we still program egress do-not-track rules into
// (ip|nf)tables; set a filter on the IP sets we program so that only the
// IP sets needed for do-not-track are programmed.
ipSetsV4.SetFilter(mgr.IPSetNeeded)
dp.RegisterManager(mgr)
}

interfaceRegexes := make([]string, len(config.RulesConfig.WorkloadIfacePrefixes))
Expand Down Expand Up @@ -1035,7 +1040,12 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
config.MaxIPSetSize))
dp.RegisterManager(newPolicyManager(rawTableV6, mangleTableV6, filterTableV6, ruleRenderer, 6, config.RulesConfig.NFTables))
} else {
dp.RegisterManager(newRawEgressPolicyManager(rawTableV6, ruleRenderer, 6, ipSetsV6.SetFilter, config.RulesConfig.NFTables))
mgr := newRawEgressPolicyManager(rawTableV6, ruleRenderer, 6, ipSetsV6.ApplyFilter, config.RulesConfig.NFTables)
// When in BPF mode, we still program egress do-not-track rules into
// (ip|nf)tables; set a filter on the IP sets we program so that only the
// IP sets needed for do-not-track are programmed.
ipSetsV6.SetFilter(mgr.IPSetNeeded)
dp.RegisterManager(mgr)
}

dp.RegisterManager(newEndpointManager(
Expand Down
21 changes: 15 additions & 6 deletions felix/dataplane/linux/policy_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type policyManager struct {
rawEgressOnly bool
ipSetFilterDirty bool // Only used in "raw only" mode.
neededIPSets map[types.PolicyID]set.Set[string]
ipSetsCallback func(neededIPSets set.Set[string])
ipSetsCache set.Set[string]
ipSetsCallback func()
nftablesEnabled bool
}

Expand All @@ -58,7 +59,7 @@ func newPolicyManager(rawTable, mangleTable, filterTable Table, ruleRenderer pol
}

func newRawEgressPolicyManager(rawTable Table, ruleRenderer policyRenderer, ipVersion uint8,
ipSetsCallback func(neededIPSets set.Set[string]),
ipSetsCallback func(),
nft bool,
) *policyManager {
return &policyManager{
Expand All @@ -71,6 +72,7 @@ func newRawEgressPolicyManager(rawTable Table, ruleRenderer policyRenderer, ipVe
// Make sure we set the filter at start-of-day, even if there are no policies.
ipSetFilterDirty: true,
neededIPSets: make(map[types.PolicyID]set.Set[string]),
ipSetsCache: set.New[string](),
ipSetsCallback: ipSetsCallback,
nftablesEnabled: nft,
}
Expand Down Expand Up @@ -151,6 +153,7 @@ func (m *policyManager) updateNeededIPSets(id *types.PolicyID, neededIPSets set.
} else {
delete(m.neededIPSets, *id)
}
m.syncIPSetCache()
m.ipSetFilterDirty = true
}

Expand All @@ -162,14 +165,20 @@ func (m *policyManager) CompleteDeferredWork() error {
return nil
}
m.ipSetFilterDirty = false
m.ipSetsCallback()
return nil
}

merged := set.New[string]()
func (m *policyManager) syncIPSetCache() {
m.ipSetsCache.Clear()
for _, ipSets := range m.neededIPSets {
ipSets.Iter(func(item string) error {
merged.Add(item)
m.ipSetsCache.Add(item)
return nil
})
}
m.ipSetsCallback(merged)
return nil
}

func (m *policyManager) IPSetNeeded(name string) bool {
return m.ipSetsCache.Contains(name)
}
26 changes: 12 additions & 14 deletions felix/dataplane/linux/policy_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,10 @@ var _ = Describe("Raw egress policy manager", func() {
var (
policyMgr *policyManager
rawTable *mockTable
neededIPSets set.Set[string]
numCallbackCalls int
)

BeforeEach(func() {
neededIPSets = nil
numCallbackCalls = 0
rawTable = newMockTable("raw")
ruleRenderer := rules.NewRenderer(rules.Config{
Expand All @@ -275,17 +273,14 @@ var _ = Describe("Raw egress policy manager", func() {
rawTable,
ruleRenderer,
4,
func(ipSets set.Set[string]) {
neededIPSets = ipSets
func() {
numCallbackCalls++
}, false)
})

It("correctly reports no IP sets at start of day", func() {
err := policyMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())
Expect(neededIPSets).ToNot(BeNil())
Expect(neededIPSets.Len()).To(BeZero())
Expect(numCallbackCalls).To(Equal(1))

By("Not repeating the callback.")
Expand All @@ -294,7 +289,7 @@ var _ = Describe("Raw egress policy manager", func() {
Expect(numCallbackCalls).To(Equal(1))
})

It("correctly reports needed IP sets", func() {
It("correctly caches the needed IP sets", func() {
By("defining one untracked policy with an IP set")
policyMgr.OnUpdate(&proto.ActivePolicyUpdate{
Id: &proto.PolicyID{Tier: "default", Name: "pol1"},
Expand All @@ -311,7 +306,7 @@ var _ = Describe("Raw egress policy manager", func() {
err := policyMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

Expect(neededIPSets).To(MatchIPSets("ipsetA"))
Expect(policyMgr.IPSetNeeded("cali40ipsetA")).To(BeTrue())

By("defining another untracked policy with a different IP set")
policyMgr.OnUpdate(&proto.ActivePolicyUpdate{
Expand All @@ -329,7 +324,8 @@ var _ = Describe("Raw egress policy manager", func() {
err = policyMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

Expect(neededIPSets).To(MatchIPSets("ipsetA", "ipsetB"))
Expect(policyMgr.IPSetNeeded("cali40ipsetB")).To(BeTrue())
Expect(policyMgr.IPSetNeeded("cali40ipsetA")).To(BeTrue())

By("defining a non-untracked policy with a third IP set")
policyMgr.OnUpdate(&proto.ActivePolicyUpdate{
Expand All @@ -346,8 +342,8 @@ var _ = Describe("Raw egress policy manager", func() {
err = policyMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

// The non-untracked policy IP set is not needed.
Expect(neededIPSets).To(MatchIPSets("ipsetA", "ipsetB"))
// The non-untracked policy IP set is not cached.
Expect(policyMgr.IPSetNeeded("cali40ipsetC")).To(BeFalse())

By("removing the first untracked policy")
policyMgr.OnUpdate(&proto.ActivePolicyRemove{
Expand All @@ -356,16 +352,18 @@ var _ = Describe("Raw egress policy manager", func() {
err = policyMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

Expect(neededIPSets).To(MatchIPSets("ipsetB"))
Expect(policyMgr.IPSetNeeded("cali40ipsetB")).To(BeTrue())
Expect(policyMgr.IPSetNeeded("cali40ipsetA")).To(BeFalse())

By("removing the second untracked policy")
policyMgr.OnUpdate(&proto.ActivePolicyRemove{
Id: &proto.PolicyID{Tier: "default", Name: "pol2"},
})
err = policyMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

Expect(neededIPSets).To(MatchIPSets())
Expect(policyMgr.IPSetNeeded("cali40ipsetB")).To(BeFalse())
Expect(policyMgr.IPSetNeeded("cali40ipsetA")).To(BeFalse())
Expect(policyMgr.IPSetNeeded("cali40ipsetC")).To(BeFalse())
})
})

Expand Down
6 changes: 5 additions & 1 deletion felix/dataplane/windows/ipsets/ipsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ func (m *IPSets) ApplyDeletions() bool {
return false
}

func (s *IPSets) SetFilter(ipSetNames set.Set[string]) {
func (m *IPSets) SetFilter(fn func(ipSetName string) bool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth commenting that these are deliberately not implemented

// Not needed for Windows.
}

func (m *IPSets) ApplyFilter() {
// Not needed for Windows.
}
45 changes: 26 additions & 19 deletions felix/ipsets/ipsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,12 @@ type IPSets struct {

opReporter logutils.OpRecorder

// Optional filter. When non-nil, only these IP set IDs will be rendered into the dataplane
// as Linux IP sets.
neededIPSetNames set.Set[string]
// filterIPSet is set to filter the needed ipsets based on the
// ipset name.
// when not set (nil), all the ipsets will be programmed.
// when the filter returns true, the ipset will be programmed.
// when the filter returns false, the ipset will be skipped.
filterIPSet func(string) bool
}

func NewIPSets(ipVersionConfig *IPVersionConfig, recorder logutils.OpRecorder) *IPSets {
Expand Down Expand Up @@ -1092,13 +1095,27 @@ func (s *IPSets) updateDirtiness(name string) {
}
}

func (s *IPSets) SetFilter(ipSetNames set.Set[string]) {
oldSetNames := s.neededIPSetNames
if oldSetNames == nil && ipSetNames == nil {
return
// SetFilter updates the ipset filter function but does
// not scan the existing ipsets and apply the filter.
func (s *IPSets) SetFilter(fn func(string) bool) {
s.filterIPSet = fn
}

func (s *IPSets) ipSetNeeded(name string) bool {
if s.filterIPSet == nil {
// We're not filtering down to a "needed" set, so all IP sets are needed.
return true
}
s.logCxt.Debugf("Filtering to needed IP set names: %v", ipSetNames)
s.neededIPSetNames = ipSetNames

// We are filtering down, so compare against the needed set.
return s.filterIPSet(name)
}

// ApplyFilter applies the ipset filter to the existing
// ipsets. The caller should call ApplyFilter after updating
// the filter program to make sure the filter is applied to
// the existing ipsets.
func (s *IPSets) ApplyFilter() {
for name, meta := range s.setNameToAllMetadata {
if s.ipSetNeeded(name) {
s.setNameToProgrammedMetadata.Desired().Set(name, meta)
Expand All @@ -1109,16 +1126,6 @@ func (s *IPSets) SetFilter(ipSetNames set.Set[string]) {
}
}

func (s *IPSets) ipSetNeeded(name string) bool {
if s.neededIPSetNames == nil {
// We're not filtering down to a "needed" set, so all IP sets are needed.
return true
}

// We are filtering down, so compare against the needed set.
return s.neededIPSetNames.Contains(name)
}

// CanonicaliseMember converts the string representation of an IP set member to a canonical
// object of some kind. The object is required to by hashable.
func CanonicaliseMember(t IPSetType, member string) IPSetMember {
Expand Down
17 changes: 14 additions & 3 deletions felix/ipsets/ipsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,10 @@ var _ = Describe("IP sets dataplane", func() {

Context("with filtering to two IP sets", func() {
BeforeEach(func() {
ipsets.SetFilter(set.From(v4MainIPSetName2, v4MainIPSetName))
ipsets.SetFilter(func(ipSetName string) bool {
neededIPSets := set.From(v4MainIPSetName2, v4MainIPSetName)
return neededIPSets.Contains(ipSetName)
})
ipsets.QueueResync()
apply()
})
Expand Down Expand Up @@ -699,7 +702,11 @@ var _ = Describe("IP sets dataplane", func() {

Context("with filtering to single IP set", func() {
BeforeEach(func() {
ipsets.SetFilter(set.From(v4MainIPSetName2))
ipsets.SetFilter(func(ipSetName string) bool {
neededIPSets := set.From(v4MainIPSetName2)
return neededIPSets.Contains(ipSetName)
})
ipsets.ApplyFilter()
apply()
})

Expand All @@ -714,7 +721,11 @@ var _ = Describe("IP sets dataplane", func() {

Context("with filtering to both known IP sets", func() {
BeforeEach(func() {
ipsets.SetFilter(set.From(v4MainIPSetName2, v4MainIPSetName))
ipsets.SetFilter(func(ipSetName string) bool {
neededIPSets := set.From(v4MainIPSetName2, v4MainIPSetName)
return neededIPSets.Contains(ipSetName)
})
ipsets.ApplyFilter()
apply()
})

Expand Down
Loading
Loading