import json import discord import os import psycopg2 from psycopg2 import sql import hashlib import string import random from datetime import datetime from openpyxl import Workbook class DL(): def __init__(self): self.config_dir_path = "./config/" self.export_dir_path = "./export/" self.server_config_path = self.config_dir_path + "server.json" try: if not os.path.isdir(self.config_dir_path): print("config ディレクトリが見つかりません... 作成します。") os.mkdir(self.config_dir_path) if not os.path.isfile(self.server_config_path): print("config ファイルが見つかりません... 作成します。") self.server_config = { "db": { "host": "localhost", "port": "5432", "db_name": "dislocker", "username": "user", "password": "password" }, "bot": { "token": "TYPE HERE BOTS TOKEN KEY", "log_channel_id" : "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)", "config_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)", "config_public_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)" } } with open(self.server_config_path, "w") as w: json.dump(self.server_config, w, indent=4) elif os.path.isfile(self.server_config_path): with open(self.server_config_path, "r") as r: self.server_config = json.load(r) print("config ファイルを読み込みました。") if not os.path.isdir(self.export_dir_path): print("export ディレクトリが見つかりません... 作成します。") os.mkdir(self.export_dir_path) if type(self.server_config["bot"]["log_channel_id"]) is not int or type(self.server_config["bot"]["config_channel_id"]) is not int: print("config ファイル内でチャンネルIDがint型で記述されていません。int型で記述して、起動してください。") self.init_result = "not_int" else: self.db = psycopg2.connect(f"host={self.server_config['db']['host']} dbname={self.server_config['db']['db_name']} port={self.server_config['db']['port']} user={self.server_config['db']['username']} password={self.server_config['db']['password']}") cursor = self.db.cursor() cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'club_member')") find_club_member_table = cursor.fetchall() print(find_club_member_table) if find_club_member_table[0][0] == False: cursor.execute("CREATE TABLE club_member (id SERIAL NOT NULL, name VARCHAR(30) NOT NULL, discord_username VARCHAR(32) NOT NULL, discord_userid VARCHAR(18) NOT NULL, PRIMARY KEY (id))") self.db.commit() cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'pc_list')") find_pc_list_table = cursor.fetchall() print(find_pc_list_table) if find_pc_list_table[0][0] == False: cursor.execute("CREATE TABLE pc_list (pc_number INTEGER NOT NULL, using_user_id INTEGER, password_hash VARCHAR(32), PRIMARY KEY (pc_number), FOREIGN KEY (using_user_id) REFERENCES club_member(id))") self.db.commit cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'pc_usage_history')") find_pc_usage_history_table = cursor.fetchall() print(find_pc_usage_history_table) if find_pc_usage_history_table[0][0] == False: cursor.execute("CREATE TABLE pc_usage_history (id SERIAL NOT NULL, member_id INTEGER NOT NULL, pc_number INTEGER NOT NULL, device_number INTEGER NOT NULL, start_use_time TIMESTAMP NOT NULL, end_use_time TIMESTAMP, use_detail VARCHAR(30), PRIMARY KEY (id), FOREIGN KEY (member_id) REFERENCES club_member(id), FOREIGN KEY (pc_number) REFERENCES pc_list(pc_number))") self.db.commit() cursor.close() self.init_result = "ok" except (Exception) as error: print("初回処理でエラーが発生しました。\nエラー内容\n" + str(error)) self.init_result = "error" finally: pass class Bot(discord.Client): def password_generate(self, length): numbers = string.digits # (1) password = ''.join(random.choice(numbers) for _ in range(length)) # (2) return password def hash_genarate(self, source): hashed = hashlib.md5(source.encode()) return hashed.hexdigest() def register(self, **kwargs): try: user_info = { "id": str(kwargs["user_id"]), "name": str(kwargs["name"]), "display_name": str(kwargs["display_name"]), "pc_number": int(kwargs["pc_number"]), "device_number": int(kwargs["device_number"]), "detail": None } if "detail" in kwargs: user_info["detail"] = str(kwargs["detail"]) else: pass #パスワード生成、ハッシュ化 password = self.password_generate(4) password_hash = self.hash_genarate(password) print("password generated") #メンバーリストと送信者を照合 cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (user_info["id"],)) user_record = cursor.fetchall() #ユーザーデータがなかったら(未登録の場合) if not user_record: result = {"result": "user_data_not_found"} #ユーザーデータが見つかった場合(登録済みの場合) else: print("found user data") cursor.execute("SELECT * FROM pc_usage_history WHERE member_id=%s ORDER BY id DESC LIMIT 1", (user_record[0][0],)) pc_usage_history_record = cursor.fetchall() if pc_usage_history_record: print("used") if pc_usage_history_record[0][5] == None: result = {"result": "pc_already_in_use_by_you", "pc_number": str(pc_usage_history_record[0][2]), "device_number": str(pc_usage_history_record[0][3]), "start_time": str(pc_usage_history_record[0][4]), "detail": str(pc_usage_history_record[0][6])} else: cursor.execute("SELECT * FROM pc_list WHERE pc_number=%s", (user_info["pc_number"],)) pc_list_record = cursor.fetchall() if not pc_list_record[0][1] == None: result = {"result": "pc_already_in_use_by_other"} else: if user_info["detail"] == None: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time) VALUES (%s, %s, %s, current_timestamp)", (user_record[0][0], user_info["pc_number"], user_info["device_number"])) #使用用途があるとき else: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time, use_detail) VALUES (%s, %s, %s, current_timestamp, %s)", (user_record[0][0], user_info["pc_number"], user_info["device_number"], user_info["detail"])) cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], user_info["pc_number"])) cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, user_info["pc_number"])) dislocker.db.commit() result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])} else: print("unused") cursor.execute("SELECT * FROM pc_list WHERE pc_number=%s", (user_info["pc_number"],)) pc_list_record = cursor.fetchall() if pc_list_record: if not pc_list_record[0][1] == None: result = {"result": "pc_already_in_use_by_other"} else: if user_info["detail"] == None: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time) VALUES (%s, %s, %s, current_timestamp)", (user_record[0][0], user_info["pc_number"], user_info["device_number"])) #使用用途があるとき else: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time, use_detail) VALUES (%s, %s, %s, current_timestamp, %s)", (user_record[0][0], user_info["pc_number"], user_info["device_number"], user_info["detail"])) cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], user_info["pc_number"])) cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, user_info["pc_number"])) dislocker.db.commit() result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])} else: if user_info["detail"] == None: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time) VALUES (%s, %s, %s, current_timestamp)", (user_record[0][0], user_info["pc_number"], user_info["device_number"])) #使用用途があるとき else: cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time, use_detail) VALUES (%s, %s, %s, current_timestamp, %s)", (user_record[0][0], user_info["pc_number"], user_info["device_number"], user_info["detail"])) cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], user_info["pc_number"])) cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, user_info["pc_number"])) dislocker.db.commit() result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])} except Exception as error: print("登録処理中にエラーが発生しました。\nエラー内容") print(str(error.__class__.__name__)) print(str(error.args)) print(str(error)) result = {"result": "error"} finally: cursor.close() return result def stop(self, **kwargs): try: discord_user_id = str(kwargs["user_id"]) cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,)) user_record = cursor.fetchall() cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s ORDER BY id DESC LIMIT 1", (user_record[0][0],)) pc_usage_history_record = cursor.fetchall() if pc_usage_history_record: if not pc_usage_history_record[0][5] == None: result = {"result": "unused"} else: cursor.execute("UPDATE pc_usage_history SET end_use_time = current_timestamp WHERE id = %s", (pc_usage_history_record[0][0],)) cursor.execute("UPDATE pc_list SET using_user_id = NULL WHERE pc_number = %s", (pc_usage_history_record[0][2],)) cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_usage_history_record[0][2],)) dislocker.db.commit() result = {"result": "ok", "pc_number": str(pc_usage_history_record[0][2]), "name": str(user_record[0][1])} except: print("停止処理にエラーが発生しました。") result = {"result": "error"} finally: cursor.close() return result def user_register(self, **kwargs): try: discord_user_id = str(kwargs["discord_user_id"]) discord_user_name = str(kwargs["discord_user_name"]) name = str(kwargs["name"]) cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,)) user_record = cursor.fetchall() if not user_record: cursor.execute("INSERT INTO club_member (name, discord_username, discord_userid) VALUES (%s, %s, %s)", (name, discord_user_name, discord_user_id)) dislocker.db.commit() result = {"result": "ok"} else: result = {"result": "already_exists"} except Exception as error: print("ユーザー登録中にエラーが発生しました。\nエラー内容") print(str(error.__class__.__name__)) print(str(error.args)) print(str(error)) result = {"result": "error"} finally: cursor.close() return result def format_datetime(self, value): if isinstance(value, datetime): return value.strftime('%Y-%m-%d %H:%M:%S') return value def pc_register(self, **kwargs): try: pc_number = int(kwargs["pc_number"]) cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,)) pc_list = cursor.fetchall() if not pc_list: cursor.execute("INSERT INTO pc_list (pc_number) VALUES (%s)", (pc_number,)) dislocker.db.commit() result = {"result": "ok"} else: result = {"result": "already_exists"} except Exception as error: print("PCの登録中にエラーが発生しました。\nエラー内容") print(str(error.__class__.__name__)) print(str(error.args)) print(str(error)) result = {"result": "error"} finally: cursor.close() return result def report_export(self, **kwargs): try: cursor = dislocker.db.cursor() csv_file_path = dislocker.export_dir_path + "pc_usage_history.csv" main_table = "pc_usage_history" related_table = "club_member" excel_file_path = dislocker.export_dir_path + "pc_usage_history.xlsx" # メインテーブルの列情報を取得(user_idを除く) cursor.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table))) main_columns = [desc[0] for desc in cursor.description if desc[0] != 'member_id'] # クエリを作成(列名を明確に指定) query = sql.SQL(""" SELECT {main_columns}, {related_table}.name FROM {main_table} LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.id ORDER BY id """).format( main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]), main_table=sql.Identifier(main_table), related_table=sql.Identifier(related_table) ) cursor.execute(query) # 列名を再構成(nameを2番目に配置) column_names = [main_columns[0], 'name'] + main_columns[1:] rows = cursor.fetchall() # Excelワークブックを作成 wb = Workbook() ws = wb.active # 列名を書き込み ws.append(column_names) # データを書き込み for row in rows: # nameを2番目に移動 formatted_row = [self.format_datetime(row[0])] + [row[-1]] + [self.format_datetime(field) if field is not None else '' for field in row[1:-1]] ws.append(formatted_row) # 列幅を自動調整 for col in ws.columns: max_length = 0 column = col[0].column_letter for cell in col: try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except: pass adjusted_width = (max_length + 2) * 1.2 ws.column_dimensions[column].width = adjusted_width # Excelファイルを保存 wb.save(excel_file_path) print(f"テーブル '{main_table}' の内容を '{excel_file_path}' に出力しました。") result = {"result": "ok", "file_path": excel_file_path} except (Exception, psycopg2.Error) as error: print("使用履歴のエクスポート時にエラーが発生しました\nエラー内容\n", str(error)) result = {"result": "export_error"} finally: cursor.close() return result def force_stop(self, **kwargs): try: pc_number = kwargs["pc_number"] cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,)) pc_list_record = cursor.fetchall() if not pc_list_record[0][1] == None: cursor.execute("UPDATE pc_list SET using_user_id = NULL WHERE pc_number = %s", (pc_number,)) if not pc_list_record[0][2] == None: cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_number,)) cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s AND pc_number = %s ORDER BY id DESC LIMIT 1", (pc_list_record[0][1], pc_number)) pc_usage_history_record = cursor.fetchall() cursor.execute("UPDATE pc_usage_history SET end_use_time = current_timestamp WHERE id = %s", (pc_usage_history_record[0][0],)) dislocker.db.commit() result = {"result": "ok"} else: result = {"result": "not_used"} except: result = {"result": "error"} finally: cursor.close() return result async def on_ready(self): print("DiscordのBotが起動しました。") async def on_message(self, message): if message.author.bot: pass elif isinstance(message.channel, discord.DMChannel): msg_split = message.content.split() if msg_split[0] == "/password" or msg_split[0] == "/start": #メッセージの要素が2つ以下の場合は拒否 if len(msg_split) <= 2: await message.channel.send("# :warning: PC番号、もしくはデバイス番号が入力されていません。") #メッセージの要素が3つ以上の場合 elif len(msg_split) >= 3: #番号が数字であることを確認 if msg_split[1].isdigit() and msg_split[2].isdigit(): #PC番号が1以上10以下であることを確認 if int(msg_split[1]) <= 10 and int(msg_split[1]) >= 1: if len(msg_split) == 3: register = self.register(user_id=message.author.id, name=message.author.name, display_name=message.author.display_name, pc_number=msg_split[1], device_number=msg_split[2]) elif len(msg_split) == 4: register = self.register(user_id=message.author.id, name=message.author.name, display_name=message.author.display_name, pc_number=msg_split[1], device_number=msg_split[2], detail=msg_split[3]) if register["result"] == "ok": if len(msg_split) == 3: await message.channel.send(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {register["password"]}\n## PC番号 | {msg_split[1]}\n## デバイス番号 | {msg_split[2]}") elif len(msg_split) == 4: await message.channel.send(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {register["password"]}\n## PC番号 | {msg_split[1]}\n## デバイス番号 | {msg_split[2]}\n## 使用目的 | {msg_split[3]}") await self.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':white_check_mark: {register["name"]} さんがPC {msg_split[1]} の使用を開始しました。') elif register["result"] == "user_data_not_found": await message.channel.send("# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。") elif register["result"] == "pc_already_in_use_by_you": await message.channel.send(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには /stop で使用終了をお知らせください。\n>>> # PC番号 | {register["pc_number"]}\n# デバイス番号 | {register["device_number"]}\n# 使用開始時刻 | {register["start_time"]}\n# 使用目的 | {register["detail"]}") elif register["result"] == "pc_already_in_use_by_other": await message.channel.send(f"# :man_gesturing_no: そのPCは他のメンバーによって使用されています。\n別のPC番号を指定して、再度お試しください。") else: await message.channel.send("# :dizzy_face: 番号がおかしいようです。") else: await message.channel.send("# :dizzy_face: 指定された番号は不正です。") elif msg_split[0] == "/stop": stop = self.stop(user_id=message.author.id) if stop["result"] == "unused": await message.channel.send("# :shaking_face: 使用されていないようです...") elif stop["result"] == "ok": await message.channel.send(f":white_check_mark: PC番号 {stop["pc_number"]} の使用が終了されました。") await self.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':negative_squared_cross_mark: {stop["name"]} さんがPC {stop["pc_number"]} の使用を終了しました。') elif message.channel.id == dislocker.server_config["bot"]["config_channel_id"]: msg_split = message.content.split() if msg_split[0] == "/register": print(len(msg_split)) if len(msg_split) == 1: register = self.user_register(name=message.author.display_name, discord_user_name=message.author.name, discord_user_id=message.author.id) print(register) if register["result"] == "ok": await message.channel.send(f"# :white_check_mark: ユーザー情報が登録されました。\n>>> ユーザー名:{message.author.display_name}") elif register["result"] == "already_exists": await message.channel.send("# :no_entry: 登録できませんでした。\nもう登録されている可能性があります。") else: await message.channel.send("# :no_entry: 登録できませんでした。\n内部エラーが発生しています。") elif len(msg_split) <= 3: await message.channel.send("# :japanese_goblin: 入力内容に不備があります。\n名前、Discordのユーザー名、DiscordのユーザーIDのいずれかが入力されていません。") elif len(msg_split) == 4: if msg_split[3].isdigit(): register = self.user_register(name=msg_split[1], discord_user_name=msg_split[2], discord_user_id=msg_split[3]) if register["result"] == "ok": await message.channel.send(f"# :white_check_mark: 登録が完了しました。\n>>> # 名前 | {msg_split[1]}\n# Discordのユーザー名 | {msg_split[2]}\n# DiscordのユーザーID | {msg_split[3]}") elif register["result"] == "already_exists": await message.channel.send("# :skull_crossbones: 登録できませんでした。\nそのDiscordアカウントはすでに登録されています。") else: await message.channel.send("# :skull_crossbones: 登録できませんでした。\nDiscordのユーザーIDが不正です。") else: await message.channel.send("# :skull_crossbones: 登録できませんでした。\n内部エラーが発生しています。") elif msg_split[0] == "/export": export = self.report_export() if export["result"] == "ok": await message.channel.send("# :page_facing_up: 使用履歴のレポートです。", file=discord.File(export["file_path"])) pass elif export["result"] == "export_error": await message.channel.send("# :volcano: エクスポートに失敗しました。") elif msg_split[0] == "/fstop": if len(msg_split) == 1: await message.channel.send("# :warning: 登録を解除できませんでした。\n使用を停止したいPC番号を指定してください。\n-# /fstop PC番号") elif len(msg_split) == 2: if msg_split[1].isdigit(): fstop = self.force_stop(pc_number=msg_split[1]) if fstop["result"] == "ok": await message.channel.send(f"# :white_check_mark: PC番号 {msg_split[1]} の使用登録を解除しました。") elif fstop["result"] == "not_used": await message.channel.send("# :exploding_head: 登録を解除できませんでした。\nPCは使用されていないようです...") else: await message.channel.send("# :x: 登録を解除できませんでした。\n内部エラーが発生しています。") else: await message.channel.send("# :warning: 登録を解除できませんでした。\nPC番号を認識できません。\n-# 半角数字で入力してください。") else: await message.channel.send("# warning: 登録を解除できませんでした。\構文が間違っています。\n-# /fstop PC番号") elif msg_split[0] == "/pcregister": if len(msg_split) == 1: await message.channel.send("# :warning: PCを登録できませんでした。\n登録したいPC番号を指定してください。\n-# 半角数字で入力してください。") elif len(msg_split) == 2: if msg_split[1].isdigit(): pc_register = self.pc_register(pc_number=msg_split[1]) if pc_register["result"] == "ok": await message.channel.send(f"# :white_check_mark: PCを登録しました。\n>>> # PC番号 | {msg_split[1]}") elif pc_register["result"] == "already_exists": await message.channel.send(f":x: PCを登録できませんでした。\nその番号のPCは既に存在します。") else: await message.channel.send("# :x: PCを登録できませんでした。\n内部エラーが発生しています。") else: await message.channel.send("# :warning: PCを登録できませんでした。\nPC番号を認識できません。\n-# 半角数字で入力してください。") else: await message.channel.send("# :warning: PCを登録できませんでした。\n構文が間違っています。\n-# /pcregister PC番号") elif message.channel.id == dislocker.server_config["bot"]["config_public_channel_id"]: msg_split = message.content.split() if msg_split[0] == "/register": print(len(msg_split)) if len(msg_split) == 1: register = self.user_register(name=message.author.display_name, discord_user_name=message.author.name, discord_user_id=message.author.id) print(register) if register["result"] == "ok": await message.channel.send(f"# :white_check_mark: ユーザー情報が登録されました。\nユーザー名:{message.author.display_name}") elif register["result"] == "already_exists": await message.channel.send("# :skull_crossbones: 登録できませんでした。\nそのDiscordアカウントはすでに登録されています。") else: await message.channel.send("# :skull_crossbones: 登録できませんでした。\n内部エラーが発生しています。") else: await message.channel.send("# :skull_crossbones: 登録できませんでした。\n\n-# もしかして...\n-# 手動でメンバーを登録したいですか?\n-# もしそうなら、このチャンネルにはその権限がありません。\n-# そのチャンネルに移動してから、もう一度試してみてください!") dislocker = DL() if dislocker.init_result == "ok": print("Botを起動します...") intents = discord.Intents.default() intents.message_content = True bot = Bot(intents=intents) bot.run(dislocker.server_config['bot']['token']) else: pass