Source code for akimbo.io

from __future__ import annotations

import awkward as ak
import fsspec
import numpy as np


def ak_to_series(ds, backend="pandas", extract=True):
    """Make backend-specific series from data"""
    if backend == "pandas":
        import akimbo.pandas

        s = akimbo.pandas.PandasAwkwardAccessor(None).to_output(ds)
    elif backend == "polars":
        import akimbo.polars

        s = akimbo.polars.PolarsAwkwardAccessor(None).to_output(ds)
    elif backend == "dask":
        import akimbo.dask

        # TODO: actually don't use this, use dask-awkward, or dask.dataframe
        s = akimbo.polars.PolarsAwkwardAccessor(None).to_output(ds)
    elif backend == "cudf":
        import akimbo.cudf

        s = akimbo.cudf.CudfAwkwardAccessor(None).to_output(ds)
    elif backend in ["ray", "spark"]:
        raise ValueError("Backend only supports dataframes, not series")

    else:
        raise ValueError("Backend must be in {'pandas', 'polars', 'dask'}")
    if extract and ds.fields:
        return s.ak.unpack()
    return s


# TODO: read_parquet should use native versions rather than convert. This version
#  is OK for pandas
[docs] def read_parquet( url: str, storage_options: dict | None = None, extract: bool = True, backend: str = "pandas", **kwargs, ): """Read a Parquet dataset with nested data into a Series or DataFrame. This may cope with some deeply nested structures that pandas refuses to read by itself. You can pass a selection of columns to read (list of strings), and other columns will not be parsed into memory. Each of these labels may be a root of deeper-nested structs, or use "*" globbing. Parameters ---------- url: data location Directory with data files, single file or glob pattern storage_options: any arguments for an fsspec backend extract: whether to turn top-level records into a dataframe. If False, will return a series. backend: one of "pandas", "polars" or "dask" """ # TODO: is this useful compared to pyarrow.parquet? Describe differences ds = ak.from_parquet(url, storage_options=storage_options, **kwargs) return ak_to_series(ds, backend, extract=extract)
# TODO: should be a map over input files, maybe with newline byte blocks # as in dask
[docs] def read_json( url: str, storage_options: dict | None = None, schema: dict | None = None, extract: bool = True, backend: str = "pandas", **kwargs, ): """Read a JSON dataset with nested data into a Series or DataFrame. You can pass a selection of columns to read (list or jsonschema format), using ``schema=``, and other columns will not be parsed into memory. See the docs for ak.from_json for further details. (examples to come) Parameters ---------- url: data location (may include glob characters) storage_options: any arguments for an fsspec backend schema: if given, the JSONschema expected in the data; this allows for selecting only some part of the record structure, this saving on some parsing time and potentially a lot of memory footprint. Even if reading all the data, providing a schema will lead to better performance. extract: whether to turn top-level records into a dataframe. If False, will return a series. backend: one of "pandas", "polars" or "dask" """ # TODO: implement columns=["field1", "field2.sub", ...] style schema # using dak.lib.io.layout_to_jsonschema with fsspec.open_files(url, **(storage_options or {})) as f: ds = ak.concatenate( [ak.from_json(_, line_delimited=True, schema=schema, **kwargs) for _ in f] ) return ak_to_series(ds, backend, extract=extract)
[docs] def get_json_schema( url: str, storage_options: dict | None = None, nbytes: int = 1_000_000, **kwargs ): """Get JSONSchema representation of the contents of a line-delimited JSON file Currently, requires dask_awkward to be installed, which in turn required dask Parameters ---------- url: file location storage_options: passed to fsspec nbytes: how much of the file to read in infer the types. Must be at least one line, and should be representative of all the data. Returns ------- JSONschema dictionary """ from dask_awkward.lib.io.json import layout_to_jsonschema with fsspec.open(url, **(storage_options or {})) as f: data = f.read(nbytes).rsplit(b"\n", 1)[0] arr = ak.from_json(data, line_delimited=True, **kwargs) return layout_to_jsonschema(arr.layout)
# TODO: should be a map over input files, maybe with newline byte blocks # as in dask
[docs] def read_avro( url: str, storage_options: dict | None = None, extract: bool = True, backend: str = "pandas", **kwargs, ): """Read AVRO structured data files Parameters ---------- url: data location (may include glob characters) storage_options: any arguments for an fsspec backend extract: whether to turn top-level records into a dataframe. If False, will return a series. backend: one of "pandas", "polars" or "dask" """ from awkward._connect.avro import ReadAvroFT from awkward.operations.ak_from_avro_file import _impl with fsspec.open(url, **(storage_options or {})) as f: # TODO: ak.from_avro_file broken with file-like reader = ReadAvroFT(f, limit_entries=None, debug_forth=False) ds = _impl(*reader.outcontents, highlevel=True, attrs=None, behavior=None) return ak_to_series(ds, backend, extract=extract)
[docs] def get_avro_schema( url: str, storage_options: dict | None = None, ): """Fetch ak form of the schema defined in given avro file""" from awkward._connect.avro import ReadAvroFT with fsspec.open(url, "rb", **(storage_options or {})) as f: reader = ReadAvroFT(f, limit_entries=1, debug_forth=False) form, length, container = reader.outcontents return form
# TODO: feather2/arrow format, get schema _jitted = [None] def join( table1: ak.Array, table2: ak.Array, key: str, colname: str = "match", sort: bool = False, rkey: str = None, numba=True, ): """Make nested ORM-style left join on common key in two tables Assuming ``key`` is a field in each table, the output will look like ``table1`` but with an extra column ``colname`` containing a list of records from matching rows in ``table2``. """ rkey = rkey or key # assert key fields are 1D? allow optional? if sort: # indexed view is not cache friendly; real sort is better but copies table1 = table1[ak.argsort(table1[key], axis=0)] table2 = table2[ak.argsort(table2[rkey], axis=0)] if numba: if _jitted[0] is None: try: from numba import njit # per-session cache, cache=True doesn't work _jitted[0] = njit()(_merge) except ImportError: raise ImportError( "numba is required for fast joins, but you can choose to run with" " numba=False" ) merge = _jitted[0] else: merge = _merge counts = np.empty(len(table1), dtype="uint64") # TODO: the line below over-allocates, can switch to something growable matches = np.empty(len(table2), dtype="uint64") # TODO: to_numpy(allow_missing) makes this a bit faster, but is not # not GPU general counts, matches, ind = merge(table1[key], table2[key], counts, matches) matches.resize(int(ind), refcheck=False) indexed = table2[matches] listy = ak.unflatten(indexed, counts) return ak.with_field(table1, listy, colname) def _merge(ind1, ind2, counts, matches): len2 = len(ind2) j = 0 offind = 0 matchind = 0 last = 0 for i in ind1: while True: if j >= len2: break if i > ind2[j]: # ID not yet found j += 1 continue if i < ind2[j]: # no more entrie break # hit while True: matches[matchind] = j j += 1 matchind += 1 if j >= len2 or i != ind2[j]: break counts[offind] = matchind - last last = matchind offind += 1 return counts, matches, matchind