PubSub Ext#

The PubSub Ext is designed to make receiving events from twitch’s PubSub websocket simple. This ext handles all the necessary connection management, authorizing, and dispatching events through TwitchIO’s Client event system.

A quick example#

import twitchio
import asyncio
from twitchio.ext import pubsub

my_token = "..."
users_oauth_token = "..."
users_channel_id = 12345
client = twitchio.Client(token=my_token)
client.pubsub = pubsub.PubSubPool(client)

@client.event()
async def event_pubsub_bits(event: pubsub.PubSubBitsMessage):
    pass # do stuff on bit redemptions

@client.event()
async def event_pubsub_channel_points(event: pubsub.PubSubChannelPointsMessage):
    pass # do stuff on channel point redemptions

async def main():
    topics = [
        pubsub.channel_points(users_oauth_token)[users_channel_id],
        pubsub.bits(users_oauth_token)[users_channel_id]
    ]
    await client.pubsub.subscribe_topics(topics)
    await client.start()

client.loop.run_until_complete(main())

This will connect to to the pubsub server, and subscribe to the channel points and bits events for user 12345, using the oauth token they have given us with the corresponding scopes.

Topics#

Each of the topics below needs to first be called with a user oauth token, and then needs channel id(s) passed to it, as such:

from twitchio.ext import pubsub
user_token = "..."
user_channel_id = 12345
topic = pubsub.bits(user_token)[user_channel_id]

If the topic requires multiple channel ids, they should be passed as such:

from twitchio.ext import pubsub
user_token = "..."
user_channel_id = 12345 # the channel to listen to
mods_channel_id = 67890 # the mod to listen for actions from
topic = pubsub.moderation_user_action(user_token)[user_channel_id][mods_channel_id]
twitchio.ext.pubsub.bits(oauth_token: str)#

This topic listens for bit redemptions on the given channel. This topic dispatches the pubsub_bits client event. This topic takes one channel id, the channel to listen on, e.g.:

from twitchio.ext import pubsub
user_token = "..."
user_channel_id = 12345
topic = pubsub.bits(user_token)[user_channel_id]

This can be received via the following:

import twitchio
from twitchio.ext import pubsub

client = twitchio.Client(token="...")

@client.event()
async def event_pubsub_bits(event: pubsub.PubSubBitsMessage):
    ...
twitchio.ext.pubsub.bits_badge(oauth_token: str)#

This topic listens for bit badge upgrades on the given channel. This topic dispatches the pubsub_bits_badge client event. This topic takes one channel id, the channel to listen on, e.g.:

from twitchio.ext import pubsub
user_token = "..."
user_channel_id = 12345
topic = pubsub.bits_badge(user_token)[user_channel_id]

This can be received via the following:

import twitchio
from twitchio.ext import pubsub

client = twitchio.Client(token="...")

@client.event()
async def event_pubsub_bits_badge(event: pubsub.PubSubBitsBadgeMessage):
    ...
twitchio.ext.pubsub.channel_points(oauth_token: str)#

This topic listens for channel point redemptions on the given channel. This topic dispatches the pubsub_channel_points client event. This topic takes one channel id, the channel to listen on, e.g.:

from twitchio.ext import pubsub
user_token = "..."
user_channel_id = 12345
topic = pubsub.channel_points(user_token)[user_channel_id]

This can be received via the following:

import twitchio
from twitchio.ext import pubsub

client = twitchio.Client(token="...")

@client.event()
async def event_pubsub_channel_points(event: pubsub.PubSubChannelPointsMessage):
    ...
twitchio.ext.pubsub.channel_subscriptions(oauth_token: str)#

This topic listens for subscriptions on the given channel. This topic dispatches the pubsub_subscription client event. This topic takes one channel id, the channel to listen on, e.g.:

from twitchio.ext import pubsub
user_token = "..."
user_channel_id = 12345
topic = pubsub.channel_subscriptions(user_token)[user_channel_id]
twitchio.ext.pubsub.moderation_user_action(oauth_token: str)#

This topic listens for moderation actions on the given channel. This topic dispatches the pubsub_moderation client event. This topic takes two channel ids, the channel to listen on, and the user to listen to, e.g.:

from twitchio.ext import pubsub
user_token = "..."
user_channel_id = 12345
moderator_id = 67890
topic = pubsub.bits_badge(user_token)[user_channel_id][moderator_id]

This event can receive many different events; PubSubModerationActionBanRequest, PubSubModerationActionChannelTerms, PubSubModerationActionModeratorAdd, or PubSubModerationAction

It can be received via the following:

import twitchio
from twitchio.ext import pubsub

client = twitchio.Client(token="...")

@client.event()
async def event_pubsub_moderation(event):
    ...
twitchio.ext.pubsub.whispers(oauth_token: str)#

Warning

This does not have a model created yet, and will error when a whisper event is received

This topic listens for bit badge upgrades on the given channel. This topic dispatches the pubsub_whisper client event. This topic takes one channel id, the channel to listen to whispers from, e.g.:

from twitchio.ext import pubsub
user_token = "..."
listen_to_id = 12345
topic = pubsub.whispers(user_token)[listen_to_id]

Hooks#

There are two hooks available in the PubSubPool class. To access these hooks, subclass the PubSubPool. After subclassing, use the subclass like normal.

The auth_fail_hook is called whenever you attempt to subscribe to a topic and the auth token is invalid. From the hook, you are able to fix your token (maybe you need to prompt the user for a new token), and then subscribe again.

The reconnect_hook is called whenevever a node has to reconnect to twitch, for any reason. The node will wait for you to return a list of topics before reconnecting. Any modifications to the topics will be applied to the node.

from typing import List
from twitchio.ext import pubsub

class MyPool(pubsub.PubSubPool):
    async def auth_fail_hook(self, topics: List[pubsub.Topic]) -> None:
        fixed_topics = fix_my_auth_tokens(topics) # somehow fix your auth tokens
        await self.subscribe_topics(topics)

    async def reconnect_hook(self, node: pubsub.PubSubWebsocket, topics: List[pubsub.Topic]) -> List[pubsub.Topic]:
        return topics

Api Reference#

Attributes
class twitchio.ext.pubsub.Topic(topic, args)#

Represents a PubSub Topic. This should not be created manually, use the provided methods to create these.

token#

The token to use to authorize this topic

Type

str

args#

The arguments to substitute in to the topic string

Type

List[Union[int, Any]]

property present: Optional[str]#

Returns a websocket-ready topic string, if all the arguments needed have been provided. Otherwise returns None

class twitchio.ext.pubsub.PubSubPool(client: Client, *, max_pool_size=10, max_connection_topics=50, mode='group')#

The pool that manages connections to the pubsub server, and handles distributing topics across the connections.

client#

The client that the pool will dispatch events to.

Type

twitchio.Client

coroutine auth_fail_hook(topics: List[Topic])#

This function is a coroutine. This is a hook that can be overridden in a subclass. From this hook, you can refresh expired tokens (or prompt a user for new ones), and resubscribe to the events.

Note

The topics will not be automatically resubscribed to. You must do it yourself by calling subscribe_topics() with the topics after obtaining new tokens.

An example of what this method should do:

class MyPubSubPool(pubsub.PubSubPool):
    async def auth_fail_hook(self, topics: List[pubsub.Topic]):
        token = topics[0].token
        new_token = await some_imaginary_function_that_refreshes_tokens(token)

        for topic in topics:
            topic.token = new_token

        await self.subscribe_topics(topics)
Parameters

topics (List[Topic]) – The topics that have been deauthorized. Typically these will all contain the same token.

coroutine reconnect_hook(node: PubSubWebsocket, topics: List[Topic]) List[Topic]#

This is a low-level hook that can be overridden in a subclass. it is called whenever a node has to reconnect for any reason, from the twitch edge lagging out to being told to by twitch. This hook allows you to modify the topics, potentially updating tokens or removing topics altogether.

Parameters
  • node (PubSubWebsocket) – The node that is reconnecting.

  • topics (List[Topic]) – The topics that this node has.

Returns

List[Topic] – The list of topics this node should have. Any additions, modifications, or removals will be respected.

