Skip to content

tanjun.dependencies#

Default dependency utilities used within Tanjun and their abstract interfaces.

tanjun.dependencies.async_cache #

Interface for an optional asynchronous gateway cache dependency.

This allows you to share data between instances using something like a redis cache (for example) and will be used by standard Tanjun components as well as extensions if implemented.

Note

While there aren't any standard implementations for these interfaces, a Redis implementation of this for the types found in Hikari's gateway cache can be found in hikari-sake >=v1.0.1a1 (exposed by sake.redis.ResourceClient.add_to_tanjun).

Tanjun will use the following type dependencies for these interfaces if they are registered with the client:

  • AsyncCache[str, hikari.InviteWithMetadata]
  • SfCache[hikari.GuildChannel]
  • SfCache[hikari.KnownCustomEmoji]
  • SfCache[hikari.Guild]
  • SfCache[hikari.Role]
  • SfCache[hikari.User]
  • SfGuildBound[hikari.Member]
  • SfGuildBound[hikari.MemberPresence]
  • SfGuildBound[hikari.VoiceState]
  • SfGuildBound[hikari.Role]
  • SingleStoreCache[hikari.OwnUser]
  • SingleStoreCache[hikari.Application]
  • SingleStoreCache[hikari.AuthorizationApplication]

SfCache = AsyncCache[hikari.Snowflakeish, _ValueT] module-attribute #

Alias of tanjun.dependencies.AsyncCache where the key is a snowflake.

SfChannelBound = ChannelBoundCache[hikari.Snowflakeish, _ValueT] module-attribute #

Alias of tanjun.dependencies.ChannelBoundCache where the key is a snowflake.

SfGuildBound = GuildBoundCache[hikari.Snowflakeish, _ValueT] module-attribute #

Alias of tanjun.dependencies.GuildBoundCache where the key is a snowflake.

AsyncCache #

Bases: abc.ABC, typing.Generic[_KeyT, _ValueT]

Abstract interface of a cache which stores globally identifiable resources.

Note

This will never be implemented for resources such as hikari.guilds.Member and hikari.presences.MemberPresence which are only unique per-parent resource.

get(key, /, *, default=Ellipsis) async abstractmethod #

Get an entry from this cache by ID.

PARAMETER DESCRIPTION
key

Unique key of the entry to get; this will often be a snowflake.

TYPE: _KeyT

default

The default value to return if an entry wasn't found.

If provided then no errors will be raised when no entry is found.

TYPE: _DefaultT DEFAULT: Ellipsis

RETURNS DESCRIPTION
_ValueT | _DefaultT

The found entry or the default if any was provided.

RAISES DESCRIPTION
CacheMissError

If the entry wasn't found.

This won't be raised if default is passed.

EntryNotFound

If the entry wasn't found and the the entry definitely doesn't exist.

This won't be raised if default is passed.

This is a specialisation of CacheMissError and thus may be caught as CacheMissError and otherwise would need to be beforeCacheMissError` in a try, multiple catch statement.

iter_all() abstractmethod #

Asynchronously iterate over the globally cached entries for this resource.

Note

For more information on how this is used, see the documentation for hikari.iterators.LazyIterator.

RETURNS DESCRIPTION
CacheIterator[_ValueT]

An asynchronous iterator of the entries cached globally for this resource.

CacheIterator #

Bases: hikari.LazyIterator[_ValueT]

Abstract interface of a cache resource asynchronous iterator.

For more information on how this is used, see the documentation for hikari.iterators.LazyIterator.

len() async abstractmethod #

Get the length of the target resource.

Note

Unlike [tanjun.dependencies.CacheIterator.count][], this method will not deplete the iterator.

RETURNS DESCRIPTION
int

The length of the targeted resource.

CacheMissError #

Bases: errors.TanjunError

Raised when an entry isn't found in the cache.

Note

EntryNotFound inherits from this error and will only be raised if the cache knows that the entry doesn't exist.

ChannelBoundCache #

Bases: abc.ABC, typing.Generic[_KeyT, _ValueT]

Abstract interface of a cache which stores channel-bound resources.

get_from_channel(channel_id, key, /, *, default=Ellipsis) async abstractmethod #

Get an entry from this cache for a specific channel by ID.

PARAMETER DESCRIPTION
channel_id

ID of the channel to get an entry for.

TYPE: hikari.Snowflakeish

key

Unique key of the entry to get; this will usually be a snowflake.

TYPE: _KeyT

default

The default value to return if an entry wasn't found.

If provided then no errors will be raised when no entry is found.

TYPE: _DefaultT DEFAULT: Ellipsis

RETURNS DESCRIPTION
_ValueT | _DefaultT

The found entry or the default if any was provided.

RAISES DESCRIPTION
CacheMissError

If the entry wasn't found.

This won't be raised if default is passed.

EntryNotFound

If the entry wasn't found and the the entry definitely doesn't exist.

This won't be raised if default is passed.

This is a specialisation of CacheMissError and thus may be caught as CacheMissError and otherwise would need to be beforeCacheMissError` in a try, multiple catch statement.

