Before building this part, let’s briefly discuss the concept of a pipeline. A pipeline is a sequence of functions connected within a certain context. These functions are called Steps. When an input enters the pipeline, it is passed from one step to another to produce the final result. The output of one step can be the input for the next step (or not).
We will define our own Pipeline, which will help us perform tasks that can be repeated and reused in different parts of the code.
First, we need to create a pipeline directory inside context/. In the pipeline directory, we will add 3 files: Step.py, main.py, __init__.py, and type.py.
In type.py, add the following code:
from typing import Generic, Optional, TypeVar, Awaitable, Union, Callable
TContext = TypeVar("TContext")
TResult = TypeVar("TResult")
TStepExecutor = Callable[[TContext], Union[TResult, Awaitable[TResult]]]

Here, we need to create a class to manage steps.
First, import some necessary modules.
from typing import Generic, Optional, Awaitable, Union
from .type import TContext, TResult, TStepExecutor
Add the definition of Step.
class Step(Generic[TContext, TResult]):
"""
Lớp định nghĩa bước xử lý trong một pipeline
"""
def __init__(self, executor: Optional[TStepExecutor[TContext, TResult]] = None):
self._executor: Optional[TStepExecutor[TContext, TResult]] = None
if executor:
self._executor = executor
def set_executor(self, executor: TStepExecutor[TContext, TResult]) -> None:
"""
Gán executor cho Step
Args:
executor: là một hàm
"""
self._executor = executor
def execute(self, ctx: TContext) -> Union[TResult, Awaitable[TResult]]:
"""
Thực thi executor của một Step
Args:
ctx: ngữ cảnh thực thi, có thể là runtime (RuntimeContext)
hoặc trong core (InternalContext)
"""
if not self._executor:
raise ValueError("Executor of Step is not set")
return self._executor(ctx)

Finally, we come to the pipeline definition. First, add the following code to main.py.
from typing import Generic, List, Dict, Any
from dataclasses import dataclass
from .Step import Step
from .type import TContext, TStepExecutor
Initialize the Pipeline definition with some basic properties and functions.
@dataclass
class PipelineRunState:
"""State của pipeline khi đang chạy"""
current_step: int = 0
can_stop_now: bool = False
class Pipeline(Generic[TContext]):
"""
Lớp định nghĩa một pipeline.
Khi các steps được chạy trong một pipeline, thì step sau có
thể sẽ có được kết quả từ step trước đó. Trong trường hợp này
thì kết quả đó sẽ được hiểu là params cho step tiếp theo.
Chính vì thế, khi sử dụng pipeline trong core thì mình phải lưu ý.
"""
def __init__(self, name: str):
self.name = name
self._steps: List[Step[TContext, Any]] = []
self._run_states: Dict[Any, PipelineRunState] = {}
def get_steps(self) -> List[Step[TContext, Any]]:
"""
Trả về các steps trong một pipeline.
Returns:
List các steps
"""
return self._steps
def add_step(self, executor: TStepExecutor[TContext, Any]) -> "Pipeline[TContext]":
"""
Thêm một step mới vào trong pipeline.
Args:
executor: executor của step.
Returns:
Self để chain methods
"""
new_step = Step[TContext, Any](executor)
self._steps.append(new_step)
return self
def stop(self, ctx: TContext) -> None:
"""
Cho phép dừng pipeline sau sau một step, mà step này gọi stop().
Args:
ctx: Context hiện tại
"""
curr_state = self._run_states.get(id(ctx))
if not curr_state:
return
curr_state.can_stop_now = True
async def run(self, ctx: TContext) -> Any:
"""
Chạy pipeline theo context được truyền vào.
Args:
ctx: Context để chạy pipeline
Returns:
Kết quả của step cuối cùng
"""
current_result: Any = None
ctx_id = id(ctx)
self._run_states[ctx_id] = PipelineRunState(current_step=0, can_stop_now=False)
for step in self._steps:
maybe_promise = step.execute(ctx)
# Xử lý coroutine
if hasattr(maybe_promise, "__await__"):
current_result = await maybe_promise
else:
current_result = maybe_promise
# Set prev result vào context
if hasattr(ctx, "prev_result"):
ctx.prev_result = current_result
else:
# Fallback nếu context không có prev_result attribute
setattr(ctx, "prev_result", current_result)
# Process post step execution
if self._run_states[ctx_id].can_stop_now:
break
# Update state
self._run_states[ctx_id].current_step += 1
# Clear state
del self._run_states[ctx_id]
return current_result


If vscode shows an error at ctx.prev_result, you don’t need to worry about it.
In __init__.py, add the following code.
from .main import Pipeline, PipelineRunState
from .Step import Step
__all__ = ["Pipeline", "PipelineRunState", "Step"]
