-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Add SharedInformer implementation to python-client #2515
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
k8s-ci-robot
merged 14 commits into
master
from
copilot/implement-informer-in-python-client
Apr 20, 2026
Merged
Changes from 11 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
ab86645
Initial plan
Copilot ef3f21c
Add informer implementation: SharedInformer, ObjectCache, tests, and …
Copilot 9c708ad
Add BOOKMARK event support to SharedInformer
Copilot d21aa1b
Track and reuse resourceVersion across watch reconnects; re-list only…
Copilot 63bd3d5
Add e2e tests for SharedInformer against a real cluster
Copilot 4bf1e06
Fix syntax error in informer_example.py (unicode quotes in string lit…
Copilot 90c2b02
Sync _resource_version from Watch on each event, before firing handlers
Copilot 5a8fef1
Resync now does a full re-list; add test_resync_period_triggers_full_…
Copilot b91f278
Add 7 tests analogous to JS/Java reference tests; _initial_list now f…
Copilot 25d1f12
Add 5 tests from client-go shared_informer_test.go scenarios
Copilot 6e39732
Address 6 code review comments: _fire docstring, resync fix, test ass…
Copilot 1d10150
Fix copyright year to 2026 in new files; add e2e resync test
Copilot 6425bab
Add 3 BOOKMARK unit tests: RV advance, raw-dict handler, multiple boo…
Copilot a858453
Fix test_bookmark_advances_resource_version: set initial mock_w.resou…
Copilot File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| # Copyright 2024 The Kubernetes Authors. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Example: use SharedInformer to watch pods in the default namespace. | ||
|
|
||
| The informer runs a background daemon thread that keeps a local cache | ||
| synchronised with the Kubernetes API server. The main thread is free to | ||
| query the cache at any time without worrying about connectivity or retries. | ||
| """ | ||
|
|
||
| import time | ||
|
|
||
| import kubernetes | ||
| from kubernetes import config | ||
| from kubernetes.client import CoreV1Api | ||
| from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer | ||
|
|
||
|
|
||
| def on_pod_added(pod): | ||
| name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"] | ||
| print("[ADDED] ", name) | ||
|
|
||
|
|
||
| def on_pod_modified(pod): | ||
| name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"] | ||
| print("[MODIFIED]", name) | ||
|
|
||
|
|
||
| def on_pod_deleted(pod): | ||
| name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"] | ||
| print("[DELETED] ", name) | ||
|
|
||
|
|
||
| def main(): | ||
| config.load_kube_config() | ||
|
|
||
| v1 = CoreV1Api() | ||
| informer = SharedInformer( | ||
| list_func=v1.list_namespaced_pod, | ||
| namespace="default", | ||
| resync_period=60, | ||
| ) | ||
|
|
||
| informer.add_event_handler(ADDED, on_pod_added) | ||
| informer.add_event_handler(MODIFIED, on_pod_modified) | ||
| informer.add_event_handler(DELETED, on_pod_deleted) | ||
|
|
||
| informer.start() | ||
| print('Informer started. Watching pods in "default" namespace ...') | ||
|
|
||
| try: | ||
| while True: | ||
| cached = informer.cache.list() | ||
| print("Cached pods: {}".format(len(cached))) | ||
| time.sleep(10) | ||
| except KeyboardInterrupt: | ||
| pass | ||
| finally: | ||
| informer.stop() | ||
| print("Informer stopped.") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,3 +23,4 @@ | |
| from . import stream | ||
| from . import utils | ||
| from . import leaderelection | ||
| from . import informer | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,185 @@ | ||
| # Copyright 2024 The Kubernetes Authors. | ||
| # Licensed under the Apache License, Version 2.0 (the "License"). | ||
| # End-to-end tests for kubernetes.informer.SharedInformer. | ||
|
|
||
| import threading | ||
| import time | ||
| import unittest | ||
| import uuid | ||
|
|
||
| from kubernetes.client import api_client | ||
| from kubernetes.client.api import core_v1_api | ||
| from kubernetes.e2e_test import base | ||
| from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer | ||
|
|
||
| _TIMEOUT = 30 | ||
|
|
||
|
|
||
| def _uid(): | ||
| return str(uuid.uuid4())[-12:] | ||
|
|
||
|
|
||
| def _cm(name, payload=None): | ||
| return { | ||
| "apiVersion": "v1", | ||
| "kind": "ConfigMap", | ||
| "metadata": {"name": name, "labels": {"inf-e2e": "1"}}, | ||
| "data": payload or {"k": "v"}, | ||
| } | ||
|
|
||
|
|
||
| def _name_of(obj): | ||
| if hasattr(obj, "metadata"): | ||
| return obj.metadata.name | ||
| return (obj.get("metadata") or {}).get("name") | ||
|
|
||
|
|
||
| class TestSharedInformerE2E(unittest.TestCase): | ||
|
|
||
| @classmethod | ||
| def setUpClass(cls): | ||
| cls.cfg = base.get_e2e_configuration() | ||
| cls.apiclient = api_client.ApiClient(configuration=cls.cfg) | ||
| cls.api = core_v1_api.CoreV1Api(cls.apiclient) | ||
|
|
||
| def _drop(self, cm_name): | ||
| try: | ||
| self.api.delete_namespaced_config_map(name=cm_name, namespace="default") | ||
| except Exception: | ||
| pass | ||
|
|
||
| def _expect(self, ev, label): | ||
| if not ev.wait(timeout=_TIMEOUT): | ||
| self.fail("Timeout waiting for: " + label) | ||
|
|
||
| def _wait_in_cache(self, inf, key): | ||
| stop = time.monotonic() + _TIMEOUT | ||
| while time.monotonic() < stop: | ||
| if inf.cache.get_by_key(key) is not None: | ||
| return | ||
| time.sleep(0.25) | ||
| self.fail("key " + key + " never appeared in cache") | ||
|
|
||
| def _wait_listed(self, inf): | ||
| stop = time.monotonic() + _TIMEOUT | ||
| while inf._resource_version is None and time.monotonic() < stop: | ||
| time.sleep(0.1) | ||
| self.assertIsNotNone(inf._resource_version, "initial list never completed") | ||
|
|
||
| # ------------------------------------------------------- | ||
|
|
||
| def test_cache_populated_after_start(self): | ||
| """Pre-existing ConfigMaps appear in the cache once the informer starts.""" | ||
| name = "inf-pre-" + _uid() | ||
| self.api.create_namespaced_config_map(body=_cm(name), namespace="default") | ||
| self.addCleanup(self._drop, name) | ||
|
|
||
| inf = SharedInformer( | ||
| list_func=self.api.list_namespaced_config_map, | ||
| namespace="default", | ||
| label_selector="inf-e2e=1", | ||
| ) | ||
| inf.start() | ||
| self.addCleanup(inf.stop) | ||
|
|
||
| self._wait_in_cache(inf, "default/" + name) | ||
| cached = inf.cache.get_by_key("default/" + name) | ||
| self.assertEqual(_name_of(cached), name) | ||
| # Verify the cached object actually contains the expected data payload. | ||
| data = cached.data if hasattr(cached, "data") else (cached.get("data") or {}) | ||
| self.assertEqual(data.get("k"), "v") | ||
|
|
||
| def test_added_event_and_cache_entry(self): | ||
| """Creating a ConfigMap fires ADDED and the object appears in the cache.""" | ||
| name = "inf-add-" + _uid() | ||
| seen = threading.Event() | ||
|
|
||
| inf = SharedInformer( | ||
| list_func=self.api.list_namespaced_config_map, | ||
| namespace="default", | ||
| label_selector="inf-e2e=1", | ||
| ) | ||
| inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None) | ||
| inf.start() | ||
| self.addCleanup(inf.stop) | ||
| self.addCleanup(self._drop, name) | ||
|
|
||
| self._wait_listed(inf) | ||
| self.api.create_namespaced_config_map(body=_cm(name), namespace="default") | ||
| self._expect(seen, "ADDED/" + name) | ||
| self.assertIsNotNone(inf.cache.get_by_key("default/" + name)) | ||
|
|
||
| def test_modified_event_and_cache_refresh(self): | ||
| """Patching a ConfigMap fires MODIFIED and the cache holds the updated object.""" | ||
| name = "inf-mod-" + _uid() | ||
| seen = threading.Event() | ||
|
|
||
| inf = SharedInformer( | ||
| list_func=self.api.list_namespaced_config_map, | ||
| namespace="default", | ||
| label_selector="inf-e2e=1", | ||
| ) | ||
| inf.add_event_handler(MODIFIED, lambda o: seen.set() if _name_of(o) == name else None) | ||
| inf.start() | ||
| self.addCleanup(inf.stop) | ||
| self.addCleanup(self._drop, name) | ||
|
|
||
| self.api.create_namespaced_config_map(body=_cm(name), namespace="default") | ||
| self._wait_in_cache(inf, "default/" + name) | ||
|
|
||
| self.api.patch_namespaced_config_map( | ||
| name=name, namespace="default", body={"data": {"k": "updated"}} | ||
| ) | ||
| self._expect(seen, "MODIFIED/" + name) | ||
| # Verify that the cache now holds the updated data. | ||
| cached = inf.cache.get_by_key("default/" + name) | ||
| self.assertIsNotNone(cached) | ||
| data = cached.data if hasattr(cached, "data") else (cached.get("data") or {}) | ||
| self.assertEqual(data.get("k"), "updated") | ||
|
|
||
| def test_deleted_event_removes_from_cache(self): | ||
| """Deleting a ConfigMap fires DELETED and removes it from the cache.""" | ||
| name = "inf-del-" + _uid() | ||
| seen = threading.Event() | ||
|
|
||
| inf = SharedInformer( | ||
| list_func=self.api.list_namespaced_config_map, | ||
| namespace="default", | ||
| label_selector="inf-e2e=1", | ||
| ) | ||
| inf.add_event_handler(DELETED, lambda o: seen.set() if _name_of(o) == name else None) | ||
| inf.start() | ||
| self.addCleanup(inf.stop) | ||
|
|
||
| self.api.create_namespaced_config_map(body=_cm(name), namespace="default") | ||
| self._wait_in_cache(inf, "default/" + name) | ||
|
|
||
| self.api.delete_namespaced_config_map(name=name, namespace="default") | ||
| self._expect(seen, "DELETED/" + name) | ||
| self.assertIsNone(inf.cache.get_by_key("default/" + name)) | ||
|
|
||
| def test_resource_version_advances(self): | ||
| """The stored resourceVersion advances after watch events are received.""" | ||
| name = "inf-rv-" + _uid() | ||
| seen = threading.Event() | ||
|
|
||
| inf = SharedInformer( | ||
| list_func=self.api.list_namespaced_config_map, | ||
| namespace="default", | ||
| label_selector="inf-e2e=1", | ||
| ) | ||
| inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None) | ||
| inf.start() | ||
| self.addCleanup(inf.stop) | ||
| self.addCleanup(self._drop, name) | ||
|
|
||
| self._wait_listed(inf) | ||
| rv_before = int(inf._resource_version) | ||
|
|
||
| self.api.create_namespaced_config_map(body=_cm(name), namespace="default") | ||
| self._expect(seen, "ADDED/" + name) | ||
| self.assertGreater(int(inf._resource_version), rv_before) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| # Copyright 2024 The Kubernetes Authors. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: wrong copyright year? Does it matter? |
||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from .cache import ObjectCache, _meta_namespace_key | ||
| from .informer import SharedInformer, ADDED, MODIFIED, DELETED, BOOKMARK, ERROR | ||
|
|
||
| __all__ = [ | ||
| "ObjectCache", | ||
| "_meta_namespace_key", | ||
| "SharedInformer", | ||
| "ADDED", | ||
| "MODIFIED", | ||
| "DELETED", | ||
| "BOOKMARK", | ||
| "ERROR", | ||
| ] | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| # Copyright 2024 The Kubernetes Authors. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Thread-safe in-memory store for the Kubernetes informer.""" | ||
|
|
||
| import threading | ||
|
|
||
|
|
||
| def _meta_namespace_key(obj): | ||
| """Build a lookup key from object metadata. | ||
|
|
||
| Supports both dict-based objects and generated model objects. | ||
| Returns namespace/name for namespaced objects, just name otherwise. | ||
| """ | ||
| if isinstance(obj, dict): | ||
| meta = obj.get("metadata") or {} | ||
| ns = meta.get("namespace") or "" | ||
| name = meta.get("name") or "" | ||
| else: | ||
| meta = getattr(obj, "metadata", None) | ||
| if meta is None: | ||
| return "" | ||
| if hasattr(meta, "namespace"): | ||
| ns = getattr(meta, "namespace", None) or "" | ||
| name = getattr(meta, "name", None) or "" | ||
| else: | ||
| ns = meta.get("namespace") or "" | ||
| name = meta.get("name") or "" | ||
| if ns: | ||
| return "{}/{}".format(ns, name) | ||
| return name | ||
|
|
||
|
|
||
| class ObjectCache: | ||
| """Thread-safe in-memory mapping of Kubernetes objects. | ||
|
|
||
| The SharedInformer keeps this store synchronised with the API server. | ||
| Consumers can call list() and get_by_key() from any thread safely. | ||
| """ | ||
|
|
||
| def __init__(self, key_func=None): | ||
| self._key_func = key_func if key_func is not None else _meta_namespace_key | ||
| self._objects = {} | ||
| self._rlock = threading.RLock() | ||
|
|
||
| # --- mutation helpers (called by SharedInformer) --- | ||
|
|
||
| def _put(self, obj): | ||
| key = self._key_func(obj) | ||
| with self._rlock: | ||
| self._objects[key] = obj | ||
|
|
||
| def _remove(self, obj): | ||
| key = self._key_func(obj) | ||
| with self._rlock: | ||
| self._objects.pop(key, None) | ||
|
|
||
| def _replace_all(self, objects): | ||
| rebuilt = {self._key_func(o): o for o in objects} | ||
| with self._rlock: | ||
| self._objects = rebuilt | ||
|
|
||
| # --- public read API --- | ||
|
|
||
| def list(self): | ||
| """Return a snapshot list of all cached objects.""" | ||
| with self._rlock: | ||
| return list(self._objects.values()) | ||
|
|
||
| def list_keys(self): | ||
| """Return a snapshot list of all cache keys.""" | ||
| with self._rlock: | ||
| return list(self._objects.keys()) | ||
|
|
||
| def get(self, obj): | ||
| """Look up the cached copy of obj. Returns None when absent.""" | ||
| key = self._key_func(obj) | ||
| return self.get_by_key(key) | ||
|
|
||
| def get_by_key(self, key): | ||
| """Look up an object by key. Returns None when absent.""" | ||
| with self._rlock: | ||
| return self._objects.get(key) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/2024/2026/, please update all files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 1d10150 — all 6 newly created files now have
Copyright 2026.