iter_all() abstractmethod #

Asynchronously iterate over the globally cached entries for this resource.

Note

For more information on how this is used, see the documentation for hikari.iterators.LazyIterator.

RETURNS DESCRIPTION
CacheIterator[_ValueT]

An asynchronous iterator of the entries cached globally for this resource.

iter_for_channel(channel_id) abstractmethod #

Asynchronously iterate over the entries entries cached for a channel.

PARAMETER DESCRIPTION
channel_id

ID of the channel to iterate over the entries cached for.

TYPE: hikari.Snowflakeish

RETURNS DESCRIPTION
CacheIterator[_ValueT]

An asynchronous iterator of the entries cached for the specified channel.

EntryNotFound #

Bases: CacheMissError

Raised when an entry does not exist.

Note

This is a specialisation of CacheMissError which indicates that the cache is sure that the entry doesn't exist.

GuildBoundCache #

Bases: abc.ABC, typing.Generic[_KeyT, _ValueT]

Abstract interface of a cache which stores guild-bound resources.

get_from_guild(guild_id, key, /, *, default=Ellipsis) async abstractmethod #

Get an entry from this cache for a specific guild by ID.

PARAMETER DESCRIPTION
guild_id

ID of the guild to get an entry for.

TYPE: hikari.Snowflakeish

key

Unique key of the entry to get; this will usually be a snowflake.

TYPE: _KeyT

default

The default value to return if an entry wasn't found.

If provided then no errors will be raised when no entry is found.

TYPE: _DefaultT DEFAULT: Ellipsis

RETURNS DESCRIPTION
_ValueT | _DefaultT

The found entry or the default if any was provided.

RAISES DESCRIPTION
CacheMissError

If the entry wasn't found.

This won't be raised if default is passed.

EntryNotFound

If the entry wasn't found and the the entry definitely doesn't exist.

This won't be raised if default is passed.

This is a specialisation of CacheMissError and thus may be caught as CacheMissError and otherwise would need to be beforeCacheMissError` in a try, multiple catch statement.

iter_all() abstractmethod #

Asynchronously iterate over the globally cached entries for this resource.

Note

For more information on how this is used, see the documentation for hikari.iterators.LazyIterator.

RETURNS DESCRIPTION
CacheIterator[_ValueT]

An asynchronous iterator of the entries cached globally for this resource.

iter_for_guild(guild_id) abstractmethod #

Asynchronously iterate over the entries entries cached for a guild.

Note

For more information on how this is used, see the documentation for hikari.iterators.LazyIterator.

PARAMETER DESCRIPTION
guild_id

ID of the guild to iterate over the entries cached for.

TYPE: hikari.Snowflakeish

RETURNS DESCRIPTION
CacheIterator[_ValueT]

An asynchronous iterator of the entries cached for the specified guild.

SingleStoreCache #

Bases: abc.ABC, typing.Generic[_ValueT]

Abstract interface of a cache which stores one resource.

Note

This is mostly just for the hikari.users.OwnUser cache store.

get(*, default=Ellipsis) async abstractmethod #

Get the entry.

PARAMETER DESCRIPTION
default

The default value to return if an entry wasn't found.

If provided then no errors will be raised when no entry is found.

TYPE: _DefaultT DEFAULT: Ellipsis

RETURNS DESCRIPTION
_ValueT | _DefaultT

The found entry or the default if any was provided.

RAISES DESCRIPTION
CacheMissError

If the entry wasn't found.

This won't be raised if default is passed.

EntryNotFound

If the entry wasn't found and the the entry definitely doesn't exist.

This won't be raised if default is passed.

This is a specialisation of CacheMissError and thus may be caught as CacheMissError and otherwise would need to be beforeCacheMissError` in a try, multiple catch statement.

