# Source code for ncas_amof_netcdf_template.util
"""
Reasonably helpful functions that can often be used.
"""
import csv
import datetime as dt
from netCDF4 import Dataset
import numpy as np
import warnings
import json
import yaml
import xml.etree.ElementTree as ET
from typing import Any, Union, Optional
def _map_data_type(data_type: str) -> type:
    """
    Returns the Python type named by a metadata type string.

    Args:
        data_type (str): one of "str", "string", "int", "integer",
          "float" or "bool"

    Returns:
        type: the matching built-in type

    Raises:
        KeyError: if data_type is not one of the recognised names
    """
    aliases = (
        (str, ("str", "string")),
        (int, ("int", "integer")),
        (float, ("float",)),
        (bool, ("bool",)),
    )
    lookup = {name: py_type for py_type, names in aliases for name in names}
    return lookup[data_type]
def check_int(value: Any) -> bool:
    """
    Returns True if value can be converted to integer, otherwise returns False.

    Args:
        value (Any): value to test, typically a string

    Returns:
        bool: True if ``int(value)`` succeeds, False if it raises
          ValueError, TypeError or OverflowError
    """
    try:
        int(value)
    except (ValueError, TypeError, OverflowError):
        # TypeError covers unsupported inputs such as None, OverflowError
        # covers infinite floats; neither can be converted, so report False
        # instead of letting the exception escape.
        return False
    return True
def check_float(value: Any) -> bool:
    """
    Returns True if value can be converted to float, otherwise returns False.

    Args:
        value (Any): value to test, typically a string

    Returns:
        bool: True if ``float(value)`` succeeds, False if it raises
          ValueError, TypeError or OverflowError
    """
    try:
        float(value)
    except (ValueError, TypeError, OverflowError):
        # TypeError covers unsupported inputs such as None, OverflowError
        # covers integers too large for a float; report False for both.
        return False
    return True
def check_type_convert(value: Any, dtype: type) -> bool:
    """
    Returns True if value can be converted to type dtype, otherwise returns False.

    Args:
        value (Any): value to test, typically a string
        dtype (type): type (or other callable) to attempt the conversion with

    Returns:
        bool: True if ``dtype(value)`` succeeds, False if it raises
          ValueError, TypeError or OverflowError
    """
    try:
        dtype(value)
    except (ValueError, TypeError, OverflowError):
        # A value of an unsupported kind (e.g. None passed to int) raises
        # TypeError rather than ValueError; it still cannot be converted.
        return False
    return True
def read_csv_metadata(metafile: str) -> dict[str, dict[str, Union[str, type]]]:
    """
    Returns a dict from a csv with metadata.

    Each line with at least two fields defines one attribute: the first
    field is the attribute name and the remaining fields, re-joined with
    commas, are its value. If the last field starts with "type=", it is
    taken as the name of the attribute's type (passed to _map_data_type)
    and is not part of the value. Lines with fewer than two fields are
    ignored.

    Can also include latitude and longitude variables if
    they are single values (e.g. point deployment).

    Args:
        metafile (file): csv file with metadata, one attribute per line

    Returns:
        dict: metadata from csv as dictionary, mapping attribute name to
          {"value": str, "type": type}

    Raises:
        KeyError: if a "type=" field names a type unknown to _map_data_type
    """
    raw_metadata = {}
    # newline="" leaves line-ending handling to the csv module, as the csv
    # documentation recommends (matters for quoted fields holding newlines).
    with open(metafile, "rt", newline="") as meta:
        for row in csv.reader(meta):
            if len(row) < 2:
                continue
            type_name = "str"
            value_fields = row[1:]
            last_field = row[-1].strip()
            if last_field.startswith("type="):
                # Tolerate whitespace around the type name ("type= int").
                type_name = last_field.split("=")[1].strip()
                value_fields = row[1:-1]
            raw_metadata[row[0]] = {
                "value": ",".join(value_fields).strip(),
                "type": _map_data_type(type_name),
            }
    return raw_metadata
def read_json_metadata(metafile: str) -> dict[str, dict[str, Union[str, type]]]:
    """
    Returns a dict from a JSON with metadata.

    A top-level entry may be a plain value or an object with a "value" key
    and an optional "type" key. Values are stored as strings; the type
    name (default "str") is replaced by the type from _map_data_type.

    Can also include latitude and longitude variables if
    they are single values (e.g. point deployment).

    Args:
        metafile (file): JSON file with metadata

    Returns:
        dict: metadata from JSON as dictionary
    """
    with open(metafile, "rt") as handle:
        loaded = json.load(handle)
    for name, entry in loaded.items():
        if isinstance(entry, dict):
            # Values stay as strings here; conversion to the declared type
            # is left to the caller.
            if not isinstance(entry["value"], str):
                entry["value"] = str(entry["value"])
        else:
            entry = {"value": str(entry), "type": "str"}
            loaded[name] = entry
        # Missing type defaults to "str"; the name is swapped for the type.
        entry["type"] = _map_data_type(entry.get("type", "str"))
    return loaded
