Dislocker/dislocker.py

1641 lines
102 KiB
Python
Raw Normal View History

2024-06-04 09:36:29 +09:00
import discord
import os
2024-09-30 17:32:44 +09:00
import json
2024-06-04 09:36:29 +09:00
import psycopg2
2024-07-07 18:00:49 +09:00
from psycopg2 import sql
2024-09-30 17:32:44 +09:00
from datetime import datetime, timedelta
import asyncio
import string
import random
2024-09-30 17:32:44 +09:00
import hashlib
import openpyxl
from openpyxl import Workbook
import threading
import time
2024-06-04 09:36:29 +09:00
2024-07-07 15:33:30 +09:00
class DL():
def __init__(self):
self.config_dir_path = "./config/"
self.export_dir_path = "./export/"
self.log_dir_path = "./log/"
self.server_config_path = self.config_dir_path + "server.json"
self.onetime_config_path = self.config_dir_path + "onetime.json"
self.log_path = self.log_dir_path + "dislocker.txt"
2024-07-07 15:52:46 +09:00
try:
if not os.path.isdir(self.config_dir_path):
print("config ディレクトリが見つかりません... 作成します。")
os.mkdir(self.config_dir_path)
2024-07-07 15:52:46 +09:00
if not os.path.isfile(self.server_config_path):
print("config ファイルが見つかりません... 作成します。")
self.server_config = {
"db": {
"host": "localhost",
"port": "5432",
"db_name": "dislocker",
"username": "user",
"password": "password"
},
"bot": {
2024-09-30 17:32:44 +09:00
"server_id": ["TYPE HERE SERVER ID (YOU MUST USE INT !!!!)"],
2024-07-07 15:52:46 +09:00
"token": "TYPE HERE BOTS TOKEN KEY",
"activity": {
"name": "Dislocker",
"details": "ロック中...",
"type": "playing",
"state": "ロック中..."
},
"log_channel_id" : "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
"config_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
"config_public_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
"monitor": {
"search_frequency": 1,
"allowable_time": 180,
"fstop_time": "21:00:00"
},
"preset_games": ["TEST1", "TEST2", "TEST3", "TEST4", "TEST5"],
2024-09-30 17:32:44 +09:00
"admin_user_id": ["TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)"],
"debug": False
2024-07-07 15:52:46 +09:00
}
}
with open(self.server_config_path, "w", encoding="utf-8") as w:
json.dump(self.server_config, w, indent=4, ensure_ascii=False)
2024-07-07 15:52:46 +09:00
elif os.path.isfile(self.server_config_path):
with open(self.server_config_path, "r", encoding="utf-8") as r:
2024-07-07 15:52:46 +09:00
self.server_config = json.load(r)
2024-07-07 16:04:46 +09:00
print("config ファイルを読み込みました。")
2024-07-07 15:52:46 +09:00
if not os.path.isdir(self.export_dir_path):
print("export ディレクトリが見つかりません... 作成します。")
os.mkdir(self.export_dir_path)
if not os.path.isdir(self.log_dir_path):
print("log ディレクトリが見つかりません... 作成します。")
os.mkdir(self.log_dir_path)
2024-07-07 17:34:51 +09:00
2024-09-30 17:32:44 +09:00
if os.path.isfile(self.onetime_config_path):
print("ワンタイムパスワードが見つかりました。削除します。")
os.remove(self.onetime_config_path)
2024-07-07 17:34:51 +09:00
if type(self.server_config["bot"]["log_channel_id"]) is not int or type(self.server_config["bot"]["config_channel_id"]) is not int:
print("config ファイル内でチャンネルIDがint型で記述されていません。int型で記述して、起動してください。")
self.init_result = "not_int"
else:
self.db = psycopg2.connect(f"host={self.server_config['db']['host']} dbname={self.server_config['db']['db_name']} port={self.server_config['db']['port']} user={self.server_config['db']['username']} password={self.server_config['db']['password']}")
cursor = self.db.cursor()
self.pc_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
self.keyboard_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
self.mouse_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
self.preset_games = self.server_config["bot"]["preset_games"]
self.debug = self.server_config["bot"]["debug"]
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'club_member')")
find_club_member_table = cursor.fetchall()
print(find_club_member_table)
if find_club_member_table[0][0] == False:
cursor.execute("CREATE TABLE club_member (member_id SERIAL NOT NULL, name TEXT NOT NULL, discord_user_name TEXT NOT NULL, discord_user_id TEXT NOT NULL, PRIMARY KEY (member_id))")
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'pc_list')")
find_pc_list_table = cursor.fetchall()
print(find_pc_list_table)
if find_pc_list_table[0][0] == False:
2024-09-30 17:32:44 +09:00
cursor.execute("CREATE TABLE pc_list (pc_number INTEGER NOT NULL, using_member_id INTEGER, password_hash VARCHAR(64), pc_uuid VARCHAR(36), pc_token VARCHAR(36), master_password VARCHAR(16), detail TEXT, alt_name TEXT, PRIMARY KEY (pc_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))")
for i in self.pc_list:
print(i)
cursor.execute("INSERT INTO pc_list (pc_number) VALUES (%s)", (i,))
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'keyboard_list')")
find_keyboard_list_table = cursor.fetchall()
print(find_keyboard_list_table)
if find_keyboard_list_table[0][0] == False:
2024-09-30 17:32:44 +09:00
cursor.execute("CREATE TABLE keyboard_list (keyboard_number INTEGER NOT NULL, using_member_id INTEGER, device_instance_path TEXT, device_name TEXT, detail TEXT, alt_name TEXT, PRIMARY KEY (keyboard_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))")
for i in self.keyboard_list:
print(i)
cursor.execute("INSERT INTO keyboard_list (keyboard_number) VALUES (%s)", (i,))
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'mouse_list')")
find_mouse_list_table = cursor.fetchall()
print(find_mouse_list_table)
if find_mouse_list_table[0][0] == False:
2024-09-30 17:32:44 +09:00
cursor.execute("CREATE TABLE mouse_list (mouse_number INTEGER NOT NULL, using_member_id INTEGER, device_instance_path TEXT, device_name TEXT, detail TEXT, alt_name TEXT, PRIMARY KEY (mouse_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))")
for i in self.mouse_list:
print(i)
cursor.execute("INSERT INTO mouse_list (mouse_number) VALUES (%s)", (i,))
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'pc_usage_history')")
find_pc_usage_history_table = cursor.fetchall()
print(find_pc_usage_history_table)
if find_pc_usage_history_table[0][0] == False:
cursor.execute("CREATE TABLE pc_usage_history (id SERIAL NOT NULL, member_id INTEGER NOT NULL, pc_number INTEGER NOT NULL, keyboard_number INTEGER, mouse_number INTEGER, start_use_time TIMESTAMP NOT NULL, end_use_time TIMESTAMP, use_detail TEXT, bot_about TEXT, PRIMARY KEY (id), FOREIGN KEY (member_id) REFERENCES club_member(member_id), FOREIGN KEY (pc_number) REFERENCES pc_list(pc_number), FOREIGN KEY (keyboard_number) REFERENCES keyboard_list(keyboard_number), FOREIGN KEY (mouse_number) REFERENCES mouse_list(mouse_number))")
self.db.commit()
cursor.close()
self.init_result = "ok"
2024-07-07 15:52:46 +09:00
except (Exception) as error:
2024-07-07 16:04:46 +09:00
print("初回処理でエラーが発生しました。\nエラー内容\n" + str(error))
self.init_result = "error"
2024-07-07 15:48:43 +09:00
finally:
2024-07-07 15:55:14 +09:00
pass
def log(self, **kwargs):
if self.debug == True:
flag = 1
else:
if "flag" in kwargs:
if kwargs["flag"] == 1:
flag = 1
else:
flag = 0
else:
flag = 0
if flag == 1:
title = str(kwargs["title"])
if "message" in kwargs:
message = str(kwargs["message"])
else:
message = None
current_datetime = str(datetime.now())
if message == None:
detail = f"{current_datetime} | {title}\n"
else:
detail = f"{current_datetime} | {title}\n{message}\n"
print(detail)
if os.path.isfile(self.log_path):
try:
with open(self.log_path, "a", encoding="utf-8") as a:
a.write(detail)
except:
print("LOGGING ERROR mode a")
else:
try:
with open(self.log_path, "w", encoding="utf-8") as w:
w.write(detail)
except:
print("LOGGING ERROR mode w")
2024-09-30 17:32:44 +09:00
def password_generate(self, length):
    """Return a random password made of ``length`` decimal digits.

    The result is used as a PC unlock password and as a one-time registration
    password, so the digits are drawn from the OS CSPRNG via ``secrets``
    rather than from the predictable ``random`` generator.
    """
    import secrets  # local import: keeps this block self-contained

    return ''.join(secrets.choice(string.digits) for _ in range(length))
def hash_genarate(self, source):
    """Return the SHA-256 hex digest of the string ``source``."""
    encoded_source = source.encode()
    return hashlib.sha256(encoded_source).hexdigest()
def user_register_check(self, **kwargs):
try:
discord_user_id = str(kwargs["discord_user_id"])
2024-09-30 17:32:44 +09:00
cursor = self.db.cursor()
cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
user_record = cursor.fetchall()
#ユーザーデータが見つかった場合(登録済みの場合)
if user_record:
member_id = user_record[0][0]
name = user_record[0][1]
discord_user_name = user_record[0][2]
return {"result": 0, "about": "exist", "user_info": {"member_id": member_id, "name": name, "discord_user_name": discord_user_name}}
#ユーザーデータがなかったら(未登録の場合)
else:
return {"result": 1, "about": "user_data_not_found"}
except Exception as error:
2024-09-30 17:32:44 +09:00
self.log(title=f"[ERROR] ユーザーの登録状態を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
cursor.close()
def pc_used_check(self, **kwargs):
try:
if "pc_number" in kwargs:
pc_number = int(kwargs["pc_number"])
else:
pc_number = None
if "discord_user_id" in kwargs:
discord_user_id = str(kwargs["discord_user_id"])
else:
discord_user_id = None
if "member_id" in kwargs:
member_id = int(kwargs["member_id"])
else:
member_id = None
2024-09-30 17:32:44 +09:00
cursor = self.db.cursor()
if pc_number != None:
# pc番号を指定してpc_listから探す
cursor.execute("SELECT * FROM pc_list WHERE pc_number= %s", (pc_number,))
pc_list_record = cursor.fetchall()
if pc_list_record[0][1] == None:
return {"result": 0, "about": "vacent"}
else:
return {"result": 1, "about": "used_by_other"}
elif discord_user_id != None:
#ユーザーIDを指定してPC使用履歴から探す
cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
user_record = cursor.fetchall()
#ユーザーデータが見つかった場合(登録済みの場合)
if user_record:
member_id = user_record[0][0]
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage_history_record = cursor.fetchall()
if pc_usage_history_record:
if pc_usage_history_record[0][6] == None:
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
return {"result": 1, "about": "used_by_you", "pc_usage_history": {"pc_number": str(pc_usage_history_record[0][2]), "keyboard_number": keyboard_number, "mouse_number": mouse_number, "start_time": str(pc_usage_history_record[0][5]), "use_detail": str(pc_usage_history_record[0][7])}}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 1, "about": "user_data_not_found"}
elif member_id != None:
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage_history_record = cursor.fetchall()
if pc_usage_history_record:
if pc_usage_history_record[0][6] == None:
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
return {"result": 1, "about": "used_by_you", "pc_usage_history": {"pc_number": str(pc_usage_history_record[0][2]), "keyboard_number": keyboard_number, "mouse_number": mouse_number, "start_time": str(pc_usage_history_record[0][5]), "use_detail": str(pc_usage_history_record[0][7])}}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 1, "about": "search_options_error"}
except Exception as error:
2024-09-30 17:32:44 +09:00
self.log(title=f"[ERROR] PCの使用状況を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def keyboard_used_check(self, **kwargs):
try:
2024-09-30 17:32:44 +09:00
cursor = self.db.cursor()
if kwargs["keyboard_number"] == None:
return {"result": 0, "about": "ok"}
else:
keyboard_number = int(kwargs["keyboard_number"])
cursor.execute("SELECT * FROM keyboard_list WHERE keyboard_number=%s", (keyboard_number,))
keyboard_list_record = cursor.fetchall()
if keyboard_list_record[0][1] == None:
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "keyboard_already_in_use_by_other"}
except Exception as error:
2024-09-30 17:32:44 +09:00
self.log(title=f"[ERROR] キーボードの使用状況を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def mouse_used_check(self, **kwargs):
try:
2024-09-30 17:32:44 +09:00
cursor = self.db.cursor()
if kwargs["mouse_number"] == None:
return {"result": 0, "about": "ok"}
else:
mouse_number = int(kwargs["mouse_number"])
cursor.execute("SELECT * FROM mouse_list WHERE mouse_number=%s", (mouse_number,))
mouse_list_record = cursor.fetchall()
if mouse_list_record[0][1] == None:
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "mouse_already_in_use_by_other"}
except Exception as error:
2024-09-30 17:32:44 +09:00
self.log(title=f"[ERROR] マウスの使用状況を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def register(self, **kwargs):
try:
2024-09-30 17:32:44 +09:00
cursor = self.db.cursor()
user_info = {
"id": str(kwargs["discord_user_id"]),
"name": str(kwargs["name"]),
"display_name": str(kwargs["display_name"]),
"pc_number": int(kwargs["pc_number"]),
2024-09-30 17:32:44 +09:00
"keyboard_number": 0,
"mouse_number": 0,
"detail": None
}
if "detail" in kwargs:
user_info["detail"] = str(kwargs["detail"])
else:
pass
if kwargs["keyboard_number"] == "0":
pass
else:
user_info["keyboard_number"] = int(kwargs["keyboard_number"])
if kwargs["mouse_number"] == "0":
pass
else:
user_info["mouse_number"] = int(kwargs["mouse_number"])
# ユーザー登録されているかの確認
user_register = self.user_register_check(discord_user_id=user_info["id"])
if user_register["result"] == 0:
member_id = user_register["user_info"]["member_id"]
name = user_register["user_info"]["name"]
# ユーザーがPCを使っているか
pc_check_self = self.pc_used_check(member_id=member_id)
if pc_check_self["result"] == 0:
# 他の人がそのPCを使っているか
pc_check = self.pc_used_check(pc_number=user_info["pc_number"])
if pc_check["result"] == 0:
# キーボードは使われているか
keyboard_check = self.keyboard_used_check(keyboard_number=user_info["keyboard_number"])
if keyboard_check["result"] == 0:
# マウスは使われているか
mouse_check = self.mouse_used_check(mouse_number=user_info["mouse_number"])
if mouse_check["result"] == 0:
# パスワードとハッシュ作成
password = self.password_generate(4)
password_hash = self.hash_genarate(password)
# PC使用履歴のテーブルにレコードを挿入
2024-08-23 01:42:02 +09:00
cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, keyboard_number, mouse_number, start_use_time, use_detail) VALUES (%s, %s, %s, %s, clock_timestamp(), %s)", (member_id, user_info["pc_number"], user_info["keyboard_number"], user_info["mouse_number"], user_info["detail"]))
# PCリストの該当のレコードを更新
cursor.execute("UPDATE pc_list SET using_member_id = %s, password_hash = %s WHERE pc_number = %s", (member_id, password_hash, user_info["pc_number"]))
# キーボードリストの該当のレコードを自前(None)だったらスキップ、借りていたら更新
2024-09-30 17:32:44 +09:00
if user_info["keyboard_number"] == 0:
pass
else:
cursor.execute("UPDATE keyboard_list SET using_member_id = %s WHERE keyboard_number = %s", (member_id, user_info["keyboard_number"]))
# マウスも同様に
2024-09-30 17:32:44 +09:00
if user_info["mouse_number"] == 0:
pass
else:
cursor.execute("UPDATE mouse_list SET using_member_id = %s WHERE mouse_number = %s", (member_id, user_info["mouse_number"]))
2024-09-30 17:32:44 +09:00
self.db.commit()
return {"result": 0, "about": "ok", "output_dict": {"password": str(password), "name": str(name)}}
else:
return {"result": 1, "about": "mouse_already_in_use"}
else:
return {"result": 1, "about": "keyboard_already_in_use"}
2024-06-09 18:39:42 +09:00
else:
return {"result": 1, "about": "pc_already_in_use_by_other"}
else:
return {"result": 1, "about": "pc_already_in_use_by_you", "pc_usage_history": {"pc_number": pc_check_self["pc_usage_history"]["pc_number"], "keyboard_number": pc_check_self["pc_usage_history"]["keyboard_number"], "mouse_number": pc_check_self["pc_usage_history"]["mouse_number"], "start_time": pc_check_self["pc_usage_history"]["start_time"], "use_detail": pc_check_self["pc_usage_history"]["use_detail"]}}
else:
return {"result": 1, "about": "user_data_not_found"}
except Exception as error:
2024-09-30 17:32:44 +09:00
self.log(title=f"[ERROR] PCの使用登録中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
2024-06-09 18:39:42 +09:00
def stop(self, **kwargs):
try:
2024-09-30 17:32:44 +09:00
cursor = self.db.cursor()
2024-08-23 02:28:23 +09:00
discord_user_id = str(kwargs["discord_user_id"])
if "bot_about" in kwargs:
bot_about = kwargs["bot_about"]
else:
bot_about = None
2024-08-23 02:28:23 +09:00
# ユーザーが登録してるかというよりはデータの取得のため
user_register = self.user_register_check(discord_user_id=discord_user_id)
member_id = user_register["user_info"]["member_id"]
name = user_register["user_info"]["name"]
if user_register["result"] == 0:
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage_history_record = cursor.fetchall()
if pc_usage_history_record:
2024-08-23 02:28:23 +09:00
pc_usage_history_id = pc_usage_history_record[0][0]
pc_number = pc_usage_history_record[0][2]
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
end_use_time = pc_usage_history_record[0][6]
# 使用中のとき (使用停止時間がNoneのとき)
if end_use_time == None:
# 利用停止の理由の有無を判断
if bot_about == None:
2024-08-23 02:28:23 +09:00
cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp() WHERE id = %s", (pc_usage_history_id,))
else:
2024-08-23 02:28:23 +09:00
cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp(), bot_about = %s WHERE id = %s", (bot_about, pc_usage_history_id))
# pc_listの使用中ユーザーを消す
cursor.execute("UPDATE pc_list SET using_member_id = NULL, password_hash = NULL WHERE pc_number = %s", (pc_number,))
if keyboard_number == None:
pass
else:
# keyboard_listの使用中ユーザーを消す
cursor.execute("UPDATE keyboard_list SET using_member_id = NULL WHERE keyboard_number = %s", (keyboard_number,))
if mouse_number == None:
pass
else:
# mouse_listの使用中ユーザーを消す
cursor.execute("UPDATE mouse_list SET using_member_id = NULL WHERE mouse_number = %s", (mouse_number,))
2024-09-30 17:32:44 +09:00
self.db.commit()
return {"result": 0, "about": "ok", "output_dict": {"pc_number": str(pc_number), "name": str(name)}}
else:
return {"result": 1, "about": "unused"}
else:
return {"result": 1, "about": "unused"}
else:
return {"result": 1, "about": "user_data_not_found"}
except Exception as error:
2024-09-30 17:32:44 +09:00
self.log(title=f"[ERROR] PCの使用停止処理中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
2024-06-09 19:04:32 +09:00
2024-07-07 15:48:43 +09:00
finally:
if cursor:
cursor.close()
2024-06-09 18:39:42 +09:00
def user_register(self, **kwargs):
try:
discord_user_id = str(kwargs["discord_user_id"])
discord_user_name = str(kwargs["discord_user_name"])
name = str(kwargs["name"])
2024-09-30 17:32:44 +09:00
cursor = self.db.cursor()
cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
user_record = cursor.fetchall()
if not user_record:
cursor.execute("INSERT INTO club_member (name, discord_user_name, discord_user_id) VALUES (%s, %s, %s)", (name, discord_user_name, discord_user_id))
2024-09-30 17:32:44 +09:00
self.db.commit()
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "already_exists"}
except Exception as error:
2024-09-30 17:32:44 +09:00
self.log(title=f"[ERROR] ユーザー情報の登録中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
2024-07-07 17:48:54 +09:00
def format_datetime(self, value):
    """Render ``datetime`` values as ``YYYY-MM-DD HH:MM:SS``; pass anything else through unchanged."""
    return value.strftime('%Y-%m-%d %H:%M:%S') if isinstance(value, datetime) else value
def pc_register(self, **kwargs):
try:
pc_number = int(kwargs["pc_number"])
2024-09-30 17:32:44 +09:00
cursor = self.db.cursor()
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_list = cursor.fetchall()
if not pc_list:
cursor.execute("INSERT INTO pc_list (pc_number) VALUES (%s)", (pc_number,))
2024-09-30 17:32:44 +09:00
self.db.commit()
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "already_exists"}
except Exception as error:
2024-09-30 17:32:44 +09:00
self.log(title=f"[ERROR] PCの情報を登録中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def report_export(self, **kwargs):
try:
2024-09-30 17:32:44 +09:00
cursor = self.db.cursor()
csv_file_path = self.export_dir_path + "pc_usage_history.csv"
2024-07-07 17:48:54 +09:00
main_table = "pc_usage_history"
related_table = "club_member"
2024-09-30 17:32:44 +09:00
excel_file_path = self.export_dir_path + "pc_usage_history.xlsx"
2024-07-07 18:00:49 +09:00
# メインテーブルの列情報を取得user_idを除く
2024-07-07 18:00:49 +09:00
cursor.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table)))
2024-07-07 17:43:36 +09:00
main_columns = [desc[0] for desc in cursor.description if desc[0] != 'member_id']
# クエリを作成(列名を明確に指定)
2024-07-07 18:00:49 +09:00
query = sql.SQL("""
SELECT {main_columns}, {related_table}.name
FROM {main_table}
2024-08-23 02:09:13 +09:00
LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.member_id
ORDER BY id
""").format(
2024-07-07 18:00:49 +09:00
main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]),
main_table=sql.Identifier(main_table),
related_table=sql.Identifier(related_table)
)
2024-07-07 16:35:57 +09:00
cursor.execute(query)
# 列名を再構成nameを2番目に配置
column_names = [main_columns[0], 'name'] + main_columns[1:]
2024-07-07 16:35:57 +09:00
rows = cursor.fetchall()
# Excelワークブックを作成
wb = Workbook()
ws = wb.active
# 列名を書き込み
ws.append(column_names)
# データを書き込み
for row in rows:
# nameを2番目に移動
formatted_row = [self.format_datetime(row[0])] + [row[-1]] + [self.format_datetime(field) if field is not None else '' for field in row[1:-1]]
ws.append(formatted_row)
# 列幅を自動調整
for col in ws.columns:
max_length = 0
column = col[0].column_letter
for cell in col:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = (max_length + 2) * 1.2
ws.column_dimensions[column].width = adjusted_width
# Excelファイルを保存
wb.save(excel_file_path)
2024-09-30 17:32:44 +09:00
self.log(title=f"[SUCCESS] PCの使用履歴をエクスポートしました。", message=f"ファイルパス | {excel_file_path}", flag=0)
return {"result": 0, "about": "ok", "file_path": excel_file_path}
except Exception as error:
2024-09-30 17:32:44 +09:00
self.log(title=f"[ERROR] PCの使用履歴をエクスポート中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
2024-07-08 11:30:20 +09:00
finally:
if cursor:
cursor.close()
2024-07-07 16:35:57 +09:00
def force_stop(self, **kwargs):
try:
pc_number = kwargs["pc_number"]
2024-09-30 17:32:44 +09:00
cursor = self.db.cursor()
if "bot_about" in kwargs:
bot_about = kwargs["bot_about"]
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_list_record = cursor.fetchall()
pc_using_member_id = pc_list_record[0][1]
pc_password_hash = pc_list_record[0][2]
if pc_using_member_id == None:
return {"result": 1, "about": "not_used"}
else:
cursor.execute("UPDATE pc_list SET using_member_id = NULL WHERE pc_number = %s", (pc_number,))
if pc_password_hash == None:
pass
else:
cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_number,))
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s AND pc_number = %s ORDER BY id DESC LIMIT 1", (pc_using_member_id, pc_number))
pc_usage_history_record = cursor.fetchall()
pc_usage_history_record_id = pc_usage_history_record[0][0]
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
if keyboard_number == None:
pass
else:
# keyboard_listの使用中ユーザーを消す
cursor.execute("UPDATE keyboard_list SET using_member_id = NULL WHERE keyboard_number = %s", (keyboard_number,))
if mouse_number == None:
pass
else:
# mouse_listの使用中ユーザーを消す
cursor.execute("UPDATE mouse_list SET using_member_id = NULL WHERE mouse_number = %s", (mouse_number,))
cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp(), bot_about = %s WHERE id = %s", (bot_about, pc_usage_history_record_id))
2024-09-30 17:32:44 +09:00
self.db.commit()
return {"result": 0, "about": "ok"}
2024-07-07 16:35:57 +09:00
else:
return {"result": 1, "about": "bot_about_not_found"}
except Exception as error:
2024-09-30 17:32:44 +09:00
self.log(title=f"[ERROR] fstop中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
2024-07-07 16:35:57 +09:00
finally:
if cursor:
cursor.close()
2024-09-30 17:32:44 +09:00
def pc_onetime_gen(self, **kwargs):
if kwargs.get("max_count") == None:
max_count = 1
elif isinstance(kwargs.get("max_count"), int):
max_count = int(kwargs.get("max_count"))
else:
max_count = 1
try:
2024-09-30 17:32:44 +09:00
if os.path.isfile(dislocker.onetime_config_path):
with open(dislocker.onetime_config_path, "r") as r:
onetime_config = json.load(r)
onetime_password = onetime_config["onetime"]["pc_register"]["password"]
if onetime_password == None:
onetime_password = str(self.password_generate(8))
onetime_config["onetime"]["pc_register"]["password"] = onetime_password
onetime_config["onetime"]["pc_register"]["max_count"] = max_count
onetime_config["onetime"]["pc_register"]["current_count"] = 0
current_count = onetime_config["onetime"]["pc_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
current_count = onetime_config["onetime"]["pc_register"]["current_count"]
max_count = onetime_config["onetime"]["pc_register"]["max_count"]
return {"result": 1, "about": "already_exists", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
onetime_password = str(self.password_generate(8))
onetime_config = {
"onetime": {
"pc_register": {
"password": onetime_password,
"current_count": 0,
"max_count": int(max_count)
},
"device_register": {
"password": None,
"current_count": None,
"max_count": None
}
}
}
current_count = onetime_config["onetime"]["pc_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
except Exception as error:
2024-09-30 17:32:44 +09:00
self.log(title=f"[ERROR] PC登録用のワンタイムパスワード発行中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
2024-07-07 16:35:57 +09:00
2024-09-30 17:32:44 +09:00
def device_onetime_gen(self, **kwargs):
if kwargs.get("max_count") == None:
max_count = 1
elif isinstance(kwargs.get("max_count"), int):
max_count = int(kwargs.get("max_count"))
else:
max_count = 1
2024-06-04 09:36:29 +09:00
try:
2024-09-30 17:32:44 +09:00
if os.path.isfile(dislocker.onetime_config_path):
with open(dislocker.onetime_config_path, "r") as r:
onetime_config = json.load(r)
onetime_password = onetime_config["onetime"]["device_register"]["password"]
if onetime_password == None:
onetime_password = str(self.password_generate(8))
onetime_config["onetime"]["device_register"]["password"] = onetime_password
onetime_config["onetime"]["device_register"]["max_count"] = max_count
onetime_config["onetime"]["device_register"]["current_count"] = 0
current_count = onetime_config["onetime"]["device_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
2024-09-30 17:32:44 +09:00
current_count = onetime_config["onetime"]["device_register"]["current_count"]
max_count = onetime_config["onetime"]["device_register"]["max_count"]
return {"result": 1, "about": "already_exists", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
2024-09-30 17:32:44 +09:00
onetime_password = str(self.password_generate(8))
onetime_config = {
"onetime": {
"pc_register": {
"password": None,
"current_count": None,
"max_count": None
},
"device_register": {
"password": onetime_password,
"current_count": 0,
"max_count": int(max_count)
}
}
}
current_count = onetime_config["onetime"]["device_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
2024-09-30 17:32:44 +09:00
except Exception as error:
self.log(title=f"[ERROR] デバイス登録用のワンタイムパスワード発行中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
def show_pc_master_password(self, **kwargs):
try:
if isinstance(kwargs.get("pc_number"), int):
pc_number = int(kwargs.get("pc_number"))
2024-09-30 17:32:44 +09:00
cursor = self.db.cursor()
cursor.execute("SELECT master_password FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_master_password_list = cursor.fetchall()
pc_master_password = pc_master_password_list[0][0]
2024-09-30 17:32:44 +09:00
return {"result": 0, "about": "ok", "output_dict": {"pc_master_password": pc_master_password}}
else:
return {"result": 1, "about": "syntax_error"}
except Exception as error:
self.log(title=f"[ERROR] PCのマスターパスワードを取得中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
2024-09-30 17:32:44 +09:00
def get_discord_user_id(self, **kwargs):
    """Return the Discord user id recorded for a club member.

    Keyword Args:
        member_id: Member id to look up; converted with ``int()``.

    Returns:
        dict: On success ``{"result": 0, "about": "ok", "discord_user_id": ...,
        "output_dict": {"discord_user_id": ...}}``. The top-level
        ``discord_user_id`` key is kept for existing callers; ``output_dict``
        mirrors it so the shape matches the other lookup methods.
        On any failure (missing/invalid ``member_id``, no matching row,
        database error) ``{"result": 1, "about": "error", "output_dict": {...}}``.
    """
    # Bound up front: int(kwargs["member_id"]) can raise before a cursor exists,
    # and the finally clause must still be able to inspect the name.
    cursor = None
    try:
        member_id = int(kwargs["member_id"])
        cursor = self.db.cursor()
        cursor.execute("SELECT discord_user_id FROM club_member WHERE member_id = %s", (member_id,))
        rows = cursor.fetchall()
        discord_user_id = rows[0][0]
        return {"result": 0, "about": "ok", "discord_user_id": discord_user_id, "output_dict": {"discord_user_id": discord_user_id}}
    except Exception as error:
        self.log(title=f"[ERROR] DiscordのユーザーIDの取得中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
        return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
    finally:
        if cursor is not None:
            cursor.close()
2024-09-30 17:32:44 +09:00
def get_pc_list(self, **kwargs):
    """Return PC records keyed by PC number.

    Keyword Args:
        pc_number (optional): When given, only that PC is returned
            (converted with ``int()``); otherwise every row of ``pc_list``
            is returned ordered by ``pc_number``.

    Returns:
        dict: ``{"result": 0, "about": "ok", "output_dict": {pc_number: {...}}}``
        where each entry holds ``pc_number``, ``using_member_id``,
        ``master_password``, ``detail`` and ``alt_name`` (row columns
        0, 1, 5, 6 and 7). On failure, including a ``pc_number`` with no
        matching row, ``{"result": 1, "about": "error", "output_dict": {...}}``.
    """

    def as_record(row):
        # Positional mapping from a `SELECT *` row of pc_list.
        return {"pc_number": row[0], "using_member_id": row[1], "master_password": row[5], "detail": row[6], "alt_name": row[7]}

    # Bound before the try so the finally clause works even if cursor() raises.
    cursor = None
    try:
        cursor = self.db.cursor()

        if "pc_number" in kwargs:
            pc_number = int(kwargs["pc_number"])
            cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
            rows = cursor.fetchall()
            # The master password comes from the fetched row (column 5),
            # like every other field of the record.
            return {"result": 0, "about": "ok", "output_dict": {pc_number: as_record(rows[0])}}

        cursor.execute("SELECT * FROM pc_list ORDER BY pc_number")
        records = {row[0]: as_record(row) for row in cursor.fetchall()}
        return {"result": 0, "about": "ok", "output_dict": records}
    except Exception as error:
        self.log(title=f"[ERROR] PCリストの取得中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
        return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
    finally:
        if cursor is not None:
            cursor.close()
def get_keyboard_list(self, **kwargs):
    """Return keyboard records keyed by keyboard number.

    Keyword Args:
        keyboard_number (optional): When given, only that keyboard is
            returned (converted with ``int()``). Otherwise every row of
            ``keyboard_list`` is returned ordered by number, except the row
            numbered 0, which is left out of the listing.

    Returns:
        dict: ``{"result": 0, "about": "ok", "output_dict": {number: {...}}}``
        with ``keyboard_number``, ``using_member_id``, ``device_instance_path``,
        ``device_name``, ``detail`` and ``alt_name`` per entry (row columns 0-5).
        On failure ``{"result": 1, "about": "error", "output_dict": {...}}``.
    """

    def as_record(row):
        # Positional mapping from a `SELECT *` row of keyboard_list.
        return {"keyboard_number": row[0], "using_member_id": row[1], "device_instance_path": row[2], "device_name": row[3], "detail": row[4], "alt_name": row[5]}

    # Bound before the try: if self.db.cursor() itself raises, the finally
    # clause must not fail on an undefined name and mask the error result.
    cursor = None
    try:
        cursor = self.db.cursor()

        if "keyboard_number" in kwargs:
            keyboard_number = int(kwargs["keyboard_number"])
            cursor.execute("SELECT * FROM keyboard_list WHERE keyboard_number = %s", (keyboard_number,))
            rows = cursor.fetchall()
            return {"result": 0, "about": "ok", "output_dict": {keyboard_number: as_record(rows[0])}}

        cursor.execute("SELECT * FROM keyboard_list ORDER BY keyboard_number")
        records = {row[0]: as_record(row) for row in cursor.fetchall() if row[0] != 0}
        return {"result": 0, "about": "ok", "output_dict": records}
    except Exception as error:
        self.log(title=f"[ERROR] キーボードリストの取得中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
        return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
    finally:
        if cursor is not None:
            cursor.close()
def get_mouse_list(self, **kwargs):
    """Return mouse records keyed by mouse number.

    Keyword Args:
        mouse_number (optional): When given, only that mouse is returned
            (converted with ``int()``). Otherwise every row of ``mouse_list``
            is returned ordered by number, except the row numbered 0, which
            is left out of the listing.

    Returns:
        dict: ``{"result": 0, "about": "ok", "output_dict": {number: {...}}}``
        with ``mouse_number``, ``using_member_id``, ``device_instance_path``,
        ``device_name``, ``detail`` and ``alt_name`` per entry (row columns 0-5).
        On failure ``{"result": 1, "about": "error", "output_dict": {...}}``.
    """

    def as_record(row):
        # Positional mapping from a `SELECT *` row of mouse_list.
        return {"mouse_number": row[0], "using_member_id": row[1], "device_instance_path": row[2], "device_name": row[3], "detail": row[4], "alt_name": row[5]}

    # Bound before the try: if self.db.cursor() itself raises, the finally
    # clause must not fail on an undefined name and mask the error result.
    cursor = None
    try:
        cursor = self.db.cursor()

        if "mouse_number" in kwargs:
            mouse_number = int(kwargs["mouse_number"])
            cursor.execute("SELECT * FROM mouse_list WHERE mouse_number = %s", (mouse_number,))
            rows = cursor.fetchall()
            return {"result": 0, "about": "ok", "output_dict": {mouse_number: as_record(rows[0])}}

        cursor.execute("SELECT * FROM mouse_list ORDER BY mouse_number")
        records = {row[0]: as_record(row) for row in cursor.fetchall() if row[0] != 0}
        return {"result": 0, "about": "ok", "output_dict": records}
    except Exception as error:
        self.log(title=f"[ERROR] マウスリストの取得中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
        return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
    finally:
        if cursor is not None:
            cursor.close()
2024-09-30 17:32:44 +09:00
def set_pc_nickname(self, **kwargs):
    """Store a nickname (``alt_name``) for one PC.

    Keyword Args:
        pc_number: Number of the PC to rename; converted with ``int()``.
        alt_name: New nickname, written to ``pc_list.alt_name`` as given.

    Returns:
        dict: ``{"result": 0, "about": "ok"}`` after the update is committed,
        ``{"result": 1, "about": "syntax_error"}`` when either argument is
        missing, or ``{"result": 1, "about": "error", "output_dict": {...}}``
        when the update raises.
    """
    if 'pc_number' not in kwargs or 'alt_name' not in kwargs:
        return {"result": 1, "about": "syntax_error"}

    # Bound before the try so the except/finally clauses can tell whether a
    # cursor was ever obtained.
    cursor = None
    try:
        cursor = self.db.cursor()
        pc_number = int(kwargs["pc_number"])
        alt_name = kwargs["alt_name"]
        cursor.execute("UPDATE pc_list SET alt_name = %s WHERE pc_number = %s", (alt_name, pc_number))
        self.db.commit()
        return {"result": 0, "about": "ok"}
    except Exception as error:
        self.log(title=f"[ERROR] PCのニックネームの設定中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
        if cursor is not None:
            # A failed statement leaves the psycopg2 transaction aborted;
            # roll back so later queries on the shared connection still work.
            self.db.rollback()
        return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
    finally:
        if cursor is not None:
            cursor.close()
2024-09-30 17:32:44 +09:00
class ReasonModal(discord.ui.Modal):
    """Modal that asks for the purpose of use and then registers the PC session.

    The selected PC, keyboard and mouse numbers are embedded in the text
    input's ``custom_id`` (``register_<pc>_<keyboard>_<mouse>``) and read back
    from the interaction payload when the modal is submitted.
    """

    def __init__(self, title: str, pc_number: str, keyboard_number: str, mouse_number: str, timeout=15) -> None:
        super().__init__(title=title, timeout=timeout)
        field_id = f"register_{pc_number}_{keyboard_number}_{mouse_number}"
        self.reason_input_form = discord.ui.TextInput(label="使用目的を入力してください", style=discord.TextStyle.short, custom_id=field_id)
        self.add_item(self.reason_input_form)

    async def on_submit(self, interaction: discord.Interaction) -> None:
        """Register the session with the entered reason and report the outcome to the member."""

        def selected_label(number):
            # Numbers parsed from the custom_id are strings; "0" is shown as 自前.
            return "自前" if number == "0" else number

        def history_label(number):
            # Numbers from the usage history: None -> 未認証, 0 -> 自前.
            if number == None:
                return "未認証"
            if number == 0:
                return "自前"
            return str(number)

        field_id = interaction.data["components"][0]["components"][0]["custom_id"]
        parts = field_id.split("_")
        pc_number, keyboard_number, mouse_number = parts[1], parts[2], parts[3]
        keyboard_number_show = selected_label(keyboard_number)
        mouse_number_show = selected_label(mouse_number)

        register = dislocker.register(
            discord_user_id=interaction.user.id,
            name=interaction.user.name,
            display_name=interaction.user.display_name,
            pc_number=pc_number,
            keyboard_number=keyboard_number,
            mouse_number=mouse_number,
            detail=self.reason_input_form.value,
        )
        outcome = register["about"]

        if outcome == "ok":
            password = register["output_dict"]["password"]
            await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {password}\n## PC番号 | {pc_number}\n## キーボード番号 | {keyboard_number_show}\n## マウス番号 | {mouse_number_show}\n## 使用目的 | {self.reason_input_form.value}", ephemeral=True)
            await send_log(mode="use", pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, reason=self.reason_input_form.value, discord_user_id=interaction.user.id)
            member_name = register["output_dict"]["name"]
            dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が開始されました。", message=f"名前 | {member_name}, 使用目的 | {self.reason_input_form.value}", flag=0)
            return

        if outcome == "pc_already_in_use_by_you":
            pc_usage_history = register["pc_usage_history"]
            keyboard_number_show = history_label(pc_usage_history["keyboard_number"])
            mouse_number_show = history_label(pc_usage_history["mouse_number"])
            used_pc = pc_usage_history["pc_number"]
            started_at = pc_usage_history["start_time"]
            await interaction.response.send_message(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには 終了ボタン で使用終了をお知らせください。\n>>> # PC番号 | {used_pc}\n# キーボード番号 | {keyboard_number_show}\n# マウス番号 | {mouse_number_show}\n# 使用開始時刻 | {started_at}", ephemeral=True)
            return

        # Every remaining outcome maps to a fixed message; anything
        # unrecognised is reported as an internal error.
        rejections = {
            "pc_already_in_use_by_other": "# :man_gesturing_no: そのPCは他のメンバーによって使用されています。\n別のPC番号を指定して、再度お試しください。",
            "keyboard_already_in_use": "# :man_gesturing_no: そのキーボードは他のメンバーによって使用されています。\n別のキーボードのデバイス番号を指定して、再度お試しください。",
            "mouse_already_in_use": "# :man_gesturing_no: そのマウスは他のメンバーによって使用されています。\n別のマウスのデバイス番号を指定して、再度お試しください。",
            "user_data_not_found": "# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。",
        }
        fallback = "# :skull_crossbones: 登録できませんでした。\n内部エラーが発生しています。"
        await interaction.response.send_message(rejections.get(outcome, fallback), ephemeral=True)
class Monitor():
    """Background watcher that stops PC sessions automatically.

    Each poll it either runs the scheduled forced stop (when the configured
    ``fstop_time`` has been reached) or stops sessions whose start time is
    older than ``allowable_time`` seconds while the PC still has a
    ``password_hash`` set.
    """

    def __init__(self, **kwargs) -> None:
        """Store the polling settings.

        Keyword Args:
            search_frequency: Seconds to sleep between polls.
            allowable_time: Seconds a session may stay in the watched state.
            fstop_time: Daily forced-stop time as an ``"%H:%M:%S"`` string.
        """
        self.search_frequency = kwargs["search_frequency"]
        self.allowable_time = kwargs["allowable_time"]
        self.fstop_time = kwargs["fstop_time"]
        # Seconds to wait after start() before the first poll.
        self.init_wait_time = 10

    def start(self, **kwargs):
        """Run :meth:`search` in a new thread."""
        search_thread = threading.Thread(target=self.search)
        search_thread.start()

    @staticmethod
    def _fstop_due(previous_check, current_check, fstop_time):
        """Return True when ``fstop_time`` was reached since the previous poll.

        Polls are compared at one-second resolution. The stop is due when the
        configured wall-clock time lies in ``(previous_check, current_check]``,
        so a poll that lands a second late still triggers it and two polls in
        the same second trigger it only once. On the very first poll
        (``previous_check is None``) it is due only if the current second
        equals ``fstop_time``. An unparsable ``fstop_time`` is never due.
        """
        try:
            stop_clock = datetime.strptime(str(fstop_time), "%H:%M:%S").time()
        except ValueError:
            return False
        current_second = current_check.replace(microsecond=0)
        stop_today = datetime.combine(current_second.date(), stop_clock)
        if previous_check is None:
            return stop_today == current_second
        previous_second = previous_check.replace(microsecond=0)
        # Also consider the previous poll's date so a poll interval that
        # crosses midnight is handled.
        stop_on_previous_day = datetime.combine(previous_second.date(), stop_clock)
        return any(previous_second < moment <= current_second for moment in (stop_today, stop_on_previous_day))

    def _stop_if_timed_out(self, cursor, pc_row, current_datetime):
        """Stop the session on one watched PC when it has exceeded ``allowable_time``."""
        member_id = pc_row[1]
        cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s AND end_use_time IS NULL ORDER BY id DESC LIMIT 1", (member_id,))
        pc_usage = cursor.fetchall()
        if not pc_usage:
            # No open usage record for this PC's member: nothing to time out.
            return

        start_time = pc_usage[0][5]
        # total_seconds() keeps whole days; timedelta.seconds alone would wrap
        # around every 24 hours.
        elapsed_seconds = (current_datetime - start_time).total_seconds()
        allowable_seconds = timedelta(seconds=self.allowable_time).total_seconds()
        dislocker.log(title=f"[INFO] 現在確認されているパスワード未使用のユーザー", message=f"レコード | {str(pc_usage)}, 経過時間(Sec) | {int(elapsed_seconds)}/{int(allowable_seconds)}", flag=0)
        if elapsed_seconds < allowable_seconds:
            return

        cursor.execute("SELECT * FROM club_member WHERE member_id = %s", (member_id,))
        user_info = cursor.fetchall()
        if not user_info:
            # The member row is gone, so there is no Discord id to stop with.
            return
        # NOTE: no Discord notification is sent from here; only the local log is written.
        dislocker.stop(discord_user_id=user_info[0][3], bot_about="タイムアウトでBotによる強制停止。")
        dislocker.log(title=f"[INFO] パスワードのタイムアウト時間に達したため、強制停止されました。", flag=0)

    def search(self):
        """Poll the database forever; intended to run in its own thread.

        A failing poll is logged and rolled back, and polling continues after
        the usual sleep, so this method does not return.
        """
        time.sleep(self.init_wait_time)
        previous_check = None
        while True:
            cursor = None
            try:
                cursor = dislocker.db.cursor()
                cursor.execute("SELECT * FROM pc_list WHERE password_hash IS NOT NULL")
                pc_list = cursor.fetchall()
                current_datetime = datetime.now()
                fstop_due = self._fstop_due(previous_check, current_datetime, self.fstop_time)
                previous_check = current_datetime

                if fstop_due:
                    dislocker.log(title=f"[INFO] 定期のPCの使用停止処理を開始します。", flag=0)
                    for pc_number in dislocker.pc_list:
                        dislocker.force_stop(pc_number=pc_number, bot_about="使用停止忘れによるBotによる強制停止。")
                    dislocker.log(title=f"[SUCCESS] 定期のPCの使用停止処理は完了しました。", flag=0)
                else:
                    for pc_row in pc_list:
                        self._stop_if_timed_out(cursor, pc_row, current_datetime)
            except Exception as error:
                dislocker.log(title=f"[ERROR] 自動停止処理中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
                try:
                    dislocker.db.rollback()
                except Exception as rollback_error:
                    # The rollback itself failed (e.g. the connection is down);
                    # record it and try again on the next poll.
                    dislocker.log(title=f"[ERROR] 自動停止処理中にエラーが発生しました。 {str(rollback_error.__class__.__name__)}", message=str(rollback_error.args), flag=1)
            finally:
                # One cursor per poll, always released before sleeping.
                if cursor is not None:
                    cursor.close()
            time.sleep(self.search_frequency)
2024-09-30 17:32:44 +09:00
# Module-level Dislocker backend instance; the Discord handlers and Monitor in
# this file reach the database and configuration through this name.
dislocker = DL()
2024-09-30 17:32:44 +09:00
# Start from discord.py's default intents and additionally enable message
# content (on_message reads message.content).
intents = discord.Intents.default()
intents.message_content = True
client = discord.Client(intents=intents)
# Command tree bound to the client; slash commands are registered on it with
# @tree.command and synced in on_ready.
tree = discord.app_commands.CommandTree(client)
async def send_log(**kwargs):
    """Post a notification embed to the configured log channel.

    Keyword Args:
        mode: One of ``"use"``, ``"stop"``, ``"timeout"``, ``"userreg"``,
            ``"fstop"`` or ``"pcnickname"``.
        discord_user_id: Id of the member the notice is about.
        pc_number: PC number shown in the title (not used by ``"userreg"``).
        keyboard_number, mouse_number, reason: Read for ``"use"``
            (``reason`` also for ``"fstop"``).
        alt_name: Read for ``"pcnickname"``.

    Returns:
        dict: ``{"result": 0, "about": "ok"}`` once the embed has been sent,
        otherwise ``{"result": 1, "about": "error", "output_dict": {...}}``
        (the failure is also written to the Dislocker log).
    """
    blue = 0x1343EB
    magenta = 0xE512EB
    try:
        pc_number = str(kwargs.get("pc_number"))
        discord_user_id = int(kwargs.get("discord_user_id"))
        mode = str(kwargs.get("mode"))

        if mode == "use":
            keyboard_number = int(kwargs.get("keyboard_number"))
            mouse_number = int(kwargs.get("mouse_number"))
            reason = str(kwargs.get("reason"))
            # Device number 0 is displayed as 自前.
            keyboard_number_show = "自前" if keyboard_number == 0 else str(keyboard_number)
            mouse_number_show = "自前" if mouse_number == 0 else str(mouse_number)
            log_embed = discord.Embed(title=f":video_game: PC {pc_number} 番 | 使用開始通知", description=f"<@{discord_user_id}> さんはPCの使用を開始しました。", color=blue)
            for field_name, field_value in (
                ("PC番号", pc_number),
                ("キーボード番号", keyboard_number_show),
                ("マウス番号", mouse_number_show),
                ("使用目的", reason),
            ):
                log_embed.add_field(name=field_name, value=field_value)
        elif mode == "stop":
            log_embed = discord.Embed(title=f":stop_button: PC {pc_number} 番 | 使用終了通知", description=f"<@{discord_user_id}> さんはPCの使用を終了しました。", color=magenta)
        elif mode == "timeout":
            log_embed = discord.Embed(title=f":alarm_clock: PC {pc_number} 番 | タイムアウト通知", description=f"<@{discord_user_id}> さんが指定時間内にPCを使用しなかったため、停止されました。", color=magenta)
        elif mode == "userreg":
            log_embed = discord.Embed(title=f":bust_in_silhouette: ユーザー登録通知", description=f"<@{discord_user_id}> さんがユーザーとして登録されました。", color=blue)
        elif mode == "fstop":
            reason = str(kwargs.get("reason"))
            log_embed = discord.Embed(title=f":stop_button: PC {pc_number} 番 | 強制停止通知", description=f"<@{discord_user_id}> さんによってPCの使用は停止されました。", color=magenta)
            log_embed.add_field(name="理由", value=reason)
        elif mode == "pcnickname":
            alt_name = str(kwargs.get("alt_name"))
            log_embed = discord.Embed(title=f":pencil: PC {pc_number} 番 | PCのニックネーム変更通知", description=f"<@{discord_user_id}> さんによってPCのニックネームが変更されました。\nボタンに変更を適用する場合は、再度 /init コマンドでボタンを送信して下さい。\n古いボタンを削除することをお忘れなく!", color=blue)
            log_embed.add_field(name="ニックネーム", value=alt_name)

        # For an unrecognised mode log_embed was never bound; the resulting
        # exception is reported through the handler below.
        log_channel = client.get_channel(dislocker.server_config["bot"]["log_channel_id"])
        await log_channel.send(embed=log_embed)
        return {"result": 0, "about": "ok"}
    except Exception as error:
        error_name = str(error.__class__.__name__)
        dislocker.log(title=f"[ERROR] ログ送信中にエラーが発生しました。 {error_name}", message=str(error.args), flag=1)
        return {"result": 1, "about": "error", "output_dict": {"error_class_name": error_name, "error_args": str(error.args)}}
2024-09-30 17:32:44 +09:00
@client.event
async def on_ready():
    """Log the login, sync the slash commands and set the bot's presence from the configuration."""
    dislocker.log(title=f"[SUCCESS] DiscordのBotが起動しました。", message=f"{client.user.name} としてログインしています。", flag=1)
    await tree.sync()

    activity_config = dislocker.server_config["bot"]["activity"]
    # The activity type is fixed to "competing"; name, details and state come
    # from the configuration.
    presence = discord.Activity(
        name=activity_config["name"],
        type=discord.ActivityType.competing,
        details=activity_config["details"],
        state=activity_config["state"],
    )
    await client.change_presence(activity=presence)
def _onetime_password_embed(generation, title, issued_description):
    """Build the reply embed for a one-time password generation result.

    ``generation`` is the dict returned by ``pc_onetime_gen`` /
    ``device_onetime_gen``. A fresh password (``result == 0``) and an
    already-valid one (``about == "already_exists"``) are shown with the same
    three fields; anything else is rendered as an error embed.
    """
    if generation["result"] == 0:
        description = issued_description
    elif generation["about"] == "already_exists":
        description = f"ワンタイムパスワードはもう発行されており、有効です。"
    else:
        error_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'サーバーでエラーが発生しています。', color=0xC91111)
        error_embed.add_field(name=f"{generation['output_dict']['error_class_name']}", value=f"{generation['output_dict']['error_args']}")
        return error_embed

    details = generation["output_dict"]
    onetime_password = str(details["onetime_password"])
    max_count = str(details["max_count"])
    current_count = str(details["current_count"])
    remaining_times = str(int(max_count) - int(current_count))

    embed = discord.Embed(title=title, description=description, color=0x2286C9)
    embed.add_field(name="パスワード", value=onetime_password)
    embed.add_field(name="最大使用回数", value=max_count)
    embed.add_field(name="残り使用回数", value=remaining_times)
    return embed


@client.event
async def on_message(message):
    """Handle direct messages: admins may issue ``/pcreg`` and ``/devreg``.

    Messages from bots and messages outside DMs are ignored. A DM from a
    non-admin gets a warning embed. For admins, ``/pcreg [max_count]`` and
    ``/devreg [max_count]`` issue (or re-show) the PC / device registration
    one-time password; any other DM content is ignored.
    """
    if message.author.bot:
        return
    if not isinstance(message.channel, discord.DMChannel):
        return

    if message.author.id not in dislocker.server_config["bot"]["admin_user_id"]:
        warning_embed = discord.Embed(title=":x: 警告", description=f'DMでの操作はサポートされていません。', color=0xC91111)
        await message.channel.send(embed=warning_embed)
        return

    msg_split = message.content.split()
    if not msg_split:
        # Content can be empty (for example an attachment-only message);
        # there is no command to read.
        return

    command = msg_split[0]
    if command == "/pcreg":
        generate = dislocker.pc_onetime_gen
        title = ":dizzy_face: PCの登録"
        issued_description = f"PC登録時のワンタイムパスワードを発行します。"
    elif command == "/devreg":
        generate = dislocker.device_onetime_gen
        title = ":dizzy_face: デバイスの登録"
        issued_description = f"デバイス登録時のワンタイムパスワードを発行します。"
    else:
        return

    # An optional single decimal argument sets the maximum use count; default 1.
    max_count = 1
    if len(msg_split) == 2 and msg_split[1].isdecimal():
        max_count = int(msg_split[1])

    generation = generate(max_count=max_count)
    await message.channel.send(embed=_onetime_password_embed(generation, title, issued_description))
@client.event
async def on_interaction(interaction: discord.Interaction):
    """Forward button interactions (component type 2) to ``on_button``.

    Interactions without a ``component_type`` entry, such as slash commands,
    are ignored here. The check uses ``.get`` instead of wrapping the call in
    ``except KeyError`` so that a KeyError raised inside ``on_button`` is not
    silently discarded.
    """
    data = interaction.data or {}
    if data.get("component_type") == 2:
        await on_button(interaction)
def _own_device_label(number):
    """Return the display label for a device number parsed from a custom_id ("0" is shown as 自前)."""
    return "自前" if number == "0" else number


def _history_device_label(number):
    """Return the display label for a device number from a usage-history record."""
    if number is None:
        return "未認証"
    if number == 0:
        return "自前"
    return str(number)


def _device_choice_view(devices, number_key, custom_id_prefix, own_device_label):
    """Build a view with one button per listed device plus an "own device" button.

    Each button's custom_id is ``<custom_id_prefix>_<device number>``; the
    "own device" button uses number 0.
    """
    view = discord.ui.View(timeout=15)
    for device in devices.values():
        number = str(device[number_key])
        if device['alt_name'] is None:
            label = number
        else:
            label = f"{number} 番 | ({device['alt_name']})"
        view.add_item(discord.ui.Button(style=discord.ButtonStyle.primary, label=label, custom_id=f"{custom_id_prefix}_{number}"))
    view.add_item(discord.ui.Button(style=discord.ButtonStyle.primary, label=own_device_label, custom_id=f"{custom_id_prefix}_0"))
    return view


async def on_button(interaction: discord.Interaction):
    """Handle a button press, dispatching on the first segment of its custom_id.

    The registration flow is a chain of ephemeral messages whose buttons carry
    the choices made so far in their custom_id:

    * ``pcregister_<pc>``: offer the keyboards.
    * ``keyboardregister_<pc>_<kb>``: offer the mice.
    * ``mouseregister_<pc>_<kb>_<mouse>``: offer preset reasons or free input.
    * ``quickreasonregister_<pc>_<kb>_<mouse>_<reason>``: register with a preset.
    * ``reasonregister_<pc>_<kb>_<mouse>``: open ``ReasonModal``.
    * ``stop``: end the member's current session.
    * ``user_register``: register the member as a Dislocker user.
    """
    register_failed_message = "# :skull_crossbones: 登録できませんでした。\n内部エラーが発生しています。"

    custom_id = interaction.data["custom_id"]
    custom_id_split = custom_id.split("_")
    action = custom_id_split[0]
    dislocker.log(title=f"[INFO] ボタンが押されました。", message=f"custom_id | {custom_id}, DiscordユーザーID | {interaction.user.id}", flag=0)

    if action == "pcregister":
        pc_number = custom_id_split[1]
        keyboard_list = dislocker.get_keyboard_list()
        if keyboard_list["result"] != 0:
            # On failure output_dict holds error details, not devices.
            await interaction.response.send_message(register_failed_message, ephemeral=True)
            return
        keyboard_register_view = _device_choice_view(keyboard_list["output_dict"], "keyboard_number", f"keyboardregister_{str(pc_number)}", "キーボードは自前")
        await interaction.response.send_message(f"# :keyboard: キーボードのデバイス番号を選んでください!\n>>> # PC番号 | {str(pc_number)}", view=keyboard_register_view, ephemeral=True)

    elif action == "keyboardregister":
        pc_number = custom_id_split[1]
        keyboard_number = custom_id_split[2]
        mouse_list = dislocker.get_mouse_list()
        if mouse_list["result"] != 0:
            await interaction.response.send_message(register_failed_message, ephemeral=True)
            return
        keyboard_number_show = _own_device_label(keyboard_number)
        mouse_register_view = _device_choice_view(mouse_list["output_dict"], "mouse_number", f"mouseregister_{str(pc_number)}_{str(keyboard_number)}", "マウスは自前")
        await interaction.response.send_message(f"# :mouse_three_button: マウスのデバイス番号を選んでください!\n>>> # PC番号 | {str(pc_number)}\n# キーボード番号 | {str(keyboard_number_show)}", view=mouse_register_view, ephemeral=True)

    elif action == "mouseregister":
        pc_number = custom_id_split[1]
        keyboard_number = custom_id_split[2]
        mouse_number = custom_id_split[3]
        keyboard_number_show = _own_device_label(keyboard_number)
        mouse_number_show = _own_device_label(mouse_number)
        selection = f"{str(pc_number)}_{str(keyboard_number)}_{str(mouse_number)}"

        reason_register_view = discord.ui.View(timeout=15)
        for preset in dislocker.preset_games:
            reason_register_view.add_item(discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(preset)}", custom_id=f"quickreasonregister_{selection}_{str(preset)}"))
        reason_register_view.add_item(discord.ui.Button(style=discord.ButtonStyle.primary, label="使用目的を入力する", custom_id=f"reasonregister_{selection}"))
        await interaction.response.send_message(f"# :regional_indicator_q: 使用目的を書いてください!\n>>> # PC番号 | {str(pc_number)}\n# キーボード番号 | {str(keyboard_number_show)}\n# マウス番号 | {str(mouse_number_show)}", view=reason_register_view, ephemeral=True)

    elif action == "quickreasonregister":
        pc_number = custom_id_split[1]
        keyboard_number = custom_id_split[2]
        mouse_number = custom_id_split[3]
        keyboard_number_show = _own_device_label(keyboard_number)
        mouse_number_show = _own_device_label(mouse_number)
        # The preset name is everything after the fourth underscore, so a
        # preset that itself contains "_" is kept whole.
        reason = "_".join(custom_id_split[4:])

        register = dislocker.register(discord_user_id=interaction.user.id, name=interaction.user.name, display_name=interaction.user.display_name, pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, detail=reason)
        outcome = register["about"]
        if outcome == "ok":
            password = register["output_dict"]["password"]
            member_name = register["output_dict"]["name"]
            await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {password}\n## PC番号 | {pc_number}\n## キーボード番号 | {str(keyboard_number_show)}\n## マウス番号 | {str(mouse_number_show)}\n## 使用目的 | {reason}", ephemeral=True)
            await send_log(mode="use", pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, reason=reason, discord_user_id=interaction.user.id)
            dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が開始されました。", message=f"名前 | {member_name}, 使用目的 | {reason}", flag=0)
        elif outcome == "pc_already_in_use_by_you":
            pc_usage_history = register["pc_usage_history"]
            keyboard_number_show = _history_device_label(pc_usage_history["keyboard_number"])
            mouse_number_show = _history_device_label(pc_usage_history["mouse_number"])
            used_pc = pc_usage_history["pc_number"]
            started_at = pc_usage_history["start_time"]
            await interaction.response.send_message(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには 終了ボタン で使用終了をお知らせください。\n>>> # PC番号 | {used_pc}\n# キーボード番号 | {keyboard_number_show}\n# マウス番号 | {mouse_number_show}\n# 使用開始時刻 | {started_at}", ephemeral=True)
        else:
            rejections = {
                "pc_already_in_use_by_other": "# :man_gesturing_no: そのPCは他のメンバーによって使用されています。\n別のPC番号を指定して、再度お試しください。",
                "keyboard_already_in_use": "# :man_gesturing_no: そのキーボードは他のメンバーによって使用されています。\n別のキーボードのデバイス番号を指定して、再度お試しください。",
                "mouse_already_in_use": "# :man_gesturing_no: そのマウスは他のメンバーによって使用されています。\n別のマウスのデバイス番号を指定して、再度お試しください。",
                "user_data_not_found": "# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。",
            }
            await interaction.response.send_message(rejections.get(outcome, register_failed_message), ephemeral=True)

    elif action == "reasonregister":
        pc_number = custom_id_split[1]
        keyboard_number = custom_id_split[2]
        mouse_number = custom_id_split[3]
        reason_input_form = ReasonModal(title="Dislocker | 登録", pc_number=str(pc_number), keyboard_number=str(keyboard_number), mouse_number=str(mouse_number))
        await interaction.response.send_modal(reason_input_form)

    elif action == "stop":
        pc_stop = dislocker.stop(discord_user_id=interaction.user.id)
        outcome = pc_stop["about"]
        if outcome == "unused":
            await interaction.response.send_message("# :shaking_face: 使用されていないようです...", ephemeral=True)
        elif outcome == "user_data_not_found":
            await interaction.response.send_message("# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。", ephemeral=True)
        elif outcome == "ok":
            stopped_pc = pc_stop["output_dict"]["pc_number"]
            await interaction.response.send_message(f":white_check_mark: PC番号 {stopped_pc} の使用が終了されました。", ephemeral=True)
            await send_log(mode="stop", pc_number=stopped_pc, discord_user_id=interaction.user.id)
        else:
            await interaction.response.send_message("# :skull_crossbones: 停止できませんでした。\n内部エラーが発生しています。", ephemeral=True)

    elif custom_id_split[:2] == ["user", "register"]:
        user_register = dislocker.user_register(name=interaction.user.display_name, discord_user_name=interaction.user.name, discord_user_id=interaction.user.id)
        outcome = user_register["about"]
        if outcome == "ok":
            await interaction.response.send_message(f"# :white_check_mark: ユーザー情報が登録されました。\n>>> ユーザー名:{interaction.user.display_name}", ephemeral=True)
            await send_log(mode="userreg", discord_user_id=interaction.user.id)
        elif outcome == "already_exists":
            await interaction.response.send_message("# :no_entry: 登録できませんでした。\nもう登録されている可能性があります。", ephemeral=True)
        else:
            await interaction.response.send_message("# :no_entry: 登録できませんでした。\n内部エラーが発生しています。", ephemeral=True)
#使用者側のスラッシュコマンド
# /use: register a PC session directly from a slash command.
# Calls dislocker.register with the given PC, keyboard and mouse numbers and
# the reason, then answers with exactly one ephemeral message: the session
# password on success, otherwise a message matching register["about"].
# Any outcome that is not listed is reported as an internal error so the
# interaction never goes unanswered.
# (Comments are used instead of a docstring so the registered command
# metadata stays exactly as declared in the decorator.)
@tree.command(name="use", description="パソコンの使用登録をします。通常はこのコマンドを使用する必要はありません。")
async def use(interaction: discord.Interaction, pc_number: int, keyboard_number: int, mouse_number: int, reason: str):
    register = dislocker.register(discord_user_id=interaction.user.id, name=interaction.user.name, display_name=interaction.user.display_name, pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, detail=reason)

    if register["result"] == 0:
        password = register["output_dict"]["password"]
        member_name = register["output_dict"]["name"]
        await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {password}\n## PC番号 | {pc_number}\n## 使用目的 | {reason}", ephemeral=True)
        dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が開始されました。", message=f"名前 | {member_name}, 使用目的 | {reason}", flag=0)
        await send_log(mode="use", pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, reason=reason, discord_user_id=interaction.user.id)
        return

    about = register["about"]
    if about == "pc_already_in_use_by_you":
        pc_usage_history = register['pc_usage_history']
        reply = f":x: あなたは既にPC {pc_usage_history['pc_number']} を使用中です。\n>>> ## PC番号 | {pc_usage_history['pc_number']}"
        # Show the purpose only when the history record carries it, instead of
        # failing on a missing key.
        if "use_detail" in pc_usage_history:
            reply += f"\n## 使用目的 | {pc_usage_history['use_detail']}"
    else:
        replies = {
            "pc_already_in_use_by_other": ":x: 他の方がそのPCを使用中です。",
            "keyboard_already_in_use": ":x: キーボードは既に使用中です。",
            "mouse_already_in_use": ":x: マウスは既に使用中です。",
            "user_data_not_found": ":x: ユーザーデータが見つかりませんでした。",
        }
        # "error" and any unrecognised outcome share the internal-error reply.
        reply = replies.get(about, ":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。")
    await interaction.response.send_message(reply, ephemeral=True)
@tree.command(name="stop", description="パソコンの使用を終了します。通常はこのコマンドを使用する必要はありません。")
async def stop(interaction: discord.Interaction):
    """Slash command ``/stop``: end the caller's current PC usage.

    Calls ``dislocker.stop`` for the invoking user and answers the interaction
    exactly once: a plain ephemeral message on success, an ephemeral error
    embed otherwise.

    Args:
        interaction: The interaction that invoked the command.
    """
    pc_stop = dislocker.stop(discord_user_id=interaction.user.id)

    if pc_stop["result"] == 0:
        output = pc_stop["output_dict"]
        stopped_pc_number = output["pc_number"]
        # Only one response is allowed per interaction, so the success path
        # sends this message and does not build a second embed.
        await interaction.response.send_message(f":white_check_mark: 使用が終了されました。\n>>> ## PC番号 | {stopped_pc_number}", ephemeral=True)
        dislocker.log(title=f"[INFO] PC番号{stopped_pc_number} の使用が終了されました。", message=f"名前 | {output['name']}", flag=0)
        await send_log(mode="stop", pc_number=stopped_pc_number, discord_user_id=interaction.user.id)
        return

    # "about" is only read for result == 1, the one failure code this handler knows.
    about = pc_stop["about"] if pc_stop["result"] == 1 else None

    if about == "unused":
        description = 'あなたはPCを使用されていないようです...'
    elif about == "user_data_not_found":
        description = 'Dislockerのユーザーとして登録されていないようです。\n登録を行ってから、またお試しください。'
    else:
        # "error" and any unrecognised value share the generic server-error text,
        # so result_embed is always defined before it is sent.
        description = 'サーバーでエラーが発生しています。'

    result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=description, color=0xC91111)
    if about == "error":
        error_info = pc_stop["output_dict"]
        result_embed.add_field(name=f"{error_info['error_class_name']}", value=f"{error_info['error_args']}")

    await interaction.response.send_message(embed=result_embed, ephemeral=True)
# Administrator-side slash commands
@tree.command(name="userreg", description="ユーザーを登録します。")
@discord.app_commands.default_permissions(administrator=True)
async def userreg(interaction: discord.Interaction, discord_user_id: str, discord_user_name: str, name: str):
    """Slash command ``/userreg``: register a user through ``dislocker.user_register``.

    The command runs only when the guild is listed in the bot's ``server_id``
    config or the caller is listed in ``admin_user_id``; otherwise an explicit
    denial is sent. Every path answers the interaction with one ephemeral embed.

    Args:
        interaction: The interaction that invoked the command.
        discord_user_id: Discord user ID of the user to register (as text).
        discord_user_name: Discord user name of the user to register.
        name: Name to store for the user.
    """
    bot_config = dislocker.server_config["bot"]
    is_authorized = interaction.guild_id in bot_config["server_id"] or interaction.user.id in bot_config["admin_user_id"]
    if not is_authorized:
        # Answer explicitly; an unanswered interaction is shown to the user as failed.
        denied_embed = discord.Embed(title=":x: 操作に失敗しました。", description='このコマンドを実行する権限がありません。', color=0xC91111)
        await interaction.response.send_message(embed=denied_embed, ephemeral=True)
        return

    user_register = dislocker.user_register(discord_user_id=discord_user_id, discord_user_name=discord_user_name, name=name)
    result = user_register["result"]
    # "about" is only read for result == 1, the one failure code this handler knows.
    about = user_register["about"] if result == 1 else None

    if result == 0:
        result_embed = discord.Embed(title=":white_check_mark: ユーザー登録が完了しました。", description='続いて、PCの使用登録を行いましょう', color=0x56FF01)
        dislocker.log(title="[INFO] ユーザーを登録しました。", message=f"名前 | {name}, Discordユーザー名 | {discord_user_name}, DiscordユーザーID | {discord_user_id}", flag=0)
        await send_log(mode="userreg", discord_user_id=discord_user_id)
    elif about == "already_exists":
        result_embed = discord.Embed(title=":x: 操作に失敗しました。", description='既に登録されているユーザーです。', color=0xC91111)
    else:
        # "error" and any unrecognised outcome: generic server-error embed, so
        # result_embed is never left unbound.
        result_embed = discord.Embed(title=":x: 操作に失敗しました。", description='サーバーでエラーが発生しています。', color=0xC91111)
        if about == "error":
            error_info = user_register["output_dict"]
            result_embed.add_field(name=f"{error_info['error_class_name']}", value=f"{error_info['error_args']}")

    await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="pcreg", description="PCをDislockerに登録するためのワンタイムパスワードを発行します。")
@discord.app_commands.default_permissions(administrator=True)
async def pcreg(interaction: discord.Interaction, how_much: int = 1):
    """Slash command ``/pcreg``: issue a one-time password for PC registration.

    Calls ``dislocker.pc_onetime_gen`` and shows the password, its maximum use
    count and the remaining uses. The same three fields are shown when a
    password was already issued (``about == "already_exists"``). Unauthorised
    callers get an explicit denial.

    Args:
        interaction: The interaction that invoked the command.
        how_much: Passed to ``dislocker.pc_onetime_gen`` as ``max_count``.
    """
    bot_config = dislocker.server_config["bot"]
    is_authorized = interaction.guild_id in bot_config["server_id"] or interaction.user.id in bot_config["admin_user_id"]
    if not is_authorized:
        # Answer explicitly; an unanswered interaction is shown to the user as failed.
        denied_embed = discord.Embed(title=":x: 操作に失敗しました。", description='このコマンドを実行する権限がありません。', color=0xC91111)
        await interaction.response.send_message(embed=denied_embed, ephemeral=True)
        return

    onetime = dislocker.pc_onetime_gen(max_count=how_much)
    result = onetime["result"]

    # Newly issued and already-issued passwords render the same fields and
    # differ only in the embed description.
    if result == 0:
        description = "PC登録時のワンタイムパスワードを発行します。"
    elif result == 1 and onetime["about"] == "already_exists":
        description = "ワンタイムパスワードはもう発行されており、有効です。"
    else:
        description = None

    if description is not None:
        output = onetime["output_dict"]
        max_count_text = str(output["max_count"])
        current_count_text = str(output["current_count"])
        remaining_text = str(int(max_count_text) - int(current_count_text))
        result_embed = discord.Embed(title=":dizzy_face: PCの登録", description=description, color=0x2286C9)
        result_embed.add_field(name="パスワード", value=str(output["onetime_password"]))
        result_embed.add_field(name="最大使用回数", value=max_count_text)
        result_embed.add_field(name="残り使用回数", value=remaining_text)
    else:
        result_embed = discord.Embed(title=":x: 操作に失敗しました。", description='サーバーでエラーが発生しています。', color=0xC91111)
        if result == 1:
            # Error details are only read for the known failure code; other
            # result values get the bare embed rather than risking a KeyError.
            error_info = onetime["output_dict"]
            result_embed.add_field(name=f"{error_info['error_class_name']}", value=f"{error_info['error_args']}")

    await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="devicereg", description="デバイスをDislockerに登録するためのワンタイムパスワードを発行します。")
@discord.app_commands.default_permissions(administrator=True)
async def devicereg(interaction: discord.Interaction, how_much: int):
    """Slash command ``/devicereg``: issue a one-time password for device registration.

    Calls ``dislocker.device_onetime_gen`` and shows the password, its maximum
    use count and the remaining uses. The same three fields are shown when a
    password was already issued (``about == "already_exists"``). Unauthorised
    callers get an explicit denial.

    Args:
        interaction: The interaction that invoked the command.
        how_much: Passed to ``dislocker.device_onetime_gen`` as ``max_count``.
    """
    bot_config = dislocker.server_config["bot"]
    is_authorized = interaction.guild_id in bot_config["server_id"] or interaction.user.id in bot_config["admin_user_id"]
    if not is_authorized:
        # Answer explicitly; an unanswered interaction is shown to the user as failed.
        denied_embed = discord.Embed(title=":x: 操作に失敗しました。", description='このコマンドを実行する権限がありません。', color=0xC91111)
        await interaction.response.send_message(embed=denied_embed, ephemeral=True)
        return

    onetime = dislocker.device_onetime_gen(max_count=how_much)
    result = onetime["result"]

    # Newly issued and already-issued passwords render the same fields and
    # differ only in the embed description.
    if result == 0:
        description = "デバイス登録時のワンタイムパスワードを発行します。"
    elif result == 1 and onetime["about"] == "already_exists":
        description = "ワンタイムパスワードはもう発行されており、有効です。"
    else:
        description = None

    if description is not None:
        output = onetime["output_dict"]
        max_count_text = str(output["max_count"])
        current_count_text = str(output["current_count"])
        remaining_text = str(int(max_count_text) - int(current_count_text))
        result_embed = discord.Embed(title=":dizzy_face: デバイスの登録", description=description, color=0x2286C9)
        result_embed.add_field(name="パスワード", value=str(output["onetime_password"]))
        result_embed.add_field(name="最大使用回数", value=max_count_text)
        result_embed.add_field(name="残り使用回数", value=remaining_text)
    else:
        result_embed = discord.Embed(title=":x: 操作に失敗しました。", description='サーバーでエラーが発生しています。', color=0xC91111)
        if result == 1:
            # Error details are only read for the known failure code; other
            # result values get the bare embed rather than risking a KeyError.
            error_info = onetime["output_dict"]
            result_embed.add_field(name=f"{error_info['error_class_name']}", value=f"{error_info['error_args']}")

    await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="fstop", description="PCの使用登録を強制的に終了します。")
@discord.app_commands.default_permissions(administrator=True)
async def fstop(interaction: discord.Interaction, pc_number: int, reason: str):
    """Slash command ``/fstop``: forcibly end the usage registration of a PC.

    Calls ``dislocker.force_stop`` and reports the outcome in one ephemeral
    embed. On success the action is also written via ``dislocker.log`` and
    ``send_log``. Unauthorised callers get an explicit denial.

    Args:
        interaction: The interaction that invoked the command.
        pc_number: Number of the PC whose usage is force-stopped.
        reason: Reason for the stop; passed to ``force_stop`` as ``bot_about``.
    """
    bot_config = dislocker.server_config["bot"]
    is_authorized = interaction.guild_id in bot_config["server_id"] or interaction.user.id in bot_config["admin_user_id"]
    if not is_authorized:
        # Answer explicitly; an unanswered interaction is shown to the user as failed.
        denied_embed = discord.Embed(title=":x: 操作に失敗しました。", description='このコマンドを実行する権限がありません。', color=0xC91111)
        await interaction.response.send_message(embed=denied_embed, ephemeral=True)
        return

    force_stop = dislocker.force_stop(pc_number=pc_number, bot_about=reason)
    result = force_stop["result"]
    # "about" is only read for result == 1, the one failure code this handler knows.
    about = force_stop["about"] if result == 1 else None

    if result == 0:
        result_embed = discord.Embed(title=":white_check_mark: 処理が完了しました。", description=f'PC番号 {str(pc_number)} 番の使用登録は抹消されました。', color=0x56FF01)
        dislocker.log(title=f"[INFO] PC {pc_number} の使用を強制終了しました。", message=f"理由 | {reason}", flag=0)
        await send_log(mode="fstop", pc_number=pc_number, discord_user_id=interaction.user.id, reason=reason)
    elif about == "not_used":
        result_embed = discord.Embed(title=":x: 操作に失敗しました。", description='指定されたPCは使用されていないようです...', color=0xC91111)
    elif about == "bot_about_not_found":
        result_embed = discord.Embed(title=":x: 操作に失敗しました。", description='強制停止する理由を入力してください。', color=0xC91111)
    else:
        # "error" and any unrecognised outcome: generic server-error embed, so
        # result_embed is never left unbound.
        result_embed = discord.Embed(title=":x: 操作に失敗しました。", description='サーバーでエラーが発生しています。', color=0xC91111)
        if about == "error":
            error_info = force_stop["output_dict"]
            result_embed.add_field(name=f"{error_info['error_class_name']}", value=f"{error_info['error_args']}")

    await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="report", description="PCの使用履歴をエクスポートします。")
@discord.app_commands.default_permissions(administrator=True)
async def report(interaction: discord.Interaction):
    """Slash command ``/report``: export the PC usage history and send it as a file.

    Calls ``dislocker.report_export``. On success the file at the returned
    ``file_path`` is attached to an ephemeral reply and the export is logged.
    Any other outcome produces the generic internal-error message.
    Unauthorised callers get an explicit denial.

    Args:
        interaction: The interaction that invoked the command.
    """
    bot_config = dislocker.server_config["bot"]
    is_authorized = interaction.guild_id in bot_config["server_id"] or interaction.user.id in bot_config["admin_user_id"]
    if not is_authorized:
        # Answer explicitly; an unanswered interaction is shown to the user as failed.
        await interaction.response.send_message(":x: 操作に失敗しました。\nこのコマンドを実行する権限がありません。", ephemeral=True)
        return

    report_export = dislocker.report_export()
    if report_export["result"] == 0:
        exported_path = report_export["file_path"]
        await interaction.response.send_message(":white_check_mark: 使用履歴のレポートです。", file=discord.File(exported_path), ephemeral=True)
        dislocker.log(title="[INFO] PCの使用履歴をエクスポートしました。", message=f"ファイルパス | {exported_path}", flag=0)
        return

    # Every non-success outcome is answered, not only result == 1 / about == "error",
    # so the interaction never times out silently.
    await interaction.response.send_message(":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。", ephemeral=True)
@tree.command(name="init", description="操作チャンネルにボタン一式を送信します。")
@discord.app_commands.default_permissions(administrator=True)
async def button_init(interaction: discord.Interaction, text_channel: discord.TextChannel):
    """Slash command ``/init``: post the control buttons into a text channel.

    Sends three messages to the chosen channel: a user-registration button, a
    stop button, and one button per PC returned by ``dislocker.get_pc_list``.
    The invoker then gets an ephemeral confirmation. The PC list is checked
    before anything is posted, so a failed lookup yields an error embed
    instead of a half-initialised channel.

    Args:
        interaction: The interaction that invoked the command.
        text_channel: Channel that receives the button messages.
    """
    bot_config = dislocker.server_config["bot"]
    is_authorized = interaction.guild_id in bot_config["server_id"] or interaction.user.id in bot_config["admin_user_id"]
    if not is_authorized:
        # Answer explicitly; an unanswered interaction is shown to the user as failed.
        denied_embed = discord.Embed(title=":x: 操作に失敗しました。", description='このコマンドを実行する権限がありません。', color=0xC91111)
        await interaction.response.send_message(embed=denied_embed, ephemeral=True)
        return

    pc_list = dislocker.get_pc_list()
    if pc_list["result"] != 0:
        # Same error presentation that /pcinfo uses for a failed get_pc_list().
        error_info = pc_list["output_dict"]
        error_embed = discord.Embed(title=":x: 操作に失敗しました。", description='サーバーでエラーが発生しています。', color=0xC91111)
        error_embed.add_field(name=f"{error_info['error_class_name']}", value=f"{error_info['error_args']}")
        await interaction.response.send_message(embed=error_embed, ephemeral=True)
        return

    # Resolve the destination once instead of once per message.
    channel = client.get_channel(text_channel.id)

    user_register_button_view = discord.ui.View(timeout=None)
    user_register_button_view.add_item(discord.ui.Button(style=discord.ButtonStyle.green, label="ユーザー登録", custom_id="user_register"))
    await channel.send('# :index_pointing_at_the_viewer: ユーザー登録はお済ですか?', view=user_register_button_view)

    stop_button_view = discord.ui.View(timeout=None)
    stop_button_view.add_item(discord.ui.Button(style=discord.ButtonStyle.danger, label="PCの使用を停止", custom_id="stop"))
    await channel.send('# :index_pointing_at_the_viewer: 使用を停止しますか?', view=stop_button_view)

    # NOTE(review): all PC buttons go into a single view; confirm the number of
    # PCs stays within Discord's per-message component limit.
    pc_button_view = discord.ui.View(timeout=None)
    for pc_entry in pc_list["output_dict"].values():
        pc_number_text = str(pc_entry['pc_number'])
        if pc_entry['alt_name'] is None:
            button_label = pc_number_text
        else:
            button_label = f"{pc_number_text} 番 | ({pc_entry['alt_name']})"
        pc_button_view.add_item(discord.ui.Button(style=discord.ButtonStyle.primary, label=button_label, custom_id=f"pcregister_{pc_number_text}"))
    await channel.send('# :index_pointing_at_the_viewer: 使いたいPCの番号を選んでください', view=pc_button_view)

    dislocker.log(title="[INFO] サーバーで初回処理を実行しました。", flag=0)
    result_embed = discord.Embed(title=":white_check_mark: 初回処理が完了しました。", description='指定したテキストチャンネルをご確認ください。', color=0x56FF01)
    await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="masterpass", description="PCのマスターパスワードを表示します。")
@discord.app_commands.default_permissions(administrator=True)
async def masterpass(interaction: discord.Interaction, pc_number: int):
    """Slash command ``/masterpass``: show the master password of a PC.

    Calls ``dislocker.show_pc_master_password`` and replies with one ephemeral
    embed holding either the password or the reported error details.
    Unauthorised callers get an explicit denial.

    Args:
        interaction: The interaction that invoked the command.
        pc_number: Number of the PC whose master password is requested.
    """
    bot_config = dislocker.server_config["bot"]
    is_authorized = interaction.guild_id in bot_config["server_id"] or interaction.user.id in bot_config["admin_user_id"]
    if not is_authorized:
        # Answer explicitly; an unanswered interaction is shown to the user as failed.
        denied_embed = discord.Embed(title=":x: 操作に失敗しました。", description='このコマンドを実行する権限がありません。', color=0xC91111)
        await interaction.response.send_message(embed=denied_embed, ephemeral=True)
        return

    pc_master_password_get = dislocker.show_pc_master_password(pc_number=pc_number)
    output = pc_master_password_get["output_dict"]

    if pc_master_password_get["result"] == 0:
        result_embed = discord.Embed(title=":information_source: マスターパスワード", description=f"PC番号 {str(pc_number)} 番のマスターパスワードを表示します。", color=0x2286C9)
        result_embed.add_field(name="マスターパスワード", value=str(output["pc_master_password"]))
    else:
        result_embed = discord.Embed(title=":x: 取得に失敗しました。", description='サーバーでエラーが発生しています。', color=0xC91111)
        result_embed.add_field(name=f"{output['error_class_name']}", value=f"{output['error_args']}")

    await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="pcinfo", description="PCの情報を表示します。")
@discord.app_commands.default_permissions(administrator=True)
async def pcinfo(interaction: discord.Interaction):
    """Slash command ``/pcinfo``: list every PC and who is using it.

    Builds one embed field per entry returned by ``dislocker.get_pc_list``.
    The field name is the PC number (plus nickname when set); the value is
    either "unused" or a mention of the Discord user currently using it.
    Unauthorised callers get an explicit denial.

    Args:
        interaction: The interaction that invoked the command.
    """
    bot_config = dislocker.server_config["bot"]
    is_authorized = interaction.guild_id in bot_config["server_id"] or interaction.user.id in bot_config["admin_user_id"]
    if not is_authorized:
        # Answer explicitly; an unanswered interaction is shown to the user as failed.
        denied_embed = discord.Embed(title=":x: 操作に失敗しました。", description='このコマンドを実行する権限がありません。', color=0xC91111)
        await interaction.response.send_message(embed=denied_embed, ephemeral=True)
        return

    pc_list = dislocker.get_pc_list()
    if pc_list["result"] == 0:
        result_embed = discord.Embed(title=":information_source: 現在のPCリスト", description="PCリストです。", color=0x2286C9)
        # NOTE(review): one field per PC; confirm the PC count stays within
        # Discord's per-embed field limit.
        for pc_entry in pc_list["output_dict"].values():
            pc_number = pc_entry['pc_number']
            alt_name = pc_entry['alt_name']
            if alt_name is None:
                pc_name_title = f'{pc_number}'
            else:
                pc_name_title = f'{pc_number} 番 ({alt_name})'

            using_member_id = pc_entry['using_member_id']
            if using_member_id is None:
                pc_using_value = '未使用'
            else:
                # Translate the stored member ID into a Discord user ID for the mention.
                discord_user_id = dislocker.get_discord_user_id(using_member_id)
                pc_using_value = f'<@{discord_user_id}> が使用中'

            result_embed.add_field(name=pc_name_title, value=pc_using_value)
    else:
        error_info = pc_list["output_dict"]
        result_embed = discord.Embed(title=":x: 操作に失敗しました。", description='サーバーでエラーが発生しています。', color=0xC91111)
        result_embed.add_field(name=f"{error_info['error_class_name']}", value=f"{error_info['error_args']}")

    await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="pcnickname", description="PCにニックネームを設定します。")
@discord.app_commands.default_permissions(administrator=True)
async def pcnickname(interaction: discord.Interaction, pc_number: int, nickname: str):
    """Slash command ``/pcnickname``: set the nickname of a PC.

    Calls ``dislocker.set_pc_nickname`` and replies with one ephemeral embed.
    On success the change is also reported through ``send_log``.
    Unauthorised callers get an explicit denial.

    Args:
        interaction: The interaction that invoked the command.
        pc_number: Number of the PC to rename.
        nickname: New nickname; passed to ``set_pc_nickname`` as ``alt_name``.
    """
    bot_config = dislocker.server_config["bot"]
    is_authorized = interaction.guild_id in bot_config["server_id"] or interaction.user.id in bot_config["admin_user_id"]
    if not is_authorized:
        # Answer explicitly; an unanswered interaction is shown to the user as failed.
        denied_embed = discord.Embed(title=":x: 操作に失敗しました。", description='このコマンドを実行する権限がありません。', color=0xC91111)
        await interaction.response.send_message(embed=denied_embed, ephemeral=True)
        return

    pc_nickname_set = dislocker.set_pc_nickname(pc_number=pc_number, alt_name=nickname)
    if pc_nickname_set["result"] == 0:
        result_embed = discord.Embed(title=":white_check_mark: 操作が完了しました。", description=f'PC番号 {str(pc_number)} のニックネームは {str(nickname)} に設定されました。', color=0x56FF01)
        await send_log(mode="pcnickname", pc_number=pc_number, discord_user_id=interaction.user.id, alt_name=nickname)
    else:
        error_info = pc_nickname_set["output_dict"]
        result_embed = discord.Embed(title=":x: 操作に失敗しました。", description='サーバーでエラーが発生しています。ニックネームは変更されません。', color=0xC91111)
        result_embed.add_field(name=f"{error_info['error_class_name']}", value=f"{error_info['error_args']}")

    await interaction.response.send_message(embed=result_embed, ephemeral=True)
2024-07-07 16:35:57 +09:00
2024-07-07 16:04:46 +09:00
# Entry point: start the usage monitor and the Discord client, but only when
# the DL start-up check reported "ok". Any other init_result does nothing here.
if dislocker.init_result == "ok":
    print("Botを起動します...")
    _bot_settings = dislocker.server_config["bot"]
    _monitor_settings = _bot_settings["monitor"]
    monitor = Monitor(
        search_frequency=_monitor_settings["search_frequency"],
        allowable_time=_monitor_settings["allowable_time"],
        fstop_time=_monitor_settings["fstop_time"],
    )
    monitor.start()
    # client.run() is the last statement of the script.
    client.run(_bot_settings["token"])