Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use informer index instead of listing service EndpointSlices directly #4409

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 34 additions & 35 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
consulKubernetesCheckName = "Kubernetes Readiness Check"
kubernetesSuccessReasonMsg = "Kubernetes health checks passing"
kubernetesFailureReasonMsg = "Kubernetes health checks failing"

endpointServiceIndexName = "metadata.labels[" + discoveryv1.LabelServiceName + "]"
)

type NodePortSyncType string
Expand Down Expand Up @@ -145,6 +147,10 @@ type ServiceResource struct {
// The Consul node name to register service with.
ConsulNodeName string

// endpointsController holds a reference to the serviceEndpointsResource
// controller so that we can query its cache on new discovered services.
endpointsController *controller.Controller

// serviceLock must be held for any read/write to these maps.
serviceLock sync.RWMutex

Expand Down Expand Up @@ -239,44 +245,25 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
// If we care about endpoints, we should load the associated endpoint slices.
if t.shouldTrackEndpoints(key) {
allEndpointSlices := make(map[string]*discoveryv1.EndpointSlice)
labelSelector := fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name)
continueToken := ""
limit := int64(100)

for {
opts := metav1.ListOptions{
LabelSelector: labelSelector,
Limit: limit,
Continue: continueToken,
}
endpointSliceList, err := t.Client.DiscoveryV1().
EndpointSlices(service.Namespace).
List(t.Ctx, opts)

if err != nil {
t.Log.Warn("error loading endpoint slices list",
"key", key,
"err", err)
break
}

for _, endpointSlice := range endpointSliceList.Items {
endpointSliceList, err := t.endpointsController.GetByIndex(endpointServiceIndexName, key)
if err != nil {
t.Log.Warn("error loading endpoint slices list",
"key", key,
"err", err)
} else {
for _, item := range endpointSliceList {
endpointSlice := item.(*discoveryv1.EndpointSlice)
endptKey := service.Namespace + "/" + endpointSlice.Name
allEndpointSlices[endptKey] = &endpointSlice
allEndpointSlices[endptKey] = endpointSlice
}

if endpointSliceList.Continue != "" {
continueToken = endpointSliceList.Continue
} else {
break
if t.endpointSlicesMap == nil {
t.endpointSlicesMap = make(map[string]map[string]*discoveryv1.EndpointSlice)
}
t.endpointSlicesMap[key] = allEndpointSlices
t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSlicesMap", "key", key, "service", service, "endpointSlices", allEndpointSlices)
}

if t.endpointSlicesMap == nil {
t.endpointSlicesMap = make(map[string]map[string]*discoveryv1.EndpointSlice)
}
t.endpointSlicesMap[key] = allEndpointSlices
t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSlicesMap", "key", key, "service", service, "endpointSlices", allEndpointSlices)
}

// Update the registration and trigger a sync
Expand Down Expand Up @@ -316,7 +303,7 @@ func (t *ServiceResource) Run(ch <-chan struct{}) {
t.Log.Info("starting runner for endpoints")
// Register a controller for Endpoints which subsequently registers a
// controller for the Ingress resource.
(&controller.Controller{
t.endpointsController = &controller.Controller{
Resource: &serviceEndpointsResource{
Service: t,
Ctx: t.Ctx,
Expand All @@ -329,7 +316,9 @@ func (t *ServiceResource) Run(ch <-chan struct{}) {
},
},
Log: t.Log.Named("controller/service"),
}).Run(ch)
}

t.endpointsController.Run(ch)
}

// shouldSync returns true if resyncing should be enabled for the given service.
Expand Down Expand Up @@ -874,7 +863,17 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer {
},
&discoveryv1.EndpointSlice{},
0,
cache.Indexers{},
cache.Indexers{
endpointServiceIndexName: func(obj interface{}) ([]string, error) {
endpointSlice := obj.(*discoveryv1.EndpointSlice)

if serviceName, ok := endpointSlice.Labels[discoveryv1.LabelServiceName]; ok {
return []string{endpointSlice.Namespace + "/" + serviceName}, nil
}

return nil, nil
},
},
)
}

Expand Down
9 changes: 9 additions & 0 deletions control-plane/helper/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,15 @@ func (c *Controller) processSingle(
return true
}

// GetByIndex allows querying the informer's indexer to avoid extra calls to k8s
func (c *Controller) GetByIndex(indexName, indexedValue string) ([]interface{}, error) {
if c.informer == nil {
return nil, nil
}

return c.informer.GetIndexer().ByIndex(indexName, indexedValue)
}

// informerDeleteHandler returns a function that implements
// `DeleteFunc` from the `ResourceEventHandlerFuncs` interface.
// It is split out as its own method to aid in testing.
Expand Down
Loading