def read_yaml_metadata(metafile: str) -> dict[str, dict[str, Union[str, type]]]:
    """
    Returns a dict from a YAML with metadata.

    A top-level entry may be a plain value or a mapping with a "value" key
    and an optional "type" key. Values are stored as strings; the type
    name (default "str") is replaced by the type from _map_data_type.

    Can also include latitude and longitude variables if
    they are single values (e.g. point deployment).

    Args:
        metafile (file): YAML file with metadata

    Returns:
        dict: metadata from YAML as dictionary
    """
    with open(metafile, "rt") as handle:
        loaded = yaml.safe_load(handle)
    for name, entry in loaded.items():
        if isinstance(entry, dict):
            # Values stay as strings here; conversion to the declared type
            # is left to the caller.
            if not isinstance(entry["value"], str):
                entry["value"] = str(entry["value"])
        else:
            entry = {"value": str(entry), "type": "str"}
            loaded[name] = entry
        # Missing type defaults to "str"; the name is swapped for the type.
        entry["type"] = _map_data_type(entry.get("type", "str"))
    return loaded
def read_xml_metadata(metafile: str) -> dict[str, dict[str, Union[str, type]]]:
    """
    Returns a dict from a XML with metadata.

    Each child of the root element defines one attribute, named by the
    child's tag. Within it, a "value" sub-element gives the value and an
    optional "type" sub-element names the type (passed to _map_data_type).
    The defaults are an empty string value and type str.

    Can also include latitude and longitude variables if
    they are single values (e.g. point deployment).

    Args:
        metafile (file): XML file with metadata

    Returns:
        dict: metadata from XML as dictionary, mapping attribute name to
          {"value": str, "type": type}

    Raises:
        KeyError: if a "type" element names a type unknown to _map_data_type
    """
    raw_metadata = {}
    root = ET.parse(metafile).getroot()
    for child in root:
        entry = {"value": "", "type": str}
        for subchild in child:
            # An empty element (e.g. <value/>) has text None; use "" so the
            # value is always a string and never turns into "None" later.
            text = subchild.text or ""
            if subchild.tag == "type":
                # Tolerate whitespace around the type name.
                entry["type"] = _map_data_type(text.strip())
            elif subchild.tag == "value":
                entry["value"] = text
        raw_metadata[child.tag] = entry
    return raw_metadata
def get_metadata(
    metafile: str, file_format: Optional[str] = None
) -> dict[str, dict[str, Union[str, type]]]:
    """
    Returns a dict of metadata from file. Metadata can be in a CSV, JSON,
    YAML, or XML file.

    An explicitly given, recognised file_format takes priority over the
    file extension. Both file_format and the extension are matched
    case-insensitively. If neither identifies the format, a UserWarning
    is issued and the file is read as CSV.

    Can also include latitude and longitude variables if
    they are single values (e.g. point deployment).

    Args:
        metafile (file): file with metadata
        file_format (str): format of metadata file. One of 'CSV', 'JSON',
          'YAML', or 'XML' if given. Default is None, which means the function
          will attempt to detect file type.

    Returns:
        dict: metadata as dictionary
    """
    readers = {
        "CSV": read_csv_metadata,
        "JSON": read_json_metadata,
        "YAML": read_yaml_metadata,
        "XML": read_xml_metadata,
    }
    formats_by_extension = {
        ".csv": "CSV",
        ".json": "JSON",
        ".yaml": "YAML",
        ".yml": "YAML",
        ".xml": "XML",
    }

    chosen = None
    if file_format is not None:
        # The caller's explicit choice wins over whatever the name suggests.
        requested = str(file_format).upper()
        if requested in readers:
            chosen = requested
    if chosen is None:
        # No (recognised) explicit format: fall back to the file extension.
        lowered_name = str(metafile).lower()
        for extension, format_name in formats_by_extension.items():
            if lowered_name.endswith(extension):
                chosen = format_name
                break
    if chosen is None:
        warnings.warn(
            "Unknown metadata file type, trying csv...", UserWarning, stacklevel=2
        )
        chosen = "CSV"
    return readers[chosen](metafile)
