Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cronjobs/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
"pygit2>=1.19.0",
"pyjwt>=2.10.1",
"cryptography>=46.0.5",
"google-cloud-bigquery>=3.41.0",
]

[build-system]
Expand Down
114 changes: 114 additions & 0 deletions cronjobs/src/commands/consumption_movers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import heapq
from datetime import date, timedelta

import requests
from decouple import config
from google.cloud import bigquery


# Slack destination for the alert. If SLACK_WEBHOOK_URL is unset, the message
# is printed to stdout instead of being posted (see consumption_movers()).
SLACK_CHANNEL = config("SLACK_CHANNEL", default="remote-settings-alerts")
SLACK_WEBHOOK_URL = config("SLACK_WEBHOOK_URL", default=None)

# Lengths (in days) of the two comparison windows: the recent window
# (LAST_PERIOD_DAYS) is compared against the preceding PREVIOUS_PERIOD_DAYS.
PREVIOUS_PERIOD_DAYS = config("PREVIOUS_PERIOD_DAYS", default=90, cast=int)
LAST_PERIOD_DAYS = config("LAST_PERIOD_DAYS", default=7, cast=int)
MIN_AVG_BYTES = config(
    "MIN_AVG_BYTES", default=500e9, cast=float
)  # 500GB, to filter out noise from small collections.
# How many movers to list in each direction (increases / decreases).
TOP_N = config("TOP_N", default=5, cast=int)

# Per-collection average of daily SUM(size), over a window bounded by the
# {interval_days_start}/{interval_days_end} placeholders (days back from the
# start of the current day). Placeholders are filled via str.format() with
# integers cast by decouple — not user input, so no injection risk here.
COLLECTIONS_AVERAGE = """
WITH daily_consumption AS (
SELECT
DATE_TRUNC(timestamp, DAY) AS day,
collection_id,
SUM(size) AS size
FROM `mozdata.remote_settings_logs_aggregates.prod_logs_aggregates`
WHERE timestamp >= TIMESTAMP_SUB(TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY), INTERVAL {interval_days_start} DAY)
AND timestamp < TIMESTAMP_SUB(TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY), INTERVAL {interval_days_end} DAY)
AND collection_id IS NOT NULL
GROUP BY day, collection_id
)
SELECT
collection_id,
AVG(size) AS avg_size
FROM daily_consumption
GROUP BY collection_id
"""


def consumption_movers() -> None:
"""
Identify the collections with the biggest increases and decreases in bandwidth consumption,
and posts a message in Slack to alert the team.
"""
end_day = date.today() - timedelta(days=1) # Look at complete days only.
start_day = end_day - timedelta(days=LAST_PERIOD_DAYS)

client = bigquery.Client()
previous = dict(
client.query(
COLLECTIONS_AVERAGE.format(
interval_days_start=PREVIOUS_PERIOD_DAYS + LAST_PERIOD_DAYS,
interval_days_end=LAST_PERIOD_DAYS,
)
).result()
)
last = dict(
client.query(
COLLECTIONS_AVERAGE.format(
interval_days_start=LAST_PERIOD_DAYS,
interval_days_end=0,
)
).result()
)
movers = []
for collection_id, last_avg in last.items():
if collection_id not in previous:
# Skip new collections.
continue

# Skip small collections to avoid noise.
if last_avg < MIN_AVG_BYTES and previous[collection_id] < MIN_AVG_BYTES:
continue

previous_avg = previous[collection_id]
delta = last_avg - previous_avg
pct_change = (last_avg / previous_avg - 1) * 100 if previous_avg > 0 else 100
movers.append(
{
"collection_id": collection_id,
"pct_change": pct_change,
"delta": delta,
}
)

message = f"""*Bandwidth Consumption*
Period: {start_day.strftime("%b %-d")} → {end_day.strftime("%b %-d")}"""

if top_increase := heapq.nlargest(
TOP_N, [m for m in movers if m["pct_change"] > 0], key=lambda m: m["delta"]
):
message += f"""\n
*Top increases* (Compared against previous {PREVIOUS_PERIOD_DAYS}-day daily average)
{"\n".join(f"- `{m['collection_id']}`: {m['pct_change']:+.1f}% " for m in top_increase)}"""

if top_decrease := heapq.nsmallest(
TOP_N, [m for m in movers if m["pct_change"] < 0], key=lambda m: m["delta"]
):
message += f"""\n
*Top decreases*
{"\n".join(f"- `{m['collection_id']}`: {m['pct_change']:+.1f}% " for m in top_decrease)}"""