tanjun.dependencies.callbacks #

Callback dependencies used for getting context and client based data.

fetch_my_user(client, *, me_cache=None) async #

Fetch the current user from the client's cache or rest client.

Note

This is used in the standard LazyConstant[hikari.users.OwnUser] dependency.

PARAMETER DESCRIPTION
client

The client to use to fetch the user.

TYPE: alluka.Injected[tanjun.Client]

RETURNS DESCRIPTION
hikari.OwnUser

The current user.

RAISES DESCRIPTION
RuntimeError

If the cache couldn't be used to get the current user and the REST client is not bound to a Bot token.

tanjun.dependencies.data #

Dependency utilities used for managing data.

LazyConstant #

Bases: typing.Generic[_T]

Injected type used to hold and generate lazy constants.

Note

To easily resolve this type use inject_lc.

__init__(callback) #

Initiate a new lazy constant.

PARAMETER DESCRIPTION
callback

Callback used to resolve this to a constant value.

This supports dependency injection and may either be sync or asynchronous.

TYPE: alluka.abc.CallbackSig[_T]

acquire() #

Acquire this lazy constant as an asynchronous lock.

This is used to ensure that the value is only generated once and should be kept acquired until LazyConstant.set_value has been called.

RETURNS DESCRIPTION
contextlib.AbstractAsyncContextManager[typing.Any]

Context manager that can be used to acquire the lock.

callback() property #

Descriptor of the callback used to get this constant's initial value.

get_value() #

Get the value of this constant if set, else None.

reset() #

Clear the internally stored value.

set_value(value) #

Set the constant value.

PARAMETER DESCRIPTION
value

The value to set.

TYPE: _T

RAISES DESCRIPTION
RuntimeError

If the constant has already been set.

cache_callback(callback, /, *, expire_after=None) #

Cache the result of a callback within a dependency injection context.

Note

This is internally used by cached_inject.

PARAMETER DESCRIPTION
callback

The callback to cache the result of.

TYPE: alluka.abc.CallbackSig[_T]

expire_after

The amount of time to cache the result for in seconds.

Leave this as None to cache for the runtime of the application.

TYPE: typing.Union[int, float, datetime.timedelta, None] DEFAULT: None

RETURNS DESCRIPTION
collections.abc.Callable[..., collections.abc.Corouting[typing.Any, typing.Any, _T]]

A callback which will cache the result of the given callback after the first call.

RAISES DESCRIPTION
ValueError

If expire_after is not a valid value. If expire_after is not less than or equal to 0 seconds.

cached_inject(callback, /, *, expire_after=None) #

Inject a callback with caching.

This acts like alluka.inject and the result of it should also be assigned to a parameter's default to be used.

Example#
async def resolve_database(
    client: tanjun.abc.Client = tanjun.inject(type=tanjun.abc.Client)
) -> Database:
    raise NotImplementedError

@tanjun.as_message_command("command name")
async def command(
    ctx: tanjun.abc.Context, db: Database = tanjun.cached_inject(resolve_database)
) -> None:
    raise NotImplementedError
PARAMETER DESCRIPTION
callback

The callback to inject.

TYPE: alluka.abc.CallbackSig[_T]

expire_after

The amount of time to cache the result for in seconds.

Leave this as None to cache for the runtime of the application.

