Python

This API consists of functions with two kinds of functionality:

- Modules for requesting data: for those who only want to query the data and metadata of our project (or any other project on Google Cloud).
- Classes for managing data on Google Cloud: for those who want to upload data to our project (or to any other project on Google Cloud, following our methodology and infrastructure).

All code documentation below is in English.

Modules (Data requests)
download(savepath, query=None, dataset_id=None, table_id=None, billing_project_id=None, query_project_id='basedosdados', limit=None, from_file=False, reauth=False, compression='GZIP')
Download table or query result from basedosdados BigQuery (or other).
- Using a query:
  download('select * from `basedosdados.br_suporte.diretorio_municipios` limit 10')
- Using dataset_id & table_id:
  download(dataset_id='br_suporte', table_id='diretorio_municipios')
You can also add arguments to modify save parameters:
download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')
Parameters:

Name | Type | Description | Default |
---|---|---|---|
savepath | str, pathlib.PosixPath | savepath must be a file path. Only supports `.csv`. | required |
query | str | Optional. Valid SQL Standard Query to basedosdados. If query is available, dataset_id and table_id are not required. | None |
dataset_id | str | Optional. Dataset id available in basedosdados. It should always come with table_id. | None |
table_id | str | Optional. Table id available in basedosdados.dataset_id. It should always come with dataset_id. | None |
billing_project_id | str | Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard | None |
query_project_id | str | Optional. Which project the table lives in. You can change this if you want to query a different project. | 'basedosdados' |
limit | int | Optional. Number of rows. | None |
from_file | boolean | Optional. Uses the credentials from file, located in `~/.basedosdados/credentials/` | False |
reauth | boolean | Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations. | False |
compression | str | Optional. Compression type. Only `GZIP` is available for now. | 'GZIP' |
Exceptions:

Type | Description |
---|---|
Exception | If either table_id, dataset_id or query are empty. |
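Putting the pieces together, a minimal end-to-end call might look like the sketch below. It assumes the function is exposed at the package root (`import basedosdados as bd`, as in the published package) and uses `<YOUR_PROJECT_ID>` as a placeholder for the Google Cloud project you want to bill.

```python
import basedosdados as bd

# Save the first 100 rows of the directory table to a local CSV
# (compressed with GZIP, the only compression currently supported).
bd.download(
    savepath="diretorio_municipios.csv",
    dataset_id="br_suporte",
    table_id="diretorio_municipios",
    billing_project_id="<YOUR_PROJECT_ID>",  # placeholder: your own billing project
    limit=100,
)
```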
Source code in basedosdados/download/download.py
def download(
savepath,
query=None,
dataset_id=None,
table_id=None,
billing_project_id=None,
query_project_id="basedosdados",
limit=None,
from_file=False,
reauth=False,
compression="GZIP",
):
"""Download table or query result from basedosdados BigQuery (or other).
* Using a **query**:
`download('select * from `basedosdados.br_suporte.diretorio_municipios` limit 10')`
* Using **dataset_id & table_id**:
`download(dataset_id='br_suporte', table_id='diretorio_municipios')`
You can also add arguments to modify save parameters:
`download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')`
Args:
savepath (str, pathlib.PosixPath):
savepath must be a file path. Only supports `.csv`.
query (str): Optional.
Valid SQL Standard Query to basedosdados. If query is available,
dataset_id and table_id are not required.
dataset_id (str): Optional.
Dataset id available in basedosdados. It should always come with table_id.
table_id (str): Optional.
Table id available in basedosdados.dataset_id.
It should always come with dataset_id.
billing_project_id (str): Optional.
Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
query_project_id (str): Optional.
Which project the table lives. You can change this you want to query different projects.
limit (int): Optional
Number of rows.
from_file (boolean): Optional.
Uses the credentials from file, located in `~/.basedosdados/credentials/
reauth (boolean): Optional.
Re-authorize Google Cloud Project in case you need to change user or reset configurations.
compression (str): Optional.
Compression type. Only `GZIP` is available for now.
Raises:
Exception: If either table_id, dataset_id or query are empty.
"""
billing_project_id, from_file = _set_config_variables(
billing_project_id=billing_project_id, from_file=from_file
)
if (query is None) and ((table_id is None) or (dataset_id is None)):
raise BaseDosDadosException(
"Either table_id, dataset_id or query should be filled."
)
client = google_client(query_project_id, billing_project_id, from_file, reauth)
# makes sure that savepath is a filepath and not a folder
savepath = _sets_savepath(savepath)
# if query is not defined (so it won't be overwritten) and if
# table is a view or external or if limit is specified,
# convert it to a query.
if not query and (
not _is_table(client, dataset_id, table_id, query_project_id) or limit
):
query = f"""
SELECT *
FROM {query_project_id}.{dataset_id}.{table_id}
"""
if limit is not None:
query += f" limit {limit}"
if query:
# sql queries produces anonymous tables, whose names
# can be found within `job._properties`
job = client["bigquery"].query(query)
# views may take longer: wait for job to finish.
_wait_for(job)
dest_table = job._properties["configuration"]["query"]["destinationTable"]
project_id = dest_table["projectId"]
dataset_id = dest_table["datasetId"]
table_id = dest_table["tableId"]
_direct_download(client, dataset_id, table_id, savepath, project_id, compression)
read_sql(query, billing_project_id=None, from_file=False, reauth=False, use_bqstorage_api=False)
Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq
Parameters:

Name | Type | Description | Default |
---|---|---|---|
query | sql | Valid SQL Standard Query to basedosdados | required |
billing_project_id | str | Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard | None |
from_file | boolean | Optional. Uses the credentials from file, located in `~/.basedosdados/credentials/` | False |
reauth | boolean | Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations. | False |
use_bqstorage_api | boolean | Optional. Use the BigQuery Storage API to download query results quickly, but at an increased cost (https://cloud.google.com/bigquery/docs/reference/storage/). To use this API, first enable it in the Cloud Console (https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com). You must also have the bigquery.readsessions.create permission on the project you are billing queries to. | False |
Returns:

Type | Description |
---|---|
pd.DataFrame | Query result |
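A minimal usage sketch, assuming the function is exposed at the package root and using `<YOUR_PROJECT_ID>` as a placeholder for your own billing project:

```python
import basedosdados as bd

# Run an arbitrary Standard SQL query and get the result as a pandas DataFrame.
df = bd.read_sql(
    query="SELECT * FROM `basedosdados.br_suporte.diretorio_municipios` LIMIT 10",
    billing_project_id="<YOUR_PROJECT_ID>",  # placeholder: your own GCP project
)
print(df.head())
```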
Source code in basedosdados/download/download.py
def read_sql(
query,
billing_project_id=None,
from_file=False,
reauth=False,
use_bqstorage_api=False,
):
"""Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq
Args:
query (sql):
Valid SQL Standard Query to basedosdados
billing_project_id (str): Optional.
Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
from_file (boolean): Optional.
Uses the credentials from file, located in `~/.basedosdados/credentials/
reauth (boolean): Optional.
Re-authorize Google Cloud Project in case you need to change user or reset configurations.
use_bqstorage_api (boolean): Optional.
Use the BigQuery Storage API to download query results quickly, but at an increased cost(https://cloud.google.com/bigquery/docs/reference/storage/).
To use this API, first enable it in the Cloud Console(https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com).
You must also have the bigquery.readsessions.create permission on the project you are billing queries to.
Returns:
pd.DataFrame:
Query result
"""
billing_project_id, from_file = _set_config_variables(
billing_project_id=billing_project_id, from_file=from_file
)
try:
# Set a two hours timeout
bigquery_storage_v1.client.BigQueryReadClient.read_rows = partialmethod(
bigquery_storage_v1.client.BigQueryReadClient.read_rows,
timeout=3600 * 2,
)
return pandas_gbq.read_gbq(
query,
credentials=credentials(from_file=from_file, reauth=reauth),
project_id=billing_project_id,
use_bqstorage_api=use_bqstorage_api,
)
except GenericGBQException as e:
if "Reason: 403" in str(e):
raise BaseDosDadosAccessDeniedException
elif re.match("Reason: 400 POST .* [Pp]roject[ ]*I[Dd]", str(e)):
raise BaseDosDadosInvalidProjectIDException
raise
except PyDataCredentialsError as e:
raise BaseDosDadosAuthorizationException
except (OSError, ValueError) as e:
no_billing_id = "Could not determine project ID" in str(e)
no_billing_id |= "reading from stdin while output is captured" in str(e)
if no_billing_id:
raise BaseDosDadosNoBillingProjectIDException
raise
read_table(dataset_id, table_id, billing_project_id=None, query_project_id='basedosdados', limit=None, from_file=False, reauth=False, use_bqstorage_api=False)
Load data from BigQuery using dataset_id and table_id.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
dataset_id | str | Dataset id available in basedosdados. It should always come with table_id. | required |
table_id | str | Table id available in basedosdados.dataset_id. It should always come with dataset_id. | required |
billing_project_id | str | Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard | None |
query_project_id | str | Optional. Which project the table lives in. You can change this if you want to query a different project. | 'basedosdados' |
limit | int | Optional. Number of rows to read from table. | None |
from_file | boolean | Optional. Uses the credentials from file, located in `~/.basedosdados/credentials/` | False |
reauth | boolean | Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations. | False |
use_bqstorage_api | boolean | Optional. Use the BigQuery Storage API to download query results quickly, but at an increased cost (https://cloud.google.com/bigquery/docs/reference/storage/). To use this API, first enable it in the Cloud Console (https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com). You must also have the bigquery.readsessions.create permission on the project you are billing queries to. | False |
Returns:

Type | Description |
---|---|
pd.DataFrame | Query result |
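A minimal usage sketch, under the same assumptions as above (root-level import, `<YOUR_PROJECT_ID>` as a placeholder for your billing project):

```python
import basedosdados as bd

# Read the first 100 rows of a table straight into a DataFrame.
df = bd.read_table(
    dataset_id="br_suporte",
    table_id="diretorio_municipios",
    billing_project_id="<YOUR_PROJECT_ID>",  # placeholder: your own GCP project
    limit=100,
)
```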
Source code in basedosdados/download/download.py
def read_table(
dataset_id,
table_id,
billing_project_id=None,
query_project_id="basedosdados",
limit=None,
from_file=False,
reauth=False,
use_bqstorage_api=False,
):
"""Load data from BigQuery using dataset_id and table_id.
Args:
dataset_id (str): Optional.
Dataset id available in basedosdados. It should always come with table_id.
table_id (str): Optional.
Table id available in basedosdados.dataset_id.
It should always come with dataset_id.
billing_project_id (str): Optional.
Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
query_project_id (str): Optional.
Which project the table lives. You can change this you want to query different projects.
limit (int): Optional.
Number of rows to read from table.
from_file (boolean): Optional.
Uses the credentials from file, located in `~/.basedosdados/credentials/
reauth (boolean): Optional.
Re-authorize Google Cloud Project in case you need to change user or reset configurations.
use_bqstorage_api (boolean): Optional.
Use the BigQuery Storage API to download query results quickly, but at an increased cost(https://cloud.google.com/bigquery/docs/reference/storage/).
To use this API, first enable it in the Cloud Console(https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com).
You must also have the bigquery.readsessions.create permission on the project you are billing queries to.
Returns:
pd.DataFrame:
Query result
"""
billing_project_id, from_file = _set_config_variables(
billing_project_id=billing_project_id, from_file=from_file
)
if (dataset_id is not None) and (table_id is not None):
query = f"""
SELECT *
FROM `{query_project_id}.{dataset_id}.{table_id}`"""
if limit is not None:
query += f" LIMIT {limit}"
else:
raise BaseDosDadosException("Both table_id and dataset_id should be filled.")
return read_sql(
query,
billing_project_id=billing_project_id,
from_file=from_file,
reauth=reauth,
use_bqstorage_api=use_bqstorage_api,
)
Classes (Data management)
Storage (Base)
Manage files on Google Cloud Storage.
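A minimal sketch of instantiating the class, assuming it is exposed at the package root and that the package has already been configured with your own project and bucket; the dataset and table ids are just the directory example used earlier:

```python
import basedosdados as bd

# A Storage object is always bound to a dataset_id/table_id pair;
# its methods operate on <bucket_name>/<mode>/<dataset_id>/<table_id>/.
st = bd.Storage(dataset_id="br_suporte", table_id="diretorio_municipios")
```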
Source code in basedosdados/upload/storage.py
class Storage(Base):
"""
Manage files on Google Cloud Storage.
"""
def __init__(self, dataset_id=None, table_id=None, **kwargs):
super().__init__(**kwargs)
self.bucket = self.client["storage_staging"].bucket(self.bucket_name)
self.dataset_id = dataset_id.replace("-", "_")
self.table_id = table_id.replace("-", "_")
def _resolve_partitions(self, partitions):
if isinstance(partitions, dict):
return "/".join(f"{k}={v}" for k, v in partitions.items()) + "/"
elif isinstance(partitions, str):
if partitions.endswith("/"):
partitions = partitions[:-1]
# If there is no partition
if len(partitions) == 0:
return ""
# It should fail if there is folder which is not a partition
try:
# check if it fits rule
{b.split("=")[0]: b.split("=")[1] for b in partitions.split("/")}
except IndexError:
raise Exception(f"The path {partitions} is not a valid partition")
return partitions + "/"
else:
raise Exception(f"Partitions format or type not accepted: {partitions}")
def _build_blob_name(self, filename, mode, partitions=None):
# table folder
blob_name = f"{mode}/{self.dataset_id}/{self.table_id}/"
# add partition folder
if partitions is not None:
blob_name += self._resolve_partitions(partitions)
# add file name
blob_name += filename
return blob_name
def init(self, replace=False, very_sure=False):
"""Initializes bucket and folders.
Folder should be:
* `raw` : that contains really raw data
* `staging` : preprocessed data ready to upload to BigQuery
Args:
replace (bool): Optional.
Whether to replace if bucket already exists
very_sure (bool): Optional.
Are you aware that everything is going to be erased if you
replace the bucket?
Raises:
Warning: very_sure argument is still False.
"""
if replace:
if not very_sure:
raise Warning(
"\n********************************************************"
"\nYou are trying to replace all the data that you have "
f"in bucket {self.bucket_name}.\nAre you sure?\n"
"If yes, add the flag --very_sure\n"
"********************************************************"
)
else:
self.bucket.delete(force=True)
self.client["storage_staging"].create_bucket(self.bucket)
for folder in ["staging/", "raw/"]:
self.bucket.blob(folder).upload_from_string("")
def upload(
self,
path,
mode="all",
partitions=None,
if_exists="raise",
**upload_args,
):
"""Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:
* Add a single **file** setting `path = <file_path>`.
* Add a **folder** with multiple files setting `path =
<folder_path>`. *The folder should just contain the files and
no folders.*
* Add **partitioned files** setting `path = <folder_path>`.
This folder must follow the hive partitioning scheme i.e.
`<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv`
(ex: `mytable/country=brasil/year=2020/mypart.csv`).
*Remember all files must follow a single schema.* Otherwise, things
might fail in the future.
There are 6 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BiqQuery
* `header`: should contain the header of the tables
* `auxiliary_files`: should contain auxiliary files from eache table
* `architecture`: should contain the architecture sheet of the tables
* `all`: if no treatment is needed, use `all`.
Args:
path (str or pathlib.PosixPath): Where to find the file or
folder that you want to upload to storage
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
partitions (str, pathlib.PosixPath, or dict): Optional.
*If adding a single file*, use this to add it to a specific partition.
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
if_exists (str): Optional.
What to do if data exists
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
upload_args ():
Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_file`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename)
"""
if (self.dataset_id is None) or (self.table_id is None):
raise Exception("You need to pass dataset_id and table_id")
path = Path(path)
if path.is_dir():
paths = [f for f in path.glob("**/*") if f.is_file() and f.suffix == ".csv"]
parts = [
(
filepath.as_posix()
.replace(path.as_posix() + "/", "")
.replace(str(filepath.name), "")
)
for filepath in paths
]
else:
paths = [path]
parts = [partitions or None]
self._check_mode(mode)
mode = (
["raw", "staging", "header", "auxiliary_files", "architecture"]
if mode == "all"
else [mode]
)
for m in mode:
for filepath, part in tqdm(list(zip(paths, parts)), desc="Uploading files"):
blob_name = self._build_blob_name(filepath.name, m, part)
blob = self.bucket.blob(blob_name)
if not blob.exists() or if_exists == "replace":
upload_args["timeout"] = upload_args.get("timeout", None)
blob.upload_from_filename(str(filepath), **upload_args)
elif if_exists == "pass":
pass
else:
raise BaseDosDadosException(
f"Data already exists at {self.bucket_name}/{blob_name}. "
"If you are using Storage.upload then set if_exists to "
"'replace' to overwrite data \n"
"If you are using Table.create then set if_storage_data_exists "
"to 'replace' to overwrite data."
)
logger.success(
" {object} {filename}_{mode} was {action}!",
filename=filepath.name,
mode=mode,
object="File",
action="uploaded",
)
def download(
self,
filename="*",
savepath="",
partitions=None,
mode="raw",
if_not_exists="raise",
):
"""Download files from Google Storage from path `mode`/`dataset_id`/`table_id`/`partitions`/`filename` and replicate folder hierarchy
on save,
There are 5 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BiqQuery
* `header`: should contain the header of the tables
* `auxiliary_files`: should contain auxiliary files from eache table
* `architecture`: should contain the architecture sheet of the tables
You can also use the `partitions` argument to choose files from a partition
Args:
filename (str): Optional
Specify which file to download. If "*" , downloads all files within the bucket folder. Defaults to "*".
savepath (str):
Where you want to save the data on your computer. Must be a path to a directory.
partitions (str, dict): Optional
If downloading a single file, use this to specify the partition path from which to download.
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
mode (str): Optional
Folder of which dataset to update.[raw|staging|header|auxiliary_files|architecture]
if_not_exists (str): Optional.
What to do if data not found.
* 'raise' : Raises FileNotFoundError.
* 'pass' : Do nothing and exit the function
Raises:
FileNotFoundError: If the given path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could not be found or there are no files to download.
"""
# Prefix to locate files within the bucket
prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
# Add specific partition to search prefix
if partitions:
prefix += self._resolve_partitions(partitions)
# if no filename is passed, list all blobs within a given table
if filename != "*":
prefix += filename
blob_list = list(self.bucket.list_blobs(prefix=prefix))
# if there are no blobs matching the search raise FileNotFoundError or return
if blob_list == []:
if if_not_exists == "raise":
raise FileNotFoundError(f"Could not locate files at {prefix}")
else:
return
# download all blobs matching the search to given savepath
for blob in tqdm(blob_list, desc="Download Blob"):
# parse blob.name and get the csv file name
csv_name = blob.name.split("/")[-1]
# build folder path replicating storage hierarchy
blob_folder = blob.name.replace(csv_name, "")
# replicate folder hierarchy
(Path(savepath) / blob_folder).mkdir(parents=True, exist_ok=True)
# download blob to savepath
blob.download_to_filename(filename=f"{savepath}/{blob.name}")
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=mode,
object="File",
action="downloaded",
)
def delete_file(self, filename, mode, partitions=None, not_found_ok=False):
"""Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.
Args:
filename (str): Name of the file to be deleted
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
partitions (str, pathlib.PosixPath, or dict): Optional.
Hive structured partition as a string or dict
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
not_found_ok (bool): Optional.
What to do if file not found
"""
self._check_mode(mode)
mode = (
["raw", "staging", "header", "auxiliary_files", "architecture"]
if mode == "all"
else [mode]
)
for m in mode:
blob = self.bucket.blob(self._build_blob_name(filename, m, partitions))
if blob.exists() or not blob.exists() and not not_found_ok:
blob.delete()
else:
return
logger.success(
" {object} {filename}_{mode} was {action}!",
filename=filename,
mode=mode,
object="File",
action="deleted",
)
def delete_table(self, mode="staging", bucket_name=None, not_found_ok=False):
"""Deletes a table from storage, sends request in batches.
Args:
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
Folder of which dataset to update. Defaults to "staging".
bucket_name (str):
The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
(You can check it with the Storage().bucket property)
not_found_ok (bool): Optional.
What to do if table not found
"""
prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
if bucket_name is not None:
table_blobs = list(
self.client["storage_staging"]
.bucket(f"{bucket_name}")
.list_blobs(prefix=prefix)
)
else:
table_blobs = list(self.bucket.list_blobs(prefix=prefix))
if table_blobs == []:
if not_found_ok:
return
else:
raise FileNotFoundError(
f"Could not find the requested table {self.dataset_id}.{self.table_id}"
)
else:
# Divides table_blobs list for maximum batch request size
table_blobs_chunks = [
table_blobs[i : i + 999] for i in range(0, len(table_blobs), 999)
]
for i, source_table in enumerate(
tqdm(table_blobs_chunks, desc="Delete Table Chunk")
):
counter = 0
while counter < 10:
try:
with self.client["storage_staging"].batch():
for blob in source_table:
blob.delete()
break
except Exception as e:
print(
f"Delete Table Chunk {i} | Attempt {counter}: delete operation starts again in 5 seconds...",
)
time.sleep(5)
counter += 1
traceback.print_exc(file=sys.stderr)
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="deleted",
)
def copy_table(
self,
source_bucket_name="basedosdados",
destination_bucket_name=None,
mode="staging",
):
"""Copies table from a source bucket to your bucket, sends request in batches.
Args:
source_bucket_name (str):
The bucket name from which to copy data. You can change it
to copy from other external bucket.
destination_bucket_name (str): Optional
The bucket name where data will be copied to.
If None, defaults to the bucket initialized when instantiating the Storage object (You can check it with the
Storage().bucket property)
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
Folder of which dataset to update. Defaults to "staging".
"""
source_table_ref = list(
self.client["storage_staging"]
.bucket(source_bucket_name)
.list_blobs(prefix=f"{mode}/{self.dataset_id}/{self.table_id}/")
)
if source_table_ref == []:
raise FileNotFoundError(
f"Could not find the requested table {self.dataset_id}.{self.table_id}"
)
if destination_bucket_name is None:
destination_bucket = self.bucket
else:
destination_bucket = self.client["storage_staging"].bucket(
destination_bucket_name
)
# Divides source_table_ref list for maximum batch request size
source_table_ref_chunks = [
source_table_ref[i : i + 999] for i in range(0, len(source_table_ref), 999)
]
for i, source_table in enumerate(
tqdm(source_table_ref_chunks, desc="Copy Table Chunk")
):
counter = 0
while counter < 10:
try:
with self.client["storage_staging"].batch():
for blob in source_table:
self.bucket.copy_blob(
blob,
destination_bucket=destination_bucket,
)
break
except Exception as e:
print(
f"Copy Table Chunk {i} | Attempt {counter}: copy operation starts again in 5 seconds...",
)
counter += 1
time.sleep(5)
traceback.print_exc(file=sys.stderr)
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="copied",
)
copy_table(self, source_bucket_name='basedosdados', destination_bucket_name=None, mode='staging')
Copies table from a source bucket to your bucket, sends request in batches.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
source_bucket_name | str | The bucket name from which to copy data. You can change it to copy from another external bucket. | 'basedosdados' |
destination_bucket_name | str | Optional. The bucket name where data will be copied to. If None, defaults to the bucket initialized when instantiating the Storage object (you can check it with the Storage().bucket property). | None |
mode | str | Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]. Defaults to "staging". | 'staging' |
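A minimal usage sketch, assuming a configured package and a Storage object bound to the example table:

```python
import basedosdados as bd

st = bd.Storage(dataset_id="br_suporte", table_id="diretorio_municipios")
# Copy the table's staging files from the public `basedosdados` bucket into
# your own bucket (the default destination when destination_bucket_name is None).
st.copy_table(source_bucket_name="basedosdados", mode="staging")
```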
Source code in basedosdados/upload/storage.py
def copy_table(
self,
source_bucket_name="basedosdados",
destination_bucket_name=None,
mode="staging",
):
"""Copies table from a source bucket to your bucket, sends request in batches.
Args:
source_bucket_name (str):
The bucket name from which to copy data. You can change it
to copy from other external bucket.
destination_bucket_name (str): Optional
The bucket name where data will be copied to.
If None, defaults to the bucket initialized when instantiating the Storage object (You can check it with the
Storage().bucket property)
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
Folder of which dataset to update. Defaults to "staging".
"""
source_table_ref = list(
self.client["storage_staging"]
.bucket(source_bucket_name)
.list_blobs(prefix=f"{mode}/{self.dataset_id}/{self.table_id}/")
)
if source_table_ref == []:
raise FileNotFoundError(
f"Could not find the requested table {self.dataset_id}.{self.table_id}"
)
if destination_bucket_name is None:
destination_bucket = self.bucket
else:
destination_bucket = self.client["storage_staging"].bucket(
destination_bucket_name
)
# Divides source_table_ref list for maximum batch request size
source_table_ref_chunks = [
source_table_ref[i : i + 999] for i in range(0, len(source_table_ref), 999)
]
for i, source_table in enumerate(
tqdm(source_table_ref_chunks, desc="Copy Table Chunk")
):
counter = 0
while counter < 10:
try:
with self.client["storage_staging"].batch():
for blob in source_table:
self.bucket.copy_blob(
blob,
destination_bucket=destination_bucket,
)
break
except Exception as e:
print(
f"Copy Table Chunk {i} | Attempt {counter}: copy operation starts again in 5 seconds...",
)
counter += 1
time.sleep(5)
traceback.print_exc(file=sys.stderr)
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="copied",
)
delete_file(self, filename, mode, partitions=None, not_found_ok=False)
Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
filename | str | Name of the file to be deleted | required |
mode | str | Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all] | required |
partitions | str, pathlib.PosixPath, or dict | Optional. Hive structured partition as a string (`<key>=<value>/<key2>=<value2>`) or dict (`dict(key=value, key2=value2)`) | None |
not_found_ok | bool | Optional. What to do if file not found | False |
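A minimal usage sketch; the filename `municipios.csv` is a hypothetical blob name used only for illustration:

```python
import basedosdados as bd

st = bd.Storage(dataset_id="br_suporte", table_id="diretorio_municipios")
# Delete a single staging file; not_found_ok=True avoids raising if it is already gone.
st.delete_file(filename="municipios.csv", mode="staging", not_found_ok=True)
```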
Source code in basedosdados/upload/storage.py
def delete_file(self, filename, mode, partitions=None, not_found_ok=False):
"""Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.
Args:
filename (str): Name of the file to be deleted
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
partitions (str, pathlib.PosixPath, or dict): Optional.
Hive structured partition as a string or dict
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
not_found_ok (bool): Optional.
What to do if file not found
"""
self._check_mode(mode)
mode = (
["raw", "staging", "header", "auxiliary_files", "architecture"]
if mode == "all"
else [mode]
)
for m in mode:
blob = self.bucket.blob(self._build_blob_name(filename, m, partitions))
if blob.exists() or not blob.exists() and not not_found_ok:
blob.delete()
else:
return
logger.success(
" {object} {filename}_{mode} was {action}!",
filename=filename,
mode=mode,
object="File",
action="deleted",
)
delete_table(self, mode='staging', bucket_name=None, not_found_ok=False)
Deletes a table from storage, sends request in batches.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
mode | str | Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]. Defaults to "staging". | 'staging' |
bucket_name | str | The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object (you can check it with the Storage().bucket property). | None |
not_found_ok | bool | Optional. What to do if table not found | False |
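A minimal usage sketch, assuming a configured package and the example table:

```python
import basedosdados as bd

st = bd.Storage(dataset_id="br_suporte", table_id="diretorio_municipios")
# Remove every staging blob of the table (in batches of up to 999 requests);
# not_found_ok=True makes the call a no-op when there is nothing to delete.
st.delete_table(mode="staging", not_found_ok=True)
```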
Source code in basedosdados/upload/storage.py
def delete_table(self, mode="staging", bucket_name=None, not_found_ok=False):
"""Deletes a table from storage, sends request in batches.
Args:
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
Folder of which dataset to update. Defaults to "staging".
bucket_name (str):
The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
(You can check it with the Storage().bucket property)
not_found_ok (bool): Optional.
What to do if table not found
"""
prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
if bucket_name is not None:
table_blobs = list(
self.client["storage_staging"]
.bucket(f"{bucket_name}")
.list_blobs(prefix=prefix)
)
else:
table_blobs = list(self.bucket.list_blobs(prefix=prefix))
if table_blobs == []:
if not_found_ok:
return
else:
raise FileNotFoundError(
f"Could not find the requested table {self.dataset_id}.{self.table_id}"
)
else:
# Divides table_blobs list for maximum batch request size
table_blobs_chunks = [
table_blobs[i : i + 999] for i in range(0, len(table_blobs), 999)
]
for i, source_table in enumerate(
tqdm(table_blobs_chunks, desc="Delete Table Chunk")
):
counter = 0
while counter < 10:
try:
with self.client["storage_staging"].batch():
for blob in source_table:
blob.delete()
break
except Exception as e:
print(
f"Delete Table Chunk {i} | Attempt {counter}: delete operation starts again in 5 seconds...",
)
time.sleep(5)
counter += 1
traceback.print_exc(file=sys.stderr)
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="deleted",
)
download(self, filename='*', savepath='', partitions=None, mode='raw', if_not_exists='raise')
Download files from Google Storage at path `mode`/`dataset_id`/`table_id`/`partitions`/`filename` and replicate the folder hierarchy on save.

There are 5 modes:

* `raw`: should contain raw files from the data source
* `staging`: should contain pre-treated files ready to upload to BigQuery
* `header`: should contain the header of the tables
* `auxiliary_files`: should contain auxiliary files from each table
* `architecture`: should contain the architecture sheet of the tables

You can also use the `partitions` argument to choose files from a partition.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
filename | str | Optional. Specify which file to download. If "*", downloads all files within the bucket folder. Defaults to "*". | '*' |
savepath | str | Where you want to save the data on your computer. Must be a path to a directory. | '' |
partitions | str, dict | Optional. If downloading a single file, use this to specify the partition path from which to download, as a string (`<key>=<value>/<key2>=<value2>`) or dict (`dict(key=value, key2=value2)`). | None |
mode | str | Optional. Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture] | 'raw' |
if_not_exists | str | Optional. What to do if data is not found: 'raise' raises FileNotFoundError; 'pass' does nothing and exits the function. | 'raise' |
Exceptions:

Type | Description |
---|---|
FileNotFoundError | If the given path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could not be found or there are no files to download. |
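A minimal usage sketch; the local directory `data` is a hypothetical save location:

```python
import basedosdados as bd

st = bd.Storage(dataset_id="br_suporte", table_id="diretorio_municipios")
# Download every raw file of the table to ./data, replicating the bucket hierarchy;
# if_not_exists="pass" silently returns when no matching blobs are found.
st.download(savepath="data", mode="raw", if_not_exists="pass")
```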
Source code in basedosdados/upload/storage.py
def download(
self,
filename="*",
savepath="",
partitions=None,
mode="raw",
if_not_exists="raise",
):
"""Download files from Google Storage from path `mode`/`dataset_id`/`table_id`/`partitions`/`filename` and replicate folder hierarchy
on save,
There are 5 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BiqQuery
* `header`: should contain the header of the tables
* `auxiliary_files`: should contain auxiliary files from eache table
* `architecture`: should contain the architecture sheet of the tables
You can also use the `partitions` argument to choose files from a partition
Args:
filename (str): Optional
Specify which file to download. If "*" , downloads all files within the bucket folder. Defaults to "*".
savepath (str):
Where you want to save the data on your computer. Must be a path to a directory.
partitions (str, dict): Optional
If downloading a single file, use this to specify the partition path from which to download.
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
mode (str): Optional
Folder of which dataset to update.[raw|staging|header|auxiliary_files|architecture]
if_not_exists (str): Optional.
What to do if data not found.
* 'raise' : Raises FileNotFoundError.
* 'pass' : Do nothing and exit the function
Raises:
FileNotFoundError: If the given path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could not be found or there are no files to download.
"""
# Prefix to locate files within the bucket
prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
# Add specific partition to search prefix
if partitions:
prefix += self._resolve_partitions(partitions)
# if no filename is passed, list all blobs within a given table
if filename != "*":
prefix += filename
blob_list = list(self.bucket.list_blobs(prefix=prefix))
# if there are no blobs matching the search raise FileNotFoundError or return
if blob_list == []:
if if_not_exists == "raise":
raise FileNotFoundError(f"Could not locate files at {prefix}")
else:
return
# download all blobs matching the search to given savepath
for blob in tqdm(blob_list, desc="Download Blob"):
# parse blob.name and get the csv file name
csv_name = blob.name.split("/")[-1]
# build folder path replicating storage hierarchy
blob_folder = blob.name.replace(csv_name, "")
# replicate folder hierarchy
(Path(savepath) / blob_folder).mkdir(parents=True, exist_ok=True)
# download blob to savepath
blob.download_to_filename(filename=f"{savepath}/{blob.name}")
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=mode,
object="File",
action="downloaded",
)
init(self, replace=False, very_sure=False)
Initializes bucket and folders.

Folders should be:

* `raw`: contains really raw data
* `staging`: preprocessed data ready to upload to BigQuery
Parameters:

Name | Type | Description | Default |
---|---|---|---|
replace | bool | Optional. Whether to replace if bucket already exists | False |
very_sure | bool | Optional. Are you aware that everything is going to be erased if you replace the bucket? | False |
Exceptions:

Type | Description |
---|---|
Warning | very_sure argument is still False. |
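A minimal usage sketch, assuming the package is already configured with your own bucket:

```python
import basedosdados as bd

st = bd.Storage(dataset_id="br_suporte", table_id="diretorio_municipios")
# Create the bucket with its raw/ and staging/ folders.
# With replace=True you must also pass very_sure=True, otherwise a Warning is raised.
st.init(replace=False)
```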
Source code in basedosdados/upload/storage.py
def init(self, replace=False, very_sure=False):
"""Initializes bucket and folders.
Folder should be:
* `raw` : that contains really raw data
* `staging` : preprocessed data ready to upload to BigQuery
Args:
replace (bool): Optional.
Whether to replace if bucket already exists
very_sure (bool): Optional.
Are you aware that everything is going to be erased if you
replace the bucket?
Raises:
Warning: very_sure argument is still False.
"""
if replace:
if not very_sure:
raise Warning(
"\n********************************************************"
"\nYou are trying to replace all the data that you have "
f"in bucket {self.bucket_name}.\nAre you sure?\n"
"If yes, add the flag --very_sure\n"
"********************************************************"
)
else:
self.bucket.delete(force=True)
self.client["storage_staging"].create_bucket(self.bucket)
for folder in ["staging/", "raw/"]:
self.bucket.blob(folder).upload_from_string("")
upload(self, path, mode='all', partitions=None, if_exists='raise', **upload_args)
Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:

- Add a single **file** by setting `path = <file_path>`.
- Add a **folder** with multiple files by setting `path = <folder_path>`. *The folder should just contain the files and no folders.*
- Add **partitioned files** by setting `path = <folder_path>`. This folder must follow the hive partitioning scheme, i.e. `<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv` (ex: `mytable/country=brasil/year=2020/mypart.csv`).

*Remember that all files must follow a single schema.* Otherwise, things might fail in the future.

There are 6 modes:

* `raw`: should contain raw files from the data source
* `staging`: should contain pre-treated files ready to upload to BigQuery
* `header`: should contain the header of the tables
* `auxiliary_files`: should contain auxiliary files from each table
* `architecture`: should contain the architecture sheet of the tables
* `all`: if no treatment is needed, use `all`.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
path | str or pathlib.PosixPath | Where to find the file or folder that you want to upload to storage | required |
mode | str | Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all] | 'all' |
partitions | str, pathlib.PosixPath, or dict | Optional. If adding a single file, use this to add it to a specific partition, as a string (`<key>=<value>/<key2>=<value2>`) or dict (`dict(key=value, key2=value2)`). | None |
if_exists | str | Optional. What to do if data exists: 'raise' raises a Conflict exception; 'replace' replaces the data; 'pass' does nothing. | 'raise' |
upload_args | | Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_file`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename) | {} |
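A minimal usage sketch; the local path `data/diretorio_municipios.csv` is a hypothetical file used only for illustration:

```python
import basedosdados as bd

st = bd.Storage(dataset_id="br_suporte", table_id="diretorio_municipios")
# Upload a single local CSV into the staging folder of the table,
# overwriting any blob that is already there.
st.upload(path="data/diretorio_municipios.csv", mode="staging", if_exists="replace")
```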
Source code in basedosdados/upload/storage.py
def upload(
self,
path,
mode="all",
partitions=None,
if_exists="raise",
**upload_args,
):
"""Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:
* Add a single **file** setting `path = <file_path>`.
* Add a **folder** with multiple files setting `path =
<folder_path>`. *The folder should just contain the files and
no folders.*
* Add **partitioned files** setting `path = <folder_path>`.
This folder must follow the hive partitioning scheme i.e.
`<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv`
(ex: `mytable/country=brasil/year=2020/mypart.csv`).
*Remember all files must follow a single schema.* Otherwise, things
might fail in the future.
There are 6 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BiqQuery
* `header`: should contain the header of the tables
* `auxiliary_files`: should contain auxiliary files from eache table
* `architecture`: should contain the architecture sheet of the tables
* `all`: if no treatment is needed, use `all`.
Args:
path (str or pathlib.PosixPath): Where to find the file or
folder that you want to upload to storage
mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
partitions (str, pathlib.PosixPath, or dict): Optional.
*If adding a single file*, use this to add it to a specific partition.
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
if_exists (str): Optional.
What to do if data exists
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
upload_args ():
Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_file`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename)
"""
if (self.dataset_id is None) or (self.table_id is None):
raise Exception("You need to pass dataset_id and table_id")
path = Path(path)
if path.is_dir():
paths = [f for f in path.glob("**/*") if f.is_file() and f.suffix == ".csv"]
parts = [
(
filepath.as_posix()
.replace(path.as_posix() + "/", "")
.replace(str(filepath.name), "")
)
for filepath in paths
]
else:
paths = [path]
parts = [partitions or None]
self._check_mode(mode)
mode = (
["raw", "staging", "header", "auxiliary_files", "architecture"]
if mode == "all"
else [mode]
)
for m in mode:
for filepath, part in tqdm(list(zip(paths, parts)), desc="Uploading files"):
blob_name = self._build_blob_name(filepath.name, m, part)
blob = self.bucket.blob(blob_name)
if not blob.exists() or if_exists == "replace":
upload_args["timeout"] = upload_args.get("timeout", None)
blob.upload_from_filename(str(filepath), **upload_args)
elif if_exists == "pass":
pass
else:
raise BaseDosDadosException(
f"Data already exists at {self.bucket_name}/{blob_name}. "
"If you are using Storage.upload then set if_exists to "
"'replace' to overwrite data \n"
"If you are using Table.create then set if_storage_data_exists "
"to 'replace' to overwrite data."
)
logger.success(
" {object} {filename}_{mode} was {action}!",
filename=filepath.name,
mode=mode,
object="File",
action="uploaded",
)
Dataset (Base)
Manage datasets in BigQuery.
Source code in basedosdados/upload/dataset.py
class Dataset(Base):
"""
Manage datasets in BigQuery.
"""
def __init__(self, dataset_id, **kwargs):
super().__init__(**kwargs)
self.dataset_id = dataset_id.replace("-", "_")
self.dataset_folder = Path(self.metadata_path / self.dataset_id)
self.metadata = Metadata(self.dataset_id, **kwargs)
@property
def dataset_config(self):
return self._load_yaml(
self.metadata_path / self.dataset_id / "dataset_config.yaml"
)
def _loop_modes(self, mode="all"):
mode = ["prod", "staging"] if mode == "all" else [mode]
dataset_tag = lambda m: f"_{m}" if m == "staging" else ""
return (
{
"client": self.client[f"bigquery_{m}"],
"id": f"{self.client[f'bigquery_{m}'].project}.{self.dataset_id}{dataset_tag(m)}",
}
for m in mode
)
def _setup_dataset_object(self, dataset_id, location=None):
dataset = bigquery.Dataset(dataset_id)
## TODO: not being used since 1.6.0 - need to redo the description tha goes to bigquery
dataset.description = "Para saber mais acesse https://basedosdados.org/"
# dataset.description = self._render_template(
# Path("dataset/dataset_description.txt"), self.dataset_config
# )
dataset.location = location
return dataset
def _write_readme_file(self):
readme_content = (
f"Como capturar os dados de {self.dataset_id}?\n\nPara cap"
f"turar esses dados, basta verificar o link dos dados orig"
f"inais indicado em dataset_config.yaml no item website.\n"
f"\nCaso tenha sido utilizado algum código de captura ou t"
f"ratamento, estes estarão contidos em code/. Se o dado pu"
f"blicado for em sua versão bruta, não existirá a pasta co"
f"de/.\n\nOs dados publicados estão disponíveis em: https:"
f"//basedosdados.org/dataset/{self.dataset_id.replace('_','-')}"
)
readme_path = Path(self.metadata_path / self.dataset_id / "README.md")
with open(readme_path, "w") as readmefile:
readmefile.write(readme_content)
def init(self, replace=False):
"""Initialize dataset folder at metadata_path at `metadata_path/<dataset_id>`.
The folder should contain:
* `dataset_config.yaml`
* `README.md`
Args:
replace (str): Optional. Whether to replace existing folder.
Raises:
FileExistsError: If dataset folder already exists and replace is False
"""
# Create dataset folder
try:
self.dataset_folder.mkdir(exist_ok=replace, parents=True)
except FileExistsError:
raise FileExistsError(
f"Dataset {str(self.dataset_folder.stem)} folder does not exists. "
"Set replace=True to replace current files."
)
# create dataset_config.yaml with metadata
self.metadata.create(if_exists="replace")
# create README.md file
self._write_readme_file()
# Add code folder
(self.dataset_folder / "code").mkdir(exist_ok=replace, parents=True)
return self
def publicize(self, mode="all", dataset_is_public=True):
"""Changes IAM configuration to turn BigQuery dataset public.
Args:
mode (bool): Which dataset to create [prod|staging|all].
dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
"""
for m in self._loop_modes(mode):
dataset = m["client"].get_dataset(m["id"])
entries = dataset.access_entries
# TODO https://github.com/basedosdados/mais/pull/1020
if dataset_is_public and "staging" not in dataset.dataset_id:
entries.extend(
[
bigquery.AccessEntry(
role="roles/bigquery.dataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
bigquery.AccessEntry(
role="roles/bigquery.metadataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
bigquery.AccessEntry(
role="roles/bigquery.user",
entity_type="iamMember",
entity_id="allUsers",
),
]
)
dataset.access_entries = entries
m["client"].update_dataset(dataset, ["access_entries"])
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=mode,
object="Dataset",
action="publicized",
)
def create(
self, mode="all", if_exists="raise", dataset_is_public=True, location=None
):
"""Creates BigQuery datasets given `dataset_id`.
It can create two datasets:
* `<dataset_id>` (mode = 'prod')
* `<dataset_id>_staging` (mode = 'staging')
If `mode` is all, it creates both.
Args:
mode (str): Optional. Which dataset to create [prod|staging|all].
if_exists (str): Optional. What to do if dataset exists
* raise : Raises Conflict exception
* replace : Drop all tables and replace dataset
* update : Update dataset description
* pass : Do nothing
dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
location (str): Optional. Location of dataset data.
List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
Raises:
Warning: Dataset already exists and if_exists is set to `raise`
"""
if if_exists == "replace":
self.delete(mode)
elif if_exists == "update":
self.update()
return
# Set dataset_id to the ID of the dataset to create.
for m in self._loop_modes(mode):
# Construct a full Dataset object to send to the API.
dataset_obj = self._setup_dataset_object(m["id"], location=location)
# Send the dataset to the API for creation, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
try:
job = m["client"].create_dataset(dataset_obj) # Make an API request.
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=mode,
object="Dataset",
action="created",
)
except Conflict:
if if_exists == "pass":
return
else:
raise Conflict(f"Dataset {self.dataset_id} already exists")
# Make prod dataset public
self.publicize(dataset_is_public=dataset_is_public)
def delete(self, mode="all"):
"""Deletes dataset in BigQuery. Toogle mode to choose which dataset to delete.
Args:
mode (str): Optional. Which dataset to delete [prod|staging|all]
"""
for m in self._loop_modes(mode):
m["client"].delete_dataset(m["id"], delete_contents=True, not_found_ok=True)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=mode,
object="Dataset",
action="deleted",
)
def update(self, mode="all", location=None):
"""Update dataset description. Toogle mode to choose which dataset to update.
Args:
mode (str): Optional. Which dataset to update [prod|staging|all]
location (str): Optional. Location of dataset data.
List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
"""
for m in self._loop_modes(mode):
# Send the dataset to the API to update, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
dataset = m["client"].update_dataset(
self._setup_dataset_object(
m["id"],
location=location,
),
fields=["description"],
) # Make an API request.
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=mode,
object="Dataset",
action="updated",
)
create(self, mode='all', if_exists='raise', dataset_is_public=True, location=None)
Creates BigQuery datasets given `dataset_id`.

It can create two datasets:

* `<dataset_id>` (mode = 'prod')
* `<dataset_id>_staging` (mode = 'staging')

If `mode` is all, it creates both.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
mode | str | Optional. Which dataset to create [prod|staging|all]. | 'all' |
if_exists | str | Optional. What to do if dataset exists: 'raise' raises a Conflict exception; 'replace' drops all tables and replaces the dataset; 'update' updates the dataset description; 'pass' does nothing. | 'raise' |
dataset_is_public | bool | Control if prod dataset is public or not. By default, staging datasets like `dataset_id_staging` are not public. | True |
location | str | Optional. Location of dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations | None |
Exceptions:

Type | Description |
---|---|
Warning | Dataset already exists and if_exists is set to `raise` |
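A minimal usage sketch, assuming the class is exposed at the package root and the package is configured with your own project; the `US` location is an illustrative choice:

```python
import basedosdados as bd

ds = bd.Dataset(dataset_id="br_suporte")
# Create both <dataset_id> (prod) and <dataset_id>_staging in BigQuery;
# if_exists="pass" keeps the call idempotent when they already exist.
ds.create(mode="all", if_exists="pass", location="US")
```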
Source code in basedosdados/upload/dataset.py
def create(
self, mode="all", if_exists="raise", dataset_is_public=True, location=None
):
"""Creates BigQuery datasets given `dataset_id`.
It can create two datasets:
* `<dataset_id>` (mode = 'prod')
* `<dataset_id>_staging` (mode = 'staging')
If `mode` is all, it creates both.
Args:
mode (str): Optional. Which dataset to create [prod|staging|all].
if_exists (str): Optional. What to do if dataset exists
* raise : Raises Conflict exception
* replace : Drop all tables and replace dataset
* update : Update dataset description
* pass : Do nothing
dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
location (str): Optional. Location of dataset data.
List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
Raises:
Warning: Dataset already exists and if_exists is set to `raise`
"""
if if_exists == "replace":
self.delete(mode)
elif if_exists == "update":
self.update()
return
# Set dataset_id to the ID of the dataset to create.
for m in self._loop_modes(mode):
# Construct a full Dataset object to send to the API.
dataset_obj = self._setup_dataset_object(m["id"], location=location)
# Send the dataset to the API for creation, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
try:
job = m["client"].create_dataset(dataset_obj) # Make an API request.
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=mode,
object="Dataset",
action="created",
)
except Conflict:
if if_exists == "pass":
return
else:
raise Conflict(f"Dataset {self.dataset_id} already exists")
# Make prod dataset public
self.publicize(dataset_is_public=dataset_is_public)
delete(self, mode='all')
Deletes a dataset in BigQuery. Toggle mode to choose which dataset to delete.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
mode | str | Optional. Which dataset to delete [prod|staging|all] | 'all' |
Source code in basedosdados/upload/dataset.py
def delete(self, mode="all"):
"""Deletes dataset in BigQuery. Toogle mode to choose which dataset to delete.
Args:
mode (str): Optional. Which dataset to delete [prod|staging|all]
"""
for m in self._loop_modes(mode):
m["client"].delete_dataset(m["id"], delete_contents=True, not_found_ok=True)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=mode,
object="Dataset",
action="deleted",
)
init(self, replace=False)
Initialize the dataset folder at `metadata_path/<dataset_id>`.

The folder should contain:

* `dataset_config.yaml`
* `README.md`
Parameters:

Name | Type | Description | Default |
---|---|---|---|
replace | bool | Optional. Whether to replace existing folder. | False |
Exceptions:

Type | Description |
---|---|
FileExistsError | If dataset folder already exists and replace is False |
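A minimal usage sketch, assuming the package is configured so that `metadata_path` points at your local metadata folder:

```python
import basedosdados as bd

ds = bd.Dataset(dataset_id="br_suporte")
# Create the local metadata folder with dataset_config.yaml, README.md and code/.
ds.init(replace=False)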
Source code in basedosdados/upload/dataset.py
def init(self, replace=False):
"""Initialize dataset folder at metadata_path at `metadata_path/<dataset_id>`.
The folder should contain:
* `dataset_config.yaml`
* `README.md`
Args:
replace (str): Optional. Whether to replace existing folder.
Raises:
FileExistsError: If dataset folder already exists and replace is False
"""
# Create dataset folder
try:
self.dataset_folder.mkdir(exist_ok=replace, parents=True)
except FileExistsError:
raise FileExistsError(
f"Dataset {str(self.dataset_folder.stem)} folder does not exists. "
"Set replace=True to replace current files."
)
# create dataset_config.yaml with metadata
self.metadata.create(if_exists="replace")
# create README.md file
self._write_readme_file()
# Add code folder
(self.dataset_folder / "code").mkdir(exist_ok=replace, parents=True)
return self
publicize(self, mode='all', dataset_is_public=True)
Changes the IAM configuration to make a BigQuery dataset public.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
mode | str | Which dataset to publicize [prod|staging|all]. | 'all' |
dataset_is_public | bool | Control if prod dataset is public or not. By default, staging datasets like `dataset_id_staging` are not public. | True |
Source code in basedosdados/upload/dataset.py
def publicize(self, mode="all", dataset_is_public=True):
"""Changes IAM configuration to turn BigQuery dataset public.
Args:
mode (bool): Which dataset to create [prod|staging|all].
dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
"""
for m in self._loop_modes(mode):
dataset = m["client"].get_dataset(m["id"])
entries = dataset.access_entries
# TODO https://github.com/basedosdados/mais/pull/1020
if dataset_is_public and "staging" not in dataset.dataset_id:
entries.extend(
[
bigquery.AccessEntry(
role="roles/bigquery.dataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
bigquery.AccessEntry(
role="roles/bigquery.metadataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
bigquery.AccessEntry(
role="roles/bigquery.user",
entity_type="iamMember",
entity_id="allUsers",
),
]
)
dataset.access_entries = entries
m["client"].update_dataset(dataset, ["access_entries"])
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=mode,
object="Dataset",
action="publicized",
)
update(self, mode='all', location=None)
Updates the dataset description. Toggle mode to choose which dataset to update.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
mode | str | Optional. Which dataset to update [prod|staging|all] | 'all' |
location | str | Optional. Location of dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations | None |
Source code in basedosdados/upload/dataset.py
def update(self, mode="all", location=None):
"""Update dataset description. Toogle mode to choose which dataset to update.
Args:
mode (str): Optional. Which dataset to update [prod|staging|all]
location (str): Optional. Location of dataset data.
List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
"""
for m in self._loop_modes(mode):
# Send the dataset to the API to update, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
dataset = m["client"].update_dataset(
self._setup_dataset_object(
m["id"],
location=location,
),
fields=["description"],
) # Make an API request.
logger.success(
" {object} {object_id}_{mode} was {action}!",
object_id=self.dataset_id,
mode=mode,
object="Dataset",
action="updated",
)
Table (Base)
Manage tables in Google Cloud Storage and BigQuery.
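A minimal sketch of instantiating the class, assuming it is exposed at the package root and the package is already configured; it only inspects attributes visible in the source below, without calling any BigQuery API:

```python
import basedosdados as bd

tb = bd.Table(dataset_id="br_suporte", table_id="diretorio_municipios")
# The fully qualified BigQuery names are resolved on instantiation.
print(tb.table_full_name["prod"])     # <prod_project>.br_suporte.diretorio_municipios
print(tb.table_full_name["staging"])  # <staging_project>.br_suporte_staging.diretorio_municipios
```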
Source code in basedosdados/upload/table.py
class Table(Base):
"""
Manage tables in Google Cloud Storage and BigQuery.
"""
def __init__(self, dataset_id, table_id, **kwargs):
super().__init__(**kwargs)
self.table_id = table_id.replace("-", "_")
self.dataset_id = dataset_id.replace("-", "_")
self.dataset_folder = Path(self.metadata_path / self.dataset_id)
self.table_folder = self.dataset_folder / table_id
self.table_full_name = dict(
prod=f"{self.client['bigquery_prod'].project}.{self.dataset_id}.{self.table_id}",
staging=f"{self.client['bigquery_staging'].project}.{self.dataset_id}_staging.{self.table_id}",
)
self.table_full_name.update(dict(all=deepcopy(self.table_full_name)))
self.metadata = Metadata(self.dataset_id, self.table_id, **kwargs)
@property
def table_config(self):
return self._load_yaml(self.table_folder / "table_config.yaml")
def _get_table_obj(self, mode):
return self.client[f"bigquery_{mode}"].get_table(self.table_full_name[mode])
def _is_partitioned(self):
# check if the table is partitioned; the split is needed because of a change in the type of partitions in pydantic
partitions = self.table_config["partitions"]
if not partitions:
return False
elif isinstance(partitions, list):
# check if any None inside list.
# False if it is the case Ex: [None, 'partition']
# True otherwise Ex: ['partition1', 'partition2']
return all(item is not None for item in partitions)
def _load_schema(self, mode="staging"):
"""Load schema from table_config.yaml
Args:
mode (str): Which dataset schema to load [prod|staging].
"""
self._check_mode(mode)
json_path = self.table_folder / f"schema-{mode}.json"
columns = self.table_config["columns"]
if mode == "staging":
new_columns = []
for c in columns:
# if is_in_staging is None, default to True
is_in_staging = (
True if c.get("is_in_staging") is None else c["is_in_staging"]
)
# append columns declared in table_config.yaml to schema only if is_in_staging: True
if is_in_staging and not c.get("is_partition"):
c["type"] = "STRING"
new_columns.append(c)
del columns
columns = new_columns
elif mode == "prod":
schema = self._get_table_obj(mode).schema
# get field names for fields at schema and at table_config.yaml
column_names = [c["name"] for c in columns]
schema_names = [s.name for s in schema]
# check if there are mismatched fields
not_in_columns = [name for name in schema_names if name not in column_names]
not_in_schema = [name for name in column_names if name not in schema_names]
# raise if field is not in table_config
if not_in_columns:
raise BaseDosDadosException(
"Column {error_columns} was not found in table_config.yaml. Are you sure that "
"all your column names between table_config.yaml, publish.sql and "
"{project_id}.{dataset_id}.{table_id} are the same?".format(
error_columns=not_in_columns,
project_id=self.table_config["project_id_prod"],
dataset_id=self.table_config["dataset_id"],
table_id=self.table_config["table_id"],
)
)
# raise if field is not in schema
elif not_in_schema:
raise BaseDosDadosException(
"Column {error_columns} was not found in publish.sql. Are you sure that "
"all your column names between table_config.yaml, publish.sql and "
"{project_id}.{dataset_id}.{table_id} are the same?".format(
error_columns=not_in_schema,
project_id=self.table_config["project_id_prod"],
dataset_id=self.table_config["dataset_id"],
table_id=self.table_config["table_id"],
)
)
else:
# if field is in schema, get field_type and field_mode
for c in columns:
for s in schema:
if c["name"] == s.name:
c["type"] = s.field_type
c["mode"] = s.mode
break
## force utf-8, write schema_{mode}.json
json.dump(columns, (json_path).open("w", encoding="utf-8"))
# load new created schema
return self.client[f"bigquery_{mode}"].schema_from_json(str(json_path))
def _make_publish_sql(self):
"""Create publish.sql with columns and bigquery_type"""
### publish.sql header and instructions
publish_txt = """
/*
Query para publicar a tabela.
Esse é o lugar para:
- modificar nomes, ordem e tipos de colunas
- dar join com outras tabelas
- criar colunas extras (e.g. logs, proporções, etc.)
Qualquer coluna definida aqui deve também existir em `table_config.yaml`.
# Além disso, sinta-se à vontade para alterar alguns nomes obscuros
# para algo um pouco mais explícito.
TIPOS:
- Para modificar tipos de colunas, basta substituir STRING por outro tipo válido.
- Exemplo: `SAFE_CAST(column_name AS NUMERIC) column_name`
- Mais detalhes: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
*/
"""
# remove triple quotes extra space
publish_txt = inspect.cleandoc(publish_txt)
publish_txt = textwrap.dedent(publish_txt)
# add create table statement
project_id_prod = self.client["bigquery_prod"].project
publish_txt += f"\n\nCREATE VIEW {project_id_prod}.{self.dataset_id}.{self.table_id} AS\nSELECT \n"
# sort columns by is_partition, partitions_columns come first
if self._is_partitioned():
columns = sorted(
self.table_config["columns"],
key=lambda k: (k["is_partition"] is not None, k["is_partition"]),
reverse=True,
)
else:
columns = self.table_config["columns"]
# add columns in publish.sql
for col in columns:
name = col["name"]
bigquery_type = (
"STRING"
if col["bigquery_type"] is None
else col["bigquery_type"].upper()
)
publish_txt += f"SAFE_CAST({name} AS {bigquery_type}) {name},\n"
## remove last comma
publish_txt = publish_txt[:-2] + "\n"
# add from statement
project_id_staging = self.client["bigquery_staging"].project
publish_txt += (
f"FROM {project_id_staging}.{self.dataset_id}_staging.{self.table_id} AS t"
)
# save publish.sql in table_folder
(self.table_folder / "publish.sql").open("w", encoding="utf-8").write(
publish_txt
)
def _make_template(self, columns, partition_columns, if_table_config_exists):
# create table_config.yaml with metadata
self.metadata.create(
if_exists=if_table_config_exists,
columns=partition_columns + columns,
partition_columns=partition_columns,
table_only=False,
)
self._make_publish_sql()
def _sheet_to_df(self, columns_config_url_or_path):
url = columns_config_url_or_path.replace("edit#gid=", "export?format=csv&gid=")
try:
return pd.read_csv(StringIO(requests.get(url).content.decode("utf-8")))
except Exception:
raise BaseDosDadosException(
"Check that your Google Sheet sharing setting is: 'Anyone on the internet with this link can view'"
)
def table_exists(self, mode):
"""Check if table exists in BigQuery.
Args:
mode (str): Which dataset to check [prod|staging].
"""
try:
ref = self._get_table_obj(mode=mode)
except google.api_core.exceptions.NotFound:
ref = None
if ref:
return True
else:
return False
def update_columns(self, columns_config_url_or_path=None):
"""
Fills columns in table_config.yaml automatically using a public google sheets URL or a local file. Also regenerate
publish.sql and autofill type using bigquery_type.
The sheet must contain the columns:
- name: column name
- description: column description
- bigquery_type: column bigquery type
- measurement_unit: column measurement unit
- covered_by_dictionary: column related dictionary
- directory_column: column related directory in the format <dataset_id>.<table_id>:<column_name>
- temporal_coverage: column temporal coverage
- has_sensitive_data: the column has sensitive data
- observations: column observations
Args:
columns_config_url_or_path (str): Path to the local architecture file or a public google sheets URL.
Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.
"""
ruamel = ryaml.YAML()
ruamel.preserve_quotes = True
ruamel.indent(mapping=4, sequence=6, offset=4)
table_config_yaml = ruamel.load(
(self.table_folder / "table_config.yaml").open(encoding="utf-8")
)
if "https://docs.google.com/spreadsheets/d/" in columns_config_url_or_path:
if (
"edit#gid=" not in columns_config_url_or_path
or "https://docs.google.com/spreadsheets/d/"
not in columns_config_url_or_path
or not columns_config_url_or_path.split("=")[1].isdigit()
):
raise BaseDosDadosException(
"The Google sheet url not in correct format."
"The url must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>"
)
df = self._sheet_to_df(columns_config_url_or_path)
else:
file_type = columns_config_url_or_path.split(".")[-1]
if file_type == "csv":
df = pd.read_csv(columns_config_url_or_path, encoding="utf-8")
elif file_type in ["xls", "xlsx", "xlsm", "xlsb", "odf", "ods", "odt"]:
df = pd.read_excel(columns_config_url_or_path)
else:
raise BaseDosDadosException(
"File not suported. Only csv, xls, xlsx, xlsm, xlsb, odf, ods, odt are supported."
)
df = df.fillna("NULL")
required_columns = [
"name",
"bigquery_type",
"description",
"temporal_coverage",
"covered_by_dictionary",
"directory_column",
"measurement_unit",
"has_sensitive_data",
"observations",
]
not_found_columns = required_columns.copy()
for sheet_column in df.columns.tolist():
for required_column in required_columns:
if sheet_column == required_column:
not_found_columns.remove(required_column)
if not_found_columns != []:
raise BaseDosDadosException(
f"The following required columns are not found: {', '.join(not_found_columns)}."
)
columns_parameters = zip(
*[df[required_column].tolist() for required_column in required_columns]
)
for (
name,
bigquery_type,
description,
temporal_coverage,
covered_by_dictionary,
directory_column,
measurement_unit,
has_sensitive_data,
observations,
) in columns_parameters:
for col in table_config_yaml["columns"]:
if col["name"] == name:
col["bigquery_type"] = (
col["bigquery_type"]
if bigquery_type == "NULL"
else bigquery_type.lower()
)
col["description"] = (
col["description"] if description == "NULL" else description
)
col["temporal_coverage"] = (
col["temporal_coverage"]
if temporal_coverage == "NULL"
else [temporal_coverage]
)
col["covered_by_dictionary"] = (
"no"
if covered_by_dictionary == "NULL"
else covered_by_dictionary
)
dataset = directory_column.split(".")[0]
col["directory_column"]["dataset_id"] = (
col["directory_column"]["dataset_id"]
if dataset == "NULL"
else dataset
)
table = directory_column.split(".")[-1].split(":")[0]
col["directory_column"]["table_id"] = (
col["directory_column"]["table_id"]
if table == "NULL"
else table
)
column = directory_column.split(".")[-1].split(":")[-1]
col["directory_column"]["column_name"] = (
col["directory_column"]["column_name"]
if column == "NULL"
else column
)
col["measurement_unit"] = (
col["measurement_unit"]
if measurement_unit == "NULL"
else measurement_unit
)
col["has_sensitive_data"] = (
"no" if has_sensitive_data == "NULL" else has_sensitive_data
)
col["observations"] = (
col["observations"] if observations == "NULL" else observations
)
ruamel.dump(
table_config_yaml,
open(self.table_folder / "table_config.yaml", "w", encoding="utf-8"),
)
# regenerate publish.sql
self._make_publish_sql()
def table_exists(self, mode):
"""Check if table exists in BigQuery.
Args:
mode (str): Which dataset to check [prod|staging].
"""
try:
ref = self._get_table_obj(mode=mode)
except google.api_core.exceptions.NotFound:
ref = None
if ref:
return True
else:
return False
def init(
self,
data_sample_path=None,
if_folder_exists="raise",
if_table_config_exists="raise",
source_format="csv",
columns_config_url_or_path=None,
):
"""Initialize table folder at metadata_path at `metadata_path/<dataset_id>/<table_id>`.
The folder should contain:
* `table_config.yaml`
* `publish.sql`
You can also point to a sample of the data to auto complete columns names.
Args:
data_sample_path (str, pathlib.PosixPath): Optional.
Data sample path to auto complete columns names
It supports Comma Delimited CSV, Apache Avro and
Apache Parquet.
if_folder_exists (str): Optional.
What to do if table folder exists
* 'raise' : Raises FileExistsError
* 'replace' : Replace folder
* 'pass' : Do nothing
if_table_config_exists (str): Optional
What to do if table_config.yaml and publish.sql exists
* 'raise' : Raises FileExistsError
* 'replace' : Replace files with blank template
* 'pass' : Do nothing
source_format (str): Optional
Data source format. Only 'csv', 'avro' and 'parquet'
are supported. Defaults to 'csv'.
columns_config_url_or_path (str): Path to the local architecture file or a public google sheets URL.
Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.
Raises:
FileExistsError: If folder exists and replace is False.
NotImplementedError: If data sample is not in supported type or format.
"""
if not self.dataset_folder.exists():
raise FileExistsError(
f"Dataset folder {self.dataset_folder} folder does not exists. "
"Create a dataset before adding tables."
)
try:
self.table_folder.mkdir(exist_ok=(if_folder_exists == "replace"))
except FileExistsError:
if if_folder_exists == "raise":
raise FileExistsError(
f"Table folder already exists for {self.table_id}. "
)
elif if_folder_exists == "pass":
return self
if not data_sample_path and if_table_config_exists != "pass":
raise BaseDosDadosException(
"You must provide a path to correctly create config files"
)
partition_columns = []
if isinstance(
data_sample_path,
(
str,
Path,
),
):
# Check if partitioned and get data sample and partition columns
data_sample_path = Path(data_sample_path)
if data_sample_path.is_dir():
data_sample_path = [
f
for f in data_sample_path.glob("**/*")
if f.is_file() and f.suffix == f".{source_format}"
][0]
partition_columns = [
k.split("=")[0]
for k in data_sample_path.as_posix().split("/")
if "=" in k
]
columns = Datatype(self, source_format).header(data_sample_path)
else:
columns = ["column_name"]
if if_table_config_exists == "pass":
# Check if config files exists before passing
if (
Path(self.table_folder / "table_config.yaml").is_file()
and Path(self.table_folder / "publish.sql").is_file()
):
pass
# Raise if no sample to determine columns
elif not data_sample_path:
raise BaseDosDadosException(
"You must provide a path to correctly create config files"
)
else:
self._make_template(columns, partition_columns, if_table_config_exists)
elif if_table_config_exists == "raise":
# Check if config files already exist
if (
Path(self.table_folder / "table_config.yaml").is_file()
and Path(self.table_folder / "publish.sql").is_file()
):
raise FileExistsError(
f"table_config.yaml and publish.sql already exists at {self.table_folder}"
)
# if config files don't exist, create them
else:
self._make_template(columns, partition_columns, if_table_config_exists)
else:
# if_table_config_exists == "replace": overwrite config files with a fresh template
self._make_template(columns, partition_columns, if_table_config_exists)
if columns_config_url_or_path is not None:
self.update_columns(columns_config_url_or_path)
return self
def create(
self,
path=None,
job_config_params=None,
force_dataset=True,
if_table_exists="raise",
if_storage_data_exists="raise",
if_table_config_exists="raise",
source_format="csv",
columns_config_url_or_path=None,
dataset_is_public=True,
location=None,
):
"""Creates BigQuery table at staging dataset.
If you add a path, it automatically saves the data in the storage,
creates a datasets folder and BigQuery location, besides creating the
table and its configuration files.
The new table should be located at `<dataset_id>_staging.<table_id>` in BigQuery.
It looks for data saved in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*`
and builds the table.
It currently supports the types:
- Comma Delimited CSV
- Apache Avro
- Apache Parquet
Data can also be partitioned following the hive partitioning scheme
`<key1>=<value1>/<key2>=<value2>` - for instance,
`year=2012/country=BR`. The partition is automatically detected
by searching for `partitions` on the `table_config.yaml`.
Args:
path (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
job_config_params (dict): Optional.
Job configuration params from bigquery
if_table_exists (str): Optional
What to do if table exists
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
force_dataset (bool): Creates `<dataset_id>` folder and BigQuery Dataset if it doesn't exists.
if_table_config_exists (str): Optional.
What to do if config files already exist
* 'raise': Raises FileExistError
* 'replace': Replace with blank template
* 'pass' : Do nothing
if_storage_data_exists (str): Optional.
What to do if data already exists on your bucket:
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
source_format (str): Optional
Data source format. Only 'csv', 'avro' and 'parquet'
are supported. Defaults to 'csv'.
columns_config_url_or_path (str): Path to the local architecture file or a public google sheets URL.
Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.
dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
location (str): Optional. Location of dataset data.
List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
"""
if path is None:
# Look if table data already exists at Storage
data = self.client["storage_staging"].list_blobs(
self.bucket_name, prefix=f"staging/{self.dataset_id}/{self.table_id}"
)
# Raise: Cannot create table without external data
if not data:
raise BaseDosDadosException(
"You must provide a path for uploading data"
)
# Add data to storage
if isinstance(
path,
(
str,
Path,
),
):
Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
path, mode="staging", if_exists=if_storage_data_exists
)
# Create Dataset if it doesn't exist
if force_dataset:
dataset_obj = Dataset(self.dataset_id, **self.main_vars)
try:
dataset_obj.init()
except FileExistsError:
pass
dataset_obj.create(
if_exists="pass", location=location, dataset_is_public=dataset_is_public
)
self.init(
data_sample_path=path,
if_folder_exists="replace",
if_table_config_exists=if_table_config_exists,
columns_config_url_or_path=columns_config_url_or_path,
source_format=source_format,
)
table = bigquery.Table(self.table_full_name["staging"])
table.external_data_configuration = Datatype(
self, source_format, "staging", partitioned=self._is_partitioned()
).external_config
# Look up if the table already exists
table_ref = None
try:
table_ref = self.client["bigquery_staging"].get_table(
self.table_full_name["staging"]
)
except google.api_core.exceptions.NotFound:
pass
if isinstance(table_ref, google.cloud.bigquery.table.Table):
if if_table_exists == "pass":
return None
elif if_table_exists == "raise":
raise FileExistsError(
"Table already exists, choose replace if you want to overwrite it"
)
if if_table_exists == "replace":
self.delete(mode="staging")
self.client["bigquery_staging"].create_table(table)
logger.success(
"{object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="created",
)
def update(self, mode="all", not_found_ok=True):
"""Updates BigQuery schema and description.
Args:
mode (str): Optional.
Which table to update [prod|staging|all]
not_found_ok (bool): Optional.
What to do if table is not found
"""
self._check_mode(mode)
mode = ["prod", "staging"] if mode == "all" else [mode]
for m in mode:
try:
table = self._get_table_obj(m)
except google.api_core.exceptions.NotFound:
continue
# if m == "staging":
table.description = self._render_template(
Path("table/table_description.txt"), self.table_config
)
# save table description
open(
self.metadata_path
/ self.dataset_id
/ self.table_id
/ "table_description.txt",
"w",
encoding="utf-8",
).write(table.description)
# when mode is staging the table schema already exists
table.schema = self._load_schema(m)
fields = ["description", "schema"] if m == "prod" else ["description"]
self.client[f"bigquery_{m}"].update_table(table, fields=fields)
logger.success(
" {object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="updated",
)
def publish(self, if_exists="raise"):
"""Creates BigQuery table at production dataset.
Table should be located at `<dataset_id>.<table_id>`.
It creates a view that uses the query from
`<metadata_path>/<dataset_id>/<table_id>/publish.sql`.
Make sure that all columns from the query also exist at
`<metadata_path>/<dataset_id>/<table_id>/table_config.yaml`, including
the partitions.
Args:
if_exists (str): Optional.
What to do if table exists.
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
Todo:
* Check if all required fields are filled
"""
if if_exists == "replace":
self.delete(mode="prod")
self.client["bigquery_prod"].query(
(self.table_folder / "publish.sql").open("r", encoding="utf-8").read()
).result()
self.update()
logger.success(
" {object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="published",
)
def delete(self, mode):
"""Deletes table in BigQuery.
Args:
mode (str): Which table to delete [prod|staging|all]
"""
self._check_mode(mode)
if mode == "all":
for m, n in self.table_full_name[mode].items():
self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="deleted",
)
else:
self.client[f"bigquery_{mode}"].delete_table(
self.table_full_name[mode], not_found_ok=True
)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="deleted",
)
def append(self, filepath, partitions=None, if_exists="replace", **upload_args):
"""Appends new data to existing BigQuery table.
As long as the data has the same schema, it appends the data in the
filepath to the existing table.
Args:
filepath (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
partitions (str, pathlib.PosixPath, dict): Optional.
Hive structured partition as a string or dict
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
if_exists (str): Optional.
What to do if data with same name exists in storage
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
"""
if not self.table_exists("staging"):
raise BaseDosDadosException(
"You cannot append to a table that does not exist"
)
else:
Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
filepath,
mode="staging",
partitions=partitions,
if_exists=if_exists,
**upload_args,
)
logger.success(
" {object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="appended",
)
append(self, filepath, partitions=None, if_exists='replace', **upload_args)
Appends new data to an existing BigQuery table.
As long as the data has the same schema, it appends the data in the filepath to the existing table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filepath | str or pathlib.PosixPath | Where to find the file that you want to upload to create a table with | required |
partitions | str, pathlib.PosixPath, dict | Optional. Hive structured partition as a string (`<key>=<value>/<key2>=<value2>`) or dict (`dict(key=value, key2=value2)`) | None |
if_exists | str | Optional. What to do if data with same name exists in storage: 'raise' raises a Conflict exception, 'replace' replaces the data, 'pass' does nothing | 'replace' |
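A minimal usage sketch (ids, path and partition values are placeholders):
from basedosdados import Table

# upload a new partition of data to the existing staging table
Table(dataset_id="br_exemplo", table_id="municipios").append(
    filepath="data/municipios_2020.csv",
    partitions="year=2020/country=BR",
)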
Source code in basedosdados/upload/table.py
def append(self, filepath, partitions=None, if_exists="replace", **upload_args):
"""Appends new data to existing BigQuery table.
As long as the data has the same schema, it appends the data in the
filepath to the existing table.
Args:
filepath (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
partitions (str, pathlib.PosixPath, dict): Optional.
Hive structured partition as a string or dict
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
if_exists (str): Optional.
What to do if data with same name exists in storage
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
"""
if not self.table_exists("staging"):
raise BaseDosDadosException(
"You cannot append to a table that does not exist"
)
else:
Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
filepath,
mode="staging",
partitions=partitions,
if_exists=if_exists,
**upload_args,
)
logger.success(
" {object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="appended",
)
create(self, path=None, job_config_params=None, force_dataset=True, if_table_exists='raise', if_storage_data_exists='raise', if_table_config_exists='raise', source_format='csv', columns_config_url_or_path=None, dataset_is_public=True, location=None)
Creates BigQuery table at staging dataset.
If you add a path, it automatically saves the data in the storage, creates a datasets folder and BigQuery location, besides creating the table and its configuration files.
The new table should be located at `<dataset_id>_staging.<table_id>` in BigQuery.
It looks for data saved in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*` and builds the table.
It currently supports the types:
- Comma Delimited CSV
- Apache Avro
- Apache Parquet
Data can also be partitioned following the hive partitioning scheme `<key1>=<value1>/<key2>=<value2>` - for instance, `year=2012/country=BR`. The partition is automatically detected by searching for `partitions` on the `table_config.yaml`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str or pathlib.PosixPath | Where to find the file that you want to upload to create a table with | None |
job_config_params | dict | Optional. Job configuration params from bigquery | None |
if_table_exists | str | Optional. What to do if table exists: 'raise' raises a Conflict exception, 'replace' replaces the table, 'pass' does nothing | 'raise' |
force_dataset | bool | Creates `<dataset_id>` folder and BigQuery Dataset if it doesn't exist | True |
if_table_config_exists | str | Optional. What to do if config files already exist: 'raise' raises FileExistsError, 'replace' replaces them with a blank template, 'pass' does nothing | 'raise' |
if_storage_data_exists | str | Optional. What to do if data already exists on your bucket: 'raise' raises a Conflict exception, 'replace' replaces the data, 'pass' does nothing | 'raise' |
source_format | str | Optional. Data source format. Only 'csv', 'avro' and 'parquet' are supported. Defaults to 'csv'. | 'csv' |
columns_config_url_or_path | str | Path to the local architecture file or a public google sheets URL. Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats. Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>. | None |
dataset_is_public | bool | Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public. | True |
location | str | Optional. Location of dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations | None |
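A minimal usage sketch (ids and the local path are placeholders; credentials and project configuration are assumed to be already set up):
from basedosdados import Table

# upload the local data to Storage, create the staging table and its config files
tb = Table(dataset_id="br_exemplo", table_id="municipios")
tb.create(
    path="data/municipios.csv",
    if_table_exists="replace",
    if_storage_data_exists="replace",
    if_table_config_exists="replace",
)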
Source code in basedosdados/upload/table.py
def create(
self,
path=None,
job_config_params=None,
force_dataset=True,
if_table_exists="raise",
if_storage_data_exists="raise",
if_table_config_exists="raise",
source_format="csv",
columns_config_url_or_path=None,
dataset_is_public=True,
location=None,
):
"""Creates BigQuery table at staging dataset.
If you add a path, it automatically saves the data in the storage,
creates a datasets folder and BigQuery location, besides creating the
table and its configuration files.
The new table should be located at `<dataset_id>_staging.<table_id>` in BigQuery.
It looks for data saved in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*`
and builds the table.
It currently supports the types:
- Comma Delimited CSV
- Apache Avro
- Apache Parquet
Data can also be partitioned following the hive partitioning scheme
`<key1>=<value1>/<key2>=<value2>` - for instance,
`year=2012/country=BR`. The partition is automatically detected
by searching for `partitions` on the `table_config.yaml`.
Args:
path (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
job_config_params (dict): Optional.
Job configuration params from bigquery
if_table_exists (str): Optional
What to do if table exists
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
force_dataset (bool): Creates `<dataset_id>` folder and BigQuery Dataset if it doesn't exists.
if_table_config_exists (str): Optional.
What to do if config files already exist
* 'raise': Raises FileExistError
* 'replace': Replace with blank template
* 'pass' : Do nothing
if_storage_data_exists (str): Optional.
What to do if data already exists on your bucket:
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
source_format (str): Optional
Data source format. Only 'csv', 'avro' and 'parquet'
are supported. Defaults to 'csv'.
columns_config_url_or_path (str): Path to the local architecture file or a public google sheets URL.
Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.
dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
location (str): Optional. Location of dataset data.
List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
"""
if path is None:
# Look if table data already exists at Storage
data = self.client["storage_staging"].list_blobs(
self.bucket_name, prefix=f"staging/{self.dataset_id}/{self.table_id}"
)
# Raise: Cannot create table without external data
if not data:
raise BaseDosDadosException(
"You must provide a path for uploading data"
)
# Add data to storage
if isinstance(
path,
(
str,
Path,
),
):
Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
path, mode="staging", if_exists=if_storage_data_exists
)
# Create Dataset if it doesn't exist
if force_dataset:
dataset_obj = Dataset(self.dataset_id, **self.main_vars)
try:
dataset_obj.init()
except FileExistsError:
pass
dataset_obj.create(
if_exists="pass", location=location, dataset_is_public=dataset_is_public
)
self.init(
data_sample_path=path,
if_folder_exists="replace",
if_table_config_exists=if_table_config_exists,
columns_config_url_or_path=columns_config_url_or_path,
source_format=source_format,
)
table = bigquery.Table(self.table_full_name["staging"])
table.external_data_configuration = Datatype(
self, source_format, "staging", partitioned=self._is_partitioned()
).external_config
# Look up if the table already exists
table_ref = None
try:
table_ref = self.client["bigquery_staging"].get_table(
self.table_full_name["staging"]
)
except google.api_core.exceptions.NotFound:
pass
if isinstance(table_ref, google.cloud.bigquery.table.Table):
if if_table_exists == "pass":
return None
elif if_table_exists == "raise":
raise FileExistsError(
"Table already exists, choose replace if you want to overwrite it"
)
if if_table_exists == "replace":
self.delete(mode="staging")
self.client["bigquery_staging"].create_table(table)
logger.success(
"{object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="created",
)
delete(self, mode)
Deletes table in BigQuery.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mode | str | Which table to delete [prod\|staging\|all] | required |
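A minimal usage sketch (ids are placeholders):
from basedosdados import Table

# drop both the staging and prod versions of the table
Table(dataset_id="br_exemplo", table_id="municipios").delete(mode="all")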
Source code in basedosdados/upload/table.py
def delete(self, mode):
"""Deletes table in BigQuery.
Args:
mode (str): Which table to delete [prod|staging|all]
"""
self._check_mode(mode)
if mode == "all":
for m, n in self.table_full_name[mode].items():
self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="deleted",
)
else:
self.client[f"bigquery_{mode}"].delete_table(
self.table_full_name[mode], not_found_ok=True
)
logger.info(
" {object} {object_id}_{mode} was {action}!",
object_id=self.table_id,
mode=mode,
object="Table",
action="deleted",
)
init(self, data_sample_path=None, if_folder_exists='raise', if_table_config_exists='raise', source_format='csv', columns_config_url_or_path=None)
Initialize table folder at `metadata_path/<dataset_id>/<table_id>`.
The folder should contain:
- `table_config.yaml`
- `publish.sql`
You can also point to a sample of the data to auto-complete column names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_sample_path | str, pathlib.PosixPath | Optional. Data sample path to auto-complete column names. It supports Comma Delimited CSV, Apache Avro and Apache Parquet. | None |
if_folder_exists | str | Optional. What to do if table folder exists: 'raise' raises FileExistsError, 'replace' replaces the folder, 'pass' does nothing | 'raise' |
if_table_config_exists | str | Optional. What to do if table_config.yaml and publish.sql exist: 'raise' raises FileExistsError, 'replace' replaces the files with a blank template, 'pass' does nothing | 'raise' |
source_format | str | Optional. Data source format. Only 'csv', 'avro' and 'parquet' are supported. Defaults to 'csv'. | 'csv' |
columns_config_url_or_path | str | Path to the local architecture file or a public google sheets URL. Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats. Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>. | None |
Exceptions:
Type | Description |
---|---|
FileExistsError | If folder exists and replace is False. |
NotImplementedError | If data sample is not in supported type or format. |
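A minimal usage sketch (ids and the sample path are placeholders):
from basedosdados import Table

# create <metadata_path>/<dataset_id>/<table_id> with config templates,
# filling column names from a local data sample
Table(dataset_id="br_exemplo", table_id="municipios").init(
    data_sample_path="data/municipios.csv",
    if_folder_exists="replace",
    if_table_config_exists="replace",
)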
Source code in basedosdados/upload/table.py
def init(
self,
data_sample_path=None,
if_folder_exists="raise",
if_table_config_exists="raise",
source_format="csv",
columns_config_url_or_path=None,
):
"""Initialize table folder at metadata_path at `metadata_path/<dataset_id>/<table_id>`.
The folder should contain:
* `table_config.yaml`
* `publish.sql`
You can also point to a sample of the data to auto complete columns names.
Args:
data_sample_path (str, pathlib.PosixPath): Optional.
Data sample path to auto complete columns names
It supports Comma Delimited CSV, Apache Avro and
Apache Parquet.
if_folder_exists (str): Optional.
What to do if table folder exists
* 'raise' : Raises FileExistsError
* 'replace' : Replace folder
* 'pass' : Do nothing
if_table_config_exists (str): Optional
What to do if table_config.yaml and publish.sql exists
* 'raise' : Raises FileExistsError
* 'replace' : Replace files with blank template
* 'pass' : Do nothing
source_format (str): Optional
Data source format. Only 'csv', 'avro' and 'parquet'
are supported. Defaults to 'csv'.
columns_config_url_or_path (str): Path to the local architecture file or a public google sheets URL.
Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.
Raises:
FileExistsError: If folder exists and replace is False.
NotImplementedError: If data sample is not in supported type or format.
"""
if not self.dataset_folder.exists():
raise FileExistsError(
f"Dataset folder {self.dataset_folder} folder does not exists. "
"Create a dataset before adding tables."
)
try:
self.table_folder.mkdir(exist_ok=(if_folder_exists == "replace"))
except FileExistsError:
if if_folder_exists == "raise":
raise FileExistsError(
f"Table folder already exists for {self.table_id}. "
)
elif if_folder_exists == "pass":
return self
if not data_sample_path and if_table_config_exists != "pass":
raise BaseDosDadosException(
"You must provide a path to correctly create config files"
)
partition_columns = []
if isinstance(
data_sample_path,
(
str,
Path,
),
):
# Check if partitioned and get data sample and partition columns
data_sample_path = Path(data_sample_path)
if data_sample_path.is_dir():
data_sample_path = [
f
for f in data_sample_path.glob("**/*")
if f.is_file() and f.suffix == f".{source_format}"
][0]
partition_columns = [
k.split("=")[0]
for k in data_sample_path.as_posix().split("/")
if "=" in k
]
columns = Datatype(self, source_format).header(data_sample_path)
else:
columns = ["column_name"]
if if_table_config_exists == "pass":
# Check if config files exists before passing
if (
Path(self.table_folder / "table_config.yaml").is_file()
and Path(self.table_folder / "publish.sql").is_file()
):
pass
# Raise if no sample to determine columns
elif not data_sample_path:
raise BaseDosDadosException(
"You must provide a path to correctly create config files"
)
else:
self._make_template(columns, partition_columns, if_table_config_exists)
elif if_table_config_exists == "raise":
# Check if config files already exist
if (
Path(self.table_folder / "table_config.yaml").is_file()
and Path(self.table_folder / "publish.sql").is_file()
):
raise FileExistsError(
f"table_config.yaml and publish.sql already exists at {self.table_folder}"
)
# if config files don't exist, create them
else:
self._make_template(columns, partition_columns, if_table_config_exists)
else:
# if_table_config_exists == "replace": overwrite config files with a fresh template
self._make_template(columns, partition_columns, if_table_config_exists)
if columns_config_url_or_path is not None:
self.update_columns(columns_config_url_or_path)
return self
publish(self, if_exists='raise')
Creates BigQuery table at production dataset.
Table should be located at `<dataset_id>.<table_id>`.
It creates a view that uses the query from `<metadata_path>/<dataset_id>/<table_id>/publish.sql`.
Make sure that all columns from the query also exist at `<metadata_path>/<dataset_id>/<table_id>/table_config.yaml`, including the partitions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
if_exists | str | Optional. What to do if table exists: 'raise' raises a Conflict exception, 'replace' replaces the table, 'pass' does nothing | 'raise' |
Todo:
* Check if all required fields are filled
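A minimal usage sketch (ids are placeholders):
from basedosdados import Table

# run publish.sql to (re)create the prod view on top of the staging table
Table(dataset_id="br_exemplo", table_id="municipios").publish(if_exists="replace")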
Source code in basedosdados/upload/table.py
def publish(self, if_exists="raise"):
"""Creates BigQuery table at production dataset.
Table should be located at `<dataset_id>.<table_id>`.
It creates a view that uses the query from
`<metadata_path>/<dataset_id>/<table_id>/publish.sql`.
Make sure that all columns from the query also exist at
`<metadata_path>/<dataset_id>/<table_id>/table_config.yaml`, including
the partitions.
Args:
if_exists (str): Optional.
What to do if table exists.
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
Todo:
* Check if all required fields are filled
"""
if if_exists == "replace":
self.delete(mode="prod")
self.client["bigquery_prod"].query(
(self.table_folder / "publish.sql").open("r", encoding="utf-8").read()
).result()
self.update()
logger.success(
" {object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="published",
)
table_exists(self, mode)
Check if table exists in BigQuery.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mode | str | Which dataset to check [prod\|staging]. | required |
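A minimal usage sketch (ids are placeholders):
from basedosdados import Table

# check whether the staging table has already been created in BigQuery
if Table(dataset_id="br_exemplo", table_id="municipios").table_exists(mode="staging"):
    print("staging table already exists")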
Source code in basedosdados/upload/table.py
def table_exists(self, mode):
"""Check if table exists in BigQuery.
Args:
mode (str): Which dataset to check [prod|staging].
"""
try:
ref = self._get_table_obj(mode=mode)
except google.api_core.exceptions.NotFound:
ref = None
if ref:
return True
else:
return False
update(self, mode='all', not_found_ok=True)
Updates BigQuery schema and description.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mode | str | Optional. Which table to update [prod\|staging\|all] | 'all' |
not_found_ok | bool | Optional. What to do if table is not found | True |
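A minimal usage sketch (ids are placeholders):
from basedosdados import Table

# refresh the BigQuery table description (and prod schema) from the local config files
Table(dataset_id="br_exemplo", table_id="municipios").update(mode="all")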
Source code in basedosdados/upload/table.py
def update(self, mode="all", not_found_ok=True):
"""Updates BigQuery schema and description.
Args:
mode (str): Optional.
Which table to update [prod|staging|all]
not_found_ok (bool): Optional.
What to do if table is not found
"""
self._check_mode(mode)
mode = ["prod", "staging"] if mode == "all" else [mode]
for m in mode:
try:
table = self._get_table_obj(m)
except google.api_core.exceptions.NotFound:
continue
# if m == "staging":
table.description = self._render_template(
Path("table/table_description.txt"), self.table_config
)
# save table description
open(
self.metadata_path
/ self.dataset_id
/ self.table_id
/ "table_description.txt",
"w",
encoding="utf-8",
).write(table.description)
# when mode is staging the table schema already exists
table.schema = self._load_schema(m)
fields = ["description", "schema"] if m == "prod" else ["description"]
self.client[f"bigquery_{m}"].update_table(table, fields=fields)
logger.success(
" {object} {object_id} was {action}!",
object_id=self.table_id,
object="Table",
action="updated",
)
update_columns(self, columns_config_url_or_path=None)
Fills columns in table_config.yaml automatically using a public google sheets URL or a local file. Also regenerates publish.sql and autofills types using bigquery_type.
The sheet must contain the columns:
- name: column name
- description: column description
- bigquery_type: column bigquery type
- measurement_unit: column measurement unit
- covered_by_dictionary: column related dictionary
- directory_column: column related directory in the format <dataset_id>.<table_id>:<column_name>
- temporal_coverage: column temporal coverage
- has_sensitive_data: the column has sensitive data
- observations: column observations
Parameters:
Name | Type | Description | Default |
---|---|---|---|
columns_config_url_or_path | str | Path to the local architecture file or a public google sheets URL. Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats. Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>. | None |
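A minimal usage sketch (ids are placeholders and the sheet URL only illustrates the required format):
from basedosdados import Table

# fill column metadata from a public architecture spreadsheet and regenerate publish.sql
Table(dataset_id="br_exemplo", table_id="municipios").update_columns(
    columns_config_url_or_path="https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>"
)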
Source code in basedosdados/upload/table.py
def update_columns(self, columns_config_url_or_path=None):
"""
Fills columns in table_config.yaml automatically using a public google sheets URL or a local file. Also regenerate
publish.sql and autofill type using bigquery_type.
The sheet must contain the columns:
- name: column name
- description: column description
- bigquery_type: column bigquery type
- measurement_unit: column measurement unit
- covered_by_dictionary: column related dictionary
- directory_column: column related directory in the format <dataset_id>.<table_id>:<column_name>
- temporal_coverage: column temporal coverage
- has_sensitive_data: the column has sensitive data
- observations: column observations
Args:
columns_config_url_or_path (str): Path to the local architecture file or a public google sheets URL.
Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.
"""
ruamel = ryaml.YAML()
ruamel.preserve_quotes = True
ruamel.indent(mapping=4, sequence=6, offset=4)
table_config_yaml = ruamel.load(
(self.table_folder / "table_config.yaml").open(encoding="utf-8")
)
if "https://docs.google.com/spreadsheets/d/" in columns_config_url_or_path:
if (
"edit#gid=" not in columns_config_url_or_path
or "https://docs.google.com/spreadsheets/d/"
not in columns_config_url_or_path
or not columns_config_url_or_path.split("=")[1].isdigit()
):
raise BaseDosDadosException(
"The Google sheet url not in correct format."
"The url must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>"
)
df = self._sheet_to_df(columns_config_url_or_path)
else:
file_type = columns_config_url_or_path.split(".")[-1]
if file_type == "csv":
df = pd.read_csv(columns_config_url_or_path, encoding="utf-8")
elif file_type in ["xls", "xlsx", "xlsm", "xlsb", "odf", "ods", "odt"]:
df = pd.read_excel(columns_config_url_or_path)
else:
raise BaseDosDadosException(
"File not suported. Only csv, xls, xlsx, xlsm, xlsb, odf, ods, odt are supported."
)
df = df.fillna("NULL")
required_columns = [
"name",
"bigquery_type",
"description",
"temporal_coverage",
"covered_by_dictionary",
"directory_column",
"measurement_unit",
"has_sensitive_data",
"observations",
]
not_found_columns = required_columns.copy()
for sheet_column in df.columns.tolist():
for required_column in required_columns:
if sheet_column == required_column:
not_found_columns.remove(required_column)
if not_found_columns != []:
raise BaseDosDadosException(
f"The following required columns are not found: {', '.join(not_found_columns)}."
)
columns_parameters = zip(
*[df[required_column].tolist() for required_column in required_columns]
)
for (
name,
bigquery_type,
description,
temporal_coverage,
covered_by_dictionary,
directory_column,
measurement_unit,
has_sensitive_data,
observations,
) in columns_parameters:
for col in table_config_yaml["columns"]:
if col["name"] == name:
col["bigquery_type"] = (
col["bigquery_type"]
if bigquery_type == "NULL"
else bigquery_type.lower()
)
col["description"] = (
col["description"] if description == "NULL" else description
)
col["temporal_coverage"] = (
col["temporal_coverage"]
if temporal_coverage == "NULL"
else [temporal_coverage]
)
col["covered_by_dictionary"] = (
"no"
if covered_by_dictionary == "NULL"
else covered_by_dictionary
)
dataset = directory_column.split(".")[0]
col["directory_column"]["dataset_id"] = (
col["directory_column"]["dataset_id"]
if dataset == "NULL"
else dataset
)
table = directory_column.split(".")[-1].split(":")[0]
col["directory_column"]["table_id"] = (
col["directory_column"]["table_id"]
if table == "NULL"
else table
)
column = directory_column.split(".")[-1].split(":")[-1]
col["directory_column"]["column_name"] = (
col["directory_column"]["column_name"]
if column == "NULL"
else column
)
col["measurement_unit"] = (
col["measurement_unit"]
if measurement_unit == "NULL"
else measurement_unit
)
col["has_sensitive_data"] = (
"no" if has_sensitive_data == "NULL" else has_sensitive_data
)
col["observations"] = (
col["observations"] if observations == "NULL" else observations
)
ruamel.dump(
table_config_yaml,
open(self.table_folder / "table_config.yaml", "w", encoding="utf-8"),
)
# regenerate publish.sql
self._make_publish_sql()