def add_metadata_to_netcdf(
    ncfile: Dataset,
    metadata_file: Optional[str] = None,
    file_format: Optional[str] = None,
) -> None:
    """
    Reads metadata from a file using get_metadata and adds the values to
    global attributes in the netCDF file.

    Each value is converted to the type recorded for it in the metadata.
    If the conversion is not possible, a UserWarning is issued and the
    unconverted value is used instead.

    Attributes named "latitude" or "longitude" are written with the
    update_variable function rather than as global attributes.

    Does nothing if metadata_file is None.

    Args:
        ncfile (netCDF Dataset): Dataset object of netCDF file.
        metadata_file (str): file with metadata.
        file_format (str): format of metadata file. One of 'CSV', 'JSON',
          'YAML', or 'XML' if given. Default is None, which means the function
          will attempt to detect file type.
    """
    if metadata_file is None:
        return

    for attr, attr_info in get_metadata(metadata_file, file_format).items():
        value, valuetype = attr_info["value"], attr_info["type"]
        # NOTE(review): for type bool, any non-empty string (including
        # "False") converts to True - confirm this is intended.
        if not check_type_convert(value, valuetype):
            warnings.warn(
                f"Value '{value}' for attribute '{attr}' could not be converted to type '{valuetype}'",
                UserWarning,
                stacklevel=2,
            )
        else:
            value = valuetype(value)

        if attr in ("latitude", "longitude"):
            update_variable(ncfile, attr, value)
        else:
            ncfile.setncattr(attr, value)
def add_metadata_from_dict(
    ncfile: Dataset, metadata_dict: dict[str, str | int | float]
) -> None:
    """
    Add each name/value pair of a dictionary to the global attributes of a
    netCDF file.

    Entries named "latitude" or "longitude" are written with the
    update_variable function rather than as global attributes.

    Args:
        ncfile (netCDF Dataset): Dataset object of netCDF file.
        metadata_dict (dict): Dictionary of attribute name/value pairs.
    """
    coordinate_names = ("latitude", "longitude")
    for key, value in metadata_dict.items():
        if key not in coordinate_names:
            ncfile.setncattr(key, value)
            continue
        update_variable(ncfile, key, value)
def get_times(
    dt_times: list[dt.datetime],
) -> tuple[
    list[float],
    list[float],
    list[int],
    list[int],
    list[int],
    list[int],
    list[int],
    list[float],
    float,
    float,
    str,
]:
    """
    Returns all time units for AMOF netCDF files from series of datetime objects.

    The date string is built from the first and last times: it starts with
    the year, then adds month, day, "-" plus hour, minute and whole second
    for as long as each field is the same in the first and last time.

    Args:
        dt_times (list-like object): object with datetime objects for times

    Returns:
        lists: unix_times, day-of-year (with fraction of day), years, months,
          days, hours, minutes, seconds (with fraction of second)
        floats: unix time of first and last times (time_coverage_start and
          time_coverage_end)
        str: date string of first time, (file_date)

    Raises:
        ValueError: if the first and last times are in different years
    """
    # The time zone is overwritten with UTC before taking the timestamp.
    utc = dt.timezone.utc
    unix_times = [moment.replace(tzinfo=utc).timestamp() for moment in dt_times]
    day_numbers = [moment.timetuple().tm_yday for moment in dt_times]
    years = [moment.year for moment in dt_times]
    months = [moment.month for moment in dt_times]
    days = [moment.day for moment in dt_times]
    hours = [moment.hour for moment in dt_times]
    minutes = [moment.minute for moment in dt_times]
    seconds = [moment.second + moment.microsecond / 1000000 for moment in dt_times]

    time_coverage_start_dt, time_coverage_end_dt = unix_times[0], unix_times[-1]

    # Day of year plus the elapsed fraction of that day.
    fractional_doy = (
        np.array(day_numbers)
        + np.array(hours) / 24
        + np.array(minutes) / (24 * 60)
        + np.array(seconds) / (24 * 60 * 60)
    )
    doy = list(fractional_doy)

    if years[0] != years[-1]:
        raise ValueError("Incompatible dates - data from over 2 years")

    file_date = str(years[0])
    # (separator, first value, last value), from coarsest to finest field.
    finer_fields = (
        ("", months[0], months[-1]),
        ("", days[0], days[-1]),
        ("-", hours[0], hours[-1]),
        ("", minutes[0], minutes[-1]),
        ("", int(seconds[0]), int(seconds[-1])),
    )
    for separator, first, last in finer_fields:
        if first != last:
            # Stop at the first field that differs between start and end.
            break
        file_date += f"{separator}{zero_pad_number(first)}"

    return (
        unix_times,
        doy,
        years,
        months,
        days,
        hours,
        minutes,
        seconds,
        time_coverage_start_dt,
        time_coverage_end_dt,
        file_date,
    )
