I’m trying to run consolidation using multi-processing. My EC2 instance consists of 8 cores, 16 vCPUs, and 128 GiB of memory (EC2 instance
r5.4xlarge). I am interested in consolidating fragments for massive sparse arrays (14 million rows and ~60,000 columns, each) ingested into the TileDB-SOMA format.
I have a total of 4 arrays (2 unique arrays data-wise, one stored in col-major and one in row-major). When I run the consolidation one array at a time, I end up using 30-55% of the CPU (based on eye-balling memory usage using
I would like to multi-process the step, by reducing the memory budget such that I can run 4 consolidation processes concurrently using the
ProcessPoolExecutor in Python. How would I tune memory budget/other consolidation parameters such that I can use the entire capacity of my instance without it crashing?
Also just generally looking to understand how consolidation works (steps) and important tuning parameters to consider.
Here’s an example error I get sometimes:
tiledb.cc.TileDBError: [TileDB::SparseGlobalOrderReaderError] Error: Cannot load a single tile for fragment, increase memory budget, tile size : 2098336, per fragment memory 1356419.686710, total budget 10737418240 , num fragments to process 3958
What is the size of the array you are trying to consolidate? I am guessing you are trying to achieve optimal performance for the array post consolidation so knowing the actual size will be helpful to give a more detailed answer later on.
We have been improving memory budgeting for consolidation quite a bit and are planning to make it much simpler in upcoming releases, but at the moment it requires a little bit of math. There are two memory budget parameters to take into consideration. The first is ‘sm.mem.total_budget’, which governs the memory used by TileDB for the read and write operations done during consolidation. The second is ‘sm.consolidation.buffer_size’, which governs the memory used for the buffers that transfer data between the read and the write. This is where math is required… We allocate buffers for each attribute/dimension. Fixed-size attributes/dimensions require one buffer and var-sized ones require two. Nullable attributes require an extra buffer. The total number of buffers multiplied by the buffer size configuration parameter gives the amount of memory used.
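To make the buffer math above concrete, here is a minimal sketch in Python. The helper name and the example schema (two fixed-size dimensions plus one fixed-size attribute, which I am assuming is what a SOMA sparse array looks like) are illustrative only, and the buffer size value is just a sample number, not a recommendation.

```python
def consolidation_buffer_bytes(fields, buffer_size):
    """Estimate memory used by consolidation transfer buffers.

    fields: iterable of (name, var_sized, nullable) tuples, one per
            attribute/dimension (hypothetical description of the schema).
    buffer_size: value of 'sm.consolidation.buffer_size' in bytes.
    """
    n_buffers = 0
    for _name, var_sized, nullable in fields:
        # Fixed-size fields need one buffer, var-sized ones need two.
        n_buffers += 2 if var_sized else 1
        # Nullable attributes need one extra buffer.
        if nullable:
            n_buffers += 1
    return n_buffers * buffer_size


# Assumed schema: two fixed-size dimensions and one fixed-size attribute.
example_fields = [
    ("soma_dim_0", False, False),
    ("soma_dim_1", False, False),
    ("soma_data", False, False),
]
print(consolidation_buffer_bytes(example_fields, 50_000_000))
```

With four concurrent processes, this amount (plus ‘sm.mem.total_budget’) would be needed per process.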
Now, depending on the size of your array, we might consider two different consolidation options. The ultimate goal of consolidation is to make it so that the data inside each fragment is not interleaved with the data from other fragments. We also added a way to split the resulting fragments so that we can still use TileDB’s ability to filter out full fragments when processing range queries on dimensions. I’ll discuss the first option in the next paragraph and the second in the one after.
The first option is to consolidate everything together. When there are too many fragments to process at once, you might struggle with memory (more on that below where I explain the error you are seeing). You can use ‘sm.consolidation.step_max_frags’ to limit how many fragments get processed at once, but that means you’ll need to run more consolidation steps, which will make the operation take longer. Another option to consider is ‘sm.consolidation.max_fragment_size’. Setting a maximum fragment size allows the result of consolidation to be split into multiple fragments whose data is totally de-interleaved.
The second option is to use a new feature of TileDB which is still experimental, the consolidation plan. This feature looks at the fragments in your array and figures out which ones contain data that intersects others. It will group those fragments together to be de-interleaved. It also takes a maximum fragment size parameter, and fragments that are too large will be added to the list of fragments to process. Finally, small fragments might be grouped together if the result will not intersect any other fragments. The output of the plan is a list of consolidation nodes that can all be run independently. Each node contains a list of fragments that need to be consolidated together. You can then run consolidation on each fragment list separately (see the fragment_uris parameter of the consolidate API in Python). Unfortunately the plan APIs are not yet exposed to Python, so you’d have to use the C++ version.
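As a rough sketch of how the first option could be configured from Python: the helper names and the numbers below are hypothetical, and the right values depend on your instance and array. The config keys are the ones mentioned above; I am assuming the standard tiledb-py `tiledb.Config` and `tiledb.consolidate(uri, config=...)` calls.

```python
def build_consolidation_params(total_budget_bytes,
                               step_max_frags=None,
                               max_fragment_size=None):
    """Build a dict of TileDB config parameters for consolidation."""
    params = {"sm.mem.total_budget": str(int(total_budget_bytes))}
    if step_max_frags is not None:
        # Fewer fragments per step lowers memory pressure but adds steps.
        params["sm.consolidation.step_max_frags"] = str(int(step_max_frags))
    if max_fragment_size is not None:
        # Split the consolidated output into fragments of at most this size.
        params["sm.consolidation.max_fragment_size"] = str(int(max_fragment_size))
    return params


def consolidate_array(uri, params):
    """Run consolidation on one array with the given parameters."""
    import tiledb  # imported lazily; requires tiledb-py to be installed

    tiledb.consolidate(uri, config=tiledb.Config(params))


# Illustrative values only: 32 GiB budget, 1000 fragments per step.
params = build_consolidation_params(32 * 1024**3, step_max_frags=1000)
print(params)
```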
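Once you have a plan, running it could look roughly like the sketch below. Since the plan API is not exposed to Python, `plan_nodes` here is a hypothetical list of fragment lists that you would obtain from the C++ API yourself, and `consolidate_fn` stands in for `tiledb.consolidate`, which I am assuming accepts the `fragment_uris` keyword mentioned above.

```python
def consolidate_plan_nodes(array_uri, plan_nodes, consolidate_fn):
    """Run one consolidation per plan node.

    plan_nodes: list of lists of fragment names (hypothetical, e.g. exported
                from a consolidation plan computed in C++).
    consolidate_fn: callable such as tiledb.consolidate.
    Returns the fragment lists that were processed, in order.
    """
    processed = []
    for fragment_uris in plan_nodes:
        fragment_list = list(fragment_uris)
        if not fragment_list:
            continue  # nothing to do for an empty node
        consolidate_fn(array_uri, fragment_uris=fragment_list)
        processed.append(fragment_list)
    return processed
```

Usage would be something like `consolidate_plan_nodes(uri, nodes, tiledb.consolidate)`. Because the nodes are independent, they could also be dispatched to separate workers, as long as each worker's memory budget is sized accordingly.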
Finally, on the error you are getting. It looks like you are trying to consolidate about 4000 fragments at once, and our current consolidation algorithm needs to be able to load coordinates for at least one tile (one tile for each dimension) for each fragment to be consolidated. The ‘sm.mem.total_budget’ default is 10GB and half of that is used to load coordinates. 5GB / 4000 gives about 1.3MB per fragment and, in your case, it looks like one tile of coordinates for a fragment is about 2MB. To solve this, you could increase ‘sm.mem.total_budget’ by 2-3x or set ‘sm.consolidation.step_max_frags’ to maybe 1000.
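The arithmetic behind that error can be sketched as follows, using the numbers from your error message. The function names are made up for illustration, and the 0.5 coordinate fraction is simply the "half of the budget" described above, not something read from the library.

```python
import math

TOTAL_BUDGET = 10_737_418_240  # 'total budget' from the error message
TILE_SIZE = 2_098_336          # 'tile size' from the error message
NUM_FRAGMENTS = 3958           # 'num fragments to process' from the error


def per_fragment_coord_budget(total_budget, num_fragments, coord_fraction=0.5):
    """Bytes available to load coordinates for each fragment."""
    return total_budget * coord_fraction / num_fragments


def min_total_budget(tile_size, num_fragments, coord_fraction=0.5):
    """Smallest total budget that fits one tile per fragment."""
    return math.ceil(tile_size * num_fragments / coord_fraction)


def max_frags_per_step(total_budget, tile_size, coord_fraction=0.5):
    """Largest fragment count per step that fits in a given budget."""
    return int(total_budget * coord_fraction // tile_size)


print(per_fragment_coord_budget(TOTAL_BUDGET, NUM_FRAGMENTS))  # ~1.36 MB, below the tile size
print(min_total_budget(TILE_SIZE, NUM_FRAGMENTS))              # ~15.5 GiB
print(max_frags_per_step(TOTAL_BUDGET, TILE_SIZE))             # upper bound at the default budget
```

These are bare minimums for loading a single tile per fragment, so in practice you'd want some headroom, which is why I suggested 2-3x the budget or roughly 1000 fragments per step.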
Hope that is helpful and let us know if you have more questions.
This is so helpful, thanks! The tip on the memory budget is great: I just bumped it up to a fraction of the total memory on the instance and it worked totally fine. Also, I’ll just stick to consolidating arrays separately for now for simplicity’s sake.
Here’s information with regards to the post-consolidation tips:
- We have 4 sparse arrays, where each is a combination picked from all possible permutations of (1) raw vs normalized and (2) column optimized vs row optimized
- Raw counts are
np.uint32 while normalized counts are
- There is no interleaving in the fragments since the indices are of type
- Sizes below:
We are trying to optimize for:
- Single column and arbitrarily many rows (100K - 1M)
- All columns, many rows (100K-1M)
- Extents: [ 1,048,576 x 1], Capacity: 131,072
- Extents: [ 1 x 32,768 ], Capacity: 16,384
Please let me know if you have any advice in this regard.
Hello, thanks for the details and I’m glad my initial response was helpful.
I’m not sure what you mean when you say that the data is not interleaved because the indices are of type uint64. Interleaving mostly refers to the region covered by the data in each fragment (its non-empty domain) and whether there are lots of intersections between the fragments’ non-empty domains. If the fragments are fully disjoint, range queries should be fairly fast.
That said, from the error message you shared, it looks like you have about 4000 fragments, which isn’t super large. Could you share more details as to why you want to consolidate these arrays… Are some queries running slow? Are you planning on growing these arrays with a lot more data later?
Yes, I made a mistake earlier. The row indices between fragments are not interleaved (they’re appended to the array and assigned new row indices). The column indices between fragments are interleaved.
We want to consolidate so that we can optimize speed and because we will be adding more data.