TYPE: typing.Union[float, int, datetime.timedelta, None] DEFAULT: None

RETURNS DESCRIPTION
alluka.InjectedDescriptor[_T]

Injector used to resolve the cached callback.

RAISES DESCRIPTION
ValueError

If expire_after is not a valid value. If expire_after is not less than or equal to 0 seconds.

inject_lc(type_) #

Make a LazyConstant injector.

This acts like alluka.inject and the result of it should also be assigned to a parameter's default to be used.

Note

For this to work, a LazyConstant must've been set as a type dependency for the passed type_.

PARAMETER DESCRIPTION
type_

The type of the constant to resolve.

TYPE: type[_T]

RETURNS DESCRIPTION
alluka.InjectedDescriptor[_T]

Injector used to resolve the LazyConstant.

Example#
@component.with_command
@tanjun.as_message_command
async def command(
    ctx: tanjun.abc.MessageCommand,
    application: hikari.Application = tanjun.inject_lc(hikari.Application)
) -> None:
    raise NotImplementedError

...

async def resolve_app(
    client: tanjun.abc.Client = tanjun.inject(type=tanjun.abc.Client)
) -> hikari.Application:
    raise NotImplementedError

tanjun.Client.from_gateway_bot(...).set_type_dependency(
    tanjun.LazyConstant[hikari.Application] = tanjun.LazyConstant(resolve_app)
)

make_lc_resolver(type_) #

Make an injected callback which resolves a LazyConstant.

Note

This is internally used by inject_lc.

Note

For this to work, a LazyConstant must've been set as a type dependency for the passed type_.

PARAMETER DESCRIPTION
type_

The type of the constant to resolve.

TYPE: type[_T]

RETURNS DESCRIPTION
collections.abc.Callable[..., collections.abc.Coroutine[typing.Any, typing.Any, _T]]

An injected callback used to resolve the LazyConstant.

tanjun.dependencies.limiters #

Command cooldown and concurrency limiters.

AbstractConcurrencyLimiter #

Bases: abc.ABC

Interface used for limiting command concurrent usage.

release(bucket_id, ctx) async abstractmethod #

Release a concurrency lock on a bucket.

try_acquire(bucket_id, ctx) async abstractmethod #

Try to acquire a concurrency lock on a bucket.

PARAMETER DESCRIPTION
bucket_id

The concurrency bucket to acquire.

TYPE: str

ctx

The context to acquire this resource lock with.

TYPE: tanjun.Context

RETURNS DESCRIPTION
bool

Whether the lock was acquired.

AbstractCooldownManager #

Bases: abc.ABC

Interface used for managing command calldowns.

check_cooldown(bucket_id, ctx, /, *, increment=False) async abstractmethod #

Check if a bucket is on cooldown for the provided context.

PARAMETER DESCRIPTION
bucket_id

The cooldown bucket to check.

TYPE: str

ctx

The context of the command.

TYPE: tanjun.Context

increment

Whether this call should increment the bucket's use counter if it isn't depleted.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
datetime.datetime | None

When this command will next be usable for the provided context if it's in cooldown else None.

increment_cooldown(bucket_id, ctx) async abstractmethod #

Increment the cooldown of a cooldown bucket.

PARAMETER DESCRIPTION
bucket_id

The cooldown bucket's ID.

TYPE: str

ctx

The context of the command.

TYPE: tanjun.Context

BucketResource #

Bases: int, enum.Enum

Resource target types used within command calldowns and concurrency limiters.

CHANNEL = 2 class-attribute #

A per-channel resource bucket.

GLOBAL = 7 class-attribute #

A global resource bucket.

GUILD = 6 class-attribute #

A per-guild resource bucket.

When executed in a DM this will be per-DM.

MEMBER = 1 class-attribute #

A per-guild member resource bucket.

When executed in a DM this will be per-DM.

PARENT_CHANNEL = 3 class-attribute #

A per-parent channel resource bucket.

For DM channels this will be per-DM, for guild channels with no parents this'll be per-guild.

TOP_ROLE = 5 class-attribute #

A per-highest role resource bucket.

When executed in a DM this will be per-DM, with this defaulting to targeting the @everyone role if they have no real roles.

USER = 0 class-attribute #

A per-user resource bucket.

ConcurrencyPostExecution #

Post-execution hook used to release a bucket concurrency limiter.

Note

For a concurrency limiter to work properly, both ConcurrencyPreExecution and ConcurrencyPostExecution hooks must be registered for a command scope.

__init__(bucket_id) #

Initialise a concurrency post-execution hook.

PARAMETER DESCRIPTION
bucket_id

The concurrency limit bucket's ID.

TYPE: str

ConcurrencyPreExecution #

Pre-execution hook used to acquire a bucket concurrency limiter.

Note

For a concurrency limiter to work properly, both ConcurrencyPreExecution and ConcurrencyPostExecution hooks must be registered for a command scope.

__init__(bucket_id, /, *, error=None, error_message='This resource is currently busy; please try again later.') #

Initialise a concurrency pre-execution hook.

PARAMETER DESCRIPTION
bucket_id

The concurrency limit bucket's ID.

TYPE: str

error

Callback used to create a custom error to raise if the check fails.

This should two one str argument which is the limiting bucket's ID.

This takes priority over error_message.

TYPE: typing.Optional[collections.Callable[[str], Exception]] DEFAULT: None

error_message

The error message to send in response as a command error if this fails to acquire the concurrency limit.

TYPE: str DEFAULT: 'This resource is currently busy; please try again later.'

CooldownPreExecution #

Pre-execution hook used to manage a command's cooldowns.

To avoid race-conditions this handles both erroring when the bucket is hit instead and incrementing the bucket's use counter.

__init__(bucket_id, /, *, error=None, error_message='This command is currently in cooldown. Try again {cooldown}.', owners_exempt=True) #

Initialise a pre-execution cooldown command hook.

PARAMETER DESCRIPTION
bucket_id

The cooldown bucket's ID.

TYPE: str

error

Callback used to create a custom error to raise if the check fails.

This should two arguments one of type str and datetime.datetime where the first is the limiting bucket's ID and the second is when said bucket can be used again.

This takes priority over error_message.

TYPE: typing.Optional[collections.Callable[[str, datetime.datetime], Exception]] DEFAULT: None

error_message

The error message to send in response as a command error if the check fails.

TYPE: str DEFAULT: 'This command is currently in cooldown. Try again {cooldown}.'

owners_exempt

Whether owners should be exempt from the cooldown.

TYPE: bool DEFAULT: True

InMemoryConcurrencyLimiter #

Bases: AbstractConcurrencyLimiter

In-memory standard implementation of AbstractConcurrencyLimiter.

Examples:

InMemoryConcurrencyLimiter.set_bucket may be used to set the concurrency limits for a specific bucket:

(
    InMemoryConcurrencyLimiter()
    # Set the default bucket template to 10 concurrent uses of the command per-user.
    .set_bucket("default", tanjun.BucketResource.USER, 10)
    # Set the "moderation" bucket with a limit of 5 concurrent uses per-guild.
    .set_bucket("moderation", tanjun.BucketResource.GUILD, 5)
    .set_bucket()
    # add_to_client will setup the concurrency manager (setting it as an
    # injected dependency and registering callbacks to manage it).
    .add_to_client(client)
)

add_to_client(client) #

Add this concurrency manager to a tanjun client.

Note

This registers the manager as a type dependency and manages opening and closing the manager based on the client's life cycle.

PARAMETER DESCRIPTION
client

The client to add this concurrency manager to.

TYPE: tanjun.Client

close() #

Stop the concurrency manager.

RAISES DESCRIPTION
RuntimeError

If the concurrency manager is not running.

disable_bucket(bucket_id) #

Disable a concurrency limit bucket.

This will stop the bucket from ever hitting a concurrency limit and also prevents the bucket from defaulting.

Note

"default" is a special bucket_id which is used as a template for unknown bucket IDs.

PARAMETER DESCRIPTION
bucket_id

