
Python

This API consists of functions with 2 types of functionality:

  • Modules for requesting data: for those who only want to query the data and metadata of our project (or any other project on Google Cloud).

  • Classes for managing data on Google Cloud: for those who want to upload data to our project (or any other project on Google Cloud, following our methodology and infrastructure).

All of the code documentation below is in English.

Modules (Data requests)

Functions for managing downloads

download(savepath, query=None, dataset_id=None, table_id=None, billing_project_id=None, query_project_id='basedosdados', limit=None, from_file=False, reauth=False, compression='GZIP')

Download table or query result from basedosdados BigQuery (or other).

  • Using a query:

    download('select * from `basedosdados.br_suporte.diretorio_municipios` limit 10')

  • Using dataset_id & table_id:

    download(dataset_id='br_suporte', table_id='diretorio_municipios')

You can also add arguments to modify save parameters:

download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')

Parameters:

  • savepath (str, pathlib.PosixPath, required): savepath must be a file path. Only supports .csv.
  • query (str, default None): Optional. Valid Standard SQL query to basedosdados. If query is available, dataset_id and table_id are not required.
  • dataset_id (str, default None): Optional. Dataset id available in basedosdados. It should always come with table_id.
  • table_id (str, default None): Optional. Table id available in basedosdados.dataset_id. It should always come with dataset_id.
  • billing_project_id (str, default None): Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard
  • query_project_id (str, default 'basedosdados'): Optional. Project where the table lives. You can change this if you want to query different projects.
  • limit (int, default None): Optional. Number of rows.
  • from_file (boolean, default False): Optional. Uses the credentials from file, located in `~/.basedosdados/credentials/`.
  • reauth (boolean, default False): Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations.
  • compression (str, default 'GZIP'): Optional. Compression type. Only GZIP is available for now.

Exceptions:

  • Exception: If query is empty and either table_id or dataset_id is missing.

Source code in basedosdados/download/download.py
def download(
    savepath,
    query=None,
    dataset_id=None,
    table_id=None,
    billing_project_id=None,
    query_project_id="basedosdados",
    limit=None,
    from_file=False,
    reauth=False,
    compression="GZIP",
):
    """Download table or query result from basedosdados BigQuery (or other).

    * Using a **query**:

        `download('select * from `basedosdados.br_suporte.diretorio_municipios` limit 10')`

    * Using **dataset_id & table_id**:

        `download(dataset_id='br_suporte', table_id='diretorio_municipios')`

    You can also add arguments to modify save parameters:

    `download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')`


    Args:
        savepath (str, pathlib.PosixPath):
            savepath must be a file path. Only supports `.csv`.
        query (str): Optional.
            Valid SQL Standard Query to basedosdados. If query is available,
            dataset_id and table_id are not required.
        dataset_id (str): Optional.
            Dataset id available in basedosdados. It should always come with table_id.
        table_id (str): Optional.
            Table id available in basedosdados.dataset_id.
            It should always come with dataset_id.
        billing_project_id (str): Optional.
            Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
        query_project_id (str): Optional.
            Project where the table lives. You can change this if you want to query different projects.
        limit (int): Optional.
            Number of rows.
        from_file (boolean): Optional.
            Uses the credentials from file, located in `~/.basedosdados/credentials/`
        reauth (boolean): Optional.
            Re-authorize Google Cloud Project in case you need to change user or reset configurations.
        compression (str): Optional.
            Compression type. Only `GZIP` is available for now.
    Raises:
        Exception: If query is empty and either table_id or dataset_id is missing.
    """

    billing_project_id, from_file = _set_config_variables(
        billing_project_id=billing_project_id, from_file=from_file
    )

    if (query is None) and ((table_id is None) or (dataset_id is None)):
        raise BaseDosDadosException(
            "Either table_id, dataset_id or query should be filled."
        )

    client = google_client(billing_project_id, from_file, reauth)

    # makes sure that savepath is a filepath and not a folder
    savepath = _sets_savepath(savepath)

    # default to the queried project; overwritten below when a query
    # writes its result to an anonymous destination table
    project_id = query_project_id

    # if query is not defined (so it won't be overwritten) and if
    # table is a view or external or if limit is specified,
    # convert it to a query.
    if not query and (
        not _is_table(client, dataset_id, table_id, query_project_id) or limit
    ):
        query = f"""
        SELECT * 
          FROM `{query_project_id}.{dataset_id}.{table_id}`
        """

        if limit is not None:
            query += f" limit {limit}"

    if query:
        # sql queries produces anonymous tables, whose names
        # can be found within `job._properties`
        job = client["bigquery"].query(query)

        # views may take longer: wait for job to finish.
        _wait_for(job)

        dest_table = job._properties["configuration"]["query"]["destinationTable"]

        project_id = dest_table["projectId"]
        dataset_id = dest_table["datasetId"]
        table_id = dest_table["tableId"]

    _direct_download(client, dataset_id, table_id, savepath, project_id, compression)
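
The branching in the listing above can be followed without a BigQuery client. The sketch below is a simplified, standalone model of that decision, not the library's code: `plan_download` and its `is_table` flag are hypothetical names (the flag stands in for the private `_is_table` check), and `ValueError` replaces the library's own exception type.

```python
def plan_download(
    query=None,
    dataset_id=None,
    table_id=None,
    query_project_id="basedosdados",
    limit=None,
    is_table=True,  # hypothetical stand-in for the private `_is_table` check
):
    """Return the SQL that would be run, or None for a direct table export."""
    if query is None and (table_id is None or dataset_id is None):
        # The library raises its own exception type here; ValueError is a stand-in.
        raise ValueError("Either table_id, dataset_id or query should be filled.")

    # An explicit query is used as given and is never overwritten.
    if query:
        return query

    # Views, external tables, or a row limit all have to go through SQL.
    if not is_table or limit:
        sql = f"SELECT * FROM `{query_project_id}.{dataset_id}.{table_id}`"
        if limit is not None:
            sql += f" LIMIT {limit}"
        return sql

    # A plain table with no limit is exported directly, with no query at all.
    return None
```

Under these assumptions, a plain table without `limit` yields `None` (direct export), while adding `limit=10` or marking the target as a view produces a `SELECT *` statement.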

read_sql(query, billing_project_id=None, from_file=False, reauth=False, use_bqstorage_api=False)

Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq

Parameters:

  • query (sql, required): Valid Standard SQL query to basedosdados.
  • billing_project_id (str, default None): Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard
  • from_file (boolean, default False): Optional. Uses the credentials from file, located in `~/.basedosdados/credentials/`.
  • reauth (boolean, default False): Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations.
  • use_bqstorage_api (boolean, default False): Optional. Use the BigQuery Storage API to download query results quickly, but at an increased cost (https://cloud.google.com/bigquery/docs/reference/storage/). To use this API, first enable it in the Cloud Console (https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com). You must also have the bigquery.readsessions.create permission on the project you are billing queries to.

Returns:

  • pd.DataFrame: Query result

Source code in basedosdados/download/download.py
def read_sql(
    query,
    billing_project_id=None,
    from_file=False,
    reauth=False,
    use_bqstorage_api=False,
):
    """Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq

    Args:
        query (sql):
            Valid SQL Standard Query to basedosdados
        billing_project_id (str): Optional.
            Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
        from_file (boolean): Optional.
            Uses the credentials from file, located in `~/.basedosdados/credentials/`
        reauth (boolean): Optional.
            Re-authorize Google Cloud Project in case you need to change user or reset configurations.
        use_bqstorage_api (boolean): Optional.
            Use the BigQuery Storage API to download query results quickly, but at an increased cost (https://cloud.google.com/bigquery/docs/reference/storage/).
            To use this API, first enable it in the Cloud Console (https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com).
            You must also have the bigquery.readsessions.create permission on the project you are billing queries to.

    Returns:
        pd.DataFrame:
            Query result
    """

    billing_project_id, from_file = _set_config_variables(
        billing_project_id=billing_project_id, from_file=from_file
    )

    try:
        # Set a two hours timeout
        bigquery_storage_v1.client.BigQueryReadClient.read_rows = partialmethod(
            bigquery_storage_v1.client.BigQueryReadClient.read_rows,
            timeout=3600 * 2,
        )

        return pandas_gbq.read_gbq(
            query,
            credentials=credentials(from_file=from_file, reauth=reauth),
            project_id=billing_project_id,
            use_bqstorage_api=use_bqstorage_api,
        )

    except GenericGBQException as e:
        if "Reason: 403" in str(e):
            raise BaseDosDadosAccessDeniedException from e

        if re.match("Reason: 400 POST .* [Pp]roject[ ]*I[Dd]", str(e)):
            raise BaseDosDadosInvalidProjectIDException from e

        raise

    except PyDataCredentialsError as e:
        raise BaseDosDadosAuthorizationException from e

    except (OSError, ValueError) as e:
        no_billing_id = "Could not determine project ID" in str(e)
        no_billing_id |= "reading from stdin while output is captured" in str(e)
        if no_billing_id:
            raise BaseDosDadosNoBillingProjectIDException from e
        raise
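
The two-hour timeout in the listing above relies on `functools.partialmethod` to pre-bind a keyword argument on a method at class level. A minimal sketch of that technique follows, using a toy class: `FakeReadClient` is a hypothetical stand-in and is not part of any Google library.

```python
from functools import partialmethod


class FakeReadClient:
    """Hypothetical stand-in for a client class with a `read_rows` method."""

    def read_rows(self, name, timeout=None):
        # Echo the arguments so the effect of the pre-bound default is visible.
        return name, timeout


# Rebind the method on the class so every instance gets a two-hour default.
FakeReadClient.read_rows = partialmethod(FakeReadClient.read_rows, timeout=3600 * 2)

client = FakeReadClient()
default_call = client.read_rows("stream-1")               # uses the pre-bound timeout
override_call = client.read_rows("stream-1", timeout=30)  # an explicit keyword still wins
```

Because the rebinding happens on the class, it affects all instances created before or after the patch, which is worth keeping in mind when other code in the same process uses the same client class.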

read_table(dataset_id, table_id, billing_project_id=None, query_project_id='basedosdados', limit=None, from_file=False, reauth=False, use_bqstorage_api=False)

Load data from BigQuery using dataset_id and table_id.

Parameters:

  • dataset_id (str, required): Dataset id available in basedosdados. It should always come with table_id.
  • table_id (str, required): Table id available in basedosdados.dataset_id. It should always come with dataset_id.
  • billing_project_id (str, default None): Optional. Project that will be billed. Find your Project ID here: https://console.cloud.google.com/projectselector2/home/dashboard
  • query_project_id (str, default 'basedosdados'): Optional. Project where the table lives. You can change this if you want to query different projects.
  • limit (int, default None): Optional. Number of rows to read from table.
  • from_file (boolean, default False): Optional. Uses the credentials from file, located in `~/.basedosdados/credentials/`.
  • reauth (boolean, default False): Optional. Re-authorize Google Cloud Project in case you need to change user or reset configurations.
  • use_bqstorage_api (boolean, default False): Optional. Use the BigQuery Storage API to download query results quickly, but at an increased cost (https://cloud.google.com/bigquery/docs/reference/storage/). To use this API, first enable it in the Cloud Console (https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com). You must also have the bigquery.readsessions.create permission on the project you are billing queries to.

Returns:

  • pd.DataFrame: Query result

Source code in basedosdados/download/download.py
def read_table(
    dataset_id,
    table_id,
    billing_project_id=None,
    query_project_id="basedosdados",
    limit=None,
    from_file=False,
    reauth=False,
    use_bqstorage_api=False,
):
    """Load data from BigQuery using dataset_id and table_id.

    Args:
        dataset_id (str): Required.
            Dataset id available in basedosdados. It should always come with table_id.
        table_id (str): Required.
            Table id available in basedosdados.dataset_id.
            It should always come with dataset_id.
        billing_project_id (str): Optional.
            Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
        query_project_id (str): Optional.
            Project where the table lives. You can change this if you want to query different projects.
        limit (int): Optional.
            Number of rows to read from table.
        from_file (boolean): Optional.
            Uses the credentials from file, located in `~/.basedosdados/credentials/`
        reauth (boolean): Optional.
            Re-authorize Google Cloud Project in case you need to change user or reset configurations.
        use_bqstorage_api (boolean): Optional.
            Use the BigQuery Storage API to download query results quickly, but at an increased cost (https://cloud.google.com/bigquery/docs/reference/storage/).
            To use this API, first enable it in the Cloud Console (https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com).
            You must also have the bigquery.readsessions.create permission on the project you are billing queries to.


    Returns:
        pd.DataFrame:
            Query result
    """

    billing_project_id, from_file = _set_config_variables(
        billing_project_id=billing_project_id, from_file=from_file
    )

    if (dataset_id is not None) and (table_id is not None):
        query = f"""
        SELECT * 
        FROM `{query_project_id}.{dataset_id}.{table_id}`"""

        if limit is not None:

            query += f" LIMIT {limit}"
    else:
        raise BaseDosDadosException("Both table_id and dataset_id should be filled.")

    return read_sql(
        query,
        billing_project_id=billing_project_id,
        from_file=from_file,
        reauth=reauth,
        use_bqstorage_api=use_bqstorage_api,
    )

Functions to get metadata from BD's API

get_dataset_description(dataset_id, verbose=True)

Prints the full dataset description.

Parameters:

  • dataset_id (str, required): Dataset id available in list_datasets.
  • verbose (bool, default True): Optional. If set to True, information is printed to the screen. If set to False, data is returned as a str.

Returns:

Type Description

stdout | str

Source code in basedosdados/download/metadata.py
def get_dataset_description(
    dataset_id,
    verbose=True,
):
    """
    Prints the full dataset description.

    Args:
        dataset_id (str): Required.
            Dataset id available in list_datasets.
        verbose (bool): Optional.
            If set to True, information is printed to the screen. If set to False, data is returned as a `str`.

    Returns:
        stdout | str
    """
    url = f"https://basedosdados.org/api/3/action/bd_bdm_dataset_show?dataset_id={dataset_id}"

    response = _safe_fetch(url)

    json_response = response.json()

    description = json_response["result"]["notes"]

    if verbose:
        return print(description)
    return description

get_table_columns(dataset_id, table_id, verbose=True)

Fetch the names, types and descriptions for the columns in the specified table. Prints information on screen.

Parameters:

  • dataset_id (str, required): Dataset id available in list_datasets.
  • table_id (str, required): Table id available in list_dataset_tables.
  • verbose (bool, default True): Optional. If set to True, information is printed to the screen. If set to False, data is returned as a list of dicts.

Returns:

Type Description

stdout | list

Source code in basedosdados/download/metadata.py
def get_table_columns(
    dataset_id,
    table_id,
    verbose=True,
):

    """
        Fetch the names, types and descriptions for the columns in the specified table. Prints
        information on screen.
    Args:
        dataset_id (str): Required.
            Dataset id available in list_datasets.
        table_id (str): Required.
            Table id available in list_dataset_tables
        verbose (bool): Optional.
            If set to True, information is printed to the screen. If set to False, data is returned as a `list` of `dict`s.

    Returns:
        stdout | list
    """

    url = f"https://basedosdados.org/api/3/action/bd_bdm_table_show?dataset_id={dataset_id}&table_id={table_id}"

    response = _safe_fetch(url)

    json_response = response.json()

    columns = json_response["result"]["columns"]

    if verbose:
        return _print_output(pd.DataFrame(columns))
    return columns

get_table_description(dataset_id, table_id, verbose=True)

Prints the full table description.

Parameters:

  • dataset_id (str, required): Dataset id available in list_datasets.
  • table_id (str, required): Table id available in list_dataset_tables.
  • verbose (bool, default True): Optional. If set to True, information is printed to the screen. If set to False, data is returned as a str.

Returns:

Type Description

stdout | str

Source code in basedosdados/download/metadata.py
def get_table_description(
    dataset_id,
    table_id,
    verbose=True,
):
    """
    Prints the full table description.

    Args:
        dataset_id (str): Required.
            Dataset id available in list_datasets.
        table_id (str): Required.
            Table id available in list_dataset_tables
        verbose (bool): Optional.
            If set to True, information is printed to the screen. If set to False, data is returned as a `str`.

    Returns:
        stdout | str
    """

    url = f"https://basedosdados.org/api/3/action/bd_bdm_table_show?dataset_id={dataset_id}&table_id={table_id}"

    response = _safe_fetch(url)

    json_response = response.json()

    description = json_response["result"]["description"]

    if verbose:
        return print(description)
    return description

get_table_size(dataset_id, table_id, verbose=True)

Use a query to get the number of rows and size (in Mb) of a table.

WARNING: this query may cost a lot depending on the table.

Parameters:

  • dataset_id (str, required): Dataset id available in basedosdados. It should always come with table_id.
  • table_id (str, required): Table id available in basedosdados.dataset_id. It should always come with dataset_id.
  • verbose (bool, default True): Optional. If set to True, information is printed to the screen. If set to False, data is returned as a list of dicts.

Source code in basedosdados/download/metadata.py
def get_table_size(
    dataset_id,
    table_id,
    verbose=True,
):
    """Use a query to get the number of rows and size (in Mb) of a table.

    WARNING: this query may cost a lot depending on the table.

    Args:
        dataset_id (str): Required.
            Dataset id available in basedosdados. It should always come with table_id.
        table_id (str): Required.
            Table id available in basedosdados.dataset_id.
            It should always come with dataset_id.
        verbose (bool): Optional.
            If set to True, information is printed to the screen. If set to False, data is returned as a `list` of `dict`s.
    """
    url = f"https://basedosdados.org/api/3/action/bd_bdm_table_show?dataset_id={dataset_id}&table_id={table_id}"

    response = _safe_fetch(url)

    json_response = response.json()

    size = json_response["result"]["size"]

    if size is None:
        return print("Size not available")
    if verbose:
        return _print_output(pd.DataFrame(size))
    return size

list_dataset_tables(dataset_id, with_description=False, verbose=True)

Fetch table_id for tables available at the specified dataset_id. Prints the information on screen or returns it as a list.

Parameters:

  • dataset_id (str, required): Dataset id returned by the list_datasets function.
  • with_description (bool, default False): Optional. If True, fetch short table descriptions for each table that matches the search criteria.
  • verbose (bool, default True): Optional. If set to True, information is printed to the screen. If set to False, a list object is returned.

Returns:

Type Description

stdout | list

Source code in basedosdados/download/metadata.py
def list_dataset_tables(
    dataset_id,
    with_description=False,
    verbose=True,
):
    """
    Fetch table_id for tables available at the specified dataset_id. Prints the information on screen or returns it as a list.

    Args:
        dataset_id (str): Required.
            Dataset id returned by list_datasets function
        with_description (bool): Optional.
            If True, fetch short table descriptions for each table that matches the search criteria.
        verbose (bool): Optional.
            If set to True, information is printed to the screen. If set to False, a list object is returned.

    Returns:
        stdout | list
    """

    dataset_id = dataset_id.replace(
        "-", "_"
    )  # The dataset_id pattern in the bd_dataset_search endpoint response uses a hyphen as a separator, while in the endpoint urls that specify the dataset_id parameter the separator used is an underscore. See issue #1079

    url = f"https://basedosdados.org/api/3/action/bd_bdm_dataset_show?dataset_id={dataset_id}"

    response = _safe_fetch(url)

    json_response = response.json()

    dataset = json_response["result"]
    # this dict has all the information needed to build the function's output
    table_dict = {
        "table_id": [
            dataset["resources"][k]["name"]
            for k in range(len(dataset["resources"]))
            if dataset["resources"][k]["resource_type"] == "bdm_table"
        ],
        "description": [
            dataset["resources"][k]["description"]
            for k in range(len(dataset["resources"]))
            if dataset["resources"][k]["resource_type"] == "bdm_table"
        ],
    }
    # select desired output using table_id info. Note that the output is either a standardized string or a list
    if verbose & (with_description is False):
        return _print_output(pd.DataFrame.from_dict(table_dict)[["table_id"]])
    if verbose & with_description:
        return _print_output(
            pd.DataFrame.from_dict(table_dict)[["table_id", "description"]]
        )
    if (verbose is False) & (with_description is False):
        return table_dict["table_id"]
    if (verbose is False) & with_description:
        return [
            {
                "table_id": table_dict["table_id"][k],
                "description": table_dict["description"][k],
            }
            for k in range(len(table_dict["table_id"]))
        ]

    raise ValueError("`verbose` and `with_description` argument must be of `bool` type.")
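
The core of the listing above is a filter over the dataset's `resources`, keeping only entries whose `resource_type` is `"bdm_table"`. Here is a standalone sketch of the `verbose=False` behaviour; `select_tables` and the `sample` payload are hypothetical, with only the field names taken from the source.

```python
def select_tables(dataset, with_description=False):
    """Keep only resources of type "bdm_table" from a dataset payload."""
    tables = [r for r in dataset["resources"] if r["resource_type"] == "bdm_table"]
    if with_description:
        # One dict per table, matching the shape returned when verbose=False.
        return [{"table_id": r["name"], "description": r["description"]} for r in tables]
    return [r["name"] for r in tables]


# Hypothetical payload shaped like the `result` field used in the source.
sample = {
    "resources": [
        {"name": "municipio", "description": "Municipalities", "resource_type": "bdm_table"},
        {"name": "site", "description": "External link", "resource_type": "external_link"},
    ]
}

table_ids = select_tables(sample)
table_rows = select_tables(sample, with_description=True)
```

Filtering once and then shaping the result avoids running the same comprehension separately for names and descriptions, which keeps the two lists aligned by construction.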

list_datasets(with_description=False, verbose=True)

This function uses the bd_dataset_search website API endpoint to retrieve a list of available datasets.

Parameters:

  • with_description (bool, default False): Optional. If True, fetch short dataset description for each dataset.
  • verbose (bool, default True): Optional. If set to True, information is printed to the screen. If set to False, a list object is returned.

Returns:

Type Description

list | stdout

Source code in basedosdados/download/metadata.py
def list_datasets(with_description=False, verbose=True):
    """
    This function uses `bd_dataset_search` website API
    endpoint to retrieve a list of available datasets.

    Args:
        with_description (bool): Optional.
            If True, fetch short dataset description for each dataset.
        verbose (bool): Optional.
            If set to True, information is printed to the screen. If set to False, a list object is returned.

    Returns:
        list | stdout
    """
    # first request is made separately since we need to know the number of pages before the iteration
    page_size = 100  # this function will only make more than one request if there are more than 100 datasets in the API response #pylint: disable=C0301
    url = f"https://basedosdados.org/api/3/action/bd_dataset_search?q=&resource_type=bdm_table&page=1&page_size={page_size}"  # pylint: disable=C0301
    response = _safe_fetch(url)
    json_response = response.json()
    n_datasets = json_response["result"]["count"]
    n_pages = math.ceil(n_datasets / page_size)
    temp_dict = _dict_from_page(json_response)

    temp_dicts = [temp_dict]
    for page in range(2, n_pages + 1):
        url = f"https://basedosdados.org/api/3/action/bd_dataset_search?q=&resource_type=bdm_table&page={page}&page_size={page_size}"  # pylint: disable=C0301
        response = _safe_fetch(url)
        json_response = response.json()
        temp_dict = _dict_from_page(json_response)
        temp_dicts.append(temp_dict)

    dataset_dict = defaultdict(list)

    for d in temp_dicts:  # pylint: disable=C0103
        for key, value in d.items():
            dataset_dict[key].append(value)

    # flat inner lists
    dataset_dict["dataset_id"] = [
        item for sublist in dataset_dict["dataset_id"] for item in sublist
    ]  # pylint: disable=C0301
    dataset_dict["description"] = [
        item for sublist in dataset_dict["description"] for item in sublist
    ]  # pylint: disable=C0301
    # select desired output using dataset_id info. Note that the output is either a standardized string or a list #pylint: disable=C0301
    if verbose & (with_description is False):
        return _print_output(pd.DataFrame.from_dict(dataset_dict)[["dataset_id"]])
    if verbose & with_description:
        return _print_output(
            pd.DataFrame.from_dict(dataset_dict)[["dataset_id", "description"]]
        )
    if (verbose is False) & (with_description is False):
        return dataset_dict["dataset_id"]
    if (verbose is False) & with_description:
        return [
            {
                "dataset_id": dataset_dict["dataset_id"][k],
                "description": dataset_dict["description"][k],
            }
            for k in range(len(dataset_dict["dataset_id"]))
        ]
    raise ValueError("`verbose` and `with_description` argument must be of `bool` type.")
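
The paging logic in the listing above comes down to two steps: work out how many pages remain after the first request, then merge the per-page results. The sketch below shows both steps in isolation. `page_numbers` and `merge_pages` are hypothetical helper names, and the assumption that each page is a dict of lists is inferred from the flattening step in the source, since `_dict_from_page` itself is not shown.

```python
import math
from collections import defaultdict


def page_numbers(n_datasets, page_size=100):
    """Pages still to fetch after the first request, which reports the count."""
    n_pages = math.ceil(n_datasets / page_size)
    return list(range(2, n_pages + 1))


def merge_pages(pages):
    """Concatenate the per-page lists under each key into one flat list."""
    merged = defaultdict(list)
    for page in pages:
        for key, values in page.items():
            # extend (rather than append) keeps the result flat in one pass
            merged[key].extend(values)
    return dict(merged)
```

Using `extend` here produces the flat lists directly, so no separate flattening step is needed afterwards.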

search(query, order_by)

This function works as a wrapper to the bd_dataset_search website API endpoint.

Parameters:

  • query (str, required): String to search in datasets and tables' metadata.
  • order_by (str, required): score|popular|recent. Field by which the results will be ordered.

Returns:

Type Description
pd.DataFrame

Response from the API presented as a pandas DataFrame. Each row is a table. Each column is a field identifying the table.

Source code in basedosdados/download/metadata.py
def search(query, order_by):
    """This function works as a wrapper to the `bd_dataset_search` website API
    endpoint.

    Args:
        query (str):
            String to search in datasets and tables' metadata.
        order_by (str): score|popular|recent
            Field by which the results will be ordered.

    Returns:
        pd.DataFrame:
            Response from the API presented as a pandas DataFrame. Each row is
            a table. Each column is a field identifying the table.
    """

    # validate order_by input
    if order_by not in ["score", "popular", "recent"]:
        raise ValueError(
            f'order_by must be score, popular or recent. Received "{order_by}"'
        )

    url = f"https://basedosdados.org/api/3/action/bd_dataset_search?q={query}&order_by={order_by}&resource_type=bdm_table"

    response = _safe_fetch(url)

    json_response = response.json()

    dataset_dfs = []
    # outer loop: build one DataFrame per dataset
    for dataset in json_response["result"]["datasets"]:
        tables_dfs = []
        # inner loop: extract each table's information as a one-row DataFrame
        for table in dataset["resources"]:
            data_table = pd.DataFrame({k: str(v) for k, v in table.items()}, index=[0])
            tables_dfs.append(data_table)
        # skip datasets without resources, which pd.concat cannot handle
        if tables_dfs:
            dataset_dfs.append(pd.concat(tables_dfs, ignore_index=True))

    # no results: return an empty DataFrame instead of failing
    if not dataset_dfs:
        return pd.DataFrame()

    # concatenate the datasets' DataFrames (DataFrame.append was removed in pandas 2.0)
    df = pd.concat(dataset_dfs, ignore_index=True)

    return df
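
The listing above interpolates `query` straight into the URL, so spaces or accented characters are sent unescaped. As a hedged alternative, the sketch below builds the same URL with percent-encoding from the standard library. `build_search_url` is a hypothetical helper, and whether the endpoint treats `+` as a space is an assumption based on ordinary form encoding.

```python
from urllib.parse import urlencode

VALID_ORDER_BY = ("score", "popular", "recent")


def build_search_url(query, order_by):
    """Validate `order_by` and return the search URL with encoded parameters."""
    if order_by not in VALID_ORDER_BY:
        raise ValueError(
            f'order_by must be score, popular or recent. Received "{order_by}"'
        )
    # urlencode escapes spaces and non-ASCII characters in the search string.
    params = urlencode(
        {"q": query, "order_by": order_by, "resource_type": "bdm_table"}
    )
    return f"https://basedosdados.org/api/3/action/bd_dataset_search?{params}"
```

For a plain ASCII query the result is identical to the URL built in the source; only queries containing reserved or non-ASCII characters differ.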

Classes (Data management)

Class for managing the files in cloud storage.

Storage (Base)

Manage files on Google Cloud Storage.
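
Per the `_resolve_partitions` and `_build_blob_name` methods in the source listing, objects are stored under `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`. The sketch below re-implements that naming scheme as free functions so it can be tried without a bucket. The function names, the `mydataset`/`mytable` identifiers, and the `ValueError`/`TypeError` choices are assumptions of this sketch; the source raises a generic `Exception`.

```python
def resolve_partitions(partitions):
    """Normalize partitions to a `key=value/.../` path fragment."""
    if isinstance(partitions, dict):
        return "/".join(f"{k}={v}" for k, v in partitions.items()) + "/"

    if isinstance(partitions, str):
        if partitions.endswith("/"):
            partitions = partitions[:-1]
        if not partitions:
            return ""  # no partition at all
        # Every folder level must look like key=value.
        for part in partitions.split("/"):
            if "=" not in part:
                raise ValueError(f"The path {partitions} is not a valid partition")
        return partitions + "/"

    raise TypeError(f"Partitions format or type not accepted: {partitions}")


def build_blob_name(dataset_id, table_id, filename, mode, partitions=None):
    """Compose the object name: table folder, optional partitions, file name."""
    blob_name = f"{mode}/{dataset_id}/{table_id}/"
    if partitions is not None:
        blob_name += resolve_partitions(partitions)
    return blob_name + filename


plain = build_blob_name("mydataset", "mytable", "data.csv", "staging")
from_dict = build_blob_name(
    "mydataset", "mytable", "data.csv", "staging",
    partitions={"country": "brasil", "year": 2020},
)
from_str = build_blob_name(
    "mydataset", "mytable", "data.csv", "staging",
    partitions="country=brasil/year=2020/",
)
```

Both the dict and the string form resolve to the same hive-style path, so callers can use whichever is more convenient.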

Source code in basedosdados/upload/storage.py
class Storage(Base):
    """
    Manage files on Google Cloud Storage.
    """

    def __init__(self, dataset_id=None, table_id=None, **kwargs):
        super().__init__(**kwargs)

        self.bucket = self.client["storage_staging"].bucket(self.bucket_name)
        self.dataset_id = dataset_id.replace("-", "_")
        self.table_id = table_id.replace("-", "_")

    @staticmethod
    def _resolve_partitions(partitions):

        if isinstance(partitions, dict):

            return "/".join(f"{k}={v}" for k, v in partitions.items()) + "/"

        if isinstance(partitions, str):

            if partitions.endswith("/"):
                partitions = partitions[:-1]

            # If there is no partition
            if len(partitions) == 0:
                return ""

            # It should fail if there is a folder that is not a partition
            try:
                # check if it fits rule
                {b.split("=")[0]: b.split("=")[1] for b in partitions.split("/")}
            except IndexError as e:
                raise Exception(f"The path {partitions} is not a valid partition") from e

            return partitions + "/"

        raise Exception(f"Partitions format or type not accepted: {partitions}")

    def _build_blob_name(self, filename, mode, partitions=None):
        '''
        Builds the blob name.
        '''

        # table folder
        blob_name = f"{mode}/{self.dataset_id}/{self.table_id}/"

        # add partition folder
        if partitions is not None:

            blob_name += self._resolve_partitions(partitions)

        # add file name
        blob_name += filename

        return blob_name

    def init(self, replace=False, very_sure=False):
        """Initializes bucket and folders.

        Folder should be:

        * `raw` : that contains really raw data
        * `staging` : preprocessed data ready to upload to BigQuery

        Args:
            replace (bool): Optional.
                Whether to replace if bucket already exists
            very_sure (bool): Optional.
                Are you aware that everything is going to be erased if you
                replace the bucket?

        Raises:
            Warning: very_sure argument is still False.
        """

        if replace:
            if not very_sure:
                raise Warning(
                    "\n********************************************************"
                    "\nYou are trying to replace all the data that you have "
                    f"in bucket {self.bucket_name}.\nAre you sure?\n"
                    "If yes, add the flag --very_sure\n"
                    "********************************************************"
                )
            self.bucket.delete(force=True)

        self.client["storage_staging"].create_bucket(self.bucket)

        for folder in ["staging/", "raw/"]:

            self.bucket.blob(folder).upload_from_string("")

    def upload(
        self,
        path,
        mode="all",
        partitions=None,
        if_exists="raise",
        chunk_size=None,
        **upload_args,
    ):
        """Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:

        * Add a single **file** setting `path = <file_path>`.

        * Add a **folder** with multiple files setting `path =
          <folder_path>`. *The folder should just contain the files and
          no folders.*

        * Add **partitioned files** setting `path = <folder_path>`.
          This folder must follow the hive partitioning scheme i.e.
          `<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv`
          (ex: `mytable/country=brasil/year=2020/mypart.csv`).

        *Remember all files must follow a single schema.* Otherwise, things
        might fail in the future.

        There are 6 modes:

        * `raw` : should contain raw files from datasource
        * `staging` : should contain pre-treated files ready to upload to BigQuery
        * `header`: should contain the header of the tables
        * `auxiliary_files`: should contain auxiliary files from each table
        * `architecture`: should contain the architecture sheet of the tables
        * `all`: if no treatment is needed, use `all`.

        Args:
            path (str or pathlib.PosixPath): Where to find the file or
                folder that you want to upload to storage

            mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]

            partitions (str, pathlib.PosixPath, or dict): Optional.
                *If adding a single file*, use this to add it to a specific partition.

                * str : `<key>=<value>/<key2>=<value2>`
                * dict: `dict(key=value, key2=value2)`

            if_exists (str): Optional.
                What to do if data exists

                * 'raise' : Raises BaseDosDadosException
                * 'replace' : Replace table
                * 'pass' : Do nothing
            chunk_size (int): Optional
                The size of a chunk of data whenever iterating (in bytes).
                This must be a multiple of 256 KB per the API specification.
                If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.

            upload_args ():
                Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_filename`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename)
        """

        if (self.dataset_id is None) or (self.table_id is None):
            raise Exception("You need to pass dataset_id and table_id")

        path = Path(path)

        if path.is_dir():
            paths = [
                f
                for f in path.glob("**/*")
                # match on the full name: Path.suffix only returns the last extension (".gzip")
                if f.is_file() and f.name.endswith((".csv", ".parquet", ".parquet.gzip"))
            ]

            parts = [
                (
                    filepath.as_posix()
                    .replace(path.as_posix() + "/", "")
                    .replace(str(filepath.name), "")
                )
                for filepath in paths
            ]

        else:
            paths = [path]
            parts = [partitions or None]

        self._check_mode(mode)

        mode = (
            ["raw", "staging", "header", "auxiliary_files", "architecture"]
            if mode == "all"
            else [mode]
        )
        for m in mode:

            for filepath, part in tqdm(list(zip(paths, parts)), desc="Uploading files"):

                blob_name = self._build_blob_name(filepath.name, m, part)

                blob = self.bucket.blob(blob_name, chunk_size=chunk_size)

                if not blob.exists() or if_exists == "replace":

                    upload_args["timeout"] = upload_args.get("timeout", None)

                    blob.upload_from_filename(str(filepath), **upload_args)

                elif if_exists == "pass":

                    pass

                else:
                    raise BaseDosDadosException(
                        f"Data already exists at {self.bucket_name}/{blob_name}. "
                        "If you are using Storage.upload then set if_exists to "
                        "'replace' to overwrite data \n"
                        "If you are using Table.create then set if_storage_data_exists "
                        "to 'replace' to overwrite data."
                    )

        logger.success(
            " {object} {filename}_{mode} was {action}!",
            filename=filepath.name,
            mode=mode,
            object="File",
            action="uploaded",
        )

    def download(
        self,
        filename="*",
        savepath="",
        partitions=None,
        mode="raw",
        if_not_exists="raise",
    ):

        """Download files from Google Storage from path `mode`/`dataset_id`/`table_id`/`partitions`/`filename` and replicate folder hierarchy
        on save.

        There are 5 modes:

        * `raw` : should contain raw files from datasource
        * `staging` : should contain pre-treated files ready to upload to BigQuery
        * `header`: should contain the header of the tables
        * `auxiliary_files`: should contain auxiliary files from each table
        * `architecture`: should contain the architecture sheet of the tables

        You can also use the `partitions` argument to choose files from a partition

        Args:
            filename (str): Optional
                Specify which file to download. If "*" , downloads all files within the bucket folder. Defaults to "*".

            savepath (str):
                Where you want to save the data on your computer. Must be a path to a directory.

            partitions (str, dict): Optional
                If downloading a single file, use this to specify the partition path from which to download.

                * str : `<key>=<value>/<key2>=<value2>`
                * dict: `dict(key=value, key2=value2)`


            mode (str): Optional
                Folder of the dataset to download from. [raw|staging|header|auxiliary_files|architecture]

            if_not_exists (str): Optional.
                What to do if data not found.

                * 'raise' : Raises FileNotFoundError.
                * 'pass' : Do nothing and exit the function

        Raises:
            FileNotFoundError: If the given path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could not be found or there are no files to download.
        """

        # Prefix to locate files within the bucket
        prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"

        # Add specific partition to search prefix
        if partitions:
            prefix += self._resolve_partitions(partitions)

        # if a specific filename is passed, narrow the prefix to it; otherwise list all blobs within the table
        if filename != "*":
            prefix += filename

        blob_list = list(self.bucket.list_blobs(prefix=prefix))

        # if there are no blobs matching the search raise FileNotFoundError or return
        if not blob_list:
            if if_not_exists == "raise":
                raise FileNotFoundError(f"Could not locate files at {prefix}")
            return

        # download all blobs matching the search to given savepath
        for blob in tqdm(blob_list, desc="Download Blob"):

            # parse blob.name and get the csv file name
            csv_name = blob.name.split("/")[-1]

            # build folder path replicating storage hierarchy
            blob_folder = blob.name.replace(csv_name, "")

            # replicate folder hierarchy
            (Path(savepath) / blob_folder).mkdir(parents=True, exist_ok=True)

            # download blob to savepath
            blob.download_to_filename(filename=f"{savepath}/{blob.name}")

        logger.success(
            " {object} {object_id}_{mode} was {action}!",
            object_id=self.dataset_id,
            mode=mode,
            object="File",
            action="downloaded",
        )

    def delete_file(self, filename, mode, partitions=None, not_found_ok=False):
        """Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.

        Args:
            filename (str): Name of the file to be deleted

            mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]

            partitions (str, pathlib.PosixPath, or dict): Optional.
                Hive structured partition as a string or dict

                * str : `<key>=<value>/<key2>=<value2>`
                * dict: `dict(key=value, key2=value2)`

            not_found_ok (bool): Optional.
                If True, return silently when the file does not exist instead of attempting the delete.
        """

        self._check_mode(mode)

        mode = (
            ["raw", "staging", "header", "auxiliary_files", "architecture"]
            if mode == "all"
            else [mode]
        )

        for m in mode:

            blob = self.bucket.blob(self._build_blob_name(filename, m, partitions))

            if blob.exists() or not not_found_ok:
                blob.delete()
            else:
                return

        logger.success(
            " {object} {filename}_{mode} was {action}!",
            filename=filename,
            mode=mode,
            object="File",
            action="deleted",
        )

    def delete_table(self, mode="staging", bucket_name=None, not_found_ok=False):
        """Deletes a table from storage, sends request in batches.

        Args:
            mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture].
                Defaults to "staging".

            bucket_name (str):
                The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
                (You can check it with the Storage().bucket property)

            not_found_ok (bool): Optional.
                If True, return silently when the table is not found. If False, raise FileNotFoundError.

        """

        prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"

        if bucket_name is not None:

            table_blobs = list(
                self.client["storage_staging"]
                .bucket(f"{bucket_name}")
                .list_blobs(prefix=prefix)
            )

        else:

            table_blobs = list(self.bucket.list_blobs(prefix=prefix))

        if not table_blobs:
            if not_found_ok:
                return
            raise FileNotFoundError(
                f"Could not find the requested table {self.dataset_id}.{self.table_id}"
            )
        # Divides table_blobs list for maximum batch request size
        table_blobs_chunks = [
            table_blobs[i : i + 999] for i in range(0, len(table_blobs), 999)
        ]

        for i, source_table in enumerate(
            tqdm(table_blobs_chunks, desc="Delete Table Chunk")
        ):
            counter = 0
            while counter < 10:
                try:
                    with self.client["storage_staging"].batch():
                        for blob in source_table:
                            blob.delete()
                    break
                except Exception:
                    print(
                        f"Delete Table Chunk {i} | Attempt {counter}: delete operation starts again in 5 seconds...",
                    )
                    time.sleep(5)
                    counter += 1
                    traceback.print_exc(file=sys.stderr)
        logger.success(
            " {object} {object_id}_{mode} was {action}!",
            object_id=self.table_id,
            mode=mode,
            object="Table",
            action="deleted",
        )

    def copy_table(
        self,
        source_bucket_name="basedosdados",
        destination_bucket_name=None,
        mode="staging",
    ):
        """Copies table from a source bucket to your bucket, sends request in batches.

        Args:
            source_bucket_name (str):
                The bucket name from which to copy data. You can change it
                to copy from another external bucket.

            destination_bucket_name (str): Optional
                The bucket name where data will be copied to.
                If None, defaults to the bucket initialized when instantiating the Storage object (You can check it with the
                Storage().bucket property)

            mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture].
                Defaults to "staging".
        """

        source_table_ref = list(
            self.client["storage_staging"]
            .bucket(source_bucket_name)
            .list_blobs(prefix=f"{mode}/{self.dataset_id}/{self.table_id}/")
        )

        if not source_table_ref:
            raise FileNotFoundError(
                f"Could not find the requested table {self.dataset_id}.{self.table_id}"
            )

        if destination_bucket_name is None:

            destination_bucket = self.bucket

        else:

            destination_bucket = self.client["storage_staging"].bucket(
                destination_bucket_name
            )

        # Divides source_table_ref list for maximum batch request size
        source_table_ref_chunks = [
            source_table_ref[i : i + 999] for i in range(0, len(source_table_ref), 999)
        ]

        for i, source_table in enumerate(
            tqdm(source_table_ref_chunks, desc="Copy Table Chunk")
        ):
            counter = 0
            while counter < 10:
                try:
                    with self.client["storage_staging"].batch():
                        for blob in source_table:
                            self.bucket.copy_blob(
                                blob,
                                destination_bucket=destination_bucket,
                            )
                    break
                except Exception:
                    print(
                        f"Copy Table Chunk {i} | Attempt {counter}: copy operation starts again in 5 seconds...",
                    )
                    counter += 1
                    time.sleep(5)
                    traceback.print_exc(file=sys.stderr)
        logger.success(
            " {object} {object_id}_{mode} was {action}!",
            object_id=self.table_id,
            mode=mode,
            object="Table",
            action="copied",
        )

copy_table(self, source_bucket_name='basedosdados', destination_bucket_name=None, mode='staging')

Copies table from a source bucket to your bucket, sends request in batches.

Parameters:

Name Type Description Default
source_bucket_name str

The bucket name from which to copy data. You can change it to copy from another external bucket.

'basedosdados'
destination_bucket_name str

Optional. The bucket name where data will be copied to. If None, defaults to the bucket initialized when instantiating the Storage object (you can check it with the Storage().bucket property).

None
mode str

Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]. Defaults to "staging".

'staging'
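
The source listing for this method slices the list of blobs into chunks of at most 999 items and sends one batch request per chunk. The following is a minimal sketch of that slicing step only, using plain strings in place of blobs. The helper name `chunk` and the fake blob names are assumptions made for illustration and are not part of basedosdados.

```python
def chunk(items, size=999):
    """Split a list into consecutive slices of at most `size` items."""
    return [items[i : i + size] for i in range(0, len(items), size)]


# Stand-in for a list of blobs: 2500 hypothetical blob names.
fake_blobs = [f"staging/my_dataset/my_table/part-{n}.csv" for n in range(2500)]
batches = chunk(fake_blobs)

# Three batches of 999, 999 and 502 items.
print([len(batch) for batch in batches])
```

Every item ends up in exactly one batch, so the number of batch requests grows with the table size while each request stays under the size limit.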
Source code in basedosdados/upload/storage.py
def copy_table(
    self,
    source_bucket_name="basedosdados",
    destination_bucket_name=None,
    mode="staging",
):
    """Copies table from a source bucket to your bucket, sends request in batches.

    Args:
        source_bucket_name (str):
            The bucket name from which to copy data. You can change it
            to copy from another external bucket.

        destination_bucket_name (str): Optional
            The bucket name where data will be copied to.
            If None, defaults to the bucket initialized when instantiating the Storage object (You can check it with the
            Storage().bucket property)

        mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture].
            Defaults to "staging".
    """

    source_table_ref = list(
        self.client["storage_staging"]
        .bucket(source_bucket_name)
        .list_blobs(prefix=f"{mode}/{self.dataset_id}/{self.table_id}/")
    )

    if not source_table_ref:
        raise FileNotFoundError(
            f"Could not find the requested table {self.dataset_id}.{self.table_id}"
        )

    if destination_bucket_name is None:

        destination_bucket = self.bucket

    else:

        destination_bucket = self.client["storage_staging"].bucket(
            destination_bucket_name
        )

    # Divides source_table_ref list for maximum batch request size
    source_table_ref_chunks = [
        source_table_ref[i : i + 999] for i in range(0, len(source_table_ref), 999)
    ]

    for i, source_table in enumerate(
        tqdm(source_table_ref_chunks, desc="Copy Table Chunk")
    ):
        counter = 0
        while counter < 10:
            try:
                with self.client["storage_staging"].batch():
                    for blob in source_table:
                        self.bucket.copy_blob(
                            blob,
                            destination_bucket=destination_bucket,
                        )
                break
            except Exception:
                print(
                    f"Copy Table Chunk {i} | Attempt {counter}: copy operation starts again in 5 seconds...",
                )
                counter += 1
                time.sleep(5)
                traceback.print_exc(file=sys.stderr)
    logger.success(
        " {object} {object_id}_{mode} was {action}!",
        object_id=self.table_id,
        mode=mode,
        object="Table",
        action="copied",
    )

delete_file(self, filename, mode, partitions=None, not_found_ok=False)

Deletes file from path <bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>.

Parameters:

Name Type Description Default
filename str

Name of the file to be deleted

required
mode str

Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]

required
partitions str, pathlib.PosixPath, or dict

Optional. Hive structured partition as a string or dict

  • str : <key>=<value>/<key2>=<value2>
  • dict: dict(key=value, key2=value2)
None
not_found_ok bool

Optional. If True, return silently when the file does not exist instead of attempting the delete.

False
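
To make the path layout concrete, here is a rough sketch of how a blob name of the form `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could be assembled from either form of `partitions`. The functions `resolve_partitions` and `build_blob_name` are hypothetical stand-ins written for this example. The library's own private helpers are not reproduced here and may behave differently.

```python
def resolve_partitions(partitions):
    """Turn a dict or a `key=value/key2=value2` string into a path fragment."""
    if isinstance(partitions, dict):
        return "/".join(f"{key}={value}" for key, value in partitions.items()) + "/"
    # Strings are used as given, normalised to end with a single slash.
    return str(partitions).strip("/") + "/"


def build_blob_name(filename, mode, dataset_id, table_id, partitions=None):
    """Assemble `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`."""
    blob_name = f"{mode}/{dataset_id}/{table_id}/"
    if partitions is not None:
        blob_name += resolve_partitions(partitions)
    return blob_name + filename


# Both partition forms lead to the same blob name.
from_dict = build_blob_name(
    "mypart.csv", "staging", "my_dataset", "my_table",
    partitions={"country": "brasil", "year": 2020},
)
from_str = build_blob_name(
    "mypart.csv", "staging", "my_dataset", "my_table",
    partitions="country=brasil/year=2020",
)
print(from_dict)
print(from_dict == from_str)
```

The bucket name is not part of the blob name. It identifies the bucket in which the blob is looked up.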
Source code in basedosdados/upload/storage.py
def delete_file(self, filename, mode, partitions=None, not_found_ok=False):
    """Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.

    Args:
        filename (str): Name of the file to be deleted

        mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]

        partitions (str, pathlib.PosixPath, or dict): Optional.
            Hive structured partition as a string or dict

            * str : `<key>=<value>/<key2>=<value2>`
            * dict: `dict(key=value, key2=value2)`

        not_found_ok (bool): Optional.
            If True, return silently when the file does not exist instead of attempting the delete.
    """

    self._check_mode(mode)

    mode = (
        ["raw", "staging", "header", "auxiliary_files", "architecture"]
        if mode == "all"
        else [mode]
    )

    for m in mode:

        blob = self.bucket.blob(self._build_blob_name(filename, m, partitions))

        if blob.exists() or not not_found_ok:
            blob.delete()
        else:
            return

    logger.success(
        " {object} {filename}_{mode} was {action}!",
        filename=filename,
        mode=mode,
        object="File",
        action="deleted",
    )

delete_table(self, mode='staging', bucket_name=None, not_found_ok=False)

Deletes a table from storage, sends request in batches.

Parameters:

Name Type Description Default
mode str

Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]. Defaults to "staging".

'staging'
bucket_name str

The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object. (You can check it with the Storage().bucket property)

None
not_found_ok bool

Optional. If True, return silently when the table is not found. If False, raise FileNotFoundError.

False
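
Each chunk of the batched delete is attempted up to 10 times, with a 5-second pause after a failure. The sketch below isolates that retry pattern with a simulated operation, so it runs without any cloud credentials. `run_with_retries` and `flaky_delete` are hypothetical names. Returning a success flag is a choice made for this sketch, since the loop in the source listing does not signal when all attempts fail.

```python
import time


def run_with_retries(action, attempts=10, delay=5.0, sleep=time.sleep):
    """Call `action` until it succeeds or `attempts` is exhausted.

    Returns True on success and False if every attempt raised.
    """
    for attempt in range(attempts):
        try:
            action()
            return True
        except Exception as error:
            print(f"Attempt {attempt} failed ({error}); retrying in {delay} seconds...")
            sleep(delay)
    return False


# Simulated flaky operation: fails twice, then succeeds.
calls = {"count": 0}


def flaky_delete():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")


# A no-op sleep keeps the demonstration instant.
succeeded = run_with_retries(flaky_delete, sleep=lambda seconds: None)
print(succeeded, calls["count"])
```

Passing the sleep function as a parameter makes the pause easy to replace in tests while keeping `time.sleep` as the default.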
Source code in basedosdados/upload/storage.py
def delete_table(self, mode="staging", bucket_name=None, not_found_ok=False):
    """Deletes a table from storage, sends request in batches.

    Args:
        mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture].
            Defaults to "staging".

        bucket_name (str):
            The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
            (You can check it with the Storage().bucket property)

        not_found_ok (bool): Optional.
            If True, return silently when the table is not found. If False, raise FileNotFoundError.

    """

    prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"

    if bucket_name is not None:

        table_blobs = list(
            self.client["storage_staging"]
            .bucket(f"{bucket_name}")
            .list_blobs(prefix=prefix)
        )

    else:

        table_blobs = list(self.bucket.list_blobs(prefix=prefix))

    if not table_blobs:
        if not_found_ok:
            return
        raise FileNotFoundError(
            f"Could not find the requested table {self.dataset_id}.{self.table_id}"
        )
    # Divides table_blobs list for maximum batch request size
    table_blobs_chunks = [
        table_blobs[i : i + 999] for i in range(0, len(table_blobs), 999)
    ]

    for i, source_table in enumerate(
        tqdm(table_blobs_chunks, desc="Delete Table Chunk")
    ):
        counter = 0
        while counter < 10:
            try:
                with self.client["storage_staging"].batch():
                    for blob in source_table:
                        blob.delete()
                break
            except Exception:
                print(
                    f"Delete Table Chunk {i} | Attempt {counter}: delete operation starts again in 5 seconds...",
                )
                time.sleep(5)
                counter += 1
                traceback.print_exc(file=sys.stderr)
    logger.success(
        " {object} {object_id}_{mode} was {action}!",
        object_id=self.table_id,
        mode=mode,
        object="Table",
        action="deleted",
    )

download(self, filename='*', savepath='', partitions=None, mode='raw', if_not_exists='raise')

Download files from Google Storage from path mode/dataset_id/table_id/partitions/filename and replicate folder hierarchy on save.

There are 5 modes:

  • raw : should contain raw files from datasource
  • staging : should contain pre-treated files ready to upload to BigQuery
  • header: should contain the header of the tables
  • auxiliary_files: should contain auxiliary files from each table
  • architecture: should contain the architecture sheet of the tables

You can also use the partitions argument to choose files from a partition

Parameters:

Name Type Description Default
filename str

Optional. Specify which file to download. If "*", downloads all files within the bucket folder. Defaults to "*".

'*'
savepath str

Where you want to save the data on your computer. Must be a path to a directory.

''
partitions str, dict

Optional If downloading a single file, use this to specify the partition path from which to download.

  • str : <key>=<value>/<key2>=<value2>
  • dict: dict(key=value, key2=value2)
None
mode str

Optional. Folder of the dataset to download from. [raw|staging|header|auxiliary_files|architecture]

'raw'
if_not_exists str

Optional. What to do if data not found.

  • 'raise' : Raises FileNotFoundError.
  • 'pass' : Do nothing and exit the function
'raise'

Exceptions:

Type Description
FileNotFoundError

If the given path <mode>/<dataset_id>/<table_id>/<partitions>/<filename> could not be found or there are no files to download.
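
The phrase "replicate folder hierarchy on save" means that each downloaded file lands under `savepath` at the same relative path it has in the bucket. A small sketch of that step follows, using a temporary directory and a written file as a stand-in for the actual blob download. `local_target` and the example blob name are assumptions for illustration.

```python
import tempfile
from pathlib import Path


def local_target(savepath, blob_name):
    """Create the blob's folder hierarchy under `savepath` and return the file path."""
    target = Path(savepath) / blob_name
    target.parent.mkdir(parents=True, exist_ok=True)
    return target


with tempfile.TemporaryDirectory() as tmp:
    target = local_target(tmp, "raw/my_dataset/my_table/year=2020/mypart.csv")
    # Stand-in for the real download of the blob's contents.
    target.write_text("id,value\n1,10\n")
    relative = target.relative_to(tmp).as_posix()
    created = target.exists()

print(relative, created)
```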

Source code in basedosdados/upload/storage.py
def download(
    self,
    filename="*",
    savepath="",
    partitions=None,
    mode="raw",
    if_not_exists="raise",
):

    """Download files from Google Storage from path `mode`/`dataset_id`/`table_id`/`partitions`/`filename` and replicate folder hierarchy
    on save.

    There are 5 modes:

    * `raw` : should contain raw files from datasource
    * `staging` : should contain pre-treated files ready to upload to BigQuery
    * `header`: should contain the header of the tables
    * `auxiliary_files`: should contain auxiliary files from each table
    * `architecture`: should contain the architecture sheet of the tables

    You can also use the `partitions` argument to choose files from a partition

    Args:
        filename (str): Optional
            Specify which file to download. If "*" , downloads all files within the bucket folder. Defaults to "*".

        savepath (str):
            Where you want to save the data on your computer. Must be a path to a directory.

        partitions (str, dict): Optional
            If downloading a single file, use this to specify the partition path from which to download.

            * str : `<key>=<value>/<key2>=<value2>`
            * dict: `dict(key=value, key2=value2)`


        mode (str): Optional
            Folder of the dataset to download from. [raw|staging|header|auxiliary_files|architecture]

        if_not_exists (str): Optional.
            What to do if data not found.

            * 'raise' : Raises FileNotFoundError.
            * 'pass' : Do nothing and exit the function

    Raises:
        FileNotFoundError: If the given path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could not be found or there are no files to download.
    """

    # Prefix to locate files within the bucket
    prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"

    # Add specific partition to search prefix
    if partitions:
        prefix += self._resolve_partitions(partitions)

    # if a specific filename is passed, narrow the prefix to it; otherwise list all blobs within the table
    if filename != "*":
        prefix += filename

    blob_list = list(self.bucket.list_blobs(prefix=prefix))

    # if there are no blobs matching the search raise FileNotFoundError or return
    if not blob_list:
        if if_not_exists == "raise":
            raise FileNotFoundError(f"Could not locate files at {prefix}")
        return

    # download all blobs matching the search to given savepath
    for blob in tqdm(blob_list, desc="Download Blob"):

        # parse blob.name and get the csv file name
        csv_name = blob.name.split("/")[-1]

        # build folder path replicating storage hierarchy
        blob_folder = blob.name.replace(csv_name, "")

        # replicate folder hierarchy
        (Path(savepath) / blob_folder).mkdir(parents=True, exist_ok=True)

        # download blob to savepath
        blob.download_to_filename(filename=f"{savepath}/{blob.name}")

    logger.success(
        " {object} {object_id}_{mode} was {action}!",
        object_id=self.dataset_id,
        mode=mode,
        object="File",
        action="downloaded",
    )

init(self, replace=False, very_sure=False)

Initializes bucket and folders.

Folder should be:

  • raw : that contains really raw data
  • staging : preprocessed data ready to upload to BigQuery

Parameters:

Name Type Description Default
replace bool

Optional. Whether to replace if bucket already exists

False
very_sure bool

Optional. Are you aware that everything is going to be erased if you replace the bucket?

False

Exceptions:

Type Description
Warning

very_sure argument is still False.
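
Because `Warning` is raised here as an ordinary exception, callers can catch it with `except Warning`. The sketch below mirrors only the confirmation guard, without touching any bucket. `confirm_replace` is a hypothetical function written for this example and the message text is illustrative.

```python
def confirm_replace(replace=False, very_sure=False):
    """Return True when an existing bucket may be deleted and recreated."""
    if replace and not very_sure:
        raise Warning(
            "replace=True erases the whole bucket; pass very_sure=True to confirm."
        )
    return replace


try:
    confirm_replace(replace=True)
except Warning as warning:
    message = str(warning)

print(message)
print(confirm_replace(replace=True, very_sure=True))
```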

Source code in basedosdados/upload/storage.py
def init(self, replace=False, very_sure=False):
    """Initializes bucket and folders.

    Folder should be:

    * `raw` : that contains really raw data
    * `staging` : preprocessed data ready to upload to BigQuery

    Args:
        replace (bool): Optional.
            Whether to replace if bucket already exists
        very_sure (bool): Optional.
            Are you aware that everything is going to be erased if you
            replace the bucket?

    Raises:
        Warning: very_sure argument is still False.
    """

    if replace:
        if not very_sure:
            raise Warning(
                "\n********************************************************"
                "\nYou are trying to replace all the data that you have "
                f"in bucket {self.bucket_name}.\nAre you sure?\n"
                "If yes, add the flag --very_sure\n"
                "********************************************************"
            )
        self.bucket.delete(force=True)

    self.client["storage_staging"].create_bucket(self.bucket)

    for folder in ["staging/", "raw/"]:

        self.bucket.blob(folder).upload_from_string("")

upload(self, path, mode='all', partitions=None, if_exists='raise', chunk_size=None, **upload_args)

Upload to storage at <bucket_name>/<mode>/<dataset_id>/<table_id>. You can:

  • Add a single file setting path = <file_path>.

  • Add a folder with multiple files setting path = <folder_path>. The folder should just contain the files and no folders.

  • Add partitioned files setting path = <folder_path>. This folder must follow the hive partitioning scheme i.e. <table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv (ex: mytable/country=brasil/year=2020/mypart.csv).

Remember all files must follow a single schema. Otherwise, things might fail in the future.
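
As an illustration of the hive layout, the sketch below builds a small partitioned folder in a temporary directory and derives the partition fragment of each file with the same string manipulation shown in the source listing (strip the root folder, then the file name). The folder and file names are made up for the example.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "mytable"
    # Build a small hive-partitioned folder: <key>=<value>/<key2>=<value2>/<file>.csv
    for year in ("2019", "2020"):
        folder = root / "country=brasil" / f"year={year}"
        folder.mkdir(parents=True)
        (folder / "mypart.csv").write_text("id,value\n1,10\n")

    files = sorted(f for f in root.glob("**/*") if f.is_file())
    # Strip the root folder and the file name, leaving only the partition path.
    parts = [
        f.as_posix().replace(root.as_posix() + "/", "").replace(f.name, "")
        for f in files
    ]

print(parts)
```

Each entry in `parts` is the `<key>=<value>/<key2>=<value2>/` fragment that is placed between the table folder and the file name in storage.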

There are 6 modes:

  • raw : should contain raw files from datasource
  • staging : should contain pre-treated files ready to upload to BigQuery
  • header: should contain the header of the tables
  • auxiliary_files: should contain auxiliary files from each table
  • architecture: should contain the architecture sheet of the tables
  • all: if no treatment is needed, use all.

Parameters:

Name Type Description Default
path str or pathlib.PosixPath

Where to find the file or folder that you want to upload to storage

required
mode str

Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]

'all'
partitions str, pathlib.PosixPath, or dict

Optional. If adding a single file, use this to add it to a specific partition.

  • str : <key>=<value>/<key2>=<value2>
  • dict: dict(key=value, key2=value2)
None
if_exists str

Optional. What to do if data exists

  • 'raise' : Raises BaseDosDadosException
  • 'replace' : Replace table
  • 'pass' : Do nothing
'raise'
chunk_size int

Optional The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.

None
upload_args

Extra arguments accepted by google.cloud.storage.blob.Blob.upload_from_filename

{}
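
Since `chunk_size` must be a multiple of 256 KB (262144 bytes), it can help to round a desired size before passing it in. A minimal sketch, assuming you want to round up. `round_chunk_size` is a hypothetical helper and not part of basedosdados or google-cloud-storage.

```python
KIB_256 = 256 * 1024  # 262144 bytes


def round_chunk_size(desired_bytes):
    """Round a byte count up to the nearest multiple of 256 KB (minimum one unit)."""
    units = max(1, -(-desired_bytes // KIB_256))  # ceiling division
    return units * KIB_256


# Roughly 10 MB becomes 39 units of 256 KB.
chunk_size = round_chunk_size(10_000_000)
print(chunk_size, chunk_size % KIB_256)
```

The resulting value could then be passed as `chunk_size` to `upload`.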
Source code in basedosdados/upload/storage.py
def upload(
    self,
    path,
    mode="all",
    partitions=None,
    if_exists="raise",
    chunk_size=None,
    **upload_args,
):
    """Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:

    * Add a single **file** setting `path = <file_path>`.

    * Add a **folder** with multiple files setting `path =
      <folder_path>`. *The folder should just contain the files and
      no folders.*

    * Add **partitioned files** setting `path = <folder_path>`.
      This folder must follow the hive partitioning scheme i.e.
      `<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv`
      (ex: `mytable/country=brasil/year=2020/mypart.csv`).

    *Remember all files must follow a single schema.* Otherwise, things
    might fail in the future.

    There are 6 modes:

    * `raw` : should contain raw files from datasource
    * `staging` : should contain pre-treated files ready to upload to BigQuery
    * `header`: should contain the header of the tables
    * `auxiliary_files`: should contain auxiliary files from each table
    * `architecture`: should contain the architecture sheet of the tables
    * `all`: if no treatment is needed, use `all`.

    Args:
        path (str or pathlib.PosixPath): Where to find the file or
            folder that you want to upload to storage

        mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]

        partitions (str, pathlib.PosixPath, or dict): Optional.
            *If adding a single file*, use this to add it to a specific partition.

            * str : `<key>=<value>/<key2>=<value2>`
            * dict: `dict(key=value, key2=value2)`

        if_exists (str): Optional.
            What to do if data exists

            * 'raise' : Raises Conflict exception
            * 'replace' : Replace table
            * 'pass' : Do nothing
        chunk_size (int): Optional
            The size of a chunk of data whenever iterating (in bytes).
            This must be a multiple of 256 KB per the API specification.
            If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.

        upload_args ():
            Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_filename`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename)
    """

    if (self.dataset_id is None) or (self.table_id is None):
        raise Exception("You need to pass dataset_id and table_id")

    path = Path(path)

    if path.is_dir():
        paths = [
            f
            for f in path.glob("**/*")
            if f.is_file() and f.name.endswith((".csv", ".parquet", ".parquet.gzip"))
        ]

        parts = [
            (
                filepath.as_posix()
                .replace(path.as_posix() + "/", "")
                .replace(str(filepath.name), "")
            )
            for filepath in paths
        ]

    else:
        paths = [path]
        parts = [partitions or None]

    self._check_mode(mode)

    mode = (
        ["raw", "staging", "header", "auxiliary_files", "architecture"]
        if mode == "all"
        else [mode]
    )
    for m in mode:

        for filepath, part in tqdm(list(zip(paths, parts)), desc="Uploading files"):

            blob_name = self._build_blob_name(filepath.name, m, part)

            blob = self.bucket.blob(blob_name, chunk_size=chunk_size)

            if not blob.exists() or if_exists == "replace":

                upload_args["timeout"] = upload_args.get("timeout", None)

                blob.upload_from_filename(str(filepath), **upload_args)

            elif if_exists == "pass":

                pass

            else:
                raise BaseDosDadosException(
                    f"Data already exists at {self.bucket_name}/{blob_name}. "
                    "If you are using Storage.upload then set if_exists to "
                    "'replace' to overwrite data \n"
                    "If you are using Table.create then set if_storage_data_exists "
                    "to 'replace' to overwrite data."
                )

    logger.success(
        " {object} {filename}_{mode} was {action}!",
        filename=filepath.name,
        mode=mode,
        object="File",
        action="uploaded",
    )
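To illustrate the hive partitioning scheme that `upload` expects for folders, here is a self-contained sketch of how a partition prefix can be derived from each file's location relative to the table folder. It mirrors the idea of the `parts` list in the listing above but is not the library's code: the helper names `list_files_with_partitions` and `partition_to_dict` are hypothetical, and only `.csv` and `.parquet` files are considered.

```python
import tempfile
from pathlib import Path


def list_files_with_partitions(folder):
    """Return (file name, partition prefix) pairs for data files under folder."""
    folder = Path(folder)
    files = sorted(
        f for f in folder.glob("**/*")
        if f.is_file() and f.suffix in (".csv", ".parquet")
    )
    result = []
    for f in files:
        rel_parent = f.parent.relative_to(folder).as_posix()
        # Files directly in the folder have no partition prefix
        partition = "" if rel_parent == "." else rel_parent + "/"
        result.append((f.name, partition))
    return result


def partition_to_dict(partition):
    """Turn 'key=value/key2=value2/' into {'key': 'value', 'key2': 'value2'}."""
    return dict(kv.split("=", 1) for kv in partition.strip("/").split("/") if kv)


# Build a small hive-partitioned folder in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "mytable"
    for year in ("2020", "2021"):
        part_dir = root / "country=brasil" / f"year={year}"
        part_dir.mkdir(parents=True)
        (part_dir / "mypart.csv").write_text("a,b\n1,2\n", encoding="utf-8")
    demo = list_files_with_partitions(root)

for name, partition in demo:
    print(name, partition, partition_to_dict(partition))
```

Each partition prefix keeps the `<key>=<value>/` layout, which is the same string form accepted by the `partitions` argument when uploading a single file.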

Module for managing datasets on the server.

Dataset (Base)

Manage datasets in BigQuery.

Source code in basedosdados/upload/dataset.py
class Dataset(Base):
    """
    Manage datasets in BigQuery.
    """

    def __init__(self, dataset_id, **kwargs):
        super().__init__(**kwargs)

        self.dataset_id = dataset_id.replace("-", "_")
        self.dataset_folder = Path(self.metadata_path / self.dataset_id)
        self.metadata = Metadata(self.dataset_id, **kwargs)

    @property
    def dataset_config(self):
        """
        Dataset config file.
        """

        return self._load_yaml(
            self.metadata_path / self.dataset_id / "dataset_config.yaml"
        )

    def _loop_modes(self, mode="all"):
        """
        Loop modes.
        """

        mode = ["prod", "staging"] if mode == "all" else [mode]
        dataset_tag = lambda m: f"_{m}" if m == "staging" else ""

        return (
            {
                "client": self.client[f"bigquery_{m}"],
                "id": f"{self.client[f'bigquery_{m}'].project}.{self.dataset_id}{dataset_tag(m)}",
            }
            for m in mode
        )

    @staticmethod
    def _setup_dataset_object(dataset_id, location=None):
        """
        Setup dataset object.
        """

        dataset = bigquery.Dataset(dataset_id)

        ## TODO: not being used since 1.6.0 - need to redo the description that goes to bigquery
        dataset.description = "Para saber mais acesse https://basedosdados.org/"
        # dataset.description = self._render_template(
        #     Path("dataset/dataset_description.txt"), self.dataset_config
        # )

        dataset.location = location

        return dataset

    def _write_readme_file(self):
        """
        Write README.md file.
        """

        readme_content = (
            f"Como capturar os dados de {self.dataset_id}?\n\nPara cap"
            f"turar esses dados, basta verificar o link dos dados orig"
            f"inais indicado em dataset_config.yaml no item website.\n"
            f"\nCaso tenha sido utilizado algum código de captura ou t"
            f"ratamento, estes estarão contidos em code/. Se o dado pu"
            f"blicado for em sua versão bruta, não existirá a pasta co"
            f"de/.\n\nOs dados publicados estão disponíveis em: https:"
            f"//basedosdados.org/dataset/{self.dataset_id.replace('_','-')}"
        )

        readme_path = Path(self.metadata_path / self.dataset_id / "README.md")

        with open(readme_path, "w", encoding="utf-8") as readmefile:
            readmefile.write(readme_content)

    def init(self, replace=False):
        """Initialize dataset folder at `metadata_path/<dataset_id>`.

        The folder should contain:

        * `dataset_config.yaml`
        * `README.md`

        Args:
            replace (bool): Optional. Whether to replace existing folder.

        Raises:
            FileExistsError: If dataset folder already exists and replace is False
        """

        # Create dataset folder
        try:
            self.dataset_folder.mkdir(exist_ok=replace, parents=True)
        except FileExistsError as e:
            raise FileExistsError(
                f"Dataset {str(self.dataset_folder.stem)} folder already exists. "
                "Set replace=True to replace current files."
            ) from e

        # create dataset_config.yaml with metadata
        self.metadata.create(if_exists="replace")

        # create README.md file
        self._write_readme_file()

        # Add code folder
        (self.dataset_folder / "code").mkdir(exist_ok=replace, parents=True)

        return self

    def publicize(self, mode="all", dataset_is_public=True):
        """Changes IAM configuration to turn BigQuery dataset public.

        Args:
            mode (str): Which dataset to publicize [prod|staging|all].
            dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
        """

        for m in self._loop_modes(mode):

            dataset = m["client"].get_dataset(m["id"])
            entries = dataset.access_entries
            # TODO https://github.com/basedosdados/mais/pull/1020
            # TODO if staging dataset is private, the prod view can't access it: if dataset_is_public and "staging" not in dataset.dataset_id:
            if dataset_is_public:
                if "staging" not in dataset.dataset_id:
                    entries.extend(
                        [
                            bigquery.AccessEntry(
                                role="roles/bigquery.dataViewer",
                                entity_type="iamMember",
                                entity_id="allUsers",
                            ),
                            bigquery.AccessEntry(
                                role="roles/bigquery.metadataViewer",
                                entity_type="iamMember",
                                entity_id="allUsers",
                            ),
                            bigquery.AccessEntry(
                                role="roles/bigquery.user",
                                entity_type="iamMember",
                                entity_id="allUsers",
                            ),
                        ]
                    )
                else:
                    entries.extend(
                        [
                            bigquery.AccessEntry(
                                role="roles/bigquery.dataViewer",
                                entity_type="iamMember",
                                entity_id="allUsers",
                            ),
                        ]
                    )
                dataset.access_entries = entries
            m["client"].update_dataset(dataset, ["access_entries"])
        logger.success(
            " {object} {object_id}_{mode} was {action}!",
            object_id=self.dataset_id,
            mode=mode,
            object="Dataset",
            action="publicized",
        )

    def create(
        self, mode="all", if_exists="raise", dataset_is_public=True, location=None
    ):
        """Creates BigQuery datasets given `dataset_id`.

        It can create two datasets:

        * `<dataset_id>` (mode = 'prod')
        * `<dataset_id>_staging` (mode = 'staging')

        If `mode` is all, it creates both.

        Args:
            mode (str): Optional. Which dataset to create [prod|staging|all].
            if_exists (str): Optional. What to do if dataset exists

                * raise : Raises Conflict exception
                * replace : Drop all tables and replace dataset
                * update : Update dataset description
                * pass : Do nothing

            dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.

            location (str): Optional. Location of dataset data.
                List of possible region names locations: https://cloud.google.com/bigquery/docs/locations


        Raises:
            Conflict: Dataset already exists and if_exists is set to `raise`
        """

        if if_exists == "replace":
            self.delete(mode)
        elif if_exists == "update":

            self.update()
            return

        # Set dataset_id to the ID of the dataset to create.
        for m in self._loop_modes(mode):

            # Construct a full Dataset object to send to the API.
            dataset_obj = self._setup_dataset_object(m["id"], location=location)

            # Send the dataset to the API for creation, with an explicit timeout.
            # Raises google.api_core.exceptions.Conflict if the Dataset already
            # exists within the project.
            try:
                m["client"].create_dataset(dataset_obj)  # Make an API request.
                logger.success(
                    " {object} {object_id}_{mode} was {action}!",
                    object_id=self.dataset_id,
                    mode=mode,
                    object="Dataset",
                    action="created",
                )

            except Conflict as e:
                if if_exists == "pass":
                    return
                raise Conflict(f"Dataset {self.dataset_id} already exists") from e

        # Make prod dataset public
        self.publicize(dataset_is_public=dataset_is_public)

    def delete(self, mode="all"):
        """Deletes dataset in BigQuery. Toggle mode to choose which dataset to delete.

        Args:
            mode (str): Optional.  Which dataset to delete [prod|staging|all]
        """

        for m in self._loop_modes(mode):

            m["client"].delete_dataset(m["id"], delete_contents=True, not_found_ok=True)
        logger.info(
            " {object} {object_id}_{mode} was {action}!",
            object_id=self.dataset_id,
            mode=mode,
            object="Dataset",
            action="deleted",
        )

    def update(self, mode="all", location=None):
        """Update dataset description. Toggle mode to choose which dataset to update.

        Args:
            mode (str): Optional. Which dataset to update [prod|staging|all]
            location (str): Optional. Location of dataset data.
                List of possible region names locations: https://cloud.google.com/bigquery/docs/locations

        """

        for m in self._loop_modes(mode):

            # Send the dataset to the API to update its description.
            m["client"].update_dataset(
                self._setup_dataset_object(
                    m["id"],
                    location=location,
                ),
                fields=["description"],
            )  # Make an API request.

        logger.success(
            " {object} {object_id}_{mode} was {action}!",
            object_id=self.dataset_id,
            mode=mode,
            object="Dataset",
            action="updated",
        )

dataset_config property readonly

Dataset config file.

create(self, mode='all', if_exists='raise', dataset_is_public=True, location=None)

Creates BigQuery datasets given dataset_id.

It can create two datasets:

  • <dataset_id> (mode = 'prod')
  • <dataset_id>_staging (mode = 'staging')

If mode is all, it creates both.

Parameters:

Name Type Description Default
mode str

Optional. Which dataset to create [prod|staging|all].

'all'
if_exists str

Optional. What to do if dataset exists

  • raise : Raises Conflict exception
  • replace : Drop all tables and replace dataset
  • update : Update dataset description
  • pass : Do nothing
'raise'
dataset_is_public bool

Control if prod dataset is public or not. By default staging datasets like dataset_id_staging are not public.

True
location str

Optional. Location of dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations

None

Exceptions:

Type Description
Conflict

Dataset already exists and if_exists is set to raise

Source code in basedosdados/upload/dataset.py
def create(
    self, mode="all", if_exists="raise", dataset_is_public=True, location=None
):
    """Creates BigQuery datasets given `dataset_id`.

    It can create two datasets:

    * `<dataset_id>` (mode = 'prod')
    * `<dataset_id>_staging` (mode = 'staging')

    If `mode` is all, it creates both.

    Args:
        mode (str): Optional. Which dataset to create [prod|staging|all].
        if_exists (str): Optional. What to do if dataset exists

            * raise : Raises Conflict exception
            * replace : Drop all tables and replace dataset
            * update : Update dataset description
            * pass : Do nothing

        dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.

        location (str): Optional. Location of dataset data.
            List of possible region names locations: https://cloud.google.com/bigquery/docs/locations


    Raises:
        Conflict: Dataset already exists and if_exists is set to `raise`
    """

    if if_exists == "replace":
        self.delete(mode)
    elif if_exists == "update":

        self.update()
        return

    # Set dataset_id to the ID of the dataset to create.
    for m in self._loop_modes(mode):

        # Construct a full Dataset object to send to the API.
        dataset_obj = self._setup_dataset_object(m["id"], location=location)

        # Send the dataset to the API for creation, with an explicit timeout.
        # Raises google.api_core.exceptions.Conflict if the Dataset already
        # exists within the project.
        try:
            m["client"].create_dataset(dataset_obj)  # Make an API request.
            logger.success(
                " {object} {object_id}_{mode} was {action}!",
                object_id=self.dataset_id,
                mode=mode,
                object="Dataset",
                action="created",
            )

        except Conflict as e:
            if if_exists == "pass":
                return
            raise Conflict(f"Dataset {self.dataset_id} already exists") from e

    # Make prod dataset public
    self.publicize(dataset_is_public=dataset_is_public)
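As a rough illustration of the naming rule used by `create` (one dataset per mode, with a `_staging` suffix for staging), the sketch below builds the fully qualified dataset ids without touching BigQuery. It follows the pattern of `_loop_modes` in the class listing, but the function `dataset_ids` and the project names are hypothetical.

```python
def dataset_ids(project_prod, project_staging, dataset_id, mode="all"):
    """Return the '<project>.<dataset>' ids that a given mode refers to."""
    modes = ["prod", "staging"] if mode == "all" else [mode]
    projects = {"prod": project_prod, "staging": project_staging}
    return [
        # Only the staging dataset gets the "_staging" suffix
        f"{projects[m]}.{dataset_id}" + ("_staging" if m == "staging" else "")
        for m in modes
    ]


# Hypothetical project names, for illustration only
print(dataset_ids("my-prod-project", "my-staging-project", "br_suporte"))
print(dataset_ids("my-prod-project", "my-staging-project", "br_suporte", mode="staging"))
```

With `mode="all"` both ids are returned, which matches the statement that `create` builds both datasets in that case.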

delete(self, mode='all')

Deletes dataset in BigQuery. Toggle mode to choose which dataset to delete.

Parameters:

Name Type Description Default
mode str

Optional. Which dataset to delete [prod|staging|all]

'all'
Source code in basedosdados/upload/dataset.py
def delete(self, mode="all"):
    """Deletes dataset in BigQuery. Toggle mode to choose which dataset to delete.

    Args:
        mode (str): Optional.  Which dataset to delete [prod|staging|all]
    """

    for m in self._loop_modes(mode):

        m["client"].delete_dataset(m["id"], delete_contents=True, not_found_ok=True)
    logger.info(
        " {object} {object_id}_{mode} was {action}!",
        object_id=self.dataset_id,
        mode=mode,
        object="Dataset",
        action="deleted",
    )

init(self, replace=False)

Initialize dataset folder at metadata_path/<dataset_id>.

The folder should contain:

  • dataset_config.yaml
  • README.md

Parameters:

Name Type Description Default
replace bool

Optional. Whether to replace existing folder.

False

Exceptions:

Type Description
FileExistsError

If dataset folder already exists and replace is False

Source code in basedosdados/upload/dataset.py
def init(self, replace=False):
    """Initialize dataset folder at `metadata_path/<dataset_id>`.

    The folder should contain:

    * `dataset_config.yaml`
    * `README.md`

    Args:
        replace (bool): Optional. Whether to replace existing folder.

    Raises:
        FileExistsError: If dataset folder already exists and replace is False
    """

    # Create dataset folder
    try:
        self.dataset_folder.mkdir(exist_ok=replace, parents=True)
    except FileExistsError as e:
        raise FileExistsError(
            f"Dataset {str(self.dataset_folder.stem)} folder already exists. "
            "Set replace=True to replace current files."
        ) from e

    # create dataset_config.yaml with metadata
    self.metadata.create(if_exists="replace")

    # create README.md file
    self._write_readme_file()

    # Add code folder
    (self.dataset_folder / "code").mkdir(exist_ok=replace, parents=True)

    return self
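The role of `replace` in `init` comes down to the `exist_ok` flag of `Path.mkdir`. The sketch below reproduces only the folder layout (config file, README and `code/` folder) in a temporary directory. It is an approximation under stated assumptions: `init_dataset_folder` is a hypothetical stand-in, the file contents are placeholders, and the dataset id `br_example` is made up.

```python
import tempfile
from pathlib import Path


def init_dataset_folder(metadata_path, dataset_id, replace=False):
    """Create <metadata_path>/<dataset_id> with placeholder files and a code/ folder."""
    folder = Path(metadata_path) / dataset_id
    # mkdir raises FileExistsError if the folder exists and exist_ok is False
    folder.mkdir(exist_ok=replace, parents=True)
    (folder / "dataset_config.yaml").write_text("# placeholder metadata\n", encoding="utf-8")
    (folder / "README.md").write_text(f"Dataset {dataset_id}\n", encoding="utf-8")
    (folder / "code").mkdir(exist_ok=replace, parents=True)
    return folder


with tempfile.TemporaryDirectory() as tmp:
    folder = init_dataset_folder(tmp, "br_example")
    created = sorted(p.name for p in folder.iterdir())

    try:
        # Second call with replace=False on an existing folder
        init_dataset_folder(tmp, "br_example")
        second_call_failed = False
    except FileExistsError:
        second_call_failed = True

    # replace=True reuses the folder and rewrites the files
    init_dataset_folder(tmp, "br_example", replace=True)

print(created)
print(second_call_failed)
```

Calling the helper twice without `replace=True` fails on the second call, which is the situation the `FileExistsError` entry above describes.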

publicize(self, mode='all', dataset_is_public=True)

Changes IAM configuration to turn BigQuery dataset public.

Parameters:

Name Type Description Default
mode str

Which dataset to publicize [prod|staging|all].

'all'
dataset_is_public bool

Control if prod dataset is public or not. By default staging datasets like dataset_id_staging are not public.

True
Source code in basedosdados/upload/dataset.py
def publicize(self, mode="all", dataset_is_public=True):
    """Changes IAM configuration to turn BigQuery dataset public.

    Args:
        mode (str): Which dataset to publicize [prod|staging|all].
        dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
    """

    for m in self._loop_modes(mode):

        dataset = m["client"].get_dataset(m["id"])
        entries = dataset.access_entries
        # TODO https://github.com/basedosdados/mais/pull/1020
        # TODO if staging dataset is private, the prod view can't access it: if dataset_is_public and "staging" not in dataset.dataset_id:
        if dataset_is_public:
            if "staging" not in dataset.dataset_id:
                entries.extend(
                    [
                        bigquery.AccessEntry(
                            role="roles/bigquery.dataViewer",
                            entity_type="iamMember",
                            entity_id="allUsers",
                        ),
                        bigquery.AccessEntry(
                            role="roles/bigquery.metadataViewer",
                            entity_type="iamMember",
                            entity_id="allUsers",
                        ),
                        bigquery.AccessEntry(
                            role="roles/bigquery.user",
                            entity_type="iamMember",
                            entity_id="allUsers",
                        ),
                    ]
                )
            else:
                entries.extend(
                    [
                        bigquery.AccessEntry(
                            role="roles/bigquery.dataViewer",
                            entity_type="iamMember",
                            entity_id="allUsers",
                        ),
                    ]
                )
            dataset.access_entries = entries
        m["client"].update_dataset(dataset, ["access_entries"])
    logger.success(
        " {object} {object_id}_{mode} was {action}!",
        object_id=self.dataset_id,
        mode=mode,
        object="Dataset",
        action="publicized",
    )
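Read from the listing above, `publicize` grants a different set of roles to `allUsers` depending on whether the dataset id contains `staging`. The following sketch captures only that branching as a pure function, so it can be inspected without a BigQuery client. The name `public_roles` is hypothetical, and the role strings are copied from the listing.

```python
def public_roles(dataset_id, dataset_is_public=True):
    """Return the roles that would be granted to allUsers for a dataset id."""
    if not dataset_is_public:
        # No new access entries are added for private datasets
        return []
    if "staging" not in dataset_id:
        return [
            "roles/bigquery.dataViewer",
            "roles/bigquery.metadataViewer",
            "roles/bigquery.user",
        ]
    # Staging datasets only receive read access to the data
    return ["roles/bigquery.dataViewer"]


print(public_roles("br_suporte"))
print(public_roles("br_suporte_staging"))
print(public_roles("br_suporte", dataset_is_public=False))
```

Note that the check is a substring match, so any dataset id containing `staging` is treated as a staging dataset in this sketch, as it is in the listing.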

update(self, mode='all', location=None)

Update dataset description. Toggle mode to choose which dataset to update.

Parameters:

Name Type Description Default
mode str

Optional. Which dataset to update [prod|staging|all]

'all'
location str

Optional. Location of dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations

None
Source code in basedosdados/upload/dataset.py
def update(self, mode="all", location=None):
    """Update dataset description. Toggle mode to choose which dataset to update.

    Args:
        mode (str): Optional. Which dataset to update [prod|staging|all]
        location (str): Optional. Location of dataset data.
            List of possible region names locations: https://cloud.google.com/bigquery/docs/locations

    """

    for m in self._loop_modes(mode):

        # Send the dataset to the API to update its description.
        m["client"].update_dataset(
            self._setup_dataset_object(
                m["id"],
                location=location,
            ),
            fields=["description"],
        )  # Make an API request.

    logger.success(
        " {object} {object_id}_{mode} was {action}!",
        object_id=self.dataset_id,
        mode=mode,
        object="Dataset",
        action="updated",
    )

Class for managing tables in Storage and BigQuery

Table (Base)

Manage tables in Google Cloud Storage and BigQuery.

Source code in basedosdados/upload/table.py
class Table(Base):
    """
    Manage tables in Google Cloud Storage and BigQuery.
    """

    def __init__(self, dataset_id, table_id, **kwargs):
        super().__init__(**kwargs)

        self.table_id = table_id.replace("-", "_")
        self.dataset_id = dataset_id.replace("-", "_")
        self.dataset_folder = Path(self.metadata_path / self.dataset_id)
        self.table_folder = self.dataset_folder / table_id
        self.table_full_name = dict(
            prod=f"{self.client['bigquery_prod'].project}.{self.dataset_id}.{self.table_id}",
            staging=f"{self.client['bigquery_staging'].project}.{self.dataset_id}_staging.{self.table_id}",
        )
        self.table_full_name.update(dict(all=deepcopy(self.table_full_name)))
        self.metadata = Metadata(self.dataset_id, self.table_id, **kwargs)

    @property
    def table_config(self):
        """
        Load table_config.yaml
        """
        return self._load_yaml(self.table_folder / "table_config.yaml")

    def _get_table_obj(self, mode):
        """
        Get table object from BigQuery
        """
        return self.client[f"bigquery_{mode}"].get_table(self.table_full_name[mode])

    def _is_partitioned(self):
        """
        Check if table is partitioned
        """
        ## check if the table is partitioned; the split is needed because of a change in the type of partitions in pydantic
        partitions = self.table_config["partitions"]
        if partitions is None or len(partitions) == 0:
            return False

        if isinstance(partitions, list):
            # check if any None inside list.
            # False if it is the case Ex: [None, 'partition']
            # True otherwise          Ex: ['partition1', 'partition2']
            return all(item is not None for item in partitions)

        raise ValueError("Partitions must be a list or None")

    def _load_schema(self, mode="staging"):
        """Load schema from table_config.yaml

        Args:
            mode (str): Which schema to load [prod|staging].
        """

        self._check_mode(mode)

        json_path = self.table_folder / f"schema-{mode}.json"
        columns = self.table_config["columns"]

        if mode == "staging":
            new_columns = []
            for c in columns:
                # if is_in_staging is None, treat it as True
                is_in_staging = (
                    True if c.get("is_in_staging") is None else c["is_in_staging"]
                )
                # append columns declared in table_config.yaml to schema only if is_in_staging: True
                if is_in_staging and not c.get("is_partition"):
                    c["type"] = "STRING"
                    new_columns.append(c)

            del columns
            columns = new_columns

        elif mode == "prod":
            schema = self._get_table_obj(mode).schema

            # get field names for fields at schema and at table_config.yaml
            column_names = [c["name"] for c in columns]
            schema_names = [s.name for s in schema]

            # check if there are mismatched fields
            not_in_columns = [name for name in schema_names if name not in column_names]
            not_in_schema = [name for name in column_names if name not in schema_names]

            # raise if field is not in table_config
            if not_in_columns:
                raise BaseDosDadosException(
                    "Column {error_columns} was not found in table_config.yaml. Are you sure that "
                    "all your column names between table_config.yaml, publish.sql and "
                    "{project_id}.{dataset_id}.{table_id} are the same?".format(
                        error_columns=not_in_columns,
                        project_id=self.table_config["project_id_prod"],
                        dataset_id=self.table_config["dataset_id"],
                        table_id=self.table_config["table_id"],
                    )
                )

            # raise if field is not in schema
            if not_in_schema:
                raise BaseDosDadosException(
                    "Column {error_columns} was not found in publish.sql. Are you sure that "
                    "all your column names between table_config.yaml, publish.sql and "
                    "{project_id}.{dataset_id}.{table_id} are the same?".format(
                        error_columns=not_in_schema,
                        project_id=self.table_config["project_id_prod"],
                        dataset_id=self.table_config["dataset_id"],
                        table_id=self.table_config["table_id"],
                    )
                )

            # if field is in schema, get field_type and field_mode
            for c in columns:
                for s in schema:
                    if c["name"] == s.name:
                        c["type"] = s.field_type
                        c["mode"] = s.mode
                        break
        ## force utf-8, write schema-{mode}.json
        with json_path.open("w", encoding="utf-8") as f:
            json.dump(columns, f)

        # load new created schema
        return self.client[f"bigquery_{mode}"].schema_from_json(str(json_path))

    def _make_publish_sql(self):
        """Create publish.sql with columns and bigquery_type"""

        ### publish.sql header and instructions
        publish_txt = """
        /*
        Query para publicar a tabela.

        Esse é o lugar para:
            - modificar nomes, ordem e tipos de colunas
            - dar join com outras tabelas
            - criar colunas extras (e.g. logs, proporções, etc.)

        Qualquer coluna definida aqui deve também existir em `table_config.yaml`.

        # Além disso, sinta-se à vontade para alterar alguns nomes obscuros
        # para algo um pouco mais explícito.

        TIPOS:
            - Para modificar tipos de colunas, basta substituir STRING por outro tipo válido.
            - Exemplo: `SAFE_CAST(column_name AS NUMERIC) column_name`
            - Mais detalhes: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
        */
        """

        # remove triple quotes extra space
        publish_txt = inspect.cleandoc(publish_txt)
        publish_txt = textwrap.dedent(publish_txt)

        # add create table statement
        project_id_prod = self.client["bigquery_prod"].project
        publish_txt += f"\n\nCREATE VIEW {project_id_prod}.{self.dataset_id}.{self.table_id} AS\nSELECT \n"

        # sort columns by is_partition, partitions_columns come first

        if self._is_partitioned():
            columns = sorted(
                self.table_config["columns"],
                key=lambda k: (k["is_partition"] is not None, k["is_partition"]),
                reverse=True,
            )
        else:
            columns = self.table_config["columns"]

        # add columns in publish.sql
        for col in columns:
            name = col["name"]
            bigquery_type = (
                "STRING"
                if col["bigquery_type"] is None
                else col["bigquery_type"].upper()
            )

            publish_txt += f"SAFE_CAST({name} AS {bigquery_type}) {name},\n"
        ## remove last comma
        publish_txt = publish_txt[:-2] + "\n"

        # add from statement
        project_id_staging = self.client["bigquery_staging"].project
        publish_txt += (
            f"FROM {project_id_staging}.{self.dataset_id}_staging.{self.table_id} AS t"
        )

        # save publish.sql in table_folder
        with (self.table_folder / "publish.sql").open("w", encoding="utf-8") as f:
            f.write(publish_txt)

    def _make_template(self, columns, partition_columns, if_table_config_exists, force_columns):
        # create table_config.yaml with metadata
        self.metadata.create(
            if_exists=if_table_config_exists,
            columns=partition_columns + columns,
            partition_columns=partition_columns,
            force_columns=force_columns,
            table_only=False,
        )

        self._make_publish_sql()

    @staticmethod
    def _sheet_to_df(columns_config_url_or_path):
        """
        Convert sheet to dataframe
        """
        url = columns_config_url_or_path.replace("edit#gid=", "export?format=csv&gid=")
        try:
            return pd.read_csv(StringIO(requests.get(url, timeout=10).content.decode("utf-8")))
        except Exception as e:
            raise BaseDosDadosException(
                "Check that your Google Sheet sharing setting is: Anyone on the internet with this link can view"
            ) from e

    def table_exists(self, mode):
        """Check if table exists in BigQuery.

        Args:
            mode (str): Which dataset to check [prod|staging].
        """

        try:
            ref = self._get_table_obj(mode=mode)
        except google.api_core.exceptions.NotFound:
            ref = None

        return bool(ref)

    def update_columns(self, columns_config_url_or_path=None):
        """
        Fills columns in table_config.yaml automatically using a public Google Sheets URL or a local file. Also regenerates
        publish.sql and autofills the type using bigquery_type.

        The sheet must contain the columns:
            - name: column name
            - description: column description
            - bigquery_type: column bigquery type
            - measurement_unit: column measurement unit
            - covered_by_dictionary: column related dictionary
            - directory_column: column related directory in the format <dataset_id>.<table_id>:<column_name>
            - temporal_coverage: column temporal coverage
            - has_sensitive_data: the column has sensitive data
            - observations: column observations
        Args:
            columns_config_url_or_path (str): Path to the local architecture file or a public google sheets URL.
                Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
                Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.

        """
        ruamel = ryaml.YAML()
        ruamel.preserve_quotes = True
        ruamel.indent(mapping=4, sequence=6, offset=4)
        table_config_yaml = ruamel.load(
            (self.table_folder / "table_config.yaml").open(encoding="utf-8")
        )

        if "https://docs.google.com/spreadsheets/d/" in columns_config_url_or_path:
            if (
                "edit#gid=" not in columns_config_url_or_path
                or "https://docs.google.com/spreadsheets/d/"
                not in columns_config_url_or_path
                or not columns_config_url_or_path.split("=")[1].isdigit()
            ):
                raise BaseDosDadosException(
                    "The Google Sheets URL is not in the correct format. "
                    "The url must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>"
                )
            df = self._sheet_to_df(columns_config_url_or_path)
        else:
            file_type = columns_config_url_or_path.split(".")[-1]
            if file_type == "csv":
                df = pd.read_csv(columns_config_url_or_path, encoding="utf-8")
            elif file_type in ["xls", "xlsx", "xlsm", "xlsb", "odf", "ods", "odt"]:
                df = pd.read_excel(columns_config_url_or_path)
            else:
                raise BaseDosDadosException(
                    "File not supported. Only csv, xls, xlsx, xlsm, xlsb, odf, ods, odt are supported."
                )

        df = df.fillna("NULL")

        required_columns = [
            "name",
            "bigquery_type",
            "description",
            "temporal_coverage",
            "covered_by_dictionary",
            "directory_column",
            "measurement_unit",
            "has_sensitive_data",
            "observations",
        ]

        not_found_columns = required_columns.copy()
        for sheet_column in df.columns.tolist():
            for required_column in required_columns:
                if sheet_column == required_column:
                    not_found_columns.remove(required_column)
        if not_found_columns:
            raise BaseDosDadosException(
                f"The following required columns are not found: {', '.join(not_found_columns)}."
            )

        columns_parameters = zip(
            *[df[required_column].tolist() for required_column in required_columns]
        )
        for (
            name,
            bigquery_type,
            description,
            temporal_coverage,
            covered_by_dictionary,
            directory_column,
            measurement_unit,
            has_sensitive_data,
            observations,
        ) in columns_parameters:
            for col in table_config_yaml["columns"]:
                if col["name"] == name:
                    col["bigquery_type"] = (
                        col["bigquery_type"]
                        if bigquery_type == "NULL"
                        else bigquery_type.lower()
                    )

                    col["description"] = (
                        col["description"] if description == "NULL" else description
                    )

                    col["temporal_coverage"] = (
                        col["temporal_coverage"]
                        if temporal_coverage == "NULL"
                        else [temporal_coverage]
                    )

                    col["covered_by_dictionary"] = (
                        "no"
                        if covered_by_dictionary == "NULL"
                        else covered_by_dictionary
                    )

                    dataset = directory_column.split(".")[0]
                    col["directory_column"]["dataset_id"] = (
                        col["directory_column"]["dataset_id"]
                        if dataset == "NULL"
                        else dataset
                    )

                    table = directory_column.split(".")[-1].split(":")[0]
                    col["directory_column"]["table_id"] = (
                        col["directory_column"]["table_id"]
                        if table == "NULL"
                        else table
                    )

                    column = directory_column.split(".")[-1].split(":")[-1]
                    col["directory_column"]["column_name"] = (
                        col["directory_column"]["column_name"]
                        if column == "NULL"
                        else column
                    )
                    col["measurement_unit"] = (
                        col["measurement_unit"]
                        if measurement_unit == "NULL"
                        else measurement_unit
                    )

                    col["has_sensitive_data"] = (
                        "no" if has_sensitive_data == "NULL" else has_sensitive_data
                    )

                    col["observations"] = (
                        col["observations"] if observations == "NULL" else observations
                    )

        with open(self.table_folder / "table_config.yaml", "w", encoding="utf-8") as f:
            ruamel.dump(table_config_yaml, f)

        # regenerate publish.sql
        self._make_publish_sql()

    def init(
        self,
        data_sample_path=None,
        if_folder_exists="raise",
        if_table_config_exists="raise",
        source_format="csv",
        force_columns=False,
        columns_config_url_or_path=None,
    ):  # sourcery skip: low-code-quality
        """Initialize table folder at `metadata_path/<dataset_id>/<table_id>`.

        The folder should contain:

        * `table_config.yaml`
        * `publish.sql`

        You can also point to a sample of the data to auto complete columns names.

        Args:
            data_sample_path (str, pathlib.PosixPath): Optional.
                Data sample path to auto complete columns names
                It supports Comma Delimited CSV, Apache Avro and
                Apache Parquet.
            if_folder_exists (str): Optional.
                What to do if table folder exists

                * 'raise' : Raises FileExistsError
                * 'replace' : Replace folder
                * 'pass' : Do nothing
            if_table_config_exists (str): Optional
                What to do if table_config.yaml and publish.sql exists

                * 'raise' : Raises FileExistsError
                * 'replace' : Replace files with blank template
                * 'pass' : Do nothing
            source_format (str): Optional
                Data source format. Only 'csv', 'avro' and 'parquet'
                are supported. Defaults to 'csv'.
            force_columns (bool): Optional.
                If set to `True`, overwrite CKAN's columns with the ones provided.
                If set to `False`, keep CKAN's columns instead of the ones provided.
            columns_config_url_or_path (str): Path to the local architecture file or a public google sheets URL.
                Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
                Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.

        Raises:
            FileExistsError: If folder exists and replace is False.
            NotImplementedError: If data sample is not in supported type or format.
        """
        if not self.dataset_folder.exists():

            raise FileExistsError(
                f"Dataset folder {self.dataset_folder} does not exist. "
                "Create a dataset before adding tables."
            )

        try:
            self.table_folder.mkdir(exist_ok=(if_folder_exists == "replace"))
        except FileExistsError as e:
            if if_folder_exists == "raise":
                raise FileExistsError(
                    f"Table folder already exists for {self.table_id}. "
                ) from e
            if if_folder_exists == "pass":
                return self

        if not data_sample_path and if_table_config_exists != "pass":
            raise BaseDosDadosException(
                "You must provide a path to correctly create config files"
            )

        partition_columns = []
        if isinstance(
            data_sample_path,
            (
                str,
                Path,
            ),
        ):
            # Check if partitioned and get data sample and partition columns
            data_sample_path = Path(data_sample_path)

            if data_sample_path.is_dir():

                data_sample_path = [
                    f
                    for f in data_sample_path.glob("**/*")
                    if f.is_file() and f.suffix == f".{source_format}"
                ][0]

                partition_columns = [
                    k.split("=")[0]
                    for k in data_sample_path.as_posix().split("/")
                    if "=" in k
                ]

            columns = Datatype(self, source_format).header(data_sample_path)

        else:

            columns = ["column_name"]

        if if_table_config_exists == "pass":
            # Check if config files exists before passing
            if (
                Path(self.table_folder / "table_config.yaml").is_file()
                and Path(self.table_folder / "publish.sql").is_file()
            ):
                pass
            # Raise if no sample to determine columns
            elif not data_sample_path:
                raise BaseDosDadosException(
                    "You must provide a path to correctly create config files"
                )
            else:
                self._make_template(columns, partition_columns, if_table_config_exists, force_columns=force_columns)

        elif if_table_config_exists == "raise":

            # Check if config files already exist
            if (
                Path(self.table_folder / "table_config.yaml").is_file()
                and Path(self.table_folder / "publish.sql").is_file()
            ):

                raise FileExistsError(
                    f"table_config.yaml and publish.sql already exists at {self.table_folder}"
                )
            # if config files don't exist, create them
            self._make_template(columns, partition_columns, if_table_config_exists, force_columns=force_columns)

        else:
            # if_table_config_exists == "replace": a data sample was required above, so rebuild the config files from it
            self._make_template(columns, partition_columns, if_table_config_exists, force_columns=force_columns)

        if columns_config_url_or_path is not None:
            self.update_columns(columns_config_url_or_path)

        return self

    def create(
        self,
        path=None,
        force_dataset=True,
        if_table_exists="raise",
        if_storage_data_exists="raise",
        if_table_config_exists="raise",
        source_format="csv",
        force_columns=False,
        columns_config_url_or_path=None,
        dataset_is_public=True,
        location=None,
        chunk_size=None,
    ):
        """Creates BigQuery table at staging dataset.

        If you add a path, it automatically saves the data in the storage,
        creates a datasets folder and BigQuery location, besides creating the
        table and its configuration files.

        The new table should be located at `<dataset_id>_staging.<table_id>` in BigQuery.

        It looks for data saved in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*`
        and builds the table.

        It currently supports the types:

        - Comma Delimited CSV
        - Apache Avro
        - Apache Parquet

        Data can also be partitioned following the hive partitioning scheme
        `<key1>=<value1>/<key2>=<value2>` - for instance,
        `year=2012/country=BR`. The partition is automatically detected
        by searching for `partitions` on the `table_config.yaml`.

        Args:
            path (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
            job_config_params (dict): Optional.
                Job configuration params from bigquery
            if_table_exists (str): Optional
                What to do if table exists

                * 'raise' : Raises Conflict exception
                * 'replace' : Replace table
                * 'pass' : Do nothing
            force_dataset (bool): Creates `<dataset_id>` folder and BigQuery Dataset if it doesn't exists.
            if_table_config_exists (str): Optional.
                What to do if config files already exist

                 * 'raise': Raises FileExistsError
                 * 'replace': Replace with blank template
                 * 'pass': Do nothing
            if_storage_data_exists (str): Optional.
                What to do if data already exists on your bucket:

                * 'raise' : Raises Conflict exception
                * 'replace' : Replace table
                * 'pass' : Do nothing
            source_format (str): Optional
                Data source format. Only 'csv', 'avro' and 'parquet'
                are supported. Defaults to 'csv'.
            force_columns (bool): Optional.
                If set to `True`, overwrite CKAN's columns with the ones provided.
                If set to `False`, keep CKAN's columns instead of the ones provided.
            columns_config_url_or_path (str): Path to the local architecture file or a public google sheets URL.
                Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
                Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.

            dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.

            location (str): Optional. Location of dataset data.
                List of possible region names locations: https://cloud.google.com/bigquery/docs/locations

            chunk_size (int): Optional
                The size of a chunk of data whenever iterating (in bytes).
                This must be a multiple of 256 KB per the API specification.
                If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
        """

        if path is None:

            # Look if table data already exists at Storage
            # list_blobs returns a lazy iterator that is always truthy, so materialize it
            data = list(
                self.client["storage_staging"].list_blobs(
                    self.bucket_name, prefix=f"staging/{self.dataset_id}/{self.table_id}"
                )
            )

            # Raise: Cannot create table without external data
            if not data:
                raise BaseDosDadosException(
                    "You must provide a path for uploading data"
                )

        # Add data to storage
        if isinstance(
            path,
            (
                str,
                Path,
            ),
        ):

            Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
                path,
                mode="staging",
                if_exists=if_storage_data_exists,
                chunk_size=chunk_size,
            )

        # Create Dataset if it doesn't exist
        if force_dataset:

            dataset_obj = Dataset(self.dataset_id, **self.main_vars)

            try:
                dataset_obj.init()
            except FileExistsError:
                pass

            dataset_obj.create(
                if_exists="pass", location=location, dataset_is_public=dataset_is_public
            )

        self.init(
            data_sample_path=path,
            if_folder_exists="replace",
            if_table_config_exists=if_table_config_exists,
            columns_config_url_or_path=columns_config_url_or_path,
            source_format=source_format,
            force_columns=force_columns
        )

        table = bigquery.Table(self.table_full_name["staging"])
        table.external_data_configuration = Datatype(
            self, source_format, "staging", partitioned=self._is_partitioned()
        ).external_config

        # Lookup if table already exists
        table_ref = None
        try:
            table_ref = self.client["bigquery_staging"].get_table(
                self.table_full_name["staging"]
            )

        except google.api_core.exceptions.NotFound:
            pass

        if isinstance(table_ref, google.cloud.bigquery.table.Table):

            if if_table_exists == "pass":

                return None

            if if_table_exists == "raise":

                raise FileExistsError(
                    "Table already exists, choose replace if you want to overwrite it"
                )

        if if_table_exists == "replace":

            self.delete(mode="staging")

        self.client["bigquery_staging"].create_table(table)

        logger.success(
            "{object} {object_id} was {action}!",
            object_id=self.table_id,
            object="Table",
            action="created",
        )
        return None

    def update(self, mode="all"):
        """Updates BigQuery schema and description.

        Args:
            mode (str): Optional.
                Which table to update [prod|staging|all]
            not_found_ok (bool): Optional.
                What to do if table is not found
        """

        self._check_mode(mode)

        mode = ["prod", "staging"] if mode == "all" else [mode]
        for m in mode:

            try:
                table = self._get_table_obj(m)
            except google.api_core.exceptions.NotFound:
                continue

            # if m == "staging":

            table.description = self._render_template(
                Path("table/table_description.txt"), self.table_config
            )

            # save table description
            with open(
                self.metadata_path
                / self.dataset_id
                / self.table_id
                / "table_description.txt",
                "w",
                encoding="utf-8",
            ) as f:
                f.write(table.description)

            # when mode is staging the table schema already exists
            table.schema = self._load_schema(m)
            fields = ["description", "schema"] if m == "prod" else ["description"]
            self.client[f"bigquery_{m}"].update_table(table, fields=fields)

        logger.success(
            " {object} {object_id} was {action}!",
            object_id=self.table_id,
            object="Table",
            action="updated",
        )

    def publish(self, if_exists="raise"):
        """Creates BigQuery table at production dataset.

        Table should be located at `<dataset_id>.<table_id>`.

        It creates a view that uses the query from
        `<metadata_path>/<dataset_id>/<table_id>/publish.sql`.

        Make sure that all columns from the query also exist at
        `<metadata_path>/<dataset_id>/<table_id>/table_config.yaml`, including
        the partitions.

        Args:
            if_exists (str): Optional.
                What to do if table exists.

                * 'raise' : Raises Conflict exception
                * 'replace' : Replace table
                * 'pass' : Do nothing

        Todo:

            * Check if all required fields are filled
        """

        if if_exists == "replace":
            self.delete(mode="prod")

        self.client["bigquery_prod"].query(
            (self.table_folder / "publish.sql").open("r", encoding="utf-8").read()
        ).result()

        self.update()
        logger.success(
            " {object} {object_id} was {action}!",
            object_id=self.table_id,
            object="Table",
            action="published",
        )

    def delete(self, mode):
        """Deletes table in BigQuery.

        Args:
            mode (str): Which table to delete [prod|staging]
        """

        self._check_mode(mode)

        if mode == "all":
            for m, n in self.table_full_name[mode].items():
                self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True)
            logger.info(
                " {object} {object_id}_{mode} was {action}!",
                object_id=self.table_id,
                mode=mode,
                object="Table",
                action="deleted",
            )
        else:
            self.client[f"bigquery_{mode}"].delete_table(
                self.table_full_name[mode], not_found_ok=True
            )

            logger.info(
                " {object} {object_id}_{mode} was {action}!",
                object_id=self.table_id,
                mode=mode,
                object="Table",
                action="deleted",
            )

    def append(
        self,
        filepath,
        partitions=None,
        if_exists="replace",
        chunk_size=None,
        **upload_args,
    ):
        """Appends new data to existing BigQuery table.

        As long as the data has the same schema, it appends the data in the
        filepath to the existing table.

        Args:
            filepath (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
            partitions (str, pathlib.PosixPath, dict): Optional.
                Hive structured partition as a string or dict

                * str : `<key>=<value>/<key2>=<value2>`
                * dict: `dict(key=value, key2=value2)`
            if_exists (str): Optional.
                What to do if data with same name exists in storage

                * 'raise' : Raises Conflict exception
                * 'replace' : Replace table
                * 'pass' : Do nothing
            chunk_size (int): Optional
                The size of a chunk of data whenever iterating (in bytes).
                This must be a multiple of 256 KB per the API specification.
                If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
        """
        if not self.table_exists("staging"):
            raise BaseDosDadosException(
                "You cannot append to a table that does not exist"
            )
        Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
            filepath,
            mode="staging",
            partitions=partitions,
            if_exists=if_exists,
            chunk_size=chunk_size,
            **upload_args,
        )
        logger.success(
            " {object} {object_id} was {action}!",
            object_id=self.table_id,
            object="Table",
            action="appended",
        )

table_config property readonly

Load table_config.yaml

append(self, filepath, partitions=None, if_exists='replace', chunk_size=None, **upload_args)

Appends new data to existing BigQuery table.

As long as the data has the same schema, it appends the data in the filepath to the existing table.

Parameters:

Name Type Description Default
filepath str or pathlib.PosixPath

Where to find the file that you want to upload to create a table with

required
partitions str, pathlib.PosixPath, dict

Optional. Hive structured partition as a string or dict

  • str : <key>=<value>/<key2>=<value2>
  • dict: dict(key=value, key2=value2)
None
if_exists str

Optional. What to do if data with same name exists in storage

  • 'raise' : Raises Conflict exception
  • 'replace' : Replace table
  • 'pass' : Do nothing
'replace'
chunk_size int

Optional. The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.

None
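
The two accepted forms of `partitions` describe the same hive-style layout. A minimal sketch, assuming a hypothetical helper `partitions_to_path` (it is not part of basedosdados, and the library's own conversion may differ), shows how a dict maps onto the string form:

```python
def partitions_to_path(partitions):
    """Render hive-style partitions as `<key>=<value>/<key2>=<value2>`.

    Hypothetical helper for illustration only; not part of basedosdados.
    """
    if isinstance(partitions, dict):
        # dicts keep insertion order, so the folder order follows the key order
        return "/".join(f"{key}={value}" for key, value in partitions.items())
    # strings (or path-like objects) are assumed to already be in hive form
    return str(partitions).strip("/")


as_str = "year=2012/country=BR"
as_dict = dict(year=2012, country="BR")

# both forms point at the same partition folder
same_layout = partitions_to_path(as_dict) == partitions_to_path(as_str)
```

Either form could then be passed as the `partitions` argument of `append`.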
Source code in basedosdados/upload/table.py
def append(
    self,
    filepath,
    partitions=None,
    if_exists="replace",
    chunk_size=None,
    **upload_args,
):
    """Appends new data to existing BigQuery table.

    As long as the data has the same schema, it appends the data in the
    filepath to the existing table.

    Args:
        filepath (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
        partitions (str, pathlib.PosixPath, dict): Optional.
            Hive structured partition as a string or dict

            * str : `<key>=<value>/<key2>=<value2>`
            * dict: `dict(key=value, key2=value2)`
        if_exists (str): Optional.
            What to do if data with same name exists in storage

            * 'raise' : Raises Conflict exception
            * 'replace' : Replace table
            * 'pass' : Do nothing
        chunk_size (int): Optional
            The size of a chunk of data whenever iterating (in bytes).
            This must be a multiple of 256 KB per the API specification.
            If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
    """
    if not self.table_exists("staging"):
        raise BaseDosDadosException(
            "You cannot append to a table that does not exist"
        )
    Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
        filepath,
        mode="staging",
        partitions=partitions,
        if_exists=if_exists,
        chunk_size=chunk_size,
        **upload_args,
    )
    logger.success(
        " {object} {object_id} was {action}!",
        object_id=self.table_id,
        object="Table",
        action="appended",
    )

create(self, path=None, force_dataset=True, if_table_exists='raise', if_storage_data_exists='raise', if_table_config_exists='raise', source_format='csv', force_columns=False, columns_config_url_or_path=None, dataset_is_public=True, location=None, chunk_size=None)

Creates BigQuery table at staging dataset.

If you add a path, it automatically saves the data in the storage, creates a datasets folder and BigQuery location, besides creating the table and its configuration files.

The new table should be located at <dataset_id>_staging.<table_id> in BigQuery.

It looks for data saved in Storage at <bucket_name>/staging/<dataset_id>/<table_id>/* and builds the table.

It currently supports the types:

  • Comma Delimited CSV
  • Apache Avro
  • Apache Parquet

Data can also be partitioned following the hive partitioning scheme <key1>=<value1>/<key2>=<value2> - for instance, year=2012/country=BR. The partition is automatically detected by searching for partitions on the table_config.yaml.
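
As an illustration of the hive partitioning scheme, the sketch below extracts partition column names from a file path by keeping the part before `=` in each `<key>=<value>` folder. The helper name `partition_columns_from_path` and the sample path are assumptions made for this example; they are not part of the library's API.

```python
from pathlib import PurePosixPath


def partition_columns_from_path(path):
    """Return the partition keys found in a hive-partitioned path.

    Hypothetical helper for illustration only.
    """
    return [
        part.split("=")[0]
        for part in PurePosixPath(path).parts
        if "=" in part  # only `<key>=<value>` folders are partitions
    ]


# sample path made up for this example
sample = "data/year=2012/country=BR/municipios.csv"
columns = partition_columns_from_path(sample)
```

Here `columns` holds the keys in folder order, `year` followed by `country`.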

Parameters:

Name Type Description Default
path str or pathlib.PosixPath

Where to find the file that you want to upload to create a table with

None
job_config_params dict

Optional. Job configuration params from bigquery

required
if_table_exists str

Optional. What to do if table exists

  • 'raise' : Raises Conflict exception
  • 'replace' : Replace table
  • 'pass' : Do nothing
'raise'
force_dataset bool

Creates <dataset_id> folder and BigQuery Dataset if it doesn't exists.

True
if_table_config_exists str

Optional. What to do if config files already exist

  • 'raise': Raises FileExistsError
  • 'replace': Replace with blank template
  • 'pass': Do nothing
'raise'
if_storage_data_exists str

Optional. What to do if data already exists on your bucket:

  • 'raise' : Raises Conflict exception
  • 'replace' : Replace table
  • 'pass' : Do nothing
'raise'
source_format str

Optional. Data source format. Only 'csv', 'avro' and 'parquet' are supported. Defaults to 'csv'.

'csv'
force_columns bool

Optional. If set to True, overwrite CKAN's columns with the ones provided. If set to False, keep CKAN's columns instead of the ones provided.

False
columns_config_url_or_path str

Path to the local architecture file or a public google sheets URL. Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats. Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.

None
dataset_is_public bool

Control if prod dataset is public or not. By default staging datasets like dataset_id_staging are not public.

True
location str

Optional. Location of dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations

None
chunk_size int

Optional. The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.

None
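
Because `chunk_size` must be a multiple of 256 KB, it can help to round a requested size before passing it in. A minimal sketch, assuming 256 KB means 256 * 1024 bytes and using a hypothetical helper `round_chunk_size` that is not part of basedosdados:

```python
CHUNK_MULTIPLE = 256 * 1024  # 256 KB expressed in bytes (assumed to mean 1024-byte KB)


def round_chunk_size(requested_bytes):
    """Round a requested size down to the nearest multiple of 256 KB.

    Hypothetical helper for illustration only. Sizes below one multiple
    are raised to a single 256 KB chunk so the result is never zero.
    """
    if requested_bytes < CHUNK_MULTIPLE:
        return CHUNK_MULTIPLE
    return (requested_bytes // CHUNK_MULTIPLE) * CHUNK_MULTIPLE


# about 1 MB requested; the rounded value is safe to pass as chunk_size
chunk_size = round_chunk_size(1_000_000)
```

The rounded value could then be supplied as `chunk_size` to `create` or `append`.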
Source code in basedosdados/upload/table.py
def create(
    self,
    path=None,
    force_dataset=True,
    if_table_exists="raise",
    if_storage_data_exists="raise",
    if_table_config_exists="raise",
    source_format="csv",
    force_columns=False,
    columns_config_url_or_path=None,
    dataset_is_public=True,
    location=None,
    chunk_size=None,
):
    """Creates BigQuery table at staging dataset.

    If you add a path, it automatically saves the data in the storage,
    creates a datasets folder and BigQuery location, besides creating the
    table and its configuration files.

    The new table should be located at `<dataset_id>_staging.<table_id>` in BigQuery.

    It looks for data saved in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*`
    and builds the table.

    It currently supports the types:

    - Comma Delimited CSV
    - Apache Avro
    - Apache Parquet

    Data can also be partitioned following the hive partitioning scheme
    `<key1>=<value1>/<key2>=<value2>` - for instance,
    `year=2012/country=BR`. The partition is automatically detected
    by searching for `partitions` on the `table_config.yaml`.

    Args:
        path (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
        job_config_params (dict): Optional.
            Job configuration params from bigquery
        if_table_exists (str): Optional
            What to do if table exists

            * 'raise' : Raises Conflict exception
            * 'replace' : Replace table
            * 'pass' : Do nothing
        force_dataset (bool): Creates `<dataset_id>` folder and BigQuery Dataset if it doesn't exists.
        if_table_config_exists (str): Optional.
            What to do if config files already exist

             * 'raise': Raises FileExistsError
             * 'replace': Replace with blank template
             * 'pass': Do nothing
        if_storage_data_exists (str): Optional.
            What to do if data already exists on your bucket:

            * 'raise' : Raises Conflict exception
            * 'replace' : Replace table
            * 'pass' : Do nothing
        source_format (str): Optional
            Data source format. Only 'csv', 'avro' and 'parquet'
            are supported. Defaults to 'csv'.
        force_columns (bool): Optional.
            If set to `True`, overwrite CKAN's columns with the ones provided.
            If set to `False`, keep CKAN's columns instead of the ones provided.
        columns_config_url_or_path (str): Path to the local architecture file or a public google sheets URL.
            Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
            Google sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.

        dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.

        location (str): Optional. Location of dataset data.
            List of possible region names locations: https://cloud.google.com/bigquery/docs/locations

        chunk_size (int): Optional
            The size of a chunk of data whenever iterating (in bytes).
            This must be a multiple of 256 KB per the API specification.
            If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
    """

    if path is None:

        # Look if table data already exists at Storage
        data = self.client["storage_staging"].list_blobs(
            self.bucket_name, prefix=f"staging/{self.dataset_id}/{self.table_id}"
        )

        # Raise: Cannot create table without external data
        if not data:
            raise BaseDosDadosException(
                "You must provide a path for uploading data"
            )

    # Add data to storage
    if isinstance(
        path,
        (
            str,
            Path,
        ),
    ):

        Storage(self.dataset_id, self.table_id, **self.main_vars).upload(
            path,
            mode="staging",
            if_exists=if_storage_data_exists,
            chunk_size=chunk_size,
        )

    # Create Dataset if it doesn't exist
    if force_dataset:

        dataset_obj = Dataset(self.dataset_id, **self.main_vars)

        try:
            dataset_obj.init()
        except FileExistsError:
            pass

        dataset_obj.create(
            if_exists="pass", location=location, dataset_is_public=dataset_is_public
        )

    self.init(
        data_sample_path=path,
        if_folder_exists="replace",
        if_table_config_exists=if_table_config_exists,
        columns_config_url_or_path=columns_config_url_or_path,
        source_format=source_format,
        force_columns=force_columns
    )

    table = bigquery.Table(self.table_full_name["staging"])
    table.external_data_configuration = Datatype(
        self, source_format, "staging", partitioned=self._is_partitioned()
    ).external_config

    # Look up whether the table already exists
    table_ref = None
    try:
        table_ref = self.client["bigquery_staging"].get_table(
            self.table_full_name["staging"]
        )

    except google.api_core.exceptions.NotFound:
        pass

    if isinstance(table_ref, google.cloud.bigquery.table.Table):

        if if_table_exists == "pass":

            return None

        if if_table_exists == "raise":

            raise FileExistsError(
                "Table already exists, choose replace if you want to overwrite it"
            )

    if if_table_exists == "replace":

        self.delete(mode="staging")

    self.client["bigquery_staging"].create_table(table)

    logger.success(
        "{object} {object_id} was {action}!",
        object_id=self.table_id,
        object="Table",
        action="created",
    )
    return None
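
Two details from the docstring above can be sketched with the standard library alone: how a hive-partitioned file might be laid out under the `staging/<dataset_id>/<table_id>/` prefix, and how a requested `chunk_size` can be rounded to a multiple of 256 KB. Both helpers (`staging_blob_name`, `round_up_chunk_size`) are hypothetical and not part of the library; the assumption that partition folders nest directly under the table prefix is inferred from the docstring, not confirmed by the source.

```python
from pathlib import PurePosixPath

# 256 KB, the granularity the docstring says chunk_size must respect
CHUNK_MULTIPLE = 256 * 1024


def staging_blob_name(dataset_id, table_id, partitions, filename):
    """Build a blob name under staging/<dataset_id>/<table_id>/ (hypothetical helper).

    `partitions` is an ordered mapping such as {"year": 2012, "country": "BR"}.
    """
    parts = [f"{key}={value}" for key, value in partitions.items()]
    return PurePosixPath("staging", dataset_id, table_id, *parts, filename).as_posix()


def round_up_chunk_size(n_bytes):
    """Round a requested chunk size up to the next multiple of 256 KB."""
    if n_bytes <= 0:
        raise ValueError("chunk size must be positive")
    # Ceiling division without floats
    return -(-n_bytes // CHUNK_MULTIPLE) * CHUNK_MULTIPLE


blob = staging_blob_name(
    "br_suporte", "diretorio_municipios", {"year": 2012, "country": "BR"}, "data.csv"
)
chunk = round_up_chunk_size(1_000_000)
print(blob)
print(chunk)
```

A value that is already a multiple of 256 KB is returned unchanged, so the helper can be applied unconditionally before passing `chunk_size` along.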

delete(self, mode)

Deletes table in BigQuery.

Parameters:

Name Type Description Default
mode str

Which table to delete [prod|staging]

required
Source code in basedosdados/upload/table.py
def delete(self, mode):
    """Deletes table in BigQuery.

    Args:
        mode (str): Which table to delete [prod|staging]
    """

    self._check_mode(mode)

    if mode == "all":
        for m, n in self.table_full_name[mode].items():
            self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True)
        logger.info(
            " {object} {object_id}_{mode} was {action}!",
            object_id=self.table_id,
            mode=mode,
            object="Table",
            action="deleted",
        )
    else:
        self.client[f"bigquery_{mode}"].delete_table(
            self.table_full_name[mode], not_found_ok=True
        )

        logger.info(
            " {object} {object_id}_{mode} was {action}!",
            object_id=self.table_id,
            mode=mode,
            object="Table",
            action="deleted",
        )

init(self, data_sample_path=None, if_folder_exists='raise', if_table_config_exists='raise', source_format='csv', force_columns=False, columns_config_url_or_path=None)

Initialize table folder at metadata_path/<dataset_id>/<table_id>.

The folder should contain:

  • table_config.yaml
  • publish.sql

You can also point to a sample of the data to autocomplete column names.
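
As a rough illustration of what "autocomplete column names" means for a CSV sample, the sketch below reads only the first row of a file. In the source listing the library delegates this to `Datatype(...).header(...)`; the `csv_header` function here is a hypothetical stand-in that uses the standard `csv` module, and the sample file contents are made up.

```python
import csv
import tempfile
from pathlib import Path


def csv_header(sample_path):
    """Return the column names found in the first row of a comma-delimited file."""
    with open(sample_path, newline="", encoding="utf-8") as f:
        # next(..., []) yields an empty list for an empty file
        return next(csv.reader(f), [])


with tempfile.TemporaryDirectory() as tmp:
    # Illustrative sample data, written to a temporary file
    sample = Path(tmp) / "sample.csv"
    sample.write_text("id_municipio,nome\n1100015,Alta Floresta\n", encoding="utf-8")
    columns = csv_header(sample)

print(columns)
```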

Parameters:

Name Type Description Default
data_sample_path str, pathlib.PosixPath

Optional. Data sample path to autocomplete column names. It supports Comma Delimited CSV, Apache Avro and Apache Parquet.

None
if_folder_exists str

Optional. What to do if table folder exists

  • 'raise' : Raises FileExistsError
  • 'replace' : Replace folder
  • 'pass' : Do nothing
'raise'
if_table_config_exists str

Optional. What to do if table_config.yaml and publish.sql exist

  • 'raise' : Raises FileExistsError
  • 'replace' : Replace files with blank template
  • 'pass' : Do nothing
'raise'
source_format str

Optional. Data source format. Only 'csv', 'avro' and 'parquet' are supported. Defaults to 'csv'.

'csv'
force_columns bool

Optional. If set to True, overwrite CKAN's columns with the ones provided. If set to False, keep CKAN's columns instead of the ones provided.

False
columns_config_url_or_path str

Path to the local architecture file or a public Google Sheets URL. Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats. Google Sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.

None

Exceptions:

Type Description
FileExistsError

If folder exists and if_folder_exists is 'raise'.

NotImplementedError

If data sample is not in supported type or format.

Source code in basedosdados/upload/table.py
def init(
    self,
    data_sample_path=None,
    if_folder_exists="raise",
    if_table_config_exists="raise",
    source_format="csv",
    force_columns = False,
    columns_config_url_or_path=None,
):  # sourcery skip: low-code-quality
    """Initialize table folder at `metadata_path/<dataset_id>/<table_id>`.

    The folder should contain:

    * `table_config.yaml`
    * `publish.sql`

    You can also point to a sample of the data to autocomplete column names.

    Args:
        data_sample_path (str, pathlib.PosixPath): Optional.
            Data sample path to autocomplete column names.
            It supports Comma Delimited CSV, Apache Avro and
            Apache Parquet.
        if_folder_exists (str): Optional.
            What to do if table folder exists

            * 'raise' : Raises FileExistsError
            * 'replace' : Replace folder
            * 'pass' : Do nothing
        if_table_config_exists (str): Optional.
            What to do if table_config.yaml and publish.sql exist

            * 'raise' : Raises FileExistsError
            * 'replace' : Replace files with blank template
            * 'pass' : Do nothing
        source_format (str): Optional
            Data source format. Only 'csv', 'avro' and 'parquet'
            are supported. Defaults to 'csv'.
        force_columns (bool): Optional.
            If set to `True`, overwrite CKAN's columns with the ones provided.
            If set to `False`, keep CKAN's columns instead of the ones provided.
        columns_config_url_or_path (str): Path to the local architecture file or a public Google Sheets URL.
            Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
            Google Sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.

    Raises:
        FileExistsError: If folder exists and if_folder_exists is 'raise'.
        NotImplementedError: If data sample is not in supported type or format.
    """
    if not self.dataset_folder.exists():

        raise FileExistsError(
            f"Dataset folder {self.dataset_folder} folder does not exists. "
            "Create a dataset before adding tables."
        )

    try:
        self.table_folder.mkdir(exist_ok=(if_folder_exists == "replace"))
    except FileExistsError as e:
        if if_folder_exists == "raise":
            raise FileExistsError(
                f"Table folder already exists for {self.table_id}. "
            ) from e
        if if_folder_exists == "pass":
            return self

    if not data_sample_path and if_table_config_exists != "pass":
        raise BaseDosDadosException(
            "You must provide a path to correctly create config files"
        )

    partition_columns = []
    if isinstance(
        data_sample_path,
        (
            str,
            Path,
        ),
    ):
        # Check if partitioned and get data sample and partition columns
        data_sample_path = Path(data_sample_path)

        if data_sample_path.is_dir():

            data_sample_path = [
                f
                for f in data_sample_path.glob("**/*")
                if f.is_file() and f.suffix == f".{source_format}"
            ][0]

            partition_columns = [
                k.split("=")[0]
                for k in data_sample_path.as_posix().split("/")
                if "=" in k
            ]

        columns = Datatype(self, source_format).header(data_sample_path)

    else:

        columns = ["column_name"]

    if if_table_config_exists == "pass":
        # Check if config files exists before passing
        if (
            Path(self.table_folder / "table_config.yaml").is_file()
            and Path(self.table_folder / "publish.sql").is_file()
        ):
            pass
        # Raise if no sample to determine columns
        elif not data_sample_path:
            raise BaseDosDadosException(
                "You must provide a path to correctly create config files"
            )
        else:
            self._make_template(columns, partition_columns, if_table_config_exists, force_columns=force_columns)

    elif if_table_config_exists == "raise":

        # Check if config files already exist
        if (
            Path(self.table_folder / "table_config.yaml").is_file()
            and Path(self.table_folder / "publish.sql").is_file()
        ):

            raise FileExistsError(
                f"table_config.yaml and publish.sql already exists at {self.table_folder}"
            )
        # if config files don't exist, create them
        self._make_template(columns, partition_columns, if_table_config_exists, force_columns=force_columns)

    else:
        # 'replace': overwrite the config files with a fresh template
        self._make_template(columns, partition_columns, if_table_config_exists, force_columns=force_columns)

    if columns_config_url_or_path is not None:
        self.update_columns(columns_config_url_or_path)

    return self
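
The partition detection in the listing above relies on the hive naming scheme: every path segment of the form `<key>=<value>` contributes one partition column. A minimal standalone sketch of that step follows; the function name `partition_columns` and the example paths are illustrative, not library API.

```python
from pathlib import PurePosixPath


def partition_columns(sample_file):
    """Return the hive partition keys found in a file path, in path order."""
    return [
        part.split("=")[0]
        for part in PurePosixPath(sample_file).as_posix().split("/")
        if "=" in part
    ]


cols = partition_columns("data/year=2012/country=BR/part-0.csv")
print(cols)
```

A path with no `key=value` segments yields an empty list, which corresponds to an unpartitioned table.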

publish(self, if_exists='raise')

Creates BigQuery table at production dataset.

Table should be located at <dataset_id>.<table_id>.

It creates a view that uses the query from <metadata_path>/<dataset_id>/<table_id>/publish.sql.

Make sure that all columns from the query also exist at <metadata_path>/<dataset_id>/<table_id>/table_config.yaml, including the partitions.

Parameters:

Name Type Description Default
if_exists str

Optional. What to do if table exists.

  • 'raise' : Raises Conflict exception
  • 'replace' : Replace table
  • 'pass' : Do nothing
'raise'

Todo:

* Check if all required fields are filled
Source code in basedosdados/upload/table.py
def publish(self, if_exists="raise"):
    """Creates BigQuery table at production dataset.

    Table should be located at `<dataset_id>.<table_id>`.

    It creates a view that uses the query from
    `<metadata_path>/<dataset_id>/<table_id>/publish.sql`.

    Make sure that all columns from the query also exist at
    `<metadata_path>/<dataset_id>/<table_id>/table_config.yaml`, including
    the partitions.

    Args:
        if_exists (str): Optional.
            What to do if table exists.

            * 'raise' : Raises Conflict exception
            * 'replace' : Replace table
            * 'pass' : Do nothing

    Todo:

        * Check if all required fields are filled
    """

    if if_exists == "replace":
        self.delete(mode="prod")

    self.client["bigquery_prod"].query(
        (self.table_folder / "publish.sql").open("r", encoding="utf-8").read()
    ).result()

    self.update()
    logger.success(
        " {object} {object_id} was {action}!",
        object_id=self.table_id,
        object="Table",
        action="published",
    )

table_exists(self, mode)

Check if table exists in BigQuery.

Parameters:

Name Type Description Default
mode str

Which dataset to check [prod|staging].

required
Source code in basedosdados/upload/table.py
def table_exists(self, mode):
    """Check if table exists in BigQuery.

    Args:
        mode (str): Which dataset to check [prod|staging].
    """

    try:
        ref = self._get_table_obj(mode=mode)
    except google.api_core.exceptions.NotFound:
        ref = None

    return bool(ref)

update(self, mode='all')

Updates BigQuery schema and description.

Parameters:

Name Type Description Default
mode str

Optional. Which table to update [prod|staging|all]

'all'
not_found_ok bool

Optional. What to do if table is not found

required
Source code in basedosdados/upload/table.py
def update(self, mode="all"):
    """Updates BigQuery schema and description.

    Args:
        mode (str): Optional.
            Which table to update [prod|staging|all]
        not_found_ok (bool): Optional.
            What to do if table is not found
    """

    self._check_mode(mode)

    mode = ["prod", "staging"] if mode == "all" else [mode]
    for m in mode:

        try:
            table = self._get_table_obj(m)
        except google.api_core.exceptions.NotFound:
            continue

        # if m == "staging":

        table.description = self._render_template(
            Path("table/table_description.txt"), self.table_config
        )

        # save table description
        with open(
            self.metadata_path
            / self.dataset_id
            / self.table_id
            / "table_description.txt",
            "w",
            encoding="utf-8",
        ) as f:
            f.write(table.description)

        # when mode is staging the table schema already exists
        table.schema = self._load_schema(m)
        fields = ["description", "schema"] if m == "prod" else ["description"]
        self.client[f"bigquery_{m}"].update_table(table, fields=fields)

    logger.success(
        " {object} {object_id} was {action}!",
        object_id=self.table_id,
        object="Table",
        action="updated",
    )

update_columns(self, columns_config_url_or_path=None)

Fills columns in table_config.yaml automatically using a public Google Sheets URL or a local file. Also regenerates publish.sql and autofills types using bigquery_type.

The sheet must contain the columns:

  • name: column name
  • description: column description
  • bigquery_type: column bigquery type
  • measurement_unit: column measurement unit
  • covered_by_dictionary: column related dictionary
  • directory_column: column related directory in the format <dataset_id>.<table_id>:<column_name>
  • temporal_coverage: column temporal coverage
  • has_sensitive_data: the column has sensitive data
  • observations: column observations
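
The `directory_column` value packs three identifiers into one string, `<dataset_id>.<table_id>:<column_name>`. The sketch below shows one way to split it, assuming a well-formed value; `parse_directory_column` and `DirectoryColumn` are hypothetical names, and the example reference is illustrative. Unlike the library source, which tolerates empty cells by carrying a `"NULL"` placeholder through, this version rejects anything that does not match the format.

```python
from typing import NamedTuple


class DirectoryColumn(NamedTuple):
    dataset_id: str
    table_id: str
    column_name: str


def parse_directory_column(value):
    """Split '<dataset_id>.<table_id>:<column_name>' into its three parts."""
    dataset_id, dot, rest = value.partition(".")
    table_id, colon, column_name = rest.partition(":")
    # Every separator and every part must be present
    if not (dot and colon and dataset_id and table_id and column_name):
        raise ValueError(
            f"expected <dataset_id>.<table_id>:<column_name>, got {value!r}"
        )
    return DirectoryColumn(dataset_id, table_id, column_name)


ref = parse_directory_column("br_bd_diretorios_brasil.municipio:id_municipio")
print(ref)
```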

Parameters:

Name Type Description Default
columns_config_url_or_path str

Path to the local architecture file or a public Google Sheets URL. Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats. Google Sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.

None
Source code in basedosdados/upload/table.py
def update_columns(self, columns_config_url_or_path=None):
    """
    Fills columns in table_config.yaml automatically using a public Google Sheets URL or a local file. Also regenerates
    publish.sql and autofills types using bigquery_type.

    The sheet must contain the columns:
        - name: column name
        - description: column description
        - bigquery_type: column bigquery type
        - measurement_unit: column measurement unit
        - covered_by_dictionary: column related dictionary
        - directory_column: column related directory in the format <dataset_id>.<table_id>:<column_name>
        - temporal_coverage: column temporal coverage
        - has_sensitive_data: the column has sensitive data
        - observations: column observations
    Args:
        columns_config_url_or_path (str): Path to the local architecture file or a public Google Sheets URL.
            Path only supports csv, xls, xlsx, xlsm, xlsb, odf, ods, odt formats.
            Google Sheets URL must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>.

    """
    ruamel = ryaml.YAML()
    ruamel.preserve_quotes = True
    ruamel.indent(mapping=4, sequence=6, offset=4)
    table_config_yaml = ruamel.load(
        (self.table_folder / "table_config.yaml").open(encoding="utf-8")
    )

    if "https://docs.google.com/spreadsheets/d/" in columns_config_url_or_path:
        if (
            "edit#gid=" not in columns_config_url_or_path
            or "https://docs.google.com/spreadsheets/d/"
            not in columns_config_url_or_path
            or not columns_config_url_or_path.split("=")[1].isdigit()
        ):
            raise BaseDosDadosException(
                "The Google sheet url not in correct format."
                "The url must be in the format https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>"
            )
        df = self._sheet_to_df(columns_config_url_or_path)
    else:
        file_type = columns_config_url_or_path.split(".")[-1]
        if file_type == "csv":
            df = pd.read_csv(columns_config_url_or_path, encoding="utf-8")
        elif file_type in ["xls", "xlsx", "xlsm", "xlsb", "odf", "ods", "odt"]:
            df = pd.read_excel(columns_config_url_or_path)
        else:
            raise BaseDosDadosException(
                "File not suported. Only csv, xls, xlsx, xlsm, xlsb, odf, ods, odt are supported."
            )

    df = df.fillna("NULL")

    required_columns = [
        "name",
        "bigquery_type",
        "description",
        "temporal_coverage",
        "covered_by_dictionary",
        "directory_column",
        "measurement_unit",
        "has_sensitive_data",
        "observations",
    ]

    not_found_columns = required_columns.copy()
    for sheet_column in df.columns.tolist():
        for required_column in required_columns:
            if sheet_column == required_column:
                not_found_columns.remove(required_column)
    if not_found_columns:
        raise BaseDosDadosException(
            f"The following required columns are not found: {', '.join(not_found_columns)}."
        )

    columns_parameters = zip(
        *[df[required_column].tolist() for required_column in required_columns]
    )
    for (
        name,
        bigquery_type,
        description,
        temporal_coverage,
        covered_by_dictionary,
        directory_column,
        measurement_unit,
        has_sensitive_data,
        observations,
    ) in columns_parameters:
        for col in table_config_yaml["columns"]:
            if col["name"] == name:
                col["bigquery_type"] = (
                    col["bigquery_type"]
                    if bigquery_type == "NULL"
                    else bigquery_type.lower()
                )

                col["description"] = (
                    col["description"] if description == "NULL" else description
                )

                col["temporal_coverage"] = (
                    col["temporal_coverage"]
                    if temporal_coverage == "NULL"
                    else [temporal_coverage]
                )

                col["covered_by_dictionary"] = (
                    "no"
                    if covered_by_dictionary == "NULL"
                    else covered_by_dictionary
                )

                dataset = directory_column.split(".")[0]
                col["directory_column"]["dataset_id"] = (
                    col["directory_column"]["dataset_id"]
                    if dataset == "NULL"
                    else dataset
                )

                table = directory_column.split(".")[-1].split(":")[0]
                col["directory_column"]["table_id"] = (
                    col["directory_column"]["table_id"]
                    if table == "NULL"
                    else table
                )

                column = directory_column.split(".")[-1].split(":")[-1]
                col["directory_column"]["column_name"] = (
                    col["directory_column"]["column_name"]
                    if column == "NULL"
                    else column
                )
                col["measurement_unit"] = (
                    col["measurement_unit"]
                    if measurement_unit == "NULL"
                    else measurement_unit
                )

                col["has_sensitive_data"] = (
                    "no" if has_sensitive_data == "NULL" else has_sensitive_data
                )

                col["observations"] = (
                    col["observations"] if observations == "NULL" else observations
                )

    with open(self.table_folder / "table_config.yaml", "w", encoding="utf-8") as f:
        ruamel.dump(table_config_yaml, f)

    # regenerate publish.sql
    self._make_publish_sql()
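
The URL format required above, `https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>`, can also be checked with a single regular expression. This is an alternative sketch, not the check the library performs (the listing uses substring tests and `split("=")`); `parse_sheet_url` and the example key `abc123` are hypothetical.

```python
import re

# One pattern for the whole documented format; gid must be numeric
SHEET_URL = re.compile(
    r"^https://docs\.google\.com/spreadsheets/d/"
    r"(?P<table_key>[^/]+)/edit#gid=(?P<table_gid>\d+)$"
)


def parse_sheet_url(url):
    """Return (table_key, table_gid), or raise ValueError if the URL is malformed."""
    match = SHEET_URL.match(url)
    if match is None:
        raise ValueError(
            "URL must look like "
            "https://docs.google.com/spreadsheets/d/<table_key>/edit#gid=<table_gid>"
        )
    return match["table_key"], match["table_gid"]


key, gid = parse_sheet_url("https://docs.google.com/spreadsheets/d/abc123/edit#gid=0")
print(key, gid)
```

Anchoring the pattern at both ends rejects URLs with trailing query parameters, which is stricter than the substring checks in the listing; whether that strictness is desirable depends on how the URLs are copied from the browser.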

Class to manage the metadata of datasets and tables

Metadata (Base)

Manage metadata in CKAN backend.
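
To make the CKAN lookup concrete, the sketch below rebuilds the `package_show` URL that the class queries for a dataset: underscores in `dataset_id` become hyphens in the CKAN package name. The helper `package_show_url` is hypothetical; the default base URL and the endpoint path are taken from the source listing on this page, and no request is actually sent.

```python
from urllib.parse import urlencode

# Default base URL used by the class when no CKAN url is configured
CKAN_URL = "https://basedosdados.org"


def package_show_url(dataset_id, ckan_url=CKAN_URL):
    """Build the CKAN package_show URL for a dataset (hypothetical helper)."""
    # CKAN package names use hyphens where dataset ids use underscores
    package_name = dataset_id.replace("_", "-")
    query = urlencode({"id": package_name})
    return f"{ckan_url}/api/3/action/package_show?{query}"


url = package_show_url("br_suporte")
print(url)
```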

Source code in basedosdados/upload/metadata.py
class Metadata(Base):
    """
    Manage metadata in CKAN backend.
    """

    def __init__(self, dataset_id, table_id=None, **kwargs):
        super().__init__(**kwargs)

        self.table_id = table_id
        self.dataset_id = dataset_id

        if self.table_id:
            self.dataset_metadata_obj = Metadata(self.dataset_id, **kwargs)

        url = "https://basedosdados.org"
        self.CKAN_API_KEY = self.config.get("ckan", {}).get("api_key")
        self.CKAN_URL = self.config.get("ckan", {}).get("url", "") or url

    @property
    def filepath(self) -> str:
        """Build the dataset or table filepath"""

        filename = "dataset_config.yaml"
        if self.table_id:
            filename = f"{self.table_id}/table_config.yaml"
        return self.metadata_path / self.dataset_id / filename

    @property
    def local_metadata(self) -> dict:
        """Load dataset or table local metadata"""

        if self.filepath.exists():
            with open(self.filepath, "r", encoding="utf-8") as file:
                return ryaml.safe_load(file.read())
        return {}

    @property
    def ckan_metadata(self) -> dict:
        """Load dataset or table metadata from Base dos Dados CKAN"""

        ckan_dataset, ckan_table = self.ckan_metadata_extended
        return ckan_table or ckan_dataset

    @property
    def ckan_metadata_extended(self) -> dict:
        """Load dataset and table metadata from Base dos Dados CKAN"""

        dataset_id = self.dataset_id.replace("_", "-")
        url = f"{self.CKAN_URL}/api/3/action/package_show?id={dataset_id}"

        ckan_response = requests.get(url, timeout=10).json()
        dataset = ckan_response.get("result")

        if not ckan_response.get("success"):
            return {}, {}

        if self.table_id:
            for resource in dataset["resources"]:
                if resource["name"] == self.table_id:
                    return dataset, resource

        return dataset, {}

    @property
    def owner_org(self):
        """
        Build `owner_org` field for each use case: table, dataset, new
        or existing.
        """

        # in case `self` refers to a CKAN table's metadata
        if self.table_id and self.exists_in_ckan():
            return self.dataset_metadata_obj.ckan_metadata.get("owner_org")

        # in case `self` refers to a new table's metadata
        if self.table_id and not self.exists_in_ckan():
            if self.dataset_metadata_obj.exists_in_ckan():
                return self.dataset_metadata_obj.ckan_metadata.get("owner_org")
            # mock `owner_org` for validation
            return "3626e93d-165f-42b8-bde1-2e0972079694"

        # for datasets, `owner_org` must come from the YAML file
        organization_id = "".join(self.local_metadata.get("organization") or [])
        url = f"{self.CKAN_URL}/api/3/action/organization_show?id={organization_id}"
        response = requests.get(url, timeout=10).json()

        if not response.get("success"):
            raise BaseDosDadosException("Organization not found")

        owner_org = response.get("result", {}).get("id")

        return owner_org

    @property
    def ckan_data_dict(self) -> dict:
        """Helper function that structures local metadata for validation"""

        ckan_dataset, ckan_table = self.ckan_metadata_extended

        metadata = {
            "id": ckan_dataset.get("id"),
            "name": ckan_dataset.get("name") or self.dataset_id.replace("_", "-"),
            "type": ckan_dataset.get("type") or "dataset",
            "title": self.local_metadata.get("title"),
            "private": ckan_dataset.get("private") or False,
            "owner_org": self.owner_org,
            "resources": ckan_dataset.get("resources", [])
            or [{"resource_type": "external_link", "name": ""}]
            or [{"resource_type": "information_request", "name": ""}],
            "groups": [
                {"name": group} for group in self.local_metadata.get("groups", []) or []
            ],
            "tags": [
                {"name": tag} for tag in self.local_metadata.get("tags", []) or []
            ],
            "organization": {"name": self.local_metadata.get("organization")},
            "extras": [
                {
                    "key": "dataset_args",
                    "value": {
                        "short_description": self.local_metadata.get(
                            "short_description"
                        ),
                        "description": self.local_metadata.get("description"),
                        "ckan_url": self.local_metadata.get("ckan_url"),
                        "github_url": self.local_metadata.get("github_url"),
                    },
                }
            ],
        }

        if self.table_id:
            metadata["resources"] = [
                {
                    "id": ckan_table.get("id"),
                    "description": self.local_metadata.get("description"),
                    "name": self.local_metadata.get("table_id"),
                    "resource_type": ckan_table.get("resource_type") or "bdm_table",
                    "version": self.local_metadata.get("version"),
                    "dataset_id": self.local_metadata.get("dataset_id"),
                    "table_id": self.local_metadata.get("table_id"),
                    "spatial_coverage": self.local_metadata.get("spatial_coverage"),
                    "temporal_coverage": self.local_metadata.get("temporal_coverage"),
                    "update_frequency": self.local_metadata.get("update_frequency"),
                    "observation_level": self.local_metadata.get("observation_level"),
                    "last_updated": self.local_metadata.get("last_updated"),
                    "published_by": self.local_metadata.get("published_by"),
                    "data_cleaned_by": self.local_metadata.get("data_cleaned_by"),
                    "data_cleaning_description": self.local_metadata.get(
                        "data_cleaning_description"
                    ),
                    "data_cleaning_code_url": self.local_metadata.get(
                        "data_cleaning_code_url"
                    ),
                    "partner_organization": self.local_metadata.get(
                        "partner_organization"
                    ),
                    "raw_files_url": self.local_metadata.get("raw_files_url"),
                    "auxiliary_files_url": self.local_metadata.get(
                        "auxiliary_files_url"
                    ),
                    "architecture_url": self.local_metadata.get("architecture_url"),
                    "source_bucket_name": self.local_metadata.get("source_bucket_name"),
                    "project_id_prod": self.local_metadata.get("project_id_prod"),
                    "project_id_staging": self.local_metadata.get("project_id_staging"),
                    "partitions": self.local_metadata.get("partitions"),
                    "uncompressed_file_size": self.local_metadata.get(
                        "uncompressed_file_size"
                    ),
                    "compressed_file_size": self.local_metadata.get(
                        "compressed_file_size"
                    ),
                    "columns": self.local_metadata.get("columns"),
                    "metadata_modified": self.local_metadata.get("metadata_modified"),
                    "package_id": ckan_dataset.get("id"),
                }
            ]

        return metadata

    @property
    @lru_cache(256)
    def columns_schema(self) -> dict:
        """Returns a dictionary with the schema of the columns"""

        url = f"{self.CKAN_URL}/api/3/action/bd_bdm_columns_schema"

        return requests.get(url, timeout=10).json().get("result")

    @property
    @lru_cache(256)
    def metadata_schema(self) -> dict:
        """Get metadata schema from CKAN API endpoint"""

        if self.table_id:
            table_url = f"{self.CKAN_URL}/api/3/action/bd_bdm_table_schema"
            return requests.get(table_url, timeout=10).json().get("result")

        dataset_url = f"{self.CKAN_URL}/api/3/action/bd_dataset_schema"
        return requests.get(dataset_url, timeout=10).json().get("result")

    def exists_in_ckan(self) -> bool:
        """Check if Metadata object refers to an existing CKAN package or resource.

        Returns:
            bool: The existence condition of the metadata in CKAN. `True` if it
            exists, `False` otherwise.
        """

        if self.table_id:
            url = f"{self.CKAN_URL}/api/3/action/bd_bdm_table_show?"
            url += f"dataset_id={self.dataset_id}&table_id={self.table_id}"
        else:
            id = self.dataset_id.replace("_", "-")
            # TODO: use `bd_bdm_dataset_show` when it's available for empty packages
            url = f"{self.CKAN_URL}/api/3/action/package_show?id={id}"

        exists_in_ckan = requests.get(url, timeout=10).json().get("success")

        return exists_in_ckan

    def is_updated(self) -> bool:
        """Check if a dataset or table is updated

        Returns:
            bool: The update condition of local metadata. `True` if it corresponds
            to the most recent version of the given table or dataset's metadata
            in CKAN, `False` otherwise.
        """

        if not self.local_metadata.get("metadata_modified"):
            return bool(not self.exists_in_ckan())
        ckan_modified = self.ckan_metadata.get("metadata_modified")
        local_modified = self.local_metadata.get("metadata_modified")
        return ckan_modified == local_modified

    def create(
        self,
        if_exists: str = "raise",
        columns: list = None,
        partition_columns: list = None,
        force_columns: bool = False,
        table_only: bool = True,
    ) -> Metadata:
        """Create metadata file based on the current version saved to CKAN database

        Args:
            if_exists (str): Optional. What to do if config exists
                * raise : Raises FileExistsError
                * replace : Replaces config file with most recent
                * pass : Do nothing
            columns (list): Optional.
                A `list` with the table columns' names.
            partition_columns (list): Optional.
                A `list` with the names of the table columns that partition
                the data.
            force_columns (bool): Optional.
                If set to `True`, overwrite CKAN's columns with the ones
                provided.
                If set to `False`, keep CKAN's columns instead of the ones
                provided.
            table_only (bool): Optional. If set to `True`, only
                `table_config.yaml` is created, even if there is no
                `dataset_config.yaml` for the corresponding dataset metadata.
                If set to `False`, both files are created if
                `dataset_config.yaml` doesn't exist yet. Defaults to `True`.

        Returns:
            Metadata: An instance of the `Metadata` class.

        Raises:
            FileExistsError: If the corresponding YAML configuration file
            already exists and `if_exists` is set to `"raise"`.
        """

        # see: https://docs.python.org/3/reference/compound_stmts.html#function-definitions
        columns = [] if columns is None else columns
        partition_columns = [] if partition_columns is None else partition_columns

        if self.filepath.exists() and if_exists == "raise":
            raise FileExistsError(
                f"{self.filepath} already exists."
                + " Set the arg `if_exists` to `replace` to replace it."
            )
        if if_exists != "pass":
            ckan_metadata = self.ckan_metadata

            # Use the locally provided columns if
            # 1. force_columns is True, or
            # 2. CKAN has no columns for this table

            # TODO: Is this sufficient to add columns?
            if self.table_id and (force_columns or not ckan_metadata.get("columns")):
                ckan_metadata["columns"] = [{"name": c} for c in columns]

            yaml_obj = build_yaml_object(
                dataset_id=self.dataset_id,
                table_id=self.table_id,
                config=self.config,
                schema=self.metadata_schema,
                metadata=ckan_metadata,
                columns_schema=self.columns_schema,
                partition_columns=partition_columns,
            )

            self.filepath.parent.mkdir(parents=True, exist_ok=True)

            with open(self.filepath, "w", encoding="utf-8") as file:
                ruamel = ryaml.YAML()
                ruamel.preserve_quotes = True
                ruamel.indent(mapping=4, sequence=6, offset=4)
                ruamel.dump(yaml_obj, file)

            # if `dataset_config.yaml` doesn't exist but user wants to create
            # it alongside `table_config.yaml`
            dataset_config_exists = (
                self.metadata_path / self.dataset_id / "dataset_config.yaml"
            ).exists()
            if self.table_id and not table_only and not dataset_config_exists:
                self.dataset_metadata_obj.create(if_exists=if_exists)

            logger.success(
                " {object} {object_id} was {action}!",
                object_id=self.table_id,
                object="Metadata",
                action="created",
            )

        return self

    def validate(self) -> bool:
        """Validate dataset_config.yaml or table_config.yaml files.
        The yaml file should be located at
        metadata_path/dataset_id[/table_id/],
        as defined in your config.toml

        Returns:
            bool:
                True if the metadata is valid. Invalid metadata raises
                BaseDosDadosException instead of returning False.

        Raises:
            BaseDosDadosException:
                when the file has validation errors.
        """

        ckan = RemoteCKAN(self.CKAN_URL, user_agent="", apikey=None)
        response = ckan.action.bd_dataset_validate(**self.ckan_data_dict)

        if response.get("errors"):
            error = {self.ckan_data_dict.get("name"): response["errors"]}
            message = f"{self.filepath} has validation errors: {error}"
            raise BaseDosDadosException(message)

        logger.success(
            " {object} {object_id} was {action}!",
            object_id=self.table_id,
            object="Metadata",
            action="validated",
        )

        return True

    def publish(
        self,
        all: bool = False,
        if_exists: str = "raise",
        update_locally: bool = False,
    ) -> dict:
        """Publish local metadata modifications.
        `Metadata.validate` is used to make sure no local invalid metadata is
        published to CKAN. The `config.toml` `api_key` variable must be set
        at the `[ckan]` section for this method to work.

        Args:
            all (bool): Optional. If set to `True`, both `dataset_config.yaml`
                and `table_config.yaml` are published for the given dataset_id
                and table_id.
            if_exists (str): Optional. What to do if config exists
                * raise : Raises BaseDosDadosException if metadata already
                exists in CKAN
                * replace : Overwrite metadata in CKAN if it exists
                * pass : Do nothing
            update_locally (bool): Optional. If set to `True`, update the local
                metadata with the one published to CKAN.

        Returns:
            dict:
                In case of success, a `dict` with the modified data
                is returned.

        Raises:
            BaseDosDadosException:
                In case of CKAN's ValidationError or
                NotAuthorized exceptions.
        """

        # alert user if they don't have an api_key set up yet
        if not self.CKAN_API_KEY:
            raise BaseDosDadosException(
                "You can't use `Metadata.publish` without setting an `api_key` "
                "in your ~/.basedosdados/config.toml. Please set it like this:"
                '\n\n```\n[ckan]\nurl="<CKAN_URL>"\napi_key="<API_KEY>"\n```'
            )

        # check if metadata exists in CKAN and handle if_exists options
        if self.exists_in_ckan():
            if if_exists == "raise":
                raise BaseDosDadosException(
                    f"{self.dataset_id or self.table_id} already exists in CKAN."
                    f" Set the arg `if_exists` to `replace` to replace it."
                )
            if if_exists == "pass":
                return {}

        ckan = RemoteCKAN(self.CKAN_URL, user_agent="", apikey=self.CKAN_API_KEY)

        try:
            self.validate()

            assert self.is_updated(), (
                f"Could not publish metadata due to out-of-date config file. "
                f"Please run `basedosdados metadata create {self.dataset_id} "
                f"{self.table_id or ''}` to get the most recently updated met"
                f"adata and apply your changes to it."
            )

            data_dict = self.ckan_data_dict.copy()

            if self.table_id:

                # publish dataset metadata first if user wants to publish both
                if all:
                    self.dataset_metadata_obj.publish(if_exists=if_exists)

                data_dict = data_dict["resources"][0]

                published = ckan.call_action(
                    action="resource_patch"
                    if self.exists_in_ckan()
                    else "resource_create",
                    data_dict=data_dict,
                )

            else:
                data_dict["resources"] = []

                published = ckan.call_action(
                    action="package_patch"
                    if self.exists_in_ckan()
                    else "package_create",
                    data_dict=data_dict,
                )

            # recreate local metadata YAML file with the published data
            if published and update_locally:
                self.create(if_exists="replace")
                self.dataset_metadata_obj.create(if_exists="replace")

            logger.success(
                " {object} {object_id} was {action}!",
                object_id=data_dict,
                object="Metadata",
                action="published",
            )

            return published

        except (BaseDosDadosException, ValidationError) as e:
            message = (
                f"Could not publish metadata due to a validation error. Pleas"
                f"e see the traceback below to get information on how to corr"
                f"ect it.\n\n{repr(e)}"
            )
            raise BaseDosDadosException(message) from e

        except NotAuthorized as e:
            message = (
                "Could not publish metadata due to an authorization error. Pl"
                "ease check if you set the `api_key` at the `[ckan]` section "
                "of your ~/.basedosdados/config.toml correctly. You must be a"
                "n authorized user to publish modifications to a dataset or t"
                "able's metadata."
            )
            raise BaseDosDadosException(message) from e

ckan_data_dict: dict property readonly

Helper function that structures local metadata for validation

ckan_metadata: dict property readonly

Load dataset or table metadata from Base dos Dados CKAN

ckan_metadata_extended: dict property readonly

Load dataset and table metadata from Base dos Dados CKAN

columns_schema: dict property readonly

Returns a dictionary with the schema of the columns

filepath: str property readonly

Build the dataset or table filepath

local_metadata: dict property readonly

Load dataset or table local metadata

metadata_schema: dict property readonly

Get metadata schema from CKAN API endpoint

owner_org property readonly

Build owner_org field for each use case: table, dataset, new or existing.
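
In the class source, `metadata_schema` stacks `@property` on top of `@lru_cache(256)`, so repeated accesses on the same instance can reuse the first result instead of requesting the schema endpoint again. The sketch below illustrates that pattern only; `SchemaClient` and its `calls` counter are hypothetical stand-ins, and it assumes the instance uses Python's default hashing.

```python
from functools import lru_cache


class SchemaClient:
    """Hypothetical stand-in for a class with an expensive read-only property."""

    def __init__(self):
        self.calls = 0  # counts how often the "request" actually runs

    @property
    @lru_cache(256)
    def schema(self) -> dict:
        # Stand-in for requests.get(...).json().get("result")
        self.calls += 1
        return {"properties": {}, "definitions": {}}


client = SchemaClient()
first = client.schema   # runs the body once
second = client.schema  # served from the cache
```

Because the cache is keyed on `self`, it also keeps a reference to every instance it has seen, which is worth keeping in mind for long-running processes.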

create(self, if_exists='raise', columns=None, partition_columns=None, force_columns=False, table_only=True)

Create metadata file based on the current version saved to CKAN database

Parameters:

Name Type Description Default
if_exists str

Optional. What to do if the config file already exists: raise (raises FileExistsError), replace (replaces the config file with the most recent version), pass (does nothing).

'raise'
columns list

Optional. A list with the table columns' names.

None
partition_columns list

Optional. A list with the names of the table columns that partition the data.

None
force_columns bool

Optional. If set to True, overwrite CKAN's columns with the ones provided. If set to False, keep CKAN's columns instead of the ones provided.

False
table_only bool

Optional. If set to True, only table_config.yaml is created, even if there is no dataset_config.yaml for the corresponding dataset metadata. If set to False, both files are created if dataset_config.yaml doesn't exist yet. Defaults to True.

True

Returns:

Type Description
Metadata

An instance of the Metadata class.

Exceptions:

Type Description
FileExistsError

If the corresponding YAML configuration file already exists and if_exists is set to "raise".

Source code in basedosdados/upload/metadata.py
def create(
    self,
    if_exists: str = "raise",
    columns: list = None,
    partition_columns: list = None,
    force_columns: bool = False,
    table_only: bool = True,
) -> Metadata:
    """Create metadata file based on the current version saved to CKAN database

    Args:
        if_exists (str): Optional. What to do if config exists
            * raise : Raises FileExistsError
            * replace : Replaces config file with most recent
            * pass : Do nothing
        columns (list): Optional.
            A `list` with the table columns' names.
        partition_columns (list): Optional.
            A `list` with the names of the table columns that partition
            the data.
        force_columns (bool): Optional.
            If set to `True`, overwrite CKAN's columns with the ones
            provided.
            If set to `False`, keep CKAN's columns instead of the ones
            provided.
        table_only (bool): Optional. If set to `True`, only
            `table_config.yaml` is created, even if there is no
            `dataset_config.yaml` for the corresponding dataset metadata.
            If set to `False`, both files are created if
            `dataset_config.yaml` doesn't exist yet. Defaults to `True`.

    Returns:
        Metadata: An instance of the `Metadata` class.

    Raises:
        FileExistsError: If the corresponding YAML configuration file
        already exists and `if_exists` is set to `"raise"`.
    """

    # see: https://docs.python.org/3/reference/compound_stmts.html#function-definitions
    columns = [] if columns is None else columns
    partition_columns = [] if partition_columns is None else partition_columns

    if self.filepath.exists() and if_exists == "raise":
        raise FileExistsError(
            f"{self.filepath} already exists."
            + " Set the arg `if_exists` to `replace` to replace it."
        )
    if if_exists != "pass":
        ckan_metadata = self.ckan_metadata

        # Use the locally provided columns if
        # 1. force_columns is True, or
        # 2. CKAN has no columns for this table

        # TODO: Is this sufficient to add columns?
        if self.table_id and (force_columns or not ckan_metadata.get("columns")):
            ckan_metadata["columns"] = [{"name": c} for c in columns]

        yaml_obj = build_yaml_object(
            dataset_id=self.dataset_id,
            table_id=self.table_id,
            config=self.config,
            schema=self.metadata_schema,
            metadata=ckan_metadata,
            columns_schema=self.columns_schema,
            partition_columns=partition_columns,
        )

        self.filepath.parent.mkdir(parents=True, exist_ok=True)

        with open(self.filepath, "w", encoding="utf-8") as file:
            ruamel = ryaml.YAML()
            ruamel.preserve_quotes = True
            ruamel.indent(mapping=4, sequence=6, offset=4)
            ruamel.dump(yaml_obj, file)

        # if `dataset_config.yaml` doesn't exist but user wants to create
        # it alongside `table_config.yaml`
        dataset_config_exists = (
            self.metadata_path / self.dataset_id / "dataset_config.yaml"
        ).exists()
        if self.table_id and not table_only and not dataset_config_exists:
            self.dataset_metadata_obj.create(if_exists=if_exists)

        logger.success(
            " {object} {object_id} was {action}!",
            object_id=self.table_id,
            object="Metadata",
            action="created",
        )

    return self
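
The `if_exists` handling in `create` can be isolated from the CKAN and YAML details. The following is a minimal sketch of just that branching, assuming a hypothetical helper `write_config` that writes plain text instead of a ruamel YAML object:

```python
import tempfile
from pathlib import Path


def write_config(filepath: Path, content: str, if_exists: str = "raise") -> bool:
    """Write `content` to `filepath`, honoring raise / replace / pass.

    Returns True if the file was written, False if writing was skipped.
    """
    if filepath.exists() and if_exists == "raise":
        raise FileExistsError(
            f"{filepath} already exists. Set `if_exists` to `replace` to replace it."
        )
    if if_exists == "pass":
        # Mirrors `create`: "pass" skips the write whether or not the file exists
        return False
    filepath.parent.mkdir(parents=True, exist_ok=True)
    filepath.write_text(content, encoding="utf-8")
    return True


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "dataset" / "table" / "table_config.yaml"
    created = write_config(path, "table_id: a\n")              # new file is written
    replaced = write_config(path, "table_id: b\n", "replace")  # file is overwritten
    skipped = write_config(path, "table_id: c\n", "pass")      # file is left untouched
    final_text = path.read_text(encoding="utf-8")
    try:
        write_config(path, "table_id: d\n")  # default "raise" on an existing file
        raised = False
    except FileExistsError:
        raised = True
```

Note that, as in the source above, `pass` does nothing even when no file exists yet, so it never creates a config file.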

exists_in_ckan(self)

Check if Metadata object refers to an existing CKAN package or resource.

Returns:

Type Description
bool

The existence condition of the metadata in CKAN. True if it exists, False otherwise.

Source code in basedosdados/upload/metadata.py
def exists_in_ckan(self) -> bool:
    """Check if Metadata object refers to an existing CKAN package or
    resource.

    Returns:
        bool: The existence condition of the metadata in CKAN. `True` if
        it exists, `False` otherwise.
    """

    if self.table_id:
        url = f"{self.CKAN_URL}/api/3/action/bd_bdm_table_show?"
        url += f"dataset_id={self.dataset_id}&table_id={self.table_id}"
    else:
        id = self.dataset_id.replace("_", "-")
        # TODO: use `bd_bdm_dataset_show` when it's available for empty packages
        url = f"{self.CKAN_URL}/api/3/action/package_show?id={id}"

    exists_in_ckan = requests.get(url, timeout=10).json().get("success")

    return exists_in_ckan
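
To see which endpoint `exists_in_ckan` queries without making a request, the URL construction can be pulled into a pure function. This is a sketch only: `build_show_url` is a hypothetical helper and `https://ckan.example.org` is a placeholder host, while the action names and the underscore-to-hyphen conversion come from the source above.

```python
from typing import Optional


def build_show_url(ckan_url: str, dataset_id: str, table_id: Optional[str] = None) -> str:
    """Return the CKAN URL used to check a table (resource) or dataset (package)."""
    if table_id:
        # Tables are looked up by dataset_id and table_id
        return (
            f"{ckan_url}/api/3/action/bd_bdm_table_show?"
            f"dataset_id={dataset_id}&table_id={table_id}"
        )
    # Datasets are looked up by package id, which uses hyphens instead of underscores
    package_id = dataset_id.replace("_", "-")
    return f"{ckan_url}/api/3/action/package_show?id={package_id}"


table_url = build_show_url("https://ckan.example.org", "br_suporte", "diretorio_municipios")
dataset_url = build_show_url("https://ckan.example.org", "br_suporte")
```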

is_updated(self)

Check if a dataset or table is updated

Returns:

Type Description
bool

The update condition of local metadata. True if it corresponds to the most recent version of the given table or dataset's metadata in CKAN, False otherwise.

Source code in basedosdados/upload/metadata.py
def is_updated(self) -> bool:
    """Check if a dataset or table is updated

    Returns:
        bool: The update condition of local metadata. `True` if it
        corresponds to the most recent version of the given table or
        dataset's metadata in CKAN, `False` otherwise.
    """

    if not self.local_metadata.get("metadata_modified"):
        return bool(not self.exists_in_ckan())
    ckan_modified = self.ckan_metadata.get("metadata_modified")
    local_modified = self.local_metadata.get("metadata_modified")
    return ckan_modified == local_modified
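
The decision made by `is_updated` depends on three inputs: the local `metadata_modified` value, the CKAN one, and whether the object exists in CKAN. A minimal sketch of the same rule as a pure function, with the hypothetical name `is_up_to_date` and made-up timestamps:

```python
from typing import Optional


def is_up_to_date(
    local_modified: Optional[str],
    ckan_modified: Optional[str],
    exists_remotely: bool,
) -> bool:
    """Apply the same comparison as `is_updated` to plain values."""
    if not local_modified:
        # No local timestamp: up to date only if nothing was published yet
        return not exists_remotely
    # Otherwise the local and remote timestamps must match exactly
    return ckan_modified == local_modified


new_object = is_up_to_date(None, None, exists_remotely=False)
missing_stamp = is_up_to_date(None, "2021-06-01T10:00:00", exists_remotely=True)
in_sync = is_up_to_date("2021-06-01T10:00:00", "2021-06-01T10:00:00", True)
stale = is_up_to_date("2021-05-01T09:00:00", "2021-06-01T10:00:00", True)
```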

publish(self, all=False, if_exists='raise', update_locally=False)

Publish local metadata modifications. Metadata.validate is used to make sure no local invalid metadata is published to CKAN. The config.toml api_key variable must be set at the [ckan] section for this method to work.

Parameters:

Name Type Description Default
all bool

Optional. If set to True, both dataset_config.yaml and table_config.yaml are published for the given dataset_id and table_id.

False
if_exists str

Optional. What to do if the metadata already exists in CKAN: raise (raises BaseDosDadosException), replace (overwrites the metadata in CKAN), pass (does nothing).

'raise'
update_locally bool

Optional. If set to True, update the local metadata with the one published to CKAN.

False

Returns:

Type Description
dict

In case of success, a dict with the modified data is returned.

Source code in basedosdados/upload/metadata.py
def publish(
    self,
    all: bool = False,
    if_exists: str = "raise",
    update_locally: bool = False,
) -> dict:
    """Publish local metadata modifications.
    `Metadata.validate` is used to make sure no local invalid metadata is
    published to CKAN. The `config.toml` `api_key` variable must be set
    at the `[ckan]` section for this method to work.

    Args:
        all (bool): Optional. If set to `True`, both `dataset_config.yaml`
            and `table_config.yaml` are published for the given dataset_id
            and table_id.
        if_exists (str): Optional. What to do if config exists
            * raise : Raises BaseDosDadosException if metadata already
            exists in CKAN
            * replace : Overwrite metadata in CKAN if it exists
            * pass : Do nothing
        update_locally (bool): Optional. If set to `True`, update the local
            metadata with the one published to CKAN.

    Returns:
        dict:
            In case of success, a `dict` with the modified data
            is returned.

    Raises:
        BaseDosDadosException:
            In case of CKAN's ValidationError or
            NotAuthorized exceptions.
    """

    # alert user if they don't have an api_key set up yet
    if not self.CKAN_API_KEY:
        raise BaseDosDadosException(
            "You can't use `Metadata.publish` without setting an `api_key` "
            "in your ~/.basedosdados/config.toml. Please set it like this:"
            '\n\n```\n[ckan]\nurl="<CKAN_URL>"\napi_key="<API_KEY>"\n```'
        )

    # check if metadata exists in CKAN and handle if_exists options
    if self.exists_in_ckan():
        if if_exists == "raise":
            raise BaseDosDadosException(
                f"{self.dataset_id or self.table_id} already exists in CKAN."
                f" Set the arg `if_exists` to `replace` to replace it."
            )
        if if_exists == "pass":
            return {}

    ckan = RemoteCKAN(self.CKAN_URL, user_agent="", apikey=self.CKAN_API_KEY)

    try:
        self.validate()

        assert self.is_updated(), (
            f"Could not publish metadata due to out-of-date config file. "
            f"Please run `basedosdados metadata create {self.dataset_id} "
            f"{self.table_id or ''}` to get the most recently updated met"
            f"adata and apply your changes to it."
        )

        data_dict = self.ckan_data_dict.copy()

        if self.table_id:

            # publish dataset metadata first if user wants to publish both
            if all:
                self.dataset_metadata_obj.publish(if_exists=if_exists)

            data_dict = data_dict["resources"][0]

            published = ckan.call_action(
                action="resource_patch"
                if self.exists_in_ckan()
                else "resource_create",
                data_dict=data_dict,
            )

        else:
            data_dict["resources"] = []

            published = ckan.call_action(
                action="package_patch"
                if self.exists_in_ckan()
                else "package_create",
                data_dict=data_dict,
            )

        # recreate local metadata YAML file with the published data
        if published and update_locally:
            self.create(if_exists="replace")
            self.dataset_metadata_obj.create(if_exists="replace")

        logger.success(
            " {object} {object_id} was {action}!",
            object_id=data_dict,
            object="Metadata",
            action="published",
        )

        return published

    except (BaseDosDadosException, ValidationError) as e:
        message = (
            f"Could not publish metadata due to a validation error. Pleas"
            f"e see the traceback below to get information on how to corr"
            f"ect it.\n\n{repr(e)}"
        )
        raise BaseDosDadosException(message) from e

    except NotAuthorized as e:
        message = (
            "Could not publish metadata due to an authorization error. Pl"
            "ease check if you set the `api_key` at the `[ckan]` section "
            "of your ~/.basedosdados/config.toml correctly. You must be a"
            "n authorized user to publish modifications to a dataset or t"
            "able's metadata."
        )
        raise BaseDosDadosException(message) from e
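
`publish` picks one of four CKAN actions depending on whether it is handling a table or a dataset and whether the object already exists. The sketch below reproduces only that selection; `choose_action` is a hypothetical helper, and the four action names are the ones passed to `ckan.call_action` in the source above.

```python
def choose_action(is_table: bool, exists_in_ckan: bool) -> str:
    """Return the CKAN action name that `publish` would call."""
    kind = "resource" if is_table else "package"     # tables are CKAN resources
    verb = "patch" if exists_in_ckan else "create"   # existing objects are patched
    return f"{kind}_{verb}"


actions = {
    (is_table, exists): choose_action(is_table, exists)
    for is_table in (True, False)
    for exists in (True, False)
}
```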

validate(self)

Validate dataset_config.yaml or table_config.yaml files. The yaml file should be located at metadata_path/dataset_id[/table_id/], as defined in your config.toml

Returns:

Type Description
bool

True if the metadata is valid. Invalid metadata raises BaseDosDadosException instead of returning False.

Source code in basedosdados/upload/metadata.py
def validate(self) -> bool:
    """Validate dataset_config.yaml or table_config.yaml files.
    The yaml file should be located at
    metadata_path/dataset_id[/table_id/],
    as defined in your config.toml

    Returns:
        bool:
            True if the metadata is valid. Invalid metadata raises
            BaseDosDadosException instead of returning False.

    Raises:
        BaseDosDadosException:
            when the file has validation errors.
    """

    ckan = RemoteCKAN(self.CKAN_URL, user_agent="", apikey=None)
    response = ckan.action.bd_dataset_validate(**self.ckan_data_dict)

    if response.get("errors"):
        error = {self.ckan_data_dict.get("name"): response["errors"]}
        message = f"{self.filepath} has validation errors: {error}"
        raise BaseDosDadosException(message)

    logger.success(
        " {object} {object_id} was {action}!",
        object_id=self.table_id,
        object="Metadata",
        action="validated",
    )

    return True
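
The way `validate` turns the endpoint's response into either `True` or an exception can be sketched without a CKAN connection. In this illustration, `MetadataValidationError` is a hypothetical stand-in for `BaseDosDadosException`, and the response dictionaries are made up; only the `errors` key and the message format come from the source above.

```python
class MetadataValidationError(Exception):
    """Hypothetical stand-in for BaseDosDadosException."""


def check_validation_response(response: dict, name: str, filepath: str) -> bool:
    """Raise if the response reports errors, otherwise return True."""
    errors = response.get("errors")
    if errors:
        error = {name: errors}
        raise MetadataValidationError(f"{filepath} has validation errors: {error}")
    return True


ok = check_validation_response({"errors": {}}, "br-suporte", "dataset_config.yaml")

try:
    check_validation_response(
        {"errors": {"title": ["Missing value"]}}, "br-suporte", "dataset_config.yaml"
    )
    failed = False
except MetadataValidationError as exc:
    failed = True
    failure_message = str(exc)
```

An empty or missing `errors` value counts as valid, so the function never returns `False`.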

add_yaml_property(yaml, properties=None, definitions=None, metadata=None, goal=None, has_column=False)

Recursively adds properties to yaml to maintain order.

Parameters:

Name Type Description Default
yaml CommentedMap

A YAML object with complex fields.

required
properties dict

A dictionary that contains the description of the complex field.

None
definitions dict

A dictionary with the schemas of each complex field.

None
metadata dict

A dictionary with the metadata to fill the YAML.

None
goal str

The next key to be added to the YAML.

None
has_column bool

If the goal is a column, no comments are written.

False
Source code in basedosdados/upload/metadata.py
def add_yaml_property(
    yaml: CommentedMap,
    properties: dict = None,
    definitions: dict = None,
    metadata: dict = None,
    goal=None,
    has_column=False,
):
    """Recursively adds properties to yaml to maintain order.

    Args:
        yaml (CommentedMap): A YAML object with complex fields.
        properties (dict): A dictionary that contains the description of the
            complex field.
        definitions (dict): A dictionary with the schemas of each complex
            field.
        metadata (dict): A dictionary with the metadata to fill the YAML.
        goal (str): The next key to be added to the YAML.
        has_column (bool): If the goal is a column, no comments are written.
    """

    # see: https://docs.python.org/3/reference/compound_stmts.html#function-definitions
    properties = {} if properties is None else properties
    definitions = {} if definitions is None else definitions
    metadata = {} if metadata is None else metadata

    # Looks for the key
    # If goal is none has to look for id_before == None
    for key, property in properties.items():
        goal_was_reached = key == goal
        goal_was_reached |= property["yaml_order"]["id_before"] is None

        if goal_was_reached:
            if "allOf" in property:
                yaml = handle_complex_fields(
                    yaml_obj=yaml,
                    k=key,
                    properties=properties,
                    definitions=definitions,
                    data=metadata,
                )

                if yaml[key] == ordereddict():
                    yaml[key] = handle_data(k=key, data=metadata)
            else:
                yaml[key] = handle_data(k=key, data=metadata)

            # Add comments
            comment = None
            if not has_column:
                description = properties[key].get("description", [])
                comment = "\n" + "".join(description)
            yaml.yaml_set_comment_before_after_key(key, before=comment)
            break

    # Return a ruamel object when property doesn't point to any other property
    id_after = properties[key]["yaml_order"]["id_after"]

    if id_after is None:
        return yaml
    if id_after not in properties.keys():
        raise BaseDosDadosException(
            f"Inconsistent YAML ordering: {id_after} is pointed to by {key}"
            f" but doesn't have itself a `yaml_order` field in the JSON S"
            f"chema."
        )
    updated_props = deepcopy(properties)
    updated_props.pop(key)
    return add_yaml_property(
        yaml=yaml,
        properties=updated_props,
        definitions=definitions,
        metadata=metadata,
        goal=id_after,
        has_column=has_column,
    )
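
`add_yaml_property` walks the schema as a linked list: each property's `yaml_order` names the key before it (`id_before`) and the key after it (`id_after`). The following iterative sketch shows how those links determine the key order. It is not the library's implementation: `ordered_keys` and the example property names are hypothetical, and the cycle check is an addition.

```python
def ordered_keys(properties: dict) -> list:
    """Follow `yaml_order` links from the head (id_before is None) to the tail."""
    # The first key is the one that nothing precedes
    current = next(
        key
        for key, prop in properties.items()
        if prop["yaml_order"]["id_before"] is None
    )
    order = []
    while current is not None:
        if current not in properties:
            raise KeyError(f"{current} is referenced but has no `yaml_order` entry")
        if current in order:
            raise ValueError(f"Cycle detected at {current}")
        order.append(current)
        current = properties[current]["yaml_order"]["id_after"]
    return order


# Deliberately listed out of order to show that the links drive the result
schema_properties = {
    "description": {"yaml_order": {"id_before": "title", "id_after": None}},
    "dataset_id": {"yaml_order": {"id_before": None, "id_after": "title"}},
    "title": {"yaml_order": {"id_before": "dataset_id", "id_after": "description"}},
}
keys = ordered_keys(schema_properties)
```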

build_yaml_object(dataset_id, table_id, config, schema, metadata=None, columns_schema=None, partition_columns=None)

Build a dataset_config.yaml or table_config.yaml

Parameters:

Name Type Description Default
dataset_id str

The dataset id.

required
table_id str

The table id.

required
config dict

A dict with the basedosdados client configurations.

required
schema dict

A dict with the JSON Schema of the dataset or table.

required
metadata dict

A dict with the metadata of the dataset or table.

None
columns_schema dict

A dict with the JSON Schema of the columns of the table.

None
partition_columns list

A list with the partition columns of the table.

None

Returns:

Type Description
CommentedMap

A YAML object with the dataset or table metadata.

Source code in basedosdados/upload/metadata.py
def build_yaml_object(
    dataset_id: str,
    table_id: str,
    config: dict,
    schema: dict,
    metadata: dict = None,
    columns_schema: dict = None,
    partition_columns: list = None,
):
    """Build a dataset_config.yaml or table_config.yaml

    Args:
        dataset_id (str): The dataset id.
        table_id (str): The table id.
        config (dict): A dict with the `basedosdados` client configurations.
        schema (dict): A dict with the JSON Schema of the dataset or table.
        metadata (dict): A dict with the metadata of the dataset or table.
        columns_schema (dict): A dict with the JSON Schema of the columns of
            the table.
        partition_columns (list): A list with the partition columns of the
            table.

    Returns:
        CommentedMap: A YAML object with the dataset or table metadata.
    """

    # see: https://docs.python.org/3/reference/compound_stmts.html#function-definitions
    metadata = {} if metadata is None else metadata
    columns_schema = {} if columns_schema is None else columns_schema
    partition_columns = [] if partition_columns is None else partition_columns

    properties: dict = schema["properties"]
    definitions: dict = schema["definitions"]

    # Drop all properties without yaml_order
    properties = {
        key: value for key, value in properties.items() if value.get("yaml_order")
    }

    # Add properties
    yaml = add_yaml_property(
        yaml=ryaml.CommentedMap(),
        properties=properties,
        definitions=definitions,
        metadata=metadata,
    )

    # Add columns
    if metadata.get("columns"):
        yaml["columns"] = []
        for metadatum in metadata.get("columns"):
            properties = add_yaml_property(
                yaml=ryaml.CommentedMap(),
                properties=columns_schema["properties"],
                definitions=columns_schema["definitions"],
                metadata=metadatum,
                has_column=True,
            )
            yaml["columns"].append(properties)

    # Add partitions in case of new dataset/table or local overwriting
    if partition_columns and partition_columns != ["[]"]:
        yaml["partitions"] = ""
        for local_column in partition_columns:
            for remote_column in yaml["columns"]:
                if remote_column["name"] == local_column:
                    remote_column["is_partition"] = True
        yaml["partitions"] = partition_columns

    # Nullify `partitions` field in case of other-than-None empty values
    if yaml.get("partitions") == "":
        yaml["partitions"] = None

    if table_id:
        # Add dataset_id and table_id
        yaml["dataset_id"] = dataset_id
        yaml["table_id"] = table_id

        # Add gcloud config variables
        yaml["source_bucket_name"] = str(config.get("bucket_name"))
        yaml["project_id_prod"] = str(
            config.get("gcloud-projects", {}).get("prod", {}).get("name")
        )
        yaml["project_id_staging"] = str(
            config.get("gcloud-projects", {}).get("staging", {}).get("name")
        )

    return yaml
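
The partition step in `build_yaml_object` flags every column whose name appears in `partition_columns`. A minimal sketch of that step on plain dictionaries, assuming a hypothetical helper `mark_partitions` and made-up column names:

```python
def mark_partitions(columns: list, partition_columns: list) -> list:
    """Set `is_partition` on each column named in `partition_columns`."""
    wanted = set(partition_columns)
    for column in columns:
        if column["name"] in wanted:
            column["is_partition"] = True
    return columns


columns = [{"name": "ano"}, {"name": "sigla_uf"}, {"name": "valor"}]
marked = mark_partitions(columns, ["ano", "sigla_uf"])
```

Columns that are not partitions are left unchanged, so they carry no `is_partition` key in this sketch.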

handle_complex_fields(yaml_obj, k, properties, definitions, data)

Parse complex fields and send each part of them to handle_data.

Parameters:

Name Type Description Default
yaml_obj ruamel.yaml.CommentedMap

A YAML object with complex fields.

required
k str

The name of the key of the complex field.

required
properties dict

A dictionary that contains the description of the complex field.

required
definitions dict

A dictionary with the schemas of each component of the complex field.

required
data dict

A dictionary with the metadata of the complex field.

required

Returns:

Type Description
CommentedMap

A YAML object augmented with the complex field.

Source code in basedosdados/upload/metadata.py
def handle_complex_fields(yaml_obj, k, properties, definitions, data):
    """Parse complex fields and send each part of them to `handle_data`.

    Args:
        yaml_obj (ruamel.yaml.CommentedMap): A YAML object with complex
            fields.
        k (str): The name of the key of the complex field.
        properties (dict): A dictionary that contains the description of the
            complex field.
        definitions (dict): A dictionary with the schemas of each component
            of the complex field.
        data (dict): A dictionary with the metadata of the complex field.

    Returns:
        CommentedMap: A YAML object augmented with the complex field.
    """

    yaml_obj[k] = ryaml.CommentedMap()

    # Parsing 'allOf': [{'$ref': '#/definitions/PublishedBy'}]
    # To get PublishedBy
    d = properties[k]["allOf"][0]["$ref"].split("/")[-1]
    if "properties" in definitions[d].keys():
        for dk, _ in definitions[d]["properties"].items():

            yaml_obj[k][dk] = handle_data(
                k=dk,
                data=data.get(k, {}),
            )

    return yaml_obj
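
The $ref lookup is the least obvious step in this function. The sketch below walks through it with plain dictionaries standing in for ruamel.yaml.CommentedMap and a direct dict.get standing in for handle_data. The schema fragment, the metadata values, and the helper name resolve_ref_name are all invented for illustration.

```python
def resolve_ref_name(properties, k):
    # '#/definitions/PublishedBy' -> 'PublishedBy'
    return properties[k]["allOf"][0]["$ref"].split("/")[-1]


# Hypothetical JSON-schema fragment for one complex field.
properties = {"published_by": {"allOf": [{"$ref": "#/definitions/PublishedBy"}]}}
definitions = {"PublishedBy": {"properties": {"name": {}, "email": {}}}}

# Hypothetical metadata for that field.
data = {"published_by": {"name": "Jane", "email": "jane@example.org"}}

k = "published_by"
definition_name = resolve_ref_name(properties, k)

# One entry per sub-key declared in the definition; keys missing from the
# metadata come back as None instead of raising.
field = {
    dk: data.get(k, {}).get(dk)
    for dk in definitions[definition_name].get("properties", {})
}
```

The sub-keys come from the schema definition, not from the metadata, so the resulting mapping always has the same shape even when the metadata is incomplete.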

handle_data(k, data, local_default=None)

Parse the API's response data for use in the YAML configuration files.

Parameters:

Name Type Description Default
k str

a key of the CKAN API's response metadata dictionary.

required
data dict

a dictionary of metadata generated from the API.

required
local_default Any

the default value of the given key in case its value is set to None in CKAN.

None

Returns:

Type Description
list

a list of metadata values

Source code in basedosdados/upload/metadata.py
def handle_data(k, data, local_default=None):
    """Parse the API's response data for use in the YAML configuration
    files.

    Args:
        k (str): a key of the CKAN API's response metadata dictionary.
        data (dict): a dictionary of metadata generated from the API.
        local_default (Any): the default value of the given key in case
            its value is set to `None` in CKAN.

    Returns:
        list: a list of metadata values
    """

    # If data is None, fall back to an empty dict
    data = data if data is not None else {}
    # If no data is found for that key, use the local default
    selected = data.get(k, local_default)

    # In some cases like `tags`, `groups`, `organization`
    # the API default is to return a dict or list[dict] with all info.
    # But, we just use `name` to build the yaml
    _selected = deepcopy(selected)

    if _selected == []:
        return _selected

    if not isinstance(_selected, list):
        _selected = [_selected]

    if isinstance(_selected[0], dict):
        if _selected[0].get("id") is not None:
            return [s.get("name") for s in _selected]

    return selected
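
The branches of handle_data are easiest to follow with a few sample payloads. The sketch below repeats the function body from the listing above so that it runs on its own. The sample dictionary is made up and only mimics the shape of a CKAN response.

```python
from copy import deepcopy


def handle_data(k, data, local_default=None):
    # Same logic as the listing above, repeated so the sketch is self-contained.
    data = data if data is not None else {}
    selected = data.get(k, local_default)
    _selected = deepcopy(selected)
    if _selected == []:
        return _selected
    if not isinstance(_selected, list):
        _selected = [_selected]
    if isinstance(_selected[0], dict):
        if _selected[0].get("id") is not None:
            return [s.get("name") for s in _selected]
    return selected


# Hypothetical payload shaped like a CKAN response (values are invented).
sample = {
    "tags": [{"id": "t1", "name": "health"}, {"id": "t2", "name": "education"}],
    "organization": {"id": "o1", "name": "ibge"},
    "title": "Municipalities directory",
    "groups": [],
    "notes": None,
}

tags = handle_data("tags", sample)                  # list of dicts -> names
organization = handle_data("organization", sample)  # single dict -> one name
title = handle_data("title", sample)                # scalar -> unchanged
groups = handle_data("groups", sample)              # empty list -> empty list
missing = handle_data("license", sample, local_default="unknown")
notes = handle_data("notes", sample, local_default="unknown")
```

Two behaviours follow from the code as written. Scalar values come back unchanged rather than wrapped in a list, and a key that is present with the value None yields None, because dict.get only falls back to local_default when the key is absent.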