import json import discord from discord import Interaction, TextStyle, app_commands from discord.ui import TextInput, View, Modal import os import psycopg2 from psycopg2 import sql import hashlib import string import random from datetime import datetime, timedelta from openpyxl import Workbook import threading import time class DL(): def __init__(self): self.config_dir_path = "./config/" self.export_dir_path = "./export/" self.server_config_path = self.config_dir_path + "server.json" try: if not os.path.isdir(self.config_dir_path): print("config ディレクトリが見つかりません... 作成します。") os.mkdir(self.config_dir_path) if not os.path.isfile(self.server_config_path): print("config ファイルが見つかりません... 作成します。") self.server_config = { "db": { "host": "localhost", "port": "5432", "db_name": "dislocker", "username": "user", "password": "password" }, "bot": { "token": "TYPE HERE BOTS TOKEN KEY", "log_channel_id" : "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)", "config_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)", "config_public_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)" } } with open(self.server_config_path, "w") as w: json.dump(self.server_config, w, indent=4) elif os.path.isfile(self.server_config_path): with open(self.server_config_path, "r") as r: self.server_config = json.load(r) print("config ファイルを読み込みました。") if not os.path.isdir(self.export_dir_path): print("export ディレクトリが見つかりません... 作成します。") os.mkdir(self.export_dir_path) if type(self.server_config["bot"]["log_channel_id"]) is not int or type(self.server_config["bot"]["config_channel_id"]) is not int: print("config ファイル内でチャンネルIDがint型で記述されていません。int型で記述して、起動してください。") self.init_result = "not_int" else: self.db = psycopg2.connect(f"host={self.server_config['db']['host']} dbname={self.server_config['db']['db_name']} port={self.server_config['db']['port']} user={self.server_config['db']['username']} password={self.server_config['db']['password']}") cursor = self.db.cursor() cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'club_member')") find_club_member_table = cursor.fetchall() print(find_club_member_table) if find_club_member_table[0][0] == False: cursor.execute("CREATE TABLE club_member (id SERIAL NOT NULL, name VARCHAR(30) NOT NULL, discord_username VARCHAR(32) NOT NULL, discord_userid VARCHAR(18) NOT NULL, PRIMARY KEY (id))") self.db.commit() cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'pc_list')") find_pc_list_table = cursor.fetchall() print(find_pc_list_table) if find_pc_list_table[0][0] == False: cursor.execute("CREATE TABLE pc_list (pc_number INTEGER NOT NULL, using_user_id INTEGER, password_hash VARCHAR(32), PRIMARY KEY (pc_number), FOREIGN KEY (using_user_id) REFERENCES club_member(id))") for i in range(10): print(i) cursor.execute("INSERT INTO pc_list (pc_number) VALUES (%s)", (i + 1,)) self.db.commit() cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'pc_usage_history')") find_pc_usage_history_table = cursor.fetchall() print(find_pc_usage_history_table) if find_pc_usage_history_table[0][0] == False: cursor.execute("CREATE TABLE pc_usage_history (id SERIAL NOT NULL, member_id INTEGER NOT NULL, pc_number INTEGER NOT NULL, device_number INTEGER NOT NULL, start_use_time TIMESTAMP NOT NULL, end_use_time TIMESTAMP, use_detail VARCHAR(30), PRIMARY KEY (id), FOREIGN KEY (member_id) REFERENCES club_member(id), FOREIGN KEY (pc_number) REFERENCES pc_list(pc_number))") self.db.commit() cursor.close() self.init_result = "ok" except (Exception) as error: print("初回処理でエラーが発生しました。\nエラー内容\n" + str(error)) self.init_result = "error" finally: pass class Bot(discord.Client): def password_generate(self, length): numbers = string.digits # (1) password = ''.join(random.choice(numbers) for _ in range(length)) # (2) return password def hash_genarate(self, source): hashed = hashlib.md5(source.encode()) return hashed.hexdigest() def register(self, **kwargs): try: user_info = { "id": str(kwargs["user_id"]), "name": str(kwargs["name"]), "display_name": str(kwargs["display_name"]), "pc_number": int(kwargs["pc_number"]), "device_number": int(kwargs["device_number"]), "detail": None } if "detail" in kwargs: user_info["detail"] = str(kwargs["detail"]) else: pass #パスワード生成、ハッシュ化 password = self.password_generate(4) password_hash = self.hash_genarate(password) print("password generated") #メンバーリストと送信者を照合 cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (user_info["id"],)) user_record = cursor.fetchall() #ユーザーデータがなかったら(未登録の場合) if not user_record: result = {"result": "user_data_not_found"} #ユーザーデータが見つかった場合(登録済みの場合) else: print("found user data") cursor.execute("SELECT * FROM pc_usage_history WHERE member_id=%s ORDER BY id DESC LIMIT 1", (user_record[0][0],)) pc_usage_history_record = cursor.fetchall() if pc_usage_history_record: print("used") if pc_usage_history_record[0][5] == None: result = {"result": "pc_already_in_use_by_you", "pc_number": str(pc_usage_history_record[0][2]), "device_number": str(pc_usage_history_record[0][3]), "start_time": str(pc_usage_history_record[0][4]), "detail": str(pc_usage_history_record[0][6])} else: cursor.execute("SELECT * FROM pc_list WHERE pc_number=%s", (user_info["pc_number"],)) pc_list_record = cursor.fetchall() if not pc_list_record[0][1] == None: result = {"result": "pc_already_in_use_by_other"} else: if user_info["detail"] == None: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time) VALUES (%s, %s, %s, current_timestamp)", (user_record[0][0], user_info["pc_number"], user_info["device_number"])) #使用用途があるとき else: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time, use_detail) VALUES (%s, %s, %s, current_timestamp, %s)", (user_record[0][0], user_info["pc_number"], user_info["device_number"], user_info["detail"])) cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], user_info["pc_number"])) cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, user_info["pc_number"])) dislocker.db.commit() result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])} else: print("unused") cursor.execute("SELECT * FROM pc_list WHERE pc_number=%s", (user_info["pc_number"],)) pc_list_record = cursor.fetchall() if pc_list_record: if not pc_list_record[0][1] == None: result = {"result": "pc_already_in_use_by_other"} else: if user_info["detail"] == None: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time) VALUES (%s, %s, %s, current_timestamp)", (user_record[0][0], user_info["pc_number"], user_info["device_number"])) #使用用途があるとき else: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time, use_detail) VALUES (%s, %s, %s, current_timestamp, %s)", (user_record[0][0], user_info["pc_number"], user_info["device_number"], user_info["detail"])) cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], user_info["pc_number"])) cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, user_info["pc_number"])) dislocker.db.commit() result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])} else: if user_info["detail"] == None: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time) VALUES (%s, %s, %s, current_timestamp)", (user_record[0][0], user_info["pc_number"], user_info["device_number"])) #使用用途があるとき else: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time, use_detail) VALUES (%s, %s, %s, current_timestamp, %s)", (user_record[0][0], user_info["pc_number"], user_info["device_number"], user_info["detail"])) cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], user_info["pc_number"])) cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, user_info["pc_number"])) dislocker.db.commit() result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])} except Exception as error: print("登録処理中にエラーが発生しました。\nエラー内容") print(str(error.__class__.__name__)) print(str(error.args)) print(str(error)) result = {"result": "error"} finally: cursor.close() return result def stop(self, **kwargs): try: discord_user_id = str(kwargs["user_id"]) cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,)) user_record = cursor.fetchall() cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s ORDER BY id DESC LIMIT 1", (user_record[0][0],)) pc_usage_history_record = cursor.fetchall() if pc_usage_history_record: if not pc_usage_history_record[0][5] == None: result = {"result": "unused"} else: cursor.execute("UPDATE pc_usage_history SET end_use_time = current_timestamp WHERE id = %s", (pc_usage_history_record[0][0],)) cursor.execute("UPDATE pc_list SET using_user_id = NULL WHERE pc_number = %s", (pc_usage_history_record[0][2],)) cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_usage_history_record[0][2],)) dislocker.db.commit() result = {"result": "ok", "pc_number": str(pc_usage_history_record[0][2]), "name": str(user_record[0][1])} except: print("停止処理にエラーが発生しました。") result = {"result": "error"} finally: cursor.close() return result def user_register(self, **kwargs): try: discord_user_id = str(kwargs["discord_user_id"]) discord_user_name = str(kwargs["discord_user_name"]) name = str(kwargs["name"]) cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,)) user_record = cursor.fetchall() if not user_record: cursor.execute("INSERT INTO club_member (name, discord_username, discord_userid) VALUES (%s, %s, %s)", (name, discord_user_name, discord_user_id)) dislocker.db.commit() result = {"result": "ok"} else: result = {"result": "already_exists"} except Exception as error: print("ユーザー登録中にエラーが発生しました。\nエラー内容") print(str(error.__class__.__name__)) print(str(error.args)) print(str(error)) result = {"result": "error"} finally: cursor.close() return result def format_datetime(self, value): if isinstance(value, datetime): return value.strftime('%Y-%m-%d %H:%M:%S') return value def pc_register(self, **kwargs): try: pc_number = int(kwargs["pc_number"]) cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,)) pc_list = cursor.fetchall() if not pc_list: cursor.execute("INSERT INTO pc_list (pc_number) VALUES (%s)", (pc_number,)) dislocker.db.commit() result = {"result": "ok"} else: result = {"result": "already_exists"} except Exception as error: print("PCの登録中にエラーが発生しました。\nエラー内容") print(str(error.__class__.__name__)) print(str(error.args)) print(str(error)) result = {"result": "error"} finally: cursor.close() return result def report_export(self, **kwargs): try: cursor = dislocker.db.cursor() csv_file_path = dislocker.export_dir_path + "pc_usage_history.csv" main_table = "pc_usage_history" related_table = "club_member" excel_file_path = dislocker.export_dir_path + "pc_usage_history.xlsx" # メインテーブルの列情報を取得(user_idを除く) cursor.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table))) main_columns = [desc[0] for desc in cursor.description if desc[0] != 'member_id'] # クエリを作成(列名を明確に指定) query = sql.SQL(""" SELECT {main_columns}, {related_table}.name FROM {main_table} LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.id ORDER BY id """).format( main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]), main_table=sql.Identifier(main_table), related_table=sql.Identifier(related_table) ) cursor.execute(query) # 列名を再構成(nameを2番目に配置) column_names = [main_columns[0], 'name'] + main_columns[1:] rows = cursor.fetchall() # Excelワークブックを作成 wb = Workbook() ws = wb.active # 列名を書き込み ws.append(column_names) # データを書き込み for row in rows: # nameを2番目に移動 formatted_row = [self.format_datetime(row[0])] + [row[-1]] + [self.format_datetime(field) if field is not None else '' for field in row[1:-1]] ws.append(formatted_row) # 列幅を自動調整 for col in ws.columns: max_length = 0 column = col[0].column_letter for cell in col: try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except: pass adjusted_width = (max_length + 2) * 1.2 ws.column_dimensions[column].width = adjusted_width # Excelファイルを保存 wb.save(excel_file_path) print(f"テーブル '{main_table}' の内容を '{excel_file_path}' に出力しました。") result = {"result": "ok", "file_path": excel_file_path} except (Exception, psycopg2.Error) as error: print("使用履歴のエクスポート時にエラーが発生しました\nエラー内容\n", str(error)) result = {"result": "export_error"} finally: cursor.close() return result def force_stop(self, **kwargs): try: pc_number = kwargs["pc_number"] cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,)) pc_list_record = cursor.fetchall() if not pc_list_record[0][1] == None: cursor.execute("UPDATE pc_list SET using_user_id = NULL WHERE pc_number = %s", (pc_number,)) if not pc_list_record[0][2] == None: cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_number,)) cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s AND pc_number = %s ORDER BY id DESC LIMIT 1", (pc_list_record[0][1], pc_number)) pc_usage_history_record = cursor.fetchall() cursor.execute("UPDATE pc_usage_history SET end_use_time = current_timestamp WHERE id = %s", (pc_usage_history_record[0][0],)) dislocker.db.commit() result = {"result": "ok"} else: result = {"result": "not_used"} except: result = {"result": "error"} finally: cursor.close() return result async def timeout_notify(self, **kwargs): try: pc_number = kwargs["pc_number"] discord_display_name = kwargs["discord_display_name"] await self.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':negative_squared_cross_mark: {discord_display_name} さんのPC {pc_number} の使用登録はタイムアウトにより解除されました。') result = {"result": "ok"} except Exception as error: print("自動停止処理中にエラーが発生しました。\nエラー内容") print(str(error.__class__.__name__)) print(str(error.args)) print(str(error)) result = {"result": "error"} finally: return result async def on_ready(self): print("DiscordのBotが起動しました。") async def on_interaction(self, interaction:discord.Interaction): try: if interaction.data["component_type"] == 2: await self.on_button(interaction) except KeyError: pass async def on_message(self, message): if message.author.bot: pass elif isinstance(message.channel, discord.DMChannel): msg_split = message.content.split() if msg_split[0] == "/password" or msg_split[0] == "/start": #メッセージの要素が2つ以下の場合は拒否 if len(msg_split) <= 2: await message.channel.send("# :warning: PC番号、もしくはデバイス番号が入力されていません。") #メッセージの要素が3つ以上の場合 elif len(msg_split) >= 3: #番号が数字であることを確認 if msg_split[1].isdigit() and msg_split[2].isdigit(): #PC番号が1以上10以下であることを確認 if int(msg_split[1]) <= 10 and int(msg_split[1]) >= 1: if len(msg_split) == 3: register = self.register(user_id=message.author.id, name=message.author.name, display_name=message.author.display_name, pc_number=msg_split[1], device_number=msg_split[2]) elif len(msg_split) == 4: register = self.register(user_id=message.author.id, name=message.author.name, display_name=message.author.display_name, pc_number=msg_split[1], device_number=msg_split[2], detail=msg_split[3]) if register["result"] == "ok": if len(msg_split) == 3: await message.channel.send(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {register["password"]}\n## PC番号 | {msg_split[1]}\n## デバイス番号 | {msg_split[2]}") elif len(msg_split) == 4: await message.channel.send(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {register["password"]}\n## PC番号 | {msg_split[1]}\n## デバイス番号 | {msg_split[2]}\n## 使用目的 | {msg_split[3]}") await self.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':white_check_mark: {register["name"]} さんがPC {msg_split[1]} の使用を開始しました。') elif register["result"] == "user_data_not_found": await message.channel.send("# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。") elif register["result"] == "pc_already_in_use_by_you": await message.channel.send(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには /stop で使用終了をお知らせください。\n>>> # PC番号 | {register["pc_number"]}\n# デバイス番号 | {register["device_number"]}\n# 使用開始時刻 | {register["start_time"]}\n# 使用目的 | {register["detail"]}") elif register["result"] == "pc_already_in_use_by_other": await message.channel.send(f"# :man_gesturing_no: そのPCは他のメンバーによって使用されています。\n別のPC番号を指定して、再度お試しください。") else: await message.channel.send("# :dizzy_face: 番号がおかしいようです。") else: await message.channel.send("# :dizzy_face: 指定された番号は不正です。") elif msg_split[0] == "/stop": stop = self.stop(user_id=message.author.id) if stop["result"] == "unused": await message.channel.send("# :shaking_face: 使用されていないようです...") elif stop["result"] == "ok": await message.channel.send(f":white_check_mark: PC番号 {stop["pc_number"]} の使用が終了されました。") await self.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':negative_squared_cross_mark: {stop["name"]} さんがPC {stop["pc_number"]} の使用を終了しました。') elif message.channel.id == dislocker.server_config["bot"]["config_channel_id"]: msg_split = message.content.split() if msg_split[0] == "/register": print(len(msg_split)) if len(msg_split) == 1: register = self.user_register(name=message.author.display_name, discord_user_name=message.author.name, discord_user_id=message.author.id) print(register) if register["result"] == "ok": await message.channel.send(f"# :white_check_mark: ユーザー情報が登録されました。\n>>> ユーザー名:{message.author.display_name}") elif register["result"] == "already_exists": await message.channel.send("# :no_entry: 登録できませんでした。\nもう登録されている可能性があります。") else: await message.channel.send("# :no_entry: 登録できませんでした。\n内部エラーが発生しています。") elif len(msg_split) <= 3: await message.channel.send("# :japanese_goblin: 入力内容に不備があります。\n名前、Discordのユーザー名、DiscordのユーザーIDのいずれかが入力されていません。") elif len(msg_split) == 4: if msg_split[3].isdigit(): register = self.user_register(name=msg_split[1], discord_user_name=msg_split[2], discord_user_id=msg_split[3]) if register["result"] == "ok": await message.channel.send(f"# :white_check_mark: 登録が完了しました。\n>>> # 名前 | {msg_split[1]}\n# Discordのユーザー名 | {msg_split[2]}\n# DiscordのユーザーID | {msg_split[3]}") elif register["result"] == "already_exists": await message.channel.send("# :skull_crossbones: 登録できませんでした。\nそのDiscordアカウントはすでに登録されています。") else: await message.channel.send("# :skull_crossbones: 登録できませんでした。\nDiscordのユーザーIDが不正です。") else: await message.channel.send("# :skull_crossbones: 登録できませんでした。\n内部エラーが発生しています。") elif msg_split[0] == "/export": export = self.report_export() if export["result"] == "ok": await message.channel.send("# :page_facing_up: 使用履歴のレポートです。", file=discord.File(export["file_path"])) pass elif export["result"] == "export_error": await message.channel.send("# :volcano: エクスポートに失敗しました。") elif msg_split[0] == "/fstop": if len(msg_split) == 1: await message.channel.send("# :warning: 登録を解除できませんでした。\n使用を停止したいPC番号を指定してください。\n-# /fstop PC番号") elif len(msg_split) == 2: if msg_split[1].isdigit(): fstop = self.force_stop(pc_number=msg_split[1]) if fstop["result"] == "ok": await message.channel.send(f"# :white_check_mark: PC番号 {msg_split[1]} の使用登録を解除しました。") elif fstop["result"] == "not_used": await message.channel.send("# :exploding_head: 登録を解除できませんでした。\nPCは使用されていないようです...") else: await message.channel.send("# :x: 登録を解除できませんでした。\n内部エラーが発生しています。") else: await message.channel.send("# :warning: 登録を解除できませんでした。\nPC番号を認識できません。\n-# 半角数字で入力してください。") else: await message.channel.send("# warning: 登録を解除できませんでした。\構文が間違っています。\n-# /fstop PC番号") elif msg_split[0] == "/pcregister": if len(msg_split) == 1: await message.channel.send("# :warning: PCを登録できませんでした。\n登録したいPC番号を指定してください。\n-# 半角数字で入力してください。") elif len(msg_split) == 2: if msg_split[1].isdigit(): pc_register = self.pc_register(pc_number=msg_split[1]) if pc_register["result"] == "ok": await message.channel.send(f"# :white_check_mark: PCを登録しました。\n>>> # PC番号 | {msg_split[1]}") elif pc_register["result"] == "already_exists": await message.channel.send(f":x: PCを登録できませんでした。\nその番号のPCは既に存在します。") else: await message.channel.send("# :x: PCを登録できませんでした。\n内部エラーが発生しています。") else: await message.channel.send("# :warning: PCを登録できませんでした。\nPC番号を認識できません。\n-# 半角数字で入力してください。") else: await message.channel.send("# :warning: PCを登録できませんでした。\n構文が間違っています。\n-# /pcregister PC番号") elif msg_split[0] == "/registerbutton": pc_button_view = View(timeout=None) for i in range(1, 11): pc_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{i}", custom_id=f"pcregister_{i}") pc_button_view.add_item(pc_register_button) await self.get_channel(dislocker.server_config["bot"]["config_public_channel_id"]).send(f'# :index_pointing_at_the_viewer: 使いたいPCの番号を選んでください!', view=pc_button_view) elif msg_split[0] == "/stopbutton": stop_button_view = View(timeout=None) stop_button = discord.ui.Button(style=discord.ButtonStyle.danger, label="PCの使用を停止", custom_id="stop") stop_button_view.add_item(stop_button) await self.get_channel(dislocker.server_config["bot"]["config_public_channel_id"]).send(f'# :index_pointing_at_the_viewer: 使用を停止しますか?', view=stop_button_view) elif msg_split[0] == "/userbutton": user_register_button_view = View(timeout=None) user_register_button = discord.ui.Button(style=discord.ButtonStyle.green, label="ユーザー登録", custom_id="user_register") user_register_button_view.add_item(user_register_button) await self.get_channel(dislocker.server_config["bot"]["config_public_channel_id"]).send(f'# :index_pointing_at_the_viewer: ユーザー登録はお済ですか?', view=user_register_button_view) elif message.channel.id == dislocker.server_config["bot"]["config_public_channel_id"]: msg_split = message.content.split() if msg_split[0] == "/register": print(len(msg_split)) if len(msg_split) == 1: register = self.user_register(name=message.author.display_name, discord_user_name=message.author.name, discord_user_id=message.author.id) print(register) if register["result"] == "ok": await message.channel.send(f"# :white_check_mark: ユーザー情報が登録されました。\nユーザー名:{message.author.display_name}") elif register["result"] == "already_exists": await message.channel.send("# :skull_crossbones: 登録できませんでした。\nそのDiscordアカウントはすでに登録されています。") else: await message.channel.send("# :skull_crossbones: 登録できませんでした。\n内部エラーが発生しています。") else: await message.channel.send("# :skull_crossbones: 登録できませんでした。\n\n-# もしかして...\n-# 手動でメンバーを登録したいですか?\n-# もしそうなら、このチャンネルにはその権限がありません。\n-# そのチャンネルに移動してから、もう一度試してみてください!") async def on_button(self, interaction: Interaction): custom_id = interaction.data["custom_id"] custom_id_split = custom_id.split("_") print(custom_id, custom_id_split[0]) if custom_id_split[0] == "pcregister": device_register_view = View(timeout=15) pc_number = custom_id_split[1] print(custom_id_split) for i in range(1, 20): device_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{i}", custom_id=f"deviceregister_{str(pc_number)}_{i}") device_register_view.add_item(device_register_button) await interaction.response.send_message(f"# :keyboard: デバイス番号を選んでください!\n>>> # PC番号 | {str(pc_number)}", view=device_register_view, ephemeral=True) elif custom_id_split[0] == "deviceregister": pc_number = custom_id_split[1] device_number = custom_id_split[2] reason_register_view = View(timeout=15) reason_button = discord.ui.Button(style=discord.ButtonStyle.primary, label="使用目的を入力する", custom_id=f"reasonregister_{str(pc_number)}_{str(device_number)}") reason_register_view.add_item(reason_button) await interaction.response.send_message(f"# :regional_indicator_q: 使用目的を書いてください!\n>>> # PC番号 | {str(pc_number)}\n# デバイス番号 | {str(device_number)}", view=reason_register_view, ephemeral=True) elif custom_id_split[0] == "reasonregister": pc_number = custom_id_split[1] device_number = custom_id_split[2] reason_input_form = Reason(title="Dislocker | 登録", pc_number=str(pc_number), device_number=str(device_number)) await interaction.response.send_modal(reason_input_form) elif custom_id_split[0] == "stop": print("STOP running") pc_stop = self.stop(user_id=interaction.user.id) print(pc_stop) stop_view = View(timeout=15) if pc_stop["result"] == "unused": await interaction.response.send_message("# :shaking_face: 使用されていないようです...", ephemeral=True) elif pc_stop["result"] == "ok": await interaction.response.send_message(f":white_check_mark: PC番号 {pc_stop["pc_number"]} の使用が終了されました。", ephemeral=True) await self.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':negative_squared_cross_mark: {pc_stop["name"]} さんがPC {pc_stop["pc_number"]} の使用を終了しました。') else: await interaction.response.send_message("# :skull_crossbones: 停止できませんでした。\n内部エラーが発生しています。", ephemeral=True) elif custom_id_split[0] == "user" and custom_id_split[1] == "register": print("User Register RUnning") user_register = self.user_register(name=interaction.user.display_name, discord_user_name=interaction.user.name, discord_user_id=interaction.user.id) if user_register["result"] == "ok": await interaction.response.send_message(f"# :white_check_mark: ユーザー情報が登録されました。\n>>> ユーザー名:{interaction.user.display_name}", ephemeral=True) elif user_register["result"] == "already_exists": await interaction.response.send_message("# :no_entry: 登録できませんでした。\nもう登録されている可能性があります。", ephemeral=True) else: await interaction.response.send_message("# :no_entry: 登録できませんでした。\n内部エラーが発生しています。", ephemeral=True) class Monitor(): def __init__(self) -> None: pass def start(self, **kwargs): self.serach_time = kwargs["search_time"] self.allowable_time = kwargs["allowable_time"] serach_thread = threading.Thread(target=self.search) serach_thread.start() def search(self): try: while True: cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM pc_list WHERE password_hash IS NOT NULL") pc_list = cursor.fetchall() print(pc_list) print(len(pc_list)) if pc_list: if len(pc_list) == 1: user_id = pc_list[0][1] cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s AND end_use_time IS NULL ORDER BY id DESC LIMIT 1", (user_id,)) pc_usage = cursor.fetchall() print(pc_usage) start_time = pc_usage[0][4] print(start_time) print(type(start_time)) current_time = datetime.now() time_difference = current_time - start_time if time_difference >= timedelta(seconds=self.allowable_time): cursor.execute("SELECT * FROM club_member WHERE id = %s", (user_id,)) user_info = cursor.fetchall() stop = bot.stop(user_id=user_info[0][3]) bot.timeout_notify(pc_number=pc_list[0][0], discord_display_name=user_info[0][1]) time.sleep(1) elif len(pc_list) >= 2: for i in pc_list: print(i) user_id = i[1] cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s AND end_use_time IS NULL ORDER BY id DESC LIMIT 1", (user_id,)) pc_usage = cursor.fetchall() print(pc_usage) start_time = pc_usage[0][4] print(start_time) print(type(start_time)) current_time = datetime.now() time_difference = current_time - start_time if time_difference >= timedelta(seconds=self.allowable_time): cursor.execute("SELECT * FROM club_member WHERE id = %s", (user_id,)) user_info = cursor.fetchall() stop = bot.stop(user_id=user_info[0][3]) bot.timeout_notify(pc_number=i[0], discord_display_name=user_info[0][1]) time.sleep(0.1) else: result = {"result": "NONE"} else: result = {"result": "NONE"} cursor.close() print(result["result"]) time.sleep(self.serach_time) except Exception as error: print("自動停止処理中にエラーが発生しました。\nエラー内容") print(str(error.__class__.__name__)) print(str(error.args)) print(str(error)) result = {"result": "error"} dislocker.db.rollback() finally: cursor.close() print(result["result"]) return result class Reason(Modal): def __init__(self, title: str, pc_number: str, device_number: str, timeout=15) -> None: super().__init__(title=title, timeout=timeout) print(pc_number) print(device_number) self.reason_input_form = TextInput(label="使用目的を入力してください", style=TextStyle.short, custom_id=f"register_{pc_number}_{device_number}") self.add_item(self.reason_input_form) async def on_submit(self, interaction: Interaction) -> None: custom_id = interaction.data["components"][0]["components"][0]["custom_id"] print(custom_id) custom_id_split = custom_id.split("_") pc_number = custom_id_split[1] device_number = custom_id_split[2] register = bot.register(user_id=interaction.user.id, name=interaction.user.name, display_name=interaction.user.display_name, pc_number=pc_number, device_number=device_number, detail=self.reason_input_form.value) print(register["result"]) if register["result"] == "ok": await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {register["password"]}\n## PC番号 | {pc_number}\n## デバイス番号 | {device_number}\n## 使用目的 | {self.reason_input_form.value}", ephemeral=True) await bot.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':white_check_mark: {register["name"]} さんがPC {pc_number} の使用を開始しました。') elif register["result"] == "pc_already_in_use_by_you": await interaction.response.send_message(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには 終了ボタン で使用終了をお知らせください。\n>>> # PC番号 | {register["pc_number"]}\n# デバイス番号 | {register["device_number"]}\n# 使用開始時刻 | {register["start_time"]}\n# 使用目的 | {register["detail"]}", ephemeral=True) elif register["result"] == "pc_already_in_use_by_other": await interaction.response.send_message(f"# :man_gesturing_no: そのPCは他のメンバーによって使用されています。\n別のPC番号を指定して、再度お試しください。", ephemeral=True) elif register["result"] == "user_data_not_found": await interaction.response.send_message("# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。", ephemeral=True) else: await interaction.response.send_message("# :skull_crossbones: 登録できませんでした。\n内部エラーが発生しています。", ephemeral=True) dislocker = DL() if dislocker.init_result == "ok": print("Botを起動します...") intents = discord.Intents.default() intents.message_content = True bot = Bot(intents=intents) monitor = Monitor() monitor.start(search_time=10, allowable_time=30) bot.run(dislocker.server_config['bot']['token']) else: pass