Channels

Strawberry provides support for Channels with Consumers to provide GraphQL support over WebSockets and HTTP.

Introduction

While Channels does require Django to be installed as a dependency, you can actually run this integration without using Django’s request handler. However, the most common use case will be to run a normal Django project with GraphQL subscriptions support, typically taking advantage of the Channel Layers functionality which is exposed through the Strawberry integration.


Getting Started

Pre-requisites

Make sure you have read the following Channels documentation:

If you have read the Channels documentation, You should know by now that:

  1. ASGI application is a callable that can handle multiple send / receive operations without the need of a new application instance.
  2. Channels is all about making ASGI applications instances (whether in another processes or in another machine) talk to each other seamlessly.
  3. A scope is a single connection represented by a dict, whether it would be a websocket or an HTTP request or another protocol.
  4. A Consumer is an ASGI application abstraction that helps to handle a single scope.

Installation

Before using Strawberry’s Channels support, make sure you install all the required dependencies by running:

pip install 'strawberry-graphql[channels]'

Tutorial

The following example will pick up where the Channels tutorials left off.

By the end of this tutorial, You will have a graphql chat subscription that will be able to talk with the channels chat consumer from the tutorial.

Types setup

First, let's create some Strawberry-types for the chat.

# mysite/gqlchat/subscription.py
 
import strawberry
 
 
@strawberry.input
class ChatRoom:
    room_name: str
 
 
@strawberry.type
class ChatRoomMessage:
    room_name: str
    current_user: str
    message: str

Channel Layers

The Context for Channels integration includes the consumer, which has an instance of the channel layer and the consumer's channel name. This tutorial is an example of how this can be used in the schema to provide subscriptions to events generated by background tasks or the web server. Even if these are executed in other threads, processes, or even other servers, if you are using a Layer backend like Redis or RabbitMQ you should receive the events.

To set this up, you'll need to make sure Channel Layers is configured as per the documentation .

Then you'll want to add a subscription that accesses the channel layer and joins one or more broadcast groups.

Since listening for events and passing them along to the client is a common use case, the base consumer provides a high level API for that using a generator pattern, as we will see below.

The chat subscription

Now we will create the chat subscription .

# mysite/gqlchat/subscription.py
 
import os
import threading
 
from typing import AsyncGenerator, List
 
from strawberry.types import Info
 
 
@strawberry.type
class Subscription:
    @strawberry.subscription
    async def join_chat_rooms(
        self,
        info: Info,
        rooms: List[ChatRoom],
        user: str,
    ) -> AsyncGenerator[ChatRoomMessage, None]:
        """Join and subscribe to message sent to the given rooms."""
        ws = info.context["ws"]
        channel_layer = ws.channel_layer
 
        room_ids = [f"chat_{room.room_name}" for room in rooms]
 
        for room in room_ids:
            # Join room group
            await channel_layer.group_add(room, ws.channel_name)
 
        for room in room_ids:
            await channel_layer.group_send(
                room,
                {
                    "type": "chat.message",
                    "room_id": room,
                    "message": f"process: {os.getpid()} thread: {threading.current_thread().name}"
                    f" -> Hello my name is {user}!",
                },
            )
 
        async with ws.listen_to_channel("chat.message", groups=room_ids) as cm:
            async for message in cm:
                if message["room_id"] in room_ids:
                    yield ChatRoomMessage(
                        room_name=message["room_id"],
                        message=message["message"],
                        current_user=user,
                    )

Explanation: Info.context["ws"] or Info.context["request"] is a pointer to the ChannelsConsumer instance. Here we have first sent a message to all the channel_layer groups (specified in the subscription argument rooms ) that we have joined the chat.

Note

The ChannelsConsumer instance is shared between all subscriptions created in a single websocket connection. The ws.listen_to_channel context manager will return a function to yield all messages sent using the given message type (chat.message in the above example) but does not ensure that the message was sent to the same group or groups that it was called with - if another subscription using the same ChannelsConsumer also uses ws.listen_to_channel with some other group names, those will be returned as well.

