Skip to content

Commit

Permalink
small bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
itskekoff committed Apr 27, 2024
1 parent e2eabd7 commit 8fd57fa
Showing 1 changed file with 21 additions and 17 deletions.
38 changes: 21 additions & 17 deletions modules/cloner.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def __init__(
clone_messages_toggled: bool = False,
oldest_first: bool = True,
):

self.guild = from_guild
self.new_guild = to_guild
self.delay = delay
Expand Down Expand Up @@ -122,7 +121,7 @@ async def populate_queue(self, limit: int = 512):
async for message in original_channel.history(limit=limit, oldest_first=self.clone_oldest_first):
self.message_queue.append((channel, message))

async def prepare_server(self):
async def prepare_server(self) -> None:
methods = [
self.new_guild.fetch_roles,
self.new_guild.fetch_channels,
Expand All @@ -149,7 +148,7 @@ async def prepare_server(self):
"Community mode is toggled. Will be set up after channel processing (if enabled)."
)

async def fetch_required_data(self):
async def fetch_required_data(self) -> None:
self.logger.info("Fetching all required data (this can take a few minutes)")

entities = ['roles', 'channels', 'emojis', 'stickers']
Expand All @@ -165,13 +164,13 @@ async def fetch_required_data(self):
entity) else getattr(
self.guild, entity)

async def clone_icon(self):
async def clone_icon(self) -> None:
if self.guild.icon:
icon_bytes = await self.get_first_frame(self.guild.icon)
await self.new_guild.edit(icon=icon_bytes)
await asyncio.sleep(self.delay)

async def clone_banner(self):
async def clone_banner(self) -> None:
if self.guild.banner and ("ANIMATED_BANNER" or "BANNER") in self.guild.features:
if (
self.guild.banner.is_animated()
Expand Down Expand Up @@ -207,7 +206,7 @@ async def clone_roles(self):
)
await asyncio.sleep(self.delay)

async def clone_categories(self, perms: bool = True):
async def clone_categories(self, perms: bool = True) -> None:
categories = [
channel for channel in self.mappings["fetched_data"]["channels"]
if isinstance(channel, CategoryChannel)
Expand All @@ -230,7 +229,7 @@ async def clone_categories(self, perms: bool = True):
)
await asyncio.sleep(self.delay)

async def clone_channels(self, perms: bool = True):
async def clone_channels(self, perms: bool = True) -> None:
for channel in self.mappings["fetched_data"]["channels"]:
try:
channel = await self.guild.fetch_channel(channel.id)
Expand Down Expand Up @@ -285,7 +284,7 @@ async def clone_channels(self, perms: bool = True):
)
await asyncio.sleep(self.delay)

async def process_community(self):
async def process_community(self) -> None:
if self.enabled_community:
afk_channel = None
try:
Expand All @@ -310,7 +309,7 @@ async def process_community(self):
self.logger.info("Updated guild community settings")
await asyncio.sleep(self.delay)

async def add_community_channels(self, perms: bool = True):
async def add_community_channels(self, perms: bool = True) -> None:
if self.enabled_community:
all_channels = []
for channel in self.mappings["fetched_data"]["channels"]:
Expand Down Expand Up @@ -370,7 +369,7 @@ async def add_community_channels(self, perms: bool = True):
)
await asyncio.sleep(self.delay)

async def clone_emojis(self):
async def clone_emojis(self) -> None:
emoji_limit = min(self.new_guild.emoji_limit, self.new_guild.emoji_limit - 5)
for emoji in self.mappings["fetched_data"]["emojis"][:emoji_limit]:
if len(self.new_guild.emojis) >= emoji_limit:
Expand All @@ -385,7 +384,7 @@ async def clone_emojis(self):
)
await asyncio.sleep(self.delay)

async def clone_stickers(self):
async def clone_stickers(self) -> None:
sticker_limit = self.new_guild.sticker_limit
created_stickers = 0
for sticker in self.mappings["fetched_data"]["stickers"]:
Expand All @@ -409,7 +408,10 @@ async def clone_stickers(self):
else:
break

async def send_webhook(self, webhook: discord.Webhook, message: discord.Message, delay: float = 0.85):
async def send_webhook(self,
webhook: discord.Webhook,
message: discord.Message,
delay: float = 0.85) -> None:
author: discord.User = message.author
files = []
if message.attachments:
Expand Down Expand Up @@ -457,7 +459,7 @@ async def send_webhook(self, webhook: discord.Webhook, message: discord.Message,
await asyncio.sleep(delay)

async def clone_messages(self, messages_limit: int = main.messages_limit,
clear_webhooks: bool = main.messages_webhook_clear):
clear_webhooks: bool = main.messages_webhook_clear) -> None:
if not self.clone_messages_toggled:
return

Expand All @@ -469,7 +471,7 @@ async def clone_messages(self, messages_limit: int = main.messages_limit,

await self.clone_messages_from_queue(clear_webhooks=clear_webhooks)

async def cleanup_after_cloning(self, clear=False):
async def cleanup_after_cloning(self, clear: bool = False) -> None:
self.message_queue.clear()

if clear:
Expand All @@ -481,7 +483,7 @@ async def cleanup_after_cloning(self, clear=False):
self.logger.debug(f"Successfully cleaned up")
self.processing_messages = False

async def clone_messages_from_queue(self, clear_webhooks: bool = main.messages_webhook_clear):
async def clone_messages_from_queue(self, clear_webhooks: bool = main.messages_webhook_clear) -> None:
channel_messages_map = self._split_messages_by_channel()
tasks = []
for channel, messages in channel_messages_map.items():
Expand All @@ -492,7 +494,7 @@ async def clone_messages_from_queue(self, clear_webhooks: bool = main.messages_w

await self.cleanup_after_cloning(clear=clear_webhooks)

def _split_messages_by_channel(self):
def _split_messages_by_channel(self) -> typing.Dict[discord.channel.TextChannel, typing.List[typing.Any]]:
channel_messages_map = {}
while self.message_queue:
channel, message = self.message_queue.popleft()
Expand All @@ -501,7 +503,9 @@ def _split_messages_by_channel(self):
channel_messages_map[channel].append(message)
return channel_messages_map

async def _clone_messages_for_channel(self, channel: discord.abc.TextChannel, messages):
async def _clone_messages_for_channel(self,
channel: discord.channel.TextChannel,
messages: typing.List[discord.Message]) -> None:
webhook = self.find_webhook(channel)
if not webhook:
webhook = await channel.create_webhook(name="bot by itskekoff")
Expand Down

0 comments on commit 8fd57fa

Please sign in to comment.