Dislocker/dislocker_slash.py

1342 lines
84 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import discord
import os
import json
import psycopg2
from psycopg2 import sql
from datetime import datetime, timedelta
import asyncio
import string
import random
import hashlib
import openpyxl
from openpyxl import Workbook
import threading
import time
class DL():
def __init__(self):
self.config_dir_path = "./config/"
self.export_dir_path = "./export/"
self.log_dir_path = "./log/"
self.server_config_path = self.config_dir_path + "server.json"
self.onetime_config_path = self.config_dir_path + "onetime.json"
self.log_path = self.log_dir_path + "dislocker.txt"
try:
if not os.path.isdir(self.config_dir_path):
print("config ディレクトリが見つかりません... 作成します。")
os.mkdir(self.config_dir_path)
if not os.path.isfile(self.server_config_path):
print("config ファイルが見つかりません... 作成します。")
self.server_config = {
"db": {
"host": "localhost",
"port": "5432",
"db_name": "dislocker",
"username": "user",
"password": "password"
},
"bot": {
"server_id": ["TYPE HERE SERVER ID (YOU MUST USE INT !!!!)"],
"token": "TYPE HERE BOTS TOKEN KEY",
"activity": {
"name": "Dislocker",
"details": "ロック中...",
"type": "playing",
"state": "ロック中..."
},
"log_channel_id" : "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
"config_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
"config_public_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
"monitor": {
"search_frequency": 1,
"allowable_time": 180,
"fstop_time": "21:00:00"
},
"preset_games": ["TEST1", "TEST2", "TEST3", "TEST4", "TEST5"],
"admin_user_id": ["TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)"],
"debug": False
}
}
with open(self.server_config_path, "w", encoding="utf-8") as w:
json.dump(self.server_config, w, indent=4, ensure_ascii=False)
elif os.path.isfile(self.server_config_path):
with open(self.server_config_path, "r", encoding="utf-8") as r:
self.server_config = json.load(r)
print("config ファイルを読み込みました。")
if not os.path.isdir(self.export_dir_path):
print("export ディレクトリが見つかりません... 作成します。")
os.mkdir(self.export_dir_path)
if not os.path.isdir(self.log_dir_path):
print("log ディレクトリが見つかりません... 作成します。")
os.mkdir(self.log_dir_path)
if os.path.isfile(self.onetime_config_path):
print("ワンタイムパスワードが見つかりました。削除します。")
os.remove(self.onetime_config_path)
if type(self.server_config["bot"]["log_channel_id"]) is not int or type(self.server_config["bot"]["config_channel_id"]) is not int:
print("config ファイル内でチャンネルIDがint型で記述されていません。int型で記述して、起動してください。")
self.init_result = "not_int"
else:
self.db = psycopg2.connect(f"host={self.server_config['db']['host']} dbname={self.server_config['db']['db_name']} port={self.server_config['db']['port']} user={self.server_config['db']['username']} password={self.server_config['db']['password']}")
cursor = self.db.cursor()
self.pc_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
self.keyboard_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
self.mouse_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
self.preset_games = self.server_config["bot"]["preset_games"]
self.debug = self.server_config["bot"]["debug"]
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'club_member')")
find_club_member_table = cursor.fetchall()
print(find_club_member_table)
if find_club_member_table[0][0] == False:
cursor.execute("CREATE TABLE club_member (member_id SERIAL NOT NULL, name TEXT NOT NULL, discord_user_name TEXT NOT NULL, discord_user_id TEXT NOT NULL, PRIMARY KEY (member_id))")
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'pc_list')")
find_pc_list_table = cursor.fetchall()
print(find_pc_list_table)
if find_pc_list_table[0][0] == False:
cursor.execute("CREATE TABLE pc_list (pc_number INTEGER NOT NULL, using_member_id INTEGER, password_hash VARCHAR(64), pc_uuid VARCHAR(36), pc_token VARCHAR(36), master_password VARCHAR(16), detail TEXT, alt_name TEXT, PRIMARY KEY (pc_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))")
for i in self.pc_list:
print(i)
cursor.execute("INSERT INTO pc_list (pc_number) VALUES (%s)", (i,))
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'keyboard_list')")
find_keyboard_list_table = cursor.fetchall()
print(find_keyboard_list_table)
if find_keyboard_list_table[0][0] == False:
cursor.execute("CREATE TABLE keyboard_list (keyboard_number INTEGER NOT NULL, using_member_id INTEGER, device_instance_path TEXT, device_name TEXT, detail TEXT, alt_name TEXT, PRIMARY KEY (keyboard_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))")
for i in self.keyboard_list:
print(i)
cursor.execute("INSERT INTO keyboard_list (keyboard_number) VALUES (%s)", (i,))
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'mouse_list')")
find_mouse_list_table = cursor.fetchall()
print(find_mouse_list_table)
if find_mouse_list_table[0][0] == False:
cursor.execute("CREATE TABLE mouse_list (mouse_number INTEGER NOT NULL, using_member_id INTEGER, device_instance_path TEXT, device_name TEXT, detail TEXT, alt_name TEXT, PRIMARY KEY (mouse_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))")
for i in self.mouse_list:
print(i)
cursor.execute("INSERT INTO mouse_list (mouse_number) VALUES (%s)", (i,))
self.db.commit()
cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'pc_usage_history')")
find_pc_usage_history_table = cursor.fetchall()
print(find_pc_usage_history_table)
if find_pc_usage_history_table[0][0] == False:
cursor.execute("CREATE TABLE pc_usage_history (id SERIAL NOT NULL, member_id INTEGER NOT NULL, pc_number INTEGER NOT NULL, keyboard_number INTEGER, mouse_number INTEGER, start_use_time TIMESTAMP NOT NULL, end_use_time TIMESTAMP, use_detail TEXT, bot_about TEXT, PRIMARY KEY (id), FOREIGN KEY (member_id) REFERENCES club_member(member_id), FOREIGN KEY (pc_number) REFERENCES pc_list(pc_number), FOREIGN KEY (keyboard_number) REFERENCES keyboard_list(keyboard_number), FOREIGN KEY (mouse_number) REFERENCES mouse_list(mouse_number))")
self.db.commit()
cursor.close()
self.init_result = "ok"
except (Exception) as error:
print("初回処理でエラーが発生しました。\nエラー内容\n" + str(error))
self.init_result = "error"
finally:
pass
def log(self, **kwargs):
if self.debug == True:
flag = 1
else:
if "flag" in kwargs:
if kwargs["flag"] == 1:
flag = 1
else:
flag = 0
else:
flag = 0
if flag == 1:
title = str(kwargs["title"])
if "message" in kwargs:
message = str(kwargs["message"])
else:
message = None
current_datetime = str(datetime.now())
if message == None:
detail = f"{current_datetime} | {title}\n"
else:
detail = f"{current_datetime} | {title}\n{message}\n"
print(detail)
if os.path.isfile(self.log_path):
try:
with open(self.log_path, "a", encoding="utf-8") as a:
a.write(detail)
except:
print("LOGGING ERROR mode a")
else:
try:
with open(self.log_path, "w", encoding="utf-8") as w:
w.write(detail)
except:
print("LOGGING ERROR mode w")
def password_generate(self, length):
numbers = string.digits # (1)
password = ''.join(random.choice(numbers) for _ in range(length)) # (2)
return password
def hash_genarate(self, source):
hashed = hashlib.sha256(source.encode())
return hashed.hexdigest()
def user_register_check(self, **kwargs):
try:
discord_user_id = str(kwargs["discord_user_id"])
cursor = self.db.cursor()
cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
user_record = cursor.fetchall()
#ユーザーデータが見つかった場合(登録済みの場合)
if user_record:
member_id = user_record[0][0]
name = user_record[0][1]
discord_user_name = user_record[0][2]
return {"result": 0, "about": "exist", "user_info": {"member_id": member_id, "name": name, "discord_user_name": discord_user_name}}
#ユーザーデータがなかったら(未登録の場合)
else:
return {"result": 1, "about": "user_data_not_found"}
except Exception as error:
self.log(title=f"[ERROR] ユーザーの登録状態を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
finally:
cursor.close()
def pc_used_check(self, **kwargs):
try:
if "pc_number" in kwargs:
pc_number = int(kwargs["pc_number"])
else:
pc_number = None
if "discord_user_id" in kwargs:
discord_user_id = str(kwargs["discord_user_id"])
else:
discord_user_id = None
if "member_id" in kwargs:
member_id = int(kwargs["member_id"])
else:
member_id = None
cursor = self.db.cursor()
if pc_number != None:
# pc番号を指定してpc_listから探す
cursor.execute("SELECT * FROM pc_list WHERE pc_number= %s", (pc_number,))
pc_list_record = cursor.fetchall()
if pc_list_record[0][1] == None:
return {"result": 0, "about": "vacent"}
else:
return {"result": 1, "about": "used_by_other"}
elif discord_user_id != None:
#ユーザーIDを指定してPC使用履歴から探す
cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
user_record = cursor.fetchall()
#ユーザーデータが見つかった場合(登録済みの場合)
if user_record:
member_id = user_record[0][0]
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage_history_record = cursor.fetchall()
if pc_usage_history_record:
if pc_usage_history_record[0][6] == None:
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
return {"result": 1, "about": "used_by_you", "pc_usage_history": {"pc_number": str(pc_usage_history_record[0][2]), "keyboard_number": keyboard_number, "mouse_number": mouse_number, "start_time": str(pc_usage_history_record[0][5]), "use_detail": str(pc_usage_history_record[0][7])}}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 1, "about": "user_data_not_found"}
elif member_id != None:
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage_history_record = cursor.fetchall()
if pc_usage_history_record:
if pc_usage_history_record[0][6] == None:
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
return {"result": 1, "about": "used_by_you", "pc_usage_history": {"pc_number": str(pc_usage_history_record[0][2]), "keyboard_number": keyboard_number, "mouse_number": mouse_number, "start_time": str(pc_usage_history_record[0][5]), "use_detail": str(pc_usage_history_record[0][7])}}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 0, "about": "vacent"}
else:
return {"result": 1, "about": "search_options_error"}
except Exception as error:
self.log(title=f"[ERROR] PCの使用状況を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
finally:
if cursor:
cursor.close()
def keyboard_used_check(self, **kwargs):
try:
cursor = self.db.cursor()
if kwargs["keyboard_number"] == None:
return {"result": 0, "about": "ok"}
else:
keyboard_number = int(kwargs["keyboard_number"])
cursor.execute("SELECT * FROM keyboard_list WHERE keyboard_number=%s", (keyboard_number,))
keyboard_list_record = cursor.fetchall()
if keyboard_list_record[0][1] == None:
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "keyboard_already_in_use_by_other"}
except Exception as error:
self.log(title=f"[ERROR] キーボードの使用状況を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
finally:
if cursor:
cursor.close()
def mouse_used_check(self, **kwargs):
try:
cursor = self.db.cursor()
if kwargs["mouse_number"] == None:
return {"result": 0, "about": "ok"}
else:
mouse_number = int(kwargs["mouse_number"])
cursor.execute("SELECT * FROM mouse_list WHERE mouse_number=%s", (mouse_number,))
mouse_list_record = cursor.fetchall()
if mouse_list_record[0][1] == None:
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "mouse_already_in_use_by_other"}
except Exception as error:
self.log(title=f"[ERROR] マウスの使用状況を調査中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
finally:
if cursor:
cursor.close()
def register(self, **kwargs):
try:
cursor = self.db.cursor()
user_info = {
"id": str(kwargs["discord_user_id"]),
"name": str(kwargs["name"]),
"display_name": str(kwargs["display_name"]),
"pc_number": int(kwargs["pc_number"]),
"keyboard_number": 0,
"mouse_number": 0,
"detail": None
}
if "detail" in kwargs:
user_info["detail"] = str(kwargs["detail"])
else:
pass
if kwargs["keyboard_number"] == "own":
pass
else:
user_info["keyboard_number"] = int(kwargs["keyboard_number"])
if kwargs["mouse_number"] == "own":
pass
else:
user_info["mouse_number"] = int(kwargs["mouse_number"])
# ユーザー登録されているかの確認
user_register = self.user_register_check(discord_user_id=user_info["id"])
if user_register["result"] == 0:
member_id = user_register["user_info"]["member_id"]
name = user_register["user_info"]["name"]
# ユーザーがPCを使っているか
pc_check_self = self.pc_used_check(member_id=member_id)
if pc_check_self["result"] == 0:
# 他の人がそのPCを使っているか
pc_check = self.pc_used_check(pc_number=user_info["pc_number"])
if pc_check["result"] == 0:
# キーボードは使われているか
keyboard_check = self.keyboard_used_check(keyboard_number=user_info["keyboard_number"])
if keyboard_check["result"] == 0:
# マウスは使われているか
mouse_check = self.mouse_used_check(mouse_number=user_info["mouse_number"])
if mouse_check["result"] == 0:
# パスワードとハッシュ作成
password = self.password_generate(4)
password_hash = self.hash_genarate(password)
# PC使用履歴のテーブルにレコードを挿入
cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, keyboard_number, mouse_number, start_use_time, use_detail) VALUES (%s, %s, %s, %s, clock_timestamp(), %s)", (member_id, user_info["pc_number"], user_info["keyboard_number"], user_info["mouse_number"], user_info["detail"]))
# PCリストの該当のレコードを更新
cursor.execute("UPDATE pc_list SET using_member_id = %s, password_hash = %s WHERE pc_number = %s", (member_id, password_hash, user_info["pc_number"]))
# キーボードリストの該当のレコードを自前(None)だったらスキップ、借りていたら更新
if user_info["keyboard_number"] == 0:
pass
else:
cursor.execute("UPDATE keyboard_list SET using_member_id = %s WHERE keyboard_number = %s", (member_id, user_info["keyboard_number"]))
# マウスも同様に
if user_info["mouse_number"] == 0:
pass
else:
cursor.execute("UPDATE mouse_list SET using_member_id = %s WHERE mouse_number = %s", (member_id, user_info["mouse_number"]))
self.db.commit()
return {"result": 0, "about": "ok", "output_dict": {"password": str(password), "name": str(name)}}
else:
return {"result": 1, "about": "mouse_already_in_use"}
else:
return {"result": 1, "about": "keyboard_already_in_use"}
else:
return {"result": 1, "about": "pc_already_in_use_by_other"}
else:
return {"result": 1, "about": "pc_already_in_use_by_you", "pc_usage_history": {"pc_number": pc_check_self["pc_usage_history"]["pc_number"], "keyboard_number": pc_check_self["pc_usage_history"]["keyboard_number"], "mouse_number": pc_check_self["pc_usage_history"]["mouse_number"], "start_time": pc_check_self["pc_usage_history"]["start_time"], "use_detail": pc_check_self["pc_usage_history"]["use_detail"]}}
else:
return {"result": 1, "about": "user_data_not_found"}
except Exception as error:
self.log(title=f"[ERROR] PCの使用登録中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
finally:
if cursor:
cursor.close()
def stop(self, **kwargs):
try:
cursor = self.db.cursor()
discord_user_id = str(kwargs["discord_user_id"])
if "bot_about" in kwargs:
bot_about = kwargs["bot_about"]
else:
bot_about = None
# ユーザーが登録してるかというよりはデータの取得のため
user_register = self.user_register_check(discord_user_id=discord_user_id)
member_id = user_register["user_info"]["member_id"]
name = user_register["user_info"]["name"]
if user_register["result"] == 0:
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage_history_record = cursor.fetchall()
if pc_usage_history_record:
pc_usage_history_id = pc_usage_history_record[0][0]
pc_number = pc_usage_history_record[0][2]
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
end_use_time = pc_usage_history_record[0][6]
# 使用中のとき (使用停止時間がNoneのとき)
if end_use_time == None:
# 利用停止の理由の有無を判断
if bot_about == None:
cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp() WHERE id = %s", (pc_usage_history_id,))
else:
cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp(), bot_about = %s WHERE id = %s", (bot_about, pc_usage_history_id))
# pc_listの使用中ユーザーを消す
cursor.execute("UPDATE pc_list SET using_member_id = NULL, password_hash = NULL WHERE pc_number = %s", (pc_number,))
if keyboard_number == None:
pass
else:
# keyboard_listの使用中ユーザーを消す
cursor.execute("UPDATE keyboard_list SET using_member_id = NULL WHERE keyboard_number = %s", (keyboard_number,))
if mouse_number == None:
pass
else:
# mouse_listの使用中ユーザーを消す
cursor.execute("UPDATE mouse_list SET using_member_id = NULL WHERE mouse_number = %s", (mouse_number,))
self.db.commit()
return {"result": 0, "about": "ok", "output_dict": {"pc_number": str(pc_number), "name": str(name)}}
else:
return {"result": 1, "about": "unused"}
else:
return {"result": 1, "about": "unused"}
else:
return {"result": 1, "about": "user_data_not_found"}
except Exception as error:
self.log(title=f"[ERROR] PCの使用停止処理中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
finally:
if cursor:
cursor.close()
def user_register(self, **kwargs):
try:
discord_user_id = str(kwargs["discord_user_id"])
discord_user_name = str(kwargs["discord_user_name"])
name = str(kwargs["name"])
cursor = self.db.cursor()
cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
user_record = cursor.fetchall()
if not user_record:
cursor.execute("INSERT INTO club_member (name, discord_user_name, discord_user_id) VALUES (%s, %s, %s)", (name, discord_user_name, discord_user_id))
self.db.commit()
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "already_exists"}
except Exception as error:
self.log(title=f"[ERROR] ユーザー情報の登録中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
finally:
if cursor:
cursor.close()
def format_datetime(self, value):
if isinstance(value, datetime):
return value.strftime('%Y-%m-%d %H:%M:%S')
return value
def pc_register(self, **kwargs):
try:
pc_number = int(kwargs["pc_number"])
cursor = self.db.cursor()
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_list = cursor.fetchall()
if not pc_list:
cursor.execute("INSERT INTO pc_list (pc_number) VALUES (%s)", (pc_number,))
self.db.commit()
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "already_exists"}
except Exception as error:
self.log(title=f"[ERROR] PCの情報を登録中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
finally:
if cursor:
cursor.close()
def report_export(self, **kwargs):
try:
cursor = self.db.cursor()
csv_file_path = self.export_dir_path + "pc_usage_history.csv"
main_table = "pc_usage_history"
related_table = "club_member"
excel_file_path = self.export_dir_path + "pc_usage_history.xlsx"
# メインテーブルの列情報を取得user_idを除く
cursor.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table)))
main_columns = [desc[0] for desc in cursor.description if desc[0] != 'member_id']
# クエリを作成(列名を明確に指定)
query = sql.SQL("""
SELECT {main_columns}, {related_table}.name
FROM {main_table}
LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.member_id
ORDER BY id
""").format(
main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]),
main_table=sql.Identifier(main_table),
related_table=sql.Identifier(related_table)
)
cursor.execute(query)
# 列名を再構成nameを2番目に配置
column_names = [main_columns[0], 'name'] + main_columns[1:]
rows = cursor.fetchall()
# Excelワークブックを作成
wb = Workbook()
ws = wb.active
# 列名を書き込み
ws.append(column_names)
# データを書き込み
for row in rows:
# nameを2番目に移動
formatted_row = [self.format_datetime(row[0])] + [row[-1]] + [self.format_datetime(field) if field is not None else '' for field in row[1:-1]]
ws.append(formatted_row)
# 列幅を自動調整
for col in ws.columns:
max_length = 0
column = col[0].column_letter
for cell in col:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = (max_length + 2) * 1.2
ws.column_dimensions[column].width = adjusted_width
# Excelファイルを保存
wb.save(excel_file_path)
self.log(title=f"[SUCCESS] PCの使用履歴をエクスポートしました。", message=f"ファイルパス | {excel_file_path}", flag=0)
return {"result": 0, "about": "ok", "file_path": excel_file_path}
except Exception as error:
self.log(title=f"[ERROR] PCの使用履歴をエクスポート中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
finally:
if cursor:
cursor.close()
def force_stop(self, **kwargs):
try:
pc_number = kwargs["pc_number"]
cursor = self.db.cursor()
if "bot_about" in kwargs:
bot_about = kwargs["bot_about"]
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_list_record = cursor.fetchall()
pc_using_member_id = pc_list_record[0][1]
pc_password_hash = pc_list_record[0][2]
if pc_using_member_id == None:
return {"result": 1, "about": "not_used"}
else:
cursor.execute("UPDATE pc_list SET using_member_id = NULL WHERE pc_number = %s", (pc_number,))
if pc_password_hash == None:
pass
else:
cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_number,))
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s AND pc_number = %s ORDER BY id DESC LIMIT 1", (pc_using_member_id, pc_number))
pc_usage_history_record = cursor.fetchall()
pc_usage_history_record_id = pc_usage_history_record[0][0]
keyboard_number = pc_usage_history_record[0][3]
mouse_number = pc_usage_history_record[0][4]
if keyboard_number == None:
pass
else:
# keyboard_listの使用中ユーザーを消す
cursor.execute("UPDATE keyboard_list SET using_member_id = NULL WHERE keyboard_number = %s", (keyboard_number,))
if mouse_number == None:
pass
else:
# mouse_listの使用中ユーザーを消す
cursor.execute("UPDATE mouse_list SET using_member_id = NULL WHERE mouse_number = %s", (mouse_number,))
cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp(), bot_about = %s WHERE id = %s", (bot_about, pc_usage_history_record_id))
self.db.commit()
return {"result": 0, "about": "ok"}
else:
return {"result": 1, "about": "bot_about_not_found"}
except Exception as error:
self.log(title=f"[ERROR] fstop中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
finally:
if cursor:
cursor.close()
def pc_onetime_gen(self, **kwargs):
if kwargs.get("max_count") == None:
max_count = 1
elif isinstance(kwargs.get("max_count"), int):
max_count = int(kwargs.get("max_count"))
else:
max_count = 1
try:
if os.path.isfile(dislocker.onetime_config_path):
with open(dislocker.onetime_config_path, "r") as r:
onetime_config = json.load(r)
onetime_password = onetime_config["onetime"]["pc_register"]["password"]
if onetime_password == None:
onetime_password = str(self.password_generate(8))
onetime_config["onetime"]["pc_register"]["password"] = onetime_password
onetime_config["onetime"]["pc_register"]["max_count"] = max_count
onetime_config["onetime"]["pc_register"]["current_count"] = 0
current_count = onetime_config["onetime"]["pc_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
current_count = onetime_config["onetime"]["pc_register"]["current_count"]
max_count = onetime_config["onetime"]["pc_register"]["max_count"]
return {"result": 1, "about": "already_exists", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
onetime_password = str(self.password_generate(8))
onetime_config = {
"onetime": {
"pc_register": {
"password": onetime_password,
"current_count": 0,
"max_count": int(max_count)
},
"device_register": {
"password": None,
"current_count": None,
"max_count": None
}
}
}
current_count = onetime_config["onetime"]["pc_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
except Exception as error:
self.log(title=f"[ERROR] PC登録用のワンタイムパスワード発行中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
def device_onetime_gen(self, **kwargs):
if kwargs.get("max_count") == None:
max_count = 1
elif isinstance(kwargs.get("max_count"), int):
max_count = int(kwargs.get("max_count"))
else:
max_count = 1
try:
if os.path.isfile(dislocker.onetime_config_path):
with open(dislocker.onetime_config_path, "r") as r:
onetime_config = json.load(r)
onetime_password = onetime_config["onetime"]["device_register"]["password"]
if onetime_password == None:
onetime_password = str(self.password_generate(8))
onetime_config["onetime"]["device_register"]["password"] = onetime_password
onetime_config["onetime"]["device_register"]["max_count"] = max_count
onetime_config["onetime"]["device_register"]["current_count"] = 0
current_count = onetime_config["onetime"]["device_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
current_count = onetime_config["onetime"]["device_register"]["current_count"]
max_count = onetime_config["onetime"]["device_register"]["max_count"]
return {"result": 1, "about": "already_exists", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
else:
onetime_password = str(self.password_generate(8))
onetime_config = {
"onetime": {
"pc_register": {
"password": None,
"current_count": None,
"max_count": None
},
"device_register": {
"password": onetime_password,
"current_count": 0,
"max_count": int(max_count)
}
}
}
current_count = onetime_config["onetime"]["device_register"]["current_count"]
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return {"result": 0, "about": "ok", "output_dict": {"onetime_password": onetime_password, "current_count": current_count, "max_count": max_count}}
except Exception as error:
self.log(title=f"[ERROR] デバイス登録用のワンタイムパスワード発行中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
def show_pc_master_password(self, **kwargs):
if isinstance(kwargs.get("pc_number"), int):
try:
pc_number = int(kwargs.get("pc_number"))
cursor = self.db.cursor()
cursor.execute("SELECT master_password FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_master_password_list = cursor.fetchall()
pc_master_password = pc_master_password_list[0][0]
return {"result": 0, "about": "ok", "output_dict": {"pc_master_password": pc_master_password}}
except Exception as error:
self.log(title=f"[ERROR] PCのマスターパスワードを取得中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
return {"result": 1, "about": "error"}
else:
return {"result": 1, "about": "syntax_error"}
class ReasonModal(discord.ui.Modal):
def __init__(self, title: str, pc_number: str, keyboard_number: str, mouse_number: str, timeout=15) -> None:
super().__init__(title=title, timeout=timeout)
self.reason_input_form = discord.ui.TextInput(label="使用目的を入力してください", style=discord.TextStyle.short, custom_id=f"register_{pc_number}_{keyboard_number}_{mouse_number}")
self.add_item(self.reason_input_form)
async def on_submit(self, interaction: discord.Interaction) -> None:
custom_id = interaction.data["components"][0]["components"][0]["custom_id"]
custom_id_split = custom_id.split("_")
pc_number = custom_id_split[1]
keyboard_number = custom_id_split[2]
mouse_number = custom_id_split[3]
if keyboard_number == "own":
keyboard_number_show = "自前"
else:
keyboard_number_show = keyboard_number
if mouse_number == "own":
mouse_number_show = "自前"
else:
mouse_number_show = mouse_number
register = dislocker.register(discord_user_id=interaction.user.id, name=interaction.user.name, display_name=interaction.user.display_name, pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, detail=self.reason_input_form.value)
if register["about"] == "ok":
await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {register["output_dict"]["password"]}\n## PC番号 | {pc_number}\n## キーボード番号 | {keyboard_number_show}\n## マウス番号 | {mouse_number_show}\n## 使用目的 | {self.reason_input_form.value}", ephemeral=True)
await client.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':white_check_mark: {register["output_dict"]["name"]} さんがPC {pc_number} の使用を開始しました。\n>>> ## PC番号 | {pc_number}\n## キーボード番号 | {keyboard_number_show}\n## マウス番号 | {mouse_number_show}\n## 使用目的 | {self.reason_input_form.value}')
dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が開始されました。", message=f"名前 | {register["output_dict"]["name"]}, 使用目的 | {self.reason_input_form.value}", flag=0)
elif register["about"] == "pc_already_in_use_by_you":
pc_usage_history = register["pc_usage_history"]
if pc_usage_history["keyboard_number"] == None:
keyboard_number_show = "未認証"
elif pc_usage_history["keyboard_number"] == 0:
keyboard_number_show = "自前"
else:
keyboard_number_show = str(pc_usage_history["keyboard_number"])
if pc_usage_history["mouse_number"] == None:
mouse_number_show = "未認証"
elif pc_usage_history["mouse_number"] == 0:
mouse_number_show = "自前"
else:
mouse_number_show = str(pc_usage_history["mouse_number"])
await interaction.response.send_message(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには 終了ボタン で使用終了をお知らせください。\n>>> # PC番号 | {pc_usage_history["pc_number"]}\n# キーボード番号 | {keyboard_number_show}\n# マウス番号 | {mouse_number_show}\n# 使用開始時刻 | {pc_usage_history["start_time"]}", ephemeral=True)
#await interaction.response.send_message(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには 終了ボタン で使用終了をお知らせください。\n>>> # PC番号 | {pc_usage_history["pc_number"]}\n# キーボード番号 | {keyboard_number_show}\n# マウス番号 | {mouse_number_show}\n# 使用開始時刻 | {pc_usage_history["start_time"]}\n# 使用目的 | {pc_usage_history["use_detail"]}", ephemeral=True)
elif register["about"] == "pc_already_in_use_by_other":
await interaction.response.send_message(f"# :man_gesturing_no: そのPCは他のメンバーによって使用されています。\n別のPC番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "keyboard_already_in_use":
await interaction.response.send_message(f"# :man_gesturing_no: そのキーボードは他のメンバーによって使用されています。\n別のキーボードのデバイス番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "mouse_already_in_use":
await interaction.response.send_message(f"# :man_gesturing_no: そのマウスは他のメンバーによって使用されています。\n別のマウスのデバイス番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "user_data_not_found":
await interaction.response.send_message("# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。", ephemeral=True)
else:
await interaction.response.send_message("# :skull_crossbones: 登録できませんでした。\n内部エラーが発生しています。", ephemeral=True)
class Monitor():
def __init__(self, **kwargs) -> None:
self.search_frequency = kwargs["search_frequency"]
self.allowable_time = kwargs["allowable_time"]
self.fstop_time = kwargs["fstop_time"]
self.init_wait_time = 10
def start(self, **kwargs):
search_thread = threading.Thread(target=self.search)
search_thread.start()
def search(self):
try:
time.sleep(self.init_wait_time)
while True:
cursor = dislocker.db.cursor()
cursor.execute("SELECT * FROM pc_list WHERE password_hash IS NOT NULL")
pc_list = cursor.fetchall()
current_datetime = datetime.now()
fstop_time = self.fstop_time
if current_datetime.time().strftime("%H:%M:%S") == fstop_time:
dislocker.log(title=f"[INFO] 定期のPCの使用停止処理を開始します。", flag=0)
for i in dislocker.pc_list:
stop = dislocker.force_stop(pc_number=i, bot_about="使用停止忘れによるBotによる強制停止。")
result = {"result": "FSTOP"}
dislocker.log(title=f"[SUCCESS] 定期のPCの使用停止処理は完了しました。", flag=0)
else:
if pc_list:
if len(pc_list) == 1:
member_id = pc_list[0][1]
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s AND end_use_time IS NULL ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage = cursor.fetchall()
start_time = pc_usage[0][5]
time_difference = current_datetime - start_time
dislocker.log(title=f"[INFO] 現在確認されているパスワード未使用のユーザー", message=f"レコード | {str(pc_usage)}, 経過時間(Sec) | {time_difference.seconds}/{timedelta(seconds=self.allowable_time).seconds}", flag=0)
if time_difference.seconds >= timedelta(seconds=self.allowable_time).seconds:
cursor.execute("SELECT * FROM club_member WHERE member_id = %s", (member_id,))
user_info = cursor.fetchall()
stop = dislocker.stop(discord_user_id=user_info[0][3], bot_about="パスワードのタイムアウトでBotによる強制停止。")
#bot.timeout_notify(pc_number=pc_list[0][0], discord_display_name=user_info[0][1])
dislocker.log(title=f"[INFO] パスワードのタイムアウト時間に達したため、強制停止されました。", flag=0)
result = {"result": "STOP", "details": str(pc_usage)}
else:
result = {"result": "BUT SAFE", "details": str(pc_usage)}
elif len(pc_list) >= 2:
for i in pc_list:
member_id = i[1]
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s AND end_use_time IS NULL ORDER BY id DESC LIMIT 1", (member_id,))
pc_usage = cursor.fetchall()
start_time = pc_usage[0][5]
time_difference = current_datetime - start_time
dislocker.log(title=f"[INFO] 現在確認されているパスワード未使用のユーザー", message=f"レコード | {str(pc_usage)}, 経過時間(Sec) | {time_difference.seconds}/{timedelta(seconds=self.allowable_time).seconds}", flag=0)
if time_difference.seconds >= timedelta(seconds=self.allowable_time).seconds:
cursor.execute("SELECT * FROM club_member WHERE member_id = %s", (member_id,))
user_info = cursor.fetchall()
stop = dislocker.stop(discord_user_id=user_info[0][3], bot_about="タイムアウトでBotによる強制停止。")
#bot.timeout_notify(pc_number=i[0], discord_display_name=user_info[0][1])
dislocker.log(title=f"[INFO] パスワードのタイムアウト時間に達したため、強制停止されました。", flag=0)
result = {"result": "STOP", "details": str(pc_usage)}
else:
result = {"result": "BUT SAFE", "details": str(pc_usage)}
else:
result = {"result": "NONE"}
else:
result = {"result": "NONE"}
if result["result"] == "NONE":
pass
else:
pass
time.sleep(self.search_frequency)
except Exception as error:
dislocker.log(title=f"[ERROR] 自動停止処理中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
result = {"result": "error"}
dislocker.db.rollback()
finally:
if cursor:
cursor.close()
return result
dislocker = DL()
intents = discord.Intents.default()
intents.message_content = True
client = discord.Client(intents=intents)
tree = discord.app_commands.CommandTree(client)
@client.event
async def on_ready():
dislocker.log(title=f"[SUCCESS] DiscordのBotが起動しました。", message=f"{client.user.name} としてログインしています。", flag=1)
await tree.sync()
dislocker_activity = discord.Activity(
name=dislocker.server_config["bot"]["activity"]["name"],
type=discord.ActivityType.competing,
details=dislocker.server_config["bot"]["activity"]["details"],
state=dislocker.server_config["bot"]["activity"]["state"]
)
await client.change_presence(activity=dislocker_activity)
@client.event
async def on_message(message):
if message.author.bot:
pass
elif isinstance(message.channel, discord.DMChannel):
if message.author.id in dislocker.server_config["bot"]["admin_user_id"]:
msg_split = message.content.split()
if msg_split[0] == "/pcreg":
max_count = 1
if len(msg_split) == 2:
if msg_split[1].isdecimal():
max_count = int(msg_split[1])
pc_onetime_password_gen = dislocker.pc_onetime_gen(max_count=max_count)
if pc_onetime_password_gen["result"] == 0:
pc_onetime_password = str(pc_onetime_password_gen["output_dict"]["onetime_password"])
pc_onetime_password_max_count = str(pc_onetime_password_gen["output_dict"]["max_count"])
pc_onetime_password_current_count = str(pc_onetime_password_gen["output_dict"]["current_count"])
pc_onetime_password_remaining_times = str(int(pc_onetime_password_max_count) - int(pc_onetime_password_current_count))
await message.channel.send(f"# :dizzy_face: PC登録時のワンタイムパスワードを発行します。\n# パスワード | {pc_onetime_password}\n# 最大使用回数 | {pc_onetime_password_max_count}\n# 残り使用回数 | {pc_onetime_password_remaining_times}")
elif pc_onetime_password_gen["result"] == 1:
if pc_onetime_password_gen["about"] == "already_exists":
pc_onetime_password = str(pc_onetime_password_gen["output_dict"]["onetime_password"])
pc_onetime_password_max_count = str(pc_onetime_password_gen["output_dict"]["max_count"])
pc_onetime_password_current_count = str(pc_onetime_password_gen["output_dict"]["current_count"])
pc_onetime_password_remaining_times = str(int(pc_onetime_password_max_count) - int(pc_onetime_password_current_count))
await message.channel.send(f"# :dizzy_face: 既にワンタイムパスワードは発行されています。\n# パスワード | {pc_onetime_password}\n# 残り使用回数 | {pc_onetime_password_remaining_times}")
else:
await message.channel.send("# :skull_crossbones: ワンタイムパスワードの発行に失敗しました。")
elif msg_split[0] == "/devreg":
max_count = 1
if len(msg_split) == 2:
if msg_split[1].isdecimal():
max_count = int(msg_split[1])
device_onetime_password_gen = dislocker.device_onetime_gen(max_count=max_count)
if device_onetime_password_gen["result"] == 0:
device_onetime_password = str(device_onetime_password_gen["output_dict"]["onetime_password"])
device_onetime_password_max_count = str(device_onetime_password_gen["output_dict"]["max_count"])
device_onetime_password_current_count = str(device_onetime_password_gen["output_dict"]["current_count"])
device_onetime_password_remaining_times = str(int(device_onetime_password_max_count) - int(device_onetime_password_current_count))
await message.channel.send(f"# :dizzy_face: デバイス登録時のワンタイムパスワードを発行します。\n# パスワード | {device_onetime_password}\n# 最大使用回数 | {device_onetime_password_max_count}\n# 残り使用回数 | {device_onetime_password_remaining_times}")
elif device_onetime_password_gen["result"] == 1:
if device_onetime_password_gen["about"] == "already_exists":
device_onetime_password = str(device_onetime_password_gen["output_dict"]["onetime_password"])
device_onetime_password_max_count = str(device_onetime_password_gen["output_dict"]["max_count"])
device_onetime_password_current_count = str(device_onetime_password_gen["output_dict"]["current_count"])
device_onetime_password_remaining_times = str(int(device_onetime_password_max_count) - int(device_onetime_password_current_count))
await message.channel.send(f"# :dizzy_face: 既にワンタイムパスワードは発行されています。\n# パスワード | {device_onetime_password}\n# 残り使用回数 | {device_onetime_password_remaining_times}")
else:
await message.channel.send("# :skull_crossbones: ワンタイムパスワードの発行に失敗しました。")
else:
await message.channel.send("# :warning: DMでの応答は、現在無効化されています。")
else:
pass
@client.event
async def on_interaction(interaction: discord.Interaction):
try:
if interaction.data["component_type"] == 2:
await on_button(interaction)
except KeyError:
pass
async def on_button(interaction: discord.Interaction):
custom_id = interaction.data["custom_id"]
custom_id_split = custom_id.split("_")
dislocker.log(title=f"[INFO] ボタンが押されました。", message=f"custom_id | {custom_id}, DiscordユーザーID | {interaction.user.id}", flag=0)
if custom_id_split[0] == "pcregister":
keyboard_register_view = discord.ui.View(timeout=15)
pc_number = custom_id_split[1]
for i in dislocker.keyboard_list:
if i == 0:
pass
else:
keyboard_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(i)}", custom_id=f"keyboardregister_{str(pc_number)}_{str(i)}")
keyboard_register_view.add_item(keyboard_register_button)
keyboard_not_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label="キーボードは自前", custom_id=f"keyboardregister_{str(pc_number)}_own")
keyboard_register_view.add_item(keyboard_not_register_button)
await interaction.response.send_message(f"# :keyboard: キーボードのデバイス番号を選んでください!\n>>> # PC番号 | {str(pc_number)}", view=keyboard_register_view, ephemeral=True)
elif custom_id_split[0] == "keyboardregister":
mouse_register_view = discord.ui.View(timeout=15)
pc_number = custom_id_split[1]
keyboard_number = custom_id_split[2]
if keyboard_number == "own":
keyboard_number_show = "自前"
else:
keyboard_number_show = keyboard_number
for i in dislocker.mouse_list:
if i == 0:
pass
else:
mouse_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(i)}", custom_id=f"mouseregister_{str(pc_number)}_{str(keyboard_number)}_{str(i)}")
mouse_register_view.add_item(mouse_register_button)
mouse_not_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label="マウスは自前", custom_id=f"mouseregister_{str(pc_number)}_{str(keyboard_number)}_own")
mouse_register_view.add_item(mouse_not_register_button)
await interaction.response.send_message(f"# :mouse_three_button: マウスのデバイス番号を選んでください!\n>>> # PC番号 | {str(pc_number)}\n# キーボード番号 | {str(keyboard_number_show)}", view=mouse_register_view, ephemeral=True)
elif custom_id_split[0] == "mouseregister":
pc_number = custom_id_split[1]
keyboard_number = custom_id_split[2]
mouse_number = custom_id_split[3]
if keyboard_number == "own":
keyboard_number_show = "自前"
else:
keyboard_number_show = keyboard_number
if mouse_number == "own":
mouse_number_show = "自前"
else:
mouse_number_show = mouse_number
reason_register_view = discord.ui.View(timeout=15)
for i in dislocker.preset_games:
reason_quick_button = reason_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(i)}", custom_id=f"quickreasonregister_{str(pc_number)}_{str(keyboard_number)}_{str(mouse_number)}_{str(i)}")
reason_register_view.add_item(reason_quick_button)
reason_button = discord.ui.Button(style=discord.ButtonStyle.primary, label="使用目的を入力する", custom_id=f"reasonregister_{str(pc_number)}_{str(keyboard_number)}_{str(mouse_number)}")
reason_register_view.add_item(reason_button)
await interaction.response.send_message(f"# :regional_indicator_q: 使用目的を書いてください!\n>>> # PC番号 | {str(pc_number)}\n# キーボード番号 | {str(keyboard_number_show)}\n# マウス番号 | {str(mouse_number_show)}", view=reason_register_view, ephemeral=True)
elif custom_id_split[0] == "quickreasonregister":
pc_number = custom_id_split[1]
keyboard_number = custom_id_split[2]
mouse_number = custom_id_split[3]
if keyboard_number == "own":
keyboard_number_show = "自前"
else:
keyboard_number_show = keyboard_number
if mouse_number == "own":
mouse_number_show = "自前"
else:
mouse_number_show = mouse_number
reason = custom_id_split[4]
register = dislocker.register(discord_user_id=interaction.user.id, name=interaction.user.name, display_name=interaction.user.display_name, pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, detail=reason)
if register["about"] == "ok":
await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {register["output_dict"]["password"]}\n## PC番号 | {pc_number}\n## キーボード番号 | {str(keyboard_number_show)}\n## マウス番号 | {str(mouse_number_show)}\n## 使用目的 | {reason}", ephemeral=True)
await client.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':white_check_mark: {register["output_dict"]["name"]} さんがPC {pc_number} の使用を開始しました。\n>>> ## PC番号 | {pc_number}\n## 使用目的 | {reason}')
dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が開始されました。", message=f"名前 | {register["output_dict"]["name"]}, 使用目的 | {reason}", flag=0)
elif register["about"] == "pc_already_in_use_by_you":
pc_usage_history = register["pc_usage_history"]
if pc_usage_history["keyboard_number"] == None:
keyboard_number_show = "未認証"
elif pc_usage_history["keyboard_number"] == 0:
keyboard_number_show = "自前"
else:
keyboard_number_show = str(pc_usage_history["keyboard_number"])
if pc_usage_history["mouse_number"] == None:
mouse_number_show = "未認証"
elif pc_usage_history["mouse_number"] == 0:
mouse_number_show = "自前"
else:
mouse_number_show = str(pc_usage_history["mouse_number"])
await interaction.response.send_message(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには 終了ボタン で使用終了をお知らせください。\n>>> # PC番号 | {pc_usage_history["pc_number"]}\n# キーボード番号 | {keyboard_number_show}\n# マウス番号 | {mouse_number_show}\n# 使用開始時刻 | {pc_usage_history["start_time"]}", ephemeral=True)
#await interaction.response.send_message(f"# :exploding_head: あなたはPCをもう使用されているようです。\n使用状態を解除するには 終了ボタン で使用終了をお知らせください。\n>>> # PC番号 | {pc_usage_history["pc_number"]}\n# キーボード番号 | {keyboard_number_show}\n# マウス番号 | {mouse_number_show}\n# 使用開始時刻 | {pc_usage_history["start_time"]}\n# 使用目的 | {pc_usage_history["use_detail"]}", ephemeral=True)
elif register["about"] == "pc_already_in_use_by_other":
await interaction.response.send_message(f"# :man_gesturing_no: そのPCは他のメンバーによって使用されています。\n別のPC番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "keyboard_already_in_use":
await interaction.response.send_message(f"# :man_gesturing_no: そのキーボードは他のメンバーによって使用されています。\n別のキーボードのデバイス番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "mouse_already_in_use":
await interaction.response.send_message(f"# :man_gesturing_no: そのマウスは他のメンバーによって使用されています。\n別のマウスのデバイス番号を指定して、再度お試しください。", ephemeral=True)
elif register["about"] == "user_data_not_found":
await interaction.response.send_message("# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。", ephemeral=True)
else:
await interaction.response.send_message("# :skull_crossbones: 登録できませんでした。\n内部エラーが発生しています。", ephemeral=True)
elif custom_id_split[0] == "reasonregister":
pc_number = custom_id_split[1]
keyboard_number = custom_id_split[2]
mouse_number = custom_id_split[3]
if keyboard_number == "own":
keyboard_number_show = "自前"
else:
keyboard_number_show = keyboard_number
if mouse_number == "own":
mouse_number_show = "自前"
else:
mouse_number_show = mouse_number
reason_input_form = ReasonModal(title="Dislocker | 登録", pc_number=str(pc_number), keyboard_number=str(keyboard_number), mouse_number=str(mouse_number))
await interaction.response.send_modal(reason_input_form)
elif custom_id_split[0] == "stop":
pc_stop = dislocker.stop(discord_user_id=interaction.user.id)
stop_view = discord.ui.View(timeout=15)
if pc_stop["about"] == "unused":
await interaction.response.send_message("# :shaking_face: 使用されていないようです...", ephemeral=True)
elif pc_stop["about"] == "user_data_not_found":
await interaction.response.send_message("# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。", ephemeral=True)
elif pc_stop["about"] == "ok":
await interaction.response.send_message(f":white_check_mark: PC番号 {pc_stop["output_dict"]["pc_number"]} の使用が終了されました。", ephemeral=True)
await client.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':negative_squared_cross_mark: {pc_stop["output_dict"]["name"]} さんがPC {pc_stop["output_dict"]["pc_number"]} の使用を終了しました。')
else:
await interaction.response.send_message("# :skull_crossbones: 停止できませんでした。\n内部エラーが発生しています。", ephemeral=True)
elif custom_id_split[0] == "user" and custom_id_split[1] == "register":
user_register = dislocker.user_register(name=interaction.user.display_name, discord_user_name=interaction.user.name, discord_user_id=interaction.user.id)
if user_register["about"] == "ok":
await interaction.response.send_message(f"# :white_check_mark: ユーザー情報が登録されました。\n>>> ユーザー名:{interaction.user.display_name}", ephemeral=True)
elif user_register["about"] == "already_exists":
await interaction.response.send_message("# :no_entry: 登録できませんでした。\nもう登録されている可能性があります。", ephemeral=True)
else:
await interaction.response.send_message("# :no_entry: 登録できませんでした。\n内部エラーが発生しています。", ephemeral=True)
#使用者側のスラッシュコマンド
@tree.command(name="use", description="パソコンの使用登録をします。通常はこのコマンドを使用する必要はありません。")
async def use(interaction: discord.Interaction, pc_number: int, keyboard_number: int, mouse_number: int, detail: str):
register = dislocker.register(discord_user_id=interaction.user.id, name=interaction.user.name, display_name=interaction.user.display_name, pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, detail=detail)
if register["result"] == 0:
await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {register["output_dict"]["password"]}\n## PC番号 | {pc_number}\n## 使用目的 | {detail}", ephemeral=True)
dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が開始されました。", message=f"名前 | {register["output_dict"]["name"]}, 使用目的 | {detail}", flag=0)
await client.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':white_check_mark: {register["output_dict"]["name"]} さんがPC {pc_number} の使用を開始しました。\n>>> ## PC番号 | {pc_number}\n## 使用目的 | {detail}')
elif register["result"] == 1:
if register["about"] == "pc_already_in_use_by_other":
await interaction.response.send_message(":x: 他の方がそのPCを使用中です。", ephemeral=True)
elif register["about"] == "pc_already_in_use_by_you":
await interaction.response.send_message(f":x: あなたは既にPC {register['pc_usage_history']['pc_number']} を使用中です。\n>>> ## PC番号 | {register['pc_usage_history']['pc_number']}\n## 使用目的 | {register['pc_usage_history']['use_detail']}", ephemeral=True)
elif register["about"] == "keyboard_already_in_use":
await interaction.response.send_message(":x: キーボードは既に使用中です。", ephemeral=True)
elif register["about"] == "mouse_already_in_use":
await interaction.response.send_message(":x: マウスは既に使用中です。", ephemeral=True)
elif register["about"] == "user_data_not_found":
await interaction.response.send_message(":x: ユーザーデータが見つかりませんでした。", ephemeral=True)
elif register["about"] == "error":
await interaction.response.send_message(":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。", ephemeral=True)
@tree.command(name="stop", description="パソコンの使用を終了します。通常はこのコマンドを使用する必要はありません。")
async def stop(interaction: discord.Interaction):
stop = dislocker.stop(discord_user_id=interaction.user.id)
if stop["result"] == 0:
await interaction.response.send_message(f":white_check_mark: 使用が終了されました。\n>>> ## PC番号 | {stop['output_dict']['pc_number']}", ephemeral=True)
dislocker.log(title=f"[INFO] PC番号{stop['output_dict']['pc_number']} の使用が終了されました。", message=f"名前 | {stop['output_dict']['name']}", flag=0)
await client.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':white_check_mark: {stop["output_dict"]["name"]} さんがPC {stop["output_dict"]["pc_number"]} の使用を終了しました。\n>>> ## PC番号 | {stop["output_dict"]["pc_number"]}')
elif stop["result"] == 1:
if stop["about"] == "unused":
await interaction.response.send_message("# :shaking_face: あなたはPCを使用されていないようです...", ephemeral=True)
elif stop["about"] == "user_data_not_found":
await interaction.response.send_message("# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。", ephemeral=True)
elif stop["about"] == "error":
await interaction.response.send_message("# :skull_crossbones: 停止できませんでした。\n内部エラーが発生しています。", ephemeral=True)
#管理者側のスラッシュコマンド
@tree.command(name="userreg", description="ユーザーを登録します。")
@discord.app_commands.default_permissions(administrator=True)
async def userreg(interaction: discord.Interaction, discord_user_id: str, discord_user_name: str, name: str):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
user_register = dislocker.user_register(discord_user_id=discord_user_id, discord_user_name=discord_user_name, name=name)
if user_register["result"] == 0:
await interaction.response.send_message(":white_check_mark: ユーザーを登録しました。", ephemeral=True)
dislocker.log(title=f"[INFO] ユーザーを登録しました。", message=f"名前 | {name}, Discordユーザー名 | {discord_user_name}, DiscordユーザーID | {discord_user_id}", flag=0)
elif user_register["result"] == 1:
if user_register["about"] == "already_exists":
await interaction.response.send_message(":x: 既に登録されているユーザーです。", ephemeral=True)
elif user_register["about"] == "error":
await interaction.response.send_message(":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。", ephemeral=True)
@tree.command(name="pcreg", description="PCをDislockerに登録するためのワンタイムパスワードを発行します。")
@discord.app_commands.default_permissions(administrator=True)
async def pcreg(interaction: discord.Interaction, how_much: int = 1):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
max_count = how_much
pc_onetime_password_gen = dislocker.pc_onetime_gen(max_count=max_count)
if pc_onetime_password_gen["result"] == 0:
pc_onetime_password = str(pc_onetime_password_gen["output_dict"]["onetime_password"])
pc_onetime_password_max_count = str(pc_onetime_password_gen["output_dict"]["max_count"])
pc_onetime_password_current_count = str(pc_onetime_password_gen["output_dict"]["current_count"])
pc_onetime_password_remaining_times = str(int(pc_onetime_password_max_count) - int(pc_onetime_password_current_count))
await interaction.response.send_message(f"# :dizzy_face: PC登録時のワンタイムパスワードを発行します。\n# パスワード | {pc_onetime_password}\n# 最大使用回数 | {pc_onetime_password_max_count}\n# 残り使用回数 | {pc_onetime_password_remaining_times}", ephemeral=True)
elif pc_onetime_password_gen["result"] == 1:
if pc_onetime_password_gen["about"] == "already_exists":
pc_onetime_password = str(pc_onetime_password_gen["output_dict"]["onetime_password"])
pc_onetime_password_max_count = str(pc_onetime_password_gen["output_dict"]["max_count"])
pc_onetime_password_current_count = str(pc_onetime_password_gen["output_dict"]["current_count"])
pc_onetime_password_remaining_times = str(int(pc_onetime_password_max_count) - int(pc_onetime_password_current_count))
await interaction.response.send_message(f"# :dizzy_face: 既にワンタイムパスワードは発行されています。\n# パスワード | {pc_onetime_password}\n# 残り使用回数 | {pc_onetime_password_remaining_times}", ephemeral=True)
else:
await interaction.response.send_message("# :skull_crossbones: ワンタイムパスワードの発行に失敗しました。", ephemeral=True)
@tree.command(name="devicereg", description="デバイスをDislockerに登録するためのワンタイムパスワードを発行します。")
@discord.app_commands.default_permissions(administrator=True)
async def devicereg(interaction: discord.Interaction, how_much: int):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
max_count = how_much
device_onetime_password_gen = dislocker.device_onetime_gen(max_count=max_count)
if device_onetime_password_gen["result"] == 0:
device_onetime_password = str(device_onetime_password_gen["output_dict"]["onetime_password"])
device_onetime_password_max_count = str(device_onetime_password_gen["output_dict"]["max_count"])
device_onetime_password_current_count = str(device_onetime_password_gen["output_dict"]["current_count"])
device_onetime_password_remaining_times = str(int(device_onetime_password_max_count) - int(device_onetime_password_current_count))
await interaction.response.send_message(f"# :dizzy_face: デバイス登録時のワンタイムパスワードを発行します。\n# パスワード | {device_onetime_password}\n# 最大使用回数 | {device_onetime_password_max_count}\n# 残り使用回数 | {device_onetime_password_remaining_times}", ephemeral=True)
elif device_onetime_password_gen["result"] == 1:
if device_onetime_password_gen["about"] == "already_exists":
device_onetime_password = str(device_onetime_password_gen["output_dict"]["onetime_password"])
device_onetime_password_max_count = str(device_onetime_password_gen["output_dict"]["max_count"])
device_onetime_password_current_count = str(device_onetime_password_gen["output_dict"]["current_count"])
device_onetime_password_remaining_times = str(int(device_onetime_password_max_count) - int(device_onetime_password_current_count))
await interaction.response.send_message(f"# :dizzy_face: 既にワンタイムパスワードは発行されています。\n# パスワード | {device_onetime_password}\n# 残り使用回数 | {device_onetime_password_remaining_times}", ephemeral=True)
else:
await interaction.response.send_message("# :skull_crossbones: ワンタイムパスワードの発行に失敗しました。", ephemeral=True)
@tree.command(name="fstop", description="PCの使用登録を強制的に終了します。")
@discord.app_commands.default_permissions(administrator=True)
async def fstop(interaction: discord.Interaction, pc_number: int, about: str):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
force_stop = dislocker.force_stop(pc_number=pc_number, bot_about=about)
if force_stop["result"] == 0:
await interaction.response.send_message(f":white_check_mark: PC {pc_number} の使用を強制終了しました。", ephemeral=True)
dislocker.log(title=f"[INFO] PC {pc_number} の使用を強制終了しました。", message=f"理由 | {about}", flag=0)
elif force_stop["result"] == 1:
if force_stop["about"] == "not_used":
await interaction.response.send_message(":x: そのPCは使用されていません。", ephemeral=True)
elif force_stop["about"] == "bot_about_not_found":
await interaction.response.send_message(":x: 理由が指定されていません。", ephemeral=True)
elif force_stop["about"] == "error":
await interaction.response.send_message(":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。", ephemeral=True)
@tree.command(name="report", description="PCの使用履歴をエクスポートします。")
@discord.app_commands.default_permissions(administrator=True)
async def report(interaction: discord.Interaction):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
report_export = dislocker.report_export()
if report_export["result"] == 0:
await interaction.response.send_message(f":white_check_mark: 使用履歴のレポートです。", file=discord.File(report_export["file_path"]), ephemeral=True)
dislocker.log(title=f"[INFO] PCの使用履歴をエクスポートしました。", message=f"ファイルパス | {report_export['file_path']}", flag=0)
elif report_export["result"] == 1:
if report_export["about"] == "error":
await interaction.response.send_message(":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。", ephemeral=True)
@tree.command(name="init", description="操作チャンネルにボタン一式を送信します。")
@discord.app_commands.default_permissions(administrator=True)
async def button_init(interaction: discord.Interaction, text_channel: discord.TextChannel):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
user_register_button_view = discord.ui.View(timeout=None)
user_register_button = discord.ui.Button(style=discord.ButtonStyle.green, label="ユーザー登録", custom_id="user_register")
user_register_button_view.add_item(user_register_button)
await client.get_channel(text_channel.id).send(f'# :index_pointing_at_the_viewer: ユーザー登録はお済ですか?', view=user_register_button_view)
stop_button_view = discord.ui.View(timeout=None)
stop_button = discord.ui.Button(style=discord.ButtonStyle.danger, label="PCの使用を停止", custom_id="stop")
stop_button_view.add_item(stop_button)
await client.get_channel(text_channel.id).send(f'# :index_pointing_at_the_viewer: 使用を停止しますか?', view=stop_button_view)
pc_button_view = discord.ui.View(timeout=None)
for i in dislocker.pc_list:
pc_register_button = discord.ui.Button(style=discord.ButtonStyle.primary, label=f"{str(i)}", custom_id=f"pcregister_{str(i)}")
pc_button_view.add_item(pc_register_button)
await client.get_channel(text_channel.id).send(f'# :index_pointing_at_the_viewer: 使いたいPCの番号を選んでください', view=pc_button_view)
dislocker.log(title=f"[INFO] サーバーで初回処理を実行しました。", flag=0)
await interaction.response.send_message(f":white_check_mark: ボタンを送信しました!", ephemeral=True)
@tree.command(name="masterpass", description="PCのマスターパスワードを表示します。")
@discord.app_commands.default_permissions(administrator=True)
async def masterpass(interaction: discord.Interaction, pc_number: int):
if interaction.guild_id in dislocker.server_config["bot"]["server_id"] or interaction.user.id in dislocker.server_config["bot"]["admin_user_id"]:
pc_master_password_get = dislocker.show_pc_master_password(pc_number=pc_number)
if pc_master_password_get["result"] == 0:
pc_master_password = pc_master_password_get["output_dict"]["pc_master_password"]
await interaction.response.send_message(f"# :key: PC番号 {pc_number} 番のマスターパスワードは以下の通りです。\n>>> # マスターパスワード | {pc_master_password}", ephemeral=True)
else:
await interaction.response.send_message("# :skull_crossbones: マスターパスワードの取得に失敗しました。", ephemeral=True)
if dislocker.init_result == "ok":
print("Botを起動します...")
monitor = Monitor(search_frequency=dislocker.server_config["bot"]["monitor"]["search_frequency"], allowable_time=dislocker.server_config["bot"]["monitor"]["allowable_time"], fstop_time=dislocker.server_config["bot"]["monitor"]["fstop_time"])
monitor.start()
client.run(dislocker.server_config["bot"]["token"])
else:
pass