Multiple concurrent writers to append-only sparse array

I’m experimenting with putting an append-only sparse array behind our RNA-seq front-end. That component does read alignment and feature counting, then ends up with a list of { gene, count } pairs for the sample. We do this part at scale, and I’d like to write the data into TileDB. What I’m missing is how to append a new “row” (or column) to the array for the sample. I was considering an array of { sample, gene } with count as an int attribute.

Now that I think about it, I’d ideally want to store the sample data as a column, since the primary use case will be to get all count data for a set of samples, and reading these as rows doesn’t make much sense. So I guess my question still stands, but is more “how do I add a column of data?”

Hi @michaeljon,

If you only need to append new samples, then your suggestion of an array of { sample, gene } with count as an int attribute sounds correct – the array schema is fixed, and you can issue multiple writes (possibly in parallel), with a different set of samples written by each write operation. Here are two examples: the first appends sequentially, and the second writes with multiple processes.

NB: if you would like to easily scale up this kind of computation across many nodes, TileDB Cloud Task Graphs can run code like this (and more) across many auto-scaled nodes; see this recent blog post for more details.

  • Simple example: append to array sequentially, and print out several slices:
#%%
import tiledb
import numpy as np
import tempfile

#%%
uri = tempfile.mkdtemp()

# create a sparse array with dimensions (sample, gene) and attribute (count)
dims = [
  tiledb.Dim(name="sample", domain=(0, 100), tile=10, dtype=np.int32),
  tiledb.Dim(name="gene", domain=(None, None), tile=None, dtype='ascii')
]
dom = tiledb.Domain(*dims)
attr = tiledb.Attr(name="count", dtype=np.int32)

schema = tiledb.ArraySchema(domain=dom, sparse=True, attrs=[attr])

# create a new array
tiledb.Array.create(uri, schema)
#%%
# write first set of test data
s1 = np.arange(10)
g1 = np.array([f"gene{i}" for i in range(1, 11)])
c1 = np.random.randint(0, 100, 10)

with tiledb.open(uri, mode='w') as A:
    A[s1, g1] = c1

# %%
# write second set of test data
s2 = np.arange(10, 20)
g2 = np.array([f"gene{i}" for i in range(1, 11)])
c2 = np.random.randint(0, 100, 10)

with tiledb.open(uri, mode='w') as A:
    A[s2, g2] = c2
# %%
with tiledb.open(uri) as A:
    print(A.multi_index[:])

# %%
# slice only the first 5 samples (multi_index bounds are inclusive)
with tiledb.open(uri) as A:
    print(A.multi_index[:4])

# %%
# slice only the 9th, 13th, and 15-17th samples
with tiledb.open(uri) as A:
    print(A.df[[9, 13, 15, 16, 17]])
# %%

  • Multiprocess example: Write sample sets using multiple sub-process workers (note: these are different samples on each worker process):
#%%
import tiledb
import numpy as np
import tempfile
import multiprocessing as mp
# use the 'fork' start method (POSIX only) so workers inherit the parent state;
# set it only if no start method has been chosen yet, to avoid a RuntimeError
if mp.get_start_method(allow_none=True) is None:
    mp.set_start_method('fork')


#%%
def create_array(uri):
    # create a sparse array with dimensions (sample, gene) and attribute (count)
    dims = [
      tiledb.Dim(name="sample", domain=(0, 100), tile=10, dtype=np.int32),
      tiledb.Dim(name="gene", domain=(None, None), tile=None, dtype='ascii')
    ]
    dom = tiledb.Domain(*dims)
    attr = tiledb.Attr(name="count", dtype=np.int32)

    schema = tiledb.ArraySchema(domain=dom, sparse=True, attrs=[attr])

    # create a new array
    tiledb.Array.create(uri, schema)

#%%
def write_data(uri, samples, genes, counts):
    with tiledb.open(uri, mode='w') as A:
        A[samples, genes] = counts


#%%
def create_data(partitions, partition_size):
    samples = []
    genes = []
    counts = []
    for i in range(partitions):
        samples.append(np.arange(i * partition_size, (i+1)*partition_size))
        genes.append(["gene" + str(g) for g in np.random.randint(0, 1000, partition_size)])
        counts.append(np.random.randint(0, 100, partition_size))
    return samples, genes, counts

#%%
def run_batched():
  uri = tempfile.mkdtemp()
  create_array(uri)

  npartitions = 5
  partition_size = 7
  dataset = create_data(npartitions, partition_size)

  # create tuples of data to write on each worker
  items = list(zip([uri for _ in range(npartitions)], *dataset))

  with mp.Pool(4) as pool:
      pool.starmap(write_data, items)

  return uri
# %%
if __name__ == "__main__":
    uri = run_batched()

    with tiledb.open(uri) as A:
      print(A.df[:])

I should also mention that TileDB supports schema evolution which allows adding attributes to an existing array, in case that is helpful.

Hope that helps,
Isaiah

We’re finally getting around to implementing this, and I’m clearly missing something. I can make the above code work, but our sample names are strings. I thought this was going to be straightforward to do. The input data is a 1-D array of the int32 count values (in the same order every time).

import tiledb
import numpy as np
from os.path import exists
import shutil

# create the dimensions sequence x gene
gene_dim = tiledb.Dim(name="gene_dim", tile=None, dtype="ascii")
sequence_dim = tiledb.Dim(name="sequence_dim", tile=None, dtype="ascii")

# create the domain
dom = tiledb.Domain(gene_dim, sequence_dim)

# create the count attribute (set the default filler to 0)
count_attr = tiledb.Attr(name="count", dtype=np.int32, fill=0)

# create the array schema
schema = tiledb.ArraySchema(domain=dom, sparse=True, attrs=[count_attr])

ARRAY_NAME = "feature_counts"

if exists(ARRAY_NAME):
    shutil.rmtree(ARRAY_NAME, ignore_errors=True)

tiledb.Array.create(ARRAY_NAME, schema)

If I have the sample’s data in a 1-D array, then I thought this might work, but TileDB is throwing an error that the dimension sizes don’t match: IndexError: sparse index dimension length mismatch

    with tiledb.open(ARRAY_NAME, mode='w') as A:
        A[[sample], genes] = counts

Could you please try removing the square brackets [] around sample in the assignment?

 A[[sample], genes] = counts

to

 A[sample, genes] = counts

I’d already tried that thinking that I needed to force the indexer to consider a singleton. I get the same error message.

Would it be possible to share a full example, perhaps modifying the snippet I shared to match your workflow?

Dropped the code in GitHub - michaeljon/tiledb-array-repro

There are two cases at the bottom. One reads the “counts” into a DataFrame and walks the rows. The second just hard-codes the three inputs for sample, genes, and counts.

TileDB-Py version: 0.24.0
TileDB core version: (2, 18, 2)

Hey @ihnorton, did you get a chance to look at the code? Curious what I’m doing wrong here.

thanks
mj

Hi @michaeljon,

Thanks for the repro. The coordinate lengths need to match (for both read and write). Fixed here: update indexing to provide matching coordinate lengths by ihnorton · Pull Request #1 · michaeljon/tiledb-array-repro · GitHub
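For anyone else hitting the same IndexError: in a sparse write, every dimension’s coordinate array must have the same length as the attribute values, so a single sample written against N genes needs its name repeated N times. A minimal sketch of the shape bookkeeping (the gene names and sample id here are illustrative), using plain NumPy:

```python
import numpy as np

genes = np.array(["A2M", "A2MP1", "NAT1"])      # one coordinate per gene
counts = np.array([12, 0, 7], dtype=np.int32)   # one count per gene

sample = "01595556-0bb1-4416-8da9-cdca78fd4ab4"
# repeat the single sample id so all three arrays have matching lengths
sample_coords = np.repeat(sample, len(genes))

assert len(sample_coords) == len(genes) == len(counts)
# the sparse write then becomes: A[sample_coords, genes] = counts
```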

Best,
Isaiah

Thanks again.

I’m clearly missing something here, either in the data model or in the way I’m thinking about the problem. Our data looks more or less like the counts.tsv (after processing), or a pivoted version of that for a single sample. What I was assuming was that I could query counts “down” the gene column or counts “across” the sample row. But the data coming back from A[:"A2MP1"]["count"] looks like way too much (and unmatched) data. Querying a single sample like A["01595556-0bb1-4416-8da9-cdca78fd4ab4", :]["count"] for its counts seems to work, but I’m missing how to ask for many samples.

er… nevermind… somewhere along the way I missed Array’s df indexer, and writing the data works much better when I open the array once, outside the loop over the data.

df = pd.read_csv("counts.tsv.gz", sep="\t", header=0, index_col=0, compression="gzip")

genes = df.columns
samples = df.index

with tiledb.open(ARRAY_NAME, mode="w") as A:
    for ndx, sample in enumerate(samples):
        counts = df.iloc[ndx, :]
        sample_coords = np.repeat(sample, len(counts))

        A[sample_coords, genes] = counts
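Since the loop above writes one sample at a time, it could also be collapsed into a single vectorized write by flattening the whole count matrix up front. A sketch under the same assumptions (the toy DataFrame stands in for counts.tsv; the commented write line shows how it would plug into the array above):

```python
import numpy as np
import pandas as pd

# toy stand-in for the counts.tsv layout: samples as rows, genes as columns
df = pd.DataFrame([[5, 0], [3, 9]],
                  index=["sample-a", "sample-b"],
                  columns=["geneA", "geneB"])

# repeat each sample id once per gene, and tile the gene list once per sample
sample_coords = np.repeat(df.index.values, len(df.columns))
gene_coords = np.tile(df.columns.values, len(df.index))
counts = df.values.ravel()  # row-major flatten matches the coord ordering

print(list(sample_coords))  # ['sample-a', 'sample-a', 'sample-b', 'sample-b']
print(list(gene_coords))    # ['geneA', 'geneB', 'geneA', 'geneB']
print(list(counts))         # [5, 0, 3, 9]

# single sparse write instead of one write per sample:
# with tiledb.open(ARRAY_NAME, mode="w") as A:
#     A[sample_coords, gene_coords] = counts
```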