In the example we ensure message["room_id"] in room_ids before passing messages on to the subscription client to ensure subscriptions only receive messages for the chat rooms requested in that subscription.

Note

We do not need to call await channel_layer.group_add(room, ws.channel_name) if we don’t want to send an initial message while instantiating the subscription. It is handled by ws.listen_to_channel .

Chat message mutation

If you noticed, the subscription client can’t send a message willingly. You will have to create a mutation for sending messages via the channel_layer

# mysite/gqlchat/subscription.py
 
from channels.layers import get_channel_layer
 
 
@strawberry.type
class Mutation:
    @strawberry.mutation
    async def send_chat_message(
        self,
        info: Info,
        room: ChatRoom,
        message: str,
    ) -> None:
        channel_layer = get_channel_layer()
 
        await channel_layer.group_send(
            f"chat_{room.room_name}",
            {
                "type": "chat.message",
                "room_id": f"chat_{room.room_name}",
                "message": message,
            },
        )

Creating the consumers

All we did so far is useless without creating an asgi consumer for our schema. The easiest way to do that is to use the GraphQLProtocolTypeRouter which will wrap your Django application, and route HTTP and websockets for "/graphql" to Strawberry, while sending all other requests to Django. You’ll need to modify the myproject.asgi.py file from the Channels instructions to look something like this:

import os
 
from django.core.asgi import get_asgi_application
from strawberry.channels import GraphQLProtocolTypeRouter
 
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
django_asgi_app = get_asgi_application()
 
# Import your Strawberry schema after creating the django ASGI application
# This ensures django.setup() has been called before any ORM models are imported
# for the schema.
from mysite.graphql import schema
 
 
application = GraphQLProtocolTypeRouter(
    schema,
    django_application=django_asgi_app,
)

This approach is not very flexible, taking away some useful capabilities of Channels. For more complex deployments, i.e you want to integrate several ASGI applications on different URLs and protocols, like what is described in the Channels documentation . You will probably craft your own ProtocolTypeRouter .

An example of this (continuing from channels tutorial) would be:

import os
 
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from django.urls import re_path
from strawberry.channels import GraphQLHTTPConsumer, GraphQLWSConsumer
 
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "berry.settings")
django_asgi_app = get_asgi_application()
 
# Import your Strawberry schema after creating the django ASGI application
# This ensures django.setup() has been called before any ORM models are imported
# for the schema.
 
from chat import routing
from mysite.graphql import schema
 
 
gql_http_consumer = AuthMiddlewareStack(GraphQLHTTPConsumer.as_asgi(schema=schema))
gql_ws_consumer = GraphQLWSConsumer.as_asgi(schema=schema)
 
websocket_urlpatterns = routing.websocket_urlpatterns + [
    re_path(r"graphql", gql_ws_consumer),
]
 
application = ProtocolTypeRouter(
    {
        "http": URLRouter(
            [
                re_path("^graphql", gql_http_consumer),
                re_path(
                    "^", django_asgi_app
                ),  # This might be another endpoint in your app
            ]
        ),
        "websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
    }
)

This example demonstrates some ways that Channels can be set up to handle routing. A very common scenario will be that you want user and session information inside the GraphQL context, which the AuthMiddlewareStack wrapper above will provide. It might be apparent by now, there's no reason at all why you couldn't run a Channels server without any Django ASGI application at all. However, take care to ensure you run django.setup() instead of get_asgi_application() , if you need any Django ORM or other Django features in Strawberry.

Running our example

First run your asgi application (The ProtocolTypeRouter) using your asgi server. If you are coming from the channels tutorial, there is no difference. Then open three different tabs on your browser and go to the following URLs:

  1. localhost:8000/graphql
  2. localhost:8000/graphql
  3. localhost:8000/chat

If you want, you can run 3 different instances of your application with different ports it should work the same!

On tab #1 start the subscription:

