Dislocker/dislocker.py

1704 lines
106 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import discord
import os
import json
import psycopg2
from psycopg2 import sql
from datetime import datetime, timedelta
import asyncio
import string
import random
import hashlib
import openpyxl
from openpyxl import Workbook
import threading
import time
class DL():
def __init__(self):
self.config_dir_path = "./config/"
self.export_dir_path = "./export/"
self.log_dir_path = "./log/"
self.server_config_path = self.config_dir_path + "server.json"
self.onetime_config_path = self.config_dir_path + "onetime.json"
self.log_path = self.log_dir_path + "dislocker.txt"
try:
if not os.path.isdir(self.config_dir_path):
print("config ディレクトリが見つかりません... 作成します。")
os.mkdir(self.config_dir_path)
if not os.path.isfile(self.server_config_path):
print("config ファイルが見つかりません... 作成します。")
self.server_config = {
"db": {
"host": "localhost",
"port": "5432",
"db_name": "dislocker",
"username": "user",
"password": "password"
},
"bot": {
"server_id": ["TYPE HERE SERVER ID (YOU MUST USE INT !!!!)"],
"token": "TYPE HERE BOTS TOKEN KEY",
"activity": {
"name": "Dislocker",
"details": "ロック中...",
"type": "playing",
"state": "ロック中..."
},
"log_channel_id" : "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
"config_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
"config_public_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
"monitor": {
"search_frequency": 1,
"allowable_time": 180,
"fstop_time": "21:00:00"
},
"preset_games": ["TEST1", "TEST2", "TEST3", "TEST4", "TEST5"],
"admin_user_id": ["TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)"],
"debug": False
}
}
with open(self.server_config_path, "w", encoding="utf-8") as w:
json.dump(self.server_config, w, indent=4, ensure_ascii=False)
elif os.path.isfile(self.server_config_path):
with open(self.server_config_path, "r", encoding="utf-8") as r:
self.server_config = json.load(r)
print("config ファイルを読み込みました。")
if not os.path.isdir(self.export_dir_path):
print("export ディレクトリが見つかりません... 作成します。")
os.mkdir(self.export_dir_path)
if not os.path.isdir(self.log_dir_path):
print("log ディレクトリが見つかりません... 作成します。")
os.mkdir(self.log_dir_path)
if os.path.isfile(self.onetime_config_path):
print("ワンタイムパスワードが見つかりました。削除します。")
os.remove(self.onetime_config_path)
if type(self.server_config["bot"]["log_channel_id"]) is not int or type(self.server_config["bot"]["config_channel_id"]) is not int:
print("config ファイル内でチャンネルIDがint型で記述されていません。int型で記述して、起動してください。")
self.init_result = "not_int"
else:
self.db = psycopg2.connect(f"host={self.server_config['db']['host']} dbname={self.server_config['db']['db_name']} port={self.server_config['db']['port']} user={self.server_config['db']['username']} password={self.server_config['db']['password']}")
cursor = self.db.cursor()
self.pc_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
self.keyboard_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
self.mouse_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
self.preset_games = self.server_config["bot"]["preset_games"]
self.debug = self.server_config["bot"]["debug"]
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'club_member')")
find_club_member_table = cursor.fetchall()
print(find_club_member_table)
if find_club_member_table[0][0] == False:
cursor.execute("CREATE TABLE club_member (member_id SERIAL NOT NULL, name TEXT NOT NULL, discord_user_name TEXT NOT NULL, discord_user_id TEXT NOT NULL, PRIMARY KEY (member_id))")
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'pc_list')")
find_pc_list_table = cursor.fetchall()
print(find_pc_list_table)
if find_pc_list_table[0][0] == False:
cursor.execute("CREATE TABLE pc_list (pc_number INTEGER NOT NULL, using_member_id INTEGER, password_hash VARCHAR(64), pc_uuid VARCHAR(36), pc_token VARCHAR(36), master_password VARCHAR(16), detail TEXT, alt_name TEXT, PRIMARY KEY (pc_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))")
for i in self.pc_list:
print(i)
cursor.execute("INSERT INTO pc_list (pc_number) VALUES (%s)", (i,))
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'keyboard_list')")
find_keyboard_list_table = cursor.fetchall()
print(find_keyboard_list_table)
if find_keyboard_list_table[0][0] == False:
cursor.execute("CREATE TABLE keyboard_list (keyboard_number INTEGER NOT NULL, using_member_id INTEGER, device_instance_path TEXT, device_name TEXT, detail TEXT, alt_name TEXT, PRIMARY KEY (keyboard_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))")
for i in self.keyboard_list:
print(i)
cursor.execute("INSERT INTO keyboard_list (keyboard_number) VALUES (%s)", (i,))
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'mouse_list')")
find_mouse_list_table = cursor.fetchall()
print(find_mouse_list_table)
if find_mouse_list_table[0][0] == False:
cursor.execute("CREATE TABLE mouse_list (mouse_number INTEGER NOT NULL, using_member_id INTEGER, device_instance_path TEXT, device_name TEXT, detail TEXT, alt_name TEXT, PRIMARY KEY (mouse_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))")
for i in self.mouse_list:
print(i)
cursor.execute("INSERT INTO mouse_list (mouse_number) VALUES (%s)", (i,))
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'pc_usage_history')")
find_pc_usage_history_table = cursor.fetchall()
print(find_pc_usage_history_table)
if find_pc_usage_history_table[0][0] == False:
cursor.execute("CREATE TABLE pc_usage_history (id SERIAL NOT NULL, member_id INTEGER NOT NULL, pc_number INTEGER NOT NULL, keyboard_number INTEGER, mouse_number INTEGER, start_use_time TIMESTAMP NOT NULL, end_use_time TIMESTAMP, use_detail TEXT, bot_about TEXT, PRIMARY KEY (id), FOREIGN KEY (member_id) REFERENCES club_member(member_id), FOREIGN KEY (pc_number) REFERENCES pc_list(pc_number), FOREIGN KEY (keyboard_number) REFERENCES keyboard_list(keyboard_number), FOREIGN KEY (mouse_number) REFERENCES mouse_list(mouse_number))")
self.db.commit()
cursor.close()
self.init_result = "ok"
except (Exception) as error:
print("初回処理でエラーが発生しました。\nエラー内容\n" + str(error))
self.init_result = "error"
finally:
pass
def log(self, **kwargs):
if self.debug == True:
flag = 1
else:
if "flag" in kwargs:
if kwargs["flag"] == 1:
flag = 1
else:
flag = 0
else:
flag = 0
if flag == 1:
title = str(kwargs["title"])
if "message" in kwargs:
message = str(kwargs["message"])
else:
message = None
current_datetime = str(datetime.now())
if message == None:
detail = f"{current_datetime} | {title}\n"
else:
detail = f"{current_datetime} | {title}\n{message}\n"
print(detail)
if os.path.isfile(self.log_path):
try:
with open(self.log_path, "a", encoding="utf-8") as a:
a.write(detail)
except:
print("LOGGING ERROR mode a")
else:
try:
with open(self.log_path, "w", encoding="utf-8") as w:
w.write(detail)
except:
print("LOGGING ERROR mode w")
def password_generate(self, length):
numbers = string.digits # (1)
password = ''.join(random.choice(numbers) for _ in range(length)) # (2)
return password
def hash_genarate(self, source):
hashed = hashlib.sha256(source.encode())
return hashed.hexdigest()
def user_register_check(self, **kwargs):
try:
discord_user_id = str(kwargs["discord_user_id"])
cursor = self.db.cursor()
cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
user_record = cursor.fetchall()
#ユーザーデータが見つかった場合(登録済みの場合)
if user_record:
member_id = user_record[0][0]
name = user_record[0][1]
discord_user_name = user_record[0][2]
return {"result": 0, "about": "exist", "user_info": {"member_id": member_id, "name": name, "discord_user_name": discord_user_name}}
#ユーザーデータがなかったら(未登録の場合)
else:
return {"result": 1, "about": "user_data_not_found"}
except Exception as error:
self.log(title=f"[ERROR] ユーザーの登録状態を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
cursor.close()
def pc_used_check(self, **kwargs):
try:
if "pc_number" in kwargs:
pc_number = int(kwargs["pc_number"])
else:
pc_number = None
if "discord_user_id" in kwargs:
discord_user_id = str(kwargs["discord_user_id"])
else:
discord_user_id = None
if "member_id" in kwargs:
member_id = int(kwargs["member_id"])
else:
member_id = None
cursor = self.db.cursor()
if pc_number != None:
# pc番号を指定してpc_listから探す
cursor.execute("SELECT * FROM pc_list WHERE pc_number= %s", (pc_number,))
pc_list_record = cursor.fetchall()
if pc_list_record[0][1] == None:
return {"result": 0, "about": "vacent"}
else:
return {"result": 1, "about": "used_by_other"}
elif discord_user_id != None:
#ユーザーIDを指定してPC使用履歴から探す
cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
user_record = cursor.fetchall()
#ユーザーデータが見つかった場合(登録済みの場合)
if user_record:
member_id = user_record[0][0]
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage_history_record = cursor.fetchall()
if pc_usage_history_record:
if pc_usage_history_record[0][6] == None:
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
return {"result": 1, "about": "used_by_you", "pc_usage_history": {"pc_number": str(pc_usage_history_record[0][2]), "keyboard_number": keyboard_number, "mouse_number": mouse_number, "start_time": str(pc_usage_history_record[0][5]), "use_detail": str(pc_usage_history_record[0][7])}}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 1, "about": "user_data_not_found"}
elif member_id != None:
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage_history_record = cursor.fetchall()
if pc_usage_history_record:
if pc_usage_history_record[0][6] == None:
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
return {"result": 1, "about": "used_by_you", "pc_usage_history": {"pc_number": str(pc_usage_history_record[0][2]), "keyboard_number": keyboard_number, "mouse_number": mouse_number, "start_time": str(pc_usage_history_record[0][5]), "use_detail": str(pc_usage_history_record[0][7])}}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 1, "about": "search_options_error"}
except Exception as error:
self.log(title=f"[ERROR] PCの使用状況を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def keyboard_used_check(self, **kwargs):
try:
cursor = self.db.cursor()
if kwargs["keyboard_number"] == None:
return {"result": 0, "about": "ok"}
else:
keyboard_number = int(kwargs["keyboard_number"])
cursor.execute("SELECT * FROM keyboard_list WHERE keyboard_number=%s", (keyboard_number,))
keyboard_list_record = cursor.fetchall()
if keyboard_list_record[0][1] == None:
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "keyboard_already_in_use_by_other"}
except Exception as error:
self.log(title=f"[ERROR] キーボードの使用状況を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def mouse_used_check(self, **kwargs):
try:
cursor = self.db.cursor()
if kwargs["mouse_number"] == None:
return {"result": 0, "about": "ok"}
else:
mouse_number = int(kwargs["mouse_number"])
cursor.execute("SELECT * FROM mouse_list WHERE mouse_number=%s", (mouse_number,))
mouse_list_record = cursor.fetchall()
if mouse_list_record[0][1] == None:
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "mouse_already_in_use_by_other"}
except Exception as error:
self.log(title=f"[ERROR] マウスの使用状況を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def register(self, **kwargs):
try:
cursor = self.db.cursor()
user_info = {
"id": str(kwargs["discord_user_id"]),
"name": str(kwargs["name"]),
"display_name": str(kwargs["display_name"]),
"pc_number": int(kwargs["pc_number"]),
"keyboard_number": 0,
"mouse_number": 0,
"detail": None
}
if "detail" in kwargs:
user_info["detail"] = str(kwargs["detail"])
else:
pass
if kwargs["keyboard_number"] == "0":
pass
else:
user_info["keyboard_number"] = int(kwargs["keyboard_number"])
if kwargs["mouse_number"] == "0":
pass
else:
user_info["mouse_number"] = int(kwargs["mouse_number"])
# ユーザー登録されているかの確認
user_register = self.user_register_check(discord_user_id=user_info["id"])
if user_register["result"] == 0:
member_id = user_register["user_info"]["member_id"]
name = user_register["user_info"]["name"]
# ユーザーがPCを使っているか
pc_check_self = self.pc_used_check(member_id=member_id)
if pc_check_self["result"] == 0:
# 他の人がそのPCを使っているか
pc_check = self.pc_used_check(pc_number=user_info["pc_number"])
if pc_check["result"] == 0:
# キーボードは使われているか
keyboard_check = self.keyboard_used_check(keyboard_number=user_info["keyboard_number"])
if keyboard_check["result"] == 0:
# マウスは使われているか
mouse_check = self.mouse_used_check(mouse_number=user_info["mouse_number"])
if mouse_check["result"] == 0:
# パスワードとハッシュ作成
password = self.password_generate(4)
password_hash = self.hash_genarate(password)
# PC使用履歴のテーブルにレコードを挿入
cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, keyboard_number, mouse_number, start_use_time, use_detail) VALUES (%s, %s, %s, %s, clock_timestamp(), %s)", (member_id, user_info["pc_number"], user_info["keyboard_number"], user_info["mouse_number"], user_info["detail"]))
# PCリストの該当のレコードを更新
cursor.execute("UPDATE pc_list SET using_member_id = %s, password_hash = %s WHERE pc_number = %s", (member_id, password_hash, user_info["pc_number"]))
# キーボードリストの該当のレコードを自前(None)だったらスキップ、借りていたら更新
if user_info["keyboard_number"] == 0:
pass
else:
cursor.execute("UPDATE keyboard_list SET using_member_id = %s WHERE keyboard_number = %s", (member_id, user_info["keyboard_number"]))
# マウスも同様に
if user_info["mouse_number"] == 0:
pass
else:
cursor.execute("UPDATE mouse_list SET using_member_id = %s WHERE mouse_number = %s", (member_id, user_info["mouse_number"]))
self.db.commit()
return {"result": 0, "about": "ok", "output_dict": {"password": str(password), "name": str(name)}}
else:
return {"result": 1, "about": "mouse_already_in_use"}
else:
return {"result": 1, "about": "keyboard_already_in_use"}
else:
return {"result": 1, "about": "pc_already_in_use_by_other"}
else:
return {"result": 1, "about": "pc_already_in_use_by_you", "pc_usage_history": {"pc_number": pc_check_self["pc_usage_history"]["pc_number"], "keyboard_number": pc_check_self["pc_usage_history"]["keyboard_number"], "mouse_number": pc_check_self["pc_usage_history"]["mouse_number"], "start_time": pc_check_self["pc_usage_history"]["start_time"], "use_detail": pc_check_self["pc_usage_history"]["use_detail"]}}
else:
return {"result": 1, "about": "user_data_not_found"}
except Exception as error:
self.log(title=f"[ERROR] PCの使用登録中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def stop(self, **kwargs):
try:
cursor = self.db.cursor()
discord_user_id = str(kwargs["discord_user_id"])
if "bot_about" in kwargs:
bot_about = kwargs["bot_about"]
else:
bot_about = None
# ユーザーが登録してるかというよりはデータの取得のため
user_register = self.user_register_check(discord_user_id=discord_user_id)
member_id = user_register["user_info"]["member_id"]
name = user_register["user_info"]["name"]
if user_register["result"] == 0:
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage_history_record = cursor.fetchall()
if pc_usage_history_record:
pc_usage_history_id = pc_usage_history_record[0][0]
pc_number = pc_usage_history_record[0][2]
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
end_use_time = pc_usage_history_record[0][6]
# 使用中のとき (使用停止時間がNoneのとき)
if end_use_time == None:
# 利用停止の理由の有無を判断
if bot_about == None:
cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp() WHERE id = %s", (pc_usage_history_id,))
else:
cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp(), bot_about = %s WHERE id = %s", (bot_about, pc_usage_history_id))
# pc_listの使用中ユーザーを消す
cursor.execute("UPDATE pc_list SET using_member_id = NULL, password_hash = NULL WHERE pc_number = %s", (pc_number,))
if keyboard_number == None:
pass
else:
# keyboard_listの使用中ユーザーを消す
cursor.execute("UPDATE keyboard_list SET using_member_id = NULL WHERE keyboard_number = %s", (keyboard_number,))
if mouse_number == None:
pass
else:
# mouse_listの使用中ユーザーを消す
cursor.execute("UPDATE mouse_list SET using_member_id = NULL WHERE mouse_number = %s", (mouse_number,))
self.db.commit()
return {"result": 0, "about": "ok", "output_dict": {"pc_number": str(pc_number), "name": str(name)}}
else:
return {"result": 1, "about": "unused"}
else:
return {"result": 1, "about": "unused"}
else:
return {"result": 1, "about": "user_data_not_found"}
except Exception as error:
self.log(title=f"[ERROR] PCの使用停止処理中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def user_register(self, **kwargs):
try:
discord_user_id = str(kwargs["discord_user_id"])
discord_user_name = str(kwargs["discord_user_name"])
name = str(kwargs["name"])
cursor = self.db.cursor()
cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
user_record = cursor.fetchall()
if not user_record:
cursor.execute("INSERT INTO club_member (name, discord_user_name, discord_user_id) VALUES (%s, %s, %s)", (name, discord_user_name, discord_user_id))
self.db.commit()
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "already_exists"}
except Exception as error:
self.log(title=f"[ERROR] ユーザー情報の登録中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def format_datetime(self, value):
if isinstance(value, datetime):
return value.strftime('%Y-%m-%d %H:%M:%S')
return value
def pc_register(self, **kwargs):
try:
pc_number = int(kwargs["pc_number"])
cursor = self.db.cursor()
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_list = cursor.fetchall()
if not pc_list:
cursor.execute("INSERT INTO pc_list (pc_number) VALUES (%s)", (pc_number,))
self.db.commit()
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "already_exists"}
except Exception as error:
self.log(title=f"[ERROR] PCの情報を登録中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def report_export(self, **kwargs):
try:
cursor = self.db.cursor()
csv_file_path = self.export_dir_path + "pc_usage_history.csv"
main_table = "pc_usage_history"
related_table = "club_member"
excel_file_path = self.export_dir_path + "pc_usage_history.xlsx"
# メインテーブルの列情報を取得user_idを除く
cursor.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table)))
main_columns = [desc[0] for desc in cursor.description if desc[0] != 'member_id']
# クエリを作成(列名を明確に指定)
query = sql.SQL("""
SELECT {main_columns}, {related_table}.name
FROM {main_table}
LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.member_id
ORDER BY id
""").format(
main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]),
main_table=sql.Identifier(main_table),
related_table=sql.Identifier(related_table)
)
cursor.execute(query)
# 列名を再構成nameを2番目に配置
column_names = [main_columns[0], 'name'] + main_columns[1:]
rows = cursor.fetchall()
# Excelワークブックを作成
wb = Workbook()
ws = wb.active
# 列名を書き込み
ws.append(column_names)
# データを書き込み
for row in rows:
# nameを2番目に移動
formatted_row = [self.format_datetime(row[0])] + [row[-1]] + [self.format_datetime(field) if field is not None else '' for field in row[1:-1]]
ws.append(formatted_row)
# 列幅を自動調整
for col in ws.columns:
max_length = 0
column = col[0].column_letter
for cell in col:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = (max_length + 2) * 1.2
ws.column_dimensions[column].width = adjusted_width
# Excelファイルを保存
wb.save(excel_file_path)
self.log(title=f"[SUCCESS] PCの使用履歴をエクスポートしました。", message=f"ファイルパス | {excel_file_path}", flag=0)
return {"result": 0, "about": "ok", "file_path": excel_file_path}
except Exception as error:
self.log(title=f"[ERROR] PCの使用履歴をエクスポート中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def force_stop(self, **kwargs):
try:
pc_number = kwargs["pc_number"]
cursor = self.db.cursor()
if "bot_about" in kwargs:
bot_about = kwargs["bot_about"]
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_list_record = cursor.fetchall()
pc_using_member_id = pc_list_record[0][1]
pc_password_hash = pc_list_record[0][2]
if pc_using_member_id == None:
return {"result": 1, "about": "not_used"}
else:
cursor.execute("UPDATE pc_list SET using_member_id = NULL WHERE pc_number = %s", (pc_number,))
if pc_password_hash == None:
pass
else:
cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_number,))
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s AND pc_number = %s ORDER BY id DESC LIMIT 1", (pc_using_member_id, pc_number))
pc_usage_history_record = cursor.fetchall()
pc_usage_history_record_id = pc_usage_history_record[0][0]
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
if keyboard_number == None:
pass
else:
# keyboard_listの使用中ユーザーを消す
cursor.execute("UPDATE keyboard_list SET using_member_id = NULL WHERE keyboard_number = %s", (keyboard_number,))
if mouse_number == None:
pass
else:
# mouse_listの使用中ユーザーを消す
cursor.execute("UPDATE mouse_list SET using_member_id = NULL WHERE mouse_number = %s", (mouse_number,))
cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp(), bot_about = %s WHERE id = %s", (bot_about, pc_usage_history_record_id))
self.db.commit()
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "bot_about_not_found"}
except Exception as error:
self.log(title=f"[ERROR] fstop中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def pc_onetime_gen(self, **kwargs):
if kwargs.get("max_count") == None:
max_count = 1
elif isinstance(kwargs.get("max_count"), int):
max_count = int(kwargs.get("max_count"))
else:
max_count = 1
try:
if os.path.isfile(dislocker.onetime_config_path):
with open(dislocker.onetime_config_path, "r") as r:
onetime_config = json.load(r)
onetime_password = onetime_config["onetime"]["pc_register"]["password"]
if onetime_password == None:
onetime_password = str(self.password_generate(8))
onetime_config["onetime"]["pc_register"]["password"] = onetime_password
onetime_config["onetime"]["pc_register"]["max_count"] = max_count
onetime_config["onetime"]["pc_register"]["current_count"] = 0
current_count = onetime_config["onetime"]["pc_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
current_count = onetime_config["onetime"]["pc_register"]["current_count"]
max_count = onetime_config["onetime"]["pc_register"]["max_count"]
return {"result": 1, "about": "already_exists", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
onetime_password = str(self.password_generate(8))
onetime_config = {
"onetime": {
"pc_register": {
"password": onetime_password,
"current_count": 0,
"max_count": int(max_count)
},
"device_register": {
"password": None,
"current_count": None,
"max_count": None
}
}
}
current_count = onetime_config["onetime"]["pc_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
except Exception as error:
self.log(title=f"[ERROR] PC登録用のワンタイムパスワード発行中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
def device_onetime_gen(self, **kwargs):
if kwargs.get("max_count") == None:
max_count = 1
elif isinstance(kwargs.get("max_count"), int):
max_count = int(kwargs.get("max_count"))
else:
max_count = 1
try:
if os.path.isfile(dislocker.onetime_config_path):
with open(dislocker.onetime_config_path, "r") as r:
onetime_config = json.load(r)
onetime_password = onetime_config["onetime"]["device_register"]["password"]
if onetime_password == None:
onetime_password = str(self.password_generate(8))
onetime_config["onetime"]["device_register"]["password"] = onetime_password
onetime_config["onetime"]["device_register"]["max_count"] = max_count
onetime_config["onetime"]["device_register"]["current_count"] = 0
current_count = onetime_config["onetime"]["device_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
current_count = onetime_config["onetime"]["device_register"]["current_count"]
max_count = onetime_config["onetime"]["device_register"]["max_count"]
return {"result": 1, "about": "already_exists", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
onetime_password = str(self.password_generate(8))
onetime_config = {
"onetime": {
"pc_register": {
"password": None,
"current_count": None,
"max_count": None
},
"device_register": {
"password": onetime_password,
"current_count": 0,
"max_count": int(max_count)
}
}
}
current_count = onetime_config["onetime"]["device_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
except Exception as error:
self.log(title=f"[ERROR] デバイス登録用のワンタイムパスワード発行中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
def show_pc_master_password(self, **kwargs):
try:
if isinstance(kwargs.get("pc_number"), int):
pc_number = int(kwargs.get("pc_number"))
cursor = self.db.cursor()
cursor.execute("SELECT master_password FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_master_password_list = cursor.fetchall()
pc_master_password = pc_master_password_list[0][0]
return {"result": 0, "about": "ok", "output_dict": {"pc_master_password": pc_master_password}}
else:
return {"result": 1, "about": "syntax_error"}
except Exception as error:
self.log(title=f"[ERROR] PCのマスターパスワードを取得中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def get_discord_user_id(self, **kwargs):
try:
member_id = int(kwargs["member_id"])
cursor = self.db.cursor()
cursor.execute("SELECT discord_user_id FROM club_member WHERE member_id = %s", (member_id,))
discord_user_id_list = cursor.fetchall()
discord_user_id = discord_user_id_list[0][0]
return {"result": 0, "about": "ok", "discord_user_id": discord_user_id}
except Exception as error:
self.log(title=f"[ERROR] DiscordのユーザーIDの取得中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def get_pc_list(self, **kwargs):
try:
cursor = self.db.cursor()
if "pc_number" in kwargs:
pc_number = int(kwargs["pc_number"])
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_list = cursor.fetchall()
return {"result": 0, "about": "ok", "output_dict": {pc_number: {"pc_number": pc_list[0][0], "using_member_id": pc_list[0][1], "pc_token": i[4], "master_password": [0][5], "detail": pc_list[0][6], "alt_name": pc_list[0][7]}}}
else:
cursor.execute("SELECT * FROM pc_list ORDER BY pc_number")
pc_list = cursor.fetchall()
pc_list_base = {}
for i in pc_list:
pc_list_base[i[0]] = {"pc_number": i[0], "using_member_id": i[1], "pc_token": i[4], "master_password": i[5], "detail": i[6], "alt_name": i[7]}
return {"result": 0, "about": "ok", "output_dict": pc_list_base}
except Exception as error:
self.log(title=f"[ERROR] PCリストの取得中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def get_keyboard_list(self, **kwargs):
try:
cursor = self.db.cursor()
if "keyboard_number" in kwargs:
keyboard_number = int(kwargs["keyboard_number"])
cursor.execute("SELECT * FROM keyboard_list WHERE keyboard_number = %s", (keyboard_number,))
keyboard_list = cursor.fetchall()
return {"result": 0, "about": "ok", "output_dict": {keyboard_number: {"keyboard_number": keyboard_list[0][0], "using_member_id": keyboard_list[0][1], "device_instance_path": keyboard_list[0][2], "device_name": keyboard_list[0][3], "detail": keyboard_list[0][4], "alt_name": keyboard_list[0][5]}}}
else:
cursor.execute("SELECT * FROM keyboard_list ORDER BY keyboard_number")
keyboard_list = cursor.fetchall()
keyboard_list_base = {}
for i in keyboard_list:
if i[0] == 0:
pass
else:
keyboard_list_base[i[0]] = {"keyboard_number": i[0], "using_member_id": i[1], "device_instance_path": i[2], "device_name": i[3], "detail": i[4], "alt_name": i[5]}
return {"result": 0, "about": "ok", "output_dict": keyboard_list_base}
except Exception as error:
self.log(title=f"[ERROR] キーボードリストの取得中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def get_mouse_list(self, **kwargs):
try:
cursor = self.db.cursor()
if "mouse_number" in kwargs:
mouse_number = int(kwargs["mouse_number"])
cursor.execute("SELECT * FROM mouse_list WHERE mouse_number = %s", (mouse_number,))
mouse_list = cursor.fetchall()
return {"result": 0, "about": "ok", "output_dict": {mouse_number: {"mouse_number": mouse_list[0][0], "using_member_id": mouse_list[0][1], "device_instance_path": mouse_list[0][2], "device_name": mouse_list[0][3], "detail": mouse_list[0][4], "alt_name": mouse_list[0][5]}}}
else:
cursor.execute("SELECT * FROM mouse_list ORDER BY mouse_number")
mouse_list = cursor.fetchall()
mouse_list_base = {}
for i in mouse_list:
if i[0] == 0:
pass
else:
mouse_list_base[i[0]] = {"mouse_number": i[0], "using_member_id": i[1], "device_instance_path": i[2], "device_name": i[3], "detail": i[4], "alt_name": i[5]}
return {"result": 0, "about": "ok", "output_dict": mouse_list_base}
except Exception as error:
self.log(title=f"[ERROR] マウスリストの取得中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def set_pc_nickname(self, **kwargs):
try:
cursor = self.db.cursor()
if 'pc_number' in kwargs and 'alt_name' in kwargs:
pc_number = int(kwargs["pc_number"])
alt_name = kwargs["alt_name"]
cursor.execute("UPDATE pc_list SET alt_name = %s WHERE pc_number = %s", (alt_name, pc_number))
self.db.commit()
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "syntax_error"}
except Exception as error:
self.log(title=f"[ERROR] PCのニックネームの設定中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
def pc_unregister(self, **kwargs):
try:
cursor = self.db.cursor()
if 'pc_number' in kwargs:
pc_number = int(kwargs["pc_number"])
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_list = cursor.fetchall()
if pc_list:
cursor.execute("UPDATE pc_list SET using_member_id = NULL, password_hash = NULL, pc_uuid = NULL, pc_token = NULL, master_password = NULL, detail = NULL, alt_name = NULL WHERE pc_number = %s", (pc_number,))
self.db.commit()
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "not_exists"}
else:
return {"result": 1, "about": "syntax_error"}
except Exception as error:
self.log(title=f"[ERROR] PCの登録解除中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
finally:
if cursor:
cursor.close()
class ReasonModal(discord.ui.Modal):
def __init__(self, title: str, pc_number: str, keyboard_number: str, mouse_number: str, timeout=15) -> None:
super().__init__(title=title, timeout=timeout)
self.reason_input_form = discord.ui.TextInput(label="使用目的を入力してください", style=discord.TextStyle.short, custom_id=f"register_{pc_number}_{keyboard_number}_{mouse_number}")
self.add_item(self.reason_input_form)
async def on_submit(self, interaction: discord.Interaction) -> None:
custom_id = interaction.data["components"][0]["components"][0]["custom_id"]
custom_id_split = custom_id.split("_")
pc_number = custom_id_split[1]
keyboard_number = custom_id_split[2]
mouse_number = custom_id_split[3]
if keyboard_number == "0":
keyboard_number_show = "自前"
else:
keyboard_number_show = keyboard_number
if mouse_number == "0":
mouse_number_show = "自前"
else:
mouse_number_show = mouse_number
register = dislocker.register(discord_user_id=interaction.user.id, name=interaction.user.name, display_name=interaction.user.display_name, pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, detail=self.reason_input_form.value)
if register["about"] == "ok":
await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {register["output_dict"]["password"]}\n## PC番号 | {pc_number}\n## キーボード番号 | {keyboard_number_show}\n## マウス番号 | {mouse_number_show}\n## 使用目的 | {self.reason_input_form.value}", ephemeral=True)
await send_log(mode="use", pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, reason=self.reason_input_form.value, discord_user_id=interaction.user.id)
dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が開始されました。", message=f"名前 | {register["output_dict"]["name"]}, 使用目的 | {self.reason_input_form.value}", flag=0)
elif register["about"] == "pc_already_in_use_by_you":
pc_usage_history = register["pc_usage_history"]
if pc_usage_history["keyboard_number"] == None:
keyboard_number_show = "未認証"
elif pc_usage_history["keyboard_number"] == 0:
keyboard_number_show = "自前"
else:
keyboard_number_show = str(pc_usage_history["keyboard_number"])
if pc_usage_history["mouse_number"] == None:
mouse_number_show = "未認証"
elif pc_usage_history["mouse_number"] == 0:
mouse_number_show = "自前"
else:
mouse_number_show = str(pc_usage_history["mouse_number"])
await interaction.response.send_message(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには 終了ボタン で使用終了をお知らせください。\n>>> # PC番号 | {pc_usage_history["pc_number"]}\n# キーボード番号 | {keyboard_number_show}\n# マウス番号 | {mouse_number_show}\n# 使用開始時刻 | {pc_usage_history["start_time"]}", ephemeral=True)
#await interaction.response.send_message(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには 終了ボタン で使用終了をお知らせください。\n>>> # PC番号 | {pc_usage_history["pc_number"]}\n# キーボード番号 | {keyboard_number_show}\n# マウス番号 | {mouse_number_show}\n# 使用開始時刻 | {pc_usage_history["start_time"]}\n# 使用目的 | {pc_usage_history["use_detail"]}", ephemeral=True)
elif register["about"] == "pc_already_in_use_by_other":
await interaction.response.send_message(f"# :man_gesturing_no: そのPCは他のメンバーによって使用されています。\n別のPC番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "keyboard_already_in_use":
await interaction.response.send_message(f"# :man_gesturing_no: そのキーボードは他のメンバーによって使用されています。\n別のキーボードのデバイス番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "mouse_already_in_use":
await interaction.response.send_message(f"# :man_gesturing_no: そのマウスは他のメンバーによって使用されています。\n別のマウスのデバイス番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "user_data_not_found":
await interaction.response.send_message("# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。", ephemeral=True)
else:
await interaction.response.send_message("# :skull_crossbones: 登録できませんでした。\n内部エラーが発生しています。", ephemeral=True)
class Monitor():
def __init__(self, **kwargs) -> None:
self.search_frequency = kwargs["search_frequency"]
self.allowable_time = kwargs["allowable_time"]
self.fstop_time = kwargs["fstop_time"]
self.init_wait_time = 10
def start(self, **kwargs):
search_thread = threading.Thread(target=self.search)
search_thread.start()
def search(self):
try:
time.sleep(self.init_wait_time)
while True:
cursor = dislocker.db.cursor()
cursor.execute("SELECT * FROM pc_list WHERE password_hash IS NOT NULL")
pc_list = cursor.fetchall()
current_datetime = datetime.now()
fstop_time = self.fstop_time
if current_datetime.time().strftime("%H:%M:%S") == fstop_time:
dislocker.log(title=f"[INFO] 定期のPCの使用停止処理を開始します。", flag=0)
for i in dislocker.pc_list:
stop = dislocker.force_stop(pc_number=i, bot_about="使用停止忘れによるBotによる強制停止。")
result = {"result": "FSTOP"}
dislocker.log(title=f"[SUCCESS] 定期のPCの使用停止処理は完了しました。", flag=0)
else:
if pc_list:
for i in pc_list:
member_id = i[1]
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s AND end_use_time IS NULL ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage = cursor.fetchall()
start_time = pc_usage[0][5]
time_difference = current_datetime - start_time
dislocker.log(title=f"[INFO] 現在確認されているパスワード未使用のユーザー", message=f"レコード | {str(pc_usage)}, 経過時間(Sec) | {time_difference.seconds}/{timedelta(seconds=self.allowable_time).seconds}", flag=0)
if time_difference.seconds >= timedelta(seconds=self.allowable_time).seconds:
cursor.execute("SELECT * FROM club_member WHERE member_id = %s", (member_id,))
user_info = cursor.fetchall()
stop = dislocker.stop(discord_user_id=user_info[0][3], bot_about="タイムアウトでBotによる強制停止。")
#discord_user_id = dislocker.get_discord_user_id(member_id=member_id)["discord_user_id"]
#asyncio.run_coroutine_threadsafe(send_log(mode="timeout", pc_number=i[0], discord_user_id=discord_user_id))
dislocker.log(title=f"[INFO] パスワードのタイムアウト時間に達したため、強制停止されました。", flag=0)
result = {"result": "STOP", "details": str(pc_usage)}
else:
result = {"result": "BUT SAFE", "details": str(pc_usage)}
else:
result = {"result": "NONE"}
if result["result"] == "NONE":
pass
else:
pass
time.sleep(self.search_frequency)
except Exception as error:
dislocker.log(title=f"[ERROR] 自動停止処理中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
result = {"result": "error"}
dislocker.db.rollback()
finally:
if cursor:
cursor.close()
return result
dislocker = DL()
intents = discord.Intents.default()
intents.message_content = True
client = discord.Client(intents=intents)
tree = discord.app_commands.CommandTree(client)
async def send_log(**kwargs):
try:
pc_number = str(kwargs.get("pc_number"))
discord_user_id = int(kwargs.get("discord_user_id"))
mode = str(kwargs.get("mode"))
if mode == "use":
keyboard_number = int(kwargs.get("keyboard_number"))
mouse_number = int(kwargs.get("mouse_number"))
reason = str(kwargs.get("reason"))
if keyboard_number == 0:
keyboard_number_show = "自前"
else:
keyboard_number_show = str(keyboard_number)
if mouse_number == 0:
mouse_number_show = "自前"
else:
mouse_number_show = str(mouse_number)
log_embed = discord.Embed(title=f":video_game: PC {pc_number} 番 | 使用開始通知", description=f"<@{discord_user_id}> さんはPCの使用を開始しました。", color=0x1343EB)
log_embed.add_field(name="PC番号", value=pc_number)
log_embed.add_field(name="キーボード番号", value=keyboard_number_show)
log_embed.add_field(name="マウス番号", value=mouse_number_show)
log_embed.add_field(name="使用目的", value=reason)
elif mode == "stop":
log_embed = discord.Embed(title=f":stop_button: PC {pc_number} 番 | 使用終了通知", description=f"<@{discord_user_id}> さんはPCの使用を終了しました。", color=0xE512EB)
elif mode == "timeout":
log_embed = discord.Embed(title=f":alarm_clock: PC {pc_number} 番 | タイムアウト通知", description=f"<@{discord_user_id}> さんが指定時間内にPCを使用しなかったため、停止されました。", color=0xE512EB)
elif mode == "userreg":
log_embed = discord.Embed(title=f":bust_in_silhouette: ユーザー登録通知", description=f"<@{discord_user_id}> さんがユーザーとして登録されました。", color=0x1343EB)
elif mode == "fstop":
reason = str(kwargs.get("reason"))
log_embed = discord.Embed(title=f":stop_button: PC {pc_number} 番 | 強制停止通知", description=f"<@{discord_user_id}> さんによってPCの使用は停止されました。", color=0xE512EB)
log_embed.add_field(name="理由", value=reason)
elif mode == "pcnickname":
alt_name = str(kwargs.get("alt_name"))
log_embed = discord.Embed(title=f":pencil: PC {pc_number} 番 | PCのニックネーム変更通知", description=f"<@{discord_user_id}> さんによってPCのニックネームが変更されました。\nボタンに変更を適用する場合は、再度 /init コマンドでボタンを送信して下さい。\n古いボタンを削除することをお忘れなく!", color=0x1343EB)
log_embed.add_field(name="ニックネーム", value=alt_name)
elif mode == "pcunreg":
alt_name = str(kwargs.get("alt_name"))
if alt_name == None:
log_embed = discord.Embed(title=f":x: PC {pc_number} 番 | PCの登録解除通知", description=f"<@{discord_user_id}> さんによってPCの登録が解除されました。", color=0xE512EB)
else:
log_embed = discord.Embed(title=f":x: PC {pc_number} 番 | PCの登録解除通知", description=f"<@{discord_user_id}> さんによってPCの登録が解除されました。", color=0xE512EB)
log_embed.add_field(name="ニックネーム", value=alt_name)
await client.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(embed=log_embed)
return {"result": 0, "about": "ok"}
except Exception as error:
dislocker.log(title=f"[ERROR] ログ送信中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error", "output_dict": {"error_class_name": str(error.__class__.__name__), "error_args": str(error.args)}}
@client.event
async def on_ready():
dislocker.log(title=f"[SUCCESS] DiscordのBotが起動しました。", message=f"{client.user.name} としてログインしています。", flag=1)
await tree.sync()
dislocker_activity = discord.Activity(
name=dislocker.server_config["bot"]["activity"]["name"],
type=discord.ActivityType.competing,
details=dislocker.server_config["bot"]["activity"]["details"],
state=dislocker.server_config["bot"]["activity"]["state"]
)
await client.change_presence(activity=dislocker_activity)
@client.event
async def on_message(message):
if message.author.bot:
pass
elif isinstance(message.channel, discord.DMChannel):
if message.author.id in dislocker.server_config["bot"]["admin_user_id"]:
msg_split = message.content.split()
if msg_split[0] == "/pcreg":
max_count = 1
if len(msg_split) == 2:
if msg_split[1].isdecimal():
max_count = int(msg_split[1])
pc_onetime_password_gen = dislocker.pc_onetime_gen(max_count=max_count)
if pc_onetime_password_gen["result"] == 0:
pc_onetime_password = str(pc_onetime_password_gen["output_dict"]["onetime_password"])
pc_onetime_password_max_count = str(pc_onetime_password_gen["output_dict"]["max_count"])
pc_onetime_password_current_count = str(pc_onetime_password_gen["output_dict"]["current_count"])
pc_onetime_password_remaining_times = str(int(pc_onetime_password_max_count) - int(pc_onetime_password_current_count))
result_embed = discord.Embed(title=":dizzy_face: PCの登録", description=f"PC登録時のワンタイムパスワードを発行します。", color=0x2286C9)
result_embed.add_field(name="パスワード", value=pc_onetime_password)
result_embed.add_field(name="最大使用回数", value=pc_onetime_password_max_count)
result_embed.add_field(name="残り使用回数", value=pc_onetime_password_remaining_times)
elif pc_onetime_password_gen["result"] == 1:
if pc_onetime_password_gen["about"] == "already_exists":
pc_onetime_password = str(pc_onetime_password_gen["output_dict"]["onetime_password"])
pc_onetime_password_max_count = str(pc_onetime_password_gen["output_dict"]["max_count"])
pc_onetime_password_current_count = str(pc_onetime_password_gen["output_dict"]["current_count"])
pc_onetime_password_remaining_times = str(int(pc_onetime_password_max_count) - int(pc_onetime_password_current_count))
result_embed = discord.Embed(title=":dizzy_face: PCの登録", description=f"ワンタイムパスワードはもう発行されており、有効です。", color=0x2286C9)
result_embed.add_field(name="パスワード", value=pc_onetime_password)
result_embed.add_field(name="最大使用回数", value=pc_onetime_password_max_count)
result_embed.add_field(name="残り使用回数", value=pc_onetime_password_remaining_times)
else:
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'サーバーでエラーが発生しています。', color=0xC91111)
result_embed.add_field(name=f"{pc_onetime_password_gen['output_dict']['error_class_name']}", value=f"{pc_onetime_password_gen['output_dict']['error_args']}")
await message.channel.send(embed=result_embed)
elif msg_split[0] == "/devreg":
max_count = 1
if len(msg_split) == 2:
if msg_split[1].isdecimal():
max_count = int(msg_split[1])
device_onetime_password_gen = dislocker.device_onetime_gen(max_count=max_count)
if device_onetime_password_gen["result"] == 0:
device_onetime_password = str(device_onetime_password_gen["output_dict"]["onetime_password"])
device_onetime_password_max_count = str(device_onetime_password_gen["output_dict"]["max_count"])
device_onetime_password_current_count = str(device_onetime_password_gen["output_dict"]["current_count"])
device_onetime_password_remaining_times = str(int(device_onetime_password_max_count) - int(device_onetime_password_current_count))
result_embed = discord.Embed(title=":dizzy_face: デバイスの登録", description=f"デバイス登録時のワンタイムパスワードを発行します。", color=0x2286C9)
result_embed.add_field(name="パスワード", value=device_onetime_password)
result_embed.add_field(name="最大使用回数", value=device_onetime_password_max_count)
result_embed.add_field(name="残り使用回数", value=device_onetime_password_remaining_times)
elif device_onetime_password_gen["result"] == 1:
if device_onetime_password_gen["about"] == "already_exists":
device_onetime_password = str(device_onetime_password_gen["output_dict"]["onetime_password"])
device_onetime_password_max_count = str(device_onetime_password_gen["output_dict"]["max_count"])
device_onetime_password_current_count = str(device_onetime_password_gen["output_dict"]["current_count"])
device_onetime_password_remaining_times = str(int(device_onetime_password_max_count) - int(device_onetime_password_current_count))
result_embed = discord.Embed(title=":dizzy_face: デバイスの登録", description=f"ワンタイムパスワードはもう発行されており、有効です。", color=0x2286C9)
result_embed.add_field(name="パスワード", value=device_onetime_password)
result_embed.add_field(name="最大使用回数", value=device_onetime_password_max_count)
result_embed.add_field(name="残り使用回数", value=device_onetime_password_remaining_times)
else:
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'サーバーでエラーが発生しています。', color=0xC91111)
result_embed.add_field(name=f"{device_onetime_password_gen['output_dict']['error_class_name']}", value=f"{device_onetime_password_gen['output_dict']['error_args']}")
await message.channel.send(embed=result_embed)
else:
result_embed = discord.Embed(title=":x: 警告", description=f'DMでの操作はサポートされていません。', color=0xC91111)
await message.channel.send(embed=result_embed)
else:
pass
@client.event
async def on_interaction(interaction: discord.Interaction):
try:
if interaction.data["component_type"] == 2:
await on_button(interaction)
except KeyError:
pass
async def on_button(interaction: discord.Interaction):
custom_id = interaction.data["custom_id"]
custom_id_split = custom_id.split("_")
dislocker.log(title=f"[INFO] ボタンが押されました。", message=f"custom_id | {custom_id}, DiscordユーザーID | {interaction.user.id}", flag=0)
if custom_id_split[0] == "pcregister":
keyboard_register_view = discord.ui.View(timeout=15)
pc_number = custom_id_split[1]
keyboard_list = dislocker.get_keyboard_list()
for i in keyboard_list["output_dict"].keys():
current_keyboard_list = keyboard_list['output_dict'][i]
if current_keyboard_list['alt_name'] == None:
keyboard_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(current_keyboard_list['keyboard_number'])}", custom_id=f"keyboardregister_{str(pc_number)}_{str(current_keyboard_list['keyboard_number'])}")
else:
keyboard_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(current_keyboard_list['keyboard_number'])} 番 | ({current_keyboard_list['alt_name']})", custom_id=f"keyboardregister_{str(pc_number)}_{str(current_keyboard_list['keyboard_number'])}")
keyboard_register_view.add_item(keyboard_register_button)
keyboard_not_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label="キーボードは自前", custom_id=f"keyboardregister_{str(pc_number)}_0")
keyboard_register_view.add_item(keyboard_not_register_button)
await interaction.response.send_message(f"# :keyboard: キーボードのデバイス番号を選んでください!\n>>> # PC番号 | {str(pc_number)}", view=keyboard_register_view, ephemeral=True)
elif custom_id_split[0] == "keyboardregister":
mouse_register_view = discord.ui.View(timeout=15)
pc_number = custom_id_split[1]
keyboard_number = custom_id_split[2]
mouse_list = dislocker.get_mouse_list()
if keyboard_number == "0":
keyboard_number_show = "自前"
else:
keyboard_number_show = keyboard_number
for i in mouse_list["output_dict"].keys():
current_mouse_list = mouse_list['output_dict'][i]
if current_mouse_list['alt_name'] == None:
mouse_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(current_mouse_list['mouse_number'])}", custom_id=f"mouseregister_{str(pc_number)}_{str(keyboard_number)}_{str(current_mouse_list['mouse_number'])}")
else:
mouse_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(current_mouse_list['mouse_number'])} 番 | ({current_mouse_list['alt_name']})", custom_id=f"mouseregister_{str(pc_number)}_{str(keyboard_number)}_{str(current_mouse_list['mouse_number'])}")
mouse_register_view.add_item(mouse_register_button)
mouse_not_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label="マウスは自前", custom_id=f"mouseregister_{str(pc_number)}_{str(keyboard_number)}_0")
mouse_register_view.add_item(mouse_not_register_button)
await interaction.response.send_message(f"# :mouse_three_button: マウスのデバイス番号を選んでください!\n>>> # PC番号 | {str(pc_number)}\n# キーボード番号 | {str(keyboard_number_show)}", view=mouse_register_view, ephemeral=True)
elif custom_id_split[0] == "mouseregister":
pc_number = custom_id_split[1]
keyboard_number = custom_id_split[2]
mouse_number = custom_id_split[3]
if keyboard_number == "0":
keyboard_number_show = "自前"
else:
keyboard_number_show = keyboard_number
if mouse_number == "0":
mouse_number_show = "自前"
else:
mouse_number_show = mouse_number
reason_register_view = discord.ui.View(timeout=15)
for i in dislocker.preset_games:
reason_quick_button = reason_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(i)}", custom_id=f"quickreasonregister_{str(pc_number)}_{str(keyboard_number)}_{str(mouse_number)}_{str(i)}")
reason_register_view.add_item(reason_quick_button)
reason_button = discord.ui.Button(style=discord.ButtonStyle.primary, label="使用目的を入力する", custom_id=f"reasonregister_{str(pc_number)}_{str(keyboard_number)}_{str(mouse_number)}")
reason_register_view.add_item(reason_button)
await interaction.response.send_message(f"# :regional_indicator_q: 使用目的を書いてください!\n>>> # PC番号 | {str(pc_number)}\n# キーボード番号 | {str(keyboard_number_show)}\n# マウス番号 | {str(mouse_number_show)}", view=reason_register_view, ephemeral=True)
elif custom_id_split[0] == "quickreasonregister":
pc_number = custom_id_split[1]
keyboard_number = custom_id_split[2]
mouse_number = custom_id_split[3]
if keyboard_number == "0":
keyboard_number_show = "自前"
else:
keyboard_number_show = keyboard_number
if mouse_number == "0":
mouse_number_show = "自前"
else:
mouse_number_show = mouse_number
reason = custom_id_split[4]
register = dislocker.register(discord_user_id=interaction.user.id, name=interaction.user.name, display_name=interaction.user.display_name, pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, detail=reason)
if register["about"] == "ok":
await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {register["output_dict"]["password"]}\n## PC番号 | {pc_number}\n## キーボード番号 | {str(keyboard_number_show)}\n## マウス番号 | {str(mouse_number_show)}\n## 使用目的 | {reason}", ephemeral=True)
await send_log(mode="use", pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, reason=reason, discord_user_id=interaction.user.id)
dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が開始されました。", message=f"名前 | {register["output_dict"]["name"]}, 使用目的 | {reason}", flag=0)
elif register["about"] == "pc_already_in_use_by_you":
pc_usage_history = register["pc_usage_history"]
if pc_usage_history["keyboard_number"] == None:
keyboard_number_show = "未認証"
elif pc_usage_history["keyboard_number"] == 0:
keyboard_number_show = "自前"
else:
keyboard_number_show = str(pc_usage_history["keyboard_number"])
if pc_usage_history["mouse_number"] == None:
mouse_number_show = "未認証"
elif pc_usage_history["mouse_number"] == 0:
mouse_number_show = "自前"
else:
mouse_number_show = str(pc_usage_history["mouse_number"])
await interaction.response.send_message(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには 終了ボタン で使用終了をお知らせください。\n>>> # PC番号 | {pc_usage_history["pc_number"]}\n# キーボード番号 | {keyboard_number_show}\n# マウス番号 | {mouse_number_show}\n# 使用開始時刻 | {pc_usage_history["start_time"]}", ephemeral=True)
#await interaction.response.send_message(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには 終了ボタン で使用終了をお知らせください。\n>>> # PC番号 | {pc_usage_history["pc_number"]}\n# キーボード番号 | {keyboard_number_show}\n# マウス番号 | {mouse_number_show}\n# 使用開始時刻 | {pc_usage_history["start_time"]}\n# 使用目的 | {pc_usage_history["use_detail"]}", ephemeral=True)
elif register["about"] == "pc_already_in_use_by_other":
await interaction.response.send_message(f"# :man_gesturing_no: そのPCは他のメンバーによって使用されています。\n別のPC番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "keyboard_already_in_use":
await interaction.response.send_message(f"# :man_gesturing_no: そのキーボードは他のメンバーによって使用されています。\n別のキーボードのデバイス番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "mouse_already_in_use":
await interaction.response.send_message(f"# :man_gesturing_no: そのマウスは他のメンバーによって使用されています。\n別のマウスのデバイス番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "user_data_not_found":
await interaction.response.send_message("# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。", ephemeral=True)
else:
await interaction.response.send_message("# :skull_crossbones: 登録できませんでした。\n内部エラーが発生しています。", ephemeral=True)
elif custom_id_split[0] == "reasonregister":
pc_number = custom_id_split[1]
keyboard_number = custom_id_split[2]
mouse_number = custom_id_split[3]
if keyboard_number == "0":
keyboard_number_show = "自前"
else:
keyboard_number_show = keyboard_number
if mouse_number == "0":
mouse_number_show = "自前"
else:
mouse_number_show = mouse_number
reason_input_form = ReasonModal(title="Dislocker | 登録", pc_number=str(pc_number), keyboard_number=str(keyboard_number), mouse_number=str(mouse_number))
await interaction.response.send_modal(reason_input_form)
elif custom_id_split[0] == "stop":
pc_stop = dislocker.stop(discord_user_id=interaction.user.id)
stop_view = discord.ui.View(timeout=15)
if pc_stop["about"] == "unused":
await interaction.response.send_message("# :shaking_face: 使用されていないようです...", ephemeral=True)
elif pc_stop["about"] == "user_data_not_found":
await interaction.response.send_message("# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。", ephemeral=True)
elif pc_stop["about"] == "ok":
await interaction.response.send_message(f":white_check_mark: PC番号 {pc_stop["output_dict"]["pc_number"]} の使用が終了されました。", ephemeral=True)
await send_log(mode="stop", pc_number=pc_stop['output_dict']['pc_number'], discord_user_id=interaction.user.id)
else:
await interaction.response.send_message("# :skull_crossbones: 停止できませんでした。\n内部エラーが発生しています。", ephemeral=True)
elif custom_id_split[0] == "user" and custom_id_split[1] == "register":
user_register = dislocker.user_register(name=interaction.user.display_name, discord_user_name=interaction.user.name, discord_user_id=interaction.user.id)
if user_register["about"] == "ok":
await interaction.response.send_message(f"# :white_check_mark: ユーザー情報が登録されました。\n>>> ユーザー名:{interaction.user.display_name}", ephemeral=True)
await send_log(mode="userreg", discord_user_id=interaction.user.id)
elif user_register["about"] == "already_exists":
await interaction.response.send_message("# :no_entry: 登録できませんでした。\nもう登録されている可能性があります。", ephemeral=True)
else:
await interaction.response.send_message("# :no_entry: 登録できませんでした。\n内部エラーが発生しています。", ephemeral=True)
#使用者側のスラッシュコマンド
@tree.command(name="use", description="パソコンの使用登録をします。通常はこのコマンドを使用する必要はありません。")
async def use(interaction: discord.Interaction, pc_number: int, keyboard_number: int, mouse_number: int, reason: str):
register = dislocker.register(discord_user_id=interaction.user.id, name=interaction.user.name, display_name=interaction.user.display_name, pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, detail=reason)
if register["result"] == 0:
await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {register["output_dict"]["password"]}\n## PC番号 | {pc_number}\n## 使用目的 | {reason}", ephemeral=True)
dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が開始されました。", message=f"名前 | {register["output_dict"]["name"]}, 使用目的 | {reason}", flag=0)
await send_log(mode="use", pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, reason=reason, discord_user_id=interaction.user.id)
elif register["result"] == 1:
if register["about"] == "pc_already_in_use_by_other":
await interaction.response.send_message(":x: 他の方がそのPCを使用中です。", ephemeral=True)
elif register["about"] == "pc_already_in_use_by_you":
await interaction.response.send_message(f":x: あなたは既にPC {register['pc_usage_history']['pc_number']} を使用中です。\n>>> ## PC番号 | {register['pc_usage_history']['pc_number']}\n## 使用目的 | {register['pc_usage_history']['use_detail']}", ephemeral=True)
elif register["about"] == "keyboard_already_in_use":
await interaction.response.send_message(":x: キーボードは既に使用中です。", ephemeral=True)
elif register["about"] == "mouse_already_in_use":
await interaction.response.send_message(":x: マウスは既に使用中です。", ephemeral=True)
elif register["about"] == "user_data_not_found":
await interaction.response.send_message(":x: ユーザーデータが見つかりませんでした。", ephemeral=True)
elif register["about"] == "error":
await interaction.response.send_message(":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。", ephemeral=True)
@tree.command(name="stop", description="パソコンの使用を終了します。通常はこのコマンドを使用する必要はありません。")
async def stop(interaction: discord.Interaction):
pc_stop = dislocker.stop(discord_user_id=interaction.user.id)
if pc_stop["result"] == 0:
await interaction.response.send_message(f":white_check_mark: 使用が終了されました。\n>>> ## PC番号 | {pc_stop['output_dict']['pc_number']}", ephemeral=True)
result_embed = discord.Embed(title=":white_check_mark: 使用停止処理は完了しました。", description=f'PC番号 {pc_stop['output_dict']['pc_number']} 番の使用停止処理が完了しました。', color=0x56FF01)
dislocker.log(title=f"[INFO] PC番号{pc_stop['output_dict']['pc_number']} の使用が終了されました。", message=f"名前 | {pc_stop['output_dict']['name']}", flag=0)
await send_log(mode="stop", pc_number=pc_stop['output_dict']['pc_number'], discord_user_id=interaction.user.id)
elif pc_stop["result"] == 1:
if pc_stop["about"] == "unused":
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'あなたはPCを使用されていないようです...', color=0xC91111)
elif pc_stop["about"] == "user_data_not_found":
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'Dislockerのユーザーとして登録されていないようです。\n登録を行ってから、またお試しください。', color=0xC91111)
elif pc_stop["about"] == "error":
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'サーバーでエラーが発生しています。', color=0xC91111)
result_embed.add_field(name=f"{pc_stop['output_dict']['error_class_name']}", value=f"{pc_stop['output_dict']['error_args']}")
await interaction.response.send_message(embed=result_embed, ephemeral=True)
#管理者側のスラッシュコマンド
@tree.command(name="userreg", description="ユーザーを登録します。")
@discord.app_commands.default_permissions(administrator=True)
async def userreg(interaction: discord.Interaction, discord_user_id: str, discord_user_name: str, name: str):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
user_register = dislocker.user_register(discord_user_id=discord_user_id, discord_user_name=discord_user_name, name=name)
if user_register["result"] == 0:
result_embed = discord.Embed(title=":white_check_mark: ユーザー登録が完了しました。", description=f'続いて、PCの使用登録を行いましょう', color=0x56FF01)
dislocker.log(title=f"[INFO] ユーザーを登録しました。", message=f"名前 | {name}, Discordユーザー名 | {discord_user_name}, DiscordユーザーID | {discord_user_id}", flag=0)
await send_log(mode="userreg", discord_user_id=discord_user_id)
elif user_register["result"] == 1:
if user_register["about"] == "already_exists":
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'既に登録されているユーザーです。', color=0xC91111)
elif user_register["about"] == "error":
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'サーバーでエラーが発生しています。', color=0xC91111)
result_embed.add_field(name=f"{user_register['output_dict']['error_class_name']}", value=f"{user_register['output_dict']['error_args']}")
await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="pcreg", description="PCをDislockerに登録するためのワンタイムパスワードを発行します。")
@discord.app_commands.default_permissions(administrator=True)
async def pcreg(interaction: discord.Interaction, how_much: int = 1):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
max_count = how_much
pc_onetime_password_gen = dislocker.pc_onetime_gen(max_count=max_count)
if pc_onetime_password_gen["result"] == 0:
pc_onetime_password = str(pc_onetime_password_gen["output_dict"]["onetime_password"])
pc_onetime_password_max_count = str(pc_onetime_password_gen["output_dict"]["max_count"])
pc_onetime_password_current_count = str(pc_onetime_password_gen["output_dict"]["current_count"])
pc_onetime_password_remaining_times = str(int(pc_onetime_password_max_count) - int(pc_onetime_password_current_count))
result_embed = discord.Embed(title=":dizzy_face: PCの登録", description=f"PC登録時のワンタイムパスワードを発行します。", color=0x2286C9)
result_embed.add_field(name="パスワード", value=pc_onetime_password)
result_embed.add_field(name="最大使用回数", value=pc_onetime_password_max_count)
result_embed.add_field(name="残り使用回数", value=pc_onetime_password_remaining_times)
elif pc_onetime_password_gen["result"] == 1:
if pc_onetime_password_gen["about"] == "already_exists":
pc_onetime_password = str(pc_onetime_password_gen["output_dict"]["onetime_password"])
pc_onetime_password_max_count = str(pc_onetime_password_gen["output_dict"]["max_count"])
pc_onetime_password_current_count = str(pc_onetime_password_gen["output_dict"]["current_count"])
pc_onetime_password_remaining_times = str(int(pc_onetime_password_max_count) - int(pc_onetime_password_current_count))
result_embed = discord.Embed(title=":dizzy_face: PCの登録", description=f"ワンタイムパスワードはもう発行されており、有効です。", color=0x2286C9)
result_embed.add_field(name="パスワード", value=pc_onetime_password)
result_embed.add_field(name="最大使用回数", value=pc_onetime_password_max_count)
result_embed.add_field(name="残り使用回数", value=pc_onetime_password_remaining_times)
else:
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'サーバーでエラーが発生しています。', color=0xC91111)
result_embed.add_field(name=f"{pc_onetime_password_gen['output_dict']['error_class_name']}", value=f"{pc_onetime_password_gen['output_dict']['error_args']}")
await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="devicereg", description="デバイスをDislockerに登録するためのワンタイムパスワードを発行します。")
@discord.app_commands.default_permissions(administrator=True)
async def devicereg(interaction: discord.Interaction, how_much: int):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
max_count = how_much
device_onetime_password_gen = dislocker.device_onetime_gen(max_count=max_count)
if device_onetime_password_gen["result"] == 0:
device_onetime_password = str(device_onetime_password_gen["output_dict"]["onetime_password"])
device_onetime_password_max_count = str(device_onetime_password_gen["output_dict"]["max_count"])
device_onetime_password_current_count = str(device_onetime_password_gen["output_dict"]["current_count"])
device_onetime_password_remaining_times = str(int(device_onetime_password_max_count) - int(device_onetime_password_current_count))
result_embed = discord.Embed(title=":dizzy_face: デバイスの登録", description=f"デバイス登録時のワンタイムパスワードを発行します。", color=0x2286C9)
result_embed.add_field(name="パスワード", value=device_onetime_password)
result_embed.add_field(name="最大使用回数", value=device_onetime_password_max_count)
result_embed.add_field(name="残り使用回数", value=device_onetime_password_remaining_times)
elif device_onetime_password_gen["result"] == 1:
if device_onetime_password_gen["about"] == "already_exists":
device_onetime_password = str(device_onetime_password_gen["output_dict"]["onetime_password"])
device_onetime_password_max_count = str(device_onetime_password_gen["output_dict"]["max_count"])
device_onetime_password_current_count = str(device_onetime_password_gen["output_dict"]["current_count"])
device_onetime_password_remaining_times = str(int(device_onetime_password_max_count) - int(device_onetime_password_current_count))
result_embed = discord.Embed(title=":dizzy_face: デバイスの登録", description=f"ワンタイムパスワードはもう発行されており、有効です。", color=0x2286C9)
result_embed.add_field(name="パスワード", value=device_onetime_password)
result_embed.add_field(name="最大使用回数", value=device_onetime_password_max_count)
result_embed.add_field(name="残り使用回数", value=device_onetime_password_remaining_times)
else:
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'サーバーでエラーが発生しています。', color=0xC91111)
result_embed.add_field(name=f"{device_onetime_password_gen['output_dict']['error_class_name']}", value=f"{device_onetime_password_gen['output_dict']['error_args']}")
await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="fstop", description="PCの使用登録を強制的に終了します。")
@discord.app_commands.default_permissions(administrator=True)
async def fstop(interaction: discord.Interaction, pc_number: int, reason: str):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
force_stop = dislocker.force_stop(pc_number=pc_number, bot_about=reason)
if force_stop["result"] == 0:
result_embed = discord.Embed(title=":white_check_mark: 処理が完了しました。", description=f'PC番号 {str(pc_number)} 番の使用登録は抹消されました。', color=0x56FF01)
dislocker.log(title=f"[INFO] PC {pc_number} の使用を強制終了しました。", message=f"理由 | {reason}", flag=0)
await send_log(mode="fstop", pc_number=pc_number, discord_user_id=interaction.user.id, reason=reason)
elif force_stop["result"] == 1:
if force_stop["about"] == "not_used":
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'指定されたPCは使用されていないようです...', color=0xC91111)
elif force_stop["about"] == "bot_about_not_found":
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'強制停止する理由を入力してください。', color=0xC91111)
elif force_stop["about"] == "error":
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'サーバーでエラーが発生しています。', color=0xC91111)
result_embed.add_field(name=f"{force_stop['output_dict']['error_class_name']}", value=f"{force_stop['output_dict']['error_args']}")
await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="report", description="PCの使用履歴をエクスポートします。")
@discord.app_commands.default_permissions(administrator=True)
async def report(interaction: discord.Interaction):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
report_export = dislocker.report_export()
if report_export["result"] == 0:
await interaction.response.send_message(f":white_check_mark: 使用履歴のレポートです。", file=discord.File(report_export["file_path"]), ephemeral=True)
dislocker.log(title=f"[INFO] PCの使用履歴をエクスポートしました。", message=f"ファイルパス | {report_export['file_path']}", flag=0)
elif report_export["result"] == 1:
if report_export["about"] == "error":
await interaction.response.send_message(":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。", ephemeral=True)
@tree.command(name="init", description="操作チャンネルにボタン一式を送信します。")
@discord.app_commands.default_permissions(administrator=True)
async def button_init(interaction: discord.Interaction, text_channel: discord.TextChannel):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
pc_list = dislocker.get_pc_list()
user_register_button_view = discord.ui.View(timeout=None)
user_register_button = discord.ui.Button(style=discord.ButtonStyle.green, label="ユーザー登録", custom_id="user_register")
user_register_button_view.add_item(user_register_button)
await client.get_channel(text_channel.id).send(f'# :index_pointing_at_the_viewer: ユーザー登録はお済ですか?', view=user_register_button_view)
stop_button_view = discord.ui.View(timeout=None)
stop_button = discord.ui.Button(style=discord.ButtonStyle.danger, label="PCの使用を停止", custom_id="stop")
stop_button_view.add_item(stop_button)
await client.get_channel(text_channel.id).send(f'# :index_pointing_at_the_viewer: 使用を停止しますか?', view=stop_button_view)
pc_button_view = discord.ui.View(timeout=None)
for i in pc_list["output_dict"].keys():
current_pc_list = pc_list['output_dict'][i]
if current_pc_list['pc_token'] == None:
pass
else:
if current_pc_list['alt_name'] == None:
pc_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(current_pc_list['pc_number'])}", custom_id=f"pcregister_{str(current_pc_list['pc_number'])}")
else:
pc_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(current_pc_list['pc_number'])} 番 | ({current_pc_list['alt_name']})", custom_id=f"pcregister_{str(current_pc_list['pc_number'])}")
pc_button_view.add_item(pc_register_button)
await client.get_channel(text_channel.id).send(f'# :index_pointing_at_the_viewer: 使いたいPCの番号を選んでください', view=pc_button_view)
dislocker.log(title=f"[INFO] サーバーで初回処理を実行しました。", flag=0)
result_embed = discord.Embed(title=":white_check_mark: 初回処理が完了しました。", description=f'指定したテキストチャンネルをご確認ください。', color=0x56FF01)
await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="masterpass", description="PCのマスターパスワードを表示します。")
@discord.app_commands.default_permissions(administrator=True)
async def masterpass(interaction: discord.Interaction, pc_number: int):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
pc_master_password_get = dislocker.show_pc_master_password(pc_number=pc_number)
if pc_master_password_get["result"] == 0:
pc_master_password = pc_master_password_get["output_dict"]["pc_master_password"]
result_embed = discord.Embed(title=":information_source: マスターパスワード", description=f"PC番号 {str(pc_number)} 番のマスターパスワードを表示します。", color=0x2286C9)
result_embed.add_field(name=f"マスターパスワード", value=f"{str(pc_master_password)}")
else:
result_embed = discord.Embed(title=":x: 取得に失敗しました。", description=f'サーバーでエラーが発生しています。', color=0xC91111)
result_embed.add_field(name=f"{pc_master_password_get['output_dict']['error_class_name']}", value=f"{pc_master_password_get['output_dict']['error_args']}")
await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="pcinfo", description="PCの情報を表示します。")
@discord.app_commands.default_permissions(administrator=True)
async def pcinfo(interaction: discord.Interaction):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
pc_list = dislocker.get_pc_list()
if pc_list["result"] == 0:
result_embed = discord.Embed(title=":information_source: 現在のPCリスト", description="PCリストです。", color=0x2286C9)
for i in pc_list['output_dict'].keys():
current_pc_list = pc_list['output_dict'][i]
if current_pc_list['pc_token'] == None:
pass
else:
if current_pc_list['alt_name'] == None:
pc_name_title = f'{current_pc_list['pc_number']}'
else:
pc_name_title = f'{current_pc_list['pc_number']} 番 ({current_pc_list['alt_name']})'
if current_pc_list['using_member_id'] == None:
pc_using_value = f'未使用'
else:
discord_user_id = dislocker.get_discord_user_id(member_id=current_pc_list['using_member_id'])['discord_user_id']
pc_using_value = f'<@{discord_user_id}> が使用中'
result_embed.add_field(name=f'{pc_name_title}', value=f'{pc_using_value}')
else:
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'サーバーでエラーが発生しています。', color=0xC91111)
result_embed.add_field(name=f"{pc_list['output_dict']['error_class_name']}", value=f"{pc_list['output_dict']['error_args']}")
await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="pcnickname", description="PCにニックネームを設定します。")
@discord.app_commands.default_permissions(administrator=True)
async def pcnickname(interaction: discord.Interaction, pc_number: int, nickname: str):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
pc_nickname_set = dislocker.set_pc_nickname(pc_number=pc_number, alt_name=nickname)
if pc_nickname_set["result"] == 0:
result_embed = discord.Embed(title=":white_check_mark: 操作が完了しました。", description=f'PC番号 {str(pc_number)} のニックネームは {str(nickname)} に設定されました。', color=0x56FF01)
await send_log(mode="pcnickname", pc_number=pc_number, discord_user_id=interaction.user.id, alt_name=nickname)
else:
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'サーバーでエラーが発生しています。ニックネームは変更されません。', color=0xC91111)
result_embed.add_field(name=f"{pc_nickname_set['output_dict']['error_class_name']}", value=f"{pc_nickname_set['output_dict']['error_args']}")
await interaction.response.send_message(embed=result_embed, ephemeral=True)
@tree.command(name="pcunreg", description="PCの登録を解除します。")
@discord.app_commands.default_permissions(administrator=True)
async def pcunreg(interaction: discord.Interaction, pc_number:int):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
pc_dict = dislocker.get_pc_list()['output_dict'].get(pc_number)
if pc_dict == None:
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'指定されたPC番号には登録されていません。', color=0xC91111)
await interaction.response.send_message(embed=result_embed, ephemeral=True)
else:
nickname = pc_dict['alt_name']
pc_unregister = dislocker.pc_unregister(pc_number=pc_number)
if pc_unregister["result"] == 0:
if nickname == None:
result_embed = discord.Embed(title=":white_check_mark: 操作が完了しました。", description=f'PC {str(pc_number)} 番 の登録は解除されました。', color=0x56FF01)
else:
result_embed = discord.Embed(title=":white_check_mark: 操作が完了しました。", description=f'PC {str(pc_number)} 番 | {str(nickname)}の登録は解除されました。', color=0x56FF01)
await send_log(mode="pcunreg", pc_number=pc_number, discord_user_id=interaction.user.id, alt_name=nickname)
else:
result_embed = discord.Embed(title=":x: 操作に失敗しました。", description=f'サーバーでエラーが発生しています。登録は解除されません。', color=0xC91111)
result_embed.add_field(name=f"{pc_unregister['output_dict']['error_class_name']}", value=f"{pc_unregister['output_dict']['error_args']}")
await interaction.response.send_message(embed=result_embed, ephemeral=True)
if dislocker.init_result == "ok":
print("Botを起動します...")
monitor = Monitor(search_frequency=dislocker.server_config["bot"]["monitor"]["search_frequency"], allowable_time=dislocker.server_config["bot"]["monitor"]["allowable_time"], fstop_time=dislocker.server_config["bot"]["monitor"]["fstop_time"])
monitor.start()
client.run(dislocker.server_config["bot"]["token"])
else:
pass