coroutine subscribe_topics(topics: List[Topic])#

This function is a coroutine. Subscribes to a list of topics.

Parameters

topics (List[Topic]) – The topics to subscribe to

coroutine unsubscribe_topics(topics: List[Topic])#

This function is a coroutine. Unsubscribes from a list of topics.

Parameters

topics (List[Topic]) – The topics to unsubscribe from

Attributes
class twitchio.ext.pubsub.PubSubChatMessage(content: str, id: str, type: str)#

A message received from twitch.

content#

The content received

Type

str

id#

The id of the payload

Type

str

type#

The payload type

Type

str

Attributes
class twitchio.ext.pubsub.PubSubBadgeEntitlement(new: int, old: int)#

A badge entitlement

new#

The new badge

Type

int

old#

The old badge

Type

int

Attributes
class twitchio.ext.pubsub.PubSubMessage(client: Client, topic: Optional[str], data: dict)#

A message from the pubsub websocket

topic#

The topic subscribed to

Type

str

class twitchio.ext.pubsub.PubSubBitsMessage(client: Client, topic: str, data: dict)#

A Bits message

message#

The message sent along with the bits.

Type

PubSubChatMessage

badge_entitlement#

The badges received, if any.

Type

Optional[PubSubBadgeEntitlement]

bits_used#

The amount of bits used.

Type

int

channel_id#

The channel the bits were given to.

Type

int

user#

The user giving the bits. Can be None if anonymous.

Type

Optional[twitchio.PartialUser]

version#

The event version.

Type

str

class twitchio.ext.pubsub.PubSubBitsBadgeMessage(client: Client, topic: str, data: dict)#

A Badge message

user#

The user receiving the badge.

Type

twitchio.PartialUser

channel#

The channel the user received the badge on.

Type

twitchio.Channel

badge_tier#

The tier of the badge

Type

int

message#

The message sent in chat.

Type

str

timestamp#

The time the event happened

Type

datetime.datetime

class twitchio.ext.pubsub.PubSubChannelPointsMessage(client: Client, topic: str, data: dict)#

A Channel points redemption

timestamp#

The timestamp the event happened.

Type

datetime.datetime

channel_id#

The channel the reward was redeemed on.

Type

int

id#

The id of the reward redemption.

Type

str

user#

The user redeeming the reward.

Type

twitchio.PartialUser

reward#

The reward being redeemed.

Type

twitchio.CustomReward

input#

The input the user gave, if any.

Type

Optional[str]

status#

The status of the reward.

Type

str

class twitchio.ext.pubsub.PubSubModerationAction(client: Client, topic: str, data: dict)#

A basic moderation action.

action#

The action taken.

Type

str

args#

The arguments given to the command.

Type

List[str]

created_by#

The user that created the action.

Type

twitchio.PartialUser

message_id#

The id of the message that created this action.

Type

Optional[str]

target#

The target of this action.

Type

twitchio.PartialUser

from_automod#

Whether this action was done automatically or not.

Type

bool

class twitchio.ext.pubsub.PubSubModerationActionBanRequest(client: Client, topic: str, data: dict)#

A Ban/Unban event

action#

The action taken.

Type

str

args#

The arguments given to the command.

Type

List[str]

created_by#

The user that created the action.

Type

twitchio.PartialUser

target#

The target of this action.

Type

twitchio.PartialUser

class twitchio.ext.pubsub.PubSubModerationActionChannelTerms(client: Client, topic: str, data: dict)#

A channel Terms update.

type#

The type of action taken.

Type

str

channel_id#

The channel id the action occurred on.

Type

int

id#

The id of the Term.

Type

str

text#

The text of the modified Term.

Type

str

requester#

The requester of this Term.

Type

twitchio.PartialUser

class twitchio.ext.pubsub.PubSubModerationActionModeratorAdd(client: Client, topic: str, data: dict)#

A moderator add event.

channel_id#

The channel id the moderator was added to.

Type

int

moderation_action#

Redundant.

Type

str

target#

The person who was added as a mod.

Type

twitchio.PartialUser

created_by#

The person who added the mod.

Type

twitchio.PartialUser