subscription SubscribeToChatRooms {
  joinChatRooms(
    rooms: [{ roomName: "room1" }, { roomName: "room2" }]
    user: "foo"
  ) {
    roomName
    message
    currentUser
  }
}

On tab #2 we will run sendChatMessage mutation:

mutation echo {
  sendChatMessage(message: "hello room 1", room: { roomName: "room1" })
}

On tab #3 we will join the room you subscribed to ("room1") and start chatting. Before we do that there is a slight change we need to make in the ChatConsumer you created with channels in order to make it compatible with our ChatRoomMessage type.

# Send message to room group
await self.channel_layer.group_send(
    self.room_group_name,
    {
        "type": "chat.message",
        "room_id": self.room_group_name,  # <<< here is the change
        "message": f"process is {os.getpid()}, Thread is {threading.current_thread().name}"
        f" -> {message}",
    },
)

Look here for some more complete examples:

  1. The Strawberry Examples repo contains a basic example app demonstrating subscriptions with Channels.

Confirming GraphQL Subscriptions

By default no confirmation message is sent to the GraphQL client once the subscription has started. However, this is useful to be able to synchronize actions and detect communication errors. The code below shows how the above example can be adapted to send a null from the server to the client to confirm that the subscription has successfully started. This includes confirming that the Channels layer subscription has started.

# mysite/gqlchat/subscription.py
 
 
@strawberry.type
class Subscription:
    @strawberry.subscription
    async def join_chat_rooms(
        self,
        info: Info,
        rooms: List[ChatRoom],
        user: str,
    ) -> AsyncGenerator[ChatRoomMessage | None, None]:
        ...
        async with ws.listen_to_channel("chat.message", groups=room_ids) as cm:
            yield None
            async for message in cm:
                if message["room_id"] in room_ids:
                    yield ChatRoomMessage(
                        room_name=message["room_id"],
                        message=message["message"],
                        current_user=user,
                    )

Note the change in return signature for join_chat_rooms and the yield None after entering the listen_to_channel context manger.

Testing

We provide a minimal application communicator (GraphQLWebsocketCommunicator ) for subscribing. Here is an example based on the tutorial above: Make sure you have pytest-async installed

from channels.testing import WebsocketCommunicator
import pytest
from myapp.asgi import application  # your channels asgi
from strawberry.channels.testing import GraphQLWebsocketCommunicator
 
 
@pytest.fixture
async def gql_communicator() -> GraphQLWebsocketCommunicator:
    client = GraphQLWebsocketCommunicator(application=application, path="/graphql")
    await client.gql_init()
    yield client
    await client.disconnect()
 
 
chat_subscription_query = """
                subscription fooChat {
                joinChatRooms(
                    rooms: [{ roomName: "room1" }, { roomName: "room2" }]
                    user: "foo"){
                        roomName
                        message
                        currentUser
                    }
                }
"""
 
 
@pytest.mark.asyncio
async def test_joinChatRooms_sends_welcome_message(gql_communicator):
    async for result in gql_communicator.subscribe(query=chat_subscription_query):
        data = result.data
        assert data["currentUser"] == "foo"
        assert "room1" in data["roomName"]
        assert "hello" in data["message"]

In order to test a real server connection we can use python gql client and channels ChannelsLiveServerTestCase .

Note

This example is based on the extended ChannelsLiveServerTestCase class from channels tutorial part 4. You cannot run this test with a pytest session.

Add this test in your ChannelsLiveServerTestCase extended class:

from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport
 
 
def test_send_message_via_channels_chat_joinChatRooms_recieves(self):
    transport = WebsocketsTransport(url=self.live_server_ws_url + "/graphql")
 
    client = Client(
        transport=transport,
        fetch_schema_from_transport=False,
    )
 
    query = gql(chat_subscription_query)
    for index, result in enumerate(client.subscribe(query)):
        if index == 0 or 1:
            print(result)
        # because we subscribed to 2 rooms we received two welcome messages.
        elif index == 2:
            print(result)
            assert "hello from web browser" in result["joinChatRooms"]["message"]
            break
 
        try:
            self._enter_chat_room("room1")
            self._post_message("hello from web browser")
        finally:
            self._close_all_new_windows()

