Source code for src.toolbox.steps.custom.export

# This file is part of the NOC Autonomy Toolbox.
#
# Copyright 2025-2026 National Oceanography Centre and The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Class definition for the data export step."""

from toolbox.steps.base_step import BaseStep, register_step
import toolbox.utils.diagnostics as diag
import json

@register_step
class ExportStep(BaseStep):
    """
    Step to export data in various formats.

    Takes the dataset stored under ``context["data"]`` and writes it to
    ``output_path`` as CSV, NetCDF, HDF5 or Parquet. When the context holds
    a ``"qc_history"`` entry, it is serialised to JSON and stored in the
    dataset attributes under ``"delayed_qc_history"`` before writing.

    NOTE(review): the dataset is presumably an ``xarray.Dataset`` (the code
    relies on ``to_dataframe``, ``to_netcdf``, ``variables`` and ``attrs``)
    -- confirm against the steps that populate the context.
    """

    step_name = "Data Export"

    parameter_schema = {
        "export_format": {
            "type": str,
            "default": "netcdf",
            "description": "Format to export data (csv, netcdf, hdf5, parquet)",
        },
        "output_path": {
            "type": str,
            "default": "./pipeline_output/exported_data.nc",
            "description": "Path to save the exported data",
        },
        "compress_netcdf": {
            "type": bool,
            "default": True,
            "description": "Apply lossless zlib compression to NetCDF exports",
        },
        "compression_level": {
            "type": int,
            "default": 6,
            "description": "Zlib compression level from 1 (fastest) to 9 (smallest)",
        },
    }

    # Single source of truth for the accepted ``export_format`` values; used
    # both for validation and to build the error message.
    _SUPPORTED_FORMATS = ("csv", "netcdf", "hdf5", "parquet")

    def run(self):
        """
        Validate the export settings and write the dataset to disk.

        Returns
        -------
        dict
            The step context (``self.context``).

        Raises
        ------
        ValueError
            If ``export_format`` is not one of the supported formats, or if
            ``output_path`` is empty or not a string.
        """
        self.log(
            f"Exporting data in {self.export_format} format to {self.output_path}"
        )
        self.check_data()

        # Validate the configuration before touching the dataset, so that a
        # rejected export leaves the shared context data unmodified.
        if self.export_format not in self._SUPPORTED_FORMATS:
            raise ValueError(
                f"Unsupported export format: {self.export_format}. "
                f"Supported formats are: {', '.join(self._SUPPORTED_FORMATS)}."
            )
        if not self.output_path:
            raise ValueError("Output path must be specified for data export.")
        if not isinstance(self.output_path, str):
            raise ValueError("Output path must be a string.")

        data = self.context["data"]

        if "qc_history" in self.context:
            self.log("QC history found in context.")
            # Attributes are stored as a JSON string so the history travels
            # with the exported dataset.
            data.attrs["delayed_qc_history"] = json.dumps(
                self.context["qc_history"]
            )

        if self.export_format == "csv":
            data.to_dataframe().to_csv(self.output_path)
        elif self.export_format == "netcdf":
            self._export_netcdf(data)
        elif self.export_format == "hdf5":
            # HDF5 output is produced by the NetCDF writer using the
            # h5netcdf engine.
            data.to_netcdf(self.output_path, engine="h5netcdf")
        else:
            # Only "parquet" remains: the format was validated above.
            data.to_dataframe().to_parquet(self.output_path)

        self.log(f"Data exported successfully to {self.output_path}")

        if self.diagnostics and not self.is_web_mode():
            self.generate_diagnostics()

        return self.context

    def _export_netcdf(self, data):
        """Write *data* to ``output_path`` with the netcdf4 engine, compressed if enabled."""
        # Both settings fall back to their schema defaults when the
        # attribute has not been set on the instance.
        if not getattr(self, "compress_netcdf", True):
            data.to_netcdf(self.output_path, engine="netcdf4")
            return

        self.log("Applying lossless NetCDF compression.")
        comp_level = getattr(self, "compression_level", 6)

        # Request zlib compression at the same level for every variable.
        encoding_dict = {
            var_name: {"zlib": True, "complevel": comp_level}
            for var_name in data.variables
        }
        data.to_netcdf(self.output_path, engine="netcdf4", encoding=encoding_dict)

    def generate_diagnostics(self):
        """
        Generate diagnostics for the export step natively.

        Delegates to ``diag.generate_diagnostics`` with the step context and
        the step name, logging before and after the call.
        """
        self.log(f"Generating diagnostics for {self.step_name}")
        diag.generate_diagnostics(self.context, self.step_name)
        self.log("Diagnostics generated successfully.")