
Authentication and Guard authentication

Authentication is an essential part of most applications. It refers to the methods and techniques used to verify the identity of users interacting with your application. There are many approaches and strategies for handling authentication, and the right one for a project depends on its requirements. In this section, we go through the different approaches to authentication in Ellar and how each can fit your requirements.

There are two ways in which user authentication and identification are processed in Ellar:

1. Guard Authentication

We have discussed in detail how Guards are used to protect a route and check user authorization, but we never addressed how they can be used for authentication. To illustrate this, we will implement JWT authentication using a Guard.

Note

Read more on Guards

Let's flesh out our requirements. For this use case, clients will start by authenticating with a username and password. Once authenticated, the server will issue a JWT that can be sent as a bearer token in an authorization header on subsequent requests to prove authentication. Then, we create a protected route that is accessible only to requests that contain a valid JWT.

Let's start with the first requirement: authenticating a user, then extend that by issuing a JWT. And finally, we'll create a protected route that checks for a valid JWT on the request.
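To make the flow above concrete, here is a minimal sketch of what issuing and checking an HS256-signed JWT involves, written with only the Python standard library. It is not how ellar-jwt is implemented, and the helper names `sign` and `verify` are hypothetical; the rest of this guide relies on JWTService for this work.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    # JWT segments are URL-safe base64 without "=" padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that was stripped when encoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign(payload: dict, secret: str) -> str:
    """Build header.payload.signature using HMAC-SHA256 (hypothetical helper)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify(token: str, secret: str) -> dict:
    """Return the payload if the signature matches and the token has not expired."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    signing_input = f"{header_b64}.{payload_b64}"
    expected = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    # compare_digest avoids leaking information through timing differences.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("invalid signature")
    payload = json.loads(_b64url_decode(payload_b64))
    # "exp" is a Unix timestamp in seconds; a missing "exp" never expires here.
    if payload.get("exp", float("inf")) < time.time():
        raise ValueError("token expired")
    return payload
```

The server keeps the secret; the client only stores the token and sends it back. Anyone can read the payload, but nobody without the secret can alter it without invalidating the signature.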

Creating an authentication module

We'll start by scaffolding an AuthModule with the Ellar CLI tool, followed by AuthService and AuthController implementations. We'll use the AuthService to implement the authentication logic and the AuthController to expose the authentication endpoints.

ellar create-module auth

Also, the AuthService would need UserService, which encapsulates user operations. Let's also scaffold a user module.

ellar create-module user

Now, let's add some implementations to the generated files. For this application, the UsersService will work with a hard-coded list of users and a method that retrieves one user by username. In a real application, you'd build your user model and persistence layer using a library of your choice, such as SQLAlchemy, Django ORM, Peewee, or PonyORM.

user/services.py
from ellar.common import Serializer
from ellar.common.serializer import SerializerFilter
from ellar.di import injectable
from ellar.core.security.hashers import make_password


class UserModel(Serializer):
    _filter = SerializerFilter(exclude={'password'})

    user_id: int
    username: str
    password: str


@injectable()
class UsersService:
    users = [
        {
            'user_id': 1,
            'username': 'john',
            'password': make_password('password'),
        },
        {
            'user_id': 2,
            'username': 'clara',
            'password': make_password('guess'),
        },
    ]

    async def get_user_by_username(self, username: str) -> UserModel | None:
        filtered_list = filter(lambda item: item["username"] == username, self.users)
        # A default of None avoids StopIteration when no user matches.
        found_user = next(filtered_list, None)
        if found_user:
            return UserModel(**found_user)
        return None

In the above example, we used make_password to hash the passwords. Never store passwords as plain text. Next, we need to register the UsersService we just created in the UserModule so that it can be injected into AuthService.

user/module.py
from ellar.common import Module
from ellar.core import ModuleBase

from .services import UsersService


@Module(
    providers=[UsersService],
    exports=[UsersService]
)
class UserModule(ModuleBase):
    """
    User Module
    """

Implementing the "Sign in" endpoint

The AuthService has the job of retrieving a user and verifying the password. Let's create a sign_in function for this purpose.

auth/services.py
import typing as t

from ellar.core.security.hashers import check_password
from ellar.di import injectable
from ellar.common import exceptions
from ..user.services import UsersService


@injectable()
class AuthService:
    def __init__(self, users_service: UsersService) -> None:
        self.users_service = users_service

    async def sign_in(self, username: str, password: str) -> t.Any:
        user_model = await self.users_service.get_user_by_username(username)
        if not user_model:
            raise exceptions.AuthenticationFailed()

        # Plain-text candidate first, stored hash second.
        if not check_password(password, user_model.password):
            raise exceptions.AuthenticationFailed()

        return user_model.serialize()
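For readers curious about what a hash-then-check pair does in general, the sketch below shows salted PBKDF2 hashing with the standard library. It is an illustration only, not Ellar's hasher implementation; `hash_password`, `verify_password`, and the encoded string layout are assumptions made for this example.

```python
import hashlib
import hmac
import os


def hash_password(password: str, *, iterations: int = 100_000) -> str:
    """Return 'algorithm$iterations$salt$digest' (hypothetical layout)."""
    salt = os.urandom(16)  # a fresh random salt per password
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, iterations
    )
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"


def verify_password(password: str, encoded: str) -> bool:
    """Re-hash the candidate with the stored salt and compare digests."""
    algorithm, iterations, salt_hex, digest_hex = encoded.split("$")
    if algorithm != "pbkdf2_sha256":
        return False
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
    )
    # Constant-time comparison of the two hex digests.
    return hmac.compare_digest(candidate.hex(), digest_hex)
```

Because the salt is random, hashing the same password twice produces two different stored values, yet both verify correctly. Note the argument order in this sketch: the plain-text candidate comes first and the stored value second.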

Next, we create the AuthController and add a sign_in endpoint:

auth/controllers.py
from ellar.common import Controller, ControllerBase, post, Body
from .services import AuthService


@Controller
class AuthController(ControllerBase):
    def __init__(self, auth_service: AuthService) -> None:
        self.auth_service = auth_service

    @post("/login")
    async def sign_in(self, username: Body[str], password: Body[str]):
        return await self.auth_service.sign_in(username=username, password=password)

At this point, the AuthService returns a Python dictionary of the retrieved user if the password is correct. What the client actually needs, however, is a token.

For this, we need to install the ellar-jwt package:

pip install ellar-jwt

Let us review and refine our requirements once again:

  • Allow users to authenticate with username/password, returning a JWT for use in subsequent calls to protected API endpoints. This is almost done. What is left is to write the code that issues a JWT.
  • Create API routes that are protected based on the presence of a valid JWT as a bearer token

EllarJWT comes with JWTModule and JWTService for encoding and decoding tokens. Let us configure the JWTModule inside AuthModule. 

Hint

See the EllarJWT docs for more on how to use it.

auth/module.py
from datetime import timedelta

from ellar.common import Module
from ellar.core import ModuleBase, LazyModuleImport as lazyLoad
from ellar_jwt import JWTModule

from .controllers import AuthController
from .services import AuthService


@Module(
    modules=[
        lazyLoad('project_name.user.module:UserModule'),
        JWTModule.setup(
            signing_secret_key="my_poor_secret_key_lol", lifetime=timedelta(minutes=5)
        ),
    ],
    controllers=[AuthController],
    providers=[AuthService],
)
class AuthModule(ModuleBase):
    """
    Auth Module
    """

In the above example, we configured JWTModule with minimal settings and registered it as a module dependency together with UserModule. We also registered AuthController and AuthService in AuthModule. With that done, the AuthModule setup is complete.
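The signing key above is hard-coded for brevity. In a deployed application you would usually load it from the environment instead. The following is a minimal sketch of that idea; the helper name `load_jwt_settings` and the environment variable names `JWT_SIGNING_SECRET` and `JWT_LIFETIME_MINUTES` are assumptions for this example, not part of Ellar or ellar-jwt.

```python
import os
from datetime import timedelta
from typing import Mapping


def load_jwt_settings(environ: Mapping[str, str] = os.environ) -> dict:
    """Read JWT settings from the environment (hypothetical variable names)."""
    secret = environ.get("JWT_SIGNING_SECRET")
    if not secret:
        # Fail fast instead of silently falling back to a weak default key.
        raise RuntimeError("JWT_SIGNING_SECRET is not set")
    minutes = int(environ.get("JWT_LIFETIME_MINUTES", "5"))
    # Keys match the keyword arguments passed to JWTModule.setup in this guide.
    return {"signing_secret_key": secret, "lifetime": timedelta(minutes=minutes)}
```

The returned dictionary uses the same two keyword names shown in the module configuration, so it could be unpacked as `JWTModule.setup(**load_jwt_settings())`.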

Now, let us finish the AuthService by returning a token using JWTService.

auth/services.py
import typing as t

from ellar.core.security.hashers import check_password
from ellar.di import injectable
from ellar.common import exceptions
from ellar_jwt import JWTService

from ..user.services import UsersService


@injectable()
class AuthService:
    def __init__(self, users_service: UsersService, jwt_service: JWTService) -> None:
        self.users_service = users_service
        self.jwt_service = jwt_service

    async def sign_in(self, username: str, password: str) -> t.Any:
        user_model = await self.users_service.get_user_by_username(username)
        if not user_model:
            raise exceptions.AuthenticationFailed()

        # Plain-text candidate first, stored hash second.
        if not check_password(password, user_model.password):
            raise exceptions.AuthenticationFailed()

        result = {"access_token": await self.jwt_service.sign_async(dict(user_model.serialize(), sub=user_model.user_id))}
        return result

At this point, we can run the application to test what we have done so far. To do that, we need to register AuthModule in the ApplicationModule.

project_name/root_module.py
from ellar.common import Module, exception_handler
from ellar.common import IExecutionContext, JSONResponse, Response
from ellar.core import ModuleBase, LazyModuleImport as lazyLoad
from ellar.samples.modules import HomeModule


@Module(
    modules=[HomeModule, lazyLoad('project_name.auth.module:AuthModule'),],
)
class ApplicationModule(ModuleBase):
    pass

Then start the server if it is not already running.

ellar runserver --reload

Let us make some requests to the server. Open a terminal window and run the commands below:

$ # POST to /auth/login
$ curl -X POST http://localhost:8000/auth/login -d '{"username": "john", "password": "password"}' -H "Content-Type: application/json"
{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2OTg3OTE0OTE..."}
$ # Note: above JWT truncated
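
The same login request can be made from Python. The sketch below uses only the standard library and mirrors the cURL call above; the function names `build_login_request` and `login` are hypothetical, and the base URL is assumed to be the local development server.

```python
import json
import urllib.request


def build_login_request(
    base_url: str, username: str, password: str
) -> urllib.request.Request:
    """Build the POST /auth/login request without sending it."""
    body = json.dumps({"username": username, "password": password}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/auth/login",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def login(base_url: str, username: str, password: str) -> str:
    """Send the request and return the access token from the JSON response."""
    request = build_login_request(base_url, username, password)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())["access_token"]


# Example (requires the server to be running):
# token = login("http://localhost:8000", "john", "password")
```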

Implementing the authentication guard

At this point, we can now comfortably address our final requirement: protecting endpoints by requiring a valid JWT to be present on the request. We will do this by creating an AuthGuard that will be used to guard our routes. 

auth/guards.py
import typing as t

from ellar.auth import UserIdentity
from ellar.common.serializer.guard import (
    HTTPAuthorizationCredentials,
)
from ellar.auth.guards import GuardHttpBearerAuth
from ellar.di import injectable
from ellar_jwt import JWTService
from ellar.common import IExecutionContext
from ellar.common.logging import logger


@injectable
class AuthGuard(GuardHttpBearerAuth):
    def __init__(self, jwt_service: JWTService) -> None:
        self.jwt_service = jwt_service

    async def authentication_handler(
        self,
        context: IExecutionContext,
        credentials: HTTPAuthorizationCredentials,
    ) -> t.Optional[t.Any]:
        try:
            data = await self.jwt_service.decode_async(credentials.credentials)
            return UserIdentity(auth_type=self.scheme, **data)
        except Exception as ex:
            logger.logger.error(f"[AuthGuard] Exception: {ex}")
            self.raise_exception()

We can now implement our protected route and register our AuthGuard to guard it.

auth/controllers.py
from ellar.common import Controller, ControllerBase, post, Body, get, UseGuards
from .services import AuthService
from .guards import AuthGuard


@Controller
class AuthController(ControllerBase):
    def __init__(self, auth_service: AuthService) -> None:
        self.auth_service = auth_service

    @post("/login")
    async def sign_in(self, username: Body[str], password: Body[str]):
        return await self.auth_service.sign_in(username=username, password=password)

    @get("/profile")
    @UseGuards(AuthGuard)
    async def get_profile(self):
        return self.context.user

Ensure the app is running, and test the routes using cURL.

$ # GET /auth/profile
$ curl http://localhost:8000/auth/profile
{"detail":"Forbidden"} # status_code=403

$ # POST /auth/login
$ curl -X POST http://localhost:8000/auth/login -d '{"username": "john", "password": "password"}' -H "Content-Type: application/json"
{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2OTg3OTE0OTE..."}

$ # GET /auth/profile using the access_token returned from the previous step as the bearer token
$ curl http://localhost:8000/auth/profile -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2Vybm..."
{"exp":1698793558,"iat":1698793258,"jti":"e96e94c5c3ef4fbbbd7c2468eb64534b","sub":1,"user_id":1,"username":"john", "id":null,"auth_type":"bearer"}

Note that in the AuthModule configuration, we set the JWT to expire after 5 minutes. If you wait 5 minutes after authenticating before attempting a GET /auth/profile request, you'll receive a 401 Unauthorized response. This is because the EllarJWT package automatically checks the JWT's expiration time, saving you the trouble of doing so in your application.
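
The 5-minute lifetime is visible in the claims returned by the profile request above: `exp` and `iat` are Unix timestamps in seconds. The short sketch below does that arithmetic on the sample values; the helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Sample claims copied from the /auth/profile response shown above.
claims = {"exp": 1698793558, "iat": 1698793258}


def token_lifetime(claims: dict) -> timedelta:
    """Time between issuance (iat) and expiry (exp)."""
    return timedelta(seconds=claims["exp"] - claims["iat"])


def is_expired(claims: dict, now: datetime) -> bool:
    """True once 'now' has reached the exp timestamp."""
    return now >= datetime.fromtimestamp(claims["exp"], tz=timezone.utc)
```

For the sample claims, `token_lifetime(claims)` equals `timedelta(minutes=5)`, matching the lifetime passed to JWTModule.setup.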

Refresh Token

This was not included in the requirements, but it might be useful to some developers. So, let us address token refresh using EllarJWT. Depending on your application, the details may vary.

To get this done, we need to edit the sign_in in AuthService to return access_token and refresh_token. We also need to add a refresh_token endpoint to our AuthController.

auth/services.py
import typing as t
from datetime import timedelta

from ellar.core.security.hashers import check_password
from ellar.di import injectable
from ellar.common import exceptions
from ellar_jwt import JWTService

from ..user.services import UsersService


@injectable()
class AuthService:
    def __init__(self, users_service: UsersService, jwt_service: JWTService) -> None:
        self.users_service = users_service
        self.jwt_service = jwt_service

    async def sign_in(self, username: str, password: str) -> t.Any:
        user_model = await self.users_service.get_user_by_username(username)
        if user_model is None:
            raise exceptions.AuthenticationFailed()

        if not check_password(password, user_model.password):
            raise exceptions.AuthenticationFailed()

        result = {
            "access_token": await self.jwt_service.sign_async(
                payload=dict(user_model.serialize(), sub=user_model.user_id)
            ),
            "refresh_token": await self.jwt_service.sign_async(
                payload=dict(sub=user_model.username),
                lifetime=timedelta(days=30)
            ),
        }
        return result

    async def refresh_token(self, refresh_token: str) -> t.Dict:
        try:
            payload = await self.jwt_service.decode_async(refresh_token)
        except Exception:
            raise exceptions.AuthenticationFailed()

        # The refresh token stores the username in its "sub" claim.
        user_model = await self.users_service.get_user_by_username(payload['sub'])
        if user_model is None:
            raise exceptions.AuthenticationFailed()

        return {
            "access_token": await self.jwt_service.sign_async(
                payload=dict(user_model.serialize(), sub=user_model.user_id)
            ),
        }

We have modified the sign_in method and added the refresh_token method to handle token refreshes. The sign_in method now returns an access_token and a refresh_token that expires in 30 days.

auth/controllers.py
from ellar.common import Controller, ControllerBase, post, Body, get, UseGuards
from ellar.openapi import ApiTags
from .services import AuthService
from .guards import AuthGuard


@Controller
@ApiTags(name='Authentication', description='User Authentication Endpoints')
class AuthController(ControllerBase):
    def __init__(self, auth_service: AuthService) -> None:
        self.auth_service = auth_service

    @post("/login")
    async def sign_in(self, username: Body[str], password: Body[str]):
        return await self.auth_service.sign_in(username=username, password=password)

    @get("/profile")
    @UseGuards(AuthGuard)
    async def get_profile(self):
        return self.context.user

    @post("/refresh")
    async def refresh_token(self, payload: str = Body(embed=True)):
        return await self.auth_service.refresh_token(payload)

With the above changes, the refresh token mechanism is complete.
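
The refresh flow can also be traced without a running server. The sketch below models it in memory: the refresh token carries only the username in its `sub` claim, and refreshing looks the user up by that claim before issuing a new access token. `FakeTokenService` is a stand-in that maps random strings to payloads; it is not JWTService and performs no signing, and the password check is omitted for brevity.

```python
import secrets
from typing import Dict


class FakeTokenService:
    """Stand-in for a JWT service: opaque random tokens mapped to payloads."""

    def __init__(self) -> None:
        self._issued: Dict[str, dict] = {}

    def sign(self, payload: dict) -> str:
        token = secrets.token_urlsafe(16)
        self._issued[token] = dict(payload)
        return token

    def decode(self, token: str) -> dict:
        if token not in self._issued:
            raise ValueError("unknown token")
        return self._issued[token]


USERS = {"john": {"user_id": 1, "username": "john"}}


def sign_in(tokens: FakeTokenService, username: str) -> dict:
    user = USERS[username]
    return {
        "access_token": tokens.sign(dict(user, sub=user["user_id"])),
        # Only the username travels in the refresh token.
        "refresh_token": tokens.sign({"sub": user["username"]}),
    }


def refresh(tokens: FakeTokenService, refresh_token: str) -> dict:
    payload = tokens.decode(refresh_token)
    user = USERS.get(payload["sub"])  # look the user up by the "sub" claim
    if user is None:
        raise ValueError("unknown user")
    return {"access_token": tokens.sign(dict(user, sub=user["user_id"]))}
```

A side effect of this shape is that an access token cannot be used to refresh: its `sub` is the numeric user id, which does not match any username.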

Note

There are other ways of refreshing tokens. This was to illustrate how you can achieve it using the ellar-jwt package.

You can also visit the Swagger UI at http://localhost:8000/docs.

Apply AuthGuard Globally

In situations when you need to protect all your endpoints, you can register AuthGuard as a global guard instead of using the @UseGuards decorator in all your controllers or route functions. However, you also need to implement a mechanism to skip the auth guard for certain route functions that don't require it, such as the sign_in route function.

First, let us register AuthGuard as a global guard in AuthModule.

auth/module.py
from datetime import timedelta
from ellar.app import use_global_guards
from ellar.common import Module
from ellar.core import ModuleBase, LazyModuleImport as lazyLoad
from ellar_jwt import JWTModule

from .controllers import AuthController
from .services import AuthService
from .guards import AuthGuard

# Registers AuthGuard in the application config's global guards.
use_global_guards(AuthGuard)


@Module(
    modules=[
        lazyLoad('project_name.user.module:UserModule'),
        JWTModule.setup(
            signing_secret_key="my_poor_secret_key_lol", lifetime=timedelta(minutes=5)
        ),
    ],
    controllers=[AuthController],
    providers=[AuthService],
)
class AuthModule(ModuleBase):
    """
    Auth Module
    """

With this, AuthGuard will be available to all endpoints.

Anonymous Route Function Mechanism

Let us define a mechanism for declaring routes as anonymous or public.

One way to achieve this is by using the set_metadata decorator. We can set some metadata on those functions and read it in AuthGuard. If the metadata is present, we skip the authentication check and allow execution to continue.

auth/guards.py
import typing as t

from ellar.auth import UserIdentity
from ellar.common.serializer.guard import (
    HTTPAuthorizationCredentials,
)
from ellar.common import IExecutionContext, set_metadata, logger
from ellar.auth.guards import GuardHttpBearerAuth
from ellar.core import Reflector
from ellar.di import injectable
from ellar_jwt import JWTService

IS_ANONYMOUS = 'is_anonymous'


def allow_any() -> t.Callable:
    return set_metadata(IS_ANONYMOUS, True)    


@injectable
class AuthGuard(GuardHttpBearerAuth):
    def __init__(self, jwt_service: JWTService, reflector: Reflector) -> None:
        self.jwt_service = jwt_service
        self.reflector = reflector

    async def authentication_handler(
        self,
        context: IExecutionContext,
        credentials: HTTPAuthorizationCredentials,
    ) -> t.Optional[t.Any]:
        is_anonymous = self.reflector.get_all_and_override(IS_ANONYMOUS, context.get_handler(), context.get_class())

        if is_anonymous:
            return True

        try:
            data = await self.jwt_service.decode_async(credentials.credentials)
            return UserIdentity(auth_type=self.scheme, **data)
        except Exception as ex:
            logger.error(f"[AuthGuard] Exception: {ex}")
            self.raise_exception()

We have defined the allow_any metadata decorator in the above illustration and have used Ellar's built-in class Reflector to read the metadata defined at the controller or route function.
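
The idea behind this mechanism can be shown in plain Python, independent of Ellar. In the sketch below, a decorator attaches a key/value pair to a function, and a lookup checks the handler first and then its class. `set_meta`, `get_meta`, and the `__route_metadata__` attribute are hypothetical names for illustration; they are not Ellar's set_metadata or Reflector.

```python
from typing import Any, Callable

IS_ANONYMOUS = "is_anonymous"
_METADATA_ATTR = "__route_metadata__"  # hypothetical storage attribute


def set_meta(key: str, value: Any) -> Callable:
    """Decorator factory that stores key/value on the decorated object."""
    def decorator(target: Any) -> Any:
        metadata = dict(getattr(target, _METADATA_ATTR, {}))
        metadata[key] = value
        setattr(target, _METADATA_ATTR, metadata)
        return target
    return decorator


def get_meta(key: str, *targets: Any, default: Any = None) -> Any:
    """Return the first value found, checking targets in the order given."""
    for target in targets:
        metadata = getattr(target, _METADATA_ATTR, {})
        if key in metadata:
            return metadata[key]
    return default


def allow_any() -> Callable:
    return set_meta(IS_ANONYMOUS, True)


class DemoController:
    @allow_any()
    def login(self) -> str:
        return "token"

    def profile(self) -> str:
        return "profile"


def requires_auth(handler: Callable, controller: type) -> bool:
    # Handler-level metadata wins over class-level metadata.
    return not get_meta(IS_ANONYMOUS, handler, controller, default=False)
```

Here `requires_auth(DemoController.login, DemoController)` is False because the handler was marked anonymous, while the unmarked `profile` handler still requires authentication.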

Alternatively, we can create an allow_any decorator function that sets guard metadata on the decorated function to override the global guard:

auth/guards.py
import typing as t

from ellar.auth import UserIdentity
from ellar.common.serializer.guard import (
    HTTPAuthorizationCredentials,
)
from ellar.common import IExecutionContext, set_metadata, constants, GuardCanActivate, logger
from ellar.auth.guards import GuardHttpBearerAuth
from ellar.di import injectable
from ellar_jwt import JWTService


def allow_any() -> t.Callable:
    return set_metadata(constants.GUARDS_KEY, [AllowAny])


class AllowAny(GuardCanActivate):
    async def can_activate(self, context: IExecutionContext) -> bool:
        return True


@injectable
class AuthGuard(GuardHttpBearerAuth):
    def __init__(self, jwt_service: JWTService) -> None:
        self.jwt_service = jwt_service

    async def authentication_handler(
        self,
        context: IExecutionContext,
        credentials: HTTPAuthorizationCredentials,
    ) -> t.Optional[t.Any]:
        try:
            data = await self.jwt_service.decode_async(credentials.credentials)
            return UserIdentity(auth_type="bearer", **data)
        except Exception as ex:
            logger.logger.error(f"[AuthGuard] Exception: {ex}")
            self.raise_exception()

Using allow_any decorator function

We have seen above how to define the allow_any decorator function. Now we use it on the refresh and sign-in endpoints as shown below:

auth/controllers.py
from ellar.common import Controller, ControllerBase, post, Body, get
from ellar.openapi import ApiTags
from .services import AuthService
from .guards import AuthGuard, allow_any


@Controller
@ApiTags(name='Authentication', description='User Authentication Endpoints')
class AuthController(ControllerBase):
    def __init__(self, auth_service: AuthService) -> None:
        self.auth_service = auth_service

    @post("/login")
    @allow_any()
    async def sign_in(self, username: Body[str], password: Body[str]):
        return await self.auth_service.sign_in(username=username, password=password)

    @get("/profile")
    async def get_profile(self):
        return self.context.user

    @allow_any()
    @post("/refresh")
    async def refresh_token(self, payload: str = Body(embed=True)):
        return await self.auth_service.refresh_token(payload)

The source code for this example is available here.