Hi @stavros,
Sure!
- I have about 500 trading pairs. For the separate arrays, I use two coordinates: ‘date’ (the date when the trade occurred) and ‘aggID’ (the trade ID). When combining them into a single array, I use ‘pair’ as an additional coordinate.
Example of single pair (individual arrays):
Example of multiple pairs (one combined array):
Schema for the former
ArraySchema(
domain=Domain(*[
Dim(name='date', domain=(numpy.datetime64('1980-01-01T00:00:00.000000000'), numpy.datetime64('2100-01-01T00:00:00.000000000')), tile=604800000000000 nanoseconds, dtype='datetime64[ns]'),
Dim(name='aggID', domain=(0, 9000000000000000000), tile=10000, dtype='int64'),
]),
attrs=[
Attr(name='price', dtype='float64', var=False, nullable=False, filters=FilterList([LZ4Filter(level=9), ])),
Attr(name='quantity', dtype='float64', var=False, nullable=False, filters=FilterList([LZ4Filter(level=9), ])),
Attr(name='first_tradeID', dtype='int64', var=False, nullable=False, filters=FilterList([LZ4Filter(level=9), ])),
Attr(name='last_tradeID', dtype='int64', var=False, nullable=False, filters=FilterList([LZ4Filter(level=9), ])),
Attr(name='is_buyer_maker', dtype='int8', var=False, nullable=False, filters=FilterList([LZ4Filter(level=9), ])),
],
cell_order='row-major',
tile_order='row-major',
capacity=10000000,
sparse=True,
allows_duplicates=False,
coords_filters=FilterList([ZstdFilter(level=-1), ])
)
Schema for the latter
ArraySchema(
domain=Domain(*[
Dim(name='pair', domain=(None, None), tile=None, dtype='|S0'),
Dim(name='date', domain=(numpy.datetime64('1980-01-01T00:00:00.000000000'), numpy.datetime64('2100-01-01T00:00:00.000000000')), tile=604800000000000 nanoseconds, dtype='datetime64[ns]'),
Dim(name='aggID', domain=(0, 9000000000000000000), tile=10000, dtype='int64'),
]),
attrs=[
Attr(name='price', dtype='float64', var=False, nullable=False, filters=FilterList([LZ4Filter(level=9), ])),
Attr(name='quantity', dtype='float64', var=False, nullable=False, filters=FilterList([LZ4Filter(level=9), ])),
Attr(name='first_tradeID', dtype='int64', var=False, nullable=False, filters=FilterList([LZ4Filter(level=9), ])),
Attr(name='last_tradeID', dtype='int64', var=False, nullable=False, filters=FilterList([LZ4Filter(level=9), ])),
Attr(name='is_buyer_maker', dtype='int8', var=False, nullable=False, filters=FilterList([LZ4Filter(level=9), ])),
],
cell_order='row-major',
tile_order='row-major',
capacity=10000000,
sparse=True,
allows_duplicates=False,
coords_filters=FilterList([ZstdFilter(level=-1), ])
)
- Here are the two functions: `to_tiledb500` writes a separate array for each pair, and `to_tiledb1` writes all pairs combined into a single array.
def to_tiledb500(df, uri):
    """Write the trades of a single pair to a sparse TileDB array at ``uri``.

    The array has the dimensions ``date`` and ``aggID``. It is created if no
    TileDB array exists at ``uri`` yet, and then the frame is written to it.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose index has the levels ``date`` and ``aggID`` and which has
        the columns ``price``, ``quantity``, ``first_tradeID``,
        ``last_tradeID`` and ``is_buyer_maker``.
    uri : str
        Location of the array.
    """
    config = tiledb.Config({
        "sm.tile_cache_size": str(30_000_000),
        "sm.consolidation.step_min_frags": "5",
        "sm.consolidation.step_max_frags": "5",
        "sm.consolidation.steps": "10",
        "sm.consolidation.buffer_size": str(30_000_000),
        "sm.consolidation.step_size_ratio": "0.1",
        "sm.dedup_coords": 'true',
        "py.init_buffer_bytes": str(1024**2 * 400),
    })
    ctx = tiledb.Ctx(config)

    # Domain: integer bounds/tile for the int64 dimension (no float round-trip),
    # and every object is bound to the same configured context.
    dom = tiledb.Domain(
        tiledb.Dim(
            name="date",
            domain=(np.datetime64('1980-01-01'), np.datetime64("2100-01-01")),
            tile=np.timedelta64(1, 'W'),
            dtype="datetime64[ns]",
            ctx=ctx,
        ),
        tiledb.Dim(
            name="aggID",
            domain=(0, 9 * 10**18),
            tile=10_000,
            dtype=np.int64,
            ctx=ctx,
        ),
        ctx=ctx,
    )

    # Single source of truth for the attributes: drives both the schema and
    # the dict of column buffers that is written below.
    attr_dtypes = {
        "price": np.float64,
        "quantity": np.float64,
        "first_tradeID": np.int64,
        "last_tradeID": np.int64,
        "is_buyer_maker": np.int8,
    }
    attrs = [
        tiledb.Attr(
            name=name,
            dtype=dtype,
            filters=tiledb.FilterList([tiledb.LZ4Filter(level=9)]),
            ctx=ctx,
        )
        for name, dtype in attr_dtypes.items()
    ]

    schema = tiledb.ArraySchema(
        domain=dom,
        sparse=True,
        attrs=attrs,
        cell_order="row-major",
        tile_order="row-major",
        capacity=10_000_000,
        ctx=ctx,
        allows_duplicates=False,
    )

    # Ask TileDB (not the local filesystem) whether an array already lives at
    # the URI, so remote URIs and non-array directories are handled correctly.
    if tiledb.object_type(uri, ctx=ctx) != "array":
        tiledb.SparseArray.create(uri, schema)

    index = df.index
    with tiledb.SparseArray(uri, mode='w', ctx=ctx) as A:
        A[
            index.get_level_values('date').values,
            index.get_level_values('aggID').values,
        ] = {name: df[name].values for name in attr_dtypes}
