Source code for src.toolbox.steps.custom.export
# This file is part of the NOC Autonomy Toolbox.
#
# Copyright 2025-2026 National Oceanography Centre and The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class definition for exporting data steps."""
import json
import os

from toolbox.steps.base_step import BaseStep, register_step
import toolbox.utils.diagnostics as diag
@register_step
class ExportStep(BaseStep):
    """
    Step to export data in various formats.

    The dataset stored under ``self.context["data"]`` is written to
    ``output_path`` as CSV, NetCDF, HDF5 or Parquet. When the context holds
    a ``"qc_history"`` entry it is serialised to JSON and stored in the
    dataset attributes under ``"delayed_qc_history"`` before writing.
    """

    step_name = "Data Export"

    parameter_schema = {
        "export_format": {
            "type": str,
            "default": "netcdf",
            "description": "Format to export data (csv, netcdf, hdf5, parquet)"
        },
        "output_path": {
            "type": str,
            "default": "./pipeline_output/exported_data.nc",
            "description": "Path to save the exported data"
        },
        "compress_netcdf": {
            "type": bool,
            "default": True,
            "description": "Apply lossless zlib compression to NetCDF exports"
        },
        "compression_level": {
            "type": int,
            "default": 6,
            "description": "Zlib compression level from 1 (fastest) to 9 (smallest)"
        }
    }

    def run(self):
        """
        Export the context dataset to ``self.output_path``.

        Returns
        -------
        The step context (``self.context``), after the file has been
        written.

        Raises
        ------
        ValueError
            If ``export_format`` is not one of csv, netcdf, hdf5 or parquet,
            or if ``output_path`` is empty or not a string.
        """
        self.log(
            f"Exporting data in {self.export_format} format to {self.output_path}"
        )
        self.check_data()

        # Validate the configuration before touching the dataset, so that a
        # misconfigured step fails without modifying the shared context data.
        supported_formats = ("csv", "netcdf", "hdf5", "parquet")
        if self.export_format not in supported_formats:
            raise ValueError(
                f"Unsupported export format: {self.export_format}. Supported formats are: csv, netcdf, hdf5, parquet."
            )
        if not self.output_path:
            raise ValueError("Output path must be specified for data export.")
        if not isinstance(self.output_path, str):
            raise ValueError("Output path must be a string.")

        # NOTE(review): presumably an xarray.Dataset (uses .attrs,
        # .variables, .to_netcdf, .to_dataframe) - confirm against the
        # steps that populate the context.
        data = self.context["data"]

        if "qc_history" in self.context:
            self.log("QC history found in context.")
            # Stored as a JSON string so the history travels with the file
            # as a plain attribute value.
            data.attrs["delayed_qc_history"] = json.dumps(self.context["qc_history"])

        # The default output path lives in a sub-directory; create it (and
        # any missing parents) so the export does not fail on a fresh
        # working directory.
        parent_dir = os.path.dirname(self.output_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        if self.export_format == "csv":
            data.to_dataframe().to_csv(self.output_path)
        elif self.export_format == "netcdf":
            # getattr fallbacks keep the step working when the compression
            # parameters were never set on the instance.
            if getattr(self, "compress_netcdf", True):
                self.log("Applying lossless NetCDF compression.")
                comp_level = getattr(self, "compression_level", 6)
                # Apply zlib compression to all variables.
                # NOTE(review): this covers every entry of data.variables;
                # confirm that is intended for non-numeric variables.
                encoding_dict = {
                    var_name: {"zlib": True, "complevel": comp_level}
                    for var_name in data.variables
                }
                data.to_netcdf(self.output_path, engine="netcdf4", encoding=encoding_dict)
            else:
                data.to_netcdf(self.output_path, engine="netcdf4")
        elif self.export_format == "hdf5":
            data.to_netcdf(self.output_path, engine="h5netcdf")
        else:
            # "parquet" is the only format left after the validation above.
            data.to_dataframe().to_parquet(self.output_path)

        self.log(f"Data exported successfully to {self.output_path}")

        if self.diagnostics and not self.is_web_mode():
            self.generate_diagnostics()

        return self.context

    def generate_diagnostics(self):
        """
        Generate diagnostics for the export step.

        Delegates to ``diag.generate_diagnostics`` with the step context and
        the step name, logging before and after the call.
        """
        self.log(f"Generating diagnostics for {self.step_name}")
        diag.generate_diagnostics(self.context, self.step_name)
        self.log("Diagnostics generated successfully.")