Python API
This API is composed of modules for requesting data:

- `download`, which downloads BigQuery tables as CSV files directly to your machine.
- `read_sql`, which runs a SQL query and loads the result into your Python environment.
- `read_table`, which reads a BigQuery table by name and loads it into your Python environment.

And of classes for managing data on Google Cloud:

- `Storage`, which handles files in Storage.
- `Dataset`, which handles datasets in BigQuery.
- `Table`, which handles tables in BigQuery.
All code documentation below is in English.
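A minimal quick-start sketch of the request modules (the dataset and table names come from the examples on this page; the billing project ID is a placeholder you must replace with your own):

```python
import basedosdados as bd

# Load the first rows of a public table into a pandas DataFrame,
# billing the query to your own Google Cloud project
df = bd.read_table(
    dataset_id="br_suporte",
    table_id="diretorio_municipios",
    billing_project_id="<YOUR_PROJECT_ID>",
    limit=10,
)
print(df.head())
```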
Modules
download(savepath, query=None, dataset_id=None, table_id=None, query_project_id='basedosdados', billing_project_id=None, limit=None, reauth=False, **pandas_kwargs)
Download table or query result from basedosdados BigQuery (or other).
- Using a **query**:

  ``download('select * from `basedosdados.br_suporte.diretorio_municipios` limit 10')``

- Using **dataset_id & table_id**:

  `download(dataset_id='br_suporte', table_id='diretorio_municipios')`

You can also add arguments to modify save parameters:

`download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')`
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `savepath` | str, pathlib.PosixPath | If savepath is a folder, it saves a file as `savepath / table_id.csv`, or `savepath / query_result.csv` if table_id is not available. If savepath is a file, saves data to that file. | required |
| `query` | str | Optional. Valid SQL Standard Query to basedosdados. If query is available, dataset_id and table_id are not required. | None |
| `dataset_id` | str | Optional. Dataset id available in basedosdados. It should always come with table_id. | None |
| `table_id` | str | Optional. Table id available in basedosdados.dataset_id. It should always come with dataset_id. | None |
| `query_project_id` | str | Optional. Which project the table lives in. You can change this if you want to query different projects. | 'basedosdados' |
| `billing_project_id` | str | Optional. Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard | None |
| `limit` | int | Optional. Number of rows. | None |
| `reauth` | boolean | Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations. | False |
| `pandas_kwargs` | | Extra arguments accepted by [pandas.to_csv](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html) | {} |
Exceptions:
| Type | Description |
|---|---|
| Exception | If either table_id or dataset_id is empty. |
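As a sketch of how `savepath` behaves in practice (the target folder must already exist, and the billing project ID is a placeholder you must replace):

```python
import basedosdados as bd

# savepath is a folder: the result is written to data/diretorio_municipios.csv
bd.download(
    savepath="data/",
    dataset_id="br_suporte",
    table_id="diretorio_municipios",
    billing_project_id="<YOUR_PROJECT_ID>",
    limit=100,
)

# savepath is a file: the query result is written to that exact path,
# and extra keyword arguments are forwarded to pandas.to_csv
bd.download(
    savepath="data/municipios.csv",
    query="select * from `basedosdados.br_suporte.diretorio_municipios` limit 10",
    billing_project_id="<YOUR_PROJECT_ID>",
    index=False,
    sep="|",
)
```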
Source code in basedosdados/download.py
def download(
savepath,
query=None,
dataset_id=None,
table_id=None,
query_project_id="basedosdados",
billing_project_id=None,
limit=None,
reauth=False,
**pandas_kwargs,
):
"""Download table or query result from basedosdados BigQuery (or other).
* Using a **query**:
`download('select * from `basedosdados.br_suporte.diretorio_municipios` limit 10')`
* Using **dataset_id & table_id**:
`download(dataset_id='br_suporte', table_id='diretorio_municipios')`
You can also add arguments to modify save parameters:
`download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')`
Args:
savepath (str, pathlib.PosixPath):
If savepath is a folder, it saves a file as `savepath / table_id.csv` or
`savepath / query_result.csv` if table_id not available.
If savepath is a file, saves data to file.
query (str): Optional.
Valid SQL Standard Query to basedosdados. If query is available,
dataset_id and table_id are not required.
dataset_id (str): Optional.
Dataset id available in basedosdados. It should always come with table_id.
table_id (str): Optional.
Table id available in basedosdados.dataset_id.
It should always come with dataset_id.
query_project_id (str): Optional.
Which project the table lives. You can change this you want to query different projects.
billing_project_id (str): Optional.
Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
limit (int): Optional
Number of rows.
reauth (boolean): Optional.
Re-authorize Google Cloud Project in case you need to change user or reset configurations.
pandas_kwargs ():
Extra arguments accepted by [pandas.to_csv](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html)
Raises:
Exception: If either table_id or dataset_id were are empty.
"""
savepath = Path(savepath)
if (dataset_id is not None) and (table_id is not None):
table = read_table(
dataset_id,
table_id,
query_project_id=query_project_id,
billing_project_id=billing_project_id,
limit=limit,
reauth=reauth,
)
elif query is not None:
query += f" limit {limit}" if limit is not None else ""
table = read_sql(query, billing_project_id=billing_project_id, reauth=reauth)
elif query is None:
raise BaseDosDadosException(
"Either table_id, dataset_id or query should be filled."
)
if savepath.is_dir():
if table_id is not None:
savepath = savepath / (table_id + ".csv")
else:
savepath = savepath / ("query_result.csv")
table.to_csv(savepath, **pandas_kwargs)
read_sql(query, billing_project_id=None, reauth=False)
Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `query` | sql | Valid SQL Standard Query to basedosdados | required |
| `billing_project_id` | str | Optional. Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard | None |
| `reauth` | boolean | Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations. | False |
Returns:
| Type | Description |
|---|---|
| pd.DataFrame | Query result |
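A usage sketch (the column alias is illustrative; the billing project ID is a placeholder):

```python
import basedosdados as bd

# Run an arbitrary Standard SQL query and get a pandas DataFrame back
df = bd.read_sql(
    query="""
        SELECT COUNT(*) AS n_municipios
        FROM `basedosdados.br_suporte.diretorio_municipios`
    """,
    billing_project_id="<YOUR_PROJECT_ID>",
)
print(df)
```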
Source code in basedosdados/download.py
def read_sql(query, billing_project_id=None, reauth=False):
"""Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq
Args:
query (sql):
Valid SQL Standard Query to basedosdados
billing_project_id (str): Optional.
Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
reauth (boolean): Optional.
Re-authorize Google Cloud Project in case you need to change user or reset configurations.
Returns:
pd.DataFrame:
Query result
"""
try:
return pandas_gbq.read_gbq(
query,
credentials=credentials(reauth=reauth),
project_id=billing_project_id,
)
except (OSError, ValueError):
raise BaseDosDadosException(
"\nWe are not sure which Google Cloud project should be billed.\n"
"First, you should make sure that you have a Google Cloud project.\n"
"If you don't have one, set one up following these steps: \n"
"\t1. Go to this link https://console.cloud.google.com/projectselector2/home/dashboard\n"
"\t2. Agree with Terms of Service if asked\n"
"\t3. Click in Create Project\n"
"\t4. Put a cool name in your project\n"
"\t5. Hit create\n"
""
"Copy the Project ID, (notice that it is not the Project Name)\n"
"Now, you have two options:\n"
"1. Add an argument to your function poiting to the billing project id.\n"
" Like `bd.read_table('br_ibge_pib', 'municipios', billing_project_id=<YOUR_PROJECT_ID>)`\n"
"2. You can set a project_id in the environment by running the following command in your terminal: `gcloud config set project <YOUR_PROJECT_ID>`."
" Bear in mind that you need `gcloud` installed."
)
except GenericGBQException as e:
if "Reason: 403" in str(e):
raise BaseDosDadosException(
"\nYou still don't have a Google Cloud Project.\n"
"Set one up following these steps: \n"
"1. Go to this link https://console.cloud.google.com/projectselector2/home/dashboard\n"
"2. Agree with Terms of Service if asked\n"
"3. Click in Create Project\n"
"4. Put a cool name in your project\n"
"5. Hit create\n"
"6. Rerun this command with the flag `reauth=True`. \n"
" Like `read_table('br_ibge_pib', 'municipios', reauth=True)`"
)
else:
raise e
read_table(dataset_id, table_id, query_project_id='basedosdados', billing_project_id=None, limit=None, reauth=False)
Load data from BigQuery using dataset_id and table_id.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `dataset_id` | str | Dataset id available in basedosdados. It should always come with table_id. | required |
| `table_id` | str | Table id available in basedosdados.dataset_id. It should always come with dataset_id. | required |
| `query_project_id` | str | Optional. Which project the table lives in. You can change this if you want to query different projects. | 'basedosdados' |
| `billing_project_id` | str | Optional. Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard | None |
| `reauth` | boolean | Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations. | False |
| `limit` | int | Optional. Number of rows to read from table. | None |
Returns:
| Type | Description |
|---|---|
| pd.DataFrame | Query result |
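A usage sketch, reusing the dataset and table cited in the library's own error messages:

```python
import basedosdados as bd

# Reads up to 1000 rows of basedosdados.br_ibge_pib.municipios
df = bd.read_table(
    dataset_id="br_ibge_pib",
    table_id="municipios",
    billing_project_id="<YOUR_PROJECT_ID>",
    limit=1000,
)
```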
Source code in basedosdados/download.py
def read_table(
dataset_id,
table_id,
query_project_id="basedosdados",
billing_project_id=None,
limit=None,
reauth=False,
):
"""Load data from BigQuery using dataset_id and table_id.
Args:
dataset_id (str): Optional.
Dataset id available in basedosdados. It should always come with table_id.
table_id (str): Optional.
Table id available in basedosdados.dataset_id.
It should always come with dataset_id.
query_project_id (str): Optional.
Which project the table lives. You can change this you want to query different projects.
billing_project_id (str): Optional.
Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
reauth (boolean): Optional.
Re-authorize Google Cloud Project in case you need to change user or reset configurations.
limit (int): Optional.
Number of rows to read from table.
Returns:
pd.DataFrame:
Query result
"""
if (dataset_id is not None) and (table_id is not None):
query = f"""
SELECT *
FROM `{query_project_id}.{dataset_id}.{table_id}`"""
if limit is not None:
query += f" LIMIT {limit}"
else:
raise BaseDosDadosException("Both table_id and dataset_id should be filled.")
return read_sql(query, billing_project_id=billing_project_id, reauth=reauth)
Classes
Storage
Manage files on Google Cloud Storage.
delete_file(self, filename, mode, partitions=None, not_found_ok=False)
Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `filename` | str | Name of the file to be deleted | required |
| `mode` | str | Folder of which dataset to update [raw\|staging\|all] | required |
| `partitions` | str, pathlib.PosixPath, or dict | Optional. Hive structured partition as a string (`<key>=<value>/<key2>=<value2>`) or dict (`dict(key=value, key2=value2)`) | None |
| `not_found_ok` | bool | Optional. What to do if file not found | False |
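A usage sketch. The constructor call `Storage(dataset_id, table_id)` mirrors how the class is instantiated inside `Table.append` further down this page; the file and partition names are illustrative:

```python
from basedosdados import Storage

st = Storage("br_suporte", "diretorio_municipios")

# Delete one staging file that lives under a hive partition;
# not_found_ok=True makes the call a no-op if the blob is already gone
st.delete_file(
    filename="mypart.csv",
    mode="staging",
    partitions="country=brasil/year=2020",
    not_found_ok=True,
)
```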
Source code in basedosdados/storage.py
def delete_file(self, filename, mode, partitions=None, not_found_ok=False):
"""Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.
Args:
filename (str): Name of the file to be deleted
mode (str): Folder of which dataset to update [raw|staging|all]
partitions (str, pathlib.PosixPath, or dict): Optional.
Hive structured partition as a string or dict
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
not_found_ok (bool): Optional.
What to do if file not found
"""
self._check_mode(mode)
if mode == "all":
mode = ["raw", "staging"]
else:
mode = [mode]
for m in mode:
blob = self.bucket.blob(self._build_blob_name(filename, m, partitions))
if blob.exists():
blob.delete()
elif not_found_ok:
return
else:
blob.delete()
init(self, replace=False, very_sure=False)
Initializes bucket and folders.
Folder should be:

- `raw`: contains really raw data
- `staging`: preprocessed data ready to upload to BigQuery
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `replace` | bool | Optional. Whether to replace if bucket already exists | False |
| `very_sure` | bool | Optional. Are you aware that everything is going to be erased if you replace the bucket? | False |
Exceptions:
| Type | Description |
|---|---|
| Warning | very_sure argument is still False. |
Source code in basedosdados/storage.py
def init(self, replace=False, very_sure=False):
"""Initializes bucket and folders.
Folder should be:
* `raw` : that contains really raw data
* `staging` : preprocessed data ready to upload to BigQuery
Args:
replace (bool): Optional.
Whether to replace if bucket already exists
very_sure (bool): Optional.
Are you aware that everything is going to be erased if you
replace the bucket?
Raises:
Warning: very_sure argument is still False.
"""
if replace:
if not very_sure:
raise Warning(
"\n********************************************************"
"\nYou are trying to replace all the data that you have "
f"in bucket {self.bucket_name}.\nAre you sure?\n"
"If yes, add the flag --very_sure\n"
"********************************************************"
)
else:
self.bucket.delete(force=True)
self.client["storage_staging"].create_bucket(self.bucket)
for folder in ["staging/", "raw/"]:
self.bucket.blob(folder).upload_from_string("")
upload(self, path, mode='all', partitions=None, if_exists='raise', **upload_args)
Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:

- Add a single **file** setting `path = <file_path>`.
- Add a **folder** with multiple files setting `path = <folder_path>`. The folder should just contain the files and no folders.
- Add **partitioned files** setting `path = <folder_path>`. This folder must follow the hive partitioning scheme, i.e. `<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv` (ex: `mytable/country=brasil/year=2020/mypart.csv`).

Remember all files must follow a single schema. Otherwise, things might fail in the future.

There are 3 modes:

- `raw`: should contain raw files from the data source
- `staging`: should contain pre-treated files ready to upload to BigQuery
- `all`: if no treatment is needed, use `all`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | str or pathlib.PosixPath | Where to find the file or folder that you want to upload to storage | required |
| `mode` | str | Folder of which dataset to update [raw\|staging\|all] | 'all' |
| `partitions` | str, pathlib.PosixPath, or dict | Optional. If adding a single file, use this to add it to a specific partition, as a string (`<key>=<value>/<key2>=<value2>`) or dict (`dict(key=value, key2=value2)`) | None |
| `if_exists` | str | Optional. What to do if data exists: 'raise' raises a Conflict exception, 'replace' replaces the data, 'pass' does nothing | 'raise' |
| `upload_args` | | Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_file`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename) | {} |
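A usage sketch (constructor arguments as above; paths and partition values are illustrative):

```python
from basedosdados import Storage

st = Storage("br_suporte", "diretorio_municipios")

# Single file into a specific partition of the staging folder
st.upload(
    path="data/mypart.csv",
    mode="staging",
    partitions="country=brasil/year=2020",
    if_exists="replace",
)

# Whole hive-partitioned folder at once (layout: country=.../year=.../*.csv)
st.upload(
    path="data/diretorio_municipios/",
    mode="staging",
    if_exists="replace",
)
```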
Source code in basedosdados/storage.py
def upload(
self,
path,
mode="all",
partitions=None,
if_exists="raise",
**upload_args,
):
"""Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:
* Add a single **file** setting `path = <file_path>`.
* Add a **folder** with multiple files setting `path =
<folder_path>`. *The folder should just contain the files and
no folders.*
* Add **partitioned files** setting `path = <folder_path>`.
This folder must follow the hive partitioning scheme i.e.
`<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv`
(ex: `mytable/country=brasil/year=2020/mypart.csv`).
*Remember all files must follow a single schema.* Otherwise, things
might fail in the future.
There are 3 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BiqQuery
* `all`: if no treatment is needed, use `all`.
Args:
path (str or pathlib.PosixPath): Where to find the file or
folder that you want to upload to storage
mode (str): Folder of which dataset to update [raw|staging|all]
partitions (str, pathlib.PosixPath, or dict): Optional.
*If adding a single file*, use this to add it to a specific partition.
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
if_exists (str): Optional.
What to do if data exists
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
upload_args ():
Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_file`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename)
"""
if (self.dataset_id is None) or (self.table_id is None):
raise Exception("You need to pass dataset_id and table_id")
path = Path(path)
if path.is_dir():
paths = [f for f in path.glob("**/*") if f.is_file() and f.suffix == ".csv"]
parts = [
(
str(filepath)
.replace(str(path) + "/", "")
.replace(str(filepath.name), "")
)
for filepath in paths
]
else:
paths = [path]
parts = [partitions or None]
self._check_mode(mode)
if mode == "all":
mode = ["raw", "staging"]
else:
mode = [mode]
for m in mode:
for filepath, part in tqdm(list(zip(paths, parts)), desc="Uploading files"):
blob_name = self._build_blob_name(filepath.name, m, part)
blob = self.bucket.blob(blob_name)
if not blob.exists() or if_exists == "replace":
upload_args["timeout"] = upload_args.get("timeout", None)
blob.upload_from_filename(str(filepath), **upload_args)
else:
raise Exception(
f"Data already exists at {self.bucket_name}/{blob_name}. "
"Set if_exists to 'replace' to overwrite data"
)
Dataset
Manage datasets in BigQuery.
create(self, mode='all', if_exists='raise')
Creates BigQuery datasets given `dataset_id`.

It can create two datasets:

- `<dataset_id>` (mode = 'prod')
- `<dataset_id>_staging` (mode = 'staging')

If `mode` is all, it creates both.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `mode` | str | Optional. Which dataset to create [prod\|staging\|all]. | 'all' |
| `if_exists` | str | Optional. What to do if dataset exists: 'raise' raises a Conflict exception, 'replace' drops all tables and replaces the dataset, 'update' updates the dataset description, 'pass' does nothing | 'raise' |
Exceptions:
| Type | Description |
|---|---|
| Warning | Dataset already exists and if_exists is set to `raise` |
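A usage sketch. The constructor call `Dataset(dataset_id)` mirrors how the class is instantiated inside `Table.create` further down this page:

```python
from basedosdados import Dataset

ds = Dataset("br_suporte")

# Create both br_suporte and br_suporte_staging; do nothing if they already exist
ds.create(mode="all", if_exists="pass")
```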
Source code in basedosdados/dataset.py
def create(self, mode="all", if_exists="raise"):
"""Creates BigQuery datasets given `dataset_id`.
It can create two datasets:
* `<dataset_id>` (mode = 'prod')
* `<dataset_id>_staging` (mode = 'staging')
If `mode` is all, it creates both.
Args:
mode (str): Optional. Which dataset to create [prod|staging|all].
if_exists (str): Optional. What to do if dataset exists
* raise : Raises Conflic exception
* replace : Drop all tables and replace dataset
* update : Update dataset description
* pass : Do nothing
Raises:
Warning: Dataset already exists and if_exists is set to `raise`
"""
if if_exists == "replace":
self.delete(mode)
elif if_exists == "update":
self.update()
return
# Set dataset_id to the ID of the dataset to create.
for m in self._loop_modes(mode):
# Construct a full Dataset object to send to the API.
dataset_obj = self._setup_dataset_object(m["id"])
# Send the dataset to the API for creation, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
try:
job = m["client"].create_dataset(dataset_obj) # Make an API request.
except Conflict:
if if_exists == "pass":
return
else:
raise Conflict(f"Dataset {self.dataset_id} already exists")
# Make prod dataset public
self.publicize()
delete(self, mode='all')
Deletes dataset in BigQuery. Toggle mode to choose which dataset to delete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `mode` | str | Optional. Which dataset to delete [prod\|staging\|all] | 'all' |
Source code in basedosdados/dataset.py
def delete(self, mode="all"):
"""Deletes dataset in BigQuery. Toogle mode to choose which dataset to delete.
Args:
mode (str): Optional. Which dataset to delete [prod|staging|all]
"""
for m in self._loop_modes(mode):
m["client"].delete_dataset(m["id"], delete_contents=True, not_found_ok=True)
init(self, replace=False)
Initialize the dataset folder at `metadata_path/<dataset_id>`.

The folder should contain:

- `dataset_config.yaml`
- `README.md`
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `replace` | str | Optional. Whether to replace existing folder. | False |
Exceptions:
| Type | Description |
|---|---|
| FileExistsError | If dataset folder already exists and replace is False |
Source code in basedosdados/dataset.py
def init(self, replace=False):
"""Initialize dataset folder at metadata_path at `metadata_path/<dataset_id>`.
The folder should contain:
* `dataset_config.yaml`
* `README.md`
Args:
replace (str): Optional. Whether to replace existing folder.
Raises:
FileExistsError: If dataset folder already exists and replace is False
"""
# Create dataset folder
try:
self.dataset_folder.mkdir(exist_ok=replace, parents=True)
except FileExistsError:
raise FileExistsError(
f"Dataset {str(self.dataset_folder.stem)} folder does not exists. "
"Set replace=True to replace current files."
)
for file in (Path(self.templates) / "dataset").glob("*"):
if file.name in ["dataset_config.yaml", "README.md"]:
# Load and fill template
template = self._render_template(
f"dataset/{file.name}", dict(dataset_id=self.dataset_id)
)
# Write file
(self.dataset_folder / file.name).open("w").write(template)
# Add code folder
(self.dataset_folder / "code").mkdir(exist_ok=replace, parents=True)
return self
publicize(self, mode='all')
Changes IAM configuration to make the BigQuery dataset public.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `mode` | str | Which dataset to publicize [prod\|staging\|all]. | 'all' |
Source code in basedosdados/dataset.py
def publicize(self, mode="all"):
"""Changes IAM configuration to turn BigQuery dataset public.
Args:
mode (bool): Which dataset to create [prod|staging|all].
"""
for m in self._loop_modes(mode):
dataset = m["client"].get_dataset(m["id"])
entries = dataset.access_entries
entries.extend(
[
bigquery.AccessEntry(
role="roles/bigquery.dataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
bigquery.AccessEntry(
role="roles/bigquery.metadataViewer",
entity_type="iamMember",
entity_id="allUsers",
),
bigquery.AccessEntry(
role="roles/bigquery.user",
entity_type="iamMember",
entity_id="allUsers",
),
]
)
dataset.access_entries = entries
m["client"].update_dataset(dataset, ["access_entries"])
update(self, mode='all')
Update dataset description. Toggle mode to choose which dataset to update.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `mode` | str | Optional. Which dataset to update [prod\|staging\|all] | 'all' |
Source code in basedosdados/dataset.py
def update(self, mode="all"):
"""Update dataset description. Toogle mode to choose which dataset to update.
Args:
mode (str): Optional. Which dataset to update [prod|staging|all]
"""
for m in self._loop_modes(mode):
# Send the dataset to the API to update, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
dataset = m["client"].update_dataset(
self._setup_dataset_object(m["id"]), fields=["description"]
) # Make an API request.
Table
Manage tables in Google Cloud Storage and BigQuery.
append(self, filepath, partitions=None, if_exists='raise', **upload_args)
Appends new data to an existing BigQuery table.
As long as the data has the same schema, it appends the data in the filepath to the existing table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `filepath` | str or pathlib.PosixPath | Where to find the file that you want to upload to create a table with | required |
| `partitions` | str, pathlib.PosixPath, dict | Optional. Hive structured partition as a string (`<key>=<value>/<key2>=<value2>`) or dict (`dict(key=value, key2=value2)`) | None |
| `if_exists` | str | Optional. What to do if data with the same name exists in storage: 'raise' raises a Conflict exception, 'replace' replaces the data, 'pass' does nothing | 'raise' |
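A usage sketch. The `Table(dataset_id, table_id)` constructor signature is an assumption (it is not shown on this page); the file path is illustrative:

```python
from basedosdados import Table

# Constructor arguments assumed to be (dataset_id, table_id), like Storage
tb = Table("br_suporte", "diretorio_municipios")

# Append a CSV that follows the existing schema; the staging table is recreated afterwards
tb.append(filepath="data/new_rows.csv", if_exists="replace")
```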
Source code in basedosdados/table.py
def append(self, filepath, partitions=None, if_exists="raise", **upload_args):
"""Appends new data to existing BigQuery table.
As long as the data has the same schema. It appends the data in the
filepath to the existing table.
Args:
filepath (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
partitions (str, pathlib.PosixPath, dict): Optional.
Hive structured partition as a string or dict
* str : `<key>=<value>/<key2>=<value2>`
* dict: `dict(key=value, key2=value2)`
if_exists (str): 0ptional.
What to do if data with same name exists in storage
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
"""
Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
filepath,
mode="staging",
        partitions=partitions,
if_exists=if_exists,
**upload_args,
)
self.create(if_exists="replace")
create(self, path=None, job_config_params=None, partitioned=False, if_exists='raise', force_dataset=True)
Creates BigQuery table at staging dataset.

If you add a path, it automatically saves the data in the storage, creates a datasets folder and BigQuery location, besides creating the table and its configuration files.

The new table should be located at `<dataset_id>_staging.<table_id>` in BigQuery.

It looks for data saved in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*` and builds the table.

It currently supports the types:

- Comma Delimited CSV

Data can also be partitioned following the hive partitioning scheme `<key1>=<value1>/<key2>=<value2>`, for instance, `year=2012/country=BR`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | str or pathlib.PosixPath | Where to find the file that you want to upload to create a table with | None |
| `job_config_params` | dict | Optional. Job configuration params from bigquery | None |
| `partitioned` | bool | Optional. Whether data is partitioned | False |
| `if_exists` | str | Optional. What to do if table exists: 'raise' raises a Conflict exception, 'replace' replaces the table, 'pass' does nothing | 'raise' |
| `force_dataset` | bool | Creates `<dataset_id>` folder and BigQuery Dataset if it doesn't exist. | True |
Todo:
* Implement if_exists=raise
* Implement if_exists=pass
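A usage sketch under the same assumption about the `Table` constructor; the folder is expected to follow the hive layout described above:

```python
from basedosdados import Table

tb = Table("br_suporte", "diretorio_municipios")  # constructor signature assumed

# Uploads data/diretorio_municipios/ to staging storage and creates the
# external table br_suporte_staging.diretorio_municipios over it
tb.create(
    path="data/diretorio_municipios/",
    partitioned=True,
    if_exists="replace",
)
```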
Source code in basedosdados/table.py
def create(
self,
path=None,
job_config_params=None,
partitioned=False,
if_exists="raise",
force_dataset=True,
):
"""Creates BigQuery table at staging dataset.
If you add a path, it automatically saves the data in the storage,
creates a datasets folder and BigQuery location, besides creating the
table and its configuration files.
The new table should be located at `<dataset_id>_staging.<table_id>` in BigQuery.
It looks for data saved in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*`
and builds the table.
It currently supports the types:
- Comma Delimited CSV
Data can also be partitioned following the hive partitioning scheme
`<key1>=<value1>/<key2>=<value2>`, for instance, `year=2012/country=BR`
Args:
path (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
job_config_params (dict): Optional.
Job configuration params from bigquery
partitioned (bool): Optional.
Whether data is partitioned
if_exists (str): Optional
What to do if table exists
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
force_dataset (bool): Creates `<dataset_id>` folder and BigQuery Dataset if it doesn't exists.
Todo:
* Implement if_exists=raise
* Implement if_exists=pass
"""
# Add data to storage
if isinstance(
path,
(
str,
PosixPath,
),
):
Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
path, mode="staging", if_exists="replace"
)
# Create Dataset if it doesn't exist
if force_dataset:
dataset_obj = Dataset(self.dataset_id, **self.main_vars)
try:
dataset_obj.init()
except FileExistsError:
pass
dataset_obj.create(if_exists="pass")
self.init(data_sample_path=path, if_exists="replace")
    external_config = bigquery.ExternalConfig("CSV")
external_config.options.skip_leading_rows = 1
external_config.options.allow_quoted_newlines = True
external_config.options.allow_jagged_rows = True
external_config.autodetect = False
external_config.schema = self._load_schema("staging")
external_config.source_uris = (
f"gs://{self.bucket_name}/staging/{self.dataset_id}/{self.table_id}/*"
)
if partitioned:
hive_partitioning = bigquery.external_config.HivePartitioningOptions()
hive_partitioning.mode = "AUTO"
hive_partitioning.source_uri_prefix = self.uri.format(
dataset=self.dataset_id, table=self.table_id
).replace("*", "")
external_config.hive_partitioning = hive_partitioning
table = bigquery.Table(self.table_full_name["staging"])
table.external_data_configuration = external_config
if if_exists == "replace":
self.delete(mode="staging")
self.client["bigquery_staging"].create_table(table)
table = bigquery.Table(self.table_full_name["staging"])
delete(self, mode)
Deletes table in BigQuery.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `mode` | str | Which table to delete [prod\|staging\|all] | required |
Source code in basedosdados/table.py
def delete(self, mode):
"""Deletes table in BigQuery.
Args:
mode (str): Table of which table to delete [prod|staging|all]
"""
self._check_mode(mode)
if mode == "all":
for m, n in self.table_full_name[mode].items():
self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True)
else:
self.client[f"bigquery_{mode}"].delete_table(
self.table_full_name[mode], not_found_ok=True
)
init(self, data_sample_path=None, if_exists='raise')
Initialize the table folder at `metadata_path/<dataset_id>/<table_id>`.

The folder should contain:

- `table_config.yaml`
- `publish.sql`

You can also point to a sample of the data to auto-complete column names.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data_sample_path` | str, pathlib.PosixPath | Optional. Data sample path to auto-complete column names. It supports Comma Delimited CSV. | None |
| `if_exists` | str | Optional. What to do if table folder exists: 'raise' raises FileExistsError, 'replace' replaces the folder, 'pass' does nothing | 'raise' |
Exceptions:
| Type | Description |
|---|---|
| FileExistsError | If folder exists and replace is False. |
| NotImplementedError | If data sample is not in supported type or format. |
Source code in basedosdados/table.py
def init(self, data_sample_path=None, if_exists="raise"):
"""Initialize table folder at metadata_path at `metadata_path/<dataset_id>/<table_id>`.
The folder should contain:
* `table_config.yaml`
* `publish.sql`
You can also point to a sample of the data to auto complete columns names.
Args:
data_sample_path (str, pathlib.PosixPath): Optional.
Data sample path to auto complete columns names
It supports Comma Delimited CSV.
if_exists (str): Optional.
What to do if table folder exists
* 'raise' : Raises FileExistsError
* 'replace' : Replace folder
* 'pass' : Do nothing
Raises:
FileExistsError: If folder exists and replace is False.
NotImplementedError: If data sample is not in supported type or format.
"""
if not self.dataset_folder.exists():
raise FileExistsError(
f"Dataset folder {self.dataset_folder} folder does not exists. "
"Create a dataset before adding tables."
)
try:
self.table_folder.mkdir(exist_ok=(if_exists == "replace"))
except FileExistsError:
if if_exists == "raise":
raise FileExistsError(
f"Table folder already exists for {self.table_id}. "
)
elif if_exists == "pass":
return self
partition_columns = []
if isinstance(
data_sample_path,
(
str,
PosixPath,
),
):
# Check if partitioned and get data sample and partition columns
data_sample_path = Path(data_sample_path)
if data_sample_path.is_dir():
data_sample_path = [
f
for f in data_sample_path.glob("**/*")
if f.is_file() and f.suffix == ".csv"
][0]
partition_columns = [
k.split("=")[0]
for k in str(data_sample_path).split("/")
if "=" in k
]
if data_sample_path.suffix == ".csv":
columns = next(csv.reader(open(data_sample_path, "r")))
else:
raise NotImplementedError(
"Data sample just supports comma separated csv files"
)
else:
columns = ["column_name"]
for file in (Path(self.templates) / "table").glob("*"):
if file.name in ["table_config.yaml", "publish.sql"]:
# Load and fill template
template = Template(file.open("r").read()).render(
table_id=self.table_id,
dataset_id=self.dataset_folder.stem,
project_id=self.client["bigquery_staging"].project,
columns=columns,
partition_columns=partition_columns,
now=datetime.datetime.now().strftime("%Y-%m-%d"),
)
# Write file
(self.table_folder / file.name).open("w").write(template)
return self
publish(self, if_exists='raise')
Creates BigQuery table at the production dataset.

Table should be located at `<dataset_id>.<table_id>`.

It creates a view that uses the query from `<metadata_path>/<dataset_id>/<table_id>/publish.sql`.

Make sure that all columns from the query also exist in `<metadata_path>/<dataset_id>/<table_id>/table_config.sql`, including the partitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `if_exists` | str | Optional. What to do if table exists: 'raise' raises a Conflict exception, 'replace' replaces the table, 'pass' does nothing | 'raise' |
Todo:
* Check if all required fields are filled
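A usage sketch under the same assumption about the `Table` constructor. It presumes `create()` has already run and `publish.sql` / `table_config.yaml` have been reviewed:

```python
from basedosdados import Table

tb = Table("br_suporte", "diretorio_municipios")  # constructor signature assumed

# Runs <metadata_path>/br_suporte/diretorio_municipios/publish.sql, which
# creates the production view br_suporte.diretorio_municipios
tb.publish(if_exists="replace")
```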
Source code in basedosdados/table.py
def publish(self, if_exists="raise"):
"""Creates BigQuery table at production dataset.
Table should be located at `<dataset_id>.<table_id>`.
It creates a view that uses the query from
`<metadata_path>/<dataset_id>/<table_id>/publish.sql`.
Make sure that all columns from the query also exists at
`<metadata_path>/<dataset_id>/<table_id>/table_config.sql`, including
the partitions.
Args:
if_exists (str): Optional.
What to do if table exists.
* 'raise' : Raises Conflict exception
* 'replace' : Replace table
* 'pass' : Do nothing
Todo:
* Check if all required fields are filled
"""
if if_exists == "replace":
self.delete(mode="prod")
self.client["bigquery_prod"].query(
(self.table_folder / "publish.sql").open("r").read()
)
self.update("prod")
update(self, mode='all', not_found_ok=True)
Updates BigQuery schema and description.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `mode` | str | Optional. Which table to update [prod\|staging\|all] | 'all' |
| `not_found_ok` | bool | Optional. What to do if table is not found | True |
Source code in basedosdados/table.py
def update(self, mode="all", not_found_ok=True):
"""Updates BigQuery schema and description.
Args:
mode (str): Optional.
Table of which table to update [prod|staging|all]
not_found_ok (bool): Optional.
What to do if table is not found
"""
self._check_mode(mode)
if mode == "all":
mode = ["prod", "staging"]
else:
mode = [mode]
for m in mode:
try:
table = self._get_table_obj(m)
except google.api_core.exceptions.NotFound:
continue
table.description = self._render_template(
"table/table_description.txt", self.table_config
)
# save table description
open(
self.metadata_path
/ self.dataset_id
/ self.table_id
/ "table_description.txt",
"w",
).write(table.description)
# if m == "prod":/
table.schema = self._load_schema(m)
self.client[f"bigquery_{m}"].update_table(
table, fields=["description", "schema"]
)