Now we’ll build the functions for the auth module. Some of these can be used in pipelines, while others can be used independently as steps—specifically, create_roles_check_step_executor and create_teams_check_step_executor that were referenced in the customer management module ports.
Create the following files inside the functions folder:
check_roles.py, check_teams.py, refresh_tokens.py, sign_in.py, verify_token.py
Open check_roles.py and import necessary modules:
from typing import Any, Callable, List
from core.error.ClientError import ClientError
from core.context.pipeline.main import Pipeline
from core.context.runtime_context.RuntimeContext import RuntimeContext
from core.modules.auth.helpers import get_info_from_claims
Then create a wrapper to inject the pipeline and allowed roles:
def create_roles_check_step_executor(
    pipeline: Pipeline, allowed_roles: List[str]
) -> Callable[[RuntimeContext], Any]:
    """Build a step executor that authorizes the current user by role.

    Args:
        pipeline: Pipeline that is stopped when the role check fails.
        allowed_roles: Roles permitted to continue. A list whose first
            element is ``"*"`` lets every role through.

    Returns:
        A step executor taking the runtime context. It returns ``None`` when
        the user is allowed, otherwise the result of ``ctx.send_error``.
    """

    def step_executor(ctx: RuntimeContext):
        # Only a leading "*" is treated as the wildcard.
        wildcard = bool(allowed_roles) and allowed_roles[0] == "*"
        if wildcard:
            return None

        # The claims are read from the context's temp data under "claims".
        user = get_info_from_claims(ctx.get_temp_data("claims"))
        if user.get("role") in allowed_roles:
            return None

        # Stop the pipeline first, then report the rejection to the client.
        pipeline.stop(ctx)
        denial = ClientError("You don't have permission to do this action")
        denial.as_http_error("Forbidden")
        detail = {
            "source": "roleCheck",
            "desc": f"Role of user is not allowed: {user.get('role')}",
        }
        denial.add_error_detail(detail)
        return ctx.send_error(denial)

    return step_executor

Flow:
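Call create_roles_check_step_executor to inject the pipeline and allowed roles. The returned step executor reads the claims from the context; if the user's role is not in the allowed roles, it stops the pipeline and sends a Forbidden error.
Open check_teams.py and import necessary modules: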
from typing import Any, Callable, List
from core.error.ClientError import ClientError
from core.context.pipeline.main import Pipeline
from core.context.runtime_context.RuntimeContext import RuntimeContext
from core.modules.auth.helpers import get_info_from_claims
Create a wrapper to inject the pipeline and allowed teams:
def create_teams_check_step_executor(
    pipeline: Pipeline, allowed_teams: List[str]
) -> Callable[[RuntimeContext], Any]:
    """Build a step executor that authorizes the current user by team.

    Args:
        pipeline: Pipeline that is stopped when the team check fails.
        allowed_teams: Teams permitted to continue. A list whose first
            element is ``"*"`` lets every team through.

    Returns:
        A step executor taking the runtime context. It returns ``None`` when
        the user is allowed, otherwise the result of ``ctx.send_error``.
    """

    def step_executor(ctx: RuntimeContext):
        # Only a leading "*" is treated as the wildcard.
        wildcard = bool(allowed_teams) and allowed_teams[0] == "*"
        if wildcard:
            return None

        # The claims are read from the context's temp data under "claims".
        user = get_info_from_claims(ctx.get_temp_data("claims"))
        if user.get("team") in allowed_teams:
            return None

        # Stop the pipeline first, then report the rejection to the client.
        pipeline.stop(ctx)
        denial = ClientError("You don't have permission to do this action")
        denial.as_http_error("Forbidden")
        detail = {
            "source": "teamCheck",
            "desc": f"Team of user is not allowed: {user.get('team')}",
        }
        denial.add_error_detail(detail)
        return ctx.send_error(denial)

    return step_executor

Flow:
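Call create_teams_check_step_executor to inject the pipeline and allowed teams. The returned step executor reads the claims from the context; if the user's team is not in the allowed teams, it stops the pipeline and sends a Forbidden error.
Open refresh_tokens.py and import necessary modules: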
import boto3
# Import from core
from core.error.main import AppError, is_standard_error
# Import from utils
from utils.aws_clients.main import get_cognito_idp_client
from utils.configs import Configs
Write the main logic using AWS SDK (Cognito Identity Provider):
async def refresh_tokens(ctx):
    """Exchange a refresh token for new tokens through Cognito.

    Reads ``refreshToken`` from the request body and calls ``initiate_auth``
    with the ``REFRESH_TOKEN_AUTH`` flow.

    Args:
        ctx: Runtime context; only ``await ctx.get_body()`` is used.

    Returns:
        dict | AppError: ``{"auth": {"idToken", "accessToken", "expiresIn"}}``
        on success. On failure an error object is returned, not raised: a
        standard error is passed through unchanged; a rejected refresh token
        becomes an ``AppError`` marked "Unauthorized"; anything else becomes
        an ``AppError`` marked "InternalServerError".
    """
    try:
        body = await ctx.get_body()
        refresh_token = body.get("refreshToken")
        client = get_cognito_idp_client()
        response = client.initiate_auth(
            AuthFlow="REFRESH_TOKEN_AUTH",
            AuthParameters={"REFRESH_TOKEN": refresh_token},
            ClientId=Configs.Cognito_App_Client_Id,
        )
        auth_result = response.get("AuthenticationResult", {})
        return {
            "auth": {
                "idToken": auth_result.get("IdToken"),
                "accessToken": auth_result.get("AccessToken"),
                "expiresIn": auth_result.get("ExpiresIn"),
            }
        }
    except Exception as error:
        if is_standard_error(error):
            return error

        # botocore's ClientError exposes the service error code under
        # error.response["Error"]["Code"]; other exceptions have no such
        # attribute and fall through to the internal-error branch.
        service_response = getattr(error, "response", None)
        error_code = (
            service_response.get("Error", {}).get("Code")
            if isinstance(service_response, dict)
            else None
        )

        err = AppError("Cannot refresh tokens")
        if error_code in ("NotAuthorizedException", "UserNotFoundException"):
            # An invalid, expired or revoked refresh token is the caller's
            # problem, not a server fault.
            err.as_http_error("Unauthorized")
        else:
            err.as_http_error("InternalServerError")
        err.add_error_detail({"source": "refreshToken", "desc": str(error)})
        return err

Flow:
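Read the refresh token from the request body, then call initiate_auth with the REFRESH_TOKEN_AUTH flow, the refresh token, and the app client ID, and return the new tokens.
Open sign_in.py and import necessary modules: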
# Import from core
from core.error.main import AppError, is_standard_error
# Import from utils
from utils.aws_clients.main import get_cognito_idp_client
from utils.configs import Configs
Write the main logic using AWS SDK (Cognito Identity Provider):
async def sign_in(ctx):
    """Sign a user in with username and password through Cognito.

    Reads ``username`` and ``password`` from the request body and calls
    ``initiate_auth`` with the ``USER_PASSWORD_AUTH`` flow.

    Args:
        ctx: Runtime context; only ``await ctx.get_body()`` is used.

    Returns:
        dict | AppError: ``{"auth": {"idToken", "accessToken",
        "refreshToken", "expiresIn"}}`` on success. On failure an error
        object is returned, not raised: a standard error is passed through
        unchanged; rejected credentials become an ``AppError`` marked
        "Unauthorized"; anything else becomes an ``AppError`` marked
        "InternalServerError".
    """
    try:
        body = await ctx.get_body()
        username = body.get("username")
        password = body.get("password")
        client = get_cognito_idp_client()
        response = client.initiate_auth(
            AuthFlow="USER_PASSWORD_AUTH",
            AuthParameters={
                "USERNAME": username,
                "PASSWORD": password,
            },
            ClientId=Configs.Cognito_App_Client_Id,
        )
        # NOTE(review): when Cognito answers with a challenge instead of
        # tokens, "AuthenticationResult" is absent and every token below is
        # None — confirm whether callers need to handle that case.
        auth_result = response.get("AuthenticationResult", {})
        return {
            "auth": {
                "idToken": auth_result.get("IdToken"),
                "accessToken": auth_result.get("AccessToken"),
                "refreshToken": auth_result.get("RefreshToken"),
                "expiresIn": auth_result.get("ExpiresIn"),
            }
        }
    except Exception as error:
        if is_standard_error(error):
            return error

        # botocore's ClientError exposes the service error code under
        # error.response["Error"]["Code"]; other exceptions have no such
        # attribute and fall through to the internal-error branch.
        service_response = getattr(error, "response", None)
        error_code = (
            service_response.get("Error", {}).get("Code")
            if isinstance(service_response, dict)
            else None
        )

        err = AppError("Cannot authenticate user")
        if error_code in ("NotAuthorizedException", "UserNotFoundException"):
            # Wrong credentials or an unknown user are authentication
            # failures, not server faults.
            err.as_http_error("Unauthorized")
        else:
            err.as_http_error("InternalServerError")
        err.add_error_detail({"source": "signIn", "desc": str(error)})
        return err

Flow:
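Read the username and password from the request body, then call initiate_auth with different parameters from refresh_tokens: the USER_PASSWORD_AUTH flow with the username and password. The result also includes the refresh token.
Open verify_token.py and import necessary modules: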
# Import built-in libraries
import time
# Import external libraries
import jwt
from jwcrypto import jwk
# Import from core
from core.error import is_standard_error, AppError
from core.context.internal_context import initialize_internal_context
# Import from utils
from core.error import is_standard_error
from utils.configs import Configs
# Import helpers
from core.modules.auth.helpers import (
get_authorization_token,
get_public_keys,
get_info_from_claims,
)
Write the main logic for token verification:
def _unauthorized_token_error(desc):
    """Build the "Unauthorized" AppError returned when a token is rejected."""
    err = AppError("Token is invalid")
    err.add_error_detail({"source": "verify_token", "desc": desc})
    err.as_http_error("Unauthorized")
    return err


def verify_token(ctx):
    """Verify the JWT carried by the request and return its claims.

    The token is read from the request headers, the public key matching the
    token header's ``kid`` is looked up, and the signature and expiry are
    verified with RS256. The ``client_id`` claim must equal
    ``Configs.Cognito_App_Client_Id``.

    NOTE(review): the ``iss`` claim is not checked here — confirm whether the
    issuer should also be validated.

    Args:
        ctx: Runtime context; only ``ctx.get_headers()`` is used.

    Returns:
        dict | error object: the decoded claims on success. Failures are
        returned, not raised: a standard error from a helper is passed
        through unchanged; a rejected token (missing ``kid``, unknown key,
        bad signature, malformed or expired token, wrong client) yields an
        ``AppError`` marked "Unauthorized"; any other failure yields an
        ``AppError`` marked "InternalServerError".
    """
    try:
        # Setup context before going further
        in_ctx = initialize_internal_context()
        in_ctx.params["headers"] = ctx.get_headers()
        in_ctx.options["can_catch_error"] = True
        token = get_authorization_token(in_ctx)
        # Presumably the helper can hand back an error object instead of
        # raising (see "can_catch_error"); pass it through like `keys` below.
        if is_standard_error(token):
            return token

        keys = get_public_keys()
        if is_standard_error(keys):
            return keys

        kid = jwt.get_unverified_header(token).get("kid")
        if not kid:
            return _unauthorized_token_error("Kid of token not found")

        # .get() so a key entry without "kid" is skipped instead of raising.
        key = next((k for k in keys if k.get("kid") == kid), None)
        if not key:
            return _unauthorized_token_error("Public key not found")

        public_key_pem = jwk.JWK(**key).export_to_pem()
        claims = jwt.decode(
            token,
            public_key_pem,
            algorithms=["RS256"],
            options={"verify_exp": True},
        )

        # Post check claims. A token without "exp" passes jwt.decode, so it
        # is rejected here rather than compared against None.
        expires_at = claims.get("exp")
        if expires_at is None or int(time.time()) > expires_at:
            return _unauthorized_token_error("Token is expired")

        if claims.get("client_id") != Configs.Cognito_App_Client_Id:
            return _unauthorized_token_error(
                "Token was not issued for this audience"
            )
        return claims
    except jwt.ExpiredSignatureError:
        # Raised by jwt.decode when "exp" is in the past.
        return _unauthorized_token_error("Token is expired")
    except jwt.InvalidTokenError as e:
        # Malformed token, bad signature, or a failed claim validation:
        # the token is at fault, not the server.
        return _unauthorized_token_error(str(e))
    except Exception as e:
        if is_standard_error(e):
            return e
        err = AppError("Verify token failed")
        err.add_error_detail({"source": "verify_token", "desc": str(e)})
        err.as_http_error("InternalServerError")
        return err

Before explaining the flow, let me clarify a few things. A JWT from Cognito is signed with an RSA algorithm (RS256), meaning the token is signed with a private key and can be verified with the matching public key. So, when a user logs in through Cognito, the token is signed with a private key that belongs to the user pool. To verify whether the token is valid, we need the corresponding public key.
However, the public key cannot be obtained directly and must be retrieved as follows:
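Fetch the list of public keys, then pick the one whose kid matches the kid in the token header.
Once the public key is available, use it to verify the token's signature and expiry, then check that the token was issued for our app client.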
That is the flow for verifying the token—pretty straightforward, right?
To use this in a pipeline, we need to create a wrapper to inject the pipeline into the process.
def create_verify_token_step_executor(pipeline):
    """Build a step executor that verifies the request's token.

    Args:
        pipeline: Pipeline that is stopped when verification fails.

    Returns:
        Callable: a step executor taking the runtime context. On success it
        stores the claims in the context's temp data under "claims" and
        returns them; on failure it stops the pipeline and returns the
        result of ``ctx.send_error``.
    """

    def executor(ctx):
        outcome = verify_token(ctx)

        if not is_standard_error(outcome):
            # Later steps read the claims back from the temp data.
            ctx.add_temp_data("claims", outcome)
            return outcome

        # Verification failed: stop the pipeline and report the error.
        pipeline.stop(ctx)
        return ctx.send_error(outcome)

    return executor

You could also implement this as middleware for runtime integration.
Finally, create __init__.py in the functions folder to export all functions:
from .check_roles import create_roles_check_step_executor
from .check_teams import create_teams_check_step_executor
from .refresh_tokens import refresh_tokens
from .sign_in import sign_in
from .verify_token import verify_token, create_verify_token_step_executor
# Names exported by the functions package; each one is imported above.
__all__ = [
"create_roles_check_step_executor",
"create_teams_check_step_executor",
"refresh_tokens",
"sign_in",
"verify_token",
"create_verify_token_step_executor",
]
