JWT Authentication in Ellar
This guide demonstrates how to implement JWT (JSON Web Token) authentication in an Ellar application using both Guard and Authentication Handler strategies.
Prerequisites
- Ellar application setup
- ellar-jwt package installed:
  - pip install ellar-jwt
Basic Setup
First, let's set up the basic structure with user management:
# user/services.py
from ellar.common import Serializer
from ellar.common.serializer import SerializerFilter
from ellar.di import injectable
from ellar.core.security.hashers import make_password
class UserModel(Serializer):
    """Serializable representation of a single user record."""

    # Serializer filter configured to exclude the ``password`` field
    # (presumably applied when the model is serialized — confirm against
    # Ellar's Serializer documentation).
    _filter = SerializerFilter(exclude={'password'})
    user_id: int
    username: str
    # In this guide the value comes from UsersService.users, where it is the
    # output of make_password() rather than the plain-text password.
    password: str
@injectable()
class UsersService:
    """In-memory user lookup used by the authentication examples."""

    # For demonstration. In production, use a proper database
    users = [
        {
            'user_id': 1,
            'username': 'john',
            'password': make_password('password'),
        }
    ]

    async def get_user_by_username(self, username: str) -> UserModel | None:
        """Return the first stored user whose username equals ``username``.

        Returns ``None`` when no record matches.
        """
        match = next(
            (record for record in self.users if record["username"] == username),
            None,
        )
        if not match:
            return None
        return UserModel(**match)
Method 1: Using Authentication Handler
The Authentication Handler approach processes authentication at the middleware layer:
# auth/auth_scheme.py
import typing as t
from ellar.auth import UserIdentity
from ellar.auth.handlers import HttpBearerAuthenticationHandler
from ellar.common import IHostContext
from ellar.common.serializer.guard import HTTPAuthorizationCredentials
from ellar.di import injectable
from ellar_jwt import JWTService
@injectable
class JWTAuthentication(HttpBearerAuthenticationHandler):
    """Bearer-token authentication handler that validates JWTs via ``JWTService``."""

    def __init__(self, jwt_service: JWTService) -> None:
        self.jwt_service = jwt_service

    async def authentication_handler(
        self,
        context: IHostContext,
        credentials: HTTPAuthorizationCredentials,
    ) -> t.Optional[t.Any]:
        """Decode the bearer token and build a ``UserIdentity`` from its claims.

        Returns ``None`` when decoding the token or constructing the identity
        raises any exception.
        """
        try:
            token = credentials.credentials
            claims = await self.jwt_service.decode_async(token)
            identity = UserIdentity(auth_type=self.scheme, **claims)
        except Exception:
            # Any failure means the request carries no usable identity.
            identity = None
        return identity
Register the authentication handler in your application:
# server.py
from ellar.app import AppFactory, use_authentication_schemes
from ellar.core import LazyModuleImport as lazyLoad

# NOTE(review): the import path assumes auth/auth_scheme.py lives directly
# under the ``project_name`` package — adjust it to your project layout.
from project_name.auth.auth_scheme import JWTAuthentication

# Create the application from the root module, referenced by import string.
application = AppFactory.create_from_app_module(
    lazyLoad('project_name.root_module:ApplicationModule'),
    config_module="project_name.config:DevelopmentConfig",
)
# Register the JWT authentication handler defined in auth/auth_scheme.py.
use_authentication_schemes(JWTAuthentication)
Method 2: Using Guard Strategy
The Guard strategy processes authentication at the route handler level:
# auth/guards.py
import typing as t

from ellar.auth import UserIdentity
from ellar.auth.guards import GuardHttpBearerAuth
from ellar.common import IExecutionContext
from ellar.common.serializer.guard import HTTPAuthorizationCredentials
from ellar.di import injectable
from ellar_jwt import JWTService


@injectable
class JWTGuard(GuardHttpBearerAuth):
    """Bearer-token guard that validates JWTs via ``JWTService``."""

    def __init__(self, jwt_service: JWTService) -> None:
        self.jwt_service = jwt_service

    async def authentication_handler(
        self,
        context: IExecutionContext,
        credentials: HTTPAuthorizationCredentials,
    ) -> t.Optional[t.Any]:
        """Decode the bearer token and build a ``UserIdentity`` from its claims.

        On any failure the guard delegates to the inherited
        ``raise_exception()`` instead of returning ``None``.
        """
        try:
            data = await self.jwt_service.decode_async(credentials.credentials)
            return UserIdentity(auth_type=self.scheme, **data)
        except Exception:
            # Unlike the authentication handler variant, a guard rejects the
            # request here rather than letting it continue unauthenticated.
            self.raise_exception()
Authentication Service
Implement the authentication service:
# auth/services.py
from datetime import timedelta

from ellar.common import exceptions
from ellar.core.security.hashers import check_password
from ellar.di import injectable
from ellar_jwt import JWTService

from ..user.services import UserModel, UsersService


@injectable()
class AuthService:
    """Issues and refreshes JWTs for users known to ``UsersService``."""

    def __init__(self, users_service: UsersService, jwt_service: JWTService) -> None:
        self.users_service = users_service
        self.jwt_service = jwt_service

    async def _sign_access_token(self, user: UserModel) -> str:
        """Sign an access token carrying the user's serialized fields."""
        # RFC 7519 defines "sub" as a string, and recent PyJWT releases reject
        # non-string subjects when decoding, so the numeric id is stringified.
        return await self.jwt_service.sign_async(
            dict(user.serialize(), sub=str(user.user_id))
        )

    async def sign_in(self, username: str, password: str) -> dict:
        """Verify the credentials and return a fresh token pair.

        Returns:
            A dict with ``access_token`` and ``refresh_token``. The refresh
            token is signed with a 30-day lifetime and carries the username
            as its subject.

        Raises:
            exceptions.AuthenticationFailed: the user does not exist or the
                password does not match the stored hash.
        """
        user = await self.users_service.get_user_by_username(username)
        if not user or not check_password(password, user.password):
            raise exceptions.AuthenticationFailed()
        return {
            "access_token": await self._sign_access_token(user),
            "refresh_token": await self.jwt_service.sign_async(
                dict(sub=user.username),
                lifetime=timedelta(days=30),
            ),
        }

    async def refresh_token(self, refresh_token: str) -> dict:
        """Exchange a refresh token for a new access token.

        Returns:
            A dict with a single ``access_token`` entry.

        Raises:
            exceptions.AuthenticationFailed: the token cannot be decoded, its
                subject does not match a known username, or signing fails.
        """
        try:
            payload = await self.jwt_service.decode_async(refresh_token)
            # Refresh tokens carry the username as their subject (see sign_in).
            user = await self.users_service.get_user_by_username(payload['sub'])
            if not user:
                raise exceptions.AuthenticationFailed()
            return {"access_token": await self._sign_access_token(user)}
        except Exception:
            # Every failure is reported as the same generic authentication
            # error so the response does not reveal why the token was rejected.
            raise exceptions.AuthenticationFailed()
Controller Implementation
# auth/controllers.py
from ellar.auth import AuthenticationRequired, SkipAuth
from ellar.common import Body, Controller, ControllerBase, get, post
from ellar.openapi import ApiTags

from .services import AuthService


# NOTE(review): 'JWTAuthentication' presumably names the authentication scheme
# registered from auth/auth_scheme.py — confirm it matches your handler.
@AuthenticationRequired('JWTAuthentication')
@Controller
@ApiTags(name='Authentication')
class AuthController(ControllerBase):
    """Login, profile and token-refresh endpoints backed by ``AuthService``."""

    def __init__(self, auth_service: AuthService) -> None:
        self.auth_service = auth_service

    @post("/login")
    @SkipAuth()
    async def sign_in(self, username: Body[str], password: Body[str]):
        """Exchange a username and password for a token pair."""
        return await self.auth_service.sign_in(username=username, password=password)

    @get("/profile")
    async def get_profile(self):
        """Return the user attached to the current request context."""
        return self.context.user

    @post("/refresh")
    @SkipAuth()
    async def refresh_token(self, payload: str = Body(embed=True)):
        """Exchange the refresh token sent as ``payload`` for a new access token."""
        return await self.auth_service.refresh_token(payload)
Module Configuration
Configure the authentication module:
# auth/module.py
from datetime import timedelta
from ellar.common import Module
from ellar.core import ModuleBase
from ellar_jwt import JWTModule
from .controllers import AuthController
from .services import AuthService
@Module(
    modules=[
        # Configure ellar-jwt for this module.
        JWTModule.setup(
            # Placeholder value: supply a strong secret from configuration or
            # the environment instead of hard-coding it.
            signing_secret_key="your-secret-key",
            # Presumably the lifetime applied to tokens signed without an
            # explicit ``lifetime`` argument — confirm against ellar-jwt docs.
            lifetime=timedelta(minutes=5)
        ),
    ],
    controllers=[AuthController],
    providers=[AuthService],
)
class AuthModule(ModuleBase):
    """Module that bundles the auth controller, auth service and JWT setup."""
    pass
Testing the Implementation
# Login: send the demo user's credentials as a JSON body.
# The sign-in handler responds with "access_token" and "refresh_token".
curl -X POST http://localhost:8000/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "john", "password": "password"}'
# Access protected route: pass the access token as a Bearer credential.
curl http://localhost:8000/auth/profile \
  -H "Authorization: Bearer <your-jwt-token>"
# Refresh token: the refresh token is sent under the "payload" key,
# matching the embedded body parameter of the refresh handler.
curl -X POST http://localhost:8000/auth/refresh \
  -H "Content-Type: application/json" \
  -d '{"payload": "<your-refresh-token>"}'
Security Considerations
- Secret Key:
  - Use a strong, environment-specific secret key
  - Never commit secrets to version control

- Token Lifetime:
  - Set appropriate expiration times
  - Implement a token refresh mechanism
  - Consider using sliding sessions

- Error Handling:
  - Provide secure, non-revealing error messages
  - Implement proper logging
  - Handle token expiration gracefully
 
Best Practices
- Token Storage:
  - Store tokens securely (e.g., HttpOnly cookies for web apps)
  - Implement proper token revocation
  - Consider token rotation strategies

- Refresh Token Handling:
  - Use longer expiration for refresh tokens
  - Implement proper refresh token rotation
  - Consider implementing a token blacklist

- API Security:
  - Use HTTPS in production
  - Implement rate limiting
  - Consider implementing CSRF protection
 
Complete Example
Find a complete working example in the Ellar GitHub repository.