-
Notifications
You must be signed in to change notification settings - Fork 94
Add grib_tree method #399
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add grib_tree method #399
Changes from 1 commit
cee6410
8c2ba42
a91a1ca
3f6b93c
6e0eac3
52fdcef
0b22a2d
7f17999
37c65ce
cf63ed4
229c3da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,10 @@ | ||
| import base64 | ||
| import copy | ||
| import logging | ||
| from collections import defaultdict | ||
| from typing import Iterable, List, Dict, Set | ||
|
|
||
| import ujson | ||
|
|
||
| try: | ||
| import cfgrib | ||
|
|
@@ -354,3 +359,197 @@ def example_combine( | |
| identical_dims=["heightAboveGround", "latitude", "longitude"], | ||
| ) | ||
| return mzz.translate() | ||
|
|
||
|
|
||
| def grib_tree( | ||
| message_groups: Iterable[Dict], | ||
| remote_options=None, | ||
| ) -> Dict: | ||
| """ | ||
| Build a hierarchical data model from a set of scanned grib messages. The iterable input groups should | ||
| be a collection of results from scan_grib. Multiple grib files can be processed together to produce an | ||
| FMRC like collection. | ||
| The time (reference_time) and step coordinates will be used as concat_dims in the MultiZarrToZarr | ||
| aggregation. Each variable name will become a group with nested subgroups representing the grib | ||
| step type and grib level. The resulting hierarchy can be opened as a zarr_group or a xarray datatree. | ||
| Grib message variable names that decode as "unknown" are dropped | ||
| Grib typeOfLevel attributes that decode as unknown are treated as a single group | ||
| Grib steps that are missing due to WrongStepUnitError are patched with NaT | ||
|
|
||
| :param message_groups: a collection of zarr store like dictionaries as produced by scan_grib | ||
| :param remote_options: remote options to pass to ZarrToMultiZarr | ||
| :return: A new zarr store like dictionary for use as a reference filesystem mapper | ||
| """ | ||
| from kerchunk.combine import MultiZarrToZarr | ||
emfdavid marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| # Hard code the filters in the correct order for the group hierarchy | ||
| filters = ["stepType", "typeOfLevel"] | ||
|
|
||
| zarr_store = {} | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Allow for parquet/bring-your-own output?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, we need to figure out how parquet works at all for deep nested data like this.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the hierarchy in zarr is just a path embedded in the key... it might must work. |
||
| zroot = zarr.open_group(store=zarr_store) | ||
| result = dict(refs=zarr_store) | ||
|
|
||
| aggregations: Dict[str, List] = defaultdict(list) | ||
| aggregation_dims: Dict[str, Set] = defaultdict(set) | ||
|
|
||
| unknown_counter = 0 | ||
| for msg_ind, group in enumerate(message_groups): | ||
| if "version" not in result: | ||
|
||
| result["version"] = group["version"] | ||
|
|
||
| gattrs = ujson.loads(group["refs"][".zattrs"]) | ||
| coordinates = gattrs["coordinates"].split(" ") | ||
|
|
||
| # Find the data variable | ||
| vname = None | ||
| for key, entry in group["refs"].items(): | ||
| name = key.split("/")[0] | ||
| if name not in [".zattrs", ".zgroup"] and name not in coordinates: | ||
| vname = name | ||
| break | ||
|
|
||
| if vname is None: | ||
| raise RuntimeError( | ||
| f"Can not find a data var for msg# {msg_ind} in {group['refs'].keys()}" | ||
| ) | ||
|
|
||
| if vname == "unknown": | ||
| # To resolve unknown variables add custom grib tables. | ||
| # https://confluence.ecmwf.int/display/UDOC/Creating+your+own+local+definitions+-+ecCodes+GRIB+FAQ | ||
| # If you process the groups from a single file in order, you can use the msg# to compare with the | ||
| # IDX file. | ||
| logger.warning( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not include the message about what to do in the logging?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Happy to put more of the answer in the log message - though I have not been successful in doing it myself... yet. Example: IDX file entries (messages indexed from 1) The metadata in the NOAA EMC generated idx files suggests we are missing local definitions for a number of variables that represent min, max or avg values over a layer. I have asked NOAA-EMC for help with decoding these properly.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, we can leave this for future improvement
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Crickets over at NOAA-EMC... I think we might have a working rust gribberish reader before we get a clear answer on how to build the ecCodes tables for these unknown properties.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah the tables are annoying, need to figure that out in rust too to be fully comprehensive |
||
| "Found unknown variable in msg# %s... it will be dropped", msg_ind | ||
| ) | ||
| unknown_counter += 1 | ||
| continue | ||
|
|
||
| logger.debug("Processing vname: %s", vname) | ||
| dattrs = ujson.loads(group["refs"][f"{vname}/.zattrs"]) | ||
| # filter order matters - it determines the hierarchy | ||
| gfilters = {} | ||
| for key in filters: | ||
| attr_val = dattrs.get(f"GRIB_{key}") | ||
| if attr_val is None: | ||
| continue | ||
| if attr_val == "unknown": | ||
| logger.warning( | ||
| "Found 'unknown' attribute value for key %s in var %s of msg# %s", | ||
| key, | ||
| vname, | ||
| msg_ind, | ||
| ) | ||
| # Use unknown as a group or drop it? | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any cases like this?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From the same HRRR SFCF grib2 files discussed above re: unknown variables, here are two examples of unknown levels: Corresponding idx file entries.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you saying that we should be able to decode them? The grib object has many many possble attributes, so this seems solvable. Not needed for this PR.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes - I believe these are just more custom codes, similar to the unknown variables above. |
||
|
|
||
| gfilters[key] = attr_val | ||
|
|
||
| zgroup = zroot.require_group(vname) | ||
| if "name" not in zgroup.attrs: | ||
| zgroup.attrs["name"] = dattrs.get("GRIB_name") | ||
|
|
||
| for key, value in gfilters.items(): | ||
| if value: # Ignore empty string and None | ||
| # name the group after the attribute values: surface, instant, etc | ||
| zgroup = zgroup.require_group(value) | ||
| # Add an attribute to give context | ||
| zgroup.attrs[key] = value | ||
|
|
||
| # add to the list of groups to multi-zarr | ||
| aggregations[zgroup.path].append(group) | ||
|
|
||
| # keep track of the level coordinate variables and their values | ||
| for key, entry in group["refs"].items(): | ||
| name = key.split("/")[0] | ||
| if name == gfilters.get("typeOfLevel") and key.endswith("0"): | ||
| if isinstance(entry, list): | ||
| entry = tuple(entry) | ||
| aggregation_dims[zgroup.path].add(entry) | ||
|
|
||
| concat_dims = ["time", "step"] | ||
| identical_dims = ["longitude", "latitude"] | ||
| for path in aggregations.keys(): | ||
| # Parallelize this step! | ||
| catdims = concat_dims.copy() | ||
| idims = identical_dims.copy() | ||
|
|
||
| level_dimension_value_count = len(aggregation_dims.get(path, ())) | ||
| level_group_name = path.split("/")[-1] | ||
| if level_dimension_value_count == 0: | ||
| logger.debug( | ||
| "Path % has no value coordinate value associated with the level name %s", | ||
| path, | ||
| level_group_name, | ||
| ) | ||
| elif level_dimension_value_count == 1: | ||
| idims.append(level_group_name) | ||
| elif level_dimension_value_count > 1: | ||
| # The level name should be the last element in the path | ||
| catdims.insert(3, level_group_name) | ||
|
|
||
| logger.info( | ||
| "%s calling MultiZarrToZarr with idims %s and catdims %s", | ||
| path, | ||
| idims, | ||
| catdims, | ||
| ) | ||
|
|
||
| fix_group_step = add_missing_step_var(aggregations[path], path) | ||
| mzz = MultiZarrToZarr( | ||
| fix_group_step, | ||
| remote_options=remote_options, | ||
| concat_dims=catdims, | ||
| identical_dims=idims, | ||
| ) | ||
| try: | ||
| group = mzz.translate() | ||
| except KeyError: | ||
| import pprint | ||
|
|
||
| gstr = pprint.pformat(fix_group_step) | ||
| logger.exception(f"Failed to multizarr {path}\n{gstr}") | ||
| continue | ||
|
|
||
| for key, value in group["refs"].items(): | ||
| if key not in [".zattrs", ".zgroup"]: | ||
| zarr_store[f"{path}/{key}"] = value | ||
|
|
||
| return result | ||
|
|
||
|
|
||
| def add_missing_step_var(groups: List[dict], path: str) -> List[dict]: | ||
| """ | ||
| Attempt to fill in missing step var. Should this be done where the step unit error is handled | ||
| in scan grib? | ||
| :param groups: | ||
| :param path: | ||
| :return: | ||
| """ | ||
| result = [] | ||
| for group in groups: | ||
| if "step/.zarray" not in group["refs"]: | ||
| group = copy.deepcopy(group) | ||
| logger.warning("Adding missing step variable to group path %s", path) | ||
| group["refs"]["step/.zarray"] = ( | ||
|
||
| '{"chunks":[],"compressor":null,"dtype":"<f8","fill_value":"NaN","filters":null,"order":"C",' | ||
| '"shape":[],"zarr_format":2}' | ||
| ) | ||
| group["refs"]["step/.zattrs"] = ( | ||
| '{"_ARRAY_DIMENSIONS":[],"long_name":"time since forecast_reference_time",' | ||
| '"standard_name":"forecast_period","units":"hours"}' | ||
| ) | ||
|
|
||
| # # Try to set the value - this doesn't work | ||
| # import xarray | ||
| # fo = fsspec.filesystem("reference", fo=group, mode="rw") | ||
| # xd = xarray.open_dataset(fo.get_mapper(), engine="zarr", consolidated=False) | ||
| # if np.isnan(xd.step.values[()]): | ||
| # logger.info("%s has step val %s", path, xd.step) | ||
| # xd.step[()] = xd.valid_time.values - xd.time.values | ||
| # xd.close() | ||
| # for k, v in group["refs"].items(): | ||
| # if "step" in k: | ||
| # logger.info("New step value %s, %s", k, v) | ||
|
|
||
| result.append(group) | ||
|
|
||
| return result | ||
Uh oh!
There was an error while loading. Please reload this page.