Tabular v2 - Quick Start
The Ocean Data Platform (ODP) is a data platform to find, access and share ocean data. You can connect to ODP using the Python SDK, which provides specific APIs to interact with tabular datasets.
We added a completely new implementation of tabular storage, called Tabular v2.
The key features of Tabular v2 are:
* pyarrow.ipc: provides faster and more efficient data serialization and deserialization.
* simple query: allows you to query the data using a simpler syntax or pyarrow.compute expressions.
* ktable: dynamically partitions the data automatically on the backend.
* transactions: simplify how you modify the data and how errors are handled.
* pyarrow.Schema: provides an easier-to-use schema definition.
* bigcol: stores large columns in separate files, which allows for faster access to the data.
* bsquare: indexes polygons and points to speed up spatial queries.
Getting Started
To get started with Tabular v2, you can follow the Getting Started guide.
Assuming you have everything set up, you should be able to obtain a tabular handler:
import pyarrow as pa
from odp.client import OdpClient

# Connect to ODP, fetch the dataset from the catalog and get a Tabular v2 handler for it
cli = OdpClient()
dataset = cli.catalog.get("my-dataset")
tab = cli.table_v2(dataset)
The above handler can then be used to make changes to that table, and we will use it in the examples below.
Refer to the ODP documentation for more information on how to create or modify datasets.
Create the Table
Before you can start adding data to the table, you need to create the table schema. This can be done using the pyarrow.Schema object:
tab.create(pa.schema([
    pa.field("id", pa.string(), nullable=False, metadata={"index": "1"}),
    pa.field("name", pa.string()),
    pa.field("age", pa.int32()),
]))
This sets the schema of the table and marks the id column to be indexed.
Add Data
You can add data to the table using the insert method, which accepts Python dictionaries, pyarrow.RecordBatch objects, or pandas.DataFrame objects.
This should be done inside a transaction:
with tab as tx:
    tx.insert({"id": "1", "name": "Alice", "age": 25})
    tx.insert({"id": "2", "name": "Bob", "age": 30})
Using a transaction prevents other users from seeing intermediate steps until the transaction is committed.
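This also covers error handling. The following is a minimal sketch, assuming that an exception raised inside the with block discards the uncommitted inserts rather than committing them:

try:
    with tab as tx:
        tx.insert({"id": "3", "name": "Carol", "age": 41})
        raise RuntimeError("something went wrong")  # hypothetical failure mid-transaction
except RuntimeError:
    # assumption: the transaction was never committed, so Alice and Bob remain the only rows
    pass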
Query Data
You don't need a transaction to query the data:
for row in tab.select().rows():
    print(row)
You can also filter the data, choose which columns to return, and get dataframes:
for df in tab.select("age < 26", cols=["id"]).dataframes():
    print(df)
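The feature list above mentions pyarrow.compute expressions as an alternative to the string syntax. As a sketch, assuming select() also accepts such an expression as its filter:

import pyarrow.compute as pc

# Same filter as above, written as a pyarrow.compute expression (assumption: select() accepts it)
for df in tab.select(pc.field("age") < 26, cols=["id"]).dataframes():
    print(df)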
Indexing
You can specify in the schema which columns should be indexed.
Indexing works by splitting the data into separate files based on the values of the indexed fields, so queries can skip files that contain no data of interest.
The drawback of this approach is that the more columns you index, the less efficient each index becomes.
The upside is that the more data you have, the more effective indexing gets.
Normally you will start to care about indexing when you have hundreds of thousands of rows.
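For example, a schema that also indexes a frequently filtered column reuses the index metadata key from the create() example above (the country field here is purely illustrative):

tab.create(pa.schema([
    pa.field("id", pa.string(), nullable=False, metadata={"index": "1"}),
    # illustrative field: filters on country can now skip files with no matching values
    pa.field("country", pa.string(), metadata={"index": "1"}),
    pa.field("age", pa.int32()),
]))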
Polygons and geospatial queries
Tabular v2 supports polygons and geospatial queries:
tab.create(pa.schema([
    pa.field("id", pa.string(), nullable=False),
    pa.field("geo", pa.string(), metadata={"index": "1", "isGeometry": "1"}),
]))

with tab as tx:
    tx.insert({"id": "1", "geo": "POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))"})
    tx.insert({"id": "2", "geo": "POLYGON ((1 1, 1 2, 2 2, 2 1, 1 1))"})

for row in tab.select(
    'geo intersect "POLYGON ((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5))"',
).rows():
    print(row)
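Since bsquare indexes both polygons and points, point geometries can be stored and queried the same way. As a sketch, assuming WKT POINT values are accepted in the same geo column:

with tab as tx:
    tx.insert({"id": "3", "geo": "POINT (0.7 0.7)"})  # hypothetical point row

for row in tab.select('geo intersect "POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))"').rows():
    print(row)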
Big cells
Some datasets have columns that are too large to be stored in the same file as the rest of the data. When this happens, we store only the beginning of the data in the column and add a reference to a separate file containing the full data.
While this makes querying the data harder and possibly slower in some cases, it allows for faster access in all other cases and heavy caching of the big data in the client, with better performance overall.
Aggregation
This feature is experimental.
Imagine you have a table:
| id | country | name    | age |
|----|---------|---------|-----|
| 1  | IT      | Alice   | 25  |
| 2  | IT      | Bob     | 30  |
| 3  | UK      | Charlie | 35  |
and you want to know the average age of people in each country. You need to aggregate the data, grouping by the country field:
data = tab.aggregate('country', aggr={"age": "avg"})
This will return a DataFrame with the average age of people in each country:
| country | age  |
|---------|------|
| IT      | 27.5 |
| UK      | 35   |
It is possible to further filter the data using a query expression:
data = tab.aggregate('country', query='age > 28', aggr={"age": "avg"})
You can specify multiple aggregations:
data = tab.aggregate('country', aggr={"age": "avg", "name": "min"})
You can also let the schema of the table dictate the aggregation:
tab.create(pa.schema([
    pa.field("id", pa.string(), nullable=False),
    pa.field("country", pa.string(), nullable=False),
    pa.field("name", pa.string(), nullable=False),
    pa.field("age", pa.int32(), nullable=False, metadata={"aggr": "avg"}),
]))
which will automatically aggregate the age field by average whenever no aggr is specified:
data = tab.aggregate('country')