44 "fmt"
55 "net"
66 "net/http"
7+ "sync"
78 "time"
89
910 "k8s.io/api/core/v1"
@@ -127,8 +128,9 @@ func (sdn *OpenShiftSDN) initProxy() error {
127128 return err
128129}
129130
130- // runProxy starts the configured proxy process.
131- func (sdn * OpenShiftSDN ) runProxy () {
131+ // runProxy starts the configured proxy process and closes the provided channel
132+ // when the proxy has initialized
133+ func (sdn * OpenShiftSDN ) runProxy (waitChan chan <- bool ) {
132134 protocol := utiliptables .ProtocolIpv4
133135 bindAddr := net .ParseIP (sdn .ProxyConfig .BindAddress )
134136 if bindAddr .To4 () == nil {
@@ -270,10 +272,6 @@ func (sdn *OpenShiftSDN) runProxy() {
270272 proxier = hybridProxier
271273 }
272274
273- iptInterface .AddReloadFunc (proxier .Sync )
274- serviceConfig .RegisterEventHandler (servicesHandler )
275- go serviceConfig .Run (utilwait .NeverStop )
276-
277275 endpointsConfig := pconfig .NewEndpointsConfig (
278276 sdn .informers .KubeInformers .Core ().V1 ().Endpoints (),
279277 sdn .ProxyConfig .ConfigSyncPeriod .Duration ,
@@ -283,7 +281,15 @@ func (sdn *OpenShiftSDN) runProxy() {
283281 klog .Fatalf ("error: node proxy plugin startup failed: %v" , err )
284282 }
285283 endpointsHandler = sdn .OsdnProxy
286- endpointsConfig .RegisterEventHandler (endpointsHandler )
284+
285+ // Wrap the proxy to know when it finally initializes
286+ waitingProxy := newWaitingProxyHandler (servicesHandler , endpointsHandler , waitChan )
287+
288+ iptInterface .AddReloadFunc (proxier .Sync )
289+ serviceConfig .RegisterEventHandler (waitingProxy )
290+ go serviceConfig .Run (utilwait .NeverStop )
291+
292+ endpointsConfig .RegisterEventHandler (waitingProxy )
287293 go endpointsConfig .Run (utilwait .NeverStop )
288294
289295 // Start up healthz server
@@ -346,3 +352,73 @@ func getNodeIP(client kv1core.CoreV1Interface, hostname string) (net.IP, error)
346352
347353 return nodeIP , nil
348354}
355+
356+ type waitingProxyHandler struct {
357+ sync.Mutex
358+
359+ // waitChan will be closed when both services and endpoints have
360+ // been synced in the proxy
361+ waitChan chan <- bool
362+ initialized bool
363+
364+ serviceChild pconfig.ServiceHandler
365+ serviceSynced bool
366+ endpointsChild pconfig.EndpointsHandler
367+ endpointsSynced bool
368+ }
369+
370+ func newWaitingProxyHandler (serviceChild pconfig.ServiceHandler , endpointsChild pconfig.EndpointsHandler , waitChan chan <- bool ) * waitingProxyHandler {
371+ return & waitingProxyHandler {
372+ serviceChild : serviceChild ,
373+ endpointsChild : endpointsChild ,
374+ waitChan : waitChan ,
375+ }
376+ }
377+
378+ func (wph * waitingProxyHandler ) checkInitialized () {
379+ if ! wph .initialized && wph .serviceSynced && wph .endpointsSynced {
380+ klog .V (2 ).Info ("openshift-sdn proxy services and endpoints initialized" )
381+ wph .initialized = true
382+ close (wph .waitChan )
383+ }
384+ }
385+
386+ func (wph * waitingProxyHandler ) OnServiceAdd (service * v1.Service ) {
387+ wph .serviceChild .OnServiceAdd (service )
388+ }
389+
390+ func (wph * waitingProxyHandler ) OnServiceUpdate (oldService , service * v1.Service ) {
391+ wph .serviceChild .OnServiceUpdate (oldService , service )
392+ }
393+
394+ func (wph * waitingProxyHandler ) OnServiceDelete (service * v1.Service ) {
395+ wph .serviceChild .OnServiceDelete (service )
396+ }
397+
398+ func (wph * waitingProxyHandler ) OnServiceSynced () {
399+ wph .serviceChild .OnServiceSynced ()
400+ wph .Lock ()
401+ defer wph .Unlock ()
402+ wph .serviceSynced = true
403+ wph .checkInitialized ()
404+ }
405+
406+ func (wph * waitingProxyHandler ) OnEndpointsAdd (endpoints * v1.Endpoints ) {
407+ wph .endpointsChild .OnEndpointsAdd (endpoints )
408+ }
409+
410+ func (wph * waitingProxyHandler ) OnEndpointsUpdate (oldEndpoints , endpoints * v1.Endpoints ) {
411+ wph .endpointsChild .OnEndpointsUpdate (oldEndpoints , endpoints )
412+ }
413+
414+ func (wph * waitingProxyHandler ) OnEndpointsDelete (endpoints * v1.Endpoints ) {
415+ wph .endpointsChild .OnEndpointsDelete (endpoints )
416+ }
417+
418+ func (wph * waitingProxyHandler ) OnEndpointsSynced () {
419+ wph .endpointsChild .OnEndpointsSynced ()
420+ wph .Lock ()
421+ defer wph .Unlock ()
422+ wph .endpointsSynced = true
423+ wph .checkInitialized ()
424+ }
0 commit comments