The bucket to disable.

TYPE: str

RETURNS DESCRIPTION
Self

This concurrency manager to allow for chaining.

open(*, _loop=None) #

Start the concurrency manager.

RAISES DESCRIPTION
RuntimeError

If the concurrency manager is already running. If called in a thread with no running event loop.

set_bucket(bucket_id, resource, limit) #

Set the concurrency limit for a specific bucket.

Note

"default" is a special bucket_id which is used as a template for unknown bucket IDs.

PARAMETER DESCRIPTION
bucket_id

The ID of the bucket to set the concurrency limit for.

TYPE: str

resource

The type of resource to target for the concurrency limit.

TYPE: BucketResource

limit

The maximum number of concurrent uses to allow.

TYPE: int

RETURNS DESCRIPTION
Self

The concurrency manager to allow call chaining.

RAISES DESCRIPTION
ValueError

If an invalid resource type is given. if limit is less 0 or negative.

InMemoryCooldownManager #

Bases: AbstractCooldownManager

In-memory standard implementation of AbstractCooldownManager.

Examples:

InMemoryCooldownManager.set_bucket may be used to set the cooldown for a specific bucket:

(
    InMemoryCooldownManager()
    # Set the default bucket template to a per-user 10 uses per-60 seconds cooldown.
    .set_bucket("default", tanjun.BucketResource.USER, 10, 60)
    # Set the "moderation" bucket to a per-guild 100 uses per-5 minutes cooldown.
    .set_bucket("moderation", tanjun.BucketResource.GUILD, 100, datetime.timedelta(minutes=5))
    .set_bucket()
    # add_to_client will setup the cooldown manager (setting it as an
    # injected dependency and registering callbacks to manage it).
    .add_to_client(client)
)

add_to_client(client) #

Add this cooldown manager to a tanjun client.

Note

This registers the manager as a type dependency and manages opening and closing the manager based on the client's life cycle.

PARAMETER DESCRIPTION
client

The client to add this cooldown manager to.

TYPE: tanjun.Client

close() #

Stop the cooldown manager.

RAISES DESCRIPTION
RuntimeError

If the cooldown manager is not running.

disable_bucket(bucket_id) #

Disable a cooldown bucket.

This will stop the bucket from ever hitting a cooldown and also prevents the bucket from defaulting.

Note

"default" is a special bucket_id which is used as a template for unknown bucket IDs.

PARAMETER DESCRIPTION
bucket_id

The bucket to disable.

TYPE: str

RETURNS DESCRIPTION
Self

This cooldown manager to allow for chaining.

open(*, _loop=None) #

Start the cooldown manager.

RAISES DESCRIPTION
RuntimeError

If the cooldown manager is already running. If called in a thread with no running event loop.

set_bucket(bucket_id, resource, limit, reset_after) #

Set the cooldown for a specific bucket.

Note

"default" is a special bucket_id which is used as a template for unknown bucket IDs.

PARAMETER DESCRIPTION
bucket_id

The ID of the bucket to set the cooldown for.

TYPE: str

resource

The type of resource to target for the cooldown.

TYPE: BucketResource

limit

The number of uses per cooldown period.

TYPE: int

reset_after

The cooldown period.

TYPE: typing.Union[int, float, datetime.timedelta]

RETURNS DESCRIPTION
Self

The cooldown manager to allow call chaining.

RAISES DESCRIPTION
ValueError

If an invalid resource type is given. If reset_after or limit are negative, 0 or invalid. if limit is less 0 or negative.

with_concurrency_limit(bucket_id, /, *, error=None, error_message='This resource is currently busy; please try again later.', follow_wrapped=False) #

Add the hooks used to manage a command's concurrency limit through a decorator call.

Warning

Concurrency limiters will only work if there's a setup injected AbstractConcurrencyLimiter dependency with InMemoryConcurrencyLimiter being usable as a standard in-memory concurrency manager.

PARAMETER DESCRIPTION
bucket_id

The concurrency limit bucket's ID.

TYPE: str

error

Callback used to create a custom error to raise if the check fails.

This should two one str argument which is the limiting bucket's ID.

This takes priority over error_message.

TYPE: typing.Optional[collections.Callable[[str], Exception]] DEFAULT: None

error_message

The error message to send in response as a command error if this fails to acquire the concurrency limit.

TYPE: str DEFAULT: 'This resource is currently busy; please try again later.'

follow_wrapped

Whether to also add this check to any other command objects this command wraps in a decorator call chain.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
collections.abc.Callable[[tanjun.abc.ExecutableCommand], tanjun.abc.ExecutableCommand]

A decorator that adds the concurrency limiter hooks to a command.

with_cooldown(bucket_id, /, *, error=None, error_message='This command is currently in cooldown. Try again {cooldown}.', follow_wrapped=False, owners_exempt=True) #

Add a pre-execution hook used to manage a command's cooldown through a decorator call.

Warning

Cooldowns will only work if there's a setup injected AbstractCooldownManager dependency with InMemoryCooldownManager being usable as a standard in-memory cooldown manager.

PARAMETER DESCRIPTION
bucket_id

The cooldown bucket's ID.

TYPE: str

error

Callback used to create a custom error to raise if the check fails.

This should two arguments one of type str and datetime.datetime where the first is the limiting bucket's ID and the second is when said bucket can be used again.

This takes priority over error_message.

TYPE: typing.Optional[collections.Callable[[str, datetime.datetime], Exception]] DEFAULT: None

error_message

The error message to send in response as a command error if the check fails.

TYPE: str DEFAULT: 'This command is currently in cooldown. Try again {cooldown}.'

follow_wrapped

Whether to also add this check to any other command objects this command wraps in a decorator call chain.

TYPE: bool DEFAULT: False

owners_exempt

Whether owners should be exempt from the cooldown.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
collections.abc.Callable[[tanjun.abc.ExecutableCommand], tanjun.abc.ExecutableCommand]

A decorator that adds a CooldownPreExecution hook to the command.

tanjun.dependencies.owners #

Dependency used for managing owner checks.

AbstractOwners #

Bases: abc.ABC

Interface used to check if a user is deemed to be the bot's "owner".

check_ownership(client, user) async abstractmethod #

Check whether this object is owned by the given object.

PARAMETER DESCRIPTION
client

The Tanjun client this check is being called by.

TYPE: tanjun.Client

user

The user to check ownership for.

TYPE: hikari.User

RETURNS DESCRIPTION
bool

Whether the bot is owned by the provided user.

Owners #

Bases: AbstractOwners

Default implementation of the owner check interface.

Warning

fallback_to_application is only possible when the REST client is bound to a Bot token or if a type dependency is registered for tanjun.dependencies.SingleStoreCache[hikari.Application].

__init__(*, expire_after=datetime.timedelta(minutes=5), fallback_to_application=True, owners=None) #

Initiate a new owner check dependency.

PARAMETER DESCRIPTION
expire_after

The amount of time to cache application owner data for in seconds.

This is only applicable if rest is also passed.

TYPE: typing.Union[datetime.timedelta, int, float] DEFAULT: datetime.timedelta(minutes=5)

fallback_to_application

Whether this check should fallback to checking the application's owners if the user isn't in owners.

This only works when the bot's rest client is bound to a Bot token or if tanjun.dependencies.SingleStoreCache[hikari.Application] is available.

TYPE: bool DEFAULT: True

owners

Sequence of objects and IDs of the users that are allowed to use the bot's owners-only commands.

TYPE: typing.Optional[hikari.SnowflakeishSequence[hikari.User]] DEFAULT: None

tanjun.dependencies.reloaders #

Implementation of a hot reloader for Tanjun.

HotReloader #

Manages hot reloading modules for a Tanjun client..

Warning

An instance of this can only be linked to 1 client.

Examples:

client = tanjun.Client.from_gateway_bot(bot)
(
    tanjun.dependencies.HotReloader()
    .add_modules("python.module.path", pathlib.Path("./module.py"))
    .add_directory("./modules/")
    .add_to_client(client)
)

