5.4.3. Build functions

Now we’ll build the functions for the auth module. Some of these can be used in pipelines, while others can be used independently as steps—specifically, create_roles_check_step_executor and create_teams_check_step_executor that were referenced in the customer management module ports.

Pre-setup

Create the following files inside the functions folder:

  • check_roles.py
  • check_teams.py
  • refresh_tokens.py
  • sign_in.py
  • verify_token.py

5.4.3.1

Check Role Function

Open check_roles.py and import necessary modules:

from typing import Any, Callable, List

from core.error.ClientError import ClientError
from core.context.pipeline.main import Pipeline
from core.context.runtime_context.RuntimeContext import RuntimeContext
from core.modules.auth.helpers import get_info_from_claims

Then create a wrapper to inject the pipeline and allowed roles:

def create_roles_check_step_executor(
    pipeline: Pipeline, allowed_roles: List[str]
) -> Callable[[RuntimeContext], Any]:
    """
    Tạo một Step Executor mới để kiểm tra role người dùng.

    Args:
        pipeline: pipeline.
        allowed_roles: các vai trò được phép thực hiện.
    Returns:
        hàm step executor.
    """

    def step_executor(ctx: RuntimeContext):
        if allowed_roles and allowed_roles[0] == "*":
            return

        claims = ctx.get_temp_data("claims")
        user = get_info_from_claims(claims)

        if user.get("role") not in allowed_roles:
            pipeline.stop(ctx)

            err = ClientError("You don't have permission to do this action")
            err.as_http_error("Forbidden")
            err.add_error_detail(
                {
                    "source": "roleCheck",
                    "desc": f"Role of user is not allowed: {user.get('role')}",
                }
            )
            return ctx.send_error(err)

    return step_executor

5.4.3.2

Flow:

  1. Call create_roles_check_step_executor to inject the pipeline and allowed roles.
  2. Return the main function, which will:
    1. Extract claims.
    2. Retrieve info from claims.
    3. Compare the role in claims with allowed roles:
      1. If unauthorized → return 401.
      2. Otherwise → continue.

Check Teams Function

Open check_teams.py and import necessary modules:

from typing import Any, Callable, List

from core.error.ClientError import ClientError
from core.context.pipeline.main import Pipeline
from core.context.runtime_context.RuntimeContext import RuntimeContext
from core.modules.auth.helpers import get_info_from_claims

Create a wrapper to inject the pipeline and allowed teams:

def create_teams_check_step_executor(
    pipeline: Pipeline, allowed_teams: List[str]
) -> Callable[[RuntimeContext], Any]:
    """
    Tạo một Step Executor mới để kiểm tra team người dùng.

    Args:
        pipeline: pipeline.
        allowed_teams: các vai trò được phép thực hiện.
    Returns:
        hàm step executor.
    """

    def step_executor(ctx: RuntimeContext):
        if allowed_teams and allowed_teams[0] == "*":
            return

        claims = ctx.get_temp_data("claims")
        user = get_info_from_claims(claims)

        if user.get("team") not in allowed_teams:
            pipeline.stop(ctx)

            err = ClientError("You don't have permission to do this action")
            err.as_http_error("Forbidden")
            err.add_error_detail(
                {
                    "source": "teamCheck",
                    "desc": f"Team of user is not allowed: {user.get('team')}",
                }
            )
            return ctx.send_error(err)

    return step_executor

5.4.3.3

Flow:

  1. Call create_teams_check_step_executor to inject the pipeline and allowed teams.
  2. Return the main function, which will:
    1. Extract claims.
    2. Retrieve info from claims.
    3. Compare the team in claims with allowed teams:
      1. If unauthorized → return 401.
      2. Otherwise → continue.

Refresh Token Function

Open refresh_tokens.py and import necessary modules:

import boto3

# Import from core
from core.error.main import AppError, is_standard_error

# Import from utils
from utils.aws_clients.main import get_cognito_idp_client
from utils.configs import Configs

Write the main logic using AWS SDK (Cognito Identity Provider):

async def refresh_tokens(ctx):
    """
    Cho phép người dùng có thể làm mới lại các tokens.

    Args:
        ctx: runtime context
    Returns:
        dict: chứa tokens hoặc AppError
    """
    try:
        body = await ctx.get_body()
        refresh_token = body.get("refreshToken")

        client = get_cognito_idp_client()

        response = client.initiate_auth(
            AuthFlow="REFRESH_TOKEN_AUTH",
            AuthParameters={"REFRESH_TOKEN": refresh_token},
            ClientId=Configs.Cognito_App_Client_Id,
        )

        auth_result = response.get("AuthenticationResult", {})

        return {
            "auth": {
                "idToken": auth_result.get("IdToken"),
                "accessToken": auth_result.get("AccessToken"),
                "expiresIn": auth_result.get("ExpiresIn"),
            }
        }

    except Exception as error:
        if is_standard_error(error):
            return error

        err = AppError("Cannot refresh tokens")
        err.as_http_error("InternalServerError")
        err.add_error_detail({"source": "refreshToken", "desc": str(error)})

        return err

5.4.3.4

Flow:

  1. Extract info from the body.
  2. Call initiate_auth with the parameters.
  3. Transform the returned structure and return.

Sign In Function

Open sign_in.py and import necessary modules:

# Import from core
from core.error.main import AppError, is_standard_error

# Import from utils
from utils.aws_clients.main import get_cognito_idp_client
from utils.configs import Configs

Create a wrapper for pipeline injection:

async def sign_in(ctx):
    """
    Cho phép một người dùng đăng nhập vào trong hệ thống.

    Args:
        ctx: runtime context

    Returns:
        dict: chứa tokens hoặc AppError
    """
    try:
        body = await ctx.get_body()
        username = body.get("username")
        password = body.get("password")

        client = get_cognito_idp_client()

        response = client.initiate_auth(
            AuthFlow="USER_PASSWORD_AUTH",
            AuthParameters={
                "USERNAME": username,
                "PASSWORD": password,
            },
            ClientId=Configs.Cognito_App_Client_Id,
        )

        auth_result = response.get("AuthenticationResult", {})

        return {
            "auth": {
                "idToken": auth_result.get("IdToken"),
                "accessToken": auth_result.get("AccessToken"),
                "refreshToken": auth_result.get("RefreshToken"),
                "expiresIn": auth_result.get("ExpiresIn"),
            }
        }

    except Exception as error:
        if is_standard_error(error):
            return error

        err = AppError("Cannot authenticate user")
        err.as_http_error("InternalServerError")
        err.add_error_detail({"source": "signIn", "desc": str(error)})

        return err

5.4.3.5

Flow:

  1. Extract info from the body.
  2. Call initiate_auth with different parameters from refresh_tokens.
  3. Transform the response and return.

Verify Token Function

Open verify_token.py and import necessary modules:

# Import built-in libraries
import time

# Import external libraries
import jwt
from jwcrypto import jwk

# Import from core
from core.error import is_standard_error, AppError
from core.context.internal_context import initialize_internal_context

# Import from utils
from core.error import is_standard_error
from utils.configs import Configs

# Import helpers
from core.modules.auth.helpers import (
    get_authorization_token,
    get_public_keys,
    get_info_from_claims,
)

Write the main logic for token verification:

def verify_token(ctx):
    """Xác thực token và trả về kết quả xác thực và claims (nếu thành công).

    Args:
        token (str): token

    Raises:
        ValueError: ném ra lỗi nếu như header của JWT Token là không hợp lệ
        ValueError: ném ra lỗi nếu như không tìm thấy Public Key
        ValueError: ném ra lỗi nếu như token được tạo ra không phải là từ
        Cognito Provider nguồn

    Returns:
        dict: kết quả xác thực
    """
    try:
        # Setup context before go further
        in_ctx = initialize_internal_context()
        in_ctx.params["headers"] = ctx.get_headers()
        in_ctx.options["can_catch_error"] = True

        token = get_authorization_token(in_ctx)

        keys = get_public_keys()

        if is_standard_error(keys):
            return keys

        decoded_header = jwt.get_unverified_header(token)
        kid = decoded_header.get("kid")

        if not kid:
            err = AppError("Token is invalid")
            err.add_error_detail(
                {"source": "verify_token", "desc": "Kid of token not found"}
            )
            err.as_http_error("Unauthorized")
            return err

        key = next((k for k in keys if k["kid"] == kid), None)
        if not key:
            err = AppError("Token is invalid")
            err.add_error_detail(
                {"source": "verify_token", "desc": "Public key not found"}
            )
            err.as_http_error("Unauthorized")
            return err

        jwk_key = jwk.JWK(**key)
        public_key_pem = jwk_key.export_to_pem()

        claims = jwt.decode(
            token,
            public_key_pem,
            algorithms=["RS256"],
            options={"verify_exp": True},
        )

        # Post check claims
        if int(time.time()) > claims.get("exp"):
            err = AppError("Token is invalid")
            err.add_error_detail({"source": "verify_token", "desc": "Token is expired"})
            err.as_http_error("Unauthorized")
            return err

        if claims.get("client_id") != Configs.Cognito_App_Client_Id:
            err = AppError("Token is invalid")
            err.add_error_detail(
                {
                    "source": "verify_token",
                    "desc": "Token was not issued for this audience",
                }
            )
            err.as_http_error("Unauthorized")
            return err

        return claims

    except Exception as e:
        if is_standard_error(e):
            return e

        err = AppError("Verify token failed")
        err.add_error_detail({"source": "verify_token", "desc": str(e)})
        err.as_http_error("InternalServerError")
        return err

5.4.3.6

Before explaining the flow, let me clarify a few things. A JWT Token from Cognito is generated using the RSA algorithm, meaning the token is created from a private key and can be verified with a public key. So, when a user logs in through Cognito, the token is signed with the private key associated with that app client. To verify whether the token is valid, we need the corresponding public key.

However, the public key cannot be obtained directly and must be retrieved as follows:

  1. First, request the JWKS (JSON Web Key Set).
  2. Then, find the JWK that corresponds to the kid in the token header.
  3. Once the correct JWK is found, create a JWK key object.
  4. After that, convert the JWK key to a public key (PEM format).

Once the public key is available:

  1. Verify the token and extract the claims. If the claims are successfully retrieved, the token is valid.
  2. To be certain, additionally check the token’s expiration time and ensure the client ID matches.
  3. After the token is fully verified, return the claims.

That is the flow for verifying the token—pretty straightforward, right?

To use this in a pipeline, we need to create a wrapper to inject the pipeline into the process.

def create_verify_token_step_executor(pipeline):
    """Tạo ra một step executor có gán pipeline trong đó để xác thực token.

    Args:
        class: pipeline

    Returns:
        Callable: executor của step.
    """

    def executor(ctx):
        result = verify_token(ctx)

        if is_standard_error(result):
            # Stop pipeline
            pipeline.stop(ctx)

            return ctx.send_error(result)

        ctx.add_temp_data("claims", result)

        return result

    return executor

5.4.3.7

You could also implement this as middleware for runtime integration.

Finally, create __init__.py in the functions folder to export all functions:

from .check_roles import create_roles_check_step_executor
from .check_teams import create_teams_check_step_executor
from .refresh_tokens import refresh_tokens
from .sign_in import sign_in
from .verify_token import verify_token, create_verify_token_step_executor

__all__ = [
    "create_roles_check_step_executor",
    "create_teams_check_step_executor",
    "refresh_tokens",
    "sign_in",
    "verify_token",
    "create_verify_token_step_executor",
]

5.4.3.8