Source code for src.toolbox.steps.custom.qc.impossible_speed_qc

# This file is part of the NOC Autonomy Toolbox.
#
# Copyright 2025-2026 National Oceanography Centre and The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""QC test to identify impossible speeds in glider data (Currently Disabled)."""

#### Mandatory imports ####
from toolbox.steps.base_qc import BaseQC, register_qc, flag_cols

#### Custom imports ####
import matplotlib.pyplot as plt
import xarray as xr
import numpy as np
import matplotlib


@register_qc
[docs] class impossible_speed_qc(BaseQC): """ Target Variable: TIME, LATITUDE, LONGITUDE Flag Number: 4 (bad data) Variables Flagged: TIME, LATITUDE, LONGITUDE Checks that the the gliders horizontal speed stays below 3m/s NOTE: Currently disabled. Passes all data as good. """
[docs] qc_name = "impossible speed qc"
[docs] expected_parameters = {}
[docs] required_variables = ["TIME", "LATITUDE", "LONGITUDE"]
[docs] qc_outputs = ["TIME_QC", "LATITUDE_QC", "LONGITUDE_QC"]
[docs] def return_qc(self): # Calculate the broken speed as before so the diagnostic plot still shows the issue dt = (self.data["TIME"] - self.data["TIME"].shift(N_MEASUREMENTS=1)) / np.timedelta64(1, "s") lat = self.data["LATITUDE"].where(np.isfinite(self.data["LATITUDE"])).interpolate_na(dim="N_MEASUREMENTS") lon = self.data["LONGITUDE"].where(np.isfinite(self.data["LONGITUDE"])).interpolate_na(dim="N_MEASUREMENTS") lat_rad = np.radians(lat) lon_rad = np.radians(lon) prev_lat_rad = lat_rad.shift(N_MEASUREMENTS=1) prev_lon_rad = lon_rad.shift(N_MEASUREMENTS=1) a = ( np.sin((lat_rad - prev_lat_rad) / 2)**2 + np.cos(prev_lat_rad) * np.cos(lat_rad) * np.sin((lon_rad - prev_lon_rad) / 2)**2 ) distance_m = 6371000.0 * 2 * np.arcsin(np.sqrt(a)) self.absolute_speed = distance_m / dt self.absolute_speed = self.absolute_speed.fillna(0.0) # Force all flags to 1 (Good) so no data is binned flag_values = np.ones(self.data.sizes["N_MEASUREMENTS"], dtype=int) self.flags = xr.Dataset( data_vars={ col + "_QC": ("N_MEASUREMENTS", flag_values) for col in self.required_variables }, coords={"N_MEASUREMENTS": self.data["N_MEASUREMENTS"]}, ) return self.flags
[docs] def plot_diagnostics(self): # Tweakable plot variables plot_size = (7, 7) line_colour = "teal" bad_colour = "magenta" marker_size = 1.5 speed_threshold = 3.0 matplotlib.use("tkagg") fig, axs = plt.subplots(nrows=4, figsize=plot_size, sharex=True, dpi=200) fig.suptitle("TEST DISABLED: All Data Passed\nThis QC needs reviewing.", color="red", weight="bold") time_vals = self.data["TIME"].values abs_speed_vals = self.absolute_speed.values lat_vals = self.data["LATITUDE"].values lon_vals = self.data["LONGITUDE"].values pres_vals = self.data["PRES"].values mask_flag1 = (self.flags["LATITUDE_QC"] == 1).values mask_flag4 = (self.flags["LATITUDE_QC"] == 4).values count_flag1 = np.sum(mask_flag1) count_flag4 = np.sum(mask_flag4) # Plot 1: Absolute Speed axs[0].plot(time_vals, abs_speed_vals, c=line_colour, lw=0.5, zorder=1, label="Raw Speed (m/s)") max_speed = np.nanmax(abs_speed_vals) y_max = 20.0 if max_speed > 20.0 else max(4.0, max_speed * 1.1) axs[0].set( ylabel="Speed (m/s)", ylim=(0, y_max), ) axs[0].axhline(speed_threshold, ls="--", c=bad_colour, lw=1, label=f"Threshold ({speed_threshold}m/s)") axs[0].legend(loc="upper right", fontsize="small") # Plot 2: Latitude over time axs[1].plot( time_vals[mask_flag1], lat_vals[mask_flag1], c=line_colour, ls="", marker="o", markersize=marker_size, label=f"Flag 1 (N={count_flag1})" ) if count_flag4 > 0: axs[1].plot( time_vals[mask_flag4], lat_vals[mask_flag4], c=bad_colour, ls="", marker="o", markersize=marker_size, label=f"Flag 4 (N={count_flag4})" ) axs[1].set(ylabel="Latitude") axs[1].legend(loc="upper right", fontsize="small") # Plot 3: Longitude over time axs[2].plot( time_vals[mask_flag1], lon_vals[mask_flag1], c=line_colour, ls="", marker="o", markersize=marker_size, label=f"Flag 1 (N={count_flag1})" ) if count_flag4 > 0: axs[2].plot( time_vals[mask_flag4], lon_vals[mask_flag4], c=bad_colour, ls="", marker="o", markersize=marker_size, label=f"Flag 4 (N={count_flag4})" ) axs[2].set(ylabel="Longitude") axs[2].legend(loc="upper right", fontsize="small") # Plot 4: Pressure over time axs[3].plot( time_vals[mask_flag1], pres_vals[mask_flag1], c=line_colour, ls="", marker="o", markersize=marker_size, label=f"Flag 1 (N={count_flag1})" ) if count_flag4 > 0: axs[3].plot( time_vals[mask_flag4], pres_vals[mask_flag4], c=bad_colour, ls="", marker="o", markersize=marker_size, label=f"Flag 4 (N={count_flag4})" ) axs[3].set( xlabel="Time", ylabel="Pressure", ) axs[3].invert_yaxis() axs[3].legend(loc="upper right", fontsize="small") fig.tight_layout() plt.show(block=True)