__init__(*, commands_guild=None, interval=datetime.timedelta(microseconds=500000), redeclare_cmds_after=datetime.timedelta(seconds=10), unload_on_delete=True) #

Initialise a hot reloader.

Warning

redeclare_cmds_after is not aware of commands declared outside of the reloader and will lead to commands being redeclared on startup when mixed with tanjun.clients.Client.init's declare_global_commands argument when it is not None.

PARAMETER DESCRIPTION
commands_guild

Object or ID of the guild to declare commands in if redeclare_cmds_after is not None.

TYPE: typing.Optional[hikari.SnowflakeishOr[hikari.PartialGuild]] DEFAULT: None

interval

How often this should scan files and directories for changes in seconds.

TYPE: typing.Union[int, float, datetime.timedelta] DEFAULT: datetime.timedelta(microseconds=500000)

redeclare_cmds_after

How often to redeclare application commands after a change to the commands is detected.

If None is passed here then this will not redeclare the application's commands.

TYPE: typing.Union[int, float, datetime.timedelta, None] DEFAULT: datetime.timedelta(seconds=10)

unload_on_delete

Whether this should unload modules when their relevant file is deleted.

TYPE: bool DEFAULT: True

add_directory(directory, /, *, namespace=None) #

Add a directory for this hot reloader to track.

Note

This will only reload modules directly in the target directory and will not scan sub-directories.

PARAMETER DESCRIPTION
directory

Path of the directory to hot reload.

TYPE: typing.Union[str, pathlib.Path]

namespace

The python namespace this directory's modules should be imported from, if applicable.

This work as {namespace}.{file.name.removesuffix(".py")} and will have the same behaviour as when a str is passed to tanjun.abc.Client.load_modules if passed.

If left as None then this will have the same behaviour as when a pathlib.Path is passed to tanjun.abc.Client.load_modules.

TYPE: typing.Optional[str] DEFAULT: None

RETURNS DESCRIPTION
Self

The hot reloader to enable chained calls.

RAISES DESCRIPTION
FileNotFoundError

If the directory cannot be found

add_directory_async(directory, /, *, namespace=None) async #

Asynchronous variant of tanjun.dependencies.reloaders.HotReloader.add_directory.

Unlike tanjun.dependencies.reloaders.HotReloader.add_directory, this method will run blocking code in a background thread.

For more information on the behaviour of this method see the documentation for tanjun.dependencies.reloaders.HotReloader.add_directory.

add_modules(*paths) #

Add modules for this hot reloader to track.

PARAMETER DESCRIPTION
*paths

Module paths for this hot reloader to track.

This has the same behaviour as [tanjun.abc.Client.load_modules][ for how [pathlib.Path][] and str are treated.

TYPE: typing.Union[str, pathlib.Path] DEFAULT: ()

RAISES DESCRIPTION
FileNotFoundError

If the module's file doesn't exist anymore.

ModuleNotFoundError

If the str module path cannot be imported.

add_modules_async(*paths) async #

Asynchronous variant of tanjun.dependencies.reloaders.HotReloader.add_modules.

Unlike tanjun.dependencies.reloaders.HotReloader.add_modules, this method will run blocking code in a background thread.

For more information on the behaviour of this method see the documentation for tanjun.abc.Client.load_modules.

add_to_client(client) #

Add this to a tanjun.abc.Client instance.

This registers start and closing callbacks which handle the lifetime of this and adds this as a type dependency.

PARAMETER DESCRIPTION
client

The client to link this hot reloader to.

TYPE: tanjun.Client

RETURNS DESCRIPTION
Self

The hot reloader to enable chained calls.

scan(client) async #

Manually scan this hot reloader's tracked modules for changes.

PARAMETER DESCRIPTION
client

The client to reload and unload modules in.

TYPE: tanjun.Client

start(client) #

Start the hot reloader.

RAISES DESCRIPTION
RuntimeError

If the hot reloader is already running.

stop() #

Stop the hot reloader.

RAISES DESCRIPTION
RuntimeError

If the hot reloader isn't running.