
Unveiling kube-proxy IPVS Mode: Deep Dive into Source Code and Execution Flow

This article examines kube-proxy’s three load‑balancing modes, focusing on the IPVS implementation, and walks through its source code, command‑line setup with cobra, core structures like ProxyServer and Proxier, the async BoundedFrequencyRunner, and the detailed service and endpoint synchronization processes.

360 Zhihui Cloud Developer

Why IPVS?

kube-proxy currently supports three load‑balancing implementations—userspace, iptables, and IPVS. The first two encounter performance bottlenecks as the number of Services grows, making IPVS the preferred mode for production environments. This article analyses the IPVS mode in depth.

Overall Logic Structure

At a high level, kube-proxy, like other kube‑* components, uses the pflag and cobra libraries to build its command‑line interface; the sections below follow its flow from startup to rule synchronisation.

Command‑Line Setup with Cobra

<code>package main

import (
    "fmt"
    "strings"

    "github.com/spf13/cobra"
)

func main() {
    command := &amp;cobra.Command{
        Use:   "echo [string to echo]",
        Short: "Echo anything to the screen",
        Long:  `echo is for echoing anything back. Echo works a lot like print, except it has a child command.`,
        Args:  cobra.MinimumNArgs(1),
        Run: func(cmd *cobra.Command, args []string) {
            fmt.Println("Print: " + strings.Join(args, " "))
        },
    }
    command.Execute()
}</code>

This minimal example shows how a cobra.Command is initialized and started via Execute(). The real kube‑proxy entry point follows the same pattern, with the core logic residing in the command's Run function.

ProxyServer Run Logic

<code>func (o *Options) Run() error {
    defer close(o.errCh)
    // ...
    proxyServer, err := NewProxyServer(o)
    if err != nil {
        return err
    }
    if o.CleanupAndExit {
        return proxyServer.CleanupAndExit()
    }
    o.proxyServer = proxyServer
    return o.runLoop()
}</code>

During Run, kube‑proxy creates a ProxyServer instance; if CleanupAndExit is set it removes existing rules and exits, and otherwise it enters the main run loop.
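The run loop itself boils down to starting the server and blocking on an error channel. A minimal, stand-alone sketch of that pattern (the types here are simplified stand-ins, not the real kube‑proxy ones):

```go
package main

import "fmt"

// proxyServer stands in for the real ProxyServer; Run simulates
// the blocking server loop eventually returning an error.
type proxyServer struct{}

func (s *proxyServer) Run() error {
	return fmt.Errorf("sync loop exited")
}

type options struct {
	proxyServer *proxyServer
	errCh       chan error
}

// runLoop starts the server in a goroutine and blocks until it
// reports an error, mirroring the shape of Options.runLoop.
func (o *options) runLoop() error {
	go func() {
		o.errCh <- o.proxyServer.Run()
	}()
	return <-o.errCh
}

func main() {
	o := &options{proxyServer: &proxyServer{}, errCh: make(chan error, 1)}
	fmt.Println(o.runLoop()) // prints "sync loop exited"
}
```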

Core Structures

<code>type ProxyServer struct {
    Client                 clientset.Interface
    EventClient            v1core.EventsGetter
    IptInterface           utiliptables.Interface
    IpvsInterface          utilipvs.Interface
    IpsetInterface         utilipset.Interface
    execer                 exec.Interface
    Proxier                proxy.ProxyProvider
    Broadcaster            record.EventBroadcaster
    Recorder               record.EventRecorder
    ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration
    Conntracker            Conntracker // if nil, ignored
    ProxyMode              string
    NodeRef                *v1.ObjectReference
    CleanupIPVS            bool
    MetricsBindAddress     string
    EnableProfiling        bool
    OOMScoreAdj            *int32
    ConfigSyncPeriod       time.Duration
    HealthzServer          *healthcheck.HealthzServer
}

type Proxier struct {
    endpointsChanges *proxy.EndpointChangeTracker
    serviceChanges   *proxy.ServiceChangeTracker
    // ...
    serviceMap   proxy.ServiceMap
    endpointsMap proxy.EndpointsMap
    portsMap     map[utilproxy.LocalPort]utilproxy.Closeable
    // ...
    iptables     utiliptables.Interface
    ipvs         utilipvs.Interface
    ipset        utilipset.Interface
    exec         utilexec.Interface
    // ...
    ipvsScheduler string
}

type BoundedFrequencyRunner struct {
    name        string // the name of this instance
    minInterval time.Duration // the min time between runs, modulo bursts
    maxInterval time.Duration // the max time between runs
    run         chan struct{} // try an async run
    mu          sync.Mutex // guards runs of fn and all mutations
    fn          func() // function to run
    lastRun     time.Time // time of last run
    timer       timer // timer for deferred runs
    limiter     rateLimiter // rate limiter for on-demand runs
}
</code>

The BoundedFrequencyRunner periodically triggers syncProxyRules to create or update IPVS virtual servers and real servers, binding the virtual IP to the dummy interface kube‑ipvs0.

<code>proxier.syncRunner = async.NewBoundedFrequencyRunner(
    "sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
</code>

Parameters:

minSyncPeriod – minimum interval between rule updates

syncPeriod – maximum interval between rule updates

burstSyncs – number of back‑to‑back runs allowed before rate limiting applies

proxier.syncProxyRules – core function that synchronises the IPVS rules

ProxyServer Startup Flow

When the server starts, it performs the following steps:

Start the health‑check service (HealthzServer).

Expose metrics via a MetricsServer.

Optionally adjust kernel conntrack parameters.

Create an informerFactory to watch Kubernetes resources.

Instantiate a ServiceConfig to watch Service objects and queue changes.

Register Service event handlers with the Proxier.

Start the ServiceConfig.
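Steps 5–7 amount to a simple fan-out: the config object keeps a list of handlers and forwards each event to all of them. A self-contained sketch of that wiring, with simplified stand-ins for the real types (no client-go informer involved):

```go
package main

import "fmt"

// service is a minimal stand-in for *v1.Service.
type service struct{ namespace, name string }

// serviceHandler mirrors the ServiceHandler interface, reduced to
// the add event for brevity.
type serviceHandler interface {
	OnServiceAdd(svc *service)
}

// serviceConfig fans incoming events out to registered handlers,
// as the real ServiceConfig does for informer callbacks.
type serviceConfig struct {
	eventHandlers []serviceHandler
}

func (c *serviceConfig) RegisterEventHandler(h serviceHandler) {
	c.eventHandlers = append(c.eventHandlers, h)
}

func (c *serviceConfig) handleAddService(svc *service) {
	for _, h := range c.eventHandlers {
		h.OnServiceAdd(svc)
	}
}

// recordingProxier stands in for the Proxier; it just records
// which services it was told about.
type recordingProxier struct{ seen []string }

func (p *recordingProxier) OnServiceAdd(svc *service) {
	p.seen = append(p.seen, svc.namespace+"/"+svc.name)
}

func main() {
	p := &recordingProxier{}
	c := &serviceConfig{}
	c.RegisterEventHandler(p)
	c.handleAddService(&service{"default", "web"})
	fmt.Println(p.seen) // prints "[default/web]"
}
```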

ServiceConfig and Handlers

<code>type ServiceConfig struct {
    listerSynced cache.InformerSynced
    eventHandlers []ServiceHandler
}

type ServiceHandler interface {
    OnServiceAdd(service *v1.Service)
    OnServiceUpdate(oldService, service *v1.Service)
    OnServiceDelete(service *v1.Service)
    OnServiceSynced()
}

func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
    result := &amp;ServiceConfig{listerSynced: serviceInformer.Informer().HasSynced}
    serviceInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
        AddFunc:    result.handleAddService,
        UpdateFunc: result.handleUpdateService,
        DeleteFunc: result.handleDeleteService,
    }, resyncPeriod)
    return result
}

func (c *ServiceConfig) handleAddService(obj interface{}) {
    service, ok := obj.(*v1.Service)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
        return
    }
    for i := range c.eventHandlers {
        c.eventHandlers[i].OnServiceAdd(service)
    }
}
</code>

When a new Service is created, handleAddService forwards the event to all registered handlers, which ultimately invoke Proxier.OnServiceAdd to update the internal serviceChanges map.

<code>func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
    proxier.OnServiceUpdate(nil, service)
}

func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.syncRunner.Run()
    }
}
</code>

Change Trackers

<code>type ServiceChangeTracker struct {
    lock sync.Mutex
    items map[types.NamespacedName]*serviceChange
    makeServiceInfo makeServicePortFunc
    isIPv6Mode *bool
    recorder record.EventRecorder
}

type serviceChange struct {
    previous ServiceMap
    current  ServiceMap
}
</code>

These trackers accumulate modifications to Services and Endpoints until the asynchronous runner applies them.
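The net effect is that a burst of changes to one Service collapses into a single pending entry, and an add followed by a delete cancels out entirely. A simplified, stand-alone sketch of that accumulation logic (the real tracker keys on types.NamespacedName and stores full ServiceMaps built from the Service spec):

```go
package main

import "fmt"

// serviceMap is a simplified stand-in for proxy.ServiceMap,
// keyed by "namespace/name" with the ClusterIP as value.
type serviceMap map[string]string

// change records the state before the first modification and
// after the latest one, like kube-proxy's serviceChange.
type change struct {
	previous serviceMap
	current  serviceMap
}

// tracker accumulates pending changes per service, mirroring
// ServiceChangeTracker.Update: previous is captured only on the
// first change, and if current ends up equal to previous the
// entry is dropped as a net no-op.
type tracker struct {
	items map[string]*change
}

func (t *tracker) update(name, oldIP, newIP string) bool {
	c, ok := t.items[name]
	if !ok {
		c = &change{previous: serviceMap{name: oldIP}}
		t.items[name] = c
	}
	c.current = serviceMap{name: newIP}
	if c.previous[name] == c.current[name] {
		delete(t.items, name) // e.g. add followed by delete cancels out
	}
	return len(t.items) > 0
}

func main() {
	t := &tracker{items: map[string]*change{}}
	t.update("default/web", "", "10.0.0.1")       // add
	t.update("default/web", "10.0.0.1", "")       // delete cancels the add
	fmt.Println("pending changes:", len(t.items)) // prints "pending changes: 0"
}
```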

Synchronising IPVS Rules

<code>func (proxier *Proxier) syncProxyRules() {
    // Build IPVS rules for each service.
    for svcName, svcInfo := range proxier.serviceMap {
        svcNameString := svcName.String()
        // Handle SNAT for loop‑back traffic.
        for _, e := range proxier.endpointsMap[svcName] {
            // ...
        }
        // Create or update the virtual server.
        serv := &amp;utilipvs.VirtualServer{
            Address:   svcInfo.ClusterIP(),
            Port:      uint16(svcInfo.Port()),
            Protocol:  string(svcInfo.Protocol()),
            Scheduler: proxier.ipvsScheduler,
        }
        if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
            serv.Flags |= utilipvs.FlagPersistent
            serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
        }
        if err := proxier.syncService(svcNameString, serv, true); err == nil {
            // sync endpoints, external IPs, load‑balancer IPs, NodePort, etc.
        }
    }
    // Sync ipset entries and iptables rules.
    for _, set := range proxier.ipsetList {
        set.syncIPSetEntries()
    }
    proxier.writeIptablesRules()
    proxier.iptablesData.Reset()
    proxier.iptablesData.Write(proxier.natChains.Bytes())
    proxier.iptablesData.Write(proxier.natRules.Bytes())
    proxier.iptablesData.Write(proxier.filterChains.Bytes())
    proxier.iptablesData.Write(proxier.filterRules.Bytes())
}
</code>
</code>

The function iterates over the serviceMap, creates the corresponding IPVS virtual servers, handles session affinity, binds the ClusterIP to the dummy interface, and finally synchronises the ipset entries and iptables rules.
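The mapping from service fields to an IPVS virtual server can be distilled into a small, runnable function (struct and parameter names here are simplified stand-ins for utilipvs.VirtualServer and serviceInfo, not the real types):

```go
package main

import "fmt"

// virtualServer mirrors the fields of utilipvs.VirtualServer used
// in syncProxyRules, with Flags reduced to a Persistent boolean.
type virtualServer struct {
	Address    string
	Port       uint16
	Protocol   string
	Scheduler  string
	Persistent bool
	Timeout    uint32
}

// buildVirtualServer maps service attributes to an IPVS virtual
// server, enabling persistence (the FlagPersistent case above)
// when session affinity is ClientIP.
func buildVirtualServer(clusterIP string, port uint16, proto, scheduler, affinity string, stickySeconds uint32) virtualServer {
	vs := virtualServer{
		Address:   clusterIP,
		Port:      port,
		Protocol:  proto,
		Scheduler: scheduler,
	}
	if affinity == "ClientIP" {
		vs.Persistent = true
		vs.Timeout = stickySeconds
	}
	return vs
}

func main() {
	// e.g. a DNS ClusterIP service with ClientIP affinity
	vs := buildVirtualServer("10.96.0.10", 53, "UDP", "rr", "ClientIP", 10800)
	fmt.Printf("%+v\n", vs)
}
```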

Conclusion

kube‑proxy’s code remains relatively concise: it watches Service and Endpoints objects, records changes in its ServiceMap and EndpointsMap, and uses an asynchronous BoundedFrequencyRunner to invoke syncProxyRules, which generates the necessary IPVS, iptables, and ipset configurations.
