Tabular V2 - Reference
table methods
Table.create()
tab.create(pa_schema)
expects a valid pyarrow.Schema
will raise FileAlreadyExists if the table already has a schema
metadata
some fields in the metadata have special meaning, and will change the behaviour of the table:
index: the column will be used to dynamically partition the data, giving a boost to queries on that field
isGeometry: the column will be expected to contain either WKT or WKB (based on the pyarrow type)
big: if set, the column can contain long text or binary data
experimental metadata:
aggr: specify how aggregation should be done on this field: sum, count, avg, min or max
isLat: specify that a column is a latitude
isLon: specify that a column is a longitude
Table.drop()
drop the schema and all the data in the table. this operation is not reversible
tab.drop()
select
fetch rows from a table, with optional filtering and column selection; returns a cursor:
for row in tab.select().rows():
    print(row)
Cursor
when selecting, a cursor will be returned. the cursor must be iterated over to perform any action.
To iterate over a cursor, you must specify how you want to fetch the data:
* rows(): will return a generator of rows
* dataframes(): will return a generator of pandas.DataFrame
* pages(N): will return a generator of pages, each page containing N rows
* batches(): will return a generator of pyarrow.RecordBatch
Note: with pages(0), the cursor will return a page as soon as a chunk is obtained from the server
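The paging behaviour can be illustrated with a small stand-alone sketch (an illustration of the semantics, not the library's implementation):

```python
from itertools import islice

def pages(rows, n):
    # sketch of pages(N): yield lists of up to n rows each
    it = iter(rows)
    while True:
        page = list(islice(it, n))
        if not page:
            return
        yield page

result = list(pages(range(5), 2))
# → [[0, 1], [2, 3], [4]]
```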
filtering
The first argument of a select is the filter that will be applied to the data. If no filter is specified, all the data will be returned.
The syntax for filters is similar to pyarrow.compute.Expression, with only a few differences.
tab.select('age < 26')
tab.select('age < 26 and name == "Alice"')
tab.select('age < 26 or name == "Alice"')
tab.select('age + 10 < 26')
tab.select('age < (1+2)*3')
to support geospatial queries we have the following operators:
tab.select('geofield intersect "POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))"')
tab.select('geofield contains "POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))"')
tab.select('geofield within "POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))"')
tab.select('geofield ~ "POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))"')
The last operator acts as a greedy match, and will return all the rows whose geometry might intersect the given one. It is designed to be as fast as possible, but the user must filter out the false positives afterwards. This can be very useful for large datasets.
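A minimal sketch of why a greedy match produces false positives: comparing bounding boxes is cheap, but boxes can overlap even when the geometries themselves do not (the bounding-box logic below is assumed for illustration):

```python
def bbox(points):
    # axis-aligned bounding box of a list of (x, y) points
    xs = [x for x, _ in points]
    ys = [y for _, y in points]
    return (min(xs), min(ys), max(xs), max(ys))

def boxes_overlap(a, b):
    # two boxes overlap unless one is entirely left/right/above/below the other
    return not (a[2] < b[0] or b[2] < a[0] or a[3] < b[1] or b[3] < a[1])

# segment a runs (0,0)-(1,1); segment b runs (0,1)-(0.1,0.9):
# their boxes overlap, but the segments themselves never touch
a = [(0, 0), (1, 1)]
b = [(0, 1), (0.1, 0.9)]
greedy_hit = boxes_overlap(bbox(a), bbox(b))  # True: a false positive
```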
column selection
you can specify a subset of columns to be returned, further limiting the computation and network needs, by specifying the argument cols as a list of strings:
for row in tab.select(cols=["id", "name"]).rows():
    print(row)
bind variables
since pyarrow.compute does not play well with datetime or timestamp objects, or more generally where validation of input is needed, we allow the user to use bind variables in the query:
tab.select('age < 26') # no bind vars
tab.select('age < $age', vars={"age": 26}) # bind by name
tab.select('age < ?', vars=[26]) # bind by position
variables can be specified as named or positional.
This becomes necessary when using structured types, like timestamp:
tab.create(pa.schema([
    pa.field("id", pa.string(), nullable=False),
    pa.field("ts", pa.timestamp("ms"), nullable=False),
]))
tab.select('ts < ?', vars=[datetime.datetime.now()])
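A rough illustration of the two binding styles follows. This only shows how named and positional variables map onto placeholders in the query text; the library presumably validates and serializes the values out-of-band rather than splicing strings:

```python
import re

def bind(query, vars):
    # illustrative sketch only: $name placeholders come from a dict,
    # ? placeholders are consumed from a list in order
    if isinstance(vars, dict):
        return re.sub(r"\$(\w+)", lambda m: repr(vars[m.group(1)]), query)
    vals = iter(vars)
    return re.sub(r"\?", lambda m: repr(next(vals)), query)

q1 = bind("age < $age", {"age": 26})  # bind by name
q2 = bind("age < ?", [26])            # bind by position
# → both "age < 26"
```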
Table.aggregate(group_by, query=None, aggr=None)
experimental
allows aggregation of data using a group by expression, an optional filter, and an aggr definition
tab.create(pa.schema([
    pa.field("id", pa.string(), nullable=False),
    pa.field("country", pa.string(), nullable=False),
    pa.field("name", pa.string(), nullable=False),
    pa.field("age", pa.int32(), nullable=False, metadata={"aggr": "avg"}),
]))
with tab as tx:
    tx.insert([
        {"id": "1", "name": "Alice", "country": "UK", "age": 25},
        {"id": "2", "name": "Bob", "country": "UK", "age": 30},
        {"id": "3", "name": "Charlie", "country": "US", "age": 35},
    ])
data = tab.aggregate('country')
# UK 27.5
# US 35
data is returned as a DataFrame with an index set to the group by field.
group by
the group by expression is mandatory, and can be a field or an expression:
tab.aggregate('country') # will return 2 rows, one for UK and one for US
tab.aggregate('age>30') # will return 2 rows, with True and False each
there are some special functions that can be used here, like h3:
h3(lat, lon, res)
this special function can be used on a point cloud to group the data by h3 tile, given a resolution:
tab.aggregate('h3(lat, lon, 8)') # compute the h3 tile of each row based on lat and lon
this assumes that lat and lon contain valid latitude and longitude values; refer to the h3 documentation for more information
Filtering
the query clause is optional, and can be used to filter the data before aggregation. It follows the same syntax as the select method:
tab.aggregate("country", query='age > 30')
aggr
the aggr specifies which fields need to be aggregated, and how.
if not specified, the table schema is used to determine the aggregation method.
if no aggregation method is specified, the field is skipped.
tab.aggregate("country", aggr={"age": "max", "name": "count"})
currently, we support aggregation by avg, sum, min, max and count.
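The aggregation semantics can be sketched in plain Python (an illustration of the behaviour only; the real method runs server-side and returns a DataFrame):

```python
from collections import defaultdict

def aggregate(rows, group_by, aggr):
    # sketch: bucket rows by the group-by field, then reduce each bucket
    groups = defaultdict(list)
    for row in rows:
        groups[row[group_by]].append(row)
    out = {}
    for key, members in groups.items():
        result = {}
        for field, how in aggr.items():
            vals = [m[field] for m in members]
            if how == "avg":
                result[field] = sum(vals) / len(vals)
            elif how == "sum":
                result[field] = sum(vals)
            elif how == "min":
                result[field] = min(vals)
            elif how == "max":
                result[field] = max(vals)
            elif how == "count":
                result[field] = len(vals)
        out[key] = result
    return out

rows = [
    {"country": "UK", "age": 25},
    {"country": "UK", "age": 30},
    {"country": "US", "age": 35},
]
res = aggregate(rows, "country", {"age": "avg"})
# → {"UK": {"age": 27.5}, "US": {"age": 35.0}}
```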
transactions
transactions should be used to provide atomicity to the operations on the table:
with tab as tx:
    ...
this creates a context, and inside this context you can safely perform operations on the table. Those operations won't be visible to anyone until the block is exited and the transaction is committed.
If the block exits with an exception, the whole transaction is rolled back and no changes are applied.
Internally, this is done by creating a transaction with begin, and then calling commit or rollback on exit.
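The begin/commit/rollback pattern can be sketched with a minimal context manager (a stand-in class, not the library's internals):

```python
class Tx:
    # sketch of the pattern: commit on clean exit, rollback on exception
    def __init__(self):
        self.state = "new"

    def __enter__(self):
        self.state = "begun"
        return self

    def __exit__(self, exc_type, exc, tb):
        self.state = "rolled back" if exc_type else "committed"
        return False  # let the exception propagate

tx = Tx()
with tx:
    pass
# tx.state → "committed"

tx2 = Tx()
try:
    with tx2:
        raise ValueError("boom")
except ValueError:
    pass
# tx2.state → "rolled back"
```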
TX.insert()
you can append data to a table using the insert method:
with tab as tx:
    tx.insert({"id": "1", "name": "Alice", "age": 25})
    tx.insert({"id": "2", "name": "Bob", "age": 30})
insert() also allows you to use list[dict], pyarrow.RecordBatch or pandas.DataFrame as input.
make sure the schema is correct and the data is valid, otherwise an exception will be raised.
TX.delete()
you can delete data from a table using the delete method:
with tab as tx:
    tx.delete('age < 26')
TX.replace()
you cannot update the data on the table directly, but you can replace some of the data with something else:
with tab as tx:
    for row in tx.replace('age < 26'):
        if row["old"]:
            row["age"] += 30
            tx.insert(row)
This will remove all the rows that match the filter, and return them to the caller. The caller can then decide what to do with each row, insert it back, discard it, insert other rows.
Note: replace computes what to do first, and then returns the rows. inserted rows won't affect the current iteration.
Note: there are limitations on replacing what was inserted in the same transaction.
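A sketch of the replace semantics on a plain list of dicts (illustrative only; note how the matched rows are snapshotted before any re-insert, so inserted rows don't affect the iteration):

```python
def replace(rows, predicate):
    # sketch: remove matching rows first, then hand them back
    # so the caller can decide to re-insert, modify, or discard them
    matched = [r for r in rows if predicate(r)]
    rows[:] = [r for r in rows if not predicate(r)]
    return matched  # snapshot: later inserts don't feed back into it

table = [{"id": "1", "age": 25}, {"id": "2", "age": 30}]
for row in replace(table, lambda r: r["age"] < 26):
    row["age"] += 30
    table.append(row)
# table → [{"id": "2", "age": 30}, {"id": "1", "age": 55}]
```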
BigCol
If a row has a field with large data, the data is stored in a separate buffer and a reference to the buffer is stored in the table instead.
this happens automatically, and the user will get back the full data when querying.
filtering on such a field is also performed on the client after fetching the full data, so this process should be transparent to the user.
The key benefit is to keep the data streams small and fast, and to cache the big files separately.
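A toy sketch of the BigCol idea (the size threshold and reference format are invented for illustration; the real cutoff and storage are library-defined):

```python
SIZE_LIMIT = 64  # hypothetical threshold

def store(row, buffers):
    # large values go to a side buffer; the table keeps only a reference
    out = {}
    for key, value in row.items():
        if isinstance(value, (str, bytes)) and len(value) > SIZE_LIMIT:
            ref = f"buf:{len(buffers)}"
            buffers[ref] = value
            out[key] = ref
        else:
            out[key] = value
    return out

def load(row, buffers):
    # resolve references back to the full data when querying
    return {k: buffers.get(v, v) if isinstance(v, str) else v
            for k, v in row.items()}

buffers = {}
stored = store({"id": "1", "doc": "x" * 100}, buffers)
# stored["doc"] → "buf:0"; load(stored, buffers) restores the full text
```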
bSquare
We convert any polygon added to the system to a bounding square, and use the geometry of this square to index the data.
This provides a convenient way to index the data with low false positives; the client can then perform the more complex query on the full polygon client-side, especially if the polygon is big and stored in a BigCol.
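The bounding-square construction can be sketched as follows (illustration only; the real indexing details are internal to the library):

```python
def bounding_square(points):
    # smallest axis-aligned square containing all (x, y) points,
    # anchored at the minimum corner
    xs = [x for x, _ in points]
    ys = [y for _, y in points]
    side = max(max(xs) - min(xs), max(ys) - min(ys))
    return (min(xs), min(ys), min(xs) + side, min(ys) + side)

# a 2x1 rectangle is indexed as a 2x2 square:
sq = bounding_square([(0, 0), (2, 0), (2, 1), (0, 1)])
# → (0, 0, 2, 2)
```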