if not SLACK_WEBHOOK_URL:
print("SLACK_WEBHOOK_URL is not set; message was not sent")
print(f"Slack message:\n{message}")
return

resp = requests.post(
SLACK_WEBHOOK_URL,
json={
"channel": SLACK_CHANNEL,
"text": message,
},
)
resp.raise_for_status()
102 changes: 102 additions & 0 deletions cronjobs/tests/commands/test_consumption_movers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from unittest.mock import MagicMock, patch

from commands.consumption_movers import (
MIN_AVG_BYTES,
consumption_movers,
)


# Fixture sizes chosen relative to the production noise threshold so the
# filtering behavior of consumption_movers() is exercised deterministically.
BIG_SIZE = MIN_AVG_BYTES * 2 # clearly above threshold
SMALL_SIZE = MIN_AVG_BYTES / 10 # clearly below threshold


def run_movers(
    previous: dict[str, float],
    last: dict[str, float],
    webhook_url="https://hooks.slack.com/test",
):
    """Run consumption_movers() with stubbed BigQuery results.

    Returns the patched `requests.post` mock so tests can inspect the
    Slack payload that was sent.
    """
    client_patch = patch("commands.consumption_movers.bigquery.Client")
    url_patch = patch("commands.consumption_movers.SLACK_WEBHOOK_URL", webhook_url)
    post_patch = patch("commands.consumption_movers.requests.post")
    with client_patch as client_mock, url_patch, post_patch as post_mock:
        # The two query().result() calls consume "previous" then "last",
        # matching the call order inside consumption_movers().
        client_mock.return_value.query.return_value.result.side_effect = [
            previous,
            last,
        ]
        post_mock.return_value = MagicMock()
        consumption_movers()
    return post_mock


def slack_text(mock_post) -> str:
    """Extract the Slack message text from the captured requests.post call."""
    payload = mock_post.call_args.kwargs["json"]
    return payload["text"]


def test_new_collections_are_ignored():
    # A collection with no data in the previous period must not be reported.
    post = run_movers(
        previous={"known": BIG_SIZE},
        last={"known": BIG_SIZE * 1.5, "brand-new": BIG_SIZE * 2},
    )
    message = slack_text(post)
    assert "brand-new" not in message


def test_small_collections_are_filtered():
    # Both periods below MIN_AVG_BYTES: filtered out as noise.
    post = run_movers(
        previous={"tiny": SMALL_SIZE},
        last={"tiny": SMALL_SIZE * 2},
    )
    message = slack_text(post)
    assert "tiny" not in message


def test_previously_large_collection_kept():
    # Large before, small now: still reported (it is a big decrease).
    post = run_movers(
        previous={"shrinking": BIG_SIZE},
        last={"shrinking": SMALL_SIZE},
    )
    message = slack_text(post)
    assert "shrinking" in message


def test_previously_small_collection_kept():
    # Small before, large now: still reported (it is a big increase).
    post = run_movers(
        previous={"growing": SMALL_SIZE},
        last={"growing": BIG_SIZE},
    )
    message = slack_text(post)
    assert "growing" in message


def test_no_increase_section_when_none():
    # With only a decrease, the increases section must be omitted entirely.
    post = run_movers(
        previous={"down": BIG_SIZE},
        last={"down": BIG_SIZE * 0.5},
    )
    message = slack_text(post)
    assert "Top decreases" in message
    assert "Top increases" not in message


def test_increases_sorted_by_delta_descending():
    # Ranking is by absolute byte delta, largest increase first.
    post = run_movers(
        previous={"big-delta": BIG_SIZE, "small-delta": BIG_SIZE},
        last={"big-delta": BIG_SIZE * 3, "small-delta": BIG_SIZE * 1.1},
    )
    message = slack_text(post)
    assert message.index("big-delta") < message.index("small-delta")


def test_decreases_sorted_by_delta_ascending():
    # Ranking is by absolute byte delta, largest drop first.
    post = run_movers(
        previous={"big-drop": BIG_SIZE * 3, "small-drop": BIG_SIZE},
        last={"big-drop": BIG_SIZE, "small-drop": BIG_SIZE * 0.9},
    )
    message = slack_text(post)
    assert message.index("big-drop") < message.index("small-drop")


def test_slack_payload_shape():
    post = run_movers(
        previous={"cid": BIG_SIZE},
        last={"cid": BIG_SIZE * 2},
    )
    # Exactly one webhook call, carrying both the channel and the message text.
    post.assert_called_once()
    payload = post.call_args.kwargs["json"]
    assert "channel" in payload
    assert "text" in payload
Loading
Loading