Socket IO - python-socketio¶
Ellar integrates with python-socketio, a library that enables real-time, bidirectional and event-based communication between the browser and the server.
Gateways¶
A class annotated with the WebSocketGateway decorator is like a controller that is compatible with python-socketio, Ellar and WebSocket. A gateway class also supports dependency injection and guards.
from ellar.socket_io import WebSocketGateway


@WebSocketGateway(path='/events-ws', name='event-gateway')
class EventGateway:
    pass
Installation¶
To start building Socket.IO WebSocket-based applications, first install the required package:
$(venv) pip install python-socketio
Overview¶
In general, each gateway listens on the same port as the HTTP server and uses the path /socket.io unless changed manually. This default can be changed by passing a path argument to the decorator, for example @WebSocketGateway(path='/event-ws'). You can also set the namespace used by the gateway, as shown below:
# project_name/events/gateway.py
from ellar.socket_io import WebSocketGateway


@WebSocketGateway(path='/socket.io', namespace='events')
class EventGateway:
    pass
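To make the role of the path option concrete, the sketch below builds the URL that a Socket.IO client would request first when it opens a connection. The helper name handshake_url is hypothetical, and the EIO=4 protocol version and the polling transport are assumptions about the client in use; a real client adds further query parameters.

```python
from urllib.parse import urlencode, urlsplit, urlunsplit


def handshake_url(base_url: str, path: str = "/socket.io", transport: str = "polling") -> str:
    """Build the first URL a Socket.IO client requests (hypothetical helper)."""
    parts = urlsplit(base_url)
    # The gateway path is normalized to start and end with a slash.
    normalized_path = "/" + path.strip("/") + "/"
    # EIO=4 is an assumption: it is the Engine.IO protocol version used by recent clients.
    query = urlencode({"EIO": 4, "transport": transport})
    return urlunsplit((parts.scheme, parts.netloc, normalized_path, query, ""))


print(handshake_url("http://localhost:8000", "/events-ws"))
```

The namespace does not appear in this URL. It is sent by the client after the transport connection has been established.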
Warning
Gateways are not instantiated until they are referenced in the controllers array of an existing module.
You can pass any supported option to the socket constructor as additional keyword arguments to the @WebSocketGateway() decorator, as shown below:
# project_name/events/gateway.py
from ellar.socket_io import WebSocketGateway, GatewayBase


@WebSocketGateway(path='/socket.io', transports=['websocket'])
class EventGateway(GatewayBase):
    pass
The gateway is now listening, but we have not yet subscribed to any incoming messages. Let's create a handler that subscribes to the events messages and responds to the user with exactly the same data.
# project_name/events/gateway.py
from ellar.socket_io import WebSocketGateway, subscribe_message, GatewayBase
from ellar.common import WsBody


@WebSocketGateway(path='/socket.io', transports=['websocket'])
class EventGateway(GatewayBase):
    @subscribe_message('events')
    async def handle_event(self, data: str = WsBody()):
        return data
You can also define a schema for the received data, for example:
# project_name/events/gateway.py
from ellar.socket_io import WebSocketGateway, subscribe_message, GatewayBase
from ellar.common import WsBody
from pydantic import BaseModel


class MessageBody(BaseModel):
    data: str


@WebSocketGateway(path='/socket.io', transports=['websocket'])
class EventGateway(GatewayBase):
    @subscribe_message('events')
    async def handle_event(self, data: MessageBody = WsBody()):
        return data.dict()
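With this schema, a client is expected to send a payload shaped like {"data": "..."}. The following standard-library sketch illustrates the idea of validating an incoming payload before the handler runs. MessageBodySketch and parse_payload are hypothetical names, and pydantic's actual coercion rules and error reporting differ from this simplified stand-in.

```python
from dataclasses import asdict, dataclass


@dataclass
class MessageBodySketch:
    """Hypothetical stand-in for the MessageBody schema above."""

    data: str

    def __post_init__(self) -> None:
        # Reject values of the wrong type instead of silently accepting them.
        if not isinstance(self.data, str):
            raise TypeError("data must be a string")


def parse_payload(payload: dict) -> MessageBodySketch:
    """Turn a raw message payload into a validated object, or raise ValueError."""
    try:
        return MessageBodySketch(**payload)
    except TypeError as exc:
        # Raised for missing or unexpected keys and for wrong value types.
        raise ValueError(f"invalid payload: {exc}") from exc


print(asdict(parse_payload({"data": "hello"})))
```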
Once the gateway is created, we can register it in our module.
# project_name/events/module.py
from ellar.common import Module
from .gateway import EventGateway


@Module(controllers=[EventGateway])
class EventsModule:
    pass
A class decorated with WebSocketGateway comes with a different context that provides access to the server, the sid and the current message environment.
from ellar.common import WsBody
from ellar.socket_io import GatewayBase, WebSocketGateway, subscribe_message
from socketio import AsyncServer


@WebSocketGateway(path='/socket.io', transports=['websocket'])
class EventGateway(GatewayBase):
    @subscribe_message('events')
    async def handle_event(self, data: MessageBody = WsBody()):
        # MessageBody is the pydantic model defined in the previous example.
        assert isinstance(self.context.server, AsyncServer)
        assert isinstance(self.context.sid, str)
        assert isinstance(self.context.environment, dict)

        await self.context.server.emit('my_custom_event', data.dict(), room=None)
WsResponse¶
You may return a WsResponse object and supply two properties: the event, which is the name of the emitted event, and the data that has to be forwarded to the client.
from ellar.common import WsBody
from ellar.socket_io import GatewayBase, WebSocketGateway, subscribe_message
from ellar.socket_io import WsResponse


@WebSocketGateway(path='/socket.io', transports=['websocket'])
class EventGateway(GatewayBase):
    @subscribe_message('events')
    async def handle_event(self, data: MessageBody = WsBody()):
        return WsResponse('events', data.dict())
Hint
The WsResponse class is imported from the ellar.socket_io package. It has an interface similar to AsyncServer().emit.
Warning
If you return a response that is not a WsResponse object, Ellar will use the handler's own event as the event on which to emit the response. Alternatively, you can use self.context.server.emit to send the message back to the client.
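The rule in the warning above can be pictured with a small standalone model. This is not Ellar's implementation: ReplySketch and resolve_emit are hypothetical names, and the sketch assumes that a plain return value is emitted under the event the handler subscribed to.

```python
from dataclasses import dataclass
from typing import Any, Tuple


@dataclass
class ReplySketch:
    """Hypothetical stand-in for WsResponse: an event name plus its data."""

    event: str
    data: Any


def resolve_emit(handler_event: str, result: Any) -> Tuple[str, Any]:
    """Decide which event name and payload would be emitted for a handler result."""
    if isinstance(result, ReplySketch):
        # An explicit response object chooses its own event name.
        return result.event, result.data
    # Assumption: any other return value goes out under the handler's event.
    return handler_event, result


print(resolve_emit("events", {"data": "hi"}))
print(resolve_emit("events", ReplySketch("my_response", {"data": "hi"})))
```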
In order to listen for the incoming response(s), the client has to apply another event listener.
socket.on('events', (data) => console.log(data));
Gateway Connection and Disconnection Handling¶
The on_connected and on_disconnected decorators can be used to define the on_connect and on_disconnect handlers in your gateway controller.
For example:
from ellar.common import WsBody
from ellar.socket_io import GatewayBase, WebSocketGateway, subscribe_message, on_connected, on_disconnected
from ellar.socket_io import WsResponse


@WebSocketGateway(path='/socket.io', transports=['websocket'])
class EventGateway(GatewayBase):
    @on_connected()
    async def connect(self):
        # Greet only the client that has just connected.
        await self.context.server.emit(
            "my_response", {"data": "Connected", "count": 0}, room=self.context.sid
        )

    @on_disconnected()
    async def disconnect(self):
        print("Client disconnected")

    @subscribe_message('events')
    async def handle_event(self, data: MessageBody = WsBody()):
        return WsResponse('events', data.dict())
Info
Handlers decorated with @on_connected() and @on_disconnected() don't take any arguments because everything they need is already available in self.context.
Exceptions¶
All exceptions that occur on the server in a gateway controller after a successful handshake between the server and the client are sent to the client through the error event. This is standard practice when working with a socketio client. The client must subscribe to the error event in order to receive error messages from the server.
For example:
from ellar.common import WsBody
from ellar.socket_io import GatewayBase, WebSocketGateway, subscribe_message
from ellar.common.exceptions import WebSocketException
from starlette import status


@WebSocketGateway(path='/socket.io', transports=['websocket'])
class EventGateway(GatewayBase):
    @subscribe_message('events')
    async def handle_event(self, data: MessageBody = WsBody()):
        raise WebSocketException(status.WS_1009_MESSAGE_TOO_BIG, reason='Message is too big')
When a client sends a message to events, an exception will be raised, and the client will receive the error message if it has subscribed to error events. For example:
const socket = io.connect()

socket.on('error', (error) => {
  console.error(error)
})
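The flow described in this section can be modelled without any framework code. In the sketch below, GatewayError is a hypothetical stand-in for WebSocketException, dispatch is a hypothetical helper, and the shape of the error payload is an assumption made for illustration; the payload Ellar actually sends may differ.

```python
import asyncio


class GatewayError(Exception):
    """Hypothetical stand-in for WebSocketException."""

    def __init__(self, code: int, reason: str) -> None:
        super().__init__(reason)
        self.code = code
        self.reason = reason


async def dispatch(handler, payload, emit) -> None:
    """Run a handler and route failures to the 'error' event."""
    try:
        result = await handler(payload)
    except GatewayError as exc:
        # Assumed payload shape: the close code and the reason text.
        await emit("error", {"code": exc.code, "reason": exc.reason})
        return
    await emit("events", result)


async def too_big(payload):
    # 1009 is the numeric value of status.WS_1009_MESSAGE_TOO_BIG.
    raise GatewayError(1009, "Message is too big")


async def demo() -> list:
    sent = []

    async def emit(event, data):
        # Record what would be sent to the client.
        sent.append((event, data))

    await dispatch(too_big, {"data": "x"}, emit)
    return sent


print(asyncio.run(demo()))
```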
Guards¶
There is no fundamental difference between WebSocket guards and regular HTTP application guards. The only difference is that instead of throwing HttpException, you should use WebSocketException.
Hint
WebSocketException is an exception class located in ellar.common.exceptions.
from ellar.common import Guards

...
    @Guards(MyCustomGuards)
    @subscribe_message('events')
    async def handle_event(self, data: MessageBody = WsBody()):
        return WsResponse('events', data.dict())
...
@Guards can be applied at the handler level, as shown in the previous example, or at the class level, as shown below:
...
@Guards(MyGuard)
@WebSocketGateway(path='/socket.io', transports=['websocket'])
class EventGateway(GatewayBase):
    @on_connected()
    async def connect(self):
        await self.context.server.emit(
            "my_response", {"data": "Connected", "count": 0}, room=self.context.sid
        )
...
Testing¶
Gateways can be unit tested just like regular Ellar controllers. For integration testing, however, a separate testing module, TestGateway, is needed to set up a socketio client that simulates activity between the server and the client.
Hint
The TestGateway class is located in ellar.socket_io.testing.
For example:
@WebSocketGateway(path="/ws", async_mode="asgi", cors_allowed_origins="*")
class EventGateway:
    @subscribe_message("my_event")
    async def my_event(self, message: MessageData = WsBody()):
        return WsResponse("my_response", {"data": message.data}, room=self.context.sid)

    @subscribe_message
    async def my_broadcast_event(self, message: MessageData = WsBody()):
        await self.context.server.emit("my_response", {"data": message.data})

    @on_connected()
    async def connect(self):
        await self.context.server.emit(
            "my_response", {"data": "Connected", "count": 0}, room=self.context.sid
        )

    @on_disconnected()
    async def disconnect(self):
        print("Client disconnected")
import pytest
from ellar.socket_io.testing import TestGateway


@pytest.mark.asyncio
class TestEventGateway:
    test_client = TestGateway.create_test_module(controllers=[EventGateway])

    async def test_socket_connection_work(self):
        my_response_message = []
        connected_called = False
        disconnected_called = False

        async with self.test_client.run_with_server() as ctx:
            @ctx.sio.event
            async def my_response(message):
                my_response_message.append(message)

            @ctx.sio.event
            async def disconnect():
                nonlocal disconnected_called
                disconnected_called = True

            @ctx.sio.event
            async def connect(*args):
                nonlocal connected_called
                await ctx.sio.emit("my_event", {"data": "I'm connected!"})
                connected_called = True

            await ctx.connect(socketio_path="/ws/")
            await ctx.wait()

        assert len(my_response_message) == 2
        assert my_response_message == [
            {"data": "Connected", "count": 0},
            {"data": "I'm connected!"},
        ]
        assert disconnected_called and connected_called

    async def test_broadcast_work(self):
        sio_1_response_message = []
        sio_2_response_message = []

        async with self.test_client.run_with_server() as ctx:
            ctx_2 = ctx.new_socket_client_context()

            @ctx.sio.event
            async def my_response(message):
                sio_1_response_message.append(message)

            @ctx_2.sio.event
            async def my_response(message):
                sio_2_response_message.append(message)

            await ctx.connect(socketio_path="/ws/")
            await ctx_2.connect(socketio_path="/ws/")

            await ctx.sio.emit(
                "my_broadcast_event", {"data": "Testing Broadcast"}
            )  # both sio_1 and sio_2 would receive this message

            await ctx.wait()
            await ctx_2.wait()

        assert len(sio_1_response_message) == 2
        assert sio_1_response_message == [
            {"data": "Connected", "count": 0},
            {"data": "Testing Broadcast"},
        ]

        assert len(sio_2_response_message) == 2
        assert sio_2_response_message == [
            {"data": "Connected", "count": 0},
            {"data": "Testing Broadcast"},
        ]
self.test_client.run_with_server() sets up a server and returns a RunWithServerContext object. The RunWithServerContext contains a socketio client and the URL of the created server. With the returned client (sio), you can subscribe to events and send messages, as shown in the example above.
Warning
It is important to register all event subscriptions before calling ctx.connect.
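The reason for this ordering can be shown with a toy event dispatcher. TinyClient and run are hypothetical names used only for illustration, and the sketch assumes, as in the gateway above, that the server emits a message as soon as the client connects.

```python
class TinyClient:
    """Toy client: delivers an event only if a handler is already registered."""

    def __init__(self) -> None:
        self.handlers = {}

    def on(self, event, handler) -> None:
        self.handlers[event] = handler

    def deliver(self, event, data) -> None:
        handler = self.handlers.get(event)
        if handler is not None:
            handler(data)
        # An event with no registered handler is silently dropped.


def run(register_first: bool) -> list:
    client = TinyClient()
    seen = []
    if register_first:
        client.on("my_response", seen.append)
    # Simulates the message the server emits at connection time.
    client.deliver("my_response", {"data": "Connected", "count": 0})
    if not register_first:
        # Too late: the connection message has already been dropped.
        client.on("my_response", seen.append)
    return seen


print(run(register_first=True))
print(run(register_first=False))
```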
It is also possible to test with more than one client, as you can see in test_broadcast_work in the example above. We created another instance of RunWithServerContext, ctx_2, from the existing ctx with ctx.new_socket_client_context(). Both were used to test message broadcast.
SocketIO Ellar Example¶
python-socketio provides a sample project showing how to integrate python-socketio with Django. The sample project was converted to an Ellar gateway, and you can find it here.