def change_qc_flags(
    ncfile: Dataset,
    ncfile_varname: str,
    flag_meanings: Optional[list[str]] = None,
    flag_values: Optional[list[int]] = None,
) -> None:
    """
    Change the flag meanings and flag values in a quality control variable from
    the default options. The first two flag meanings must be "not_used" and
    "good_data", and all spaces in flag meanings will be replaced with underscores.
    If given, the first two flag values must be 0 and 1. If not given, flag values
    will be worked out based on the number of flag meanings and will be sequential
    values.

    The lists passed in are not modified.

    Args:
        ncfile (netCDF Dataset): Dataset object of netCDF file.
        ncfile_varname (str): Name of variable in netCDF file.
        flag_meanings (list): List of flag meanings to be used in variable.
          At least two are required.
        flag_values (list): List of integer flag values to be used in variable. Will
          be automatically worked out if not provided.

    Raises:
        ValueError: if the variable is not in the file, fewer than two flag
          meanings are given, the first two meanings or values are not the
          required ones, or the numbers of values and meanings differ.
    """
    if ncfile_varname not in ncfile.variables:
        msg = f"Variable {ncfile_varname} not in netCDF file"
        raise ValueError(msg)

    # Work on a copy so the caller's list is never modified in place.
    meanings = list(flag_meanings) if flag_meanings is not None else []
    for i, meaning in enumerate(meanings):
        if " " in meaning:
            msg = f"Space found in flag meaning '{meaning}', changing to underscore."
            warnings.warn(msg, UserWarning, stacklevel=2)
            meanings[i] = meaning.replace(" ", "_")

    # Check the length first so the checks below cannot fail with IndexError.
    if len(meanings) < 2:
        msg = (
            "Invalid flag meanings - at least two flag meanings are required, "
            "the first two being 'not_used' and 'good_data'."
        )
        raise ValueError(msg)
    if meanings[0] != "not_used" or meanings[1] != "good_data":
        msg = (
            "Invalid flag meanings - first two flag meanings must be 'not_used' "
            f"and 'good_data', not '{meanings[0]}' and '{meanings[1]}'."
        )
        raise ValueError(msg)

    if flag_values is not None and len(flag_values) > 0:
        values = list(flag_values)
        # Length is checked before indexing so a too-short list of values
        # gives this error rather than an IndexError.
        if len(values) != len(meanings):
            msg = (
                f"Different number of flag_values ({len(values)}) "
                f"and flag_meanings ({len(meanings)})."
            )
            raise ValueError(msg)
        if values[0] != 0 or values[1] != 1:
            msg = (
                "Invalid flag values - first two flag values must be 0 and 1, "
                f"not {values[0]} and {values[1]}."
            )
            raise ValueError(msg)
    else:
        values = list(range(len(meanings)))

    variable = ncfile[ncfile_varname]
    # flag_values are stored with the same dtype as the variable itself.
    flag_value_array = np.array(values, dtype=variable.dtype)
    variable.setncattr("flag_values", flag_value_array)
    variable.setncattr("flag_meanings", " ".join(meanings))
def update_variable(
    ncfile: Dataset,
    ncfile_varname: str,
    data: Union[np.ndarray[Any, Any], list[Any]],
    qc_data_error: bool = True,
) -> None:
    """
    Writes data into a variable of the netCDF file.

    If the variable has a valid_min attribute, valid_min and valid_max are
    first set to the NaN-ignoring minimum and maximum of data, cast to the
    variable's datatype. If the variable name contains "qc" and it has a
    flag_values attribute, data is checked against those values before
    being written.

    Args:
        ncfile (netCDF Dataset): Dataset object of netCDF file.
        ncfile_varname (str): Name of variable in netCDF file.
        data (array or list): Data to be added to netCDF variable.
        qc_data_error (bool): Raise error if trying to add values to QC flag
          variables that are not in the flag_values attribute.
          Otherwise, just a warning is printed. Default True.

    Raises:
        ValueError: if data holds values outside flag_values for a QC
          variable and qc_data_error is true.
    """
    target = ncfile.variables[ncfile_varname]
    attr_names = target.ncattrs()

    if "valid_min" in attr_names:
        # Only the presence of valid_min is checked; valid_max is set too.
        lowest = np.float64(np.nanmin(data))
        highest = np.float64(np.nanmax(data))
        target.valid_min = lowest.astype(target.datatype)
        target.valid_max = highest.astype(target.datatype)

    is_qc_variable = "qc" in ncfile_varname.lower() and "flag_values" in attr_names
    if is_qc_variable and not np.isin(data, target.flag_values).all():
        valid_values = target.flag_values.tolist()
        msg = (
            "Invalid data being added to QC variable, "
            f"only {valid_values} are allowed."
        )
        if qc_data_error:
            raise ValueError(msg)
        print(f"[WARN]: {msg}")

    target[:] = data
def zero_pad_number(n: int) -> str:
    """
    Formats a number as text, putting a leading zero in front of it when the
    text is a single character long (5 becomes '05', 12 stays '12').
    Used for date or month strings

    Args:
        n (int): Number

    Returns:
        str: Number with zero padding if single digit.
    """
    text = f"{n}"
    needs_padding = len(text) == 1
    return "0" + text if needs_padding else text