def to_tiledb1(df, uri):
    """Write the trades of all pairs to one sparse TileDB array at ``uri``.

    The array has the dimensions ``pair`` (string), ``date`` and ``aggID``.
    It is created if no TileDB array exists at ``uri`` yet, and then the
    frame is written to it.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose index has the levels ``pair``, ``date`` and ``aggID`` and
        which has the columns ``price``, ``quantity``, ``first_tradeID``,
        ``last_tradeID`` and ``is_buyer_maker``.
    uri : str
        Location of the array.
    """
    config = tiledb.Config({
        "sm.tile_cache_size": str(30_000_000),
        "sm.consolidation.step_min_frags": "5",
        "sm.consolidation.step_max_frags": "5",
        "sm.consolidation.steps": "10",
        "sm.consolidation.buffer_size": str(30_000_000),
        "sm.consolidation.step_size_ratio": "0.1",
        "sm.dedup_coords": 'true',
        "py.init_buffer_bytes": str(1024**2 * 400),
    })
    ctx = tiledb.Ctx(config)

    # Domain: integer bounds/tile for the int64 dimension (no float round-trip),
    # and every object is bound to the same configured context.
    dom = tiledb.Domain(
        tiledb.Dim(
            name="pair",
            domain=(None, None),
            tile=None,
            dtype=np.bytes_,
            ctx=ctx,
        ),
        tiledb.Dim(
            name="date",
            domain=(np.datetime64('1980-01-01'), np.datetime64("2100-01-01")),
            tile=np.timedelta64(1, 'W'),
            dtype="datetime64[ns]",
            ctx=ctx,
        ),
        tiledb.Dim(
            name="aggID",
            domain=(0, 9 * 10**18),
            tile=10_000,
            dtype=np.int64,
            ctx=ctx,
        ),
        ctx=ctx,
    )

    # Single source of truth for the attributes: drives both the schema and
    # the dict of column buffers that is written below.
    attr_dtypes = {
        "price": np.float64,
        "quantity": np.float64,
        "first_tradeID": np.int64,
        "last_tradeID": np.int64,
        "is_buyer_maker": np.int8,
    }
    attrs = [
        tiledb.Attr(
            name=name,
            dtype=dtype,
            filters=tiledb.FilterList([tiledb.LZ4Filter(level=9)]),
            ctx=ctx,
        )
        for name, dtype in attr_dtypes.items()
    ]

    schema = tiledb.ArraySchema(
        domain=dom,
        sparse=True,
        attrs=attrs,
        cell_order="row-major",
        tile_order="row-major",
        capacity=10_000_000,
        ctx=ctx,
        allows_duplicates=False,
    )

    # Ask TileDB (not the local filesystem) whether an array already lives at
    # the URI, so remote URIs and non-array directories are handled correctly.
    if tiledb.object_type(uri, ctx=ctx) != "array":
        tiledb.SparseArray.create(uri, schema)

    index = df.index
    with tiledb.SparseArray(uri, mode='w', ctx=ctx) as A:
        A[
            index.get_level_values('pair').values,
            index.get_level_values('date').values,
            index.get_level_values('aggID').values,
        ] = {name: df[name].values for name in attr_dtypes}
Here’s some sample data.
And here’s how to write it:
# Load the sample trades and index them by (pair, date, aggID).
raw = pd.read_csv('sample_data.csv')
df = raw.set_index(['pair', 'date', 'aggID'])

# Single-pair array: select one pair's rows (drops the 'pair' index level).
to_tiledb500(df.loc['DOTUSDT'], 'DOTUSDT')

# Multi-pair array: write the whole frame, 'pair' level included.
to_tiledb1(df, 'multi_pairs')
Thanks for the metadata tip, by the way - it’s a more elegant solution!