
Python

This API is composed of functions with 2 types of functionality:

  • Modules for data requests: for those who only want to query data and metadata from our project (or any other project on Google Cloud).

  • Classes for data management on Google Cloud: for those who want to upload data to our project (or any other project on Google Cloud, following our methodology and infrastructure).

All the code documentation below is in English.
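
A minimal usage sketch, assuming the package is imported as bd (the billing project ID below is a placeholder for your own Google Cloud project):

    import basedosdados as bd

    # Request data: read 10 rows of a public table into a pandas DataFrame,
    # billing the query to your own Google Cloud project.
    df = bd.read_table(
        dataset_id='br_suporte',
        table_id='diretorio_municipios',
        billing_project_id='your-billing-project-id',  # placeholder
        limit=10,
    )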

Modules (Data requests)

download(savepath, query=None, dataset_id=None, table_id=None, query_project_id='basedosdados', billing_project_id=None, limit=None, from_file=False, reauth=False, **pandas_kwargs)

Download table or query result from basedosdados BigQuery (or other).

  • Using a query:

    download('select * from `basedosdados.br_suporte.diretorio_municipios` limit 10')

  • Using dataset_id & table_id:

    download(dataset_id='br_suporte', table_id='diretorio_municipios')

You can also add arguments to modify save parameters:

download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')
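
A fuller sketch with the required savepath and a billing project (the local path and project ID are placeholders):

    download(
        savepath='data/municipios.csv',                # placeholder path
        dataset_id='br_suporte',
        table_id='diretorio_municipios',
        billing_project_id='your-billing-project-id',  # placeholder
        limit=100,
        index=False,  # extra keyword arguments are forwarded to pandas.to_csv
    )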

Parameters:

  • savepath (str, pathlib.PosixPath), required:
    If savepath is a folder, it saves a file as savepath / table_id.csv, or savepath / query_result.csv if table_id is not available. If savepath is a file, saves data to file.
  • query (str), default None:
    Optional. Valid SQL Standard Query to basedosdados. If query is available, dataset_id and table_id are not required.
  • dataset_id (str), default None:
    Optional. Dataset id available in basedosdados. It should always come with table_id.
  • table_id (str), default None:
    Optional. Table id available in basedosdados.dataset_id. It should always come with dataset_id.
  • query_project_id (str), default 'basedosdados':
    Optional. Which project the table lives in. You can change this if you want to query different projects.
  • billing_project_id (str), default None:
    Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard
  • limit (int), default None:
    Optional. Number of rows.
  • reauth (boolean), default False:
    Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations.
  • pandas_kwargs, default {}:
    Extra arguments accepted by pandas.to_csv.

Exceptions:

  • Exception:
    If either table_id or dataset_id is empty.

Source code in basedosdados/download/download.py
def download(
    savepath,
    query=None,
    dataset_id=None,
    table_id=None,
    query_project_id="basedosdados",
    billing_project_id=None,
    limit=None,
    from_file=False,
    reauth=False,
    **pandas_kwargs,
):
    """Download table or query result from basedosdados BigQuery (or other).

    * Using a **query**:

        `download('select * from `basedosdados.br_suporte.diretorio_municipios` limit 10')`

    * Using **dataset_id & table_id**:

        `download(dataset_id='br_suporte', table_id='diretorio_municipios')`

    You can also add arguments to modify save parameters:

    `download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')`


    Args:
        savepath (str, pathlib.PosixPath):
            If savepath is a folder, it saves a file as `savepath / table_id.csv` or
            `savepath / query_result.csv` if table_id not available.
            If savepath is a file, saves data to file.
        query (str): Optional.
            Valid SQL Standard Query to basedosdados. If query is available,
            dataset_id and table_id are not required.
        dataset_id (str): Optional.
            Dataset id available in basedosdados. It should always come with table_id.
        table_id (str): Optional.
            Table id available in basedosdados.dataset_id.
            It should always come with dataset_id.
        query_project_id (str): Optional.
            Which project the table lives. You can change this you want to query different projects.
        billing_project_id (str): Optional.
            Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
        limit (int): Optional
            Number of rows.
        reauth (boolean): Optional.
            Re-authorize Google Cloud Project in case you need to change user or reset configurations.
        pandas_kwargs ():
            Extra arguments accepted by [pandas.to_csv](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html)

    Raises:
        Exception: If either table_id or dataset_id were are empty.
    """

    savepath = Path(savepath)

    # make sure that path exists
    if savepath.is_dir():
        savepath.mkdir(parents=True, exist_ok=True)
    else:
        savepath.parent.mkdir(parents=True, exist_ok=True)

    if (dataset_id is not None) and (table_id is not None):
        table = read_table(
            dataset_id,
            table_id,
            query_project_id=query_project_id,
            billing_project_id=billing_project_id,
            limit=limit,
            reauth=reauth,
            from_file=from_file,
        )

    elif query is not None:

        query += f" limit {limit}" if limit is not None else ""

        table = read_sql(
            query,
            billing_project_id=billing_project_id,
            from_file=from_file,
            reauth=reauth,
        )

    elif query is None:
        raise BaseDosDadosException(
            "Either table_id, dataset_id or query should be filled."
        )

    if savepath.is_dir():
        if table_id is not None:
            savepath = savepath / (table_id + ".csv")
        else:
            savepath = savepath / ("query_result.csv")

    pandas_kwargs["index"] = pandas_kwargs.get("index", False)

    table.to_csv(savepath, **pandas_kwargs)

get_dataset_description(dataset_id=None, query_project_id='basedosdados', from_file=False)

Prints the full dataset description.

Parameters:

  • dataset_id (str), default None:
    Optional. Dataset id available in basedosdados.
  • query_project_id (str), default 'basedosdados':
    Optional. Which project the table lives in. You can change this if you want to query different projects.
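
For example, reusing a dataset id from the examples further down this page:

    get_dataset_description(dataset_id='br_ibge_censo2010')
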
Source code in basedosdados/download/download.py
def get_dataset_description(
    dataset_id=None, query_project_id="basedosdados", from_file=False
):
    """Prints the full dataset description.

    Args:
        dataset_id (str): Optional.
            Dataset id available in basedosdados.
        query_project_id (str): Optional.
            Which project the table lives. You can change this you want to query different projects.
    """

    client = bigquery.Client(
        credentials=credentials(from_file=from_file), project=query_project_id
    )

    dataset = client.get_dataset(dataset_id)

    print(dataset.description)

    return None

get_table_columns(dataset_id=None, table_id=None, query_project_id='basedosdados', from_file=False)

Fetch the names, types and descriptions for the columns in the specified table. Prints information on screen.

Parameters:

  • dataset_id (str), default None:
    Optional. Dataset id available in basedosdados. It should always come with table_id.
  • table_id (str), default None:
    Optional. Table id available in basedosdados.dataset_id. It should always come with dataset_id.
  • query_project_id (str), default 'basedosdados':
    Optional. Which project the table lives in. You can change this if you want to query different projects.

Examples:

    get_table_columns(
        dataset_id='br_ibge_censo2010',
        table_id='pessoa_renda_setor_censitario'
    )

Source code in basedosdados/download/download.py
def get_table_columns(
    dataset_id=None,
    table_id=None,
    query_project_id="basedosdados",
    from_file=False,
):

    """Fetch the names, types and descriptions for the columns in the specified table. Prints
    information on screen.

    Args:
        dataset_id (str): Optional.
            Dataset id available in basedosdados. It should always come with table_id.
        table_id (str): Optional.
            Table id available in basedosdados.dataset_id.
            It should always come with dataset_id.
        query_project_id (str): Optional.
            Which project the table lives. You can change this you want to query different projects.
    Example:
        get_table_columns(
        dataset_id='br_ibge_censo2010',
        table_id='pessoa_renda_setor_censitario'
        )
    """

    client = bigquery.Client(
        credentials=credentials(from_file=from_file), project=query_project_id
    )

    table_ref = client.get_table(f"{dataset_id}.{table_id}")

    columns = [
        (field.name, field.field_type, field.description) for field in table_ref.schema
    ]

    description = pd.DataFrame(columns, columns=["name", "field_type", "description"])

    _print_output(description)

    return None

get_table_description(dataset_id=None, table_id=None, query_project_id='basedosdados', from_file=False)

Prints the full table description.

Parameters:

  • dataset_id (str), default None:
    Optional. Dataset id available in basedosdados. It should always come with table_id.
  • table_id (str), default None:
    Optional. Table id available in basedosdados.dataset_id. It should always come with dataset_id.
  • query_project_id (str), default 'basedosdados':
    Optional. Which project the table lives in. You can change this if you want to query different projects.
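
Usage mirrors get_table_columns above, for example:

    get_table_description(
        dataset_id='br_ibge_censo2010',
        table_id='pessoa_renda_setor_censitario'
    )
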
Source code in basedosdados/download/download.py
def get_table_description(
    dataset_id=None,
    table_id=None,
    query_project_id="basedosdados",
    from_file=False,
):
    """Prints the full table description.

    Args:
        dataset_id (str): Optional.
            Dataset id available in basedosdados. It should always come with table_id.
        table_id (str): Optional.
            Table id available in basedosdados.dataset_id.
            It should always come with dataset_id.
        query_project_id (str): Optional.
            Which project the table lives. You can change this you want to query different projects.
    """

    client = bigquery.Client(
        credentials=credentials(from_file=from_file), project=query_project_id
    )

    table = client.get_table(f"{dataset_id}.{table_id}")

    print(table.description)

    return None

get_table_size(dataset_id, table_id, billing_project_id, query_project_id='basedosdados', from_file=False)

Use a query to get the number of rows and size (in Mb) of a table query from BigQuery. Prints information on screen in markdown friendly format.

WARNING: this query may cost a lot depending on the table.

Parameters:

  • dataset_id (str), required:
    Dataset id available in basedosdados. It should always come with table_id.
  • table_id (str), required:
    Table id available in basedosdados.dataset_id. It should always come with dataset_id.
  • query_project_id (str), default 'basedosdados':
    Optional. Which project the table lives in. You can change this if you want to query different projects.
  • billing_project_id (str), required:
    Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard

Examples:

    get_table_size(
        dataset_id='br_ibge_censo2010',
        table_id='pessoa_renda_setor_censitario',
        billing_project_id='yourprojectid'
    )

Source code in basedosdados/download/download.py
def get_table_size(
    dataset_id,
    table_id,
    billing_project_id,
    query_project_id="basedosdados",
    from_file=False,
):
    """Use a query to get the number of rows and size (in Mb) of a table query
    from BigQuery. Prints information on screen in markdown friendly format.

    WARNING: this query may cost a lot depending on the table.

    Args:
        dataset_id (str): Optional.
            Dataset id available in basedosdados. It should always come with table_id.
        table_id (str): Optional.
            Table id available in basedosdados.dataset_id.
            It should always come with dataset_id.
        query_project_id (str): Optional.
            Which project the table lives. You can change this you want to query different projects.
        billing_project_id (str): Optional.
            Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
    Example:
        get_table_size(
        dataset_id='br_ibge_censo2010',
        table_id='pessoa_renda_setor_censitario',
        billing_project_id='yourprojectid'
        )
    """
    billing_client = bigquery.Client(
        credentials=credentials(from_file=from_file), project=billing_project_id
    )

    query = f"""SELECT COUNT(*) FROM {query_project_id}.{dataset_id}.{table_id}"""

    job = billing_client.query(query, location="US")

    num_rows = job.to_dataframe().loc[0, "f0_"]

    size_mb = round(job.total_bytes_processed / 1024 / 1024, 2)

    table_data = pd.DataFrame(
        [
            {
                "project_id": query_project_id,
                "dataset_id": dataset_id,
                "table_id": table_id,
                "num_rows": num_rows,
                "size_mb": size_mb,
            }
        ]
    )

    _print_output(table_data)

    return None

list_dataset_tables(dataset_id, query_project_id='basedosdados', from_file=False, filter_by=None, with_description=False)

Fetch table_id for tables available at the specified dataset_id. Prints the information on screen.

Parameters:

  • dataset_id (str), required:
    Dataset id available in basedosdados.
  • query_project_id (str), default 'basedosdados':
    Optional. Which project the table lives in. You can change this if you want to query different projects.
  • filter_by (str), default None:
    Optional. String to be matched in the table_id.
  • with_description (bool), default False:
    Optional. If True, fetch short table descriptions for each table that matches the search criteria.

Examples:

    list_dataset_tables(
        dataset_id='br_ibge_censo2010',
        filter_by='renda',
        with_description=True,
    )

Source code in basedosdados/download/download.py
def list_dataset_tables(
    dataset_id,
    query_project_id="basedosdados",
    from_file=False,
    filter_by=None,
    with_description=False,
):
    """Fetch table_id for tables available at the specified dataset_id. Prints the information
    on screen.

    Args:
        dataset_id (str): Optional.
            Dataset id available in basedosdados.
        query_project_id (str): Optional.
            Which project the table lives. You can change this you want to query different projects.
        filter_by (str): Optional
            String to be matched in the table_id.
        with_description (bool): Optional
             If True, fetch short table descriptions for each table that match the search criteria.

    Example:
        list_dataset_tables(
        dataset_id='br_ibge_censo2010'
        filter_by='renda',
        with_description=True,
        )
    """
    client = bigquery.Client(
        credentials=credentials(from_file=from_file), project=query_project_id
    )

    dataset = client.get_dataset(dataset_id)

    tables_list = list(client.list_tables(dataset))

    tables = pd.DataFrame(
        [table.table_id for table in tables_list], columns=["table_id"]
    )

    if filter_by:

        tables = tables.loc[tables["table_id"].str.contains(filter_by)]

    if with_description:

        tables["description"] = [
            _get_header(client.get_table(f"{dataset_id}.{table}").description)
            for table in tables["table_id"]
        ]

    _print_output(tables)

    return None

list_datasets(query_project_id='basedosdados', filter_by=None, with_description=False, from_file=False)

Fetch the dataset_id of datasets available at query_project_id. Prints information on screen.

Parameters:

  • query_project_id (str), default 'basedosdados':
    Optional. Which project the table lives in. You can change this if you want to query different projects.
  • filter_by (str), default None:
    Optional. String to be matched in dataset_id.
  • with_description (bool), default False:
    Optional. If True, fetch a short dataset description for each dataset.

Examples:

    list_datasets(
        filter_by='sp',
        with_description=True,
    )

Source code in basedosdados/download/download.py
def list_datasets(
    query_project_id="basedosdados",
    filter_by=None,
    with_description=False,
    from_file=False,
):
    """Fetch the dataset_id of datasets available at query_project_id. Prints information on
    screen.

    Args:
        query_project_id (str): Optional.
            Which project the table lives. You can change this you want to query different projects.
        filter_by (str): Optional
            String to be matched in dataset_id.
        with_description (bool): Optional
            If True, fetch short dataset description for each dataset.

    Example:
        list_datasets(
        filter_by='sp',
        with_description=True,
        )
    """

    client = bigquery.Client(
        credentials=credentials(from_file=from_file), project=query_project_id
    )

    datasets_list = list(client.list_datasets())

    datasets = pd.DataFrame(
        [dataset.dataset_id for dataset in datasets_list], columns=["dataset_id"]
    )

    if filter_by:

        datasets = datasets.loc[datasets["dataset_id"].str.contains(filter_by)]

    if with_description:

        datasets["description"] = [
            _get_header(client.get_dataset(dataset).description)
            for dataset in datasets["dataset_id"]
        ]

    _print_output(datasets)

    return None

read_sql(query, billing_project_id=None, from_file=False, reauth=False)

Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq

Parameters:

  • query (sql), required:
    Valid SQL Standard Query to basedosdados.
  • billing_project_id (str), default None:
    Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard
  • reauth (boolean), default False:
    Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations.

Returns:

  • pd.DataFrame:
    Query result
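
A short sketch (the billing project ID is a placeholder):

    df = read_sql(
        'SELECT * FROM `basedosdados.br_suporte.diretorio_municipios` LIMIT 10',
        billing_project_id='your-billing-project-id',  # placeholder
    )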

Source code in basedosdados/download/download.py
def read_sql(query, billing_project_id=None, from_file=False, reauth=False):
    """Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq

    Args:
        query (sql):
            Valid SQL Standard Query to basedosdados
        billing_project_id (str): Optional.
            Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
        reauth (boolean): Optional.
            Re-authorize Google Cloud Project in case you need to change user or reset configurations.

    Returns:
        pd.DataFrame:
            Query result
    """

    try:

        # Set a two hours timeout
        bigquery_storage_v1.client.BigQueryReadClient.read_rows = partialmethod(
            bigquery_storage_v1.client.BigQueryReadClient.read_rows,
            timeout=3600 * 2,
        )

        return pandas_gbq.read_gbq(
            query,
            credentials=credentials(from_file=from_file, reauth=reauth),
            project_id=billing_project_id,
        )
    except (OSError, ValueError) as e:
        msg = ( "\nWe are not sure which Google Cloud project should be billed.\n"
                "First, you should make sure that you have a Google Cloud project.\n"
                "If you don't have one, set one up following these steps: \n"
                "\t1. Go to this link https://console.cloud.google.com/projectselector2/home/dashboard\n"
                "\t2. Agree with Terms of Service if asked\n"
                "\t3. Click in Create Project\n"
                "\t4. Put a cool name in your project\n"
                "\t5. Hit create\n"
                "\n"
                "Copy the Project ID, (notice that it is not the Project Name)\n"
                "Now, you have two options:\n"
                "1. Add an argument to your function poiting to the billing project id.\n"
                "   Like `bd.read_table('br_ibge_pib', 'municipios', billing_project_id=<YOUR_PROJECT_ID>)`\n"
                "2. You can set a project_id in the environment by running the following command in your terminal: `gcloud config set project <YOUR_PROJECT_ID>`.\n"
                "   Bear in mind that you need `gcloud` installed."
        )
        raise BaseDosDadosException(msg) from e
    except GenericGBQException as e:
        if "Reason: 403" in str(e):
            raise BaseDosDadosException(
                "\nYou still don't have a Google Cloud Project.\n"
                "Set one up following these steps: \n"
                "1. Go to this link https://console.cloud.google.com/projectselector2/home/dashboard\n"
                "2. Agree with Terms of Service if asked\n"
                "3. Click in Create Project\n"
                "4. Put a cool name in your project\n"
                "5. Hit create\n"
                "6. Rerun this command with the flag `reauth=True`. \n"
                "   Like `read_table('br_ibge_pib', 'municipios', reauth=True)`"
            )
        raise

read_table(dataset_id, table_id, query_project_id='basedosdados', billing_project_id=None, limit=None, from_file=False, reauth=False)

Load data from BigQuery using dataset_id and table_id.

Parameters:

  • dataset_id (str), required:
    Dataset id available in basedosdados. It should always come with table_id.
  • table_id (str), required:
    Table id available in basedosdados.dataset_id. It should always come with dataset_id.
  • query_project_id (str), default 'basedosdados':
    Optional. Which project the table lives in. You can change this if you want to query different projects.
  • billing_project_id (str), default None:
    Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard
  • reauth (boolean), default False:
    Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations.
  • limit (int), default None:
    Optional. Number of rows to read from table.

Returns:

  • pd.DataFrame:
    Query result
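
For example, mirroring the call suggested in the error messages of read_sql (the billing project ID is a placeholder):

    df = read_table(
        dataset_id='br_ibge_pib',
        table_id='municipios',
        billing_project_id='your-billing-project-id',  # placeholder
        limit=100,
    )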

Source code in basedosdados/download/download.py
def read_table(
    dataset_id,
    table_id,
    query_project_id="basedosdados",
    billing_project_id=None,
    limit=None,
    from_file=False,
    reauth=False,
):
    """Load data from BigQuery using dataset_id and table_id.

    Args:
        dataset_id (str): Optional.
            Dataset id available in basedosdados. It should always come with table_id.
        table_id (str): Optional.
            Table id available in basedosdados.dataset_id.
            It should always come with dataset_id.
        query_project_id (str): Optional.
            Which project the table lives. You can change this you want to query different projects.
        billing_project_id (str): Optional.
            Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
        reauth (boolean): Optional.
            Re-authorize Google Cloud Project in case you need to change user or reset configurations.
        limit (int): Optional.
            Number of rows to read from table.

    Returns:
        pd.DataFrame:
            Query result
    """

    if (dataset_id is not None) and (table_id is not None):
        query = f"""
        SELECT * 
        FROM `{query_project_id}.{dataset_id}.{table_id}`"""

        if limit is not None:

            query += f" LIMIT {limit}"
    else:
        raise BaseDosDadosException("Both table_id and dataset_id should be filled.")

    return read_sql(
        query, billing_project_id=billing_project_id, from_file=from_file, reauth=reauth
    )

Classes (Data management)

Storage

Manage files on Google Cloud Storage.

copy_table(self, source_bucket_name='basedosdados', destination_bucket_name=None, mode='staging')

Copies table from a source bucket to your bucket, sends request in batches.

Parameters:

  • source_bucket_name (str), default 'basedosdados':
    The bucket name from which to copy data. You can change it to copy from another external bucket.
  • destination_bucket_name (str), default None:
    Optional. The bucket name where data will be copied to. If None, defaults to the bucket initialized when instantiating the Storage object (you can check it with the Storage().bucket property).
  • mode (str), default 'staging':
    Optional. Folder of which dataset to update. Defaults to "staging".
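
A sketch, assuming the class is exposed at the package top level and instantiated with dataset_id and table_id, as the Table source further below does:

    import basedosdados as bd

    # Storage(dataset_id, table_id), as used by the Table class
    st = bd.Storage('br_suporte', 'diretorio_municipios')

    # Copy the staging files of this table from the public bucket into your own bucket
    st.copy_table(source_bucket_name='basedosdados', mode='staging')
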
Source code in basedosdados/upload/storage.py
def copy_table(
    self,
    source_bucket_name="basedosdados",
    destination_bucket_name=None,
    mode="staging",
):
    """Copies table from a source bucket to your bucket, sends request in batches.

    Args:
        source_bucket_name (str):
            The bucket name from which to copy data. You can change it
            to copy from other external bucket.

        destination_bucket_name (str): Optional
            The bucket name where data will be copied to.
            If None, defaults to the bucket initialized when instantiating the Storage object (You can check it with the
            Storage().bucket property)

        mode (str): Optional
            Folder of which dataset to update. Defaults to "staging".
    """

    source_table_ref = list(
        self.client["storage_staging"]
        .bucket(source_bucket_name)
        .list_blobs(prefix=f"{mode}/{self.dataset_id}/{self.table_id}/")
    )

    if source_table_ref == []:
        raise FileNotFoundError(
            f"Could not find the requested table {self.dataset_id}.{self.table_id}"
        )

    else:

        if destination_bucket_name is None:

            destination_bucket = self.bucket

        else:

            destination_bucket = self.client["storage_staging"].bucket(
                destination_bucket_name
            )

        # Divides source_table_ref list for maximum batch request size
        source_table_ref_chunks = [
            source_table_ref[i : i + 999]
            for i in range(0, len(source_table_ref), 999)
        ]

        for source_table in source_table_ref_chunks:

            with self.client["storage_staging"].batch():

                for blob in source_table:
                    self.bucket.copy_blob(
                        blob, destination_bucket=destination_bucket
                    )

delete_file(self, filename, mode, partitions=None, not_found_ok=False)

Deletes file from path <bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>.

Parameters:

  • filename (str), required:
    Name of the file to be deleted.
  • mode (str), required:
    Folder of which dataset to update [raw|staging|all].
  • partitions (str, pathlib.PosixPath, or dict), default None:
    Optional. Hive structured partition as a string or dict:
      • str : <key>=<value>/<key2>=<value2>
      • dict: dict(key=value, key2=value2)
  • not_found_ok (bool), default False:
    Optional. What to do if file not found.
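
For instance, assuming st is a Storage instance for your table (the file name and partition values are hypothetical):

    st.delete_file(
        filename='mypart.csv',                  # hypothetical file name
        mode='staging',
        partitions='country=brasil/year=2020',  # hive-style partition path
        not_found_ok=True,
    )
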
Source code in basedosdados/upload/storage.py
def delete_file(self, filename, mode, partitions=None, not_found_ok=False):
    """Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.

    Args:
        filename (str): Name of the file to be deleted

        mode (str): Folder of which dataset to update [raw|staging|all]

        partitions (str, pathlib.PosixPath, or dict): Optional.
            Hive structured partition as a string or dict

            * str : `<key>=<value>/<key2>=<value2>`
            * dict: `dict(key=value, key2=value2)`

        not_found_ok (bool): Optional.
            What to do if file not found
    """

    self._check_mode(mode)

    if mode == "all":
        mode = ["raw", "staging"]
    else:
        mode = [mode]

    for m in mode:

        blob = self.bucket.blob(self._build_blob_name(filename, m, partitions))

        if blob.exists():
            blob.delete()
        elif not_found_ok:
            return
        else:
            blob.delete()

delete_table(self, mode='staging', bucket_name=None, not_found_ok=False)

Deletes a table from storage, sends request in batches.

Parameters:

  • mode (str), default 'staging':
    Optional. Folder of which dataset to update.
  • bucket_name (str), default None:
    The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object. (You can check it with the Storage().bucket property.)
  • not_found_ok (bool), default False:
    Optional. What to do if table not found.
Source code in basedosdados/upload/storage.py
def delete_table(self, mode="staging", bucket_name=None, not_found_ok=False):
    """Deletes a table from storage, sends request in batches.

    Args:
        mode (str): Optional
            Folder of which dataset to update.

        bucket_name (str):
            The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
            (You can check it with the Storage().bucket property)

        not_found_ok (bool): Optional.
            What to do if table not found

    """
    prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"

    if bucket_name is not None:

        table_blobs = list(
            self.client["storage_staging"]
            .bucket(f"{bucket_name}")
            .list_blobs(prefix=prefix)
        )

    else:

        table_blobs = list(self.bucket.list_blobs(prefix=prefix))

    if table_blobs == []:
        if not_found_ok:
            return
        else:
            raise FileNotFoundError(
                f"Could not find the requested table {self.dataset_id}.{self.table_id}"
            )

    else:
        # Divides table_blobs list for maximum batch request size
        table_blobs_chunks = [
            table_blobs[i : i + 999] for i in range(0, len(table_blobs), 999)
        ]

        for source_table in table_blobs_chunks:

            with self.client["storage_staging"].batch():

                for blob in source_table:
                    blob.delete()

download(self, filename='*', savepath='', partitions=None, mode='raw', if_not_exists='raise')

Download files from Google Storage at path mode/dataset_id/table_id/partitions/filename and replicate the folder hierarchy on save.

There are 2 modes:

  • raw: download file from raw mode
  • staging: download file from staging mode

You can also use the partitions argument to choose files from a partition.

Parameters:

  • filename (str), default '*':
    Optional. Specify which file to download. If "*", downloads all files within the bucket folder. Defaults to "*".
  • savepath (str), default '':
    Where you want to save the data on your computer. Must be a path to a directory.
  • partitions (str, dict), default None:
    Optional. If downloading a single file, use this to specify the partition path from which to download:
      • str : <key>=<value>/<key2>=<value2>
      • dict: dict(key=value, key2=value2)
  • mode (str), default 'raw':
    Optional. Folder of which dataset to update [raw|staging].
  • if_not_exists (str), default 'raise':
    Optional. What to do if data is not found:
      • 'raise' : Raises FileNotFoundError.
      • 'pass' : Do nothing and exit the function.

Exceptions:

  • FileNotFoundError:
    If the given path <mode>/<dataset_id>/<table_id>/<partitions>/<filename> could not be found or there are no files to download.
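
A sketch, assuming st is a Storage instance (the local directory is a placeholder):

    # Download every staging file of the table, replicating the bucket hierarchy locally
    st.download(
        filename='*',
        savepath='data',   # placeholder directory
        mode='staging',
        if_not_exists='pass',
    )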

Source code in basedosdados/upload/storage.py
def download(
    self,
    filename="*",
    savepath="",
    partitions=None,
    mode="raw",
    if_not_exists="raise",
):

    """Download files from Google Storage from path `mode`/`dataset_id`/`table_id`/`partitions`/`filename` and replicate folder hierarchy
    on save,

    There are 2 modes:
    * `raw`: download file from raw mode
    * `staging`: download file from staging mode

    You can also use the `partitions` argument to choose files from a partition

    Args:
        filename (str): Optional
            Specify which file to download. If "*" , downloads all files within the bucket folder. Defaults to "*".

        savepath (str):
            Where you want to save the data on your computer. Must be a path to a directory.

        partitions (str, dict): Optional
            If downloading a single file, use this to specify the partition path from which to download.

            * str : `<key>=<value>/<key2>=<value2>`
            * dict: `dict(key=value, key2=value2)`


        mode (str): Optional
            Folder of which dataset to update.[raw/staging]

        if_not_exists (str): Optional.
            What to do if data not found.

            * 'raise' : Raises FileNotFoundError.
            * 'pass' : Do nothing and exit the function

    Raises:
        FileNotFoundError: If the given path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could not be found or there are no files to download.
    """

    # Prefix to locate files within the bucket
    prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"

    # Add specific partition to search prefix
    if partitions:
        prefix += self._resolve_partitions(partitions)

    # if no filename is passed, list all blobs within a given table
    if filename == "*":
        blob_list = list(self.bucket.list_blobs(prefix=prefix))

    # if filename is passed, append it to the prefix to narrow the search
    else:
        prefix += filename

        blob_list = list(self.bucket.list_blobs(prefix=prefix))

    # if there are no blobs matching the search raise FileNotFoundError or return
    if blob_list == []:
        if if_not_exists == "raise":
            raise FileNotFoundError(f"Could not locate files at {prefix}")
        else:
            return

    # download all blobs matching the search to given savepath
    for blob in blob_list:

        # parse blob.name and get the csv file name
        csv_name = blob.name.split("/")[-1]

        # build folder path replicating storage hierarchy
        blob_folder = blob.name.replace(csv_name, "")

        # replicate folder hierarchy
        (Path(savepath) / blob_folder).mkdir(parents=True, exist_ok=True)

        # download blob to savepath
        blob.download_to_filename(filename=f"{savepath}/{blob.name}")

init(self, replace=False, very_sure=False)

Initializes bucket and folders.

Folder should be:

  • raw : that contains really raw data
  • staging : preprocessed data ready to upload to BigQuery

Parameters:

  • replace (bool), default False:
    Optional. Whether to replace if the bucket already exists.
  • very_sure (bool), default False:
    Optional. Are you aware that everything is going to be erased if you replace the bucket?

Exceptions:

  • Warning:
    very_sure argument is still False.

Source code in basedosdados/upload/storage.py
def init(self, replace=False, very_sure=False):
    """Initializes bucket and folders.

    Folder should be:

    * `raw` : that contains really raw data
    * `staging` : preprocessed data ready to upload to BigQuery

    Args:
        replace (bool): Optional.
            Whether to replace if bucket already exists
        very_sure (bool): Optional.
            Are you aware that everything is going to be erased if you
            replace the bucket?

    Raises:
        Warning: very_sure argument is still False.
    """

    if replace:
        if not very_sure:
            raise Warning(
                "\n********************************************************"
                "\nYou are trying to replace all the data that you have "
                f"in bucket {self.bucket_name}.\nAre you sure?\n"
                "If yes, add the flag --very_sure\n"
                "********************************************************"
            )
        else:
            self.bucket.delete(force=True)

    self.client["storage_staging"].create_bucket(self.bucket)

    for folder in ["staging/", "raw/"]:

        self.bucket.blob(folder).upload_from_string("")

upload(self, path, mode='all', partitions=None, if_exists='raise', **upload_args)

Upload to storage at <bucket_name>/<mode>/<dataset_id>/<table_id>. You can:

  • Add a single file setting path = <file_path>.

  • Add a folder with multiple files setting path = <folder_path>. The folder should just contain the files and no folders.

  • Add partitioned files setting path = <folder_path>. This folder must follow the hive partitioning scheme i.e. <table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv (ex: mytable/country=brasil/year=2020/mypart.csv).

Remember all files must follow a single schema. Otherwise, things might fail in the future.

There are 3 modes:

  • raw : should contain raw files from datasource
  • staging : should contain pre-treated files ready to upload to BigQuery
  • all: if no treatment is needed, use all.

Parameters:

  • path (str or pathlib.PosixPath), required:
    Where to find the file or folder that you want to upload to storage.
  • mode (str), default 'all':
    Folder of which dataset to update [raw|staging|all].
  • partitions (str, pathlib.PosixPath, or dict), default None:
    Optional. If adding a single file, use this to add it to a specific partition:
      • str : <key>=<value>/<key2>=<value2>
      • dict: dict(key=value, key2=value2)
  • if_exists (str), default 'raise':
    Optional. What to do if data exists:
      • 'raise' : Raises Conflict exception
      • 'replace' : Replace table
      • 'pass' : Do nothing
  • upload_args, default {}:
    Extra arguments accepted by google.cloud.storage.blob.Blob.upload_from_file.
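
For example, assuming st is a Storage instance and the folder follows the hive partitioning scheme described above (the path is a placeholder):

    # Upload a partitioned folder to the staging area only
    st.upload(
        path='mytable/',   # placeholder folder, e.g. mytable/country=brasil/year=2020/mypart.csv
        mode='staging',
        if_exists='replace',
    )
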
Source code in basedosdados/upload/storage.py
def upload(
    self,
    path,
    mode="all",
    partitions=None,
    if_exists="raise",
    **upload_args,
):
    """Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:

    * Add a single **file** setting `path = <file_path>`.

    * Add a **folder** with multiple files setting `path =
      <folder_path>`. *The folder should just contain the files and
      no folders.*

    * Add **partitioned files** setting `path = <folder_path>`.
      This folder must follow the hive partitioning scheme i.e.
      `<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv`
      (ex: `mytable/country=brasil/year=2020/mypart.csv`).

    *Remember all files must follow a single schema.* Otherwise, things
    might fail in the future.

    There are 3 modes:

    * `raw` : should contain raw files from datasource
    * `staging` : should contain pre-treated files ready to upload to BiqQuery
    * `all`: if no treatment is needed, use `all`.

    Args:
        path (str or pathlib.PosixPath): Where to find the file or
            folder that you want to upload to storage

        mode (str): Folder of which dataset to update [raw|staging|all]

        partitions (str, pathlib.PosixPath, or dict): Optional.
            *If adding a single file*, use this to add it to a specific partition.

            * str : `<key>=<value>/<key2>=<value2>`
            * dict: `dict(key=value, key2=value2)`

        if_exists (str): Optional.
            What to do if data exists

            * 'raise' : Raises Conflict exception
            * 'replace' : Replace table
            * 'pass' : Do nothing

        upload_args ():
            Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_file`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename)
    """

    if (self.dataset_id is None) or (self.table_id is None):
        raise Exception("You need to pass dataset_id and table_id")

    path = Path(path)

    if path.is_dir():
        paths = [f for f in path.glob("**/*") if f.is_file() and f.suffix == ".csv"]

        parts = [
            (
                filepath.as_posix()
                .replace(path.as_posix() + "/", "")
                .replace(str(filepath.name), "")
            )
            for filepath in paths
        ]

    else:
        paths = [path]
        parts = [partitions or None]

    self._check_mode(mode)

    if mode == "all":
        mode = ["raw", "staging"]
    else:
        mode = [mode]

    for m in mode:

        for filepath, part in tqdm(list(zip(paths, parts)), desc="Uploading files"):

            blob_name = self._build_blob_name(filepath.name, m, part)

            blob = self.bucket.blob(blob_name)

            if not blob.exists() or if_exists == "replace":

                upload_args["timeout"] = upload_args.get("timeout", None)

                blob.upload_from_filename(str(filepath), **upload_args)

            elif if_exists == "pass":

                pass

            else:
                raise Exception(
                    f"Data already exists at {self.bucket_name}/{blob_name}. "
                    "Set if_exists to 'replace' to overwrite data"
                )

Dataset

Manage datasets in BigQuery.

create(self, mode='all', if_exists='raise')

Creates BigQuery datasets given dataset_id.

It can create two datasets:

  • <dataset_id> (mode = 'prod')
  • <dataset_id>_staging (mode = 'staging')

If mode is all, it creates both.

Parameters:

  • mode (str), default 'all':
    Optional. Which dataset to create [prod|staging|all].
  • if_exists (str), default 'raise':
    Optional. What to do if the dataset exists:
      • raise : Raises Conflict exception
      • replace : Drop all tables and replace dataset
      • update : Update dataset description
      • pass : Do nothing

Exceptions:

  • Warning:
    Dataset already exists and if_exists is set to raise.
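
A sketch, assuming Dataset is exposed at the package top level and instantiated with a dataset_id, as the Table source further below does:

    import basedosdados as bd

    ds = bd.Dataset('br_suporte')

    # Create both <dataset_id> and <dataset_id>_staging, doing nothing if they already exist
    ds.create(mode='all', if_exists='pass')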

Source code in basedosdados/upload/dataset.py
def create(self, mode="all", if_exists="raise"):
    """Creates BigQuery datasets given `dataset_id`.

    It can create two datasets:

    * `<dataset_id>` (mode = 'prod')
    * `<dataset_id>_staging` (mode = 'staging')

    If `mode` is all, it creates both.

    Args:
        mode (str): Optional. Which dataset to create [prod|staging|all].
        if_exists (str): Optional. What to do if dataset exists

            * raise : Raises Conflic exception
            * replace : Drop all tables and replace dataset
            * update : Update dataset description
            * pass : Do nothing

    Raises:
        Warning: Dataset already exists and if_exists is set to `raise`
    """

    if if_exists == "replace":
        self.delete(mode)
    elif if_exists == "update":

        self.update()
        return

    # Set dataset_id to the ID of the dataset to create.
    for m in self._loop_modes(mode):

        # Construct a full Dataset object to send to the API.
        dataset_obj = self._setup_dataset_object(m["id"])

        # Send the dataset to the API for creation, with an explicit timeout.
        # Raises google.api_core.exceptions.Conflict if the Dataset already
        # exists within the project.
        try:
            job = m["client"].create_dataset(dataset_obj)  # Make an API request.
        except Conflict:

            if if_exists == "pass":
                return
            else:
                raise Conflict(f"Dataset {self.dataset_id} already exists")

    # Make prod dataset public
    self.publicize()

delete(self, mode='all')

Deletes dataset in BigQuery. Toggle mode to choose which dataset to delete.

Parameters:

  • mode (str), default 'all':
    Optional. Which dataset to delete [prod|staging|all].
Source code in basedosdados/upload/dataset.py
def delete(self, mode="all"):
    """Deletes dataset in BigQuery. Toogle mode to choose which dataset to delete.

    Args:
        mode (str): Optional.  Which dataset to delete [prod|staging|all]
    """

    for m in self._loop_modes(mode):

        m["client"].delete_dataset(m["id"], delete_contents=True, not_found_ok=True)

init(self, replace=False)

Initialize the dataset folder at metadata_path/<dataset_id>.

The folder should contain:

  • dataset_config.yaml
  • README.md

Parameters:

  • replace (str), default False:
    Optional. Whether to replace the existing folder.

Exceptions:

  • FileExistsError:
    If the dataset folder already exists and replace is False.

Source code in basedosdados/upload/dataset.py
def init(self, replace=False):
    """Initialize dataset folder at metadata_path at `metadata_path/<dataset_id>`.

    The folder should contain:

    * `dataset_config.yaml`
    * `README.md`

    Args:
        replace (str): Optional. Whether to replace existing folder.

    Raises:
        FileExistsError: If dataset folder already exists and replace is False
    """

    # Create dataset folder
    try:
        self.dataset_folder.mkdir(exist_ok=replace, parents=True)
    except FileExistsError:
        raise FileExistsError(
            f"Dataset {str(self.dataset_folder.stem)} folder does not exists. "
            "Set replace=True to replace current files."
        )

    for file in (Path(self.templates) / "dataset").glob("*"):

        if file.name in ["dataset_config.yaml", "README.md"]:

            # Load and fill template
            template = self._render_template(
                f"dataset/{file.name}", dict(dataset_id=self.dataset_id)
            )

            # Write file
            (self.dataset_folder / file.name).open("w", encoding="utf-8").write(
                template
            )

    # Add code folder
    (self.dataset_folder / "code").mkdir(exist_ok=replace, parents=True)

    return self

publicize(self, mode='all')

Changes IAM configuration to turn BigQuery dataset public.

Parameters:

  • mode (bool), default 'all':
    Which dataset to publicize [prod|staging|all].
Source code in basedosdados/upload/dataset.py
def publicize(self, mode="all"):
    """Changes IAM configuration to turn BigQuery dataset public.

    Args:
        mode (bool): Which dataset to create [prod|staging|all].
    """

    for m in self._loop_modes(mode):

        dataset = m["client"].get_dataset(m["id"])
        entries = dataset.access_entries

        entries.extend(
            [
                bigquery.AccessEntry(
                    role="roles/bigquery.dataViewer",
                    entity_type="iamMember",
                    entity_id="allUsers",
                ),
                bigquery.AccessEntry(
                    role="roles/bigquery.metadataViewer",
                    entity_type="iamMember",
                    entity_id="allUsers",
                ),
                bigquery.AccessEntry(
                    role="roles/bigquery.user",
                    entity_type="iamMember",
                    entity_id="allUsers",
                ),
            ]
        )
        dataset.access_entries = entries

        m["client"].update_dataset(dataset, ["access_entries"])

update(self, mode='all')

Update dataset description. Toggle mode to choose which dataset to update.

Parameters:

  • mode (str), default 'all':
    Optional. Which dataset to update [prod|staging|all].
Source code in basedosdados/upload/dataset.py
def update(self, mode="all"):
    """Update dataset description. Toogle mode to choose which dataset to update.

    Args:
        mode (str): Optional. Which dataset to update [prod|staging|all]
    """

    for m in self._loop_modes(mode):

        # Send the dataset to the API to update, with an explicit timeout.
        # Raises google.api_core.exceptions.Conflict if the Dataset already
        # exists within the project.
        dataset = m["client"].update_dataset(
            self._setup_dataset_object(m["id"]), fields=["description"]
        )  # Make an API request.

Table

Manage tables in Google Cloud Storage and BigQuery.

append(self, filepath, partitions=None, if_exists='raise', **upload_args)

Appends new data to existing BigQuery table.

As long as the data has the same schema, it appends the data in the filepath to the existing table.

Parameters:

  • filepath (str or pathlib.PosixPath), required:
    Where to find the file that you want to upload to create a table with.
  • partitions (str, pathlib.PosixPath, dict), default None:
    Optional. Hive structured partition as a string or dict:
      • str : <key>=<value>/<key2>=<value2>
      • dict: dict(key=value, key2=value2)
  • if_exists (str), default 'raise':
    Optional. What to do if data with the same name exists in storage:
      • 'raise' : Raises Conflict exception
      • 'replace' : Replace table
      • 'pass' : Do nothing
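
For instance, assuming tb is a Table instance for an already created table (the file path is a placeholder):

    # Append a new CSV with the same schema to the existing table
    tb.append(
        filepath='data/new_rows.csv',  # placeholder path
        if_exists='replace',
    )
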
Source code in basedosdados/upload/table.py
def append(self, filepath, partitions=None, if_exists="raise", **upload_args):
    """Appends new data to existing BigQuery table.

    As long as the data has the same schema. It appends the data in the
    filepath to the existing table.

    Args:
        filepath (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
        partitions (str, pathlib.PosixPath, dict): Optional.
            Hive structured partition as a string or dict

            * str : `<key>=<value>/<key2>=<value2>`
            * dict: `dict(key=value, key2=value2)`
        if_exists (str): 0ptional.
            What to do if data with same name exists in storage

            * 'raise' : Raises Conflict exception
            * 'replace' : Replace table
            * 'pass' : Do nothing
    """

    Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
        filepath,
        mode="staging",
        partitions=None,
        if_exists=if_exists,
        **upload_args,
    )

    self.create(
        if_table_exists="replace",
        if_table_config_exists="pass",
        if_storage_data_exists="pass",
    )

create(self, path=None, job_config_params=None, force_dataset=True, if_table_exists='raise', if_storage_data_exists='raise', if_table_config_exists='raise', source_format='csv', columns_config_url=None)

Creates BigQuery table at staging dataset.

If you add a path, it automatically saves the data in the storage, creates a datasets folder and BigQuery location, besides creating the table and its configuration files.

The new table should be located at <dataset_id>_staging.<table_id> in BigQuery.

It looks for data saved in Storage at <bucket_name>/staging/<dataset_id>/<table_id>/* and builds the table.

It currently supports the types:

  • Comma Delimited CSV

Data can also be partitioned following the hive partitioning scheme <key1>=<value1>/<key2>=<value2> - for instance, year=2012/country=BR. The partition is automatically detected by searching for partitions on the table_config.yaml.

Parameters:

  • path (str or pathlib.PosixPath), default None:
    Where to find the file that you want to upload to create a table with.
  • job_config_params (dict), default None:
    Optional. Job configuration params from bigquery.
  • if_table_exists (str), default 'raise':
    Optional. What to do if the table exists:
      • 'raise' : Raises Conflict exception
      • 'replace' : Replace table
      • 'pass' : Do nothing
  • force_dataset (bool), default True:
    Creates the <dataset_id> folder and BigQuery Dataset if it doesn't exist.
  • if_table_config_exists (str), default 'raise':
    Optional. What to do if config files already exist:
      • 'raise' : Raises FileExistsError
      • 'replace' : Replace with blank template
      • 'pass' : Do nothing
  • if_storage_data_exists (str), default 'raise':
    Optional. What to do if data already exists on your bucket:
      • 'raise' : Raises Conflict exception
      • 'replace' : Replace table
      • 'pass' : Do nothing
  • source_format (str), default 'csv':
    Optional. Data source format. Only 'csv' is supported. Defaults to 'csv'.
  • columns_config_url (str), default None:
    Google Sheets URL. The URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>. The sheet must contain the column name ("coluna") and column description ("descricao").
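
A sketch, assuming Table is exposed at the package top level and constructed with dataset_id and table_id like Storage (an assumption, since its constructor is not shown on this page); the path is a placeholder:

    import basedosdados as bd

    tb = bd.Table(dataset_id='br_suporte', table_id='diretorio_municipios')

    # Send the CSV to storage, generate the config files and create the staging table
    tb.create(
        path='data/diretorio_municipios.csv',  # placeholder path
        if_table_exists='replace',
        if_storage_data_exists='replace',
        if_table_config_exists='pass',
    )
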
Source code in basedosdados/upload/table.py
def create(
    self,
    path=None,
    job_config_params=None,
    force_dataset=True,
    if_table_exists="raise",
    if_storage_data_exists="raise",
    if_table_config_exists="raise",
    source_format="csv",
    columns_config_url=None,
):
    """Creates BigQuery table at staging dataset.

    If you add a path, it automatically saves the data in the storage,
    creates a datasets folder and BigQuery location, besides creating the
    table and its configuration files.

    The new table should be located at `<dataset_id>_staging.<table_id>` in BigQuery.

    It looks for data saved in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*`
    and builds the table.

    It currently supports the types:

    - Comma Delimited CSV

    Data can also be partitioned following the hive partitioning scheme
    `<key1>=<value1>/<key2>=<value2>` - for instance,
    `year=2012/country=BR`. The partition is automatcally detected
    by searching for `partitions` on the `table_config.yaml`.

    Args:
        path (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
        job_config_params (dict): Optional.
            Job configuration params from bigquery
        if_table_exists (str): Optional
            What to do if table exists

            * 'raise' : Raises Conflict exception
            * 'replace' : Replace table
            * 'pass' : Do nothing
        force_dataset (bool): Creates `<dataset_id>` folder and BigQuery Dataset if it doesn't exists.
        if_table_config_exists (str): Optional.
            What to do if config files already exist

             * 'raise': Raises FileExistError
             * 'replace': Replace with blank template
             * 'pass'; Do nothing
        if_storage_data_exists (str): Optional.
            What to do if data already exists on your bucket:

            * 'raise' : Raises Conflict exception
            * 'replace' : Replace table
            * 'pass' : Do nothing
        source_format (str): Optional
            Data source format. Only 'csv' is supported. Defaults to 'csv'.

        columns_config_url (str): google sheets URL.
            The URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.
            The sheet must contain the column name: "coluna" and column description: "descricao"

    """

    if path is None:

        # Look if table data already exists at Storage
        data = self.client["storage_staging"].list_blobs(
            self.bucket_name, prefix=f"staging/{self.dataset_id}/{self.table_id}"
        )

        # Raise: Cannot create table without external data
        if not data:
            raise BaseDosDadosException(
                "You must provide a path for uploading data"
            )

    # Add data to storage
    if isinstance(
        path,
        (
            str,
            Path,
        ),
    ):

        Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
            path, mode="staging", if_exists=if_storage_data_exists
        )

    # Create Dataset if it doesn't exist
    if force_dataset:

        dataset_obj = Dataset(self.dataset_id, **self.main_vars)

        try:
            dataset_obj.init()
        except FileExistsError:
            pass

        dataset_obj.create(if_exists="pass")

    self.init(
        data_sample_path=path,
        if_folder_exists="replace",
        if_table_config_exists=if_table_config_exists,
        columns_config_url=columns_config_url,
    )

    table = bigquery.Table(self.table_full_name["staging"])

    table.external_data_configuration = Datatype(
        self, source_format, "staging", partitioned=self._is_partitioned()
    ).external_config

    # Lookup if table alreay exists
    table_ref = None
    try:
        table_ref = self.client["bigquery_staging"].get_table(
            self.table_full_name["staging"]
        )

    except google.api_core.exceptions.NotFound:
        pass

    if isinstance(table_ref, google.cloud.bigquery.table.Table):

        if if_table_exists == "pass":

            return None

        elif if_table_exists == "raise":

            raise FileExistsError(
                "Table already exists, choose replace if you want to overwrite it"
            )

    if if_table_exists == "replace":

        self.delete(mode="staging")

    self.client["bigquery_staging"].create_table(table)

delete(self, mode)

Deletes table in BigQuery.

Parameters:

  • mode (str), required:
    Which table to delete [prod|staging|all].
Source code in basedosdados/upload/table.py
def delete(self, mode):
    """Deletes table in BigQuery.

    Args:
        mode (str): Table of which table to delete [prod|staging|all]
    """

    self._check_mode(mode)

    if mode == "all":
        for m, n in self.table_full_name[mode].items():
            self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True)
    else:
        self.client[f"bigquery_{mode}"].delete_table(
            self.table_full_name[mode], not_found_ok=True
        )

init(self, data_sample_path=None, if_folder_exists='raise', if_table_config_exists='raise', source_format='csv', columns_config_url=None)

Initialize the table folder at metadata_path/<dataset_id>/<table_id>.

The folder should contain:

  • table_config.yaml
  • publish.sql

You can also point to a sample of the data to auto-complete column names.

Parameters:

Name Type Description Default
data_sample_path str, pathlib.PosixPath

Optional. Data sample path to auto-complete column names. It supports comma-delimited CSV.

None
if_folder_exists str

Optional. What to do if table folder exists

  • 'raise' : Raises FileExistsError
  • 'replace' : Replace folder
  • 'pass' : Do nothing
'raise'
if_table_config_exists str

Optional. What to do if table_config.yaml and publish.sql exist

  • 'raise' : Raises FileExistsError
  • 'replace' : Replace files with blank template
  • 'pass' : Do nothing
'raise'
source_format str

Optional. Data source format. Only 'csv' is supported. Defaults to 'csv'.

'csv'
columns_config_url str

Google Sheets URL. The URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>. The sheet must contain the column name: "coluna" and column description: "descricao"

None

Exceptions:

Type Description
FileExistsError

If folder exists and replace is False.

NotImplementedError

If data sample is not in supported type or format.

Source code in basedosdados/upload/table.py
def init(
    self,
    data_sample_path=None,
    if_folder_exists="raise",
    if_table_config_exists="raise",
    source_format="csv",
    columns_config_url=None,
):
    """Initialize the table folder at `metadata_path/<dataset_id>/<table_id>`.

    The folder should contain:

    * `table_config.yaml`
    * `publish.sql`

    You can also point to a sample of the data to auto-complete column names.

    Args:
        data_sample_path (str, pathlib.PosixPath): Optional.
            Data sample path to auto-complete column names.
            It supports comma-delimited CSV.
        if_folder_exists (str): Optional.
            What to do if table folder exists

            * 'raise' : Raises FileExistsError
            * 'replace' : Replace folder
            * 'pass' : Do nothing
        if_table_config_exists (str): Optional.
            What to do if table_config.yaml and publish.sql exist

            * 'raise' : Raises FileExistsError
            * 'replace' : Replace files with blank template
            * 'pass' : Do nothing
        source_format (str): Optional.
            Data source format. Only 'csv' is supported. Defaults to 'csv'.

        columns_config_url (str): google sheets URL.
            The URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.
            The sheet must contain the column name: "coluna" and column description: "descricao"

    Raises:
        FileExistsError: If folder exists and replace is False.
        NotImplementedError: If data sample is not in supported type or format.
    """
    if not self.dataset_folder.exists():

        raise FileExistsError(
            f"Dataset folder {self.dataset_folder} does not exist. "
            "Create a dataset before adding tables."
        )

    try:
        self.table_folder.mkdir(exist_ok=(if_folder_exists == "replace"))
    except FileExistsError:
        if if_folder_exists == "raise":
            raise FileExistsError(
                f"Table folder already exists for {self.table_id}. "
            )
        elif if_folder_exists == "pass":
            return self

    if not data_sample_path and if_table_config_exists != "pass":
        raise BaseDosDadosException(
            "You must provide a path to correctly create config files"
        )

    partition_columns = []
    if isinstance(
        data_sample_path,
        (
            str,
            Path,
        ),
    ):
        # Check if partitioned and get data sample and partition columns
        data_sample_path = Path(data_sample_path)

        if data_sample_path.is_dir():

            data_sample_path = [
                f
                for f in data_sample_path.glob("**/*")
                if f.is_file() and f.suffix == ".csv"
            ][0]

            partition_columns = [
                k.split("=")[0]
                for k in data_sample_path.as_posix().split("/")
                if "=" in k
            ]

        columns = Datatype(self, source_format).header(data_sample_path)

    else:

        columns = ["column_name"]

    if if_table_config_exists == "pass":
        # Check if config files exists before passing
        if (
            Path(self.table_folder / "table_config.yaml").is_file()
            and Path(self.table_folder / "publish.sql").is_file()
        ):
            pass
        # Raise if no sample to determine columns
        elif not data_sample_path:
            raise BaseDosDadosException(
                "You must provide a path to correctly create config files"
            )
        else:
            self._make_template(columns, partition_columns)

    elif if_table_config_exists == "raise":

        # Check if config files already exist
        if (
            Path(self.table_folder / "table_config.yaml").is_file()
            and Path(self.table_folder / "publish.sql").is_file()
        ):

            raise FileExistsError(
                f"table_config.yaml and publish.sql already exists at {self.table_folder}"
            )
        # if config files don't exist, create them
        else:
            self._make_template(columns, partition_columns)

    else:
        # if_table_config_exists == 'replace': overwrite config files with a new template
        self._make_template(columns, partition_columns)

    if columns_config_url is not None:
        self.update_columns(columns_config_url)

    return self
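
A short sketch of init, assuming an existing dataset folder and an illustrative CSV sample used to infer column names:

    from basedosdados import Table

    tb = Table(dataset_id="br_example", table_id="municipios")

    # Creates <metadata_path>/br_example/municipios/ with table_config.yaml
    # and publish.sql, filling column names from the sample file.
    tb.init(
        data_sample_path="data/municipios.csv",
        if_folder_exists="replace",
        if_table_config_exists="replace",
    )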

publish(self, if_exists='raise')

Creates BigQuery table at production dataset.

Table should be located at <dataset_id>.<table_id>.

It creates a view that uses the query from <metadata_path>/<dataset_id>/<table_id>/publish.sql.

Make sure that all columns from the query also exist in <metadata_path>/<dataset_id>/<table_id>/table_config.yaml, including the partitions.

Parameters:

Name Type Description Default
if_exists str

Optional. What to do if table exists.

  • 'raise' : Raises Conflict exception
  • 'replace' : Replace table
  • 'pass' : Do nothing
'raise'

Todo:

* Check if all required fields are filled
Source code in basedosdados/upload/table.py
def publish(self, if_exists="raise"):
    """Creates BigQuery table at production dataset.

    Table should be located at `<dataset_id>.<table_id>`.

    It creates a view that uses the query from
    `<metadata_path>/<dataset_id>/<table_id>/publish.sql`.

    Make sure that all columns from the query also exist in
    `<metadata_path>/<dataset_id>/<table_id>/table_config.yaml`, including
    the partitions.

    Args:
        if_exists (str): Optional.
            What to do if table exists.

            * 'raise' : Raises Conflict exception
            * 'replace' : Replace table
            * 'pass' : Do nothing

    Todo:

        * Check if all required fields are filled
    """

    if if_exists == "replace":
        self.delete(mode="prod")

    self.client["bigquery_prod"].query(
        (self.table_folder / "publish.sql").open("r", encoding="utf-8").read()
    ).result()

    self.update("prod")
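
A sketch of the publish step (IDs are illustrative; it assumes publish.sql and table_config.yaml were already generated by init and reviewed):

    from basedosdados import Table

    tb = Table(dataset_id="br_example", table_id="municipios")

    # Runs <metadata_path>/br_example/municipios/publish.sql, replacing the
    # production table if it already exists, then refreshes its schema and
    # description via update().
    tb.publish(if_exists="replace")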

update(self, mode='all', not_found_ok=True)

Updates BigQuery schema and description.

Parameters:

Name Type Description Default
mode str

Optional. Which table to update: [prod|staging|all]

'all'
not_found_ok bool

Optional. If True, ignore the table if it is not found

True
Source code in basedosdados/upload/table.py
def update(self, mode="all", not_found_ok=True):
    """Updates BigQuery schema and description.

    Args:
        mode (str): Optional.
            Which table to update: [prod|staging|all]
        not_found_ok (bool): Optional.
            If True, ignore the table if it is not found
    """

    self._check_mode(mode)

    mode = ["prod", "staging"] if mode == "all" else [mode]
    for m in mode:

        try:
            table = self._get_table_obj(m)
        except google.api_core.exceptions.NotFound:
            continue

        # if m == "staging":

        table.description = self._render_template(
            Path("table/table_description.txt"), self.table_config
        )

        # save table description
        open(
            self.metadata_path
            / self.dataset_id
            / self.table_id
            / "table_description.txt",
            "w",
            encoding="utf-8",
        ).write(table.description)

        if m == "prod":
            table.schema = self._load_schema(m)

            self.client[f"bigquery_{m}"].update_table(
                table, fields=["description", "schema"]
            )

update_columns(self, columns_config_url)

Fills column descriptions automatically using a public Google Sheets URL. The URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>. The sheet must contain the column name: "coluna" and column description: "descricao"

Parameters:

Name Type Description Default
columns_config_url str

google sheets URL.

required
Source code in basedosdados/upload/table.py
def update_columns(self, columns_config_url):
    """Fills column descriptions automatically using a public Google Sheets URL.
    The URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.
    The sheet must contain the column name: "coluna" and column description: "descricao"
    Args:
        columns_config_url (str): google sheets URL.

    """
    ruamel = ryaml.YAML()
    ruamel.preserve_quotes = True
    ruamel.indent(mapping=4, sequence=6, offset=4)
    table_config_yaml = ruamel.load(
        (self.table_folder / "table_config.yaml").open()
    )
    if (
        "edit#gid=" not in columns_config_url
        or "https://docs.google.com/spreadsheets/d/" not in columns_config_url
        or not columns_config_url.split("=")[1].isdigit()
    ):
        raise Exception(
            "The Google Sheets URL is not in the correct format. "
            "The URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>"
        )

    df = self._sheet_to_df(columns_config_url)

    if "coluna" not in df.columns.tolist():
        raise Exception(
            "Column 'coluna' not found in the Google sheet. "
            "The sheet must contain the column name: 'coluna' and column description: 'descricao'"
        )
    elif "descricao" not in df.columns.tolist():
        raise Exception(
            "Column 'descricao' not found in the Google sheet. "
            "The sheet must contain the column name: 'coluna' and column description: 'descricao'"
        )

    columns_parameters = zip(df["coluna"].tolist(), df["descricao"].tolist())
    for name, description in columns_parameters:
        for col in table_config_yaml["columns"]:
            if col["name"] == name:
                col["description"] = description
    ruamel.dump(table_config_yaml, stream=self.table_folder / "table_config.yaml")
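
A sketch of update_columns (the spreadsheet key below is a dummy value; the sheet must be public and contain the columns "coluna" and "descricao"):

    from basedosdados import Table

    tb = Table(dataset_id="br_example", table_id="municipios")

    # Dummy spreadsheet URL, shown only to illustrate the expected format:
    # https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>
    tb.update_columns(
        "https://docs.google.com/spreadsheets/d/1AbCdEfGhIjKlMnOp/edit#gid=0"
    )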