Reading Bagz Files#

This tutorial gives an overview of integrating the Bagz file format into a Grain pipeline. Bagz, an alternative to ArrayRecord, is a novel file format that supports per-record compression and fast index-based lookup. It also integrates with Apache Beam, which is the feature we present first in this tutorial.

Setup#

To start, we need to make sure all required packages are installed. We pin JAX's version because the latest Apache Beam doesn't support NumPy 2.0 yet.

%pip install grain bagz apache-beam jax==0.4.38
import grain
import bagz
from bagz.beam import bagzio
import numpy as np
import pathlib
import random
import apache_beam as beam
assert np.__version__[0] == "1", "Apache Beam requires NumPy<2"

Apache Beam#

Like ArrayRecord, the Bagz package can integrate with the Apache Beam library to build ETL pipelines. In the example below we construct a pipeline that consumes an in-memory list, performs simple transformations, and writes the output to a Bagz file with the bagzio module. The @* in the filename indicates that this pipeline will produce an unspecified number of shards. To learn more about sharding in Bagz, please see the Bagz docs.

with beam.Pipeline() as pipeline:
  data = ["record1", "record2", "record3"]
  _ = (
      pipeline
      | 'CreateData' >> beam.Create(data)
      | 'Capitalize' >> beam.Map(lambda x: x.upper())
      | 'Encode' >> beam.Map(lambda x: x.encode())
      | 'WriteData' >> bagzio.WriteToBagz('beam_data@*.bagz')
  )
file = pathlib.Path("beam_data-00000-of-00001.bagz")
reader = bagz.Reader(file)
print(list(reader))
[b'RECORD1', b'RECORD2', b'RECORD3']
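
The pipeline above happened to produce a single shard, so we could hardcode its filename. In general the number of shards is only known after the pipeline has run, so it is more robust to discover the shard files on disk. Below is a minimal sketch, assuming the shards written above are in the current working directory:

shards = sorted(pathlib.Path(".").glob("beam_data-*-of-*.bagz"))  # one file per shard

all_records = []
for shard in shards:
    all_records.extend(bagz.Reader(shard))  # read each shard in order

print(f"Found {len(shards)} shard(s) with {len(all_records)} records in total.")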

Creating and reading Bagz files#

As the Bagz format is record-based, we can use a simple loop and the bagz.Writer context manager to write our records to the output file.

random.seed(42)

records = [f"Record: {random.randint(100, 1000)}" for _ in range(40)]

file = pathlib.Path("data.bagz")

with bagz.Writer(file) as writer:
    for rec in records:
        writer.write(rec)

Bagz supports random access, so we can look up items by index, check the length of the file, and slice it arbitrarily. Note that records are returned as raw bytes.

reader = bagz.Reader(file)

print(len(reader))

print(reader[10])

print(list(reader[5:15]))
40
b'Record: 792'
[b'Record: 350', b'Record: 328', b'Record: 242', b'Record: 854', b'Record: 204', b'Record: 792', b'Record: 858', b'Record: 658', b'Record: 189', b'Record: 704']

Grain pipeline with Bagz files#

With random access in place, we can now consume Bagz files in a Grain pipeline via the grain.MapDataset class. Applying transformations then works the same as with other sources, such as ArrayRecord files.

dataset = (
    grain.MapDataset.source(reader)
    .shuffle(seed=42)
    .map(lambda x: x.decode())  # decode bytes to strings
    .filter(lambda x: x[-1] != "6")  # filter out some records
    .map(lambda x: x.upper())  # and capitalize them
    .to_iter_dataset()
)
print(f"Filtered out: {len(reader) - len(list(dataset))} records.")

list(dataset)
Filtered out: 2 records.
['RECORD: 704',
 'RECORD: 674',
 'RECORD: 877',
 'RECORD: 189',
 'RECORD: 325',
 'RECORD: 323',
 'RECORD: 303',
 'RECORD: 125',
 'RECORD: 381',
 'RECORD: 990',
 'RECORD: 127',
 'RECORD: 859',
 'RECORD: 858',
 'RECORD: 204',
 'RECORD: 854',
 'RECORD: 350',
 'RECORD: 132',
 'RECORD: 338',
 'RECORD: 928',
 'RECORD: 532',
 'RECORD: 328',
 'RECORD: 818',
 'RECORD: 833',
 'RECORD: 658',
 'RECORD: 195',
 'RECORD: 214',
 'RECORD: 529',
 'RECORD: 765',
 'RECORD: 617',
 'RECORD: 384',
 'RECORD: 658',
 'RECORD: 559',
 'RECORD: 703',
 'RECORD: 925',
 'RECORD: 130',
 'RECORD: 792',
 'RECORD: 242',
 'RECORD: 754']
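
Since grain.MapDataset composes like any other Grain dataset, the pipeline can be extended further, for example by batching the filtered records before iteration. The sketch below is a hedged example: it assumes your Grain release exposes MapDataset.batch (recent versions do) and uses an arbitrary batch size of 8.

batched = (
    grain.MapDataset.source(reader)
    .shuffle(seed=42)
    .map(lambda x: x.decode())
    .filter(lambda x: x[-1] != "6")
    .batch(batch_size=8)  # group every 8 surviving records into one element
    .to_iter_dataset()
)

for batch in batched:
    print(batch)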