Multiple concurrent writers to append-only sparse array

I’m experimenting with putting an append-only sparse array behind our RNA-seq front-end. That component does read alignment and feature counting, then ends up with a list of { gene, count } pairs for the sample. We do this part at scale, and I’d like to write the data into TileDB. What I’m missing is how to append a new “row” (or column) to the array for each sample. I was considering an array with { sample, gene } dimensions and count as an int attribute.

Now that I think about it, I’d ideally want to store the sample data in a column, since the primary use case will be to get all count data for a set of samples, and reading these as rows doesn’t make much sense. So I guess my question still stands, but is more “how do I add a column of data”?

Hi @michaeljon,

If you only need to append new samples, then your suggestion of an array with { sample, gene } dimensions and count as an int attribute sounds correct – the array schema is fixed, and you can issue multiple writes (possibly in parallel), with a different set of samples written by each write operation. Here are two examples: the first appends sequentially, and the second writes with multiple processes.

nb: if you would like to easily scale these kinds of computations across many nodes, TileDB Cloud Task Graphs can run code like this (and more) across many auto-scaled nodes; see this recent blog post for more details, and the rough sketch just below.
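As a rough sketch only (not from the blog post, and untested): the tiledb-cloud package’s Delayed API can fan out one write task per sample partition. The namespace URI, partition sizes, and write_partition helper below are hypothetical, and the array would need to live in cloud-accessible storage rather than a local tempdir:

import numpy as np
from tiledb.cloud.compute import Delayed

def write_partition(uri, samples, genes, counts):
    # hypothetical helper: one sparse write per partition of samples
    import tiledb
    with tiledb.open(uri, mode='w') as A:
        A[samples, genes] = counts

uri = "tiledb://my-namespace/rnaseq-counts"  # hypothetical registered array

# one delayed task per sample partition; independent tasks can run in parallel
tasks = []
for i in range(5):
    s = np.arange(i * 7, (i + 1) * 7)
    g = ["gene" + str(n) for n in np.random.randint(0, 1000, 7)]
    c = np.random.randint(0, 100, 7)
    tasks.append(Delayed(write_partition)(uri, s, g, c))

# gather the independent tasks under one node and execute the whole graph
Delayed(lambda *results: results)(*tasks).compute()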

  • Simple example: append to the array sequentially, and print out several slices:
#%%
import tiledb
import numpy as np
import tempfile

#%%
uri = tempfile.mkdtemp()

# create a sparse array with dimensions (sample, gene) and attribute (count);
# string dimensions take an unbounded (None, None) domain
dims = [
  tiledb.Dim(name="sample", domain=(0, 100), tile=10, dtype=np.int32),
  tiledb.Dim(name="gene", domain=(None, None), tile=None, dtype='ascii')
]
dom = tiledb.Domain(*dims)
attr = tiledb.Attr(name="count", dtype=np.int32)

schema = tiledb.ArraySchema(domain=dom, sparse=True, attrs=[attr])

# create a new array
tiledb.Array.create(uri, schema)
#%%
# write first set of test data
s1 = np.arange(10)
g1 = np.array(['gene1', 'gene2', 'gene3', 'gene4', 'gene5', 'gene6', 'gene7', 'gene8', 'gene9', 'gene10'])
c1 = np.random.randint(0, 100, 10)

with tiledb.open(uri, mode='w') as A:
    A[s1, g1] = c1

# %%
# write second set of test data
s2 = np.arange(10, 20)
g2 = np.array(['gene1', 'gene2', 'gene3', 'gene4', 'gene5', 'gene6', 'gene7', 'gene8', 'gene9', 'gene10'])
c2 = np.random.randint(0, 100, 10)

with tiledb.open(uri, mode='w') as A:
    A[s2, g2] = c2
# %%
with tiledb.open(uri) as A:
    print(A.multi_index[:])

# %%
# slice only the first 5 samples (note: multi_index ranges are inclusive,
# so [:4] selects samples 0 through 4)
with tiledb.open(uri) as A:
    print(A.multi_index[:4])

# %%
# select samples 9, 13, and 15 through 17 by coordinate
with tiledb.open(uri) as A:
    print(A.df[[9, 13, 15, 16, 17]])
# %%

  • Multiprocess example: write sample sets using multiple sub-process workers (note: each worker process writes a different set of samples):
#%%
import tiledb
import numpy as np
import tempfile
import multiprocessing as mp
# use the 'fork' start method so worker processes inherit module state
# (note: 'fork' is not available on Windows)
if mp.get_start_method(allow_none=True) is None:
    mp.set_start_method('fork')


#%%
def create_array(uri):
    # create a sparse array with dimensions (sample, gene) and attribute (count);
    # string dimensions take an unbounded (None, None) domain
    dims = [
      tiledb.Dim(name="sample", domain=(0, 100), tile=10, dtype=np.int32),
      tiledb.Dim(name="gene", domain=(None, None), tile=None, dtype='ascii')
    ]
    dom = tiledb.Domain(*dims)
    attr = tiledb.Attr(name="count", dtype=np.int32)

    schema = tiledb.ArraySchema(domain=dom, sparse=True, attrs=[attr])

    # create a new array
    tiledb.Array.create(uri, schema)

#%%
def write_data(uri, samples, genes, counts):
    with tiledb.open(uri, mode='w') as A:
        A[samples, genes] = counts


#%%
def create_data(partitions, partition_size):
    samples = []
    genes = []
    counts = []
    for i in range(partitions):
        samples.append(np.arange(i * partition_size, (i + 1) * partition_size))
        genes.append(["gene" + str(g) for g in np.random.randint(0, 1000, partition_size)])
        counts.append(np.random.randint(0, 100, partition_size))
    return samples, genes, counts

#%%
def run_batched():
    uri = tempfile.mkdtemp()
    create_array(uri)

    npartitions = 5
    partition_size = 7
    dataset = create_data(npartitions, partition_size)

    # create tuples of (uri, samples, genes, counts) to write on each worker
    items = list(zip([uri] * npartitions, *dataset))

    with mp.Pool(4) as pool:
        pool.starmap(write_data, items)

    return uri
# %%
if __name__ == "__main__":
    uri = run_batched()

    with tiledb.open(uri) as A:
        print(A.df[:])

I should also mention that TileDB supports schema evolution, which allows adding attributes to an existing array, in case that is helpful.
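A minimal sketch of that with TileDB-Py’s ArraySchemaEvolution (the new "tpm" attribute name here is just a hypothetical example):

import numpy as np
import tiledb

se = tiledb.ArraySchemaEvolution()
se.add_attribute(tiledb.Attr(name="tpm", dtype=np.float64))  # hypothetical new attribute
se.array_evolve(uri)  # uri of an existing array, e.g. from the examples above

Cells written before the evolution will read back the attribute’s fill value for "tpm".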

Hope that helps,
Isaiah