Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pysteps/blending/clim.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,13 @@ def calc_clim_skill(
past_skill = np.array(None)
# check if there is enough data to compute the climatological skill
if not past_skill.any():
print("WARNING: Past skill file is empty, using default BPS2006 skill")
return get_default_skill(n_cascade_levels, n_models)
elif past_skill.shape[0] < window_length:
print(
f"WARNING: Past skill file has fewer days ({past_skill.shape[0]})"
f"than expected ({window_length}). Using default BPS2006 skill"
)
return get_default_skill(n_cascade_levels, n_models)
# reduce window if necessary
else:
Expand Down
49 changes: 39 additions & 10 deletions pysteps/blending/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,6 @@ class StepsBlendingState:
# Final outputs
final_blended_forecast: np.ndarray | None = None
final_blended_forecast_non_perturbed: np.ndarray | None = None
weights: np.ndarray | None = None
weights_model_only: np.ndarray | None = None

# Timing and indexing
time_prev_timestep: list[float] | None = None
Expand All @@ -426,10 +424,24 @@ class StepsBlendingState:
is_nowcast_time_step: bool | None = None
subtimestep_index: int | None = None

# Weights used for blending
# Weights used for blending. `weights` / `weights_model_only` are per-worker
# scratch (the current member's weights, used by the downstream blends).
# `weights_per_member` / `weights_model_only_per_member` persist each member's
# weights across timesteps so members do not overwrite one another and the
# full-NWP smoothing reads each member's own previous-timestep weights.

# Current working copy: full weights (extrapolation + NWP + noise)
weights: np.ndarray | None = None

# Current working copy: Extrapolation + radar removed (for when there is no radar)
weights_model_only: np.ndarray | None = None

# List index by j that survives over timesteps (full weights)
weights_per_member: list[Any] | None = None

# List index by j that survives over timesteps (Extrapolation + radar removed)
weights_model_only_per_member: list[Any] | None = None

# This is stores here as well because this is changed during the forecast loop and thus no longer part of the config
extrapolation_kwargs: dict[str, Any] = field(default_factory=dict)

Expand Down Expand Up @@ -665,7 +677,7 @@ def __blended_nowcast_main_loop(self):
def worker(j):
worker_state = copy(self.__state)
self.__determine_NWP_skill_for_next_timestep(t, j, worker_state)
self.__determine_weights_per_component(t, worker_state)
self.__determine_weights_per_component(t, j, worker_state)
self.__regress_extrapolation_and_noise_cascades(j, worker_state, t)
self.__perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep(
t, j, worker_state
Expand Down Expand Up @@ -1629,6 +1641,14 @@ def __prepare_forecast_loop(self):
self.__state.previous_displacement_prob_matching = np.stack(
[None for j in range(self.__config.n_ens_members)]
)
# Per-member blending weights, persisted across timesteps so members do
# not overwrite one another (see StepsBlendingState).
self.__state.weights_per_member = [
None for j in range(self.__config.n_ens_members)
]
self.__state.weights_model_only_per_member = [
None for j in range(self.__config.n_ens_members)
]
self.__state.final_blended_forecast = [
[] for j in range(self.__config.n_ens_members)
]
Expand Down Expand Up @@ -2128,11 +2148,16 @@ def __determine_NWP_skill_for_next_timestep(self, t, j, worker_state):
axis=0,
)

def __determine_weights_per_component(self, t, worker_state):
def __determine_weights_per_component(self, t, j, worker_state):
"""
Compute blending weights for each component based on the selected method
('bps' or 'spn'). Weights are determined for both full blending and
model-only scenarios, accounting for correlations and covariance.

The resulting weights are stored per ensemble member ``j`` in
``weights_per_member`` / ``weights_model_only_per_member`` so members do
not overwrite one another and the full-NWP smoothing reads each member's
own previous-timestep weights.
"""
start_smoothing_to_final_weights = False
if self.__config.timestep_start_full_nwp_weight is not None:
Expand All @@ -2149,7 +2174,7 @@ def __determine_weights_per_component(self, t, worker_state):
)
else:
worker_state.weights = calculate_end_weights(
previous_weights=self.__state.weights,
previous_weights=worker_state.weights_per_member[j],
timestep=t,
n_timesteps=self.__timesteps[-1],
start_full_nwp_weight=self.__config.timestep_start_full_nwp_weight,
Expand Down Expand Up @@ -2212,7 +2237,7 @@ def __determine_weights_per_component(self, t, worker_state):
)
elif start_smoothing_to_final_weights:
worker_state.weights_model_only = calculate_end_weights(
previous_weights=self.__state.weights_model_only,
previous_weights=worker_state.weights_model_only_per_member[j],
timestep=t,
n_timesteps=self.__timesteps[-1],
start_full_nwp_weight=self.__config.timestep_start_full_nwp_weight,
Expand All @@ -2223,8 +2248,11 @@ def __determine_weights_per_component(self, t, worker_state):
"Unknown weights method %s: must be 'bps' or 'spn'"
% self.__config.weights_method
)
self.__state.weights = worker_state.weights
self.__state.weights_model_only = worker_state.weights_model_only
# Persist this member's weights without overwriting other members. The
# shallow copy shares the same list object, so index assignment touches a
# disjoint element per member (mirrors previous_displacement[j]).
worker_state.weights_per_member[j] = worker_state.weights
worker_state.weights_model_only_per_member[j] = worker_state.weights_model_only

def __regress_extrapolation_and_noise_cascades(self, j, worker_state, t):
"""
Expand Down Expand Up @@ -2921,7 +2949,8 @@ def __blend_cascades(self, t_sub, j, worker_state):
covariance=covariance_nwp_models,
)

self.__state.weights = worker_state.weights
# Persist this member's (spn) weights without overwriting other members.
worker_state.weights_per_member[j] = worker_state.weights

# Create weights_with_noise to ensure there is always a 3D weights field, even
# if self.__config.nowcasting_method is "external_nowcast" and n_ens_members is 1.
Expand Down
Loading