
tanjun.dependencies#

Default dependency utilities used within Tanjun and their abstract interfaces.

tanjun.dependencies.async_cache #

Interface for an optional asynchronous gateway cache dependency.

This allows you to share data between instances using something like a Redis cache, and will be used by standard Tanjun components as well as extensions if implemented.

Note

While there aren't any standard implementations for these interfaces, a Redis implementation of this for the types found in Hikari's gateway cache can be found in hikari-sake >=v1.0.1a1 (exposed by sake.redis.ResourceClient.add_to_tanjun).

Tanjun will use the following type dependencies for these interfaces if they are registered with the client:

  • AsyncCache[str, hikari.InviteWithMetadata]
  • SfCache[hikari.PermissibleGuildChannel]
  • SfCache[hikari.GuildThreadChannel]
  • SfCache[hikari.KnownCustomEmoji]
  • SfCache[hikari.Guild]
  • SfCache[hikari.Role]
  • SfCache[hikari.User]
  • SfGuildBound[hikari.Member]
  • SfGuildBound[hikari.MemberPresence]
  • SfGuildBound[hikari.VoiceState]
  • SfGuildBound[hikari.Role]
  • SingleStoreCache[hikari.OwnUser]
  • SingleStoreCache[hikari.Application]
  • SingleStoreCache[hikari.AuthorizationApplication]

SfCache module-attribute #

SfCache = AsyncCache[Snowflakeish, _ValueT]

Alias of AsyncCache where the key is a snowflake.

SfChannelBound module-attribute #

SfChannelBound = ChannelBoundCache[Snowflakeish, _ValueT]

Alias of ChannelBoundCache where the key is a snowflake.

SfGuildBound module-attribute #

SfGuildBound = GuildBoundCache[Snowflakeish, _ValueT]

Alias of GuildBoundCache where the key is a snowflake.
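
The three aliases above fix the key type and leave the value type open. A minimal sketch of that pattern, using a stand-in `AsyncCache` class and assuming `Snowflakeish` can be treated as `int` for illustration (neither is the real Tanjun or Hikari definition):

```python
import typing

_KeyT = typing.TypeVar("_KeyT")
_ValueT = typing.TypeVar("_ValueT")


# Stand-in for the documented interface; only the generic shape matters here.
class AsyncCache(typing.Generic[_KeyT, _ValueT]):
    ...


# Assumption: a plain int stands in for hikari's Snowflakeish.
Snowflakeish = int

# Fixing the key type leaves one free type variable: the value type.
SfCache = AsyncCache[Snowflakeish, _ValueT]

# Subscripting the alias fills in the remaining type variable.
UserCache = SfCache[str]
```

Here `UserCache` is equivalent to `AsyncCache[int, str]`, which is why a type dependency such as `SfCache[hikari.User]` only needs the value type spelled out.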

AsyncCache #

Bases: ABC, Generic[_KeyT, _ValueT]

Abstract interface of a cache which stores globally identifiable resources.

Note

This will never be implemented for resources such as hikari.Member and hikari.MemberPresence which are only unique per-parent resource.

get abstractmethod async #

get(key, /, *, default=...)

Get an entry from this cache by ID.

Parameters:

  • key (_KeyT) –

    Unique key of the entry to get; this will often be a snowflake.

  • default (_DefaultT, default: ... ) –

    The default value to return if an entry wasn't found.

    If provided then no errors will be raised when no entry is found.

Returns:

  • _ValueT | _DefaultT

    The found entry or the default if any was provided.

Raises:

  • CacheMissError

    If the entry wasn't found.

    This won't be raised if default is passed.

  • EntryNotFound

    If the entry wasn't found and the entry definitely doesn't exist.

    This won't be raised if default is passed.

    This is a specialisation of CacheMissError and thus may be caught as CacheMissError; to handle it separately, it must be caught before CacheMissError in a try statement with multiple except clauses.
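
The ordering of except clauses matters because Python picks the first clause that matches. A small sketch using local stand-in exception classes that only mirror the documented hierarchy (they are not imported from Tanjun):

```python
# Stand-ins mirroring the documented hierarchy.
class TanjunError(Exception):
    ...


class CacheMissError(TanjunError):
    ...


class EntryNotFound(CacheMissError):
    ...


def describe(error: Exception) -> str:
    """Classify a cache error; the subclass must be handled first."""
    try:
        raise error
    except EntryNotFound:
        # The cache is sure the entry doesn't exist.
        return "definitely missing"
    except CacheMissError:
        # The entry just isn't cached; it may still exist.
        return "not cached"
```

If the two clauses were swapped, the `CacheMissError` clause would also swallow every `EntryNotFound`.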

iter_all abstractmethod #

iter_all()

Asynchronously iterate over the globally cached entries for this resource.

Note

For more information on how this is used, see the documentation for hikari.LazyIterator.

Returns:

  • CacheIterator[_ValueT]

    An asynchronous iterator of the entries cached globally for this resource.

CacheIterator #

Bases: LazyIterator[_ValueT]

Abstract interface of a cache resource asynchronous iterator.

For more information on how this is used, see the documentation for hikari.LazyIterator.

len abstractmethod async #

len()

Get the length of the target resource.

Note

Unlike CacheIterator.count, this method will not deplete the iterator.

Returns:

  • int

    The length of the targeted resource.

CacheMissError #

Bases: TanjunError

Raised when an entry isn't found in the cache.

Note

EntryNotFound inherits from this error and will only be raised if the cache knows that the entry doesn't exist.

ChannelBoundCache #

Bases: ABC, Generic[_KeyT, _ValueT]

Abstract interface of a cache which stores channel-bound resources.

get_from_channel abstractmethod async #

get_from_channel(channel_id, key, /, *, default=...)

Get an entry from this cache for a specific channel by ID.

Parameters:

  • channel_id (Snowflakeish) –

    ID of the channel to get an entry for.

  • key (_KeyT) –

    Unique key of the entry to get; this will usually be a snowflake.

  • default (_DefaultT, default: ... ) –

    The default value to return if an entry wasn't found.

    If provided then no errors will be raised when no entry is found.

Returns:

  • _ValueT | _DefaultT

    The found entry or the default if any was provided.

Raises:

  • CacheMissError

    If the entry wasn't found.

    This won't be raised if default is passed.

  • EntryNotFound

    If the entry wasn't found and the entry definitely doesn't exist.

    This won't be raised if default is passed.

    This is a specialisation of CacheMissError and thus may be caught as CacheMissError; to handle it separately, it must be caught before CacheMissError in a try statement with multiple except clauses.

iter_all abstractmethod #

iter_all()

Asynchronously iterate over the globally cached entries for this resource.

Note

For more information on how this is used, see the documentation for hikari.LazyIterator.

Returns:

  • CacheIterator[_ValueT]

    An asynchronous iterator of the entries cached globally for this resource.

iter_for_channel abstractmethod #

iter_for_channel(channel_id)

Asynchronously iterate over the entries cached for a channel.

Parameters:

  • channel_id (Snowflakeish) –

    ID of the channel to iterate over the entries cached for.

Returns:

  • CacheIterator[_ValueT]

    An asynchronous iterator of the entries cached for the specified channel.

EntryNotFound #

Bases: CacheMissError

Raised when an entry does not exist.

Note

This is a specialisation of CacheMissError which indicates that the cache is sure that the entry doesn't exist.

GuildBoundCache #

Bases: ABC, Generic[_KeyT, _ValueT]

Abstract interface of a cache which stores guild-bound resources.

get_from_guild abstractmethod async #

get_from_guild(guild_id, key, /, *, default=...)

Get an entry from this cache for a specific guild by ID.

Parameters:

  • guild_id (Snowflakeish) –

    ID of the guild to get an entry for.

  • key (_KeyT) –

    Unique key of the entry to get; this will usually be a snowflake.

  • default (_DefaultT, default: ... ) –

    The default value to return if an entry wasn't found.

    If provided then no errors will be raised when no entry is found.

Returns:

  • _ValueT | _DefaultT

    The found entry or the default if any was provided.

Raises:

  • CacheMissError

    If the entry wasn't found.

    This won't be raised if default is passed.

  • EntryNotFound

    If the entry wasn't found and the entry definitely doesn't exist.

    This won't be raised if default is passed.

    This is a specialisation of CacheMissError and thus may be caught as CacheMissError; to handle it separately, it must be caught before CacheMissError in a try statement with multiple except clauses.

iter_all abstractmethod #

iter_all()

Asynchronously iterate over the globally cached entries for this resource.

Note

For more information on how this is used, see the documentation for hikari.LazyIterator.

Returns:

  • CacheIterator[_ValueT]

    An asynchronous iterator of the entries cached globally for this resource.

iter_for_guild abstractmethod #

iter_for_guild(guild_id)

Asynchronously iterate over the entries cached for a guild.

Note

For more information on how this is used, see the documentation for hikari.LazyIterator.

Parameters:

  • guild_id (Snowflakeish) –

    ID of the guild to iterate over the entries cached for.

Returns:

  • CacheIterator[_ValueT]

    An asynchronous iterator of the entries cached for the specified guild.

SingleStoreCache #

Bases: ABC, Generic[_ValueT]

Abstract interface of a cache which stores one resource.

Note

This is mostly just for the hikari.OwnUser cache store.

get abstractmethod async #

get(*, default=...)

Get the entry.

Parameters:

  • default (_DefaultT, default: ... ) –

    The default value to return if an entry wasn't found.

    If provided then no errors will be raised when no entry is found.

Returns:

  • _ValueT | _DefaultT

    The found entry or the default if any was provided.

Raises:

  • CacheMissError

    If the entry wasn't found.

    This won't be raised if default is passed.

  • EntryNotFound

    If the entry wasn't found and the entry definitely doesn't exist.

    This won't be raised if default is passed.

    This is a specialisation of CacheMissError and thus may be caught as CacheMissError; to handle it separately, it must be caught before CacheMissError in a try statement with multiple except clauses.
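
The `default` behaviour described above can be sketched with a sentinel. The class below is a hypothetical in-memory stand-in that only mirrors the documented `get(*, default=...)` shape; it is not an implementation shipped by Tanjun, and `CacheMissError` is a local stand-in too:

```python
import asyncio
import typing

_ValueT = typing.TypeVar("_ValueT")


class CacheMissError(Exception):
    """Stand-in for the documented CacheMissError."""


class InMemorySingleStore(typing.Generic[_ValueT]):
    """Hypothetical single-entry store used only for illustration."""

    def __init__(self, value: typing.Optional[_ValueT] = None) -> None:
        self._value = value

    async def get(self, *, default: typing.Any = ...) -> typing.Any:
        if self._value is not None:
            return self._value
        # Ellipsis marks "no default passed", so None stays a valid default.
        if default is not ...:
            return default
        raise CacheMissError


async def demo() -> typing.Tuple[typing.Any, bool]:
    empty: InMemorySingleStore[str] = InMemorySingleStore()
    with_default = await empty.get(default=None)
    try:
        await empty.get()
    except CacheMissError:
        raised = True
    else:
        raised = False
    return with_default, raised
```

Using `...` rather than `None` as the sentinel lets callers pass `default=None` and still suppress the error.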

tanjun.dependencies.callbacks #

Callback dependencies used for getting context and client based data.

fetch_my_user async #

fetch_my_user(client, *, me_cache=None)

Fetch the current user from the client's cache or rest client.

Note

This is used in the standard LazyConstant[hikari.users.OwnUser] dependency.

Parameters:

Returns:

Raises:

  • RuntimeError

    If the cache couldn't be used to get the current user and the REST client is not bound to a Bot token.

tanjun.dependencies.data #

Dependency utilities used for managing data.

LazyConstant #

Bases: Generic[_T]

Injected type used to hold and generate lazy constants.

Note

To easily resolve this type use inject_lc.

callback property #

callback

Descriptor of the callback used to get this constant's initial value.

__init__ #

__init__(callback)

Initiate a new lazy constant.

Parameters:

  • callback (CallbackSig[_T]) –

    Callback used to resolve this to a constant value.

    This supports dependency injection and may either be sync or asynchronous.

acquire #

acquire()

Acquire this lazy constant as an asynchronous lock.

This is used to ensure that the value is only generated once and should be kept acquired until LazyConstant.set_value has been called.

Returns:

get_value #

get_value()

Get the value of this constant if set, else None.
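
One way `acquire`, `get_value` and `set_value` can fit together is a check, lock, re-check sequence. The sketch below uses a hypothetical `LazyConstantSketch` stand-in with the same method names; it is not Tanjun's class, it skips dependency injection, and it assumes an asynchronous callback:

```python
import asyncio
import typing

_T = typing.TypeVar("_T")


class LazyConstantSketch(typing.Generic[_T]):
    """Hypothetical stand-in mirroring the documented method names."""

    def __init__(self, callback: typing.Callable[[], typing.Awaitable[_T]]) -> None:
        self.callback = callback
        self._lock: typing.Optional[asyncio.Lock] = None
        self._value: typing.Optional[_T] = None

    def acquire(self) -> asyncio.Lock:
        # Created lazily so the lock belongs to the running event loop.
        if self._lock is None:
            self._lock = asyncio.Lock()
        return self._lock

    def get_value(self) -> typing.Optional[_T]:
        return self._value

    def set_value(self, value: _T) -> None:
        self._value = value


async def resolve(constant: LazyConstantSketch[_T]) -> _T:
    if (value := constant.get_value()) is not None:
        return value
    async with constant.acquire():
        # Re-check: another task may have set the value while we waited.
        if (value := constant.get_value()) is not None:
            return value
        value = await constant.callback()
        constant.set_value(value)
        return value


async def demo() -> typing.Tuple[typing.List[str], int]:
    calls = 0

    async def load() -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0)  # Yield so the other tasks can race.
        return "app"

    constant = LazyConstantSketch(load)
    results = await asyncio.gather(*(resolve(constant) for _ in range(5)))
    return list(results), calls
```

Holding the lock until the value is set is what keeps the callback from running more than once when several tasks resolve the constant at the same time.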

reset #

reset()

Clear the internally stored value.

set_value #

set_value(value)

Set the constant value.

Parameters:

  • value (_T) –

    The value to set.

Raises:

cache_callback #

cache_callback(callback, /, *, expire_after=None)

Cache the result of a callback within a dependency injection context.

Note

This is internally used by cached_inject.

Parameters:

  • callback (CallbackSig[_T]) –

    The callback to cache the result of.

  • expire_after (Union[int, float, timedelta, None], default: None ) –

    The amount of time to cache the result for in seconds.

    Leave this as None to cache for the runtime of the application.

Returns:

  • Callable[..., Coroutine[Any, Any, _T]]

    A callback which will cache the result of the given callback after the first call.

Raises:

  • ValueError

    If expire_after is not a valid value or is less than or equal to 0 seconds.
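
To illustrate the caching and expiry behaviour described above, here is a rough stand-alone sketch. `cache_callback_sketch` is a hypothetical helper, not Tanjun's implementation: it only supports asynchronous callbacks, ignores dependency injection, and does no locking between concurrent first calls.

```python
import asyncio
import datetime
import time
import typing

_T = typing.TypeVar("_T")


def cache_callback_sketch(
    callback: typing.Callable[..., typing.Awaitable[_T]],
    *,
    expire_after: typing.Union[int, float, datetime.timedelta, None] = None,
) -> typing.Callable[..., typing.Awaitable[_T]]:
    if isinstance(expire_after, datetime.timedelta):
        expire_after = expire_after.total_seconds()
    if expire_after is not None and expire_after <= 0:
        raise ValueError("expire_after must be more than 0 seconds")

    cached: typing.List[typing.Tuple[_T, float]] = []  # [(value, stored_at)]

    async def cached_callback(*args: typing.Any, **kwargs: typing.Any) -> _T:
        if cached:
            value, stored_at = cached[0]
            # None means the value is kept for the application's runtime.
            if expire_after is None or time.monotonic() - stored_at < expire_after:
                return value
        value = await callback(*args, **kwargs)
        cached[:] = [(value, time.monotonic())]
        return value

    return cached_callback


async def demo() -> typing.Tuple[typing.List[int], int]:
    calls = 0

    async def load() -> int:
        nonlocal calls
        calls += 1
        return calls

    cached = cache_callback_sketch(load)
    return [await cached(), await cached()], calls
```

The second call returns the stored result without invoking `load` again.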

cached_inject #

cached_inject(callback, /, *, expire_after=None)

Inject a callback with caching.

This acts like alluka.inject and the result of it should also be assigned to a parameter's default to be used.

Example
async def resolve_database(
    client: tanjun.abc.Client = tanjun.inject(type=tanjun.abc.Client)
) -> Database:
    raise NotImplementedError

@tanjun.as_message_command("command name")
async def command(
    ctx: tanjun.abc.Context, db: Database = tanjun.cached_inject(resolve_database)
) -> None:
    raise NotImplementedError

Parameters:

  • callback (CallbackSig[_T]) –

    The callback to inject.

  • expire_after (Union[float, int, timedelta, None], default: None ) –

    The amount of time to cache the result for in seconds.

    Leave this as None to cache for the runtime of the application.

Returns:

Raises:

  • ValueError

    If expire_after is not a valid value or is less than or equal to 0 seconds.

inject_lc #

inject_lc(type_)

Make a LazyConstant injector.

This acts like alluka.inject and the result of it should also be assigned to a parameter's default to be used.

Note

For this to work, a LazyConstant must've been set as a type dependency for the passed type_.

Parameters:

  • type_ (type[_T]) –

    The type of the constant to resolve.

Returns:

Example
@component.with_command
@tanjun.as_message_command("command name")
async def command(
    ctx: tanjun.abc.MessageContext,
    application: hikari.Application = tanjun.inject_lc(hikari.Application)
) -> None:
    raise NotImplementedError

...

async def resolve_app(
    client: tanjun.abc.Client = tanjun.inject(type=tanjun.abc.Client)
) -> hikari.Application:
    raise NotImplementedError

tanjun.Client.from_gateway_bot(...).set_type_dependency(
    tanjun.LazyConstant[hikari.Application], tanjun.LazyConstant(resolve_app)
)

make_lc_resolver #

make_lc_resolver(type_)

Make an injected callback which resolves a LazyConstant.

Note

This is internally used by inject_lc.

Note

For this to work, a LazyConstant must've been set as a type dependency for the passed type_.

Parameters:

  • type_ (type[_T]) –

    The type of the constant to resolve.

Returns:

  • Callable[..., Coroutine[Any, Any, _T]]

    An injected callback used to resolve the LazyConstant.

tanjun.dependencies.limiters #

Command cooldown and concurrency limiters.

AbstractConcurrencyBucket #

Bases: ABC

Interface used for implementing custom concurrency limiter buckets for the standard manager.

For more information see InMemoryConcurrencyLimiter.set_custom_bucket.

release abstractmethod async #

release(bucket_id, ctx)

Release a concurrency lock on a bucket.

Parameters:

  • bucket_id (str) –

    The concurrency bucket to release.

  • ctx (Context) –

    The context to release.

Raises:

try_acquire abstractmethod async #

try_acquire(bucket_id, ctx)

Try to acquire a concurrency lock on a bucket.

Parameters:

  • bucket_id (str) –

    The concurrency bucket to acquire.

  • ctx (Context) –

    The context to acquire this bucket lock with.

Raises:

AbstractConcurrencyLimiter #

Bases: ABC

Interface used for limiting command concurrent usage.

acquire #

acquire(bucket_id, ctx, /, *, error=lambda: errors.CommandError('This resource is currently busy; please try again later.'))

Acquire a concurrency lock on a bucket through an async context manager.

Parameters:

  • bucket_id (str) –

    The concurrency bucket to acquire.

  • ctx (Context) –

    The context to acquire this resource lock with.

  • error (Callable[[], Exception], default: lambda: CommandError('This resource is currently busy; please try again later.') ) –

    Callback which returns the error that's raised when the lock couldn't be acquired due to being at its limit.

    This will be raised on entering the returned context manager and defaults to an English command error.

Returns:

Raises:

  • CommandError

    The default error that's raised while entering the returned async context manager if it couldn't acquire the lock.
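
The relationship between `acquire`, `try_acquire` and `release` can be sketched as an async context manager that wraps the other two. Everything below is a hypothetical stand-in (a single shared counter rather than per-resource buckets, and local exception classes), not the library's code:

```python
import asyncio
import contextlib
import typing


class CommandError(Exception):
    """Stand-in for tanjun's CommandError."""


class ResourceDepleted(Exception):
    """Stand-in for tanjun.dependencies.ResourceDepleted."""


class LimiterSketch:
    """Hypothetical limiter with one shared counter for every bucket."""

    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._in_use = 0

    async def try_acquire(self, bucket_id: str, ctx: object) -> None:
        if self._in_use >= self._limit:
            raise ResourceDepleted
        self._in_use += 1

    async def release(self, bucket_id: str, ctx: object) -> None:
        self._in_use -= 1

    @contextlib.asynccontextmanager
    async def acquire(
        self,
        bucket_id: str,
        ctx: object,
        /,
        *,
        error: typing.Callable[[], Exception] = lambda: CommandError("busy"),
    ) -> typing.AsyncIterator[None]:
        try:
            await self.try_acquire(bucket_id, ctx)
        except ResourceDepleted:
            # Raised while entering the context manager.
            raise error() from None
        try:
            yield
        finally:
            # Always hand the slot back, even if the body failed.
            await self.release(bucket_id, ctx)


async def demo() -> bool:
    limiter = LimiterSketch(limit=1)
    ctx = object()
    async with limiter.acquire("default", ctx):
        try:
            async with limiter.acquire("default", ctx):
                pass
        except CommandError:
            blocked = True
        else:
            blocked = False
    # The slot was released on exit, so this acquires cleanly.
    async with limiter.acquire("default", ctx):
        pass
    return blocked
```

The `error` callback is only called when acquisition fails, so a custom exception is never constructed on the success path.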

release abstractmethod async #

release(bucket_id, ctx)

Release a concurrency lock on a bucket.

Parameters:

  • bucket_id (str) –

    The concurrency bucket to release.

  • ctx (Context) –

    The context to release.

Raises:

try_acquire abstractmethod async #

try_acquire(bucket_id, ctx)

Try to acquire a concurrency lock on a bucket.

Parameters:

  • bucket_id (str) –

    The concurrency bucket to acquire.

  • ctx (Context) –

    The context to acquire this bucket lock with.

Raises:

AbstractCooldownBucket #

Bases: ABC

Interface used for implementing custom cooldown buckets for the standard manager.

For more information see InMemoryCooldownManager.set_custom_bucket.

release abstractmethod async #

release(bucket_id, ctx)

Release a bucket's cooldown.

Parameters:

  • bucket_id (str) –

    The bucket to decrement a cooldown for.

  • ctx (Context) –

    Context of the command call.

Raises:

  • ResourceNotTracked

    If the passed bucket ID and context are not currently contributing towards the cooldown.

try_acquire abstractmethod async #

try_acquire(bucket_id, ctx)

Increment a bucket's cooldown.

Parameters:

  • bucket_id (str) –

    The bucket to increment a cooldown for.

  • ctx (Context) –

    Context of the command call.

Raises:

AbstractCooldownManager #

Bases: ABC

Interface used for managing command cooldowns.

acquire #

acquire(bucket_id, ctx, /, error=lambda cooldown: errors.CommandError('This command is currently in cooldown.' + f' Try again {conversion.from_datetime(cooldown, style='R')}.' if cooldown else ''))

Acquire a cooldown lock on a bucket through an async context manager.

Parameters:

  • bucket_id (str) –

    The cooldown bucket to acquire.

  • ctx (Context) –

    The context to acquire this resource lock with.

  • error (Callable[[Optional[datetime]], Exception], default: lambda cooldown: CommandError('This command is currently in cooldown.' + f' Try again {from_datetime(cooldown, style='R')}.' if cooldown else '') ) –

    Callback which returns the error that's raised when the lock couldn't be acquired due to it being on cooldown.

    This will be raised on entering the returned context manager and defaults to an English command error.

Returns:

Raises:

  • CommandError

    The default error that's raised while entering the returned async context manager if it couldn't acquire the lock.

check_cooldown abstractmethod async #

check_cooldown(bucket_id, ctx, /, *, increment=False)

Deprecated method.

increment_cooldown async #

increment_cooldown(bucket_id, ctx)

Deprecated function for incrementing a cooldown.

Use AbstractCooldownManager.acquire and AbstractCooldownManager.release.

release abstractmethod async #

release(bucket_id, ctx)

Release a bucket's cooldown.

Parameters:

  • bucket_id (str) –

    The bucket to decrement a cooldown for.

  • ctx (Context) –

    Context of the command call.

Raises:

  • ResourceNotTracked

    If the passed bucket ID and context are not currently contributing towards the cooldown.

try_acquire abstractmethod async #

try_acquire(bucket_id, ctx)

Increment a bucket's cooldown.

Parameters:

  • bucket_id (str) –

    The bucket to increment a cooldown for.

  • ctx (Context) –

    Context of the command call.

Raises:

BucketResource #

Bases: int, Enum

Resource target types used within command cooldowns and concurrency limiters.

CHANNEL class-attribute instance-attribute #

CHANNEL = 2

A per-channel resource bucket.

GLOBAL class-attribute instance-attribute #

GLOBAL = 7

A global resource bucket.

GUILD class-attribute instance-attribute #

GUILD = 6

A per-guild resource bucket.

When executed in a DM this will be per-DM.

MEMBER class-attribute instance-attribute #

MEMBER = 1

A per-guild member resource bucket.

When executed in a DM this will be per-DM.

PARENT_CHANNEL class-attribute instance-attribute #

PARENT_CHANNEL = 3

A per-parent channel resource bucket.

For DM channels this will be per-DM, for guild channels with no parents this'll be per-guild.

TOP_ROLE class-attribute instance-attribute #

TOP_ROLE = 5

A per-highest role resource bucket.

When executed in a DM this will be per-DM, with this defaulting to targeting the @everyone role if they have no real roles.

USER class-attribute instance-attribute #

USER = 0

A per-user resource bucket.
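
The DM fallbacks described for these members can be read as a choice of bucket key. The sketch below reuses the documented enum values but is otherwise hypothetical: `FakeContext` and `bucket_key` are illustrative names, and `PARENT_CHANNEL` and `TOP_ROLE` are left out because they would need channel and role lookups.

```python
import dataclasses
import enum
import typing


class BucketResource(int, enum.Enum):
    # Values copied from the reference above.
    USER = 0
    MEMBER = 1
    CHANNEL = 2
    PARENT_CHANNEL = 3
    TOP_ROLE = 5
    GUILD = 6
    GLOBAL = 7


@dataclasses.dataclass(frozen=True)
class FakeContext:
    """Hypothetical stand-in for a command context."""

    author_id: int
    channel_id: int
    guild_id: typing.Optional[int] = None  # None when executed in a DM.


def bucket_key(resource: BucketResource, ctx: FakeContext) -> typing.Hashable:
    """Pick the key a limit would be tracked under (illustrative only)."""
    if resource is BucketResource.USER:
        return ctx.author_id
    if resource is BucketResource.CHANNEL:
        return ctx.channel_id
    if resource is BucketResource.GUILD:
        # Per-DM when there is no guild.
        return ctx.guild_id if ctx.guild_id is not None else ctx.channel_id
    if resource is BucketResource.MEMBER:
        if ctx.guild_id is None:
            return ctx.channel_id
        return (ctx.guild_id, ctx.author_id)
    if resource is BucketResource.GLOBAL:
        return "global"
    raise NotImplementedError(f"{resource.name} needs extra lookups")
```

With this reading, a `GUILD` bucket used in a DM is keyed by the DM channel, so each DM gets its own limit.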

ConcurrencyPostExecution #

Post-execution hook used to release a bucket concurrency limiter.

__init__ #

__init__(bucket_id)

Initialise a concurrency post-execution hook.

Parameters:

  • bucket_id (str) –

    The concurrency limit bucket's ID.

ConcurrencyPreExecution #

Pre-execution hook used to acquire a bucket concurrency limiter.

__init__ #

__init__(bucket_id, /, *, error=None, error_message='This resource is currently busy; please try again later.')

Initialise a concurrency pre-execution hook.

Parameters:

  • bucket_id (str) –

    The concurrency limit bucket's ID.

  • error (Optional[Callable[[str], Exception]], default: None ) –

    Callback used to create a custom error to raise if the check fails.

    This should take one str argument which is the limiting bucket's ID.

    This takes priority over error_message.

  • error_message (Union[str, Mapping[str, str]], default: 'This resource is currently busy; please try again later.' ) –

    The error message to send in response as a command error if this fails to acquire the concurrency limit.

    This supports localisation and uses the check name "tanjun.concurrency" for global overrides.

CooldownDepleted #

Bases: ResourceDepleted

Raised when a cooldown bucket is already depleted.

wait_until instance-attribute #

wait_until = wait_until

When this resource will next be available, if known.

CooldownPostExecution #

Post-execution hook used to manage a command's cooldown.

CooldownPreExecution #

Pre-execution hook used to manage a command's cooldowns.

To avoid race conditions, this both raises an error when the bucket's limit has been hit and increments the bucket's use counter.

__init__ #

__init__(bucket_id, /, *, error=None, error_message='This command is currently in cooldown. Try again {cooldown}.', unknown_message=None, owners_exempt=True)

Initialise a pre-execution cooldown command hook.

Parameters:

  • bucket_id (str) –

    The cooldown bucket's ID.

  • error (Optional[Callable[[str, Optional[datetime]], Exception]], default: None ) –

    Callback used to create a custom error to raise if the check fails.

    This should take two arguments of types str and datetime.datetime | None, where the first is the limiting bucket's ID and the second is when said bucket can be used again, if known.

    This takes priority over error_message.

  • error_message (Union[str, Mapping[str, str]], default: 'This command is currently in cooldown. Try again {cooldown}.' ) –

    The error message to send in response as a command error if the check fails.

    This supports localisation and uses the check name "tanjun.cooldown" for global overrides.

  • unknown_message (Union[str, Mapping[str, str], None], default: None ) –

    Response error message for when cooldown is unknown.

    This supports localisation and uses the check name "tanjun.cooldown_unknown" for global overrides.

    This defaults to error_message but takes no format args.

  • owners_exempt (bool, default: True ) –

    Whether owners should be exempt from the cooldown.
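
A custom `error` callback matching the two-argument shape described above might look like the following sketch. `CommandError` is a local stand-in, `cooldown_error` is a hypothetical name, and the sketch assumes the datetime it receives is timezone-aware:

```python
import datetime
import typing


class CommandError(Exception):
    """Stand-in for tanjun's CommandError."""


def cooldown_error(
    bucket_id: str, wait_until: typing.Optional[datetime.datetime]
) -> Exception:
    """Build the error to raise when a cooldown bucket is exhausted."""
    if wait_until is None:
        # The manager couldn't tell when the bucket frees up.
        return CommandError(f"The {bucket_id!r} bucket is on cooldown.")

    # Assumption: wait_until is timezone-aware.
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    seconds = max(0, round((wait_until - now).total_seconds()))
    return CommandError(
        f"The {bucket_id!r} bucket is on cooldown; try again in {seconds} seconds."
    )
```

Going by the signature above, such a callback would be passed as `error=cooldown_error`, in which case it takes priority over `error_message`.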

InMemoryConcurrencyLimiter #

Bases: AbstractConcurrencyLimiter

In-memory standard implementation of AbstractConcurrencyLimiter.

Examples:

InMemoryConcurrencyLimiter.set_bucket may be used to set the concurrency limits for a specific bucket:

(
    InMemoryConcurrencyLimiter()
    # Set the default bucket template to 10 concurrent uses of the command per-user.
    .set_bucket("default", tanjun.BucketResource.USER, 10)
    # Set the "moderation" bucket with a limit of 5 concurrent uses per-guild.
    .set_bucket("moderation", tanjun.BucketResource.GUILD, 5)
    # add_to_client will set up the concurrency manager (setting it as an
    # injected dependency and registering callbacks to manage it).
    .add_to_client(client)
)

acquire #

acquire(bucket_id, ctx, /, *, error=lambda: errors.CommandError('This resource is currently busy; please try again later.'))

Acquire a concurrency lock on a bucket through an async context manager.

Parameters:

  • bucket_id (str) –

    The concurrency bucket to acquire.

  • ctx (Context) –

    The context to acquire this resource lock with.

  • error (Callable[[], Exception], default: lambda: CommandError('This resource is currently busy; please try again later.') ) –

    Callback which returns the error that's raised when the lock couldn't be acquired due to being at its limit.

    This will be raised on entering the returned context manager and defaults to an English command error.

Returns:

Raises:

  • CommandError

    The default error that's raised while entering the returned async context manager if it couldn't acquire the lock.

add_to_client #

add_to_client(client)

Add this concurrency manager to a tanjun client.

Note

This registers the manager as a type dependency and manages opening and closing the manager based on the client's life cycle.

Parameters:

  • client (Client) –

    The client to add this concurrency manager to.

close #

close()

Stop the concurrency manager.

Raises:

  • RuntimeError

    If the concurrency manager is not running.

disable_bucket #

disable_bucket(bucket_id)

Disable a concurrency limit bucket.

This will stop the bucket from ever hitting a concurrency limit and also prevents the bucket from defaulting.

Note

"default" is a special bucket_id which is used as a template for unknown bucket IDs.

Parameters:

  • bucket_id (str) –

    The bucket to disable.

Returns:

  • Self

    This concurrency manager to allow for chaining.

open #

open(*, _loop=None)

Start the concurrency manager.

Raises:

  • RuntimeError

    If the concurrency manager is already running, or if this is called in a thread with no running event loop.

set_bucket #

set_bucket(bucket_id, resource, limit)

Set the concurrency limit for a specific bucket.

Note

"default" is a special bucket_id which is used as a template for unknown bucket IDs.

Parameters:

  • bucket_id (str) –

    The ID of the bucket to set the concurrency limit for.

  • resource (BucketResource) –

    The type of resource to target for the concurrency limit.

  • limit (int) –

    The maximum number of concurrent uses to allow.

Returns:

  • Self

    The concurrency manager to allow call chaining.

Raises:

  • ValueError

    If any of the following cases are met:

    • If an invalid resource is passed.
    • If limit is 0 or negative.

set_custom_bucket #

set_custom_bucket(resource, /, *bucket_ids)

Set a custom concurrency limit resource.

Parameters:

  • resource (AbstractConcurrencyBucket) –

    Object which handles the concurrency limits for these buckets.

  • bucket_ids (str, default: () ) –

    IDs of buckets to set this custom resource for.

Returns:

  • Self

    The concurrency manager to allow call chaining.

Examples:

class CustomBucket(tanjun.dependencies.AbstractConcurrencyBucket):
    __slots__ = ()

    async def try_acquire(
        self, bucket_id: str, ctx: tanjun.abc.Context, /
    ) -> None:
        # ResourceDepleted should be raised if this couldn't be acquired.
        raise tanjun.dependencies.ResourceDepleted

    async def release(
        self, bucket_id: str, ctx: tanjun.abc.Context, /
    ) -> None:
        ...

(
    tanjun.dependencies.InMemoryConcurrencyLimiter()
    .set_custom_bucket(CustomBucket(), "BUCKET_ID", "OTHER_BUCKET_ID")
)
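
For a custom bucket that does more than raise, a counting implementation is one option. The sketch below is self-contained and hypothetical: the exceptions are local stand-ins for `ResourceDepleted` and `ResourceNotTracked`, the class does not subclass the real abstract base, and tracking is done per context object purely for illustration.

```python
import asyncio
import typing


class ResourceDepleted(Exception):
    """Stand-in for tanjun.dependencies.ResourceDepleted."""


class ResourceNotTracked(Exception):
    """Stand-in for tanjun.dependencies.ResourceNotTracked."""


class CountingBucket:
    """Hypothetical bucket allowing `limit` concurrent contexts per bucket ID."""

    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._active: typing.Dict[str, typing.Set[object]] = {}

    async def try_acquire(self, bucket_id: str, ctx: object, /) -> None:
        active = self._active.setdefault(bucket_id, set())
        if ctx in active:
            return  # This context already holds a slot.
        if len(active) >= self._limit:
            raise ResourceDepleted
        active.add(ctx)

    async def release(self, bucket_id: str, ctx: object, /) -> None:
        active = self._active.get(bucket_id, set())
        if ctx not in active:
            raise ResourceNotTracked
        active.remove(ctx)


async def demo() -> bool:
    bucket = CountingBucket(limit=1)
    first, second = object(), object()
    await bucket.try_acquire("BUCKET_ID", first)
    try:
        await bucket.try_acquire("BUCKET_ID", second)
    except ResourceDepleted:
        blocked = True
    else:
        blocked = False
    # Releasing the first context frees the slot for the second.
    await bucket.release("BUCKET_ID", first)
    await bucket.try_acquire("BUCKET_ID", second)
    return blocked
```

Signalling failure through an exception rather than a return value keeps `try_acquire` consistent with the comment in the example above.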

InMemoryCooldownManager #

Bases: AbstractCooldownManager

In-memory standard implementation of AbstractCooldownManager.

Examples:

InMemoryCooldownManager.set_bucket may be used to set the cooldown for a specific bucket:

(
    InMemoryCooldownManager()
    # Set the default bucket template to a per-user 10 uses per-60 seconds cooldown.
    .set_bucket("default", tanjun.BucketResource.USER, 10, 60)
    # Set the "moderation" bucket to a per-guild 100 uses per-5 minutes cooldown.
    .set_bucket("moderation", tanjun.BucketResource.GUILD, 100, datetime.timedelta(minutes=5))
    # add_to_client will set up the cooldown manager (setting it as an
    # injected dependency and registering callbacks to manage it).
    .add_to_client(client)
)

acquire #

acquire(bucket_id, ctx, /, error=lambda cooldown: errors.CommandError('This command is currently in cooldown.' + f' Try again {conversion.from_datetime(cooldown, style='R')}.' if cooldown else ''))

Acquire a cooldown lock on a bucket through an async context manager.

Parameters:

  • bucket_id (str) –

    The cooldown bucket to acquire.

  • ctx (Context) –

    The context to acquire this resource lock with.

  • error (Callable[[Optional[datetime]], Exception], default: lambda cooldown: CommandError('This command is currently in cooldown.' + f' Try again {from_datetime(cooldown, style='R')}.' if cooldown else '') ) –

    Callback which returns the error that's raised when the lock couldn't be acquired due to it being on cooldown.

    This will be raised on entering the returned context manager and defaults to an English command error.

Returns:

Raises:

  • CommandError

    The default error that's raised while entering the returned async context manager if it couldn't acquire the lock.

add_to_client #

add_to_client(client)

Add this cooldown manager to a tanjun client.

Note

This registers the manager as a type dependency and manages opening and closing the manager based on the client's life cycle.

Parameters:

  • client (Client) –

    The client to add this cooldown manager to.

close #

close()

Stop the cooldown manager.

Raises:

disable_bucket #

disable_bucket(bucket_id)

Disable a cooldown bucket.

This will stop the bucket from ever hitting a cooldown and also prevents the bucket from defaulting.

Note

"default" is a special bucket_id which is used as a template for unknown bucket IDs.

Parameters:

  • bucket_id (str) –

    The bucket to disable.

Returns:

  • Self

    This cooldown manager to allow for chaining.

increment_cooldown async #

increment_cooldown(bucket_id, ctx)

Deprecated function for incrementing a cooldown.

Use AbstractCooldownManager.acquire and AbstractCooldownManager.release.

open #

open(*, _loop=None)

Start the cooldown manager.

Raises:

  • RuntimeError

    If the cooldown manager is already running, or if this is called in a thread with no running event loop.

set_bucket #

set_bucket(bucket_id, resource, limit, reset_after)

Set the cooldown for a specific bucket.

Note

"default" is a special bucket_id which is used as a template for unknown bucket IDs.

Parameters:

  • bucket_id (str) –

    The ID of the bucket to set the cooldown for.

  • resource (BucketResource) –

    The type of resource to target for the cooldown.

  • limit (int) –

    The number of uses per cooldown period.

  • reset_after (Union[int, float, timedelta]) –

    The cooldown period.

Returns:

  • Self

    The cooldown manager to allow call chaining.

Raises:

  • ValueError

    If any of the following cases are met:

    • If an invalid resource is passed.
    • If reset_after or limit is negative, 0 or invalid.
    • If limit is 0 or negative.
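
To make "limit uses per reset_after period" concrete, here is a fixed-window counter sketch. It is only one plausible reading of those two parameters and is not necessarily the algorithm InMemoryCooldownManager uses; `WindowCooldown`, `Depleted` and the injectable `clock` are hypothetical names.

```python
import datetime
import time
import typing


class Depleted(Exception):
    """Hypothetical error carrying when the window resets (in clock units)."""

    def __init__(self, retry_at: float) -> None:
        super().__init__(retry_at)
        self.retry_at = retry_at


class WindowCooldown:
    """Allow `limit` uses per `reset_after` seconds (fixed window)."""

    def __init__(
        self,
        limit: int,
        reset_after: typing.Union[int, float, datetime.timedelta],
        *,
        clock: typing.Callable[[], float] = time.monotonic,
    ) -> None:
        if isinstance(reset_after, datetime.timedelta):
            reset_after = reset_after.total_seconds()
        if limit <= 0 or reset_after <= 0:
            raise ValueError("limit and reset_after must be more than 0")

        self._limit = limit
        self._reset_after = float(reset_after)
        self._clock = clock
        self._count = 0
        self._window_start: typing.Optional[float] = None

    def try_acquire(self) -> None:
        now = self._clock()
        expired = (
            self._window_start is None
            or now - self._window_start >= self._reset_after
        )
        if expired:
            # Start a fresh window on the first use after expiry.
            self._window_start = now
            self._count = 0
        if self._count >= self._limit:
            raise Depleted(self._window_start + self._reset_after)
        self._count += 1
```

Passing the clock in makes the behaviour easy to check without sleeping; with `limit=2` and `reset_after=60`, a third call inside the first minute raises `Depleted`.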

set_custom_bucket #

set_custom_bucket(resource, /, *bucket_ids)

Set a custom cooldown limit resource.

Parameters:

  • resource (AbstractCooldownBucket) –

    Object which handles the cooldowns for these buckets.

  • bucket_ids (str, default: () ) –

    IDs of buckets to set this custom resource for.

Returns:

  • Self

    The cooldown manager to allow call chaining.

Examples:

class CustomBucket(tanjun.dependencies.AbstractCooldownBucket):
    __slots__ = ()

    async def try_acquire(
        self, bucket_id: str, ctx: tanjun.abc.Context, /
    ) -> None:
        # CooldownDepleted should be raised if this couldn't be acquired.
        raise tanjun.dependencies.CooldownDepleted(None)

    async def release(
        self, bucket_id: str, ctx: tanjun.abc.Context, /
    ) -> None:
        ...

(
    tanjun.dependencies.InMemoryCooldownManager()
    .set_custom_bucket(CustomBucket(), "BUCKET_ID", "OTHER_BUCKET_ID")
)

ResourceDepleted #

Bases: Exception

Raised when a cooldown or concurrency limit bucket has already been depleted.

ResourceNotTracked #

Bases: Exception

Raised when a cooldown or concurrency bucket is not being tracked for a context.

add_concurrency_limit #

add_concurrency_limit(command, bucket_id, /, *, error=None, error_message='This resource is currently busy; please try again later.')

Add the hooks used to manage a command's concurrency limit.

Warning

Concurrency limiters will only work if an AbstractConcurrencyLimiter dependency has been set up and injected; InMemoryConcurrencyLimiter can be used as a standard in-memory concurrency manager.

Parameters:

  • command (ExecutableCommand[Any]) –

    The command to add the concurrency limit to.

  • bucket_id (str) –

    The concurrency limit bucket's ID.

  • error (Optional[Callable[[str], Exception]], default: None ) –

    Callback used to create a custom error to raise if the check fails.

    This should take one str argument which is the limiting bucket's ID.

    This takes priority over error_message.

  • error_message (Union[str, Mapping[str, str]], default: 'This resource is currently busy; please try again later.' ) –

    The error message to send in response as a command error if this fails to acquire the concurrency limit.

    This supports localisation and uses the check name "tanjun.concurrency" for global overrides.
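
As a minimal sketch of the error callback described above: it receives the limiting bucket's ID as a str and returns the exception to raise. BucketBusyError and make_busy_error are hypothetical names used only for illustration and are not part of Tanjun.

```python
class BucketBusyError(Exception):
    """Hypothetical error type for this sketch; not part of Tanjun."""

    def __init__(self, bucket_id: str) -> None:
        super().__init__(f"Bucket {bucket_id!r} is busy")
        self.bucket_id = bucket_id


def make_busy_error(bucket_id: str) -> Exception:
    # Matches Callable[[str], Exception]: takes the limiting bucket's ID
    # and returns (rather than raises) the exception instance.
    return BucketBusyError(bucket_id)


error = make_busy_error("downloads")
print(type(error).__name__, error.bucket_id)
```

A callback of this shape could then be passed as error=make_busy_error, in which case it takes priority over error_message.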

add_cooldown #

add_cooldown(command, bucket_id, /, *, error=None, error_message='This command is currently in cooldown. Try again {cooldown}.', unknown_message=None, owners_exempt=True)

Add a pre-execution hook used to manage a command's cooldown.

Warning

Cooldowns will only work if an AbstractCooldownManager dependency has been set up for injection. InMemoryCooldownManager can be used as a standard in-memory cooldown manager.

Parameters:

  • command (ExecutableCommand[Any]) –

    The command to add a cooldown to.

  • bucket_id (str) –

    The cooldown bucket's ID.

  • error (Optional[Callable[[str, Optional[datetime]], Exception]], default: None ) –

    Callback used to create a custom error to raise if the check fails.

    This should take two arguments, of types str and datetime.datetime | None, where the first is the limiting bucket's ID and the second is when said bucket can be used again, if known.

    This takes priority over error_message.

  • error_message (Union[str, Mapping[str, str]], default: 'This command is currently in cooldown. Try again {cooldown}.' ) –

    The error message to send in response as a command error if the check fails.

    This supports localisation and uses the check name "tanjun.cooldown" for global overrides.

  • unknown_message (Union[str, Mapping[str, str], None], default: None ) –

    Response error message for when cooldown is unknown.

    This supports localisation and uses the check name "tanjun.cooldown_unknown" for global overrides.

    This defaults to error_message but takes no format args.

  • owners_exempt (bool, default: True ) –

    Whether owners should be exempt from the cooldown.
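
A minimal sketch of a cooldown error callback with the two-argument shape described above. OnCooldownError and make_cooldown_error are hypothetical names, and the message wording is only an example.

```python
import datetime
from typing import Optional


class OnCooldownError(Exception):
    """Hypothetical error type for this sketch; not part of Tanjun."""


def make_cooldown_error(
    bucket_id: str, wait_until: Optional[datetime.datetime]
) -> Exception:
    # wait_until is None when it isn't known when the bucket can be used again.
    if wait_until is None:
        return OnCooldownError(f"Bucket {bucket_id!r} is on cooldown")

    return OnCooldownError(
        f"Bucket {bucket_id!r} is on cooldown until {wait_until.isoformat()}"
    )


known = make_cooldown_error(
    "search", datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
)
unknown = make_cooldown_error("search", None)
print(known)
print(unknown)
```

A callback of this shape could be passed as error=make_cooldown_error, in which case it takes priority over error_message.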

with_concurrency_limit #

with_concurrency_limit(bucket_id, /, *, error=None, error_message='This resource is currently busy; please try again later.', follow_wrapped=False)

Add the hooks used to manage a command's concurrency limit through a decorator call.

Warning

Concurrency limiters will only work if an AbstractConcurrencyLimiter dependency has been set up for injection. InMemoryConcurrencyLimiter can be used as a standard in-memory concurrency manager.

Parameters:

  • bucket_id (str) –

    The concurrency limit bucket's ID.

  • error (Optional[Callable[[str], Exception]], default: None ) –

    Callback used to create a custom error to raise if the check fails.

    This should take one str argument which is the limiting bucket's ID.

    This takes priority over error_message.

  • error_message (Union[str, Mapping[str, str]], default: 'This resource is currently busy; please try again later.' ) –

    The error message to send in response as a command error if this fails to acquire the concurrency limit.

    This supports localisation and uses the check name "tanjun.concurrency" for global overrides.

  • follow_wrapped (bool, default: False ) –

    Whether to also add this check to any other command objects this command wraps in a decorator call chain.

Returns:

  • Callable[[ExecutableCommand], ExecutableCommand]

    A decorator which adds the concurrency limiter hooks to a command.

with_cooldown #

with_cooldown(bucket_id, /, *, error=None, error_message='This command is currently in cooldown. Try again {cooldown}.', unknown_message=None, follow_wrapped=False, owners_exempt=True)

Add a pre-execution hook used to manage a command's cooldown through a decorator call.

Warning

Cooldowns will only work if an AbstractCooldownManager dependency has been set up for injection. InMemoryCooldownManager can be used as a standard in-memory cooldown manager.

Parameters:

  • bucket_id (str) –

    The cooldown bucket's ID.

  • error (Optional[Callable[[str, Optional[datetime]], Exception]], default: None ) –

    Callback used to create a custom error to raise if the check fails.

    This should take two arguments, of types str and datetime.datetime | None, where the first is the limiting bucket's ID and the second is when said bucket can be used again, if known.

    This takes priority over error_message.

  • error_message (Union[str, Mapping[str, str]], default: 'This command is currently in cooldown. Try again {cooldown}.' ) –

    The error message to send in response as a command error if the check fails.

    This supports localisation and uses the check name "tanjun.cooldown" for global overrides.

  • unknown_message (Union[str, Mapping[str, str], None], default: None ) –

    Response error message for when cooldown is unknown.

    This supports localisation and uses the check name "tanjun.cooldown_unknown" for global overrides.

    This defaults to error_message but takes no format args.

  • follow_wrapped (bool, default: False ) –

    Whether to also add this check to any other command objects this command wraps in a decorator call chain.

  • owners_exempt (bool, default: True ) –

    Whether owners should be exempt from the cooldown.

Returns:

  • Callable[[ExecutableCommand], ExecutableCommand]

    A decorator which adds the relevant cooldown hooks.
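
The error_message parameter accepts either a single str or a mapping of locales to strings, and the default template contains a {cooldown} field. The sketch below shows that shape, assuming the field is filled in with str.format-style substitution. The German text, the render helper and the "in 5 seconds" value are illustrative stand-ins, since the exact value Tanjun substitutes is not specified here.

```python
from typing import Dict

# Mapping of locale tags to message templates (illustrative content).
ERROR_MESSAGES: Dict[str, str] = {
    "en-GB": "This command is currently in cooldown. Try again {cooldown}.",
    "de": "Dieser Befehl befindet sich in der Abklingzeit. Versuche es {cooldown} erneut.",
}


def render(template: str, cooldown: str) -> str:
    # Hypothetical helper: fills the {cooldown} field of a template.
    return template.format(cooldown=cooldown)


print(render(ERROR_MESSAGES["en-GB"], "in 5 seconds"))
```

By contrast, unknown_message takes no format args, so it should not contain a {cooldown} field.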

tanjun.dependencies.locales #

Dependency used for managing localised strings around interaction commands.

AbstractLocalizer module-attribute #

AbstractLocalizer = AbstractLocaliser

Alias of AbstractLocaliser.

BasicLocalizer module-attribute #

BasicLocalizer = BasicLocaliser

Alias of BasicLocaliser.

AbstractLocaliser #

Bases: ABC

Abstract class of a string localiser.

get_all_variants abstractmethod #

get_all_variants(identifier, /, **kwargs)

Get all the localisation variants for an identifier.

localise abstractmethod #

localise(identifier, tag, /, **kwargs)

Localise a string with the given identifier and arguments.

Parameters:

  • identifier (str) –

    The unique identifier of the string to localise.

    This may be in any format but the formats used by the standard implementations can be found at client-localiser.

  • tag (str) –

    The "IETF lang tag" to localise the string to.

    This should usually be a hikari.Locale.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the string as format args.

Returns:

  • str

    The localised string.
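
To illustrate the localise call shape (identifier, tag, then keyword format args), here is a minimal dictionary-backed stand-in. DictLocaliser is a hypothetical class that does not subclass AbstractLocaliser, the "greeting" identifier is made up, and returning None for an unknown identifier or tag is a choice of this sketch rather than documented behaviour.

```python
from typing import Any, Dict, Mapping, Optional


class DictLocaliser:
    """Hypothetical stand-in which mirrors the localise signature."""

    def __init__(self) -> None:
        self._variants: Dict[str, Dict[str, str]] = {}

    def set_variants(
        self, identifier: str, variants: Mapping[str, str], /
    ) -> "DictLocaliser":
        # Store a copy and return self to allow call chaining.
        self._variants[identifier] = dict(variants)
        return self

    def localise(self, identifier: str, tag: str, /, **kwargs: Any) -> Optional[str]:
        template = self._variants.get(identifier, {}).get(tag)
        if template is None:
            return None

        # Keyword arguments are passed to the string as format args.
        return template.format(**kwargs)


localiser = DictLocaliser().set_variants(
    "greeting", {"en-GB": "Hello {name}", "fr": "Bonjour {name}"}
)
print(localiser.localise("greeting", "fr", name="Ana"))
```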

localize #

localize(identifier, tag, /, **kwargs)

Alias for AbstractLocaliser.localise.

BasicLocaliser #

Bases: AbstractLocaliser

Standard implementation of AbstractLocaliser with only basic text mapping support.

__init__ #

__init__()

Initialise a new BasicLocaliser.

add_to_client #

add_to_client(client)

Add this global localiser to a tanjun client.

Note

This registers the localiser as a type dependency to let Tanjun use it.

Parameters:

  • client (Client) –

    The client to add this global localiser to.

localize #

localize(identifier, tag, /, **kwargs)

Alias for AbstractLocaliser.localise.

set_variants #

set_variants(identifier, variants=None, /, **other_variants)

Set the variants for a localised field.

Parameters:

  • identifier (str) –

    Identifier of the field to set the localised variants for.

    This may be in any format but the formats used by the standard implementations can be found at client-localiser.

  • variants (Optional[Mapping[str, str]], default: None ) –

    Mapping of hikari.Locales to the localised values.

Returns:

  • Self

    The localiser object to enable chained calls.

tanjun.dependencies.owners #

Dependency used for managing owner checks.

AbstractOwners #

Bases: ABC

Interface used to check if a user is deemed to be the bot's "owner".

check_ownership abstractmethod async #

check_ownership(client, user)

Check whether the bot is owned by the given user.

Parameters:

  • client (Client) –

    The Tanjun client this check is being called by.

  • user (User) –

    The user to check ownership for.

Returns:

  • bool

    Whether the bot is owned by the provided user.
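
A minimal sketch of what an implementation of this check could look like, using a fixed set of user IDs. StaticOwners is a hypothetical class which does not subclass AbstractOwners, and types.SimpleNamespace stands in for a real user object; the only assumption made about the user is that it exposes an id attribute.

```python
import asyncio
import types
from typing import Any, Iterable


class StaticOwners:
    """Hypothetical owner check backed by a fixed set of user IDs."""

    def __init__(self, owner_ids: Iterable[int]) -> None:
        self._owner_ids = frozenset(owner_ids)

    async def check_ownership(self, client: Any, user: Any) -> bool:
        # The client is unused here; a real implementation may use it to
        # look up the application's owners.
        return int(user.id) in self._owner_ids


owners = StaticOwners([123])
is_owner = asyncio.run(owners.check_ownership(None, types.SimpleNamespace(id=123)))
print(is_owner)
```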

Owners #

Bases: AbstractOwners

Default implementation of the owner check interface.

Warning

fallback_to_application is only possible when the REST client is bound to a Bot token or if a type dependency is registered for tanjun.dependencies.SingleStoreCache[hikari.Application].

__init__ #

__init__(*, expire_after=datetime.timedelta(minutes=5), fallback_to_application=True, owners=None)

Initiate a new owner check dependency.

Parameters:

  • expire_after (Union[timedelta, int, float], default: timedelta(minutes=5) ) –

    The amount of time to cache application owner data for in seconds.

    This is only applicable if rest is also passed.

  • fallback_to_application (bool, default: True ) –

    Whether this check should fall back to checking the application's owners if the user isn't in owners.

    This only works when the bot's rest client is bound to a Bot token or if tanjun.dependencies.SingleStoreCache[hikari.Application] is available.

  • owners (Optional[SnowflakeishSequence[User]], default: None ) –

    Sequence of objects and IDs of the users that are allowed to use the bot's owners-only commands.

tanjun.dependencies.reloaders #

Implementation of a hot reloader for Tanjun.

HotReloader #

Manages hot reloading modules for a Tanjun client.

Warning

An instance of this can only be linked to 1 client.

Examples:

client = tanjun.Client.from_gateway_bot(bot)
(
    tanjun.dependencies.HotReloader()
    .add_modules("python.module.path", pathlib.Path("./module.py"))
    .add_directory("./modules/")
    .add_to_client(client)
)

__init__ #

__init__(*, commands_guild=None, interval=datetime.timedelta(microseconds=500000), redeclare_cmds_after=datetime.timedelta(seconds=10), unload_on_delete=True)

Initialise a hot reloader.

Warning

redeclare_cmds_after is not aware of commands declared outside of the reloader and will lead to commands being redeclared on startup when mixed with tanjun.clients.Client.__init__'s declare_global_commands argument when it is not None.

Parameters:

  • commands_guild (Optional[SnowflakeishOr[PartialGuild]], default: None ) –

    Object or ID of the guild to declare commands in if redeclare_cmds_after is not None.

  • interval (Union[int, float, timedelta], default: timedelta(microseconds=500000) ) –

    How often, in seconds, this should scan files and directories for changes.

  • redeclare_cmds_after (Union[int, float, timedelta, None], default: timedelta(seconds=10) ) –

    How often to redeclare application commands after a change to the commands is detected.

    If None is passed here then this will not redeclare the application's commands.

  • unload_on_delete (bool, default: True ) –

    Whether this should unload modules when their relevant file is deleted.
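
Both interval and redeclare_cmds_after accept either a number of seconds or a datetime.timedelta. The sketch below shows how the two forms relate; to_timedelta is a hypothetical helper written for illustration and is not claimed to be how the reloader normalises these values internally.

```python
import datetime
from typing import Union


def to_timedelta(value: Union[int, float, datetime.timedelta]) -> datetime.timedelta:
    # Numbers are interpreted as seconds; timedeltas are passed through.
    if isinstance(value, datetime.timedelta):
        return value

    return datetime.timedelta(seconds=value)


# 0.5 seconds is the same duration as the documented default interval.
print(to_timedelta(0.5) == datetime.timedelta(microseconds=500000))
```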

add_directory #

add_directory(directory, /, *, namespace=None)

Add a directory for this hot reloader to track.

Note

This will only reload modules directly in the target directory and will not scan sub-directories.

Parameters:

  • directory (Union[str, Path]) –

    Path of the directory to hot reload.

  • namespace (Optional[str], default: None ) –

    The python namespace this directory's modules should be imported from, if applicable.

    This works as {namespace}.{file.name.removesuffix(".py")} and will have the same behaviour as when a str is passed to Client.load_modules if passed.

    If left as None then this will have the same behaviour as when a pathlib.Path is passed to Client.load_modules.

Returns:

  • Self

    The hot reloader to enable chained calls.
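
The namespace rule above can be sketched as follows. module_names is a hypothetical helper which applies the {namespace}.{file.name.removesuffix(".py")} pattern to a list of file names; filtering on the .py suffix is an assumption of this sketch, and str.removesuffix requires Python 3.9 or later.

```python
import pathlib
from typing import List


def module_names(file_names: List[str], namespace: str) -> List[str]:
    names = []
    for file_name in file_names:
        path = pathlib.Path(file_name)
        # Only Python source files map to importable module names here.
        if path.suffix == ".py":
            names.append(f"{namespace}.{path.name.removesuffix('.py')}")

    return names


print(module_names(["admin.py", "fun.py", "README.md"], "bot.modules"))
```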

Raises:

add_directory_async async #

add_directory_async(directory, /, *, namespace=None)

Asynchronous variant of HotReloader.add_directory.

Unlike HotReloader.add_directory, this method will run blocking code in a background thread.

For more information on the behaviour of this method see the documentation for HotReloader.add_directory.

add_modules #

add_modules(*paths)

Add modules for this hot reloader to track.

Parameters:

  • *paths (Union[str, Path], default: () ) –

    Module paths for this hot reloader to track.

    This has the same behaviour as tanjun.abc.Client.load_modules for how pathlib.Path and str are treated.

Raises:

add_modules_async async #

add_modules_async(*paths)

Asynchronous variant of HotReloader.add_modules.

Unlike HotReloader.add_modules, this method will run blocking code in a background thread.

For more information on the behaviour of this method see the documentation for HotReloader.add_modules.

add_to_client #

add_to_client(client)

Add this to a tanjun.abc.Client instance.

This registers start and closing callbacks which handle the lifetime of this and adds this as a type dependency.

Parameters:

  • client (Client) –

    The client to link this hot reloader to.

scan async #

scan(client)

Manually scan this hot reloader's tracked modules for changes.

Parameters:

  • client (Client) –

    The client to reload and unload modules in.

start #

start(client)

Start the hot reloader.

Raises:

stop #

stop()

Stop the hot reloader.

Raises: