
How to Build an End-to-End Machine Learning Pipeline for Production with ZenML, Including Custom Objects, Metadata Tracking, and Hyperparameter Optimization

In this tutorial, we walk through the end-to-end implementation of a machine learning pipeline built with ZenML. We start by setting up the environment and initializing the ZenML project, then define a custom materializer that enables seamless serialization of a domain-specific dataset object along with automatic metadata extraction. As we progress, we build a modular pipeline that performs data loading, preprocessing, and a fan-out hyperparameter search across multiple models. We evaluate each candidate, log rich metadata at every step, and use a fan-in step to select and promote the best-performing model. Throughout this process, we use ZenML's Model Control Plane, artifact tracking, and caching to ensure full reproducibility, transparency, and efficiency.

import os, sys, subprocess, json, shutil
from pathlib import Path


def _sh(cmd, check=True):
   print(f"$ {' '.join(cmd)}")
   return subprocess.run(cmd, check=check)


_sh([sys.executable, "-m", "pip", "install", "-q",
    "zenml[server]", "scikit-learn", "pandas", "pyarrow"])


PROJECT = (Path("/content/zenml_advanced_tutorial") if Path("/content").exists()
           else Path.cwd() / "zenml_advanced_tutorial")
if PROJECT.exists():
   shutil.rmtree(PROJECT)
PROJECT.mkdir(parents=True)
os.chdir(PROJECT)


os.environ["ZENML_ANALYTICS_OPT_IN"] = "false"
os.environ["ZENML_LOGGING_VERBOSITY"] = "WARN"
_sh(["zenml", "init"], check=False)

We set up the environment by installing the necessary libraries and creating the ZenML project workspace. We prepare a clean project directory and set environment variables to control logging verbosity and opt out of analytics. Finally, we bootstrap the ZenML repository so that all subsequent pipeline operations are tracked and managed efficiently.
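Before moving on, it is worth confirming that initialization succeeded. The quick check below is our own addition, reusing the `_sh` helper from the setup block; `zenml init` creates a `.zen` directory in the project root, and `zenml status` prints the active configuration.

# Optional sanity check: confirm the repository exists and show ZenML's
# active configuration (`.zen` is the directory `zenml init` creates).
print("Repository initialized:", (PROJECT / ".zen").exists())
_sh(["zenml", "status"], check=False)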

from typing import Annotated, Tuple, Dict, List, Any
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from sklearn.preprocessing import StandardScaler


from zenml import pipeline, step, log_metadata, Model, get_step_context
from zenml.client import Client
from zenml.materializers.base_materializer import BaseMaterializer
from zenml.enums import ArtifactType
from zenml.io import fileio


class DatasetBundle:
   def __init__(self, X, y, feature_names, stats=None):
       self.X = np.asarray(X)
       self.y = np.asarray(y)
       self.feature_names = list(feature_names)
       self.stats = stats or {}


class DatasetBundleMaterializer(BaseMaterializer):
   ASSOCIATED_TYPES = (DatasetBundle,)
   ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA


   def load(self, data_type):
       with fileio.open(os.path.join(self.uri, "X.npy"), "rb") as f:
           X = np.load(f)
       with fileio.open(os.path.join(self.uri, "y.npy"), "rb") as f:
           y = np.load(f)
       with fileio.open(os.path.join(self.uri, "meta.json"), "r") as f:
           meta = json.loads(f.read())
       return DatasetBundle(X, y, meta["feature_names"], meta["stats"])


   def save(self, bundle):
       with fileio.open(os.path.join(self.uri, "X.npy"), "wb") as f:
           np.save(f, bundle.X)
       with fileio.open(os.path.join(self.uri, "y.npy"), "wb") as f:
           np.save(f, bundle.y)
       with fileio.open(os.path.join(self.uri, "meta.json"), "w") as f:
           f.write(json.dumps({
               "feature_names": bundle.feature_names,
               "stats": bundle.stats,
           }))


   def extract_metadata(self, bundle):
       classes, counts = np.unique(bundle.y, return_counts=True)
       return {
           "n_samples": int(bundle.X.shape[0]),
           "n_features": int(bundle.X.shape[1]),
           "class_distribution": {str(c): int(n) for c, n in zip(classes, counts)},
       }

We import all required libraries and define a custom data container along with its materializer. The materializer implements the logic to save, load, and extract metadata from our dataset bundle, enabling seamless artifact management in ZenML. This ensures that our data is not only persisted reliably but also enriched with meaningful, queryable metadata.
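To build confidence in the materializer before wiring it into a pipeline, we can exercise its save/load/extract logic directly against a temporary directory. This is a minimal sketch of our own, assuming the `BaseMaterializer` constructor accepts a `uri` argument as in recent ZenML releases; adjust for your installed version if needed.

import tempfile

with tempfile.TemporaryDirectory() as tmp:
   mat = DatasetBundleMaterializer(uri=tmp)  # `uri` kwarg assumed per recent ZenML
   original = DatasetBundle(
       X=np.random.rand(10, 3),
       y=np.array([0, 1] * 5),
       feature_names=["a", "b", "c"],
       stats={"source": "synthetic"},
   )
   mat.save(original)                     # writes X.npy, y.npy, meta.json
   restored = mat.load(DatasetBundle)     # reads them back into a bundle
   assert np.allclose(original.X, restored.X)
   print(mat.extract_metadata(restored))  # {'n_samples': 10, 'n_features': 3, ...}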

@step(enable_cache=True)
def load_data() -> Annotated[DatasetBundle, "raw_dataset"]:
   data = load_breast_cancer()
   return DatasetBundle(
       data.data, data.target, data.feature_names,
       stats={"source": "sklearn.datasets.load_breast_cancer"},
   )


@step
def split_and_scale(
   bundle: DatasetBundle,
   test_size: float = 0.2,
   random_state: int = 42,
) -> Tuple[
   Annotated[np.ndarray, "X_train"],
   Annotated[np.ndarray, "X_test"],
   Annotated[np.ndarray, "y_train"],
   Annotated[np.ndarray, "y_test"],
]:
   X_tr, X_te, y_tr, y_te = train_test_split(
       bundle.X, bundle.y, test_size=test_size,
       random_state=random_state, stratify=bundle.y,
   )
   scaler = StandardScaler().fit(X_tr)
   X_tr, X_te = scaler.transform(X_tr), scaler.transform(X_te)
   log_metadata(metadata={"train_size": len(X_tr), "test_size": len(X_te)})
   return X_tr, X_te, y_tr, y_te


@step
def train_candidate(
   X_train: np.ndarray,
   y_train: np.ndarray,
   model_type: str = "random_forest",
   n_estimators: int = 100,
   max_depth: int = 5,
) -> Annotated[Any, "candidate_model"]:
   if model_type == "random_forest":
       m = RandomForestClassifier(n_estimators=n_estimators,
                                  max_depth=max_depth, random_state=42)
   elif model_type == "gradient_boosting":
       m = GradientBoostingClassifier(n_estimators=n_estimators,
                                      max_depth=max_depth, random_state=42)
   else:
       m = LogisticRegression(max_iter=2000, random_state=42)
   m.fit(X_train, y_train)
   log_metadata(metadata={
       "model_type": model_type,
       "hyperparameters": {"n_estimators": n_estimators, "max_depth": max_depth},
   })
   return m

We define the core pipeline steps for data loading, train/test splitting, feature scaling, and candidate model training. We enable caching on the data-loading step for efficiency while logging key metadata during preprocessing and training. This forms the heart of our pipeline, where each candidate model is trained independently with its own configuration.
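As a side note, these configurations do not have to be baked into the step definitions. The sketch below is our own addition and uses `with_options`, which ZenML exposes on decorated steps to override settings at composition time; treat the exact keyword names as an assumption for your installed version.

# Hypothetical variants of the steps above, configured without editing them.
# `with_options` returns a configured copy of the step.
uncached_loader = load_data.with_options(enable_cache=False)
deep_forest = train_candidate.with_options(
   parameters={"model_type": "random_forest",
               "n_estimators": 400, "max_depth": 10},
)
# Inside a @pipeline, these copies are invoked exactly like the originals,
# e.g. `bundle = uncached_loader()`.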

@step
def evaluate_candidate(
   model: Any,
   X_test: np.ndarray,
   y_test: np.ndarray,
   label: str,
) -> Annotated[Dict[str, Any], "metrics"]:
   preds = model.predict(X_test)
   probs = (model.predict_proba(X_test)[:, 1]
            if hasattr(model, "predict_proba") else preds)
   metrics: Dict[str, Any] = {
       "accuracy": float(accuracy_score(y_test, preds)),
       "f1":       float(f1_score(y_test, preds)),
       "roc_auc":  float(roc_auc_score(y_test, probs)),
       "label":    label,
   }
   log_metadata(metadata=metrics)
   return metrics


@step
def select_best(
   metrics_list: List[Dict[str, Any]],
   models: List[Any],
) -> Annotated[Any, "production_model"]:
   best_idx = max(range(len(metrics_list)),
                  key=lambda i: metrics_list[i]["roc_auc"])
   best = metrics_list[best_idx]


   ctx = get_step_context()
   try:
       ctx.model.log_metadata({"chosen_candidate": best,
                               "candidate_index": best_idx})
   except Exception as e:
       print(f"  (model metadata log skipped: {e})")


   log_metadata(metadata={
       "winning_metrics": {k: v for k, v in best.items() if k != "label"},
   })
   print(f"n  Best candidate: {best['label']}  →  "
         f"ROC AUC = {best['roc_auc']:.4f}n")
   return models[best_idx]

We evaluate each trained model on several performance metrics and log the results as step metadata. We then use a selection step that identifies the best-performing candidate based on ROC AUC. Additionally, we attach the winning metrics to the model version, enabling traceability and informed decision-making.
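If a single metric feels too blunt for your use case, the ranking rule is easy to swap. The helper below is an illustrative assumption, not part of the original pipeline: it blends ROC AUC and F1 with arbitrary example weights.

def composite_score(m: Dict[str, Any]) -> float:
   # 70% ROC AUC, 30% F1 -- the weights here are illustrative only.
   return 0.7 * m["roc_auc"] + 0.3 * m["f1"]

# Inside select_best, the argmax line would then become:
# best_idx = max(range(len(metrics_list)),
#                key=lambda i: composite_score(metrics_list[i]))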

SEARCH_SPACE = [
   {"model_type": "random_forest",     "n_estimators": 50,  "max_depth": 3},
   {"model_type": "random_forest",     "n_estimators": 200, "max_depth": 7},
   {"model_type": "gradient_boosting", "n_estimators": 100, "max_depth": 3},
   {"model_type": "logistic",          "n_estimators": 1,   "max_depth": 1},
]


PRODUCTION_MODEL = Model(
   name="breast_cancer_classifier",
   description="Best model from in-pipeline hyperparameter search",
   tags=["tutorial", "advanced"],
)


@pipeline(model=PRODUCTION_MODEL, enable_cache=True)
def training_pipeline(test_size: float = 0.2):
   bundle = load_data()
   X_train, X_test, y_train, y_test = split_and_scale(bundle, test_size=test_size)


   models, metrics = [], []
   for i, cfg in enumerate(SEARCH_SPACE):
       m = train_candidate(
           X_train, y_train, **cfg,
           id=f"train_{i}_{cfg['model_type']}",
       )
       s = evaluate_candidate(
           m, X_test, y_test,
           label=f"{cfg['model_type']}(n={cfg['n_estimators']},d={cfg['max_depth']})",
           id=f"eval_{i}",
       )
       models.append(m)
       metrics.append(s)


   select_best(metrics, models)


print("n" + "=" * 70 + "n  RUNNING TRAINING PIPELINEn" + "=" * 70)
run_obj = training_pipeline()


print("n" + "=" * 70 + "n  INSPECTING THE RUNn" + "=" * 70)
client = Client()
run = client.get_pipeline_run(run_obj.id)


print(f"nPipeline:   {run.pipeline.name}")
print(f"Run name:   {run.name}")
print(f"Status:     {run.status}")
print(f"Step runs:  {len(run.steps)}")
for name, step_run in run.steps.items():
   print(f"  • {name:35s} status={step_run.status}")


print("nRun-level metadata (aggregated from steps):")
for k, v in (run.run_metadata or {}).items():
   short = str(v)
   print(f"  {k}: {short[:80]}{'…' if len(short) > 80 else ''}")


print("n" + "-" * 70 + "n  MODEL CONTROL PLANEn" + "-" * 70)
try:
   mv = client.get_model_version(PRODUCTION_MODEL.name, "latest")
except Exception:
   mv = client.list_model_versions(model_name_or_id=PRODUCTION_MODEL.name)[0]


print(f"Model:           {mv.model.name}")
print(f"Version:         {mv.name} (number={mv.number})")
linked = list(mv.data_artifact_ids.keys()) if hasattr(mv, "data_artifact_ids") else []
print(f"Linked outputs:  {linked or '(see dashboard)'}")
if mv.run_metadata:
   print("Version metadata:")
   for k, v in dict(mv.run_metadata).items():
       print(f"  {k}: {str(v)[:80]}")


print("n" + "-" * 70 + "n  RELOADING ARTIFACTS DIRECTLYn" + "-" * 70)
prod_artifact = client.get_artifact_version("production_model")
prod_model = prod_artifact.load()
print(f"Loaded model class:   {type(prod_model).__name__}")
print(f"Artifact metadata:    {dict(prod_artifact.run_metadata) if prod_artifact.run_metadata else '{}'}"[:120])


X_test_arr = client.get_artifact_version("X_test").load()
y_test_arr = client.get_artifact_version("y_test").load()
acc = accuracy_score(y_test_arr, prod_model.predict(X_test_arr))
print(f"Sanity-check accuracy on stored X_test: {acc:.4f}")


ds_artifact = client.get_artifact_version("raw_dataset")
print(f"nraw_dataset auto-extracted metadata:")
for k, v in (ds_artifact.run_metadata or {}).items():
   print(f"  {k}: {v}")


print("n" + "=" * 70 + "n  RE-RUNNING — STEPS SHOULD BE CACHEDn" + "=" * 70)
training_pipeline()


print("""
✅ Tutorial complete.


What just happened:
 • Custom materializer serialized a domain object + auto-extracted metadata.
 • Fan-out: 4 candidates trained + evaluated as 8 distinct step runs.
 • Fan-in: select_best joined them and promoted the winner.
 • Model Control Plane created a versioned 'breast_cancer_classifier'.
 • Every artifact, metric, and hyperparameter was logged and queryable.
 • Second run hit the cache — zero recomputation.


Explore further from this same Python session:
 Client().list_pipeline_runs()
 Client().list_model_versions(model_name_or_id="breast_cancer_classifier")
 Client().list_artifact_versions(name="metrics")
""")

We define the full pipeline, execute it, and inspect the results using the ZenML Client API. We fan out over multiple hyperparameter settings, followed by a fan-in step that selects the best model. Finally, we demonstrate artifact reuse, metadata inspection, and caching behavior by re-running the pipeline without any redundant computation.
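As one more example of what the Client API makes possible, the sketch below is our own addition: it reloads the logged "metrics" artifacts and prints a small leaderboard. The method and field names mirror those used above, though pagination details may vary by ZenML version.

# Compare candidates by reloading their "metrics" artifacts from the store.
for av in Client().list_artifact_versions(name="metrics", size=8):
   m = av.load()
   print(f"{m['label']:45s} roc_auc={m['roc_auc']:.4f}")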

In conclusion, we’ve built a robust, production-style ML pipeline that showcases the full power of ZenML’s orchestration capabilities. We saw how a custom materializer enriches artifacts with meaningful metadata, how multiple candidate models can be trained and evaluated in a single run, and how the best one is automatically selected and promoted. We also explored how to inspect pipeline runs, reload artifacts directly without recomputation, and validate model performance on the stored test data. Finally, we watched caching kick in on the second run, ensuring that unnecessary computation is avoided. This workflow provides a solid foundation for building machine learning systems that are scalable, maintainable, and reproducible in real-world settings.

