Hi @michaeljon,
If you only need to append new samples, then your suggested array of { sample, gene } with count as an int attribute sounds correct – the data schema is fixed, and you can issue multiple writes (possibly in parallel), with a different set of samples written by each array write operation. Here are two examples: the first appends sequentially, and the second writes with multiple processes.
nb: if you would like to easily scale up these kinds of computations across many nodes, TileDB Cloud Task Graphs can run code like this (and more) across many auto-scaled nodes – see this recent blog post for more details, and the rough sketch just below.
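As a rough, non-runnable sketch of that approach (this assumes the tiledb.cloud package is installed and you are logged in, and that the array lives at a URI the cloud workers can reach, e.g. a registered tiledb:// URI; write_data is the helper defined in the multiprocess example below):
# rough sketch only: dispatch per-partition writes as TileDB Cloud tasks
# (assumes tiledb.cloud is installed and array_uri is cloud-reachable)
from tiledb.cloud.compute import Delayed

tasks = [Delayed(write_data)(array_uri, s, g, c)
         for s, g, c in zip(samples, genes, counts)]
results = [t.compute() for t in tasks]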
- Simple example: append to the array sequentially, and print out several slices:
#%%
import tiledb
import numpy as np
import tempfile
#%%
uri = tempfile.mkdtemp()
# create a sparse array with dimensions (sample, gene) and attribute (count)
dims = [
    tiledb.Dim(name="sample", domain=(0, 100), tile=10, dtype=np.int32),
    # string dimensions take an unbounded (None, None) domain in TileDB-Py
    tiledb.Dim(name="gene", domain=(None, None), tile=None, dtype='ascii'),
]
dom = tiledb.Domain(*dims)
attr = tiledb.Attr(name="count", dtype=np.int32)
schema = tiledb.ArraySchema(domain=dom, sparse=True, attrs=[attr])
# create a new array
tiledb.Array.create(uri, schema)
#%%
# write first set of test data
s1 = np.arange(10)
g1 = np.array(['gene1', 'gene2', 'gene3', 'gene4', 'gene5', 'gene6', 'gene7', 'gene8', 'gene9', 'gene10'])
c1 = np.random.randint(0, 100, 10)
with tiledb.open(uri, mode='w') as A:
    A[s1, g1] = c1
# %%
# write second set of test data
s2 = np.arange(10, 20)
g2 = np.array(['gene1', 'gene2', 'gene3', 'gene4', 'gene5', 'gene6', 'gene7', 'gene8', 'gene9', 'gene10'])
c2 = np.random.randint(0, 100, 10)
with tiledb.open(uri, mode='w') as A:
    A[s2, g2] = c2
# %%
with tiledb.open(uri) as A:
    print(A.multi_index[:])
# %%
# slice only the first 5 samples (note: multi_index bounds are inclusive)
with tiledb.open(uri) as A:
    print(A.multi_index[:4])
# %%
# slice only samples 9, 13, and 15-17, returned as a pandas DataFrame
with tiledb.open(uri) as A:
    print(A.df[[9, 13, 15, 16, 17]])
# %%
- Multiprocess example: write sample sets using multiple sub-process workers (note: each worker process writes a different set of samples):
#%%
import tiledb
import numpy as np
import tempfile
import multiprocessing as mp
# optionally force the 'fork' start method so workers inherit module state:
# if mp.get_start_method(allow_none=True) != 'fork':
#     mp.set_start_method('fork')
#%%
def create_array(uri):
    # create a sparse array with dimensions (sample, gene) and attribute (count)
    dims = [
        tiledb.Dim(name="sample", domain=(0, 100), tile=10, dtype=np.int32),
        # string dimensions take an unbounded (None, None) domain in TileDB-Py
        tiledb.Dim(name="gene", domain=(None, None), tile=None, dtype='ascii'),
    ]
    dom = tiledb.Domain(*dims)
    attr = tiledb.Attr(name="count", dtype=np.int32)
    schema = tiledb.ArraySchema(domain=dom, sparse=True, attrs=[attr])
    # create a new array
    tiledb.Array.create(uri, schema)
#%%
def write_data(uri, samples, genes, counts):
    with tiledb.open(uri, mode='w') as A:
        A[samples, genes] = counts
#%%
def create_data(partitions, partition_size):
    samples = []
    genes = []
    counts = []
    for i in range(partitions):
        samples.append(np.arange(i * partition_size, (i + 1) * partition_size))
        genes.append(["gene" + str(g) for g in np.random.randint(0, 1000, partition_size)])
        counts.append(np.random.randint(0, 100, partition_size))
    return samples, genes, counts
#%%
def run_batched():
    uri = tempfile.mkdtemp()
    create_array(uri)
    npartitions = 5
    partition_size = 7
    dataset = create_data(npartitions, partition_size)
    # create tuples of (uri, samples, genes, counts) to write on each worker
    items = list(zip([uri for _ in range(npartitions)], *dataset))
    with mp.Pool(4) as pool:
        pool.starmap(write_data, items)
    return uri
# %%
if __name__ == "__main__":
    uri = run_batched()
    with tiledb.open(uri) as A:
        print(A.df[:])
I should also mention that TileDB supports schema evolution, which allows adding attributes to an existing array, in case that is helpful.
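For example, a minimal sketch using TileDB-Py's ArraySchemaEvolution (the attribute name here is just illustrative):
# sketch: add a new attribute to an existing array via schema evolution
# (the "normalized_count" attribute name is hypothetical)
se = tiledb.ArraySchemaEvolution()
se.add_attribute(tiledb.Attr(name="normalized_count", dtype=np.float64))
se.array_evolve(uri)
Cells written before the evolution will read back the new attribute's fill value.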
Hope that helps,
Isaiah