Usage help -- disk space, parallel writes

I have what seems like a very straightforward use case for TileDB – I’d like to store a 2D matrix of gene expression data. Typically I would update by samples and read by gene, but the opposite use case is also possible. I am playing around a little bit with a 20k x 50k dense array of floats, and I’m having several issues which make me think I’m doing something wrong.

Informal testing suggested that 1000 x 1000 tiles give the best performance tradeoff, so that’s what I went with.

  1. Disk space. If I write the entire array at once (this matrix is still small enough to keep in memory), the array is ~8GB, exactly as expected. However, if I write single columns at a time, the size quickly balloons (e.g. 40GB after 100 writes), and it stays that way even after calling .consolidate() and .vacuum().

  2. Parallel writes. If I use multiprocessing to write from 25 concurrent threads (or processes), I get about a 4x speedup compared to writing in serial. Surprisingly, this seems to be true regardless of whether I’m writing to the same tile or spread across tiles! If I look at the running processes with top, most of them are in the “D” (waiting for disk access) state.

Here’s some of my testing code.

import tiledb
import numpy as np
import os
n_samples=20000
n_genes=50000
uri='test_tiledb_20k_by_50k'

dom = tiledb.Domain(tiledb.Dim(name="samples", domain=(0,n_samples-1), tile=1000, dtype=np.int32),
                    tiledb.Dim(name="genes", domain=(0,n_genes-1), tile=1000, dtype=np.int32))
schema = tiledb.ArraySchema(domain=dom, sparse=False,
                                attrs=[tiledb.Attr(name="a", dtype=np.float64)]
                           )
tiledb.DenseArray.create(uri, schema, overwrite=True)
def get_size(start_path = '.'):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(start_path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            # skip if it is symbolic link
            if not os.path.islink(fp):
                total_size += os.path.getsize(fp)

    return total_size

get_size(uri)*1.0/(2**20)
0.00017261505126953125
a=tiledb.array.DenseArray(uri, mode='w')
%timeit -n 5 -r 1 np.random.rand(n_genes)
434 µs ± 0 ns per loop (mean ± std. dev. of 1 run, 5 loops each)
%timeit -n 5 -r 1 a[np.random.randint(n_samples),:]=np.random.rand(n_genes)
1.35 s ± 0 ns per loop (mean ± std. dev. of 1 run, 5 loops each)
get_size(uri)*1.0/(2**20)
1907.7264785766602
L1=np.arange(100)
%timeit -n 1 -r 1 for i in L1: a[i,:]=np.random.rand(n_genes)
2min 14s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
get_size(uri)*1.0/(2**20)
40062.25247859955
a.consolidate()
tiledb.vacuum(uri)
get_size(uri)*1.0/(2**20)
40062.25247859955
import multiprocessing as mp
from multiprocessing import Pool

if mp.get_start_method() is None:
    mp.set_start_method('spawn')

def write(i):
    a[:,i]=np.random.rand(n_samples)
    
p = Pool(25)

%timeit -n 1 -r 1 p.map(write,L1)
24.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
L2=np.random.randint(n_samples,size=100)
%timeit -n 1 -r 1 for i in L2: a[i,:]=np.random.rand(n_genes)
2min 14s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%timeit -n 1 -r 1 p.map(write,L2)
24.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
get_size(uri)*1.0/(2**20)
108740.84340190887
a.consolidate()
tiledb.vacuum(uri)
get_size(uri)*1.0/(2**20)
108740.84340190887
(8.0*n_genes*n_samples)/(2**20)
7629.39453125

Hi @Victor_Chubukov, thanks for checking TileDB out!

For (1), please check https://docs.tiledb.com/main/background/internal-mechanics/writing#dense-writes. TileDB always materializes integral dense tiles, even if you are writing the tiles partially. So, each of your “column” writes materializes 20 full tiles (since the first dimension is 20k and your tile extent on that dimension is 1000). To solve this problem, you should try to write as close to integral tiles as possible, e.g., buffer 1000 contiguous columns and write 20 full tiles at a time.
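For example, a rough sketch of the buffering idea (the buffer and the column range below are illustrative, not prescriptive):

import numpy as np
import tiledb

tile_cols = 1000                         # tile extent on the "genes" dimension
buf = np.empty((n_samples, tile_cols))   # accumulate 1000 contiguous columns in memory

# ... fill buf column by column as the data arrives ...

# a single write that covers integral tiles: 20 full 1000x1000 tiles, no partial tiles
with tiledb.DenseArray(uri, mode='w') as A:
    A[:, 0:tile_cols] = buf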

For (2), there are a lot of factors in play. If you are using a single process, by default TileDB uses all the threads. If you use multiple processes, you’ll need to reduce the number of CPU and IO threads TileDB is using, otherwise you will experience high contention. Check https://docs.tiledb.com/main/how-to/configuration for the various configuration parameters you can play with.
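As a sketch (assuming a TileDB version that has the sm.compute_concurrency_level and sm.io_concurrency_level parameters), each worker process could open the array with a reduced-concurrency context, e.g.:

cfg = tiledb.Config({
    "sm.compute_concurrency_level": "2",   # CPU threads per worker process
    "sm.io_concurrency_level": "2",        # IO threads per worker process
})

def write(i):
    # open the array inside the worker so each process gets its own handle and context
    with tiledb.DenseArray(uri, mode='w', ctx=tiledb.Ctx(cfg)) as A:
        A[:, i] = np.random.rand(n_samples)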

For your use case, remember that TileDB mainly parallelizes over tiles, so if you have 25 concurrent threads and you write 20 tiles, you may be underutilizing your CPU. TileDB parallelizes other stuff as well (such as chunking within each tile if you specify compression, or across attributes if you have more than one), but it is a good rule of thumb to remember that TileDB is very good at parallelizing across tiles.
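For reference, compression in TileDB-Py is specified per attribute via a filter list; a minimal sketch reusing the dom from your snippet above (the Zstd level is an arbitrary example):

schema = tiledb.ArraySchema(
    domain=dom, sparse=False,
    attrs=[tiledb.Attr(name="a", dtype=np.float64,
                       filters=tiledb.FilterList([tiledb.ZstdFilter(level=3)]))],
)

With a compression filter in place, each tile is internally divided into chunks that are compressed and decompressed in parallel.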

I hope the above helps. Please feel free to send more questions our way.

Thanks @stavros. I read that section a few times but I guess I still didn’t understand it – I thought that consolidation would then collapse all the data into one tile.

In any case, for initial loading I can certainly do some clever things to get full tiles into memory and then write them. But what would be your recommended setup for the long term? Suppose the data arrives in small batches of columns, and I want to have efficient querying of individual rows.

Hi @Victor_Chubukov, I forgot to make a comment about consolidation, apologies. Consolidation for dense arrays takes certain parameters that may prevent consolidation from running whatsoever. See these docs (although slightly outdated). By properly tweaking the configuration parameters, you’ll be able to consolidate as desired.
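For example, something along these lines (the values below are only a starting point to experiment with, not a recommendation):

cfg = tiledb.Config({
    "sm.consolidation.steps": "4",              # number of consolidation steps to attempt
    "sm.consolidation.step_min_frags": "2",     # allow a step with as few as 2 fragments
    "sm.consolidation.step_max_frags": "20",    # ... and at most 20 fragments per step
    "sm.consolidation.step_size_ratio": "0.0",  # relax the adjacent-fragment size requirement
    "sm.consolidation.amplification": "5.0",    # tolerate some write amplification (dense arrays)
})
tiledb.consolidate(uri, config=cfg)
tiledb.vacuum(uri)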

Regarding best practices, we suggest always writing integral tiles in dense arrays. I understand in this case it might not be possible. It’s in my backlog to write tips for data ingestion under extreme scenarios. Please give me some time and you’ll see them published soon.

Thanks @stavros.

Consolidation for dense arrays takes certain parameters that may prevent consolidation from running whatsoever

I believe that something like that must be happening since the function returns instantly.

It’s in my backlog to write tips for data ingestion under extreme scenarios.

Thanks, that sounds good, but I wonder if I’m still missing something because this doesn’t seem like an extreme scenario to me. The way you’ve described it, where I try to write whole tiles, it seems like I basically want to have all the data ready before populating the database. But with all of these features that TileDB has for parallel writes, time travel, etc., it seems clear that it’s designed for constant updating.

I did a brief test with single-row tiles. Certainly the writing worked great, but reading a single column took ~20s which is of course not great.

Perhaps I misunderstood before. Writing row or column tiles should work great for writes, as long as they are integral tiles.

Here is the challenge, which has nothing to do with TileDB (other engines would suffer the same). You need to think about:

  1. The spatial locality of your results in the files on disk
  2. The amount of information you are retrieving from disk to process your query

For (1), if you are writing rows of tiles, but then querying columns, your result tiles will appear in different files on disk. That incurs a lot of unnecessary overhead.

For (2), your tile extents are 1000 x 1000, however you are accessing slabs that are 1000 x 1. This means that you are fetching 999x more data than your actual result (the tile is the atomic unit of IO for both reads and writes).

The above scenario is extreme because you are writing rows but reading columns. Your ideal scenario (sketched in code after the list) would be to:

  1. Define extents 20k x 1
  2. Write tiles in columns
  3. Read tiles in columns
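Concretely, the column-friendly schema from the list above would look something like this sketch:

dom = tiledb.Domain(
    tiledb.Dim(name="samples", domain=(0, n_samples - 1), tile=n_samples, dtype=np.int32),
    tiledb.Dim(name="genes",   domain=(0, n_genes - 1),   tile=1,         dtype=np.int32),
)
schema = tiledb.ArraySchema(
    domain=dom, sparse=False,
    attrs=[tiledb.Attr(name="a", dtype=np.float64)],
)

With 20k x 1 extents, each column is exactly one tile, so a column write and a column read each touch a single tile.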

Obviously that’s not possible for you, so I need to think about how I would implement your case given the restriction of writing rows but querying columns.