
Managing pipeline configurations

It is very good practice to make your pipelines parameterizable. That means replacing all hardcoded values with variables that you pass to the pipeline and, from there, to the components.

Pipeline parameters can be:

  • Model training parameters
  • Start date and end date of the data you want to work with
  • Product categories
  • Geography (country, region, stores, ...)
  • Customer segments
  • ...

Parametrized pipelines let you run the same pipeline with different parameter sets. This is much easier to deploy and maintain than several copies of a pipeline with slightly different hardcoded values.

Passing config values to the pipeline

Basic pipeline parametrization

In most cases, just passing values to your pipeline as parameters is the simplest and best way to go.

````python3
import os

import kfp
from kfp import compiler
import google.cloud.aiplatform as aip
from kfp.dsl import component


@component(base_image=f'europe-west1-docker.pkg.dev/{os.getenv("PROJECT_ID")}/vertex-pipelines-docker/vertex-pipelines-base:latest')
def dummy_task(project_id: str, country: str, start_date: str, end_date: str):
    pass


# This part defines the pipeline and its parameters
@kfp.dsl.pipeline(name="parametrized-pipeline")
def pipeline(project_id: str, country: str, start_date: str, end_date: str):
    dummy_task(
        project_id=project_id,
        country=country,
        start_date=start_date,
        end_date=end_date
    )


# This part compiles the pipeline and runs it
if __name__ == '__main__':
    PROJECT_ID = os.getenv("PROJECT_ID")
    PIPELINE_NAME = "parametrized-pipeline"
    BUCKET_NAME = f"gs://vertex-{PROJECT_ID}"
    SERVICE_ACCOUNT = f"vertex@{PROJECT_ID}.iam.gserviceaccount.com"

    compiler.Compiler().compile(pipeline_func=pipeline, package_path="./pipeline.json")
    aip.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)

    job = aip.PipelineJob(
        display_name=PIPELINE_NAME,
        template_path="pipeline.json",
        pipeline_root=f"{BUCKET_NAME}/root",
        location="europe-west1",
        enable_caching=False,

        # Parameters values are passed from here to the pipeline
        parameter_values={
            "project_id": PROJECT_ID,
            "country": "france",
            "start_date": "2022-01-01",
            "end_date": "2022-12-31"
        },
    )

    job.run(service_account=SERVICE_ACCOUNT)
````

The parameters will be clearly displayed in the UI:

Making all your parameters explicit as pipeline and component arguments adds some overhead in terms of code, but for us it is the best solution: it makes every parameter visible in the UI and lets anyone reading the code see very easily how each parameter is defined.
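For example, once `pipeline.json` has been compiled as above, the same template can be submitted with several parameter sets. Here is a minimal sketch (the list of countries and the use of `submit()` instead of `run()` are illustrative choices, not part of the original example):

````python3
import os

import google.cloud.aiplatform as aip

PROJECT_ID = os.getenv("PROJECT_ID")
BUCKET_NAME = f"gs://vertex-{PROJECT_ID}"
SERVICE_ACCOUNT = f"vertex@{PROJECT_ID}.iam.gserviceaccount.com"

aip.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)

# Submit the same compiled template once per parameter set.
# The list of countries below is purely illustrative.
for country in ["france", "germany", "spain"]:
    job = aip.PipelineJob(
        display_name=f"parametrized-pipeline-{country}",
        template_path="pipeline.json",
        pipeline_root=f"{BUCKET_NAME}/root",
        location="europe-west1",
        enable_caching=False,
        parameter_values={
            "project_id": PROJECT_ID,
            "country": country,
            "start_date": "2022-01-01",
            "end_date": "2022-12-31",
        },
    )
    # submit() returns immediately, so the runs execute in parallel;
    # use run() instead if you want to wait for each run to finish.
    job.submit(service_account=SERVICE_ACCOUNT)
````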

Alternative options that are possible (but that we generally do not recommend!):

  • Importing parameters directly in lib or component files => this bypasses the Vertex UI and can make debugging much harder.
  • Having a config dictionary and passing it as an argument to the pipeline / components. This solution is less bad and can actually be useful for very large or well-defined configs (e.g. model parameters passed directly to CatBoost), but the config will only be visible in the UI in a hard-to-read way. A sketch of this approach is shown just after this list.
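If you do use a config dictionary, a minimal sketch of passing it as a single parameter could look like this (the component name, base image and parameter contents are illustrative, not from the original example):

````python3
import kfp
from kfp.dsl import component


@component(base_image="python:3.10")  # illustrative base image
def train_model(model_params: dict):
    # The whole config arrives as one dict; its individual values are not
    # shown as separate parameters in the Vertex UI.
    print(model_params)


@kfp.dsl.pipeline(name="dict-config-pipeline")
def pipeline(model_params: dict):
    train_model(model_params=model_params)

# When creating the PipelineJob, the dict is passed as a single parameter value, e.g.:
# parameter_values={"model_params": {"depth": 8, "learning_rate": 0.05}}
````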

Dynamically loading pipeline parameters

Dynamically loading a config in the pipeline

Pipeline parameter values are only rendered when they are passed to components; inside the pipeline body itself they are placeholders. That means you cannot simply pass a configuration name and load the corresponding config in the pipeline body.

````python3
@kfp.dsl.pipeline(name="parametrized-pipeline")
def pipeline(config_name: str):
    print(config_name)  # Result: {{pipelineparam:op=;name=config_name}} -> not rendered
````

You would need a dedicated component to load your configuration and output the values to downstream tasks. That adds a lot of complexity for very little benefit and is probably not worth it.
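For reference, a minimal sketch of what such a component could look like, using a NamedTuple return type to expose each config value as a separate output (the base image, component names and hardcoded config contents are illustrative):

````python3
from typing import NamedTuple

import kfp
from kfp.dsl import component


@component(base_image="python:3.10")  # illustrative base image
def load_config_op(config_name: str) -> NamedTuple("Config", [("country", str), ("start_date", str)]):
    """Load a config and expose each value as a separate output."""
    # Illustrative hardcoded configs; in practice they would come from a file or a database.
    configs = {"config_1": {"country": "france", "start_date": "2022-01-01"}}
    config = configs[config_name]
    outputs = NamedTuple("Config", [("country", str), ("start_date", str)])
    return outputs(country=config["country"], start_date=config["start_date"])


@component(base_image="python:3.10")
def dummy_task(country: str, start_date: str):
    print(country, start_date)


@kfp.dsl.pipeline(name="config-loading-pipeline")
def pipeline(config_name: str):
    config_task = load_config_op(config_name=config_name)
    # Each config value must be wired explicitly into every downstream task.
    dummy_task(
        country=config_task.outputs["country"],
        start_date=config_task.outputs["start_date"],
    )
````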

Instead, load the values on the client side, when creating the `PipelineJob`:

````python3
job = aip.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path="pipeline.json",
    pipeline_root=f"{BUCKET_NAME}/root",
    location="europe-west1",
    enable_caching=False,

    parameter_values=load_config("config_1"),
)
````

??? info "load_config function"

    This function loads configuration values from a file as a `dict`.

    ````python3
    import json
    from pathlib import Path
    from typing import Dict


    def load_config(config_name: str) -> Dict:
        with open(Path(__file__).parent.parent / "configs" / f"{config_name}.json") as f:
            config = json.load(f)
        return config
    ````

??? info "config_1.json"

    This file contains the configuration we want to load.

    ````json
    {
      "project_id": "ocmlf-vial-16",
      "country": "france",
      "start_date": "2022-01-01",
      "end_date": "2022-12-31"
    }
    ````

Storing configs

As your project grows in scale, you will want a way to cleanly store your pipeline configurations. Here are a few suggestions.

Locally, in a configuration file format (JSON, YAML, TOML, etc.)

Storing configurations this way is simple and works well. Write one configuration per file; you can then easily load the right one as a Python dict and pass it to your pipeline.
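For instance, a YAML equivalent of the `load_config` helper shown earlier might look like the sketch below (the `configs/` directory layout and the use of the PyYAML package are assumptions):

````python3
from pathlib import Path
from typing import Any, Dict

import yaml  # requires the PyYAML package


def load_yaml_config(config_name: str) -> Dict[str, Any]:
    """Load one pipeline configuration stored as configs/<config_name>.yaml."""
    config_path = Path(__file__).parent.parent / "configs" / f"{config_name}.yaml"
    with open(config_path) as f:
        return yaml.safe_load(f)
````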

Locally, in Python files

You can also centralize your parameters in Python files and import them into your pipeline. The issue with that approach is that you end up storing all your configs in the same file, which can get messy.
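A minimal sketch of that approach (the `configs.py` module name and its contents are illustrative):

````python3
# configs.py -- all configurations live in a single module
CONFIG_FRANCE_2022 = {
    "project_id": "ocmlf-vial-16",
    "country": "france",
    "start_date": "2022-01-01",
    "end_date": "2022-12-31",
}

# In the pipeline file, the config is then imported directly:
# from configs import CONFIG_FRANCE_2022
# job = aip.PipelineJob(..., parameter_values=CONFIG_FRANCE_2022)
````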

If you want to split the configs, you will either need to:

  • Manually import them all in your pipeline file. This creates a tight coupling between your config files and your pipelines, which is undesirable.
  • Dynamically import them (see the sketch just below). This is complex, and you do not get autocompletion anymore.
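As a sketch of the dynamic-import option, assuming a `configs` package where each module exposes a `CONFIG` dict (both names are illustrative):

````python3
import importlib
from typing import Any, Dict


def load_python_config(config_name: str) -> Dict[str, Any]:
    """Dynamically import configs/<config_name>.py and return its CONFIG dict."""
    # No autocompletion and no static checking: a typo in config_name or in the
    # CONFIG attribute only fails at runtime.
    module = importlib.import_module(f"configs.{config_name}")
    return module.CONFIG
````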

Basically, this is only a good option when you know you will never have many different configs, and even then the benefits are fairly small.

Remotely

If you need your configs to be centrally available on GCP, you may want to store them remotely in a database.

This is useful when these configurations need to be available elsewhere on GCP and not just in your pipeline.

It is also practical if your configurations are not defined in the same codebase as your pipelines, for example if you let users drive pipelines from a Google Sheet, a Streamlit app, etc.

In this case, Firestore is a very good option. It requires very little setup and is very easy to use, since it stores data as documents (essentially dicts). You can also browse your stored configurations and their contents in the Firestore UI.

??? example "Firestore usage examples"

    ````python3
    from typing import Dict, Any, Union

    from google.cloud import firestore


    def set(collection: str, document_name: str, document_as_dict: Dict[str, Any]) -> None:
        client = firestore.Client()
        client.collection(collection).document(document_name).set(document_as_dict)


    def get(collection: str, document: str) -> Union[Dict[str, Any], None]:
        client = firestore.Client()
        doc_ref = client.collection(collection).document(document)
        doc = doc_ref.get().to_dict()  # type: Union[Dict[str, Any], None]
        return doc


    if __name__ == '__main__':
        # Add a config in Firestore
        set(
            collection="Pipeline1",
            document_name="config2",
            document_as_dict={
                "project_id": "ocmlf-vial-16",
                "country": "france",
                "start_date": "2022-01-01",
                "end_date": "2022-12-31"
            }
        )

        # Fetch a config from Firestore
        conf = get("Pipeline1", "config2")
        print(conf)  # {'end_date': '2022-12-31', 'country': 'france', 'project_id': 'ocmlf-vial-16', 'start_date': '2022-01-01'}
    ````

![](assets/firestore_config.png)

You can also store configurations as files on a bucket, but that is slightly less practical.
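If you do go the bucket route, a minimal sketch using the `google-cloud-storage` client could look like this (the bucket and object names are illustrative):

````python3
import json
from typing import Any, Dict

from google.cloud import storage


def load_config_from_gcs(bucket_name: str, blob_name: str) -> Dict[str, Any]:
    """Download a JSON config stored in a GCS bucket and return it as a dict."""
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_name)
    return json.loads(blob.download_as_text())


if __name__ == "__main__":
    # Illustrative bucket and object names
    conf = load_config_from_gcs("vertex-my-project", "configs/config_1.json")
    print(conf)
````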