Python
This API is composed of functions with 2 types of functionality:
- Modules for data requests: for those who only want to query the data and metadata of our project.
- Classes for data management in Google Cloud: for those who want to upload data to our project (or any other Google Cloud project, following our methodology and infrastructure).
All of the code documentation below is in English.
Modules (Data requests)
Functions to get metadata from BD's API
check_input(f)
Checks if the number of inputs is valid
Source code in basedosdados/download/metadata.py
def check_input(f):
"""Checks if the number of inputs is valid"""
@wraps(f)
def wrapper(*args, **kwargs):
if sum([a is not None for a in args]) > 1:
raise ValueError("At most one of the inputs must be non null")
return f(*args, **kwargs)
return wrapper
get_columns(table_id=None, column_id=None, columns_name=None, page=1, page_size=10, backend=None)
Get a list of available columns, either by `table_id`, `column_id` or `columns_name`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table_id | str | Table slug in Google BigQuery (GBQ). | None |
column_id | str | Column slug in Google BigQuery (GBQ). | None |
columns_name | str | Column name in Base dos Dados metadata. | None |
page | int | Page for pagination. | 1 |
page_size | int | Page size for pagination. | 10 |
backend | Backend | Backend instance, injected automatically. | None |
Returns:
Type | Description |
---|---|
dict | List of columns. |
Source code in basedosdados/download/metadata.py
@check_input
@inject_backend
def get_columns(
table_id: str = None,
column_id: str = None,
columns_name: str = None,
page: int = 1,
page_size: int = 10,
backend: Backend = None,
) -> list[dict]:
"""
Get a list of available columns,
either by `table_id`, `column_id` or `column_name`
Args:
table_id(str): table slug in google big query (gbq).
column_id(str): column slug in google big query (gbq).
columns_name(str): column name in base dos dados metadata.
page(int): page for pagination.
page_size(int): page size for pagination.
backend(Backend): backend instance, injected automatically.
Returns:
dict: List of columns.
"""
result = backend.get_columns(table_id, column_id, columns_name, page, page_size)
for item in result.get("items", []) or []:
item["bigquery_type"] = item.pop("bigqueryType", {}).get("name")
return result
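A minimal usage sketch (not from the source): it assumes `get_columns` is exposed at the top level of the `basedosdados` package, that the returned dict carries an `items` list (as the loop above suggests), and that each item has a `name` field; the table slug is illustrative.

```python
import basedosdados as bd

# List the first columns of a table by its BigQuery slug (illustrative slug).
result = bd.get_columns(table_id="diretorio_municipios", page=1, page_size=10)

for col in result.get("items", []):
    # "bigquery_type" is normalized by get_columns; "name" is an assumed field.
    print(col.get("name"), "-", col.get("bigquery_type"))
```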
get_datasets(dataset_id=None, dataset_name=None, page=1, page_size=10, backend=None)
Get a list of available datasets, either by `dataset_id` or `dataset_name`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_id | str | Dataset slug in Google BigQuery (GBQ). | None |
dataset_name | str | Dataset name in Base dos Dados metadata. | None |
page | int | Page for pagination. | 1 |
page_size | int | Page size for pagination. | 10 |
backend | Backend | Backend instance, injected automatically. | None |
Returns:
Type | Description |
---|---|
dict | List of datasets. |
Source code in basedosdados/download/metadata.py
@check_input
@inject_backend
def get_datasets(
dataset_id: str = None,
dataset_name: str = None,
page: int = 1,
page_size: int = 10,
backend: Backend = None,
) -> list[dict]:
"""
Get a list of available datasets,
either by `dataset_id` or `dataset_name`
Args:
dataset_id(str): dataset slug in google big query (gbq).
dataset_name(str): dataset name in base dos dados metadata.
page(int): page for pagination.
page_size(int): page size for pagination.
backend(Backend): backend instance, injected automatically.
Returns:
dict: List of datasets.
"""
result = backend.get_datasets(dataset_id, dataset_name, page, page_size)
for item in result.get("items", []) or []:
item["organization"] = item.get("organization", {}).get("name")
item["tags"] = [i.get("name") for i in item.get("tags", {}).get("items")]
item["themes"] = [i.get("name") for i in item.get("themes", {}).get("items")]
return result
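A usage sketch under the same assumptions (top-level import, `items` key); the search string is illustrative.

```python
import basedosdados as bd

datasets = bd.get_datasets(dataset_name="municipio", page=1, page_size=5)

for ds in datasets.get("items", []):
    # "organization", "tags" and "themes" are flattened by get_datasets above;
    # "name" is an assumed field.
    print(ds.get("name"), "|", ds.get("organization"), "|", ds.get("themes"))
```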
get_tables(dataset_id=None, table_id=None, table_name=None, page=1, page_size=10, backend=None)
Get a list of available tables, either by `dataset_id`, `table_id` or `table_name`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_id | str | Dataset slug in Google BigQuery (GBQ). | None |
table_id | str | Table slug in Google BigQuery (GBQ). | None |
table_name | str | Table name in Base dos Dados metadata. | None |
page | int | Page for pagination. | 1 |
page_size | int | Page size for pagination. | 10 |
backend | Backend | Backend instance, injected automatically. | None |
Returns:
Type | Description |
---|---|
dict | List of tables. |
Source code in basedosdados/download/metadata.py
@check_input
@inject_backend
def get_tables(
dataset_id: str = None,
table_id: str = None,
table_name: str = None,
page: int = 1,
page_size: int = 10,
backend: Backend = None,
) -> list[dict]:
"""
Get a list of available tables,
either by `dataset_id`, `table_id` or `table_name`
Args:
dataset_id(str): dataset slug in google big query (gbq).
table_id(str): table slug in google big query (gbq).
table_name(str): table name in base dos dados metadata.
page(int): page for pagination.
page_size(int): page size for pagination.
backend(Backend): backend instance, injected automatically.
Returns:
dict: List of tables.
"""
return backend.get_tables(dataset_id, table_id, table_name, page, page_size)
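A usage sketch (same assumptions; the dataset slug reuses the one from the download example later on this page). Since `get_tables` returns the backend payload unchanged, the `items` key is itself an assumption here.

```python
import basedosdados as bd

tables = bd.get_tables(dataset_id="br_suporte", page=1, page_size=10)

# The result shape is assumed to mirror get_datasets/get_columns.
for tbl in tables.get("items", []):
    print(tbl)
```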
inject_backend(f)
Inject backend instance if it doesn't exist
Source code in basedosdados/download/metadata.py
def inject_backend(f):
"""Inject backend instance if doesn't exists"""
@wraps(f)
def wrapper(*args, **kwargs):
if "backend" not in kwargs:
kwargs["backend"] = Backend()
return f(*args, **kwargs)
return wrapper
search(q=None, page=1, page_size=10, backend=None)
Search for datasets, querying all available metadata for the term `q`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
q | str | Search term. | None |
page | int | Page for pagination. | 1 |
page_size | int | Page size for pagination. | 10 |
backend | Backend | Backend instance, injected automatically. | None |
Returns:
Type | Description |
---|---|
dict | List of datasets and metadata. |
Source code in basedosdados/download/metadata.py
@check_input
@inject_backend
def search(
q: str = None,
page: int = 1,
page_size: int = 10,
backend: Backend = None,
) -> list[dict]:
"""
Search for datasets, querying all available metadata for the term `q`
Args:
q(str): search term.
page(int): page for pagination.
page_size(int): page size for pagination.
backend(Backend): backend instance, injected automatically.
Returns:
dict: List of datasets and metadata.
"""
items = []
for item in backend.search(q, page, page_size).get("results", []):
items.append(
{
"slug": item.get("slug"),
"name": item.get("name"),
"description": item.get("description"),
"n_tables": item.get("n_tables"),
"n_raw_data_sources": item.get("n_raw_data_sources"),
"n_information_requests": item.get("n_information_requests"),
"organization": {
"slug": item.get("organizations", [{}])[0].get("slug"),
"name": item.get("organizations", [{}])[0].get("name"),
},
}
)
return items
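A usage sketch: the keys printed below are exactly the ones built by the loop above; the search term and top-level import are illustrative assumptions.

```python
import basedosdados as bd

results = bd.search(q="municipios", page=1, page_size=5)

for item in results:
    # slug, organization and n_tables are populated by search() above.
    print(item["slug"], "-", item["organization"]["name"], "-", item["n_tables"])
```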
Functions for managing downloads
download(savepath, query=None, dataset_id=None, table_id=None, billing_project_id=None, query_project_id='basedosdados', limit=None, from_file=False, reauth=False, compression='GZIP')
Download table or query result from basedosdados BigQuery (or other).
- Using a **query**: ``download('select * from `basedosdados.br_suporte.diretorio_municipios` limit 10')``
- Using **dataset_id & table_id**: `download(dataset_id='br_suporte', table_id='diretorio_municipios')`
You can also add arguments to modify save parameters:
`download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')`
Parameters:
Name | Type | Description | Default |
---|---|---|---|
savepath | str, pathlib.PosixPath | Savepath must be a file path. Only supports `.csv`. | required |
query | str | Optional. Valid SQL Standard Query to basedosdados. If query is available, dataset_id and table_id are not required. | None |
dataset_id | str | Optional. Dataset id available in basedosdados. It should always come with table_id. | None |
table_id | str | Optional. Table id available in basedosdados.dataset_id. It should always come with dataset_id. | None |
billing_project_id | str | Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard | None |
query_project_id | str | Optional. Which project the table lives in. You can change this if you want to query different projects. | 'basedosdados' |
limit | int | Optional. Number of rows. | None |
from_file | boolean | Optional. Uses the credentials from file, located in `~/.basedosdados/credentials/`. | False |
reauth | boolean | Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations. | False |
compression | str | Optional. Compression type. Only `GZIP` is available for now. | 'GZIP' |
Exceptions:
Type | Description |
---|---|
Exception | If either table_id, dataset_id or query are empty. |
Source code in basedosdados/download/download.py
def download(
savepath,
query=None,
dataset_id=None,
table_id=None,
billing_project_id=None,
query_project_id="basedosdados",
limit=None,
from_file=False,
reauth=False,
compression="GZIP",
):
"""Download table or query result from basedosdados BigQuery (or other).
* Using a **query**:
`download('select * from `basedosdados.br_suporte.diretorio_municipios` limit 10')`
* Using **dataset_id & table_id**:
`download(dataset_id='br_suporte', table_id='diretorio_municipios')`
You can also add arguments to modify save parameters:
`download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')`
Args:
savepath (str, pathlib.PosixPath):
savepath must be a file path. Only supports `.csv`.
query (str): Optional.
Valid SQL Standard Query to basedosdados. If query is available,
dataset_id and table_id are not required.
dataset_id (str): Optional.
Dataset id available in basedosdados. It should always come with table_id.
table_id (str): Optional.
Table id available in basedosdados.dataset_id.
It should always come with dataset_id.
billing_project_id (str): Optional.
Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
query_project_id (str): Optional.
Which project the table lives in. You can change this if you want to query different projects.
limit (int): Optional
Number of rows.
from_file (boolean): Optional.
Uses the credentials from file, located in `~/.basedosdados/credentials/
reauth (boolean): Optional.
Re-authorize Google Cloud Project in case you need to change user or reset configurations.
compression (str): Optional.
Compression type. Only `GZIP` is available for now.
Raises:
Exception: If either table_id, dataset_id or query are empty.
"""
billing_project_id, from_file = _set_config_variables(
billing_project_id=billing_project_id, from_file=from_file
)
if (query is None) and ((table_id is None) or (dataset_id is None)):
raise BaseDosDadosException(
"Either table_id, dataset_id or query should be filled."
)
client = _google_client(billing_project_id, from_file, reauth)
# makes sure that savepath is a filepath and not a folder
savepath = _sets_savepath(savepath)
# if query is not defined (so it won't be overwritten) and if
# table is a view or external or if limit is specified,
# convert it to a query.
if not query and (
not _is_table(client, dataset_id, table_id, query_project_id) or limit
):
query = f"""
SELECT *
FROM {query_project_id}.{dataset_id}.{table_id}
"""
if limit is not None:
query += f" limit {limit}"
if query:
# sql queries produces anonymous tables, whose names
# can be found within `job._properties`
job = client["bigquery"].query(query)
# views may take longer: wait for job to finish.
_wait_for(job)
dest_table = job._properties["configuration"]["query"]["destinationTable"]
project_id = dest_table["projectId"]
dataset_id = dest_table["datasetId"]
table_id = dest_table["tableId"]
_direct_download(client, dataset_id, table_id, savepath, project_id, compression)
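A usage sketch mirroring the signature above; the billing project ID is a placeholder you must replace with your own Google Cloud project.

```python
import basedosdados as bd

# Save the first 100 rows of the directory table to a local CSV.
bd.download(
    savepath="municipios.csv",
    dataset_id="br_suporte",
    table_id="diretorio_municipios",
    billing_project_id="<YOUR_GCP_PROJECT_ID>",  # placeholder
    limit=100,
)
```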
read_sql(query, billing_project_id=None, from_file=False, reauth=False, use_bqstorage_api=False)
Load data from BigQuery using a query. Just a wrapper around `pandas.read_gbq`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | sql | Valid SQL Standard Query to basedosdados. | required |
billing_project_id | str | Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard | None |
from_file | boolean | Optional. Uses the credentials from file, located in `~/.basedosdados/credentials/`. | False |
reauth | boolean | Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations. | False |
use_bqstorage_api | boolean | Optional. Use the BigQuery Storage API to download query results quickly, but at an increased cost (https://cloud.google.com/bigquery/docs/reference/storage/). To use this API, first enable it in the Cloud Console (https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com). You must also have the bigquery.readsessions.create permission on the project you are billing queries to. | False |
Returns:
Type | Description |
---|---|
pd.DataFrame | Query result |
Source code in basedosdados/download/download.py
def read_sql(
query,
billing_project_id=None,
from_file=False,
reauth=False,
use_bqstorage_api=False,
):
"""Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq
Args:
query (sql):
Valid SQL Standard Query to basedosdados
billing_project_id (str): Optional.
Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
from_file (boolean): Optional.
Uses the credentials from file, located in `~/.basedosdados/credentials/
reauth (boolean): Optional.
Re-authorize Google Cloud Project in case you need to change user or reset configurations.
use_bqstorage_api (boolean): Optional.
Use the BigQuery Storage API to download query results quickly, but at an increased cost(https://cloud.google.com/bigquery/docs/reference/storage/).
To use this API, first enable it in the Cloud Console(https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com).
You must also have the bigquery.readsessions.create permission on the project you are billing queries to.
Returns:
pd.DataFrame:
Query result
"""
billing_project_id, from_file = _set_config_variables(
billing_project_id=billing_project_id, from_file=from_file
)
try:
# Set a two hours timeout
bigquery_storage_v1.client.BigQueryReadClient.read_rows = partialmethod(
bigquery_storage_v1.client.BigQueryReadClient.read_rows,
timeout=3600 * 2,
)
return read_gbq(
query,
project_id=config.billing_project_id,
use_bqstorage_api=use_bqstorage_api,
credentials=_credentials(from_file=config.from_file, reauth=reauth),
)
except GenericGBQException as e:
if "Reason: 403" in str(e):
raise BaseDosDadosAccessDeniedException from e
if re.match("Reason: 400 POST .* [Pp]roject[ ]*I[Dd]", str(e)):
raise BaseDosDadosInvalidProjectIDException from e
raise
except PyDataCredentialsError as e:
raise BaseDosDadosAuthorizationException from e
except (OSError, ValueError) as e:
no_billing_id = "Could not determine project ID" in str(e)
no_billing_id |= "reading from stdin while output is captured" in str(e)
if no_billing_id:
raise BaseDosDadosNoBillingProjectIDException from e
raise
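A usage sketch; the billing project ID is a placeholder.

```python
import basedosdados as bd

df = bd.read_sql(
    "SELECT * FROM `basedosdados.br_suporte.diretorio_municipios` LIMIT 10",
    billing_project_id="<YOUR_GCP_PROJECT_ID>",  # placeholder
)
print(df.head())
```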
read_table(dataset_id, table_id, billing_project_id=None, query_project_id='basedosdados', limit=None, from_file=False, reauth=False, use_bqstorage_api=False)
Load data from BigQuery using dataset_id and table_id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_id | str | Dataset id available in basedosdados. It should always come with table_id. | required |
table_id | str | Table id available in basedosdados.dataset_id. It should always come with dataset_id. | required |
billing_project_id | str | Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard | None |
query_project_id | str | Optional. Which project the table lives in. You can change this if you want to query different projects. | 'basedosdados' |
limit | int | Optional. Number of rows to read from table. | None |
from_file | boolean | Optional. Uses the credentials from file, located in `~/.basedosdados/credentials/`. | False |
reauth | boolean | Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations. | False |
use_bqstorage_api | boolean | Optional. Use the BigQuery Storage API to download query results quickly, but at an increased cost (https://cloud.google.com/bigquery/docs/reference/storage/). To use this API, first enable it in the Cloud Console (https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com). You must also have the bigquery.readsessions.create permission on the project you are billing queries to. | False |
Returns:
Type | Description |
---|---|
pd.DataFrame | Query result |
Source code in basedosdados/download/download.py
def read_table(
dataset_id,
table_id,
billing_project_id=None,
query_project_id="basedosdados",
limit=None,
from_file=False,
reauth=False,
use_bqstorage_api=False,
):
"""Load data from BigQuery using dataset_id and table_id.
Args:
dataset_id (str): Optional.
Dataset id available in basedosdados. It should always come with table_id.
table_id (str): Optional.
Table id available in basedosdados.dataset_id.
It should always come with dataset_id.
billing_project_id (str): Optional.
Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
query_project_id (str): Optional.
Which project the table lives in. You can change this if you want to query different projects.
limit (int): Optional.
Number of rows to read from table.
from_file (boolean): Optional.
Uses the credentials from file, located in `~/.basedosdados/credentials/
reauth (boolean): Optional.
Re-authorize Google Cloud Project in case you need to change user or reset configurations.
use_bqstorage_api (boolean): Optional.
Use the BigQuery Storage API to download query results quickly, but at an increased cost(https://cloud.google.com/bigquery/docs/reference/storage/).
To use this API, first enable it in the Cloud Console(https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com).
You must also have the bigquery.readsessions.create permission on the project you are billing queries to.
Returns:
pd.DataFrame:
Query result
"""
billing_project_id, from_file = _set_config_variables(
billing_project_id=billing_project_id, from_file=from_file
)
if (dataset_id is not None) and (table_id is not None):
query = f"""
SELECT *
FROM `{query_project_id}.{dataset_id}.{table_id}`"""
if limit is not None:
query += f" LIMIT {limit}"
else:
raise BaseDosDadosException("Both table_id and dataset_id should be filled.")
return read_sql(
query,
billing_project_id=billing_project_id,
from_file=from_file,
reauth=reauth,
use_bqstorage_api=use_bqstorage_api,
)
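A usage sketch mirroring the signature above; the billing project ID is a placeholder.

```python
import basedosdados as bd

df = bd.read_table(
    dataset_id="br_suporte",
    table_id="diretorio_municipios",
    billing_project_id="<YOUR_GCP_PROJECT_ID>",  # placeholder
    limit=10,
)
print(df.shape)
```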
Classes (Data management)
Class for managing files in cloud storage.
Storage (Base)
Manage files on Google Cloud Storage.
Source code in basedosdados/upload/storage.py
class Storage(Base):
"""
Manage files on Google Cloud Storage.
"""
def __init__(self, dataset_id, table_id, **kwargs):
super().__init__(**kwargs)
self.bucket = self.client["storage_staging"].bucket(self.bucket_name)
self.dataset_id = dataset_id.replace("-", "_")
self.table_id = table_id.replace("-", "_")
@staticmethod
def _resolve_partitions(partitions):
if isinstance(partitions, dict):
return "/".join(f"{k}={v}" for k, v in partitions.items()) + "/"
if isinstance(partitions, str):
if partitions.endswith("/"):
partitions = partitions[:-1]
# If there is no partition
if len(partitions) == 0:
return ""
# It should fail if there is folder which is not a partition
try:
# check if it fits rule
{b.split("=")[0]: b.split("=")[1] for b in partitions.split("/")}
except IndexError as e:
raise Exception(
f"The path {partitions} is not a valid partition"
) from e
return partitions + "/"
raise Exception(f"Partitions format or type not accepted: {partitions}")
def _build_blob_name(self, filename, mode, partitions=None):
"""
Builds the blob name.
"""
# table folder
blob_name = f"{mode}/{self.dataset_id}/{self.table_id}/"
# add partition folder
if partitions is not None:
blob_name += self._resolve_partitions(partitions)
# add file name
blob_name += filename
return blob_name
def init(self, replace=False, very_sure=False):
"""Initializes bucket and folders.
Folder should be:
* `raw` : that contains really raw data
* `staging` : preprocessed data ready to upload to BigQuery
Args:
replace (bool): Optional.
Whether to replace if bucket already exists
very_sure (bool): Optional.
Are you aware that everything is going to be erased if you
replace the bucket?
Raises:
Warning: very_sure argument is still False.
"""
if replace:
if not very_sure:
raise Warning(
"\n********************************************************"
"\nYou are trying to replace all the data that you have "
f"in bucket {self.bucket_name}.\nAre you sure?\n"
"If yes, add the flag --very_sure\n"
"********************************************************"
)
self.bucket.delete(force=True)
self.client["storage_staging"].create_bucket(self.bucket)
for folder in ["staging/", "raw/"]:
self.bucket.blob(folder).upload_from_string("")
def upload(
self,
path,
mode="all",
partitions=None,
if_exists="raise",
chunk_size=None,
**upload_args,
):
"""Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:
* Add a single **file** setting `path = <file_path>`.
* Add a **folder** with multiple files setting `path =
<folder_path>`. *The folder should just contain the files and
no folders.*
* Add **partitioned files** setting `path = <folder_path>`.
This folder must follow the hive partitioning scheme i.e.
`<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv`
(ex: `mytable/country=brasil/year=2020/mypart.csv`).
*Remember all files must follow a single schema.* Otherwise, things
might fail in the future.
There are 6 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BigQuery
* `header`: should contain the header of the tables
* `auxiliary_files`: should contain auxiliary files from each table
* `architecture`: should contain the architecture sheet of the tables
* `all`: if no treatment is needed, use `all`.
Args:
path (str or pathlib.PosixPath): Where to find the file or
folder that you want to upload to storage
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
partitions (str, pathlib.PosixPath, or dict): Optional.
*If adding a single file*, use this to add it to a specific partition.
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
if_exists (str): Optional.
What to do if data exists
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
chunk_size (int): Optional
The size of a chunk of data whenever iterating (in bytes).
This must be a multiple of 256 KB per the API specification.
If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
upload_args ():
Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_file`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename)
"""
if (self.dataset_id is None) or (self.table_id is None):
raise Exception("You need to pass dataset_id and table_id")
path = Path(path)
if path.is_dir():
paths = [
f
for f in path.glob("**/*")
if f.is_file() and f.suffix in [".csv", ".parquet", "parquet.gzip"]
]
parts = [
(
filepath.as_posix()
.replace(path.as_posix() + "/", "")
.replace(str(filepath.name), "")
)
for filepath in paths
]
else:
paths = [path]
parts = [partitions or None]
self._check_mode(mode)
mode = (
["raw", "staging", "header", "auxiliary_files", "architecture"]
if mode == "all"
else [mode]
)
for m in mode:
for filepath, part in tqdm(list(zip(paths, parts)), desc="Uploading files"):
blob_name = self._build_blob_name(filepath.name, m, part)
blob = self.bucket.blob(blob_name, chunk_size=chunk_size)
if not blob.exists() or if_exists == "replace":
upload_args["timeout"] = upload_args.get("timeout", None)
blob.upload_from_filename(str(filepath), **upload_args)
elif if_exists == "pass":
pass
else:
raise BaseDosDadosException(
f"Data already exists at {self.bucket_name}/{blob_name}. "
"If you are using Storage.upload then set if_exists to "
"'replace' to overwrite data \n"
"If you are using Table.create then set if_storage_data_exists "
"to 'replace' to overwrite data."
)
logger.success(
" {object} {filename}_{mode} was {action}!",
filename=filepath.name,
mode=m,
object="File",
action="uploaded",
)
def download(
self,
filename="*",
savepath=".",
partitions=None,
mode="staging",
if_not_exists="raise",
):
"""Download files from Google Storage from path `mode`/`dataset_id`/`table_id`/`partitions`/`filename` and replicate folder hierarchy
on save,
There are 5 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BigQuery
* `header`: should contain the header of the tables
* `auxiliary_files`: should contain auxiliary files from each table
* `architecture`: should contain the architecture sheet of the tables
You can also use the `partitions` argument to choose files from a partition
Args:
filename (str): Optional
Specify which file to download. If "*" , downloads all files within the bucket folder. Defaults to "*".
savepath (str):
Where you want to save the data on your computer. Must be a path to a directory.
partitions (str, dict): Optional
If downloading a single file, use this to specify the partition path from which to download.
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
mode (str): Optional
Folder of which dataset to update.[raw|staging|header|auxiliary_files|architecture]
if_not_exists (str): Optional.
What to do if data not found.
* 'raise' : Raises FileNotFoundError.
* 'pass' : Do nothing and exit the function
Raises:
FileNotFoundError: If the given path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could not be found or there are no files to download.
"""
# Prefix to locate files within the bucket
prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
# Add specific partition to search prefix
if partitions:
prefix += self._resolve_partitions(partitions)
# if no filename is passed, list all blobs within a given table
if filename != "*":
prefix += filename
blob_list = list(self.bucket.list_blobs(prefix=prefix))
# if there are no blobs matching the search raise FileNotFoundError or return
if not blob_list:
if if_not_exists == "raise":
raise FileNotFoundError(f"Could not locate files at {prefix}")
return
# download all blobs matching the search to given savepath
for blob in tqdm(blob_list, desc="Download Blob"):
# parse blob.name and get the csv file name
csv_name = blob.name.split("/")[-1]
# build folder path replicating storage hierarchy
blob_folder = blob.name.replace(csv_name, "")
# replicate folder hierarchy
savepath = Path(savepath)
(savepath / blob_folder).mkdir(parents=True, exist_ok=True)
# download blob to savepath
save_file_path = savepath / blob.name
blob.download_to_filename(filename=save_file_path)
logger.success(
" {object} {object_id}_{mode} was {action} at: {path}!",
object_id=self.dataset_id,
mode=mode,
object="File",
action="downloaded",
path={str(savepath)},
)
def delete_file(self, filename, mode, partitions=None, not_found_ok=False):
"""Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.
Args:
filename (str): Name of the file to be deleted
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
partitions (str, pathlib.PosixPath, or dict): Optional.
Hive structured partition as a string or dict
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
not_found_ok (bool): Optional.
What to do if file not found
"""
self._check_mode(mode)
mode = (
["raw", "staging", "header", "auxiliary_files", "architecture"]
if mode == "all"
else [mode]
)
for m in mode:
blob = self.bucket.blob(self._build_blob_name(filename, m, partitions))
if blob.exists() or not blob.exists() and not not_found_ok:
blob.delete()
else:
return
logger.success(
" {object} {filename}_{mode} was {action}!",
filename=filename,
mode=mode,
object="File",
action="deleted",
)
def delete_table(self, mode="staging", bucket_name=None, not_found_ok=False):
"""Deletes a table from storage, sends request in batches.
Args:
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
Folder of which dataset to update. Defaults to "staging".
bucket_name (str):
The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
(You can check it with the Storage().bucket property)
not_found_ok (bool): Optional.
What to do if table not found
"""
prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
if bucket_name is not None:
table_blobs = list(
self.client["storage_staging"]
.bucket(f"{bucket_name}")
.list_blobs(prefix=prefix)
)
else:
table_blobs = list(self.bucket.list_blobs(prefix=prefix))
if not table_blobs:
if not_found_ok:
return
raise FileNotFoundError(
f"Could not find the requested table {self.dataset_id}.{self.table_id}"
)
# Divides table_blobs list for maximum batch request size
table_blobs_chunks = [
table_blobs[i : i + 999] for i in range(0, len(table_blobs), 999) # noqa
]
for i, source_table in enumerate(
tqdm(table_blobs_chunks, desc="Delete Table Chunk")
):
counter = 0
while counter < 10:
try:
with self.client["storage_staging"].batch():
for blob in source_table:
blob.delete()
break
except Exception:
print(
f"Delete Table Chunk {i} | Attempt {counter}: delete operation starts again in 5 seconds...",
)
time.sleep(5)
counter += 1
traceback.print_exc(file=sys.stderr)
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="deleted",
)
def copy_table(
self,
source_bucket_name="basedosdados",
destination_bucket_name=None,
mode="staging",
new_table_id=None,
):
"""Copies table from a source bucket to your bucket, sends request in batches.
Args:
source_bucket_name (str):
The bucket name from which to copy data. You can change it
to copy from other external bucket.
destination_bucket_name (str): Optional
The bucket name where data will be copied to.
If None, defaults to the bucket initialized when instantiating the Storage object (You can check it with the
Storage().bucket property)
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
Folder of which dataset to update. Defaults to "staging".
new_table_id (str): Optional.
New table id to be copied to. If None, defaults to the table id initialized when instantiating the Storage object.
"""
source_table_ref = list(
self.client["storage_staging"]
.bucket(source_bucket_name)
.list_blobs(prefix=f"{mode}/{self.dataset_id}/{self.table_id}/")
)
if not source_table_ref:
raise FileNotFoundError(
f"Could not find the requested table {self.dataset_id}.{self.table_id}"
)
if destination_bucket_name is None:
destination_bucket = self.bucket
else:
destination_bucket = self.client["storage_staging"].bucket(
destination_bucket_name
)
# Divides source_table_ref list for maximum batch request size
source_table_ref_chunks = [
source_table_ref[i : i + 999] # noqa
for i in range(0, len(source_table_ref), 999) # noqa
]
for i, source_table in enumerate(
tqdm(source_table_ref_chunks, desc="Copy Table Chunk")
):
counter = 0
while counter < 10:
try:
with self.client["storage_staging"].batch():
for blob in source_table:
new_name = None
if new_table_id:
new_name = blob.name.replace(
self.table_id, new_table_id
)
self.bucket.copy_blob(
blob,
destination_bucket=destination_bucket,
new_name=new_name,
)
break
except Exception:
print(
f"Copy Table Chunk {i} | Attempt {counter}: copy operation starts again in 5 seconds...",
)
counter += 1
time.sleep(5)
traceback.print_exc(file=sys.stderr)
logger.success(
" {object} {object_id}_{mode} was {action} to {new_object_id}_{mode}!",
object_id=self.table_id,
new_object_id=new_table_id if new_table_id else self.table_id,
mode=mode,
object="Table",
action="copied",
)
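A minimal sketch of the typical upload flow, assuming `Storage` is exposed at the package top level and that your credentials and bucket are already configured; the dataset/table IDs and the file path are illustrative.

```python
import basedosdados as bd

st = bd.Storage(dataset_id="my_dataset", table_id="my_table")  # illustrative IDs

# Upload a local CSV to the staging folder, overwriting any existing blob.
st.upload("data/my_table.csv", mode="staging", if_exists="replace")

# Remove it again later; not_found_ok avoids an error if it is already gone.
st.delete_file("my_table.csv", mode="staging", not_found_ok=True)
```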
copy_table(self, source_bucket_name='basedosdados', destination_bucket_name=None, mode='staging', new_table_id=None)
Copies a table from a source bucket to your bucket; requests are sent in batches.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source_bucket_name | str | The bucket name from which to copy data. You can change it to copy from another external bucket. | 'basedosdados' |
destination_bucket_name | str | Optional. The bucket name where data will be copied to. If None, defaults to the bucket initialized when instantiating the Storage object (you can check it with the Storage().bucket property). | None |
mode | str | Folder of which dataset to update [raw, staging, header, auxiliary_files, architecture]. Defaults to "staging". | 'staging' |
new_table_id | str | Optional. New table id to be copied to. If None, defaults to the table id initialized when instantiating the Storage object. | None |
Source code in basedosdados/upload/storage.py
def copy_table(
self,
source_bucket_name="basedosdados",
destination_bucket_name=None,
mode="staging",
new_table_id=None,
):
"""Copies table from a source bucket to your bucket, sends request in batches.
Args:
source_bucket_name (str):
The bucket name from which to copy data. You can change it
to copy from other external bucket.
destination_bucket_name (str): Optional
The bucket name where data will be copied to.
If None, defaults to the bucket initialized when instantiating the Storage object (You can check it with the
Storage().bucket property)
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
Folder of which dataset to update. Defaults to "staging".
new_table_id (str): Optional.
New table id to be copied to. If None, defaults to the table id initialized when instantiating the Storage object.
"""
source_table_ref = list(
self.client["storage_staging"]
.bucket(source_bucket_name)
.list_blobs(prefix=f"{mode}/{self.dataset_id}/{self.table_id}/")
)
if not source_table_ref:
raise FileNotFoundError(
f"Could not find the requested table {self.dataset_id}.{self.table_id}"
)
if destination_bucket_name is None:
destination_bucket = self.bucket
else:
destination_bucket = self.client["storage_staging"].bucket(
destination_bucket_name
)
# Divides source_table_ref list for maximum batch request size
source_table_ref_chunks = [
source_table_ref[i : i + 999] # noqa
for i in range(0, len(source_table_ref), 999) # noqa
]
for i, source_table in enumerate(
tqdm(source_table_ref_chunks, desc="Copy Table Chunk")
):
counter = 0
while counter < 10:
try:
with self.client["storage_staging"].batch():
for blob in source_table:
new_name = None
if new_table_id:
new_name = blob.name.replace(
self.table_id, new_table_id
)
self.bucket.copy_blob(
blob,
destination_bucket=destination_bucket,
new_name=new_name,
)
break
except Exception:
print(
f"Copy Table Chunk {i} | Attempt {counter}: copy operation starts again in 5 seconds...",
)
counter += 1
time.sleep(5)
traceback.print_exc(file=sys.stderr)
logger.success(
" {object} {object_id}_{mode} was {action} to {new_object_id}_{mode}!",
object_id=self.table_id,
new_object_id=new_table_id if new_table_id else self.table_id,
mode=mode,
object="Table",
action="copied",
)
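A sketch of copying a table's staging files into your own bucket; the destination bucket name is a placeholder, and the dataset/table IDs reuse the examples from this page.

```python
import basedosdados as bd

st = bd.Storage(dataset_id="br_suporte", table_id="diretorio_municipios")

# Copy from the public "basedosdados" bucket into your bucket, in batches.
st.copy_table(
    source_bucket_name="basedosdados",
    destination_bucket_name="<YOUR_BUCKET_NAME>",  # placeholder
    mode="staging",
)
```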
delete_file(self, filename, mode, partitions=None, not_found_ok=False)
Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | Name of the file to be deleted. | required |
mode | str | Folder of which dataset to update [raw, staging, header, auxiliary_files, architecture, all]. | required |
partitions | str, pathlib.PosixPath, or dict | Optional. Hive structured partition as a string (`<key>=<value>/<key2>=<value2>`) or dict (`dict(key=value, key2=value2)`). | None |
not_found_ok | bool | Optional. What to do if the file is not found. | False |
Source code in basedosdados/upload/storage.py
def delete_file(self, filename, mode, partitions=None, not_found_ok=False):
"""Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.
Args:
filename (str): Name of the file to be deleted
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
partitions (str, pathlib.PosixPath, or dict): Optional.
Hive structured partition as a string or dict
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
not_found_ok (bool): Optional.
What to do if file not found
"""
self._check_mode(mode)
mode = (
["raw", "staging", "header", "auxiliary_files", "architecture"]
if mode == "all"
else [mode]
)
for m in mode:
blob = self.bucket.blob(self._build_blob_name(filename, m, partitions))
if blob.exists() or not blob.exists() and not not_found_ok:
blob.delete()
else:
return
logger.success(
" {object} {filename}_{mode} was {action}!",
filename=filename,
mode=mode,
object="File",
action="deleted",
)
delete_table(self, mode='staging', bucket_name=None, not_found_ok=False)
Deletes a table from storage; requests are sent in batches.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mode | str | Folder of which dataset to update [raw, staging, header, auxiliary_files, architecture]. Defaults to "staging". | 'staging' |
bucket_name | str | The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object (you can check it with the Storage().bucket property). | None |
not_found_ok | bool | Optional. What to do if the table is not found. | False |
Source code in basedosdados/upload/storage.py
def delete_table(self, mode="staging", bucket_name=None, not_found_ok=False):
"""Deletes a table from storage, sends request in batches.
Args:
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
Folder of which dataset to update. Defaults to "staging".
bucket_name (str):
The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
(You can check it with the Storage().bucket property)
not_found_ok (bool): Optional.
What to do if table not found
"""
prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
if bucket_name is not None:
table_blobs = list(
self.client["storage_staging"]
.bucket(f"{bucket_name}")
.list_blobs(prefix=prefix)
)
else:
table_blobs = list(self.bucket.list_blobs(prefix=prefix))
if not table_blobs:
if not_found_ok:
return
raise FileNotFoundError(
f"Could not find the requested table {self.dataset_id}.{self.table_id}"
)
# Divides table_blobs list for maximum batch request size
table_blobs_chunks = [
table_blobs[i : i + 999] for i in range(0, len(table_blobs), 999) # noqa
]
for i, source_table in enumerate(
tqdm(table_blobs_chunks, desc="Delete Table Chunk")
):
counter = 0
while counter < 10:
try:
with self.client["storage_staging"].batch():
for blob in source_table:
blob.delete()
break
except Exception:
print(
f"Delete Table Chunk {i} | Attempt {counter}: delete operation starts again in 5 seconds...",
)
time.sleep(5)
counter += 1
traceback.print_exc(file=sys.stderr)
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="deleted",
)
download(self, filename='*', savepath='.', partitions=None, mode='staging', if_not_exists='raise')
Download files from Google Storage at path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` and replicate the folder hierarchy on save.
There are 5 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BigQuery
* `header` : should contain the header of the tables
* `auxiliary_files` : should contain auxiliary files from each table
* `architecture` : should contain the architecture sheet of the tables
You can also use the `partitions` argument to choose files from a partition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | Optional. Specify which file to download. If "*", downloads all files within the bucket folder. Defaults to "*". | '*' |
savepath | str | Where you want to save the data on your computer. Must be a path to a directory. | '.' |
partitions | str, dict | Optional. If downloading a single file, use this to specify the partition path from which to download: a string (`<key>=<value>/<key2>=<value2>`) or dict (`dict(key=value, key2=value2)`). | None |
mode | str | Optional. Folder of which dataset to update [raw, staging, header, auxiliary_files, architecture]. | 'staging' |
if_not_exists | str | Optional. What to do if data is not found: 'raise' raises a FileNotFoundError; 'pass' does nothing and exits the function. | 'raise' |
Exceptions:
Type | Description |
---|---|
FileNotFoundError | If the given path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could not be found or there are no files to download. |
Source code in basedosdados/upload/storage.py
def download(
self,
filename="*",
savepath=".",
partitions=None,
mode="staging",
if_not_exists="raise",
):
"""Download files from Google Storage from path `mode`/`dataset_id`/`table_id`/`partitions`/`filename` and replicate folder hierarchy
on save,
There are 5 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BigQuery
* `header`: should contain the header of the tables
* `auxiliary_files`: should contain auxiliary files from each table
* `architecture`: should contain the architecture sheet of the tables
You can also use the `partitions` argument to choose files from a partition
Args:
filename (str): Optional
Specify which file to download. If "*" , downloads all files within the bucket folder. Defaults to "*".
savepath (str):
Where you want to save the data on your computer. Must be a path to a directory.
partitions (str, dict): Optional
If downloading a single file, use this to specify the partition path from which to download.
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
mode (str): Optional
Folder of which dataset to update.[raw|staging|header|auxiliary_files|architecture]
if_not_exists (str): Optional.
What to do if data not found.
* 'raise' : Raises FileNotFoundError.
* 'pass' : Do nothing and exit the function
Raises:
FileNotFoundError: If the given path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could not be found or there are no files to download.
"""
# Prefix to locate files within the bucket
prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
# Add specific partition to search prefix
if partitions:
prefix += self._resolve_partitions(partitions)
# if no filename is passed, list all blobs within a given table
if filename != "*":
prefix += filename
blob_list = list(self.bucket.list_blobs(prefix=prefix))
# if there are no blobs matching the search raise FileNotFoundError or return
if not blob_list:
if if_not_exists == "raise":
raise FileNotFoundError(f"Could not locate files at {prefix}")
return
# download all blobs matching the search to given savepath
for blob in tqdm(blob_list, desc="Download Blob"):
# parse blob.name and get the csv file name
csv_name = blob.name.split("/")[-1]
# build folder path replicating storage hierarchy
blob_folder = blob.name.replace(csv_name, "")
# replicate folder hierarchy
savepath = Path(savepath)
(savepath / blob_folder).mkdir(parents=True, exist_ok=True)
# download blob to savepath
save_file_path = savepath / blob.name
blob.download_to_filename(filename=save_file_path)
logger.success(
" {object} {object_id}_{mode} was {action} at: {path}!",
object_id=self.dataset_id,
mode=mode,
object="File",
action="downloaded",
path={str(savepath)},
)
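A sketch of downloading every staging file of a table, replicating the bucket hierarchy under a local folder; the IDs and local path are illustrative.

```python
import basedosdados as bd

st = bd.Storage(dataset_id="br_suporte", table_id="diretorio_municipios")

# Files land under downloads/staging/br_suporte/diretorio_municipios/...
st.download(savepath="downloads/", mode="staging", if_not_exists="raise")
```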
init(self, replace=False, very_sure=False)
Initializes the bucket and folders.
Folders should be:
* `raw` : contains really raw data
* `staging` : preprocessed data ready to upload to BigQuery
Parameters:
Name | Type | Description | Default |
---|---|---|---|
replace | bool | Optional. Whether to replace if the bucket already exists. | False |
very_sure | bool | Optional. Are you aware that everything is going to be erased if you replace the bucket? | False |
Exceptions:
Type | Description |
---|---|
Warning | very_sure argument is still False. |
Source code in basedosdados/upload/storage.py
def init(self, replace=False, very_sure=False):
"""Initializes bucket and folders.
Folder should be:
* `raw` : that contains really raw data
* `staging` : preprocessed data ready to upload to BigQuery
Args:
replace (bool): Optional.
Whether to replace if bucket already exists
very_sure (bool): Optional.
Are you aware that everything is going to be erased if you
replace the bucket?
Raises:
Warning: very_sure argument is still False.
"""
if replace:
if not very_sure:
raise Warning(
"\n********************************************************"
"\nYou are trying to replace all the data that you have "
f"in bucket {self.bucket_name}.\nAre you sure?\n"
"If yes, add the flag --very_sure\n"
"********************************************************"
)
self.bucket.delete(force=True)
self.client["storage_staging"].create_bucket(self.bucket)
for folder in ["staging/", "raw/"]:
self.bucket.blob(folder).upload_from_string("")
upload(self, path, mode='all', partitions=None, if_exists='raise', chunk_size=None, **upload_args)
Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:
- Add a single **file** setting `path = <file_path>`.
- Add a **folder** with multiple files setting `path = <folder_path>`. *The folder should just contain the files and no folders.*
- Add **partitioned files** setting `path = <folder_path>`. This folder must follow the hive partitioning scheme, i.e. `<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv` (ex: `mytable/country=brasil/year=2020/mypart.csv`).
*Remember all files must follow a single schema.* Otherwise, things might fail in the future.
There are 6 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BigQuery
* `header` : should contain the header of the tables
* `auxiliary_files` : should contain auxiliary files from each table
* `architecture` : should contain the architecture sheet of the tables
* `all` : if no treatment is needed, use `all`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str or pathlib.PosixPath | Where to find the file or folder that you want to upload to storage. | required |
mode | str | Folder of which dataset to update [raw, staging, header, auxiliary_files, architecture, all]. | 'all' |
partitions | str, pathlib.PosixPath, or dict | Optional. If adding a single file, use this to add it to a specific partition: a string (`<key>=<value>/<key2>=<value2>`) or dict (`dict(key=value, key2=value2)`). | None |
if_exists | str | Optional. What to do if data exists: 'raise' raises a Conflict exception; 'replace' replaces the table; 'pass' does nothing. | 'raise' |
chunk_size | int | Optional. The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used. | None |
upload_args | | Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_file`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename). | {} |
Source code in basedosdados/upload/storage.py
def upload(
self,
path,
mode="all",
partitions=None,
if_exists="raise",
chunk_size=None,
**upload_args,
):
"""Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:
* Add a single **file** setting `path = <file_path>`.
* Add a **folder** with multiple files setting `path =
<folder_path>`. *The folder should just contain the files and
no folders.*
* Add **partitioned files** setting `path = <folder_path>`.
This folder must follow the hive partitioning scheme i.e.
`<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv`
(ex: `mytable/country=brasil/year=2020/mypart.csv`).
*Remember all files must follow a single schema.* Otherwise, things
might fail in the future.
There are 6 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BigQuery
* `header`: should contain the header of the tables
* `auxiliary_files`: should contain auxiliary files from each table
* `architecture`: should contain the architecture sheet of the tables
* `all`: if no treatment is needed, use `all`.
Args:
path (str or pathlib.PosixPath): Where to find the file or
folder that you want to upload to storage
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
partitions (str, pathlib.PosixPath, or dict): Optional.
*If adding a single file*, use this to add it to a specific partition.
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
if_exists (str): Optional.
What to do if data exists
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
chunk_size (int): Optional
The size of a chunk of data whenever iterating (in bytes).
This must be a multiple of 256 KB per the API specification.
If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
upload_args ():
Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_file`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename)
"""
if (self.dataset_id is None) or (self.table_id is None):
raise Exception("You need to pass dataset_id and table_id")
path = Path(path)
if path.is_dir():
paths = [
f
for f in path.glob("**/*")
if f.is_file() and f.suffix in [".csv", ".parquet", "parquet.gzip"]
]
parts = [
(
filepath.as_posix()
.replace(path.as_posix() + "/", "")
.replace(str(filepath.name), "")
)
for filepath in paths
]
else:
paths = [path]
parts = [partitions or None]
self._check_mode(mode)
mode = (
["raw", "staging", "header", "auxiliary_files", "architecture"]
if mode == "all"
else [mode]
)
for m in mode:
for filepath, part in tqdm(list(zip(paths, parts)), desc="Uploading files"):
blob_name = self._build_blob_name(filepath.name, m, part)
blob = self.bucket.blob(blob_name, chunk_size=chunk_size)
if not blob.exists() or if_exists == "replace":
upload_args["timeout"] = upload_args.get("timeout", None)
blob.upload_from_filename(str(filepath), **upload_args)
elif if_exists == "pass":
pass
else:
raise BaseDosDadosException(
f"Data already exists at {self.bucket_name}/{blob_name}. "
"If you are using Storage.upload then set if_exists to "
"'replace' to overwrite data \n"
"If you are using Table.create then set if_storage_data_exists "
"to 'replace' to overwrite data."
)
logger.success(
" {object} {filename}_{mode} was {action}!",
filename=filepath.name,
mode=m,
object="File",
action="uploaded",
)
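A sketch of uploading a single file into a specific hive partition; the IDs, local path and partition values are illustrative.

```python
import basedosdados as bd

st = bd.Storage(dataset_id="my_dataset", table_id="mytable")

# The blob ends up at staging/my_dataset/mytable/country=brasil/year=2020/mypart.csv
st.upload(
    "data/mypart.csv",
    mode="staging",
    partitions="country=brasil/year=2020",
    if_exists="replace",
)
```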
Module for managing datasets on the server.
Dataset (Base)
Manage datasets in BigQuery.
Source code in basedosdados/upload/dataset.py
class Dataset(Base):
"""
Manage datasets in BigQuery.
"""
def __init__(self, dataset_id, **kwargs):
super().__init__(**kwargs)
self.dataset_id = dataset_id.replace("-", "_")
@property
@lru_cache
def dataset_config(self):
"""
Dataset config file.
"""
return self.backend.get_dataset_config(self.dataset_id)
def _loop_modes(self, mode="all"):
"""
Loop modes.
"""
def dataset_tag(m):
return f"_{m}" if m == "staging" else ""
mode = ["prod", "staging"] if mode == "all" else [mode]
return (
{
"client": self.client[f"bigquery_{m}"],
"id": f"{self.client[f'bigquery_{m}'].project}.{self.dataset_id}{dataset_tag(m)}",
"mode": m,
}
for m in mode
)
def _setup_dataset_object(self, dataset_id, location=None, mode="staging"):
"""
Setup dataset object.
"""
dataset = bigquery.Dataset(dataset_id)
if mode == "staging":
dataset_path = dataset_id.replace("_staging", "")
description = f"staging dataset for `{dataset_path}`"
labels = {"staging": True}
else:
try:
description = self.dataset_config.get("descriptionPt", "")
labels = {
tag.get("namePt"): True for tag in self.dataset_config.get("tags")
}
except BaseException:
logger.warning(
f"dataset {dataset_id} does not have a description in the API."
)
description = "description not available in the API."
labels = {}
dataset.description = description
dataset.labels = labels
dataset.location = location
return dataset
def publicize(self, mode="all", dataset_is_public=True):
"""Changes IAM configuration to turn BigQuery dataset public.
Args:
mode (bool): Which dataset to create [prod|staging|all].
dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
"""
for m in self._loop_modes(mode):
dataset = m["client"].get_dataset(m["id"])
entries = dataset.access_entries
# TODO https://github.com/basedosdados/mais/pull/1020
# TODO if staging dataset is private, the prod view can't acess it: if dataset_is_public and "staging" not in dataset.dataset_id:
if dataset_is_public:
if "staging" not in dataset.dataset_id:
entries.extend(
[
bigquery.AccessEntry(
role="roles/bigquery.dataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
bigquery.AccessEntry(
role="roles/bigquery.metadataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
bigquery.AccessEntry(
role="roles/bigquery.user",
entity_type="iamMember",
entity_id="allUsers",
),
]
)
else:
entries.extend(
[
bigquery.AccessEntry(
role="roles/bigquery.dataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
]
)
dataset.access_entries = entries
m["client"].update_dataset(dataset, ["access_entries"])
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=m["mode"],
object="Dataset",
action="publicized",
)
def exists(self, mode="staging"):
"""
Check if dataset exists.
"""
ref_dataset_id = (
self.dataset_id if mode == "prod" else self.dataset_id + "_staging"
)
try:
ref = self.client[f"bigquery_{mode}"].get_dataset(ref_dataset_id)
except Exception:
ref = None
return bool(ref)
def create(
self, mode="all", if_exists="raise", dataset_is_public=True, location=None
):
"""Creates BigQuery datasets given `dataset_id`.
It can create two datasets:
* `<dataset_id>` (mode = 'prod')
* `<dataset_id>_staging` (mode = 'staging')
If `mode` is all, it creates both.
Args:
mode (str): Optional. Which dataset to create [prod|staging|all].
if_exists (str): Optional. What to do if dataset exists
* raise : Raises Conflict exception
* replace : Drop all tables and replace dataset
* update : Update dataset description
* pass : Do nothing
dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
location (str): Optional. Location of dataset data.
List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
Raises:
Warning: Dataset already exists and if_exists is set to `raise`
"""
# Set dataset_id to the ID of the dataset to create.
for m in self._loop_modes(mode):
if if_exists == "replace":
self.delete(mode=m["mode"])
elif if_exists == "update":
self.update(mode=m["mode"])
continue
# Send the dataset to the API for creation, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
try:
if not self.exists(mode=m["mode"]):
# Construct a full Dataset object to send to the API.
dataset_obj = self._setup_dataset_object(
dataset_id=m["id"], location=location, mode=m["mode"]
)
m["client"].create_dataset(dataset_obj) # Make an API request.
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=m["mode"],
object="Dataset",
action="created",
)
# Make prod dataset public
self.publicize(dataset_is_public=dataset_is_public, mode=m["mode"])
except Conflict as e:
if if_exists == "pass":
continue
raise Conflict(f"Dataset {self.dataset_id} already exists") from e
def delete(self, mode="all"):
"""Deletes dataset in BigQuery. Toogle mode to choose which dataset to delete.
Args:
mode (str): Optional. Which dataset to delete [prod|staging|all]
"""
for m in self._loop_modes(mode):
m["client"].delete_dataset(m["id"], delete_contents=True, not_found_ok=True)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=m["mode"],
object="Dataset",
action="deleted",
)
def update(self, mode="all", location=None):
"""Update dataset description. Toogle mode to choose which dataset to update.
Args:
mode (str): Optional. Which dataset to update [prod|staging|all]
location (str): Optional. Location of dataset data.
List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
"""
for m in self._loop_modes(mode):
# Send the dataset to the API to update, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
m["client"].update_dataset(
self._setup_dataset_object(m["id"], location=location, mode=m["mode"]),
fields=["description"],
) # Make an API request.
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=m["mode"],
object="Dataset",
action="updated",
)
dataset_config property readonly
Dataset config file.
create(self, mode='all', if_exists='raise', dataset_is_public=True, location=None)
Creates BigQuery datasets given `dataset_id`.
It can create two datasets:
* `<dataset_id>` (mode = 'prod')
* `<dataset_id>_staging` (mode = 'staging')
If `mode` is all, it creates both.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mode | str | Optional. Which dataset to create [prod\|staging\|all]. | 'all' |
| if_exists | str | Optional. What to do if dataset exists: 'raise' raises a Conflict exception; 'replace' drops all tables and replaces the dataset; 'update' updates the dataset description; 'pass' does nothing. | 'raise' |
| dataset_is_public | bool | Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public. | True |
| location | str | Optional. Location of dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations | None |

Exceptions:

| Type | Description |
| --- | --- |
| Warning | Dataset already exists and `if_exists` is set to `raise`. |
Source code in basedosdados/upload/dataset.py
def create(
self, mode="all", if_exists="raise", dataset_is_public=True, location=None
):
"""Creates BigQuery datasets given `dataset_id`.
It can create two datasets:
* `<dataset_id>` (mode = 'prod')
* `<dataset_id>_staging` (mode = 'staging')
If `mode` is all, it creates both.
Args:
mode (str): Optional. Which dataset to create [prod|staging|all].
if_exists (str): Optional. What to do if dataset exists
* raise : Raises Conflict exception
* replace : Drop all tables and replace dataset
* update : Update dataset description
* pass : Do nothing
dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
location (str): Optional. Location of dataset data.
List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
Raises:
Warning: Dataset already exists and if_exists is set to `raise`
"""
# Set dataset_id to the ID of the dataset to create.
for m in self._loop_modes(mode):
if if_exists == "replace":
self.delete(mode=m["mode"])
elif if_exists == "update":
self.update(mode=m["mode"])
continue
# Send the dataset to the API for creation, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
try:
if not self.exists(mode=m["mode"]):
# Construct a full Dataset object to send to the API.
dataset_obj = self._setup_dataset_object(
dataset_id=m["id"], location=location, mode=m["mode"]
)
m["client"].create_dataset(dataset_obj) # Make an API request.
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=m["mode"],
object="Dataset",
action="created",
)
# Make prod dataset public
self.publicize(dataset_is_public=dataset_is_public, mode=m["mode"])
except Conflict as e:
if if_exists == "pass":
continue
raise Conflict(f"Dataset {self.dataset_id} already exists") from e
delete(self, mode='all')
Deletes dataset in BigQuery. Toggle mode to choose which dataset to delete.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mode | str | Optional. Which dataset to delete [prod\|staging\|all]. | 'all' |
Source code in basedosdados/upload/dataset.py
def delete(self, mode="all"):
"""Deletes dataset in BigQuery. Toogle mode to choose which dataset to delete.
Args:
mode (str): Optional. Which dataset to delete [prod|staging|all]
"""
for m in self._loop_modes(mode):
m["client"].delete_dataset(m["id"], delete_contents=True, not_found_ok=True)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=m["mode"],
object="Dataset",
action="deleted",
)
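A short sketch of the same pattern for `delete` (same assumptions and placeholder as above):

```python
from basedosdados import Dataset

# Drops only the staging dataset and all of its tables; prod is untouched.
Dataset("my_dataset_id").delete(mode="staging")
```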
exists(self, mode='staging')
Check if dataset exists.
Source code in basedosdados/upload/dataset.py
def exists(self, mode="staging"):
"""
Check if dataset exists.
"""
ref_dataset_id = (
self.dataset_id if mode == "prod" else self.dataset_id + "_staging"
)
try:
ref = self.client[f"bigquery_{mode}"].get_dataset(ref_dataset_id)
except Exception:
ref = None
return bool(ref)
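`exists` is handy as a guard before other operations; a small sketch under the same assumptions:

```python
from basedosdados import Dataset

dataset = Dataset("my_dataset_id")
if not dataset.exists(mode="prod"):
    # Only create the prod dataset when it is missing.
    dataset.create(mode="prod", if_exists="pass")
```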
publicize(self, mode='all', dataset_is_public=True)
Changes IAM configuration to turn BigQuery dataset public.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mode | str | Which dataset to publicize [prod\|staging\|all]. | 'all' |
| dataset_is_public | bool | Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public. | True |
Source code in basedosdados/upload/dataset.py
def publicize(self, mode="all", dataset_is_public=True):
"""Changes IAM configuration to turn BigQuery dataset public.
Args:
mode (bool): Which dataset to create [prod|staging|all].
dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
"""
for m in self._loop_modes(mode):
dataset = m["client"].get_dataset(m["id"])
entries = dataset.access_entries
# TODO https://github.com/basedosdados/mais/pull/1020
# TODO if staging dataset is private, the prod view can't acess it: if dataset_is_public and "staging" not in dataset.dataset_id:
if dataset_is_public:
if "staging" not in dataset.dataset_id:
entries.extend(
[
bigquery.AccessEntry(
role="roles/bigquery.dataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
bigquery.AccessEntry(
role="roles/bigquery.metadataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
bigquery.AccessEntry(
role="roles/bigquery.user",
entity_type="iamMember",
entity_id="allUsers",
),
]
)
else:
entries.extend(
[
bigquery.AccessEntry(
role="roles/bigquery.dataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
]
)
dataset.access_entries = entries
m["client"].update_dataset(dataset, ["access_entries"])
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=m["mode"],
object="Dataset",
action="publicized",
)
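A sketch of re-applying the public IAM entries to an existing prod dataset (same assumptions; note that staging datasets only receive the dataViewer entry):

```python
from basedosdados import Dataset

# Adds allUsers as dataViewer, metadataViewer and user on the prod dataset.
Dataset("my_dataset_id").publicize(mode="prod", dataset_is_public=True)
```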
update(self, mode='all', location=None)
Update dataset description. Toggle mode to choose which dataset to update.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mode | str | Optional. Which dataset to update [prod\|staging\|all]. | 'all' |
| location | str | Optional. Location of dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations | None |
Source code in basedosdados/upload/dataset.py
def update(self, mode="all", location=None):
"""Update dataset description. Toogle mode to choose which dataset to update.
Args:
mode (str): Optional. Which dataset to update [prod|staging|all]
location (str): Optional. Location of dataset data.
List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
"""
for m in self._loop_modes(mode):
# Send the dataset to the API to update, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
m["client"].update_dataset(
self._setup_dataset_object(m["id"], location=location, mode=m["mode"]),
fields=["description"],
) # Make an API request.
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=m["mode"],
object="Dataset",
action="updated",
)
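And a sketch for refreshing the dataset description from the metadata (same assumptions):

```python
from basedosdados import Dataset

# Pushes the current description to both prod and staging datasets.
Dataset("my_dataset_id").update(mode="all")
```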
Class to manage tables in Storage and BigQuery
Table (Base)
Manage tables in Google Cloud Storage and BigQuery.
Source code in basedosdados/upload/table.py
class Table(Base):
"""
Manage tables in Google Cloud Storage and BigQuery.
"""
def __init__(self, dataset_id, table_id, **kwargs):
super().__init__(**kwargs)
self.table_id = table_id.replace("-", "_")
self.dataset_id = dataset_id.replace("-", "_")
self.table_full_name = dict(
prod=f"{self.client['bigquery_prod'].project}.{self.dataset_id}.{self.table_id}",
staging=f"{self.client['bigquery_staging'].project}.{self.dataset_id}_staging.{self.table_id}",
)
self.table_full_name.update(dict(all=deepcopy(self.table_full_name)))
@property
@lru_cache(256)
def table_config(self):
"""
Load table config
"""
# return self._load_yaml(self.table_folder / "table_config.yaml")
return self.backend.get_table_config(self.dataset_id, self.table_id)
def _get_table_obj(self, mode):
"""
Get table object from BigQuery
"""
return self.client[f"bigquery_{mode}"].get_table(self.table_full_name[mode])
def _is_partitioned(
self, data_sample_path=None, source_format=None, csv_delimiter=None
):
if data_sample_path is not None:
table_columns = self._get_columns_from_data(
data_sample_path=data_sample_path,
source_format=source_format,
csv_delimiter=csv_delimiter,
mode="staging",
)
else:
table_columns = self._get_columns_metadata_from_api()
return bool(table_columns.get("partition_columns", []))
def _load_schema_from_json(
self,
columns=None,
):
schema = []
for col in columns:
# ref: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.schema.SchemaField
if col.get("name") is None:
msg = "Columns must have a name! Check your data files for columns without name"
raise BaseDosDadosException(msg)
schema.append(
SchemaField(
name=col.get("name"),
field_type=col.get("type"),
description=col.get("description", None),
)
)
return schema
def _load_staging_schema_from_data(
self, data_sample_path=None, source_format="csv", csv_delimiter=","
):
"""
Generate schema from columns metadata in data sample
"""
if self.table_exists(mode="staging"):
logger.warning(
" {object} {object_id} allready exists, replacing schema!",
object_id=self.table_id,
object="Table",
)
table_columns = self._get_columns_from_data(
data_sample_path=data_sample_path,
source_format=source_format,
csv_delimiter=csv_delimiter,
mode="staging",
)
return self._load_schema_from_json(columns=table_columns.get("columns"))
def _load_schema_from_bq(self, mode="staging"):
"""Load schema from table config
Args:
mode (bool): Which dataset to create [prod|staging].
"""
table_columns = self._get_columns_from_bq()
columns = table_columns.get("partition_columns") + table_columns.get("columns")
return self._load_schema_from_json(columns=columns)
def _load_schema_from_api(self, mode="staging"):
"""Load schema from table config
Args:
mode (bool): Which dataset to create [prod|staging].
"""
if self.table_exists(mode=mode):
logger.warning(
" {object} {object_id} allready exists, replacing schema!",
object_id=self.table_id,
object="Table",
)
table_columns = self._get_columns_metadata_from_api()
columns = table_columns.get("partition_columns") + table_columns.get("columns")
return self._load_schema_from_json(columns=columns)
def _get_columns_from_data(
self,
data_sample_path=None,
source_format="csv",
csv_delimiter=",",
mode="staging",
): # sourcery skip: low-code-quality
"""
Get the partition columns from the structure of data_sample_path.
Args:
data_sample_path (str, pathlib.PosixPath): Optional.
Data sample path to auto complete columns names
It supports Comma Delimited CSV, Apache Avro and
Apache Parquet.
source_format (str): Optional
Data source format. Only 'csv', 'avro' and 'parquet'
are supported. Defaults to 'csv'.
"""
partition_columns = []
if isinstance(
data_sample_path,
(
str,
Path,
),
):
# Check if partitioned and get data sample and partition columns
data_sample_path = Path(data_sample_path)
if data_sample_path.is_dir():
data_sample_path = [
f
for f in data_sample_path.glob("**/*")
if f.is_file() and f.suffix == f".{source_format}"
][0]
partition_columns = [
k.split("=")[0]
for k in data_sample_path.as_posix().split("/")
if "=" in k
]
columns = Datatype(source_format=source_format).header(
data_sample_path=data_sample_path, csv_delimiter=csv_delimiter
)
return {
"columns": [{"name": col, "type": "STRING"} for col in columns],
"partition_columns": [
{"name": col, "type": "STRING"} for col in partition_columns
],
}
def _get_columns_metadata_from_api(
self,
):
"""
Get columns and partition columns from API.
"""
table_columns = self.table_config.get("columns", {})
columns = [col for col in table_columns if col.get("isPartition", {}) is False]
partition_columns = [
col for col in table_columns if col.get("isPartition", {}) is True
]
return {
"columns": [
{
"name": col.get("name"),
"type": col.get("bigqueryType").get("name"),
"description": col.get("descriptionPt"),
}
for col in columns
],
"partition_columns": [
{
"name": col.get("name"),
"type": col.get("bigqueryType").get("name"),
"description": col.get("descriptionPt"),
}
for col in partition_columns
],
}
def _parser_blobs_to_partition_dict(self) -> dict:
"""
Extracts the partition information from the blobs.
"""
if not self.table_exists(mode="staging"):
return
blobs = (
self.client["storage_staging"]
.bucket(self.bucket_name)
.list_blobs(prefix=f"staging/{self.dataset_id}/{self.table_id}/")
)
partitions_dict = {}
# only needs the first bloob
for blob in blobs:
for folder in blob.name.split("/"):
if "=" in folder:
key = folder.split("=")[0]
value = folder.split("=")[1]
try:
partitions_dict[key].append(value)
except KeyError:
partitions_dict[key] = [value]
return partitions_dict
def _get_columns_from_bq(self, mode="staging"):
if not self.table_exists(mode=mode):
msg = f"Table {self.dataset_id}.{self.table_id} does not exist in {mode}, please create first!"
logger.error(msg)
raise BaseDosDadosException(msg)
else:
schema = self._get_table_obj(mode=mode).schema
partition_dict = self._parser_blobs_to_partition_dict()
if partition_dict:
partition_columns = list(partition_dict.keys())
else:
partition_columns = []
return {
"columns": [
{
"name": col.name,
"type": col.field_type,
"description": col.description,
}
for col in schema
if col.name not in partition_columns
],
"partition_columns": [
{
"name": col.name,
"type": col.field_type,
"description": col.description,
}
for col in schema
if col.name in partition_columns
],
}
def _get_cross_columns_from_bq_api(self):
bq = self._get_columns_from_bq(mode="staging")
bq_columns = bq.get("partition_columns") + bq.get("columns")
api = self._get_columns_metadata_from_api()
api_columns = api.get("partition_columns") + api.get("columns")
if api_columns != []:
for bq_col in bq_columns:
for api_col in api_columns:
if bq_col.get("name") == api_col.get("name"):
bq_col["type"] = api_col.get("type")
bq_col["description"] = api_col.get("description")
return bq_columns
def _make_publish_sql(self):
"""Create publish.sql with columns and bigquery_type"""
# publish.sql header and instructions
publish_txt = """
/*
Query para publicar a tabela.
Esse é o lugar para:
- modificar nomes, ordem e tipos de colunas
- dar join com outras tabelas
- criar colunas extras (e.g. logs, proporções, etc.)
Qualquer coluna definida aqui deve também existir em `table_config.yaml`.
# Além disso, sinta-se à vontade para alterar alguns nomes obscuros
# para algo um pouco mais explícito.
TIPOS:
- Para modificar tipos de colunas, basta substituir STRING por outro tipo válido.
- Exemplo: `SAFE_CAST(column_name AS NUMERIC) column_name`
- Mais detalhes: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
*/
"""
# table_columns = self._get_columns_from_api(mode="staging")
columns = self._get_cross_columns_from_bq_api()
# remove triple quotes extra space
publish_txt = inspect.cleandoc(publish_txt)
publish_txt = textwrap.dedent(publish_txt)
# add create table statement
project_id_prod = self.client["bigquery_prod"].project
publish_txt += f"\n\nCREATE OR REPLACE VIEW {project_id_prod}.{self.dataset_id}.{self.table_id} AS\nSELECT \n"
# sort columns by is_partition, partitions_columns come first
# add columns in publish.sql
for col in columns:
name = col.get("name")
bigquery_type = (
"STRING" if col.get("type") is None else col.get("type").upper()
)
publish_txt += f"SAFE_CAST({name} AS {bigquery_type}) {name},\n"
# remove last comma
publish_txt = publish_txt[:-2] + "\n"
# add from statement
project_id_staging = self.client["bigquery_staging"].project
publish_txt += (
f"FROM {project_id_staging}.{self.dataset_id}_staging.{self.table_id} AS t"
)
return publish_txt
def table_exists(self, mode):
"""Check if table exists in BigQuery.
Args:
mode (str): Which dataset to check [prod|staging].
"""
try:
ref = self._get_table_obj(mode=mode)
except google.api_core.exceptions.NotFound:
ref = None
return bool(ref)
def _get_biglake_connection(
self, set_biglake_connection_permissions=True, location=None, mode="staging"
):
connection = Connection(name="biglake", location=location, mode="staging")
if not connection.exists:
try:
logger.info("Creating BigLake connection...")
connection.create()
logger.success("BigLake connection created!")
except google.api_core.exceptions.Forbidden as exc:
logger.error(
"You don't have permission to create a BigLake connection. "
"Please contact an admin to create one for you."
)
raise BaseDosDadosException(
"You don't have permission to create a BigLake connection. "
"Please contact an admin to create one for you."
) from exc
except Exception as exc:
logger.error(
"Something went wrong while creating the BigLake connection. "
"Please contact an admin to create one for you."
)
raise BaseDosDadosException(
"Something went wrong while creating the BigLake connection. "
"Please contact an admin to create one for you."
) from exc
if set_biglake_connection_permissions:
try:
logger.info("Setting permissions for BigLake service account...")
connection.set_biglake_permissions()
logger.success("Permissions set successfully!")
except google.api_core.exceptions.Forbidden as exc:
logger.error(
"Could not set permissions for BigLake service account. "
"Please make sure you have permissions to grant roles/storage.objectViewer"
f" to the BigLake service account. ({connection.service_account})."
" If you don't, please ask an admin to do it for you or set "
"set_biglake_connection_permissions=False."
)
raise BaseDosDadosException(
"Could not set permissions for BigLake service account. "
"Please make sure you have permissions to grant roles/storage.objectViewer"
f" to the BigLake service account. ({connection.service_account})."
" If you don't, please ask an admin to do it for you or set "
"set_biglake_connection_permissions=False."
) from exc
except Exception as exc:
logger.error(
"Something went wrong while setting permissions for BigLake service account. "
"Please make sure you have permissions to grant roles/storage.objectViewer"
f" to the BigLake service account. ({connection.service_account})."
" If you don't, please ask an admin to do it for you or set "
"set_biglake_connection_permissions=False."
)
raise BaseDosDadosException(
"Something went wrong while setting permissions for BigLake service account. "
"Please make sure you have permissions to grant roles/storage.objectViewer"
f" to the BigLake service account. ({connection.service_account})."
" If you don't, please ask an admin to do it for you or set "
"set_biglake_connection_permissions=False."
) from exc
return connection
def _get_table_description(self, mode="staging"):
"""Adds table description to BigQuery table.
Args:
table_obj (google.cloud.bigquery.table.Table): Table object.
mode (str): Which dataset to check [prod|staging].
"""
table_path = self.table_full_name["prod"]
if mode == "staging":
description = f"staging table for `{table_path}`"
else:
try:
description = self.table_config.get("descriptionPt", "")
except BaseException:
logger.warning(
f"table {self.table_id} does not have a description in the API."
)
description = "description not available in the API."
return description
def create(
self,
path=None,
source_format="csv",
csv_delimiter=",",
csv_skip_leading_rows=1,
csv_allow_jagged_rows=False,
if_table_exists="raise",
if_storage_data_exists="raise",
if_dataset_exists="pass",
dataset_is_public=True,
location=None,
chunk_size=None,
biglake_table=False,
set_biglake_connection_permissions=True,
):
"""Creates a BigQuery table in the staging dataset.
If a path is provided, data is automatically saved in storage,
and a datasets folder and BigQuery location are created, in addition to creating
the table and its configuration files.
The new table is located at `<dataset_id>_staging.<table_id>` in BigQuery.
Data can be found in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*`
and is used to build the table.
The following data types are supported:
- Comma-Delimited CSV
- Apache Avro
- Apache Parquet
Data can also be partitioned following the Hive partitioning scheme
`<key1>=<value1>/<key2>=<value2>`; for example,
`year=2012/country=BR`. The partition is automatically detected by searching for `partitions`
in the `table_config.yaml` file.
Args:
path (str or pathlib.PosixPath): The path to the file to be uploaded to create the table.
source_format (str): Optional. The format of the data source. Only 'csv', 'avro', and 'parquet'
are supported. Defaults to 'csv'.
csv_delimiter (str): Optional.
The separator for fields in a CSV file. The separator can be any ISO-8859-1
single-byte character. Defaults to ','.
csv_skip_leading_rows(int): Optional.
The number of rows at the top of a CSV file that BigQuery will skip when loading the data.
Defaults to 1.
csv_allow_jagged_rows (bool): Optional.
Indicates if BigQuery should allow extra values that are not represented in the table schema.
Defaults to False.
if_table_exists (str): Optional. Determines what to do if the table already exists:
* 'raise' : Raises a Conflict exception
* 'replace' : Replaces the table
* 'pass' : Does nothing
if_storage_data_exists (str): Optional. Determines what to do if the data already exists on your bucket:
* 'raise' : Raises a Conflict exception
* 'replace' : Replaces the table
* 'pass' : Does nothing
if_dataset_exists (str): Optional. Determines what to do if the dataset already exists:
* 'raise' : Raises a Conflict exception
* 'replace' : Replaces the dataset
* 'pass' : Does nothing
dataset_is_public (bool): Optional. Controls if the prod dataset is public or not. By default, staging datasets like `dataset_id_staging` are not public.
location (str): Optional. The location of the dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
chunk_size (int): Optional. The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification.
If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
biglake_table (bool): Optional. Sets this as a BigLake table. BigLake tables allow end-users to query from external data (such as GCS) even if
they don't have access to the source data. IAM is managed like any other BigQuery native table. See https://cloud.google.com/bigquery/docs/biglake-intro for more on BigLake.
set_biglake_connection_permissions (bool): Optional. If set to `True`, attempts to grant the BigLake connection service account access to the table's data in GCS.
"""
if path is None:
# Look if table data already exists at Storage
data = self.client["storage_staging"].list_blobs(
self.bucket_name, prefix=f"staging/{self.dataset_id}/{self.table_id}"
)
# Raise: Cannot create table without external data
if not data:
raise BaseDosDadosException(
"You must provide a path for uploading data"
)
# Add data to storage
if isinstance(
path,
(
str,
Path,
),
):
Storage(
dataset_id=self.dataset_id,
table_id=self.table_id,
config_path=self.config_path,
bucket_name=self.bucket_name,
).upload(
path=path,
mode="staging",
if_exists=if_storage_data_exists,
chunk_size=chunk_size,
)
# Create Dataset if it doesn't exist
dataset_obj = Dataset(
self.dataset_id,
)
dataset_obj.create(
if_exists=if_dataset_exists,
mode="all",
location=location,
dataset_is_public=dataset_is_public,
)
if biglake_table:
biglake_connection = self._get_biglake_connection(
set_biglake_connection_permissions=set_biglake_connection_permissions,
location=location,
mode="staging",
)
biglake_connection_id = biglake_connection.connection_id
table = bigquery.Table(self.table_full_name["staging"])
table.description = self._get_table_description(mode="staging")
table.external_data_configuration = Datatype(
dataset_id=self.dataset_id,
table_id=self.table_id,
schema=self._load_staging_schema_from_data(
data_sample_path=path,
source_format=source_format,
csv_delimiter=csv_delimiter,
),
source_format=source_format,
csv_skip_leading_rows=csv_skip_leading_rows,
csv_delimiter=csv_delimiter,
csv_allow_jagged_rows=csv_allow_jagged_rows,
mode="staging",
bucket_name=self.bucket_name,
partitioned=self._is_partitioned(
data_sample_path=path,
source_format=source_format,
csv_delimiter=csv_delimiter,
),
biglake_connection_id=biglake_connection_id if biglake_table else None,
).external_config
# When using BigLake tables, schema must be provided to the `Table` object
if biglake_table:
table.schema = self._load_staging_schema_from_data(
data_sample_path=path,
source_format=source_format,
csv_delimiter=csv_delimiter,
)
logger.info(f"Using BigLake connection {biglake_connection_id}")
# Lookup if table alreay exists
table_ref = None
with contextlib.suppress(google.api_core.exceptions.NotFound):
table_ref = self.client["bigquery_staging"].get_table(
self.table_full_name["staging"]
)
if isinstance(table_ref, google.cloud.bigquery.table.Table):
if if_table_exists == "pass":
return None
if if_table_exists == "raise":
raise FileExistsError(
"Table already exists, choose replace if you want to overwrite it"
)
if if_table_exists == "replace" and self.table_exists(mode="staging"):
self.delete(mode="staging")
try:
self.client["bigquery_staging"].create_table(table)
except google.api_core.exceptions.Forbidden as exc:
if biglake_table:
raise BaseDosDadosException(
"Permission denied. The service account used to create the BigLake connection"
" does not have permission to read data from the source bucket. Please grant"
f" the service account {biglake_connection.service_account} the Storage Object Viewer"
" (roles/storage.objectViewer) role on the source bucket (or on the project)."
" Or, you can try running this again with set_biglake_connection_permissions=True."
) from exc
raise BaseDosDadosException(
"Something went wrong when creating the table. Please check the logs for more information."
) from exc
except Exception as exc:
raise BaseDosDadosException(
"Something went wrong when creating the table. Please check the logs for more information."
) from exc
logger.success(
"{object} {object_id} was {action} in {mode}!",
object_id=self.table_id,
mode="staging",
object="Table",
action="created",
)
# return None
def update(self, mode="prod", custom_schema=None):
"""Updates BigQuery schema and description.
Args:
mode (str): Optional.
Table of which table to update [prod]
not_found_ok (bool): Optional.
What to do if table is not found
"""
self._check_mode(mode)
table = self._get_table_obj(mode)
table.description = self._get_table_description()
# when mode is staging the table schema already exists
if mode == "prod" and custom_schema is None:
table.schema = self._load_schema_from_json(
columns=self._get_cross_columns_from_bq_api()
)
if mode == "prod" and custom_schema is not None:
table.schema = self._load_schema_from_json(custom_schema)
fields = ["description", "schema"]
self.client["bigquery_prod"].update_table(table, fields=fields)
logger.success(
" {object} {object_id} was {action} in {mode}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="updated",
)
def publish(self, if_exists="raise", custon_publish_sql=None, custom_schema=None):
"""Creates BigQuery table at production dataset.
Table should be located at `<dataset_id>.<table_id>`.
It creates a view that uses the query from
`<metadata_path>/<dataset_id>/<table_id>/publish.sql`.
Make sure that all columns from the query also exists at
`<metadata_path>/<dataset_id>/<table_id>/table_config.sql`, including
the partitions.
Args:
if_exists (str): Optional.
What to do if table exists.
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
Todo:
* Check if all required fields are filled
"""
# TODO: review this method
if if_exists == "replace" and self.table_exists(mode="prod"):
self.delete(mode="prod")
publish_sql = self._make_publish_sql()
# create view using API metadata
if custon_publish_sql is None:
self.client["bigquery_prod"].query(publish_sql).result()
self.update(mode="prod")
# create view using custon query
if custon_publish_sql is not None:
self.client["bigquery_prod"].query(custon_publish_sql).result()
# update schema using a custom schema
if custom_schema is not None:
self.update(custom_schema=custom_schema)
logger.success(
" {object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="published",
)
def delete(self, mode="all"):
"""Deletes table in BigQuery.
Args:
mode (str): Table of which table to delete [prod|staging]
"""
self._check_mode(mode)
if mode == "all":
for m, n in self.table_full_name[mode].items():
self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=m,
object="Table",
action="deleted",
)
else:
self.client[f"bigquery_{mode}"].delete_table(
self.table_full_name[mode], not_found_ok=True
)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="deleted",
)
def append(
self,
filepath,
partitions=None,
if_exists="replace",
chunk_size=None,
**upload_args,
):
"""Appends new data to existing BigQuery table.
As long as the data has the same schema. It appends the data in the
filepath to the existing table.
Args:
filepath (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
partitions (str, pathlib.PosixPath, dict): Optional.
Hive structured partition as a string or dict
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
if_exists (str): 0ptional.
What to do if data with same name exists in storage
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
chunk_size (int): Optional
The size of a chunk of data whenever iterating (in bytes).
This must be a multiple of 256 KB per the API specification.
If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
"""
if not self.table_exists("staging"):
raise BaseDosDadosException(
"You cannot append to a table that does not exist"
)
Storage(
self.dataset_id,
self.table_id,
).upload(
filepath,
mode="staging",
partitions=partitions,
if_exists=if_exists,
chunk_size=chunk_size,
**upload_args,
)
logger.success(
" {object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="appended",
)
table_config property readonly
Load table config.
append(self, filepath, partitions=None, if_exists='replace', chunk_size=None, **upload_args)
Appends new data to an existing BigQuery table. As long as the data has the same schema, it appends the data in the filepath to the existing table.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| filepath | str or pathlib.PosixPath | Where to find the file that you want to upload to create a table with. | required |
| partitions | str, pathlib.PosixPath, dict | Optional. Hive structured partition as a string or dict: str as `<key>=<value>/<key2>=<value2>`; dict as `dict(key=value, key2=value2)`. | None |
| if_exists | str | Optional. What to do if data with same name exists in storage: 'raise' raises a Conflict exception; 'replace' replaces the table; 'pass' does nothing. | 'replace' |
| chunk_size | int | Optional. The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used. | None |
Source code in basedosdados/upload/table.py
def append(
self,
filepath,
partitions=None,
if_exists="replace",
chunk_size=None,
**upload_args,
):
"""Appends new data to existing BigQuery table.
As long as the data has the same schema. It appends the data in the
filepath to the existing table.
Args:
filepath (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
partitions (str, pathlib.PosixPath, dict): Optional.
Hive structured partition as a string or dict
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
if_exists (str): 0ptional.
What to do if data with same name exists in storage
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
chunk_size (int): Optional
The size of a chunk of data whenever iterating (in bytes).
This must be a multiple of 256 KB per the API specification.
If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
"""
if not self.table_exists("staging"):
raise BaseDosDadosException(
"You cannot append to a table that does not exist"
)
Storage(
self.dataset_id,
self.table_id,
).upload(
filepath,
mode="staging",
partitions=partitions,
if_exists=if_exists,
chunk_size=chunk_size,
**upload_args,
)
logger.success(
" {object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="appended",
)
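A minimal sketch of appending a new partition to an existing staging table (assumes the package is installed, credentials are configured, and `Table` is exported at the package top level; ids and paths are placeholders):

```python
from basedosdados import Table

table = Table(dataset_id="my_dataset_id", table_id="my_table_id")

# Uploads the file under staging/<dataset_id>/<table_id>/year=2023/ in Storage;
# the external staging table then reads the new file from that prefix.
table.append(
    filepath="data/year=2023/data.csv",
    partitions="year=2023",
    if_exists="replace",
)
```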
create(self, path=None, source_format='csv', csv_delimiter=',', csv_skip_leading_rows=1, csv_allow_jagged_rows=False, if_table_exists='raise', if_storage_data_exists='raise', if_dataset_exists='pass', dataset_is_public=True, location=None, chunk_size=None, biglake_table=False, set_biglake_connection_permissions=True)
Creates a BigQuery table in the staging dataset.
If a path is provided, data is automatically saved in storage, and a datasets folder and BigQuery location are created, in addition to creating the table and its configuration files.
The new table is located at `<dataset_id>_staging.<table_id>` in BigQuery.
Data can be found in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*` and is used to build the table.
The following data types are supported:
- Comma-Delimited CSV
- Apache Avro
- Apache Parquet
Data can also be partitioned following the Hive partitioning scheme `<key1>=<value1>/<key2>=<value2>`; for example, `year=2012/country=BR`. The partition is automatically detected by searching for `partitions` in the `table_config.yaml` file.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str or pathlib.PosixPath | The path to the file to be uploaded to create the table. | None |
| source_format | str | Optional. The format of the data source. Only 'csv', 'avro', and 'parquet' are supported. Defaults to 'csv'. | 'csv' |
| csv_delimiter | str | Optional. The separator for fields in a CSV file. The separator can be any ISO-8859-1 single-byte character. Defaults to ','. | ',' |
| csv_skip_leading_rows | int | Optional. The number of rows at the top of a CSV file that BigQuery will skip when loading the data. Defaults to 1. | 1 |
| csv_allow_jagged_rows | bool | Optional. Indicates if BigQuery should allow extra values that are not represented in the table schema. Defaults to False. | False |
| if_table_exists | str | Optional. Determines what to do if the table already exists: 'raise' raises a Conflict exception; 'replace' replaces the table; 'pass' does nothing. | 'raise' |
| if_storage_data_exists | str | Optional. Determines what to do if the data already exists on your bucket: 'raise' raises a Conflict exception; 'replace' replaces the table; 'pass' does nothing. | 'raise' |
| if_dataset_exists | str | Optional. Determines what to do if the dataset already exists: 'raise' raises a Conflict exception; 'replace' replaces the dataset; 'pass' does nothing. | 'pass' |
| dataset_is_public | bool | Optional. Controls if the prod dataset is public or not. By default, staging datasets like `dataset_id_staging` are not public. | True |
| location | str | Optional. The location of the dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations | None |
| chunk_size | int | Optional. The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used. | None |
| biglake_table | bool | Optional. Sets this as a BigLake table. BigLake tables allow end-users to query from external data (such as GCS) even if they don't have access to the source data. IAM is managed like any other BigQuery native table. See https://cloud.google.com/bigquery/docs/biglake-intro for more on BigLake. | False |
| set_biglake_connection_permissions | bool | Optional. If set to `True`, attempts to grant the BigLake connection service account access to the table's data in GCS. | True |
Source code in basedosdados/upload/table.py
def create(
self,
path=None,
source_format="csv",
csv_delimiter=",",
csv_skip_leading_rows=1,
csv_allow_jagged_rows=False,
if_table_exists="raise",
if_storage_data_exists="raise",
if_dataset_exists="pass",
dataset_is_public=True,
location=None,
chunk_size=None,
biglake_table=False,
set_biglake_connection_permissions=True,
):
"""Creates a BigQuery table in the staging dataset.
If a path is provided, data is automatically saved in storage,
and a datasets folder and BigQuery location are created, in addition to creating
the table and its configuration files.
The new table is located at `<dataset_id>_staging.<table_id>` in BigQuery.
Data can be found in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*`
and is used to build the table.
The following data types are supported:
- Comma-Delimited CSV
- Apache Avro
- Apache Parquet
Data can also be partitioned following the Hive partitioning scheme
`<key1>=<value1>/<key2>=<value2>`; for example,
`year=2012/country=BR`. The partition is automatically detected by searching for `partitions`
in the `table_config.yaml` file.
Args:
path (str or pathlib.PosixPath): The path to the file to be uploaded to create the table.
source_format (str): Optional. The format of the data source. Only 'csv', 'avro', and 'parquet'
are supported. Defaults to 'csv'.
csv_delimiter (str): Optional.
The separator for fields in a CSV file. The separator can be any ISO-8859-1
single-byte character. Defaults to ','.
csv_skip_leading_rows(int): Optional.
The number of rows at the top of a CSV file that BigQuery will skip when loading the data.
Defaults to 1.
csv_allow_jagged_rows (bool): Optional.
Indicates if BigQuery should allow extra values that are not represented in the table schema.
Defaults to False.
if_table_exists (str): Optional. Determines what to do if the table already exists:
* 'raise' : Raises a Conflict exception
* 'replace' : Replaces the table
* 'pass' : Does nothing
if_storage_data_exists (str): Optional. Determines what to do if the data already exists on your bucket:
* 'raise' : Raises a Conflict exception
* 'replace' : Replaces the table
* 'pass' : Does nothing
if_dataset_exists (str): Optional. Determines what to do if the dataset already exists:
* 'raise' : Raises a Conflict exception
* 'replace' : Replaces the dataset
* 'pass' : Does nothing
dataset_is_public (bool): Optional. Controls if the prod dataset is public or not. By default, staging datasets like `dataset_id_staging` are not public.
location (str): Optional. The location of the dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
chunk_size (int): Optional. The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification.
If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
biglake_table (bool): Optional. Sets this as a BigLake table. BigLake tables allow end-users to query from external data (such as GCS) even if
they don't have access to the source data. IAM is managed like any other BigQuery native table. See https://cloud.google.com/bigquery/docs/biglake-intro for more on BigLake.
set_biglake_connection_permissions (bool): Optional. If set to `True`, attempts to grant the BigLake connection service account access to the table's data in GCS.
"""
if path is None:
# Look if table data already exists at Storage
data = self.client["storage_staging"].list_blobs(
self.bucket_name, prefix=f"staging/{self.dataset_id}/{self.table_id}"
)
# Raise: Cannot create table without external data
if not data:
raise BaseDosDadosException(
"You must provide a path for uploading data"
)
# Add data to storage
if isinstance(
path,
(
str,
Path,
),
):
Storage(
dataset_id=self.dataset_id,
table_id=self.table_id,
config_path=self.config_path,
bucket_name=self.bucket_name,
).upload(
path=path,
mode="staging",
if_exists=if_storage_data_exists,
chunk_size=chunk_size,
)
# Create Dataset if it doesn't exist
dataset_obj = Dataset(
self.dataset_id,
)
dataset_obj.create(
if_exists=if_dataset_exists,
mode="all",
location=location,
dataset_is_public=dataset_is_public,
)
if biglake_table:
biglake_connection = self._get_biglake_connection(
set_biglake_connection_permissions=set_biglake_connection_permissions,
location=location,
mode="staging",
)
biglake_connection_id = biglake_connection.connection_id
table = bigquery.Table(self.table_full_name["staging"])
table.description = self._get_table_description(mode="staging")
table.external_data_configuration = Datatype(
dataset_id=self.dataset_id,
table_id=self.table_id,
schema=self._load_staging_schema_from_data(
data_sample_path=path,
source_format=source_format,
csv_delimiter=csv_delimiter,
),
source_format=source_format,
csv_skip_leading_rows=csv_skip_leading_rows,
csv_delimiter=csv_delimiter,
csv_allow_jagged_rows=csv_allow_jagged_rows,
mode="staging",
bucket_name=self.bucket_name,
partitioned=self._is_partitioned(
data_sample_path=path,
source_format=source_format,
csv_delimiter=csv_delimiter,
),
biglake_connection_id=biglake_connection_id if biglake_table else None,
).external_config
# When using BigLake tables, schema must be provided to the `Table` object
if biglake_table:
table.schema = self._load_staging_schema_from_data(
data_sample_path=path,
source_format=source_format,
csv_delimiter=csv_delimiter,
)
logger.info(f"Using BigLake connection {biglake_connection_id}")
# Lookup if table alreay exists
table_ref = None
with contextlib.suppress(google.api_core.exceptions.NotFound):
table_ref = self.client["bigquery_staging"].get_table(
self.table_full_name["staging"]
)
if isinstance(table_ref, google.cloud.bigquery.table.Table):
if if_table_exists == "pass":
return None
if if_table_exists == "raise":
raise FileExistsError(
"Table already exists, choose replace if you want to overwrite it"
)
if if_table_exists == "replace" and self.table_exists(mode="staging"):
self.delete(mode="staging")
try:
self.client["bigquery_staging"].create_table(table)
except google.api_core.exceptions.Forbidden as exc:
if biglake_table:
raise BaseDosDadosException(
"Permission denied. The service account used to create the BigLake connection"
" does not have permission to read data from the source bucket. Please grant"
f" the service account {biglake_connection.service_account} the Storage Object Viewer"
" (roles/storage.objectViewer) role on the source bucket (or on the project)."
" Or, you can try running this again with set_biglake_connection_permissions=True."
) from exc
raise BaseDosDadosException(
"Something went wrong when creating the table. Please check the logs for more information."
) from exc
except Exception as exc:
raise BaseDosDadosException(
"Something went wrong when creating the table. Please check the logs for more information."
) from exc
logger.success(
"{object} {object_id} was {action} in {mode}!",
object_id=self.table_id,
mode="staging",
object="Table",
action="created",
)
# return None
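A minimal end-to-end sketch (not from the library's own examples; assumes an installed package, configured credentials, a top-level `Table` export, and a local folder of Hive-partitioned CSVs; ids and paths are placeholders):

```python
from basedosdados import Table

table = Table(dataset_id="my_dataset_id", table_id="my_table_id")

# Uploads the local files to Storage, creates the dataset if needed and
# builds the external staging table <dataset_id>_staging.<table_id>.
table.create(
    path="output/my_table_id/",        # e.g. contains year=2012/country=BR/*.csv
    source_format="csv",
    if_table_exists="replace",
    if_storage_data_exists="replace",
    if_dataset_exists="pass",
)
```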
delete(self, mode='all')
Deletes table in BigQuery.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mode | str | Which table to delete [prod\|staging\|all]. | 'all' |
Source code in basedosdados/upload/table.py
def delete(self, mode="all"):
"""Deletes table in BigQuery.
Args:
mode (str): Table of which table to delete [prod|staging]
"""
self._check_mode(mode)
if mode == "all":
for m, n in self.table_full_name[mode].items():
self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=m,
object="Table",
action="deleted",
)
else:
self.client[f"bigquery_{mode}"].delete_table(
self.table_full_name[mode], not_found_ok=True
)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="deleted",
)
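A short sketch (same assumptions and placeholders as above):

```python
from basedosdados import Table

# Removes only the staging table; pass mode="all" to drop prod and staging.
Table(dataset_id="my_dataset_id", table_id="my_table_id").delete(mode="staging")
```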
publish(self, if_exists='raise', custon_publish_sql=None, custom_schema=None)
Creates BigQuery table at production dataset.
Table should be located at `<dataset_id>.<table_id>`.
It creates a view that uses the query from `<metadata_path>/<dataset_id>/<table_id>/publish.sql`.
Make sure that all columns from the query also exist at `<metadata_path>/<dataset_id>/<table_id>/table_config.sql`, including the partitions.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| if_exists | str | Optional. What to do if table exists: 'raise' raises a Conflict exception; 'replace' replaces the table; 'pass' does nothing. | 'raise' |
Todo:
* Check if all required fields are filled
Source code in basedosdados/upload/table.py
def publish(self, if_exists="raise", custon_publish_sql=None, custom_schema=None):
"""Creates BigQuery table at production dataset.
Table should be located at `<dataset_id>.<table_id>`.
It creates a view that uses the query from
`<metadata_path>/<dataset_id>/<table_id>/publish.sql`.
Make sure that all columns from the query also exists at
`<metadata_path>/<dataset_id>/<table_id>/table_config.sql`, including
the partitions.
Args:
if_exists (str): Optional.
What to do if table exists.
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
Todo:
* Check if all required fields are filled
"""
# TODO: review this method
if if_exists == "replace" and self.table_exists(mode="prod"):
self.delete(mode="prod")
publish_sql = self._make_publish_sql()
# create view using API metadata
if custon_publish_sql is None:
self.client["bigquery_prod"].query(publish_sql).result()
self.update(mode="prod")
# create view using custon query
if custon_publish_sql is not None:
self.client["bigquery_prod"].query(custon_publish_sql).result()
# update schema using a custom schema
if custom_schema is not None:
self.update(custom_schema=custom_schema)
logger.success(
" {object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="published",
)
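A sketch of publishing the prod view once the staging table exists (same assumptions; the view is generated from the metadata API unless `custon_publish_sql` is given):

```python
from basedosdados import Table

# Builds the SAFE_CAST view <dataset_id>.<table_id> on top of the staging table,
# replacing it if it already exists.
Table(dataset_id="my_dataset_id", table_id="my_table_id").publish(if_exists="replace")
```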
table_exists(self, mode)
Check if table exists in BigQuery.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mode | str | Which dataset to check [prod\|staging]. | required |
Source code in basedosdados/upload/table.py
def table_exists(self, mode):
"""Check if table exists in BigQuery.
Args:
mode (str): Which dataset to check [prod|staging].
"""
try:
ref = self._get_table_obj(mode=mode)
except google.api_core.exceptions.NotFound:
ref = None
return bool(ref)
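As with `Dataset.exists`, this is useful as a guard; a small sketch under the same assumptions:

```python
from basedosdados import Table

table = Table(dataset_id="my_dataset_id", table_id="my_table_id")
if table.table_exists(mode="staging") and not table.table_exists(mode="prod"):
    # Publish the prod view only when staging exists and prod does not.
    table.publish(if_exists="raise")
```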
update(self, mode='prod', custom_schema=None)
Updates BigQuery schema and description.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| mode | str | Optional. Which table to update [prod]. | 'prod' |
| not_found_ok | bool | Optional. What to do if table is not found. | required |
Source code in basedosdados/upload/table.py
def update(self, mode="prod", custom_schema=None):
"""Updates BigQuery schema and description.
Args:
mode (str): Optional.
Table of which table to update [prod]
not_found_ok (bool): Optional.
What to do if table is not found
"""
self._check_mode(mode)
table = self._get_table_obj(mode)
table.description = self._get_table_description()
# when mode is staging the table schema already exists
if mode == "prod" and custom_schema is None:
table.schema = self._load_schema_from_json(
columns=self._get_cross_columns_from_bq_api()
)
if mode == "prod" and custom_schema is not None:
table.schema = self._load_schema_from_json(custom_schema)
fields = ["description", "schema"]
self.client["bigquery_prod"].update_table(table, fields=fields)
logger.success(
" {object} {object_id} was {action} in {mode}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="updated",
)
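Finally, a sketch of refreshing the prod view's schema and description from the metadata API (same assumptions and placeholders):

```python
from basedosdados import Table

# Rewrites the prod view's description and column schema using the API metadata.
Table(dataset_id="my_dataset_id", table_id="my_table_id").update(mode="prod")
```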