5.2.2. Build pipeline

What is Pipeline?

Before building this part, let’s briefly discuss the concept of a pipeline. A pipeline is a sequence of functions connected within a certain context. These functions are called Steps. When an input enters the pipeline, it is passed from one step to another to produce the final result. The output of one step can be the input for the next step (or not).

We will define our own Pipeline, which will help us perform tasks that can be repeated and reused in different parts of the code.

Pre-setup

First, we need to create a pipeline directory inside context/. In the pipeline directory, we will add 3 files: Step.py, main.py, __init__.py, and type.py.

In type.py, add the following code:

from typing import Generic, Optional, TypeVar, Awaitable, Union, Callable

TContext = TypeVar("TContext")
TResult = TypeVar("TResult")
TStepExecutor = Callable[[TContext], Union[TResult, Awaitable[TResult]]]

5.2.2.1

Step

Here, we need to create a class to manage steps.

First, import some necessary modules.

from typing import Generic, Optional, Awaitable, Union

from .type import TContext, TResult, TStepExecutor

Add the definition of Step.

class Step(Generic[TContext, TResult]):
    """
    Lớp định nghĩa bước xử lý trong một pipeline
    """

    def __init__(self, executor: Optional[TStepExecutor[TContext, TResult]] = None):
        self._executor: Optional[TStepExecutor[TContext, TResult]] = None
        if executor:
            self._executor = executor

    def set_executor(self, executor: TStepExecutor[TContext, TResult]) -> None:
        """
        Gán executor cho Step

        Args:
            executor: là một hàm
        """
        self._executor = executor

    def execute(self, ctx: TContext) -> Union[TResult, Awaitable[TResult]]:
        """
        Thực thi executor của một Step

        Args:
            ctx: ngữ cảnh thực thi, có thể là runtime (RuntimeContext)
                 hoặc trong core (InternalContext)
        """
        if not self._executor:
            raise ValueError("Executor of Step is not set")

        return self._executor(ctx)

5.2.2.2

Pipeline

Finally, we come to the pipeline definition. First, add the following code to main.py.

from typing import Generic, List, Dict, Any
from dataclasses import dataclass

from .Step import Step
from .type import TContext, TStepExecutor

Initialize the Pipeline definition with some basic properties and functions.

@dataclass
class PipelineRunState:
    """State của pipeline khi đang chạy"""

    current_step: int = 0
    can_stop_now: bool = False


class Pipeline(Generic[TContext]):
    """
    Lớp định nghĩa một pipeline.

    Khi các steps được chạy trong một pipeline, thì step sau có
    thể sẽ có được kết quả từ step trước đó. Trong trường hợp này
    thì kết quả đó sẽ được hiểu là params cho step tiếp theo.
    Chính vì thế, khi sử dụng pipeline trong core thì mình phải lưu ý.
    """

    def __init__(self, name: str):
        self.name = name
        self._steps: List[Step[TContext, Any]] = []
        self._run_states: Dict[Any, PipelineRunState] = {}

    def get_steps(self) -> List[Step[TContext, Any]]:
        """
        Trả về các steps trong một pipeline.

        Returns:
            List các steps
        """
        return self._steps

    def add_step(self, executor: TStepExecutor[TContext, Any]) -> "Pipeline[TContext]":
        """
        Thêm một step mới vào trong pipeline.

        Args:
            executor: executor của step.

        Returns:
            Self để chain methods
        """
        new_step = Step[TContext, Any](executor)
        self._steps.append(new_step)
        return self

    def stop(self, ctx: TContext) -> None:
        """
        Cho phép dừng pipeline sau sau một step, mà step này gọi stop().

        Args:
            ctx: Context hiện tại
        """
        curr_state = self._run_states.get(id(ctx))
        if not curr_state:
            return
        curr_state.can_stop_now = True

    async def run(self, ctx: TContext) -> Any:
        """
        Chạy pipeline theo context được truyền vào.

        Args:
            ctx: Context để chạy pipeline

        Returns:
            Kết quả của step cuối cùng
        """
        current_result: Any = None
        ctx_id = id(ctx)
        self._run_states[ctx_id] = PipelineRunState(current_step=0, can_stop_now=False)

        for step in self._steps:
            maybe_promise = step.execute(ctx)

            # Xử lý coroutine
            if hasattr(maybe_promise, "__await__"):
                current_result = await maybe_promise
            else:
                current_result = maybe_promise

            # Set prev result vào context
            if hasattr(ctx, "prev_result"):
                ctx.prev_result = current_result
            else:
                # Fallback nếu context không có prev_result attribute
                setattr(ctx, "prev_result", current_result)

            # Process post step execution
            if self._run_states[ctx_id].can_stop_now:
                break

            # Update state
            self._run_states[ctx_id].current_step += 1

        # Clear state
        del self._run_states[ctx_id]

        return current_result

5.2.2.3

5.2.2.4

If vscode shows an error at ctx.prev_result, you don’t need to worry about it.

In __init__.py, add the following code.

from .main import Pipeline, PipelineRunState
from .Step import Step

__all__ = ["Pipeline", "PipelineRunState", "Step"]

5.2.2.5