The HTTP and WebSockets protocol are handled by different base classes. HTTP uses GraphQLHTTPConsumer and WebSockets uses GraphQLWSConsumer . Both of them can be extended:

GraphQLHTTPConsumer (HTTP)

Options

GraphQLHTTPConsumer supports the same options as all other integrations:

Extending the consumer

We allow to extend GraphQLHTTPConsumer , by overriding the following methods:

Context

The default context returned by get_context() is a dict that includes the following keys by default:

GraphQLWSConsumer (WebSockets / Subscriptions)

Options

Extending the consumer

We allow to extend GraphQLWSConsumer , by overriding the following methods:

Context

The default context returned by get_context() is a dict and it includes the following keys by default:

Example for defining a custom context

Here is an example for extending the base classes to offer a different context object in your resolvers. For the HTTP integration, you can also have properties to access the current user and the session. Both properties depend on the AuthMiddlewareStack wrapper.

from django.contrib.auth.models import AnonymousUser
 
from strawberry.channels import ChannelsConsumer, ChannelsRequest
from strawberry.channels import GraphQLHTTPConsumer as BaseGraphQLHTTPConsumer
from strawberry.channels import GraphQLWSConsumer as BaseGraphQLWSConsumer
from strawberry.http.temporal_response import TemporalResponse
 
 
@dataclass
class ChannelsContext:
    request: ChannelsRequest
    response: TemporalResponse
 
    @property
    def user(self):
        # Depends on Channels' AuthMiddlewareStack
        if "user" in self.request.consumer.scope:
            return self.request.consumer.scope["user"]
 
        return AnonymousUser()
 
    @property
    def session(self):
        # Depends on Channels' SessionMiddleware / AuthMiddlewareStack
        if "session" in self.request.consumer.scope:
            return self.request.consumer.scope["session"]
 
        return None
 
 
@dataclass
class ChannelsWSContext:
    request: ChannelsConsumer
    connection_params: Optional[Dict[str, Any]] = None
 
    @property
    def ws(self) -> ChannelsConsumer:
        return self.request
 
 
class GraphQLHTTPConsumer(BaseGraphQLHTTPConsumer):
    @override
    async def get_context(
        self, request: ChannelsRequest, response: TemporalResponse
    ) -> ChannelsContext:
        return ChannelsContext(
            request=request,
            response=response,
        )
 
 
class GraphQLWSConsumer(BaseGraphQLWSConsumer):
    @override
    async def get_context(
        self, request: ChannelsConsumer, connection_params: Any
    ) -> ChannelsWSContext:
        return ChannelsWSContext(
            request=request,
            connection_params=connection_params,
        )

You can import and use the extended GraphQLHTTPConsumer and GraphQLWSConsumer classes in your myproject.asgi.py file as shown before.


API

GraphQLProtocolTypeRouter

A helper for creating a common strawberry-django ProtocolTypeRouter Implementation.

Example usage:

from strawberry.channels import GraphQLProtocolTypeRouter
from django.core.asgi import get_asgi_application
 
django_asgi = get_asgi_application()
 
from myapi import schema
 
application = GraphQLProtocolTypeRouter(
    schema,
    django_application=django_asgi,
)

This will route all requests to /graphql on either HTTP or websockets to us, and everything else to the Django application.

ChannelsConsumer

Strawberries extended AsyncConsumer .

Every graphql session will have an instance of this class inside info.context["ws"] (WebSockets) or info.context["request"].consumer (HTTP).

properties

@contextlib.asynccontextmanager
async def listen_to_channel(
    self,
    type: str,
    *,
    timeout: float | None = None,
    groups: Sequence[str] | None = None
) -> AsyncGenerator[Any, None]: ...
Edit this page on GitHub