# This file is part of the NOC Autonomy Toolbox.
#
# Copyright 2025-2026 National Oceanography Centre and The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#### Mandatory imports ####
from toolbox.steps.base_step import BaseStep, register_step
from toolbox.utils.qc_handling import QCHandlingMixin
import toolbox.utils.diagnostics as diag
#### Custom imports ####
import numpy as np
[docs]
def check_config(self, expected_params):
for param in expected_params:
if not hasattr(self, param):
raise KeyError(f"[{self.step_name}] '{param}' is missing from config")
if "_name" in param:
if getattr(self, param) not in self.data.data_vars:
raise KeyError(f"[{self.step_name}] {getattr(self, param)} could not be found in the data")
@register_step
[docs]
class DeriveUncalibratedPhase(BaseStep, QCHandlingMixin):
[docs]
step_name = "Derive Uncalibrated Phase"
[docs]
def run(self):
"""
Example
-------
- name: "Derive Uncalibrated Phase"
parameters:
# <MANDATORY>
blue_phase_name: "BPHASE_DOXY"
# <OPTIONAL>
red_phase_name: "RPHASE_DOXY"
diagnostics: false
Returns
-------
"""
self.filter_qc()
# Check blue_phase_name is present
check_config(self, ("blue_phase_name",))
# Check if the output already exists
if "UNCAL_PHASE_DOXY" in self.data.data_vars:
self.log_warn("UNCAL_PHASE_DOXY already exists in the data. Overwriting...")
# Calculate Uncalibrated phase and specify what QC will be derived from
qc_parents = [f"{self.blue_phase_name}_QC"]
if hasattr(self, "red_phase_name"):
check_config(self, ("red_phase_name",))
self.data["UNCAL_PHASE_DOXY"] = self.data[self.blue_phase_name] - self.data[self.red_phase_name]
qc_parents.append(f"{self.red_phase_name}_QC")
else:
self.data["UNCAL_PHASE_DOXY"] = self.data[self.blue_phase_name]
self.reconstruct_data()
self.update_qc()
self.generate_qc(
{"UNCAL_PHASE_DOXY_QC": qc_parents}
)
if self.diagnostics:
self.generate_diagnostics()
self.context["data"] = self.data
return self.context
[docs]
def generate_diagnostics(self):
pass
@register_step
[docs]
class DeriveOptodeTemperature(BaseStep, QCHandlingMixin):
[docs]
step_name = "Derive Optode Temperature"
[docs]
def run(self):
"""
Example
-------
- name: "Derive Optode Temperature"
parameters:
temp_voltage_name: "TEMP_VOLTAGE_DOXY"
calib_coefficients: [0, 1, 0, 0, 0, 0]
diagnostics: false
Returns
-------
"""
self.filter_qc()
# Check the optode temperature voltage and calibration coefficients are present
check_config(self, ("temp_voltage_name", "calib_coefficients"))
# Check there are at least two coefficients for the polynomial. Fill in missing values.
if len(self.calib_coefficients) < 2:
raise ValueError(f"[{self.step_name}] At least two calibration coefficients are required.")
coeffs = [0] * 6
for i in range(len(self.calib_coefficients)):
coeffs[i] = self.calib_coefficients[i]
# Check if the output already exists
if "TEMP_DOXY" in self.data.data_vars:
self.log_warn("TEMP_DOXY already exists in the data. Overwriting...")
# Calculate temp_doxy
temp_doxy = 0
for i, coeff in enumerate(coeffs):
temp_doxy += (coeff[i] * self.data[self.temp_voltage_name]**i)
self.data["TEMP_DOXY"] = temp_doxy
self.reconstruct_data()
self.update_qc()
self.generate_qc(
{"TEMP_DOXY_QC": [f"{self.temp_voltage_name}_QC"]}
)
if self.diagnostics:
self.generate_diagnostics()
self.context["data"] = self.data
return self.context
[docs]
def generate_diagnostics(self):
pass
@register_step
[docs]
class PhasePressureCorrection(BaseStep, QCHandlingMixin):
[docs]
step_name = "Phase Pressure Correction"
[docs]
def run(self):
"""
Example
-------
- name: "Phase Pressure Correction"
parameters:
optode_pressure_name: "PRES"
correction_coefficient: 0.1
diagnostics: false
Returns
-------
"""
self.filter_qc()
# Check the optode pressure and correction coefficient are present and that UNCAL_PHASE_DOXY is in the data
check_config(self, ("optode_pressure_name", "correction_coefficient"))
if "UNCAL_PHASE_DOXY" not in self.data.data_vars:
raise KeyError(f"[{self.step_name}] UNCAL_PHASE_DOXY required but is missing from the data")
# Apply the correction
self.data["UNCAL_PHASE_DOXY_PCORR"] = (
self.data["UNCAL_PHASE_DOXY"] +
0.001 * self.correction_coefficient * self.data[self.optode_pressure_name]
)
self.reconstruct_data()
self.update_qc()
self.generate_qc(
{"UNCAL_PHASE_DOXY_PCORR_QC": [f"{self.optode_pressure_name}_QC", "UNCAL_PHASE_DOXY_QC"]}
)
if self.diagnostics:
self.generate_diagnostics()
self.context["data"] = self.data
return self.context
[docs]
def generate_diagnostics(self):
pass
@register_step
[docs]
class DeriveCalibratedPhase(BaseStep, QCHandlingMixin):
[docs]
step_name = "Derive Calibrated Phase"
[docs]
def run(self):
"""
Example
-------
- name: "Derive Calibrated Phase"
parameters:
uncalibrated_phase_name: "UNCAL_PHASE_DOXY"
calib_coefficients: [0, 1, 0, 0]
diagnostics: false
Returns
-------
"""
self.filter_qc()
# Check the config satisfies requirements
check_config(self, ("uncalibrated_phase_name", "calib_coefficients"))
# Check there are at least two coefficients for the polynomial. Fill in missing values.
if len(self.calib_coefficients) < 2:
raise ValueError(f"[{self.step_name}] At least two calibration coefficients are required.")
coeffs = [0] * 4
for i in range(len(self.calib_coefficients)):
coeffs[i] = self.calib_coefficients[i]
# Check if the output already exists
if "CAL_PHASE_DOXY" in self.data.data_vars:
self.log_warn("CAL_PHASE_DOXY already exists in the data. Overwriting...")
# Calculate cal_phase_doxy
cal_phase_doxy = 0
for i, coeff in enumerate(coeffs):
cal_phase_doxy += (coeff * self.data[self.uncalibrated_phase_name] ** i)
self.data["CAL_PHASE_DOXY"] = cal_phase_doxy
self.reconstruct_data()
self.update_qc()
self.generate_qc(
{"CAL_PHASE_DOXY_QC": [f"{self.uncalibrated_phase_name}_QC"]}
)
if self.diagnostics:
self.generate_diagnostics()
self.context["data"] = self.data
return self.context
[docs]
def generate_diagnostics(self):
pass
@register_step
[docs]
class DeriveOxygenConcentration(BaseStep, QCHandlingMixin):
[docs]
step_name = "Derive Oxygen Concentration"
[docs]
def func_poly(self):
# Check the calibration matrix has the right shape
if np.shape(self.calib_coefficient_matrix) != (5, 4):
raise ValueError(
f"[{self.step_name}] Calib coefficient matrix must be of shape (5, 4) for method 'poly'."
)
# Build the internal coefficient matrix
coeffs_matrix = np.full((5, 4), 0)
for i, row in enumerate(self.calib_coefficient_matrix):
coeffs_matrix[i, :] = row
# Apply the conversion
poly_temp = np.array([self.data[self.temperature_name].values**i for i in range(4)])[np.newaxis, :, :]
molar_doxy = (
(poly_temp * coeffs_matrix[:, :, np.newaxis]).sum(axis=1) *
np.array([self.data["CAL_PHASE_DOXY"].values ** i for i in range(5)])
).sum(axis=0)
return molar_doxy
[docs]
def func_SVU(self):
# Check the calibration matrix has the right shape
if np.shape(self.calib_coefficient_matrix) != (2, 4):
raise ValueError(
f"[{self.step_name}] Calib coefficient matrix must be of shape (2, 4) for method 'poly'."
)
# Build the internal coefficient matrix
coeffs_matrix = np.full((2, 4), 0)
for i, row in enumerate(self.calib_coefficient_matrix):
coeffs_matrix[i, :] = row
F1, F2 = self.temperature_independent_coefficients
# Apply the conversion
poly_temp = np.array([self.data[self.temperature_name].values ** i for i in range(4)])[np.newaxis, :, :]
coeffs = (poly_temp * coeffs_matrix[:, :, np.newaxis]).sum(axis=1)
# Apply Stern–Volmer equation
molar_doxy = (
F1 / (coeffs[0, :] * self.data["CAL_PHASE_DOXY"] + F2) - 1.0
) * coeffs[1, :]
return molar_doxy
[docs]
def run(self):
"""
Example
-------
- name: "Derive Oxygen Concentration"
parameters:
# <MANDATORY>
method: "poly"
# <METHOD DEPENDENT>
# The following params are for "poly" method
temperature_name: "TEMP"
calib_coefficient_matrix: [
[0, 1, 0, 0],
[0, 1, 0, 0],
[0, 1, 0, 0],
[0, 1, 0, 0],
[0, 1, 0, 0]
]
diagnostics: false
Returns
-------
"""
self.filter_qc()
methods = {
"poly": (
self.func_poly,
("temperature_name", "calib_coefficient_matrix")
),
"SVU": (
self.func_SVU,
("temperature_name", "calib_coefficient_matrix", "temperature_independent_coefficients")
),
}
# Check the specified method
check_config(self, ("method",))
if self.method not in methods.keys():
raise ValueError(f"[{self.step_name}] Unknown method '{self.method}'")
# Unpack the method args and functions
func, args = methods[self.method]
# Check the config satisfies requirements
check_config(self, args)
# Check if the output already exists
if "MOLAR_DOXY" in self.data.data_vars:
self.log_warn("MOLAR_DOXY already exists in the data. Overwriting...")
self.data["MOLAR_DOXY"] = (("N_MEASUREMENTS",), func())
self.reconstruct_data()
self.update_qc()
self.generate_qc(
{"MOLAR_DOXY_QC": ["CAL_PHASE_DOXY_QC", f"{self.temperature_name}_QC"]}
)
if self.diagnostics:
self.generate_diagnostics()
self.context["data"] = self.data
return self.context
[docs]
def generate_diagnostics(self):
pass
@register_step
[docs]
class MolarDOXYSalinityCorrection(BaseStep, QCHandlingMixin):
[docs]
step_name = "Molar DOXY Salinity Correction"
[docs]
def oxy_solubility_salinity_correction(self):
# Get data
T = self.data[self.temperature_name]
S = self.data[self.salinity_name]
# Coefficients (Garcia & Gordon 1992 – Benson & Krause refit)
B0 = -6.24523e-3
B1 = -7.37614e-3
B2 = -1.03410e-2
B3 = -8.17083e-3
C0 = -4.88682e-7
# Scaled temperature term Ts
Ts = np.log((298.15 - T) / (273.15 + T))
# SCorr computation
salinity_correction_factor = np.exp(
(S - self.reference_salinity) * (B0 + B1 * Ts + B2 * (Ts ** 2) + B3 * (Ts ** 3))
+ C0 * ((S ** 2) - (self.reference_salinity ** 2))
)
return salinity_correction_factor
[docs]
def water_vapour_partial_pressure(self, reference_salinity=None):
# Get data
T = self.data[self.temperature_name]
if reference_salinity is None:
S = self.data[self.salinity_name]
else:
S = reference_salinity
# Convert degrees C to Kelvin
T = T + 273.15
# Constants from polynomial equation 10 in Weiss&Price, 1980.
A = 24.4543
B = -67.4509
C = -4.8489
D = -0.000544
# Equation 10 in Weiss&Price, 1980
vapour_partial_pressure = 1013.25 * np.exp(A + B * (100 / T) + C * np.log(T / 100) + D * S)
return vapour_partial_pressure
[docs]
def run(self):
"""
Example
-------
- name: "Molar DOXY Salinity Correction"
parameters:
# <MANDATORY>
salinity_name: "PRAC_SALINITY"
temperature_name: "TEMP"
# <OPTIONAL>
reference_salinity: 0
diagnostics: false
Returns
-------
"""
self.filter_qc()
# Check the requred variable names are specified
check_config(self, ("salinity_name", "temperature_name"))
if "MOLAR_DOXY" not in self.data.data_vars:
raise KeyError(f"[{self.step_name}] MOLAR_DOXY required but is missing from the data")
# Update optional reference salinity
if not hasattr(self, "reference_salinity"):
self.log("No 'reference_salinity' specified, defaulting to 0.")
self.reference_salinity = 0
# Calculate factor with partial pressure of water vapour, following Weiss & PRice (1980)
A = 1013.25 - self.water_vapour_partial_pressure(reference_salinity=self.reference_salinity)
B = 1013.25 - self.water_vapour_partial_pressure()
S_Corr = self.oxy_solubility_salinity_correction()
MOLAR_DOXY_PSAL = (A / B) * S_Corr * self.data["MOLAR_DOXY"]
# Apply the correction
self.data["MOLAR_DOXY_PSAL"] = MOLAR_DOXY_PSAL
self.reconstruct_data()
self.update_qc()
self.generate_qc(
{"MOLAR_DOXY_PSAL_QC":
[f"{self.salinity_name}_QC",
f"{self.temperature_name}_QC",
"MOLAR_DOXY_QC"]
}
)
if self.diagnostics:
self.generate_diagnostics()
self.context["data"] = self.data
return self.context
[docs]
def generate_diagnostics(self):
pass
@register_step
[docs]
class MolarDOXYPressureCorrection(BaseStep, QCHandlingMixin):
[docs]
step_name = "Molar DOXY Pressure Correction"
[docs]
def run(self):
"""
Example
-------
- name: "Molar DOXY Pressure Correction"
parameters:
# <MANDATORY>
pressure_name: "PRES"
temperature_name: "TEMP"
molar_doxy_name: "MOLAR_DOXY_PSAL"
uncalibrated_phase_correction_applied: true
diagnostics: false
Returns
-------
"""
self.filter_qc()
# Check the required variable names are supplied
check_config(
self,
("pressure_name", "temperature_name", "molar_doxy_name", "uncalibrated_phase_correction_applied"),
)
# Set the correction coefficients
if self.uncalibrated_phase_correction_applied:
C1, C2 = 0.00022, 0.0419
else:
C1, C2 = 0.00025, 0.0328
MOLAR_DOXY_PSAL_PRES = (
self.data[self.molar_doxy_name] *
(1.0 + ((C1 * self.data[self.temperature_name] + C2) * self.data[self.pressure_name]) / 1000)
)
# Apply the correction
self.data["MOLAR_DOXY_PSAL_PRES"] = MOLAR_DOXY_PSAL_PRES
self.reconstruct_data()
self.update_qc()
self.generate_qc(
{"MOLAR_DOXY_PSAL_PRES_QC":
[f"{self.pressure_name}_QC",
f"{self.temperature_name}_QC",
f"{self.molar_doxy_name}_QC"]
}
)
if self.diagnostics:
self.generate_diagnostics()
self.context["data"] = self.data
return self.context
[docs]
def generate_diagnostics(self):
pass