|
1 | | -import datetime |
2 | | -import pins |
3 | | -from pins.errors import PinsError |
4 | 1 | import plotly.express as px |
5 | 2 | import pandas as pd |
6 | | -from datetime import datetime, timedelta |
| 3 | +from datetime import timedelta |
7 | 4 |
|
8 | 5 |
|
9 | 6 | def compute_metrics( |
@@ -75,60 +72,65 @@ def _rolling_df(df: pd.DataFrame, td: timedelta): |
75 | 72 | first = stop |
76 | 73 |
|
77 | 74 |
|
78 | | -def pin_metrics(board, df_metrics, metrics_pin_name, overwrite=False): |
79 | | - pass |
80 | | - |
81 | | - |
82 | | -# """ |
83 | | -# Update an existing pin storing model metrics over time |
84 | | - |
85 | | -# Parameters |
86 | | -# ---------- |
87 | | -# board : |
88 | | -# Pins board |
89 | | -# df_metrics: pd.DataFrame |
90 | | -# Dataframe of metrics over time, such as created by `vetiver_compute_metrics()` |
91 | | -# metrics_pin_name: |
92 | | -# Pin name for where the metrics are stored |
93 | | -# overwrite: bool |
94 | | -# If TRUE (the default), overwrite any metrics for |
95 | | -# dates that exist both in the existing pin and |
96 | | -# new metrics with the new values. If FALSE, error |
97 | | -# when the new metrics contain overlapping dates with |
98 | | -# the existing pin. |
99 | | -# """ |
100 | | -# date_types = (datetime.date, datetime.time, datetime.datetime) |
101 | | -# if not isinstance(df_metrics.index, date_types): |
102 | | -# try: |
103 | | -# df_metrics = df_metrics.index.astype("datetime") |
104 | | -# except TypeError: |
105 | | -# raise TypeError(f"Index of {df_metrics} must be a date type") |
106 | | - |
107 | | -# new_metrics = df_metrics.sort_index() |
108 | | - |
109 | | -# new_dates = df_metrics.index.unique() |
110 | | - |
111 | | -# try: |
112 | | -# old_metrics = board.pin_read(metrics_pin_name) |
113 | | -# except PinsError: |
114 | | -# board.pin_write(metrics_pin_name) |
115 | | - |
116 | | -# overlapping_dates = old_metrics.index in new_dates |
117 | | - |
118 | | -# if overwrite is True: |
119 | | -# old_metrics = old_metrics not in overlapping_dates |
120 | | -# else: |
121 | | -# if overlapping_dates: |
122 | | -# raise ValueError( |
123 | | -# f"The new metrics overlap with dates \ |
124 | | -# already stored in {repr(metrics_pin_name)} \ |
125 | | -# Check the aggregated dates or use `overwrite = True`" |
126 | | -# ) |
127 | | - |
128 | | -# new_metrics = old_metrics + df_metrics |
129 | | -# new_metrics = new_metrics.sort_index() |
130 | | - |
131 | | -# pins.pin_write(board, new_metrics, metrics_pin_name) |
def pin_metrics(
    board,
    df_metrics: pd.DataFrame,
    metrics_pin_name: str,
    index_name: str = "index",
    overwrite: bool = False,
) -> pd.DataFrame:
    """
    Update an existing pin storing model metrics over time

    The pin named `metrics_pin_name` is read unconditionally, so it must
    already exist on `board`. The new metrics are merged into it and the
    result is written back to the same pin.

    Parameters
    ----------
    board :
        Pins board
    df_metrics: pd.DataFrame
        Dataframe of metrics over time, such as created by `compute_metrics()`
    metrics_pin_name:
        Pin name for where the metrics are stored
    index_name:
        The column in df_metrics containing the aggregated dates or datetimes.
        Note that this defaults to a column named "index". The same column
        must be present in the metrics already stored in the pin.
    overwrite: bool
        If True, overwrite any metrics for dates that exist both in the
        existing pin and new metrics with the new values. If False (the
        default), error when the new metrics contain overlapping dates
        with the existing pin.

    Returns
    -------
    pd.DataFrame
        The combined old and new metrics, sorted by `index_name` and carrying
        a fresh default integer index. This is the frame written to the pin.

    Raises
    ------
    TypeError
        If the `index_name` column has a different dtype in the stored
        metrics than in `df_metrics`.
    ValueError
        If `overwrite` is False and any stored date also appears in
        `df_metrics`.
    """

    new_dates = df_metrics[index_name]

    old_metrics = board.pin_read(metrics_pin_name)
    old_dates = old_metrics[index_name]

    # handle overlapping dates ----
    # `isin` across different dtypes could silently report "no overlap",
    # so refuse to compare columns whose dtypes differ.
    if new_dates.dtype != old_dates.dtype:
        raise TypeError(
            f"index_name column ({repr(index_name)}) in old and new metrics "
            "must have the same dtype. "
            f"\nOld dtype: {old_dates.dtype}"
            f"\nNew dtype: {new_dates.dtype}"
        )

    indx_old_overlap = old_dates.isin(new_dates)

    if overwrite:
        # get only rows specific to old metrics, so when we concat below
        # it effectively is an upsert
        old_metrics = old_metrics.loc[~indx_old_overlap, :]

    elif indx_old_overlap.any():
        raise ValueError(
            f"The new metrics overlap with dates already stored in {metrics_pin_name}."
            " Check the aggregated dates or use `overwrite=True`."
        )

    # update and pin ----
    combined_metrics = pd.concat([old_metrics, df_metrics], ignore_index=True)

    # Sorting permutes the row labels whenever the new dates are not all later
    # than the stored ones. Reset to a default RangeIndex afterwards: pandas'
    # feather writer requires a default index, and the "arrow" pin type is
    # expected to go through it (NOTE(review): confirm against the pins driver).
    sorted_metrics = combined_metrics.sort_values(index_name).reset_index(drop=True)

    board.pin_write(sorted_metrics, metrics_pin_name, type="arrow")

    return sorted_metrics
132 | 134 |
|
133 | 135 |
|
134 | 136 | def plot_metrics( |
|
0 commit comments