Filters with dask.array.to_tiledb()

While measuring the parallel read throughput of TileDB Embedded from cloud object storage, I noticed that TileDB was significantly slower than Zarr, even when the fragment metadata is consolidated. I think I remember reading somewhere that an attribute is not filtered by default, which would explain the difference in parallel read performance, but I could be wrong. Nevertheless, I set out to apply different compression algorithms to the TileDB array while it was being written.

I enjoy using dask.array.to_tiledb(...) because it makes it extremely easy to write a multidimensional data set to a TileDB Embedded array, so I was hoping there were some compression options that could be passed into the function. From reading the filtering documentation, it seems that filters have to be applied to the attribute and/or coordinates, which are then passed into the array schema. My concern is that, since dask.array.to_tiledb(...) infers the schema from the Dask array, simply adding another argument may not be sufficient.
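
For reference, applying a filter the "regular" way in TileDB-Py looks roughly like the sketch below (hypothetical names), where the filter list is attached to the attribute and the attribute is passed into the schema:

import numpy as np
import tiledb

# Hypothetical dense schema: the compression filter is attached to the
# attribute, and the attribute is then passed into the ArraySchema.
dom = tiledb.Domain(tiledb.Dim("x", domain=(0, 999), tile=100, dtype=np.uint64))
attr = tiledb.Attr("v", dtype=np.float64,
                   filters=tiledb.FilterList([tiledb.ZstdFilter(level=5)]))
schema = tiledb.ArraySchema(domain=dom, attrs=[attr], sparse=False)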

With all that being said, is there a way for me to simply apply a filter through this Dask function? If not upon write, could a filter be applied afterwards, similar to consolidation (I suspect not because once the data is written it is immutable)?

Hi @jgreen, some comments/questions below:

  • Can you please share the TileDB array schema? I assume that the array is dense, but I’d like to see if the tiling is the same as Zarr’s chunking.
  • I assume your array is stored on GCS (from past conversations), correct?
  • Can you count the number of entries with prefix <your_tiledb_array>/__fragments/?
  • We added a new consolidation and vacuuming mode called commits (you need to call both consolidate and vacuum with it); a sketch follows this list. That’s important if you ended up with thousands of fragments.
  • How are you using the array filter in Dask, can you share some code? We just enabled query condition (i.e., filter) push-down to TileDB, but the result is still a dense array with the non-qualifying cells being filled with special fill values.
  • Can you share the non-empty domain of the array?
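
For the fragment count and the commits consolidation/vacuum mentioned above, a minimal sketch in TileDB-Py (assuming a hypothetical array URI) would look like this:

import tiledb

uri = "gs://my-bucket/my_array"   # hypothetical array URI

# Count the fragment entries under __fragments/
vfs = tiledb.VFS()
print(len(vfs.ls(uri + "/__fragments")))

# Consolidate the commit files, then vacuum them ("commits" mode)
tiledb.consolidate(uri, config=tiledb.Config({"sm.consolidation.mode": "commits"}))
tiledb.vacuum(uri, config=tiledb.Config({"sm.vacuum.mode": "commits"}))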

In general, for dense arrays configured similarly, you should not be seeing great performance differences between Zarr and TileDB. Let’s see if we can resolve this with proper configuration, otherwise we’ll dig into the perf stats and debug accordingly.

Hello @stavros, thanks for the reply! To address your questions and comments:

  • Array schema (Default output from Dask array to TileDB Embedded array w/ chunksize = ~100MB):
    [screenshot: TileDB array schema]
    I can confirm that the tile size is the same shape as I have it in Zarr.

  • Yes, the array is stored in GCS (and I have been able to store it without having to set an environment variable w/ the credentials in my user container).

  • There are 64 fragment files in <tiledb_array>/__fragments/.

  • I will apply the commits consolidation and vacuum to this array. I remember reading about it when I was first learning about the consolidation steps, but I could not locate any settings in the configuration that would achieve this. I’ll give it another shot.

  • I have not yet figured out how to use array filters with Dask and apply them to a TileDB array, do you have any guidance on an argument I can pass into dask.array.to_tiledb('uri', ...) to set a compressor? I noticed from the schema that the array was not compressed by default, whereas Zarr’s default is the LZ4 algorithm; perhaps this could explain the parallel read performance discrepancy I am seeing.

  • The non-empty domain is ((0, 90519), (0, 93), (0, 191)), which I can also confirm is the same domain as the original NetCDF-4 file.

Some other noteworthy items:

  • A read throughput plot to show you the difference (~6 GB original file size)
    [plot: read throughput, TileDB vs. Zarr]
    This test was averaged over 3 separate reads for each format, and this is actually the closest I have seen TileDB to Zarr. Usually, TileDB peaks at about 1500 Mbps, but adding compression into the mix would potentially lead to much faster cloud reads.

This doesn’t seem like a big problem now (only 2 seconds difference in execution time), but this particular test is only a stepping stone toward testing distributed cluster reads with a 30GB data set; ironing out any configuration problems I am having is the top priority right now so that we get an accurate representation of TileDB Embedded’s scaling capabilities.

Thanks, that helps! @ihnorton and @nguyenv can assist with setting the compression filters when creating the array from Dask. That should definitely help with performance on cloud object stores.

In order for us to debug further, can you please send us your read code and try to dump the stats per worker? Also, are you using multiple different machines (one Dask worker per machine) or a single machine with one worker per thread? If it’s the latter, you’ll need to configure the number of IO threads per worker, since otherwise each worker will attempt to spin up as many threads as are available, which will adversely impact performance due to contention.

In this specific context I am using a single GCE VM w/ hyperthreading, and I configured Dask Distributed to allocate 2 threads per worker. The benchmarking code can be found in this Jupyter notebook, where I am testing many different file formats for a paper I am writing. The read code is contained within the Benchmarking Setup section and the testing for TileDB is close to the very end of the notebook.

Unfortunately, due to Google bucket security concerns you will not yet be able to run this for yourself in its current state, but I can ask my superiors to make the TileDB array public for a time if you do wish to test.

I will send the stats per worker once the code finishes executing, thanks again!

I configured Dask Distributed to allocate 2 threads per worker

Each worker has a separate TileDB context, which independently utilizes all machine threads by default. You still need to manually pass the configuration parameters for IO and CPU threads to the TileDB context of each worker. These parameters are sm.compute_concurrency_level and sm.io_concurrency_level. @nguyenv and @ihnorton may be able to set these configuration parameters manually in our Dask integrations.
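
For reference, a minimal sketch of what passing those parameters to a TileDB context looks like (the thread counts here are just example values):

import tiledb

# Per-worker context with explicit thread limits (example values)
cfg = tiledb.Config({
    "sm.compute_concurrency_level": "2",
    "sm.io_concurrency_level": "2",
})
ctx = tiledb.Ctx(cfg)

# Arrays opened with this context respect the limits, e.g.:
# with tiledb.open(uri, ctx=ctx) as A:
#     ...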

Here are the runtime stats for 2, 4, 6, & 8 workers:

You probably ran these stats in the Dask driver instead of inside each worker. Please add the stats commands inside the code running in each worker and report each worker’s stats separately.
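
One way to collect per-worker stats with dask.distributed is sketched below, assuming a Client object named client and that the tiledb module is importable on every worker:

import tiledb
from dask.distributed import Client

client = Client()                # however your cluster is created

client.run(tiledb.stats_enable)  # enable TileDB stats inside every worker

# ... run the read benchmark here ...

# Fetch each worker's stats dump as a string, keyed by worker address
per_worker = client.run(lambda: tiledb.stats_dump(print_out=False))
for addr, stats in per_worker.items():
    print(addr)
    print(stats)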

is there a way for me to simply apply a filter through this Dask function?

There should be a way to add this functionality fairly easily. cc-ing @ihnorton who can chime in.

Each worker has a separate TileDB context, which independently utilizes all machine threads by default. You still need to manually pass the configuration parameters for IO and CPU threads to the TileDB context of each worker.

Interesting, I had looked at these parameters in the docs before and thought I was alright leaving them at their default (the upper bound of the number of cores my machine has). When you say the TileDB context is on a per-worker basis, I conceptually understand it but am confused about exactly how to edit each worker’s context. Changing the TileDB configuration as a global variable gives me no speedup in parallel reads, but I am almost certain that this is not what you mean.

I will keep looking through the docs of both Dask and TileDB for a clue on how to do this, but I would greatly appreciate a simple example if you have the time!

Hi @jgreen,

You can use the dask storage_options keyword argument with da.from_tiledb or .to_tiledb in order to set TileDB concurrency (or any other supported config parameters). The argument should be a dict of {'config_option': value}, which is then passed through to tiledb.open on each worker. Example:

In [1]: import dask.array as da

In [2]: a = da.ones((1000, 1000), chunks=(10, 10))

In [3]: a.to_tiledb("/tmp/s2", storage_options={"sm.compute_concurrency_level": 8, "sm.io_concurrency_level": 8})

(I’ve just double-checked and this does work correctly with distributed as well)

Hope this helps!
Isaiah


Awesome, thank you @ihnorton! The last thing I need to figure out is writing with different compression algorithms through dask.array.to_tiledb(...).

I have tried playing around with different syntax to include edits to the schema in the call like:

da.to_tiledb(<URI>, storage_options={<config_key>:<value>,
                    "attrs": {"name":<attr_name>, "filters": filter_list}})

or

da.to_tiledb(..., schema={"attrs": {"name": <attr_name>, "filters": filter_list}})

with no luck. If I have to apply filters the regular way I can, but I’m hoping that I am just missing something.

Hi @jgreen,

For now the best way to do this is to pre-create the array with a custom schema setting the filters you want, open that array, and pass the tiledb.Array object in to da.to_tiledb. Example below:

import tiledb, dask, dask.array as da, distributed, numpy as np
import tempfile

from tiledb import Domain, Dim, Attr

uri = tempfile.mkdtemp()

# Pre-create the array with the desired filters on the attribute
schema = tiledb.ArraySchema(
    Domain([Dim("x", domain=(0, 99), tile=100, dtype=np.uint64),
            Dim("y", domain=(0, 99), tile=100, dtype=np.uint64)]),
    attrs=[Attr("v", dtype=np.float64, filters=[tiledb.ZstdFilter(-1)])],
    sparse=False
)

tiledb.Array.create(uri, schema)

# Open the pre-created array for writing and pass the tiledb.Array object to Dask
tdb_array = tiledb.open(uri, "w")

d_input = da.ones((100, 100))
d_input.to_tiledb(tdb_array)

print("URI: ", uri)

I added an item internally to add plumbing to support setting filters directly soon (doing so should not require any changes in the dask to/from_tiledb functions, only TileDB-Py).

Best,
Isaiah


Thank you! I really appreciate the examples, and I look forward to seeing how fast TileDB Embedded really is now that I have the correct configuration and compression options taken care of.


@ihnorton I am still having an issue with read throughput & write speed, even with the fixes that you and @stavros suggested. I’m not sure how to dump the stats per worker, but here are some results:

  1. New array schema. Following the example of creating a custom schema above, I wrote the array with the line

     da.to_tiledb(tdb_array, storage_options={"sm.compute_concurrency_level": 2,
                                              "sm.io_concurrency_level": 2})

     with the objective of telling each worker to use 2 threads of my 16-thread machine. The write was far slower than setting both config options to 16 threads.

  2. Throughput plot:
    [plot: read throughput comparison]

    Below are the sm.compute_concurrency_level and sm.io_concurrency_level settings I passed into dask.array.from_tiledb(...)

    TileDB: Compressed → 2 for both options
    TileDB: Uncompressed → 16 for both options
    TileDB: 16 (This is the same compressed array) → 16 for both options

I’ll keep looking into dumping the stats per worker (I apologize for the lack of knowledge; I am an aerospace engineering undergrad, so this isn’t my area of expertise). Getting this right is very important to me, because I want to be able to show scientists their options as more and more work moves to the cloud.


Hi @jgreen, let’s do two things:

  1. Please share as much of a reproducible experiment as possible, so that we can dig into the configurations and debug performance if necessary. We only need the schema of the array (we can fill dense arrays with synthetic data; that won’t affect performance) and the scripts you use to write/read.

  2. Can you please perform the same experiment without Dask? TileDB does not rely on Dask for performance; it parallelizes everything in its C++ core. Once we understand your array schema and Dask scripts, we can offer a TileDB-only script (a minimal sketch of such a read follows this list). That second experiment might very quickly expose any performance issues with dense arrays. Also, it’ll be easier to gather the stats, as there will be a single process and TileDB context.
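
For reference, a minimal sketch of such a TileDB-only read with stats enabled, assuming a hypothetical GCS URI and default credentials:

import tiledb

uri = "gs://my-bucket/my_array"   # hypothetical: the same array, read without Dask

tiledb.stats_enable()
with tiledb.open(uri) as A:
    data = A[:]                   # TileDB parallelizes this read in its C++ core
tiledb.stats_dump()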

I am cc-ing @ihnorton and @KiterLuc who will be on top of this and fix any issues as we gather information from you.

Thanks again for reaching out and we look forward to updates.

Stavros

Thank you for getting back to me so quickly. I compiled what I have for TileDB into this Jupyter notebook. It will take me a little while to come up with a system for benchmarking read throughput for TileDB without Dask, because the entire process relies on Dask to scale the local cluster up & store the data as a Dask array.

The TileDB Embedded array I have in Google Cloud storage is not public at the moment. I would make it so if I had permissions, but unfortunately my mentors are occupied with other work for the time being. Let me know if you need anything else from me, and thank you again for taking an in-depth look at this.

@ihnorton @nguyenv and @KiterLuc will be taking a look over the next few days (please allow for delays due to folks taking vacation this period).
