From eea42ccb7df4e07cf27de95550e17118a871a030 Mon Sep 17 00:00:00 2001 From: KafCoppelia <740677208@qq.com> Date: Mon, 6 Jun 2022 19:59:05 +0800 Subject: [PATCH] =?UTF-8?q?v0.1.0a2=F0=9F=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 14 +- nonebot_plugin_mute10rolls/__init__.py | 46 +++- nonebot_plugin_mute10rolls/data_source.py | 305 ++++++++++++++++++++++ nonebot_plugin_mute10rolls/utils.py | 141 ---------- pyproject.toml | 3 +- 5 files changed, 348 insertions(+), 161 deletions(-) create mode 100644 nonebot_plugin_mute10rolls/data_source.py delete mode 100644 nonebot_plugin_mute10rolls/utils.py diff --git a/README.md b/README.md index d9eda58..b7ef7a1 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ _🤐 禁言十连 🤐_ - + @@ -28,11 +28,11 @@ _🤐 禁言十连 🤐_ ## 版本 -v0.1.0a1 +v0.1.0a2 ⚠ 适配nonebot2-2.0.0beta.2+ -[更新日志](https://github.com/MinatoAquaCrews/nonebot_plugin_mute10rolls/releases/tag/v0.1.0) +[更新日志](https://github.com/MinatoAquaCrews/nonebot_plugin_mute10rolls/releases/tag/v0.1.0a2) ## 安装 @@ -61,16 +61,18 @@ v0.1.0a1 ⚠ 注意:开启禁言十连功能使用正式版cd,BOT须有管理员权限才可禁言,否则仅抽奖。即使狗管理关闭禁言十连功能,依然能抽奖,此时使用体验版cd,且无论BOT有无管理员权限,均不可禁言 -- [ ] TODO 禁言排行榜。 +- [x] 统计本日群禁言数据,每周自动发布上周群禁言数据。 ## 命令 1. 十连抽取口球套餐:[禁言十连]; -2. [管理员权限] 开启/关闭禁言十连:[开启|启用|关闭|禁用]禁言十连; +2. 查看本日群禁言数据统计:[禁言统计]; + +3. [管理员权限] 开启/关闭禁言十连:[开启|启用|关闭|禁用]禁言十连; ⚠ 注意:初始状态下,群禁言十连功能默认关闭。 -3. [管理员权限] 修改禁言冷却:[修改禁言cd/冷却][seconds],例如:修改禁言cd 60。 +4. [管理员权限] 修改禁言冷却:[修改禁言cd/冷却][seconds],例如:修改禁言cd 60。 ⚠ 注意:冷却时间10~120,默认体验版cd比正式版cd较长。建议管理员合理设置防止刷屏与抖M群友 \ No newline at end of file diff --git a/nonebot_plugin_mute10rolls/__init__.py b/nonebot_plugin_mute10rolls/__init__.py index 7af822d..1823f3e 100644 --- a/nonebot_plugin_mute10rolls/__init__.py +++ b/nonebot_plugin_mute10rolls/__init__.py @@ -2,12 +2,13 @@ from nonebot.typing import T_State from nonebot.matcher import Matcher from nonebot.log import logger +from nonebot import require from nonebot.adapters.onebot.v11.exception import ActionFailed from nonebot.adapters.onebot.v11 import GROUP, GROUP_ADMIN, GROUP_OWNER, Message, GroupMessageEvent from nonebot.params import Depends, CommandArg, State -from .utils import * +from .data_source import * -__mute_rolls_vsrsion__ = "v0.1.0" +__mute_rolls_vsrsion__ = "v0.1.0a2" __mute_rolls_notes__ = f''' 禁言十连 {__mute_rolls_vsrsion__} 十连抽取口球套餐 @@ -16,20 +17,23 @@ '''.strip() mute_rolls = on_command(cmd="禁言十连", permission=GROUP, priority=12) +rank = on_command(cmd="禁言统计", permission=GROUP, priority=12) roll_on = on_command(cmd="启用禁言十连", aliases={"开启禁言十连"}, permission=GROUP_ADMIN | GROUP_OWNER, priority=12, block=True) roll_off = on_command(cmd="禁用禁言十连", aliases={"关闭禁言十连"}, permission=GROUP_ADMIN | GROUP_OWNER, priority=12, block=True) cd_change = on_command(cmd="修改禁言cd", aliases={"修改禁言冷却"}, permission=GROUP_ADMIN | GROUP_OWNER, priority=12, block=True) +scheduler = require("nonebot_plugin_apscheduler").scheduler + @roll_on.handle() async def _(event: GroupMessageEvent): gid = str(event.group_id) - switch_enable(gid, True) + m10r_manager.switch_enable(gid, True) await roll_on.finish("已启用禁言十连功能") @roll_off.handle() async def _(event: GroupMessageEvent): gid = str(event.group_id) - switch_enable(gid, False) + m10r_manager.switch_enable(gid, False) await roll_off.finish("已禁用禁言十连功能") async def get_new_cd(args: Message = CommandArg(), state: T_State = State()): @@ -51,34 +55,34 @@ async def get_new_cd(args: Message = CommandArg(), state: T_State = State()): async def _(event: GroupMessageEvent, state: T_State = Depends(get_new_cd)): gid = str(event.group_id) cd = state["cd"] - change_cd(gid, cd) + m10r_manager.change_cd(gid, cd) - if check_enable(gid): + if m10r_manager.check_enable(gid): await cd_change.finish(f"已修改冷却时间为 {cd}s") else: await cd_change.finish(f"已修改冷却时间为 {cd}s") @mute_rolls.handle() async def _(matcher: Matcher, event: GroupMessageEvent, args: Message = CommandArg()): - args = args.extract_plain_text().strip().split() + args = args.extract_plain_text().strip() if args == "帮助": await matcher.finish(__mute_rolls_notes__) gid = str(event.group_id) - cd = check_cd(gid) - if not check_enable(gid): + cd = m10r_manager.check_cd(gid) + if not m10r_manager.check_enable(gid): # Trial version if cd > 0: - await matcher.finish(f"技能冷却中,剩余 {cd}s\n需管理员权限可启用该功能\n通过[修改禁言cd :seconds]可修改禁言冷却时间") + await matcher.finish(f"技能冷却中,剩余 {cd}s\n需管理员启用该功能\n通过[修改禁言cd :seconds]可修改禁言冷却时间") else: - mute_time, msg = await one_go(event) + mute_time, msg = await m10r_manager.one_go(event) await matcher.finish(msg) else: if cd > 0: await matcher.finish(f"技能冷却中,剩余 {cd}s\n需管理员通过[修改禁言cd :seconds]可修改禁言冷却时间") else: - mute_time, msg = await one_go(event) + mute_time, msg = await m10r_manager.one_go(event) await matcher.send(msg) if mute_time > 0: is_botadmin = await is_BotAdmin(event.group_id) @@ -95,4 +99,20 @@ async def _(matcher: Matcher, event: GroupMessageEvent, args: Message = CommandA except ActionFailed: await matcher.finish("出错啦,这次让你跑掉了~") else: - logger.info(f"User {event.user_id} | Group {event.group_id} 禁言十连中奖 {mute_time} seconds") \ No newline at end of file + logger.info(f"User {event.user_id} | Group {event.group_id} 禁言十连中奖 {mute_time} seconds") + +@rank.handle() +async def _(event: GroupMessageEvent): + gid = str(event.group_id) + msg = await m10r_manager.day_rank(gid) + await rank.finish(msg) + +# 上周排行榜总结 +@scheduler.scheduled_job("cron", day_of_week=0, hour=0, minute=0) +async def _(): + await m10r_manager.last_week_rank() + +# 重置每日记录,且周一时重置每周记录 +@scheduler.scheduled_job("cron", hour=0, minute=1) +async def _(): + m10r_manager.reset_record() diff --git a/nonebot_plugin_mute10rolls/data_source.py b/nonebot_plugin_mute10rolls/data_source.py new file mode 100644 index 0000000..caf08fd --- /dev/null +++ b/nonebot_plugin_mute10rolls/data_source.py @@ -0,0 +1,305 @@ +from .config import m10r_config, MUTE10ROLLS_PROB_LIST +from nonebot import get_bot +from nonebot.adapters.onebot.v11 import MessageSegment, GroupMessageEvent +from typing import AsyncGenerator, Coroutine, Any, Dict, Tuple, Union, Optional +from pathlib import Path +import random +import datetime +import time +try: + import ujson as json +except ModuleNotFoundError: + import json + +class M10R_Manager: + + def __init__(self): + self.json_path: Path = m10r_config.mute10rolls_path / "m10r_data.json" + self.cd_data: Dict[str, int] = {} + + def _init_data(self, gid: str) -> None: + data = self._load_json() + if gid not in data: + data[gid] = dict() + data[gid]["settings"] = { + "enable": False, + "roll_cd": m10r_config.default_roll_cd, + "trial_cd": m10r_config.default_trial_cd + } + data[gid]["scores"] = {} + + self._save_json(data) + + def check_enable(self, gid: str) -> bool: + self._init_data(gid) + + data = self._load_json() + return data[gid]["settings"]["enable"] + + def switch_enable(self, gid: str, new_state: bool) -> None: + ''' + 更改启用状态 + ''' + self._init_data(gid) + + data = self._load_json() + if data[gid]["settings"]["enable"] != new_state: + self._clear_cd(gid) + + data[gid]["settings"]["enable"] = new_state + + self._save_json(data) + + def change_cd(self, gid: str, cd: int) -> None: + ''' + 修改当前模式下的cd + ''' + self._init_data(gid) + + data = self._load_json() + if data[gid]["settings"]["enable"]: + data[gid]["settings"]["roll_cd"] = cd + else: + data[gid]["settings"]["trial_cd"] = cd + + self._save_json(data) + + def _add_cd(self, event: GroupMessageEvent, cooldown: int) -> None: + self.cd_data[str(event.group_id)] = event.time + cooldown + + def _clear_cd(self, gid: str) -> None: + self.cd_data.pop(gid) + + def check_cd(self, gid: str) -> int: + ''' + 检查cd: + - OK则返回0 + - 冷却中,返回剩余时间(秒) + ''' + try: + rst_cd = int(self.cd_data[gid] - time.time()) + except KeyError: + rst_cd = 0 + + if rst_cd < 0: + self._clear_cd(gid) + + return 0 if rst_cd <= 0 else rst_cd + + def _mute_record(self, event: GroupMessageEvent, mute_time: int): + gid = str(event.group_id) + uid = str(event.user_id) + + data = self._load_json() + if uid not in data[gid]["scores"].keys(): + data[gid]["scores"][uid] = { + "day": { + "count": 1, + "max": mute_time, + "total": mute_time + }, + "week": { + "count": 1, + "max": mute_time, + "total": mute_time + } + } + else: + data[gid]["scores"][uid]["day"]["count"] += 1 + data[gid]["scores"][uid]["day"]["total"] += mute_time + data[gid]["scores"][uid]["week"]["count"] += 1 + data[gid]["scores"][uid]["week"]["total"] += mute_time + if mute_time > data[gid]["scores"][uid]["day"]["max"]: + data[gid]["scores"][uid]["day"]["max"] = mute_time + + if mute_time > data[gid]["scores"][uid]["week"]["max"]: + data[gid]["scores"][uid]["week"]["max"] = mute_time + + self._save_json(data) + + def _group_day_reset(self) -> None: + data = self._load_json() + for gid in data.keys(): + for uid in data[gid]["scores"].keys(): + data[gid]["scores"][uid]["day"]["count"] = 0 + data[gid]["scores"][uid]["day"]["max"] = 0 + data[gid]["scores"][uid]["day"]["total"] = 0 + + self._save_json(data) + + def _group_week_reset(self) -> None: + data = self._load_json() + for gid in data.keys(): + for uid in data[gid]["scores"].keys(): + data[gid]["scores"][uid]["week"]["count"] = 0 + data[gid]["scores"][uid]["week"]["max"] = 0 + data[gid]["scores"][uid]["week"]["total"] = 0 + + self._save_json(data) + + def reset_record(self) -> None: + self._group_day_reset() + if datetime.datetime.now().weekday() == 1: + self._group_week_reset() + + async def day_rank(self, gid: str) -> MessageSegment: + ''' + 统计本日目前的数据 + ''' + data = self._load_json() + msg: MessageSegment = "" + + if gid not in data or not bool(data[gid]["scores"]): + # logger.warning() + return MessageSegment.text("本日还没有统计数据呢~") + + for uid in data[gid]["scores"].keys(): + msg = MessageSegment.text("今日禁言统计数据\n") + day_total: int = 0 # 今日总禁言 + day_count: int = 0 # 今日总次数 + day_max_mute: int = 0 # 今日单次最长 + day_max_mute_uid: str = "" # 今日单次最长uid + + day_count += data[gid]["scores"][uid]["day"]["count"] + day_total += data[gid]["scores"][uid]["day"]["total"] + + if day_max_mute < data[gid]["scores"][uid]["day"]["max"]: + day_max_mute = data[gid]["scores"][uid]["day"]["max"] + day_max_mute_uid = uid + + day_m, day_h = divmod(day_total, 60) + max_m, max_h = divmod(day_max_mute, 60) + + bot = get_bot() + dic = await bot.get_group_member_info(group_id=int(gid), user_id=int(day_max_mute_uid)) + mute_king = dic["nickname"] if not dic["card"] else dic["card"] + + msg += MessageSegment.text(f"今日禁言时长 {day_h}时{day_m}分,今日中奖次数 {day_count}\n") + msg += MessageSegment.text(f"今日口球王为 {mute_king},累计禁言时长 {max_h}时{max_m}分") + + return msg + + async def last_week_rank(self) -> None: + ''' + 统计本周数据及排行榜 + - 群禁言时间:各[uid]["week"]["total"]相加 + - 群禁言次数:各[uid]["week"]["count"]相加 + - 口球王:比较[uid]["week"]["total"] + - 单次禁言时长最长:比较[uid]["week"]["max_mute"] + ''' + data = self._load_json() + msg: MessageSegment = "" + + if not bool(data): + # logger.warning() + return + + for gid in data.keys(): + if gid not in data or not bool(data[gid]["scores"]): + # logger.warning() + return + + msg = MessageSegment.text("周禁言统计数据\n") + week_total: int = 0 # 上周群总禁言 + week_count: int = 0 # 周总禁言次数 + week_mute_king: int = 0 # 口球王禁言时长 + week_mute_king_uid: str = "" # 口球王uid + week_max_mute: int = 0 # 单次禁言最长 + + for uid in data[gid]["scores"].keys(): + week_count += data[gid]["scores"][uid]["week"]["count"] + week_total += data[gid]["scores"][uid]["week"]["total"] + + if week_mute_king < data[gid]["scores"][uid]["week"]["total"]: + week_mute_king = data[gid]["scores"][uid]["week"]["total"] + week_mute_king_uid = uid + + if week_max_mute < data[gid]["scores"][uid]["week"]["max"]: + week_max_mute = data[gid]["scores"][uid]["week"]["max"] + + week_m, week_h = divmod(week_total, 60) + king_m, king_h = divmod(week_mute_king, 60) + max_m, max_h = divmod(week_max_mute, 60) + + bot = get_bot() + dic = await bot.get_group_member_info(group_id=int(gid), user_id=int(week_mute_king_uid)) + mute_king = dic["nickname"] if not dic["card"] else dic["card"] + + msg += MessageSegment.text(f"群累计禁言时长 {week_h}时{week_m}分,累计禁言中奖次数 {week_count}\n") + msg += MessageSegment.text(f"群口球王为 {mute_king},累计禁言时长 {king_h}时{king_m}分\n") + msg += MessageSegment.text(f"单次禁言最长为 {max_h}时{max_m}分") + + # 群发 + await bot.send_group_msg(group_id=int(gid), message=msg) + + async def one_go(self, event: GroupMessageEvent) -> Tuple[int, str]: + ''' + 一次禁言十连抽取 + :retval mute_total: 禁言总时长(秒),大于0则中奖 + :retval msg: 抽取结果 + ''' + gid = str(event.group_id) + data = self._load_json() + + cooldown = data[gid]["settings"]["roll_cd"] if data[gid]["settings"]["enable"] else data[gid]["settings"]["trial_cd"] + self._add_cd(event, cooldown) + + i = 1 + mute_total = 0 + msg = "禁言十连结果如下" + result = random.choices([0, 1, 2, 10, 30, 60, 120], weights=MUTE10ROLLS_PROB_LIST, cum_weights=None, k=10) + for roll in result: + mute_total += roll + if roll == 0: + cur_msg = "miss" + elif roll == 1: + cur_msg = f"{roll} min" + else: + cur_msg = f"{roll} mins" + msg += f"\n{i} {cur_msg}" + i += 1 + + mute_total *= 60 + + # 开启功能才会记录 + if data[gid]["settings"]["enable"]: + self._mute_record(event, mute_total) + + return mute_total, msg + + def _load_json(self) -> Dict[str, Dict[str, Union[Dict[str, Union[bool, int]], Dict[str, Dict[str, Dict[str, int]]]]]]: + if self.json_path.exists(): + with open(self.json_path, "r", encoding="utf-8") as f: + return json.load(f) + + def _save_json(self, data: Dict[str, Dict[str, Union[Dict[str, Union[bool, int]], Dict[str, Dict[str, Dict[str, int]]]]]]) -> None: + if self.json_path.exists(): + with open(self.json_path, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=4) + +m10r_manager = M10R_Manager() + +async def muteSb(gid: int, mute_id: int, time: Optional[int]) -> AsyncGenerator[Coroutine, Any]: + ''' + 单人口球 + :param gid: 群号 + :param time: 时间 秒 + :param mute_id: qq + :return:禁言操作 + 未指定时间则随机 + ''' + if not time: + time = random.randint(1, 3600) + + yield get_bot().set_group_ban(group_id=gid, user_id=mute_id, duration=time) + +async def is_BotAdmin(gid: int) -> bool: + ''' + 检查Bot是否有权限禁言 + ''' + info = await get_bot().get_group_member_info(group_id=gid, user_id=get_bot().self_id) + return info["role"] != "member" + +def is_SenderAdmin(event: GroupMessageEvent) -> bool: + return event.sender.role != "member" + diff --git a/nonebot_plugin_mute10rolls/utils.py b/nonebot_plugin_mute10rolls/utils.py deleted file mode 100644 index 77e222c..0000000 --- a/nonebot_plugin_mute10rolls/utils.py +++ /dev/null @@ -1,141 +0,0 @@ -from .config import m10r_config, MUTE10ROLLS_PROB_LIST -from nonebot import get_bot -from nonebot.adapters.onebot.v11 import GroupMessageEvent -from typing import AsyncGenerator, Coroutine, Any, Dict, Tuple, Union, Optional -from pathlib import Path -import random -import time -try: - import ujson as json -except ModuleNotFoundError: - import json - -cd_data: Dict[str, int] = {} -data_path = m10r_config.mute10rolls_path / "m10r_data.json" - -def init_data(gid: str) -> None: - settings = load_json(data_path) - if gid not in settings.keys(): - settings[gid] = { - "enable": False, - "roll_cd": m10r_config.default_roll_cd, - "trial_cd": m10r_config.default_trial_cd - } - - save_json(data_path, settings) - -def check_enable(gid: str) -> bool: - init_data(gid) - - settings = load_json(data_path) - return settings[gid]["enable"] - -def switch_enable(gid: str, new_state: bool) -> None: - ''' - 更改启用状态 - ''' - init_data(gid) - - settings = load_json(data_path) - if settings[gid]["enable"] != new_state: - clear_cd(gid) - - settings[gid]["enable"] = new_state - - save_json(data_path, settings) - -def change_cd(gid: str, cd: int) -> None: - ''' - 修改当前模式下的cd - ''' - init_data(gid) - - settings = load_json(data_path) - if settings[gid]["enable"]: - settings[gid]["roll_cd"] = cd - else: - settings[gid]["trial_cd"] = cd - - save_json(data_path, settings) - -def add_cd(event: GroupMessageEvent, cooldown: int) -> None: - cd_data[str(event.group_id)] = event.time + cooldown - -def clear_cd(gid: str) -> None: - cd_data.pop(gid) - -def check_cd(gid: str) -> int: - ''' - 检查cd: - - OK则返回0 - - 冷却中,返回剩余时间(秒) - ''' - try: - rst_cd = int(cd_data[gid] - time.time()) - except KeyError: - rst_cd = 0 - - if rst_cd < 0: - clear_cd(gid) - - return 0 if rst_cd <= 0 else rst_cd - -async def one_go(event: GroupMessageEvent) -> Tuple[int, str]: - ''' - 一次禁言十连抽取 - :retval mute_total: 禁言总时长(秒),大于0则中奖 - :retval msg: 抽取结果 - ''' - gid = str(event.group_id) - settings = load_json(data_path) - - cooldown = settings[gid]["roll_cd"] if settings[gid]["enable"] else settings[gid]["trial_cd"] - add_cd(event, cooldown) - - i = 1 - mute_total = 0 - msg = "禁言十连结果如下" - result = random.choices([0, 1, 2, 10, 30, 60, 120], weights=MUTE10ROLLS_PROB_LIST, cum_weights=None, k=10) - for roll in result: - mute_total += roll - cur_msg = "miss" if roll == 0 else f"{roll} min(s)" - msg += f"\n{i} {cur_msg}" - i = i + 1 - - mute_total *= 60 - - return mute_total, msg - -async def muteSb(gid: int, mute_id: int, time: Optional[int]) -> AsyncGenerator[Coroutine, Any]: - ''' - 单人口球 - :param gid: 群号 - :param time: 时间 秒 - :param mute_id: qq - :return:禁言操作 - 未指定时间则随机 - ''' - if not time: - time = random.randint(1, 3600) - - yield get_bot().set_group_ban(group_id=gid, user_id=mute_id, duration=time) - -async def is_BotAdmin(gid: int) -> bool: - ''' - 检查Bot是否有权限禁言 - ''' - info = await get_bot().get_group_member_info(group_id=gid, user_id=get_bot().self_id) - return info["role"] != "member" - -def is_SenderAdmin(event: GroupMessageEvent) -> bool: - return event.sender.role != "member" - -def save_json(file: Path, data: Dict[str, Union[bool, int]]) -> None: - if file.exists(): - with open(file, 'w', encoding='utf-8') as f: - json.dump(data, f, ensure_ascii=False, indent=4) - -def load_json(file: Path) -> Dict[str, Union[bool, int]]: - if file.exists(): - with open(file, "r", encoding="utf-8") as f: - return json.load(f) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index a85e221..5f2fc0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "nonebot_plugin_mute10rolls" -version = "0.1.0a1" +version = "0.1.0a2" description = "10 rolls for self-muted!" authors = ["KafCoppelia "] license = "MIT" @@ -9,6 +9,7 @@ license = "MIT" python = "^3.7.3" nonebot2 = "^2.0.0-beta.2" nonebot-adapter-onebot = "^2.0.0-beta.1" +nonebot-plugin-apscheduler = "^0.1.2" [tool.poetry.dev-dependencies]