Skip to content

Commit bd17f33

Browse files
committed
sdn: wait for proxy initialization before declaring node network ready
Wait for proxy services and endpoints to sync before writing the CNI config file and thus declaring node network readiness.
1 parent 9070175 commit bd17f33

2 files changed

Lines changed: 87 additions & 8 deletions

File tree

pkg/cmd/openshift-sdn/cmd.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,12 @@ func (sdn *OpenShiftSDN) Start(stopCh <-chan struct{}) error {
217217
if err != nil {
218218
return err
219219
}
220-
sdn.runProxy()
220+
proxyInitChan := make(chan bool)
221+
sdn.runProxy(proxyInitChan)
221222
sdn.informers.start(stopCh)
222223

224+
klog.V(2).Infof("openshift-sdn network plugin waiting for proxy startup to comlete")
225+
<-proxyInitChan
223226
klog.V(2).Infof("openshift-sdn network plugin registering startup")
224227
if err := sdn.writeConfigFile(); err != nil {
225228
klog.Fatal(err)

pkg/cmd/openshift-sdn/proxy.go

Lines changed: 83 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"net"
66
"net/http"
7+
"sync"
78
"time"
89

910
"k8s.io/api/core/v1"
@@ -127,8 +128,9 @@ func (sdn *OpenShiftSDN) initProxy() error {
127128
return err
128129
}
129130

130-
// runProxy starts the configured proxy process.
131-
func (sdn *OpenShiftSDN) runProxy() {
131+
// runProxy starts the configured proxy process and closes the provided channel
132+
// when the proxy has initialized
133+
func (sdn *OpenShiftSDN) runProxy(waitChan chan<- bool) {
132134
protocol := utiliptables.ProtocolIpv4
133135
bindAddr := net.ParseIP(sdn.ProxyConfig.BindAddress)
134136
if bindAddr.To4() == nil {
@@ -270,10 +272,6 @@ func (sdn *OpenShiftSDN) runProxy() {
270272
proxier = hybridProxier
271273
}
272274

273-
iptInterface.AddReloadFunc(proxier.Sync)
274-
serviceConfig.RegisterEventHandler(servicesHandler)
275-
go serviceConfig.Run(utilwait.NeverStop)
276-
277275
endpointsConfig := pconfig.NewEndpointsConfig(
278276
sdn.informers.KubeInformers.Core().V1().Endpoints(),
279277
sdn.ProxyConfig.ConfigSyncPeriod.Duration,
@@ -283,7 +281,15 @@ func (sdn *OpenShiftSDN) runProxy() {
283281
klog.Fatalf("error: node proxy plugin startup failed: %v", err)
284282
}
285283
endpointsHandler = sdn.OsdnProxy
286-
endpointsConfig.RegisterEventHandler(endpointsHandler)
284+
285+
// Wrap the proxy to know when it finally initializes
286+
waitingProxy := newWaitingProxyHandler(servicesHandler, endpointsHandler, waitChan)
287+
288+
iptInterface.AddReloadFunc(proxier.Sync)
289+
serviceConfig.RegisterEventHandler(waitingProxy)
290+
go serviceConfig.Run(utilwait.NeverStop)
291+
292+
endpointsConfig.RegisterEventHandler(waitingProxy)
287293
go endpointsConfig.Run(utilwait.NeverStop)
288294

289295
// Start up healthz server
@@ -346,3 +352,73 @@ func getNodeIP(client kv1core.CoreV1Interface, hostname string) (net.IP, error)
346352

347353
return nodeIP, nil
348354
}
355+
356+
type waitingProxyHandler struct {
357+
sync.Mutex
358+
359+
// waitChan will be closed when both services and endpoints have
360+
// been synced in the proxy
361+
waitChan chan<- bool
362+
initialized bool
363+
364+
serviceChild pconfig.ServiceHandler
365+
serviceSynced bool
366+
endpointsChild pconfig.EndpointsHandler
367+
endpointsSynced bool
368+
}
369+
370+
func newWaitingProxyHandler(serviceChild pconfig.ServiceHandler, endpointsChild pconfig.EndpointsHandler, waitChan chan<- bool) *waitingProxyHandler {
371+
return &waitingProxyHandler{
372+
serviceChild: serviceChild,
373+
endpointsChild: endpointsChild,
374+
waitChan: waitChan,
375+
}
376+
}
377+
378+
func (wph *waitingProxyHandler) checkInitialized() {
379+
if !wph.initialized && wph.serviceSynced && wph.endpointsSynced {
380+
klog.V(2).Info("openshift-sdn proxy services and endpoints initialized")
381+
wph.initialized = true
382+
close(wph.waitChan)
383+
}
384+
}
385+
386+
func (wph *waitingProxyHandler) OnServiceAdd(service *v1.Service) {
387+
wph.serviceChild.OnServiceAdd(service)
388+
}
389+
390+
func (wph *waitingProxyHandler) OnServiceUpdate(oldService, service *v1.Service) {
391+
wph.serviceChild.OnServiceUpdate(oldService, service)
392+
}
393+
394+
func (wph *waitingProxyHandler) OnServiceDelete(service *v1.Service) {
395+
wph.serviceChild.OnServiceDelete(service)
396+
}
397+
398+
func (wph *waitingProxyHandler) OnServiceSynced() {
399+
wph.serviceChild.OnServiceSynced()
400+
wph.Lock()
401+
defer wph.Unlock()
402+
wph.serviceSynced = true
403+
wph.checkInitialized()
404+
}
405+
406+
func (wph *waitingProxyHandler) OnEndpointsAdd(endpoints *v1.Endpoints) {
407+
wph.endpointsChild.OnEndpointsAdd(endpoints)
408+
}
409+
410+
func (wph *waitingProxyHandler) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
411+
wph.endpointsChild.OnEndpointsUpdate(oldEndpoints, endpoints)
412+
}
413+
414+
func (wph *waitingProxyHandler) OnEndpointsDelete(endpoints *v1.Endpoints) {
415+
wph.endpointsChild.OnEndpointsDelete(endpoints)
416+
}
417+
418+
func (wph *waitingProxyHandler) OnEndpointsSynced() {
419+
wph.endpointsChild.OnEndpointsSynced()
420+
wph.Lock()
421+
defer wph.Unlock()
422+
wph.endpointsSynced = true
423+
wph.checkInitialized()
424+
}

0 commit comments

Comments
 (0)