Dislocker/dislocker_slash.py

815 lines
No EOL
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import hashlib
import json
import os
import random
import secrets
import string
from datetime import datetime

import discord
import openpyxl
import psycopg2
from psycopg2 import sql
class DL():
    """Backend of the Dislocker bot: configuration, database schema and PC-lending operations.

    The database-facing operations return a dict with an integer ``result``
    (0 = success, 1 = failure) and a string ``about`` naming the outcome.
    All of them share the single psycopg2 connection stored in ``self.db``.
    """

    # (table name, CREATE statement, seed INSERT statement or None,
    #  name of the attribute holding the seed values or None).
    # Order matters: later tables reference earlier ones through foreign keys.
    _SCHEMA = (
        (
            "club_member",
            "CREATE TABLE club_member (member_id SERIAL NOT NULL, name TEXT NOT NULL, discord_user_name TEXT NOT NULL, discord_user_id TEXT NOT NULL, PRIMARY KEY (member_id))",
            None,
            None,
        ),
        (
            "pc_list",
            "CREATE TABLE pc_list (pc_number INTEGER NOT NULL, using_member_id INTEGER, password_hash VARCHAR(64), pc_uuid VARCHAR(36), pc_token VARCHAR(36), master_password VARCHAR(16), detail TEXT, PRIMARY KEY (pc_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))",
            "INSERT INTO pc_list (pc_number) VALUES (%s)",
            "pc_list",
        ),
        (
            "keyboard_list",
            "CREATE TABLE keyboard_list (keyboard_number INTEGER NOT NULL, using_member_id INTEGER, device_instance_path TEXT, device_name TEXT, detail TEXT, PRIMARY KEY (keyboard_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))",
            "INSERT INTO keyboard_list (keyboard_number) VALUES (%s)",
            "keyboard_list",
        ),
        (
            "mouse_list",
            "CREATE TABLE mouse_list (mouse_number INTEGER NOT NULL, using_member_id INTEGER, device_instance_path TEXT, device_name TEXT, detail TEXT, PRIMARY KEY (mouse_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))",
            "INSERT INTO mouse_list (mouse_number) VALUES (%s)",
            "mouse_list",
        ),
        (
            "pc_usage_history",
            "CREATE TABLE pc_usage_history (id SERIAL NOT NULL, member_id INTEGER NOT NULL, pc_number INTEGER NOT NULL, keyboard_number INTEGER, mouse_number INTEGER, start_use_time TIMESTAMP NOT NULL, end_use_time TIMESTAMP, use_detail TEXT, bot_about TEXT, PRIMARY KEY (id), FOREIGN KEY (member_id) REFERENCES club_member(member_id), FOREIGN KEY (pc_number) REFERENCES pc_list(pc_number), FOREIGN KEY (keyboard_number) REFERENCES keyboard_list(keyboard_number), FOREIGN KEY (mouse_number) REFERENCES mouse_list(mouse_number))",
            None,
            None,
        ),
    )

    def __init__(self):
        """Load (or create) the configuration, connect to PostgreSQL and create missing tables.

        The outcome is stored in ``self.init_result``: ``"ok"``, ``"not_int"``
        (channel IDs in the config are not integers) or ``"error"``.
        """
        self.config_dir_path = "./config/"
        self.export_dir_path = "./export/"
        self.log_dir_path = "./log/"
        self.server_config_path = self.config_dir_path + "server.json"
        self.onetime_config_path = self.config_dir_path + "onetime.json"
        self.log_path = self.log_dir_path + "dislocker.txt"
        # Defaults assigned before anything can fail, so log() and the error
        # handlers of the other methods still work after an aborted start-up.
        self.debug = False
        self.db = None
        self.pc_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        self.keyboard_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        self.mouse_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        try:
            if not os.path.isdir(self.config_dir_path):
                print("config ディレクトリが見つかりません... 作成します。")
                os.mkdir(self.config_dir_path)
            if not os.path.isfile(self.server_config_path):
                print("config ファイルが見つかりません... 作成します。")
                self.server_config = self._default_server_config()
                with open(self.server_config_path, "w", encoding="utf-8") as w:
                    json.dump(self.server_config, w, indent=4, ensure_ascii=False)
            else:
                with open(self.server_config_path, "r", encoding="utf-8") as r:
                    self.server_config = json.load(r)
                print("config ファイルを読み込みました。")
            if not os.path.isdir(self.export_dir_path):
                print("export ディレクトリが見つかりません... 作成します。")
                os.mkdir(self.export_dir_path)
            if not os.path.isdir(self.log_dir_path):
                print("log ディレクトリが見つかりません... 作成します。")
                os.mkdir(self.log_dir_path)

            bot_config = self.server_config["bot"]
            # A freshly generated config still holds placeholder strings here.
            if type(bot_config["log_channel_id"]) is not int or type(bot_config["config_channel_id"]) is not int:
                print("config ファイル内でチャンネルIDがint型で記述されていません。int型で記述して、起動してください。")
                self.init_result = "not_int"
                return

            self.preset_games = bot_config["preset_games"]
            self.debug = bot_config["debug"]

            db_config = self.server_config["db"]
            # Keyword arguments instead of a hand-built DSN string, so values
            # containing spaces or quotes cannot corrupt the connection string.
            self.db = psycopg2.connect(
                host=db_config["host"],
                dbname=db_config["db_name"],
                port=db_config["port"],
                user=db_config["username"],
                password=db_config["password"],
            )
            self._ensure_schema()
            self.init_result = "ok"
        except Exception as error:
            self._rollback()
            print("初回処理でエラーが発生しました。\nエラー内容\n" + str(error))
            self.init_result = "error"

    @staticmethod
    def _default_server_config():
        """Return the template written to ``server.json`` when no config file exists."""
        return {
            "db": {
                "host": "localhost",
                "port": "5432",
                "db_name": "dislocker",
                "username": "user",
                "password": "password"
            },
            "bot": {
                "token": "TYPE HERE BOTS TOKEN KEY",
                "activity": {
                    "name": "Dislocker",
                    "details": "ロック中...",
                    "type": "playing",
                    "state": "ロック中..."
                },
                "log_channel_id" : "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
                "config_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
                "config_public_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
                "monitor": {
                    "search_frequency": 1,
                    "allowable_time": 180,
                    "fstop_time": "21:00:00"
                },
                "preset_games": ["TEST1", "TEST2", "TEST3", "TEST4", "TEST5"],
                "admin_user_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
                "debug": False
            }
        }

    def _ensure_schema(self):
        """Create every table of ``_SCHEMA`` that is missing and insert its seed rows."""
        with self.db.cursor() as cursor:
            for table_name, create_statement, seed_statement, seed_attribute in self._SCHEMA:
                cursor.execute(
                    "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = %s)",
                    (table_name,),
                )
                if cursor.fetchone()[0]:
                    continue
                cursor.execute(create_statement)
                if seed_statement is not None:
                    for number in getattr(self, seed_attribute):
                        cursor.execute(seed_statement, (number,))
                self.db.commit()

    def _rollback(self):
        """Roll back the shared connection, ignoring any failure while doing so.

        After a failed statement psycopg2 rejects further commands on the
        connection until the transaction is rolled back.
        """
        connection = getattr(self, "db", None)
        if connection is None:
            return
        try:
            connection.rollback()
        except Exception:
            # Best effort only: the caller is already reporting an error.
            pass

    def _fail(self, title, error):
        """Roll back, log ``error`` under ``title`` and return the generic error result."""
        self._rollback()
        self.log(title=f"{title} {str(error.__class__.__name__)}", message=str(error.args), flag=1)
        return {"result": 1, "about": "error"}

    @staticmethod
    def _close(cursor):
        """Close ``cursor`` if one was actually opened."""
        if cursor is not None:
            cursor.close()

    def log(self, **kwargs):
        """Print a timestamped entry and append it to the log file.

        Keyword Args:
            title: Headline of the entry (required when the entry is emitted).
            message: Optional second line.
            flag: 1 forces the entry out; otherwise it is emitted only in debug mode.
        """
        # getattr keeps logging usable even if __init__ never reached the debug setting.
        debug_enabled = getattr(self, "debug", False) == True
        if not (debug_enabled or kwargs.get("flag") == 1):
            return
        title = str(kwargs["title"])
        current_datetime = str(datetime.now())
        if "message" in kwargs:
            detail = f"{current_datetime} | {title}\n{str(kwargs['message'])}\n"
        else:
            detail = f"{current_datetime} | {title}\n"
        print(detail)
        try:
            # Append mode creates the file when it does not exist yet.
            with open(self.log_path, "a", encoding="utf-8") as a:
                a.write(detail)
        except (OSError, ValueError):
            print("LOGGING ERROR mode a")

    def password_generate(self, length):
        """Return a string of ``length`` random decimal digits.

        Uses ``secrets`` because the result is used as an unlock password and
        as a one-time registration code.
        """
        return ''.join(secrets.choice(string.digits) for _ in range(length))

    def hash_genarate(self, source):
        """Return the hexadecimal SHA-256 digest of the string ``source``."""
        return hashlib.sha256(source.encode()).hexdigest()

    def user_register_check(self, **kwargs):
        """Look up a club member by Discord user ID.

        Keyword Args:
            discord_user_id: Discord user ID; converted to ``str`` for the query.

        Returns:
            ``{"result": 0, "about": "exist", "user_info": {...}}`` when found,
            ``{"result": 1, "about": "user_data_not_found"}`` when not found,
            ``{"result": 1, "about": "error"}`` on failure.
        """
        cursor = None
        try:
            discord_user_id = str(kwargs["discord_user_id"])
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
            user_record = cursor.fetchall()
            if not user_record:
                return {"result": 1, "about": "user_data_not_found"}
            # club_member columns: member_id, name, discord_user_name, discord_user_id
            member_id, name, discord_user_name = user_record[0][0], user_record[0][1], user_record[0][2]
            return {"result": 0, "about": "exist", "user_info": {"member_id": member_id, "name": name, "discord_user_name": discord_user_name}}
        except Exception as error:
            return self._fail("[ERROR] ユーザーの登録状態を調査中にエラーが発生しました。", error)
        finally:
            self._close(cursor)

    def _member_usage_status(self, cursor, member_id):
        """Report whether the newest usage record of ``member_id`` is still open."""
        cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s ORDER BY id DESC LIMIT 1", (member_id,))
        records = cursor.fetchall()
        # pc_usage_history columns: id, member_id, pc_number, keyboard_number,
        # mouse_number, start_use_time, end_use_time, use_detail, bot_about.
        # A NULL end_use_time (index 6) marks a session that has not been stopped.
        if records and records[0][6] is None:
            latest = records[0]
            return {"result": 1, "about": "used_by_you", "pc_usage_history": {"pc_number": str(latest[2]), "keyboard_number": latest[3], "mouse_number": latest[4], "start_time": str(latest[5]), "use_detail": str(latest[7])}}
        return {"result": 0, "about": "vacent"}

    def pc_used_check(self, **kwargs):
        """Check whether a PC is occupied, or whether a member currently uses one.

        Exactly one lookup is performed, chosen in this order of precedence:

        Keyword Args:
            pc_number: Check the ``using_member_id`` of that PC.
            discord_user_id: Resolve the member, then check their newest usage record.
            member_id: Check the newest usage record of that member.

        Returns:
            ``about`` is ``"vacent"`` (result 0), ``"used_by_other"``,
            ``"used_by_you"`` (with ``pc_usage_history``), ``"user_data_not_found"``,
            ``"search_options_error"`` or ``"error"`` (all result 1).
        """
        cursor = None
        try:
            pc_number = int(kwargs["pc_number"]) if "pc_number" in kwargs else None
            discord_user_id = str(kwargs["discord_user_id"]) if "discord_user_id" in kwargs else None
            member_id = int(kwargs["member_id"]) if "member_id" in kwargs else None
            cursor = self.db.cursor()

            if pc_number is not None:
                cursor.execute("SELECT * FROM pc_list WHERE pc_number= %s", (pc_number,))
                pc_list_record = cursor.fetchall()
                # Index 1 is using_member_id; an unknown PC raises IndexError and
                # is reported as "error" by the handler below.
                if pc_list_record[0][1] is None:
                    return {"result": 0, "about": "vacent"}
                return {"result": 1, "about": "used_by_other"}

            if discord_user_id is not None:
                cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
                user_record = cursor.fetchall()
                if not user_record:
                    return {"result": 1, "about": "user_data_not_found"}
                return self._member_usage_status(cursor, user_record[0][0])

            if member_id is not None:
                return self._member_usage_status(cursor, member_id)

            return {"result": 1, "about": "search_options_error"}
        except Exception as error:
            return self._fail("[ERROR] PCの使用状況を調査中にエラーが発生しました。", error)
        finally:
            self._close(cursor)

    def _device_used_check(self, kwargs, key, query, busy_about, label):
        """Shared implementation of the keyboard / mouse availability checks.

        ``kwargs[key]`` is the device number (``None`` means "own device", which
        is always available); ``query`` selects the device row whose second
        column is ``using_member_id``.
        """
        cursor = None
        try:
            cursor = self.db.cursor()
            if kwargs[key] is None:
                return {"result": 0, "about": "ok"}
            cursor.execute(query, (int(kwargs[key]),))
            device_record = cursor.fetchall()
            if device_record[0][1] is None:
                return {"result": 0, "about": "ok"}
            return {"result": 1, "about": busy_about}
        except Exception as error:
            return self._fail(f"[ERROR] {label}の使用状況を調査中にエラーが発生しました。", error)
        finally:
            self._close(cursor)

    def keyboard_used_check(self, **kwargs):
        """Check whether keyboard ``keyboard_number`` is free (``None`` = own keyboard).

        Returns ``about`` ``"ok"`` (result 0), ``"keyboard_already_in_use_by_other"``
        or ``"error"`` (result 1).
        """
        return self._device_used_check(
            kwargs,
            "keyboard_number",
            "SELECT * FROM keyboard_list WHERE keyboard_number=%s",
            "keyboard_already_in_use_by_other",
            "キーボード",
        )

    def mouse_used_check(self, **kwargs):
        """Check whether mouse ``mouse_number`` is free (``None`` = own mouse).

        Returns ``about`` ``"ok"`` (result 0), ``"mouse_already_in_use_by_other"``
        or ``"error"`` (result 1).
        """
        return self._device_used_check(
            kwargs,
            "mouse_number",
            "SELECT * FROM mouse_list WHERE mouse_number=%s",
            "mouse_already_in_use_by_other",
            "マウス",
        )

    def register(self, **kwargs):
        """Start a PC session for a registered member and issue its unlock password.

        Keyword Args:
            discord_user_id: Discord user ID of the member.
            pc_number: PC to reserve.
            keyboard_number: Keyboard to borrow, or ``"own"``.
            mouse_number: Mouse to borrow, or ``"own"``.
            detail: Optional purpose of use.
            name, display_name: Accepted for compatibility; not used.

        Returns:
            On success ``{"result": 0, "about": "ok", "output_dict": {"password", "name"}}``.
            On failure ``result`` is 1 and ``about`` is one of
            ``"user_data_not_found"``, ``"pc_already_in_use_by_you"`` (with
            ``pc_usage_history``), ``"pc_already_in_use_by_other"``,
            ``"keyboard_already_in_use"``, ``"mouse_already_in_use"`` or ``"error"``.
        """
        cursor = None
        try:
            cursor = self.db.cursor()
            discord_user_id = str(kwargs["discord_user_id"])
            pc_number = int(kwargs["pc_number"])
            detail = str(kwargs["detail"]) if "detail" in kwargs else None
            # "own" means the member brings the device; it is stored as NULL.
            keyboard_number = None if kwargs["keyboard_number"] == "own" else int(kwargs["keyboard_number"])
            mouse_number = None if kwargs["mouse_number"] == "own" else int(kwargs["mouse_number"])

            error_result = {"result": 1, "about": "error"}

            user_check = self.user_register_check(discord_user_id=discord_user_id)
            if user_check["result"] != 0:
                # Either "user_data_not_found" or "error".
                return {"result": 1, "about": user_check["about"]}
            member_id = user_check["user_info"]["member_id"]
            name = user_check["user_info"]["name"]

            # Each check can also fail with "error"; report that as such instead
            # of claiming the resource is taken.
            pc_check_self = self.pc_used_check(member_id=member_id)
            if pc_check_self["result"] != 0:
                if pc_check_self["about"] == "error":
                    return error_result
                history = pc_check_self["pc_usage_history"]
                return {"result": 1, "about": "pc_already_in_use_by_you", "pc_usage_history": {"pc_number": history["pc_number"], "keyboard_number": history["keyboard_number"], "mouse_number": history["mouse_number"], "start_time": history["start_time"], "use_detail": history["use_detail"]}}

            pc_check = self.pc_used_check(pc_number=pc_number)
            if pc_check["result"] != 0:
                return error_result if pc_check["about"] == "error" else {"result": 1, "about": "pc_already_in_use_by_other"}

            keyboard_check = self.keyboard_used_check(keyboard_number=keyboard_number)
            if keyboard_check["result"] != 0:
                return error_result if keyboard_check["about"] == "error" else {"result": 1, "about": "keyboard_already_in_use"}

            mouse_check = self.mouse_used_check(mouse_number=mouse_number)
            if mouse_check["result"] != 0:
                return error_result if mouse_check["about"] == "error" else {"result": 1, "about": "mouse_already_in_use"}

            # Only the hash is stored; the plain password goes back to the caller.
            password = self.password_generate(4)
            password_hash = self.hash_genarate(password)

            cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, keyboard_number, mouse_number, start_use_time, use_detail) VALUES (%s, %s, %s, %s, clock_timestamp(), %s)", (member_id, pc_number, keyboard_number, mouse_number, detail))
            cursor.execute("UPDATE pc_list SET using_member_id = %s, password_hash = %s WHERE pc_number = %s", (member_id, password_hash, pc_number))
            if keyboard_number is not None:
                cursor.execute("UPDATE keyboard_list SET using_member_id = %s WHERE keyboard_number = %s", (member_id, keyboard_number))
            if mouse_number is not None:
                cursor.execute("UPDATE mouse_list SET using_member_id = %s WHERE mouse_number = %s", (member_id, mouse_number))
            self.db.commit()
            return {"result": 0, "about": "ok", "output_dict": {"password": str(password), "name": str(name)}}
        except Exception as error:
            return self._fail("[ERROR] PCの使用登録中にエラーが発生しました。", error)
        finally:
            self._close(cursor)

    def stop(self, **kwargs):
        """End the open PC session of a member and release the PC and borrowed devices.

        Keyword Args:
            discord_user_id: Discord user ID of the member.
            bot_about: Optional reason stored in the usage record.

        Returns:
            On success ``{"result": 0, "about": "ok", "output_dict": {"pc_number", "name"}}``.
            On failure ``result`` is 1 and ``about`` is ``"user_data_not_found"``,
            ``"unused"`` or ``"error"``.
        """
        cursor = None
        try:
            cursor = self.db.cursor()
            discord_user_id = str(kwargs["discord_user_id"])
            bot_about = kwargs.get("bot_about")

            user_check = self.user_register_check(discord_user_id=discord_user_id)
            # Must be tested before touching "user_info": that key only exists
            # on success.
            if user_check["result"] != 0:
                return {"result": 1, "about": user_check["about"]}
            member_id = user_check["user_info"]["member_id"]
            name = user_check["user_info"]["name"]

            cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s ORDER BY id DESC LIMIT 1", (member_id,))
            pc_usage_history_record = cursor.fetchall()
            if not pc_usage_history_record:
                return {"result": 1, "about": "unused"}
            latest = pc_usage_history_record[0]
            pc_usage_history_id, pc_number = latest[0], latest[2]
            keyboard_number, mouse_number, end_use_time = latest[3], latest[4], latest[6]
            # A session that already has an end time is not in use any more.
            if end_use_time is not None:
                return {"result": 1, "about": "unused"}

            if bot_about is None:
                cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp() WHERE id = %s", (pc_usage_history_id,))
            else:
                cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp(), bot_about = %s WHERE id = %s", (bot_about, pc_usage_history_id))
            cursor.execute("UPDATE pc_list SET using_member_id = NULL, password_hash = NULL WHERE pc_number = %s", (pc_number,))
            if keyboard_number is not None:
                cursor.execute("UPDATE keyboard_list SET using_member_id = NULL WHERE keyboard_number = %s", (keyboard_number,))
            if mouse_number is not None:
                cursor.execute("UPDATE mouse_list SET using_member_id = NULL WHERE mouse_number = %s", (mouse_number,))
            self.db.commit()
            return {"result": 0, "about": "ok", "output_dict": {"pc_number": str(pc_number), "name": str(name)}}
        except Exception as error:
            return self._fail("[ERROR] PCの使用停止処理中にエラーが発生しました。", error)
        finally:
            self._close(cursor)

    def user_register(self, **kwargs):
        """Add a club member unless the Discord user ID is already registered.

        Keyword Args:
            discord_user_id, discord_user_name, name: Stored as text.

        Returns:
            ``about`` is ``"ok"`` (result 0), ``"already_exists"`` or ``"error"`` (result 1).
        """
        cursor = None
        try:
            discord_user_id = str(kwargs["discord_user_id"])
            discord_user_name = str(kwargs["discord_user_name"])
            name = str(kwargs["name"])
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
            if cursor.fetchall():
                return {"result": 1, "about": "already_exists"}
            cursor.execute("INSERT INTO club_member (name, discord_user_name, discord_user_id) VALUES (%s, %s, %s)", (name, discord_user_name, discord_user_id))
            self.db.commit()
            return {"result": 0, "about": "ok"}
        except Exception as error:
            return self._fail("[ERROR] ユーザー情報の登録中にエラーが発生しました。", error)
        finally:
            self._close(cursor)

    def format_datetime(self, value):
        """Format ``datetime`` values as ``YYYY-MM-DD HH:MM:SS``; return anything else unchanged."""
        if isinstance(value, datetime):
            return value.strftime('%Y-%m-%d %H:%M:%S')
        return value

    def pc_register(self, **kwargs):
        """Add PC ``pc_number`` to ``pc_list`` unless it already exists.

        Returns:
            ``about`` is ``"ok"`` (result 0), ``"already_exists"`` or ``"error"`` (result 1).
        """
        cursor = None
        try:
            pc_number = int(kwargs["pc_number"])
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
            if cursor.fetchall():
                return {"result": 1, "about": "already_exists"}
            cursor.execute("INSERT INTO pc_list (pc_number) VALUES (%s)", (pc_number,))
            self.db.commit()
            return {"result": 0, "about": "ok"}
        except Exception as error:
            return self._fail("[ERROR] PCの情報を登録中にエラーが発生しました。", error)
        finally:
            self._close(cursor)

    def report_export(self, **kwargs):
        """Export the whole usage history, joined with member names, to an Excel file.

        The file is written to ``<export dir>/pc_usage_history.xlsx``. Keyword
        arguments are accepted but not used.

        Returns:
            ``{"result": 0, "about": "ok", "file_path": ...}`` or
            ``{"result": 1, "about": "error"}``.
        """
        cursor = None
        try:
            cursor = self.db.cursor()
            main_table = "pc_usage_history"
            related_table = "club_member"
            excel_file_path = self.export_dir_path + "pc_usage_history.xlsx"

            # A zero-row query is enough to read the column names; member_id is
            # left out because the member's name is exported instead.
            cursor.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table)))
            main_columns = [desc[0] for desc in cursor.description if desc[0] != 'member_id']

            query = sql.SQL("""
                SELECT {main_columns}, {related_table}.name
                FROM {main_table}
                LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.member_id
                ORDER BY id
            """).format(
                main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]),
                main_table=sql.Identifier(main_table),
                related_table=sql.Identifier(related_table)
            )
            cursor.execute(query)
            rows = cursor.fetchall()

            wb = openpyxl.Workbook()
            ws = wb.active
            # The query returns name last; the sheet shows it as the second column.
            ws.append([main_columns[0], 'name'] + main_columns[1:])
            for row in rows:
                ws.append([self.format_datetime(row[0])] + [row[-1]] + [self.format_datetime(field) if field is not None else '' for field in row[1:-1]])

            # Size every column to its longest cell text.
            for col in ws.columns:
                max_length = max((len(str(cell.value)) for cell in col), default=0)
                ws.column_dimensions[col[0].column_letter].width = (max_length + 2) * 1.2

            wb.save(excel_file_path)
            self.log(title=f"[SUCCESS] PCの使用履歴をエクスポートしました。", message=f"ファイルパス | {excel_file_path}", flag=0)
            return {"result": 0, "about": "ok", "file_path": excel_file_path}
        except Exception as error:
            return self._fail("[ERROR] PCの使用履歴をエクスポート中にエラーが発生しました。", error)
        finally:
            self._close(cursor)

    def force_stop(self, **kwargs):
        """End the session running on a PC regardless of who owns it.

        Keyword Args:
            pc_number: PC whose session is terminated.
            bot_about: Reason stored in the usage record (required).

        Returns:
            ``about`` is ``"ok"`` (result 0), ``"not_used"``,
            ``"bot_about_not_found"`` or ``"error"`` (result 1).
        """
        cursor = None
        try:
            pc_number = kwargs["pc_number"]
            cursor = self.db.cursor()
            if "bot_about" not in kwargs:
                return {"result": 1, "about": "bot_about_not_found"}
            bot_about = kwargs["bot_about"]

            cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
            pc_list_record = cursor.fetchall()
            pc_using_member_id = pc_list_record[0][1]
            if pc_using_member_id is None:
                return {"result": 1, "about": "not_used"}

            cursor.execute("UPDATE pc_list SET using_member_id = NULL, password_hash = NULL WHERE pc_number = %s", (pc_number,))
            cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s AND pc_number = %s ORDER BY id DESC LIMIT 1", (pc_using_member_id, pc_number))
            # A missing history row raises IndexError here; the handler below
            # rolls the UPDATE above back so nothing half-applied is committed later.
            latest = cursor.fetchall()[0]
            pc_usage_history_record_id, keyboard_number, mouse_number = latest[0], latest[3], latest[4]
            if keyboard_number is not None:
                cursor.execute("UPDATE keyboard_list SET using_member_id = NULL WHERE keyboard_number = %s", (keyboard_number,))
            if mouse_number is not None:
                cursor.execute("UPDATE mouse_list SET using_member_id = NULL WHERE mouse_number = %s", (mouse_number,))
            cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp(), bot_about = %s WHERE id = %s", (bot_about, pc_usage_history_record_id))
            self.db.commit()
            return {"result": 0, "about": "ok"}
        except Exception as error:
            return self._fail("[ERROR] fstop中にエラーが発生しました。", error)
        finally:
            self._close(cursor)

    def _onetime_gen(self, slot):
        """Return the one-time password stored under ``slot``, creating it when absent.

        ``slot`` is ``"pc_register"`` or ``"device_register"``. The passwords
        live in the JSON file at ``self.onetime_config_path``.
        """
        if os.path.isfile(self.onetime_config_path):
            with open(self.onetime_config_path, "r", encoding="utf-8") as r:
                onetime_config = json.load(r)
            onetime = onetime_config["onetime"][slot]
            if onetime is not None:
                # "already_exists": "ok" is kept for callers that relied on the
                # former key; "about" makes the result match every other method.
                return {"result": 0, "about": "already_exists", "already_exists": "ok", "onetime": onetime}
        else:
            onetime_config = {
                "onetime": {
                    "pc_register": None,
                    "device_register": None
                }
            }
        onetime = str(self.password_generate(8))
        onetime_config["onetime"][slot] = onetime
        with open(self.onetime_config_path, "w", encoding="utf-8") as w:
            json.dump(onetime_config, w, indent=4)
        return {"result": 0, "about": "ok", "onetime": onetime}

    def pc_onetime_gen(self):
        """Issue (or return the pending) one-time password for PC registration.

        Returns:
            ``{"result": 0, "about": "ok" | "already_exists", "onetime": ...}`` or
            ``{"result": 1, "about": "error"}``.
        """
        try:
            return self._onetime_gen("pc_register")
        except Exception as error:
            self.log(title=f"[ERROR] PC登録用のワンタイムパスワードを発行中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
            return {"result": 1, "about": "error"}

    def device_onetime_gen(self):
        """Issue (or return the pending) one-time password for device registration.

        Returns:
            ``{"result": 0, "about": "ok" | "already_exists", "onetime": ...}`` or
            ``{"result": 1, "about": "error"}``.
        """
        try:
            return self._onetime_gen("device_register")
        except Exception as error:
            self.log(title=f"[ERROR] デバイス登録用のワンタイムパスワードを発行中にエラーが発生しました。 {str(error.__class__.__name__)}", message=str(error.args), flag=1)
            return {"result": 1, "about": "error"}
# Module-level singletons shared by every handler below.
# Constructing DL() runs its start-up routine (config loading, DB connection,
# table creation) and records the outcome in dislocker.init_result.
dislocker = DL()
# Default gateway intents plus the message-content intent.
intents = discord.Intents.default()
intents.message_content = True
client = discord.Client(intents=intents)
# Command tree on which the slash commands below are registered.
tree = discord.app_commands.CommandTree(client)
@client.event
async def on_ready():
    # Announce the logged-in account on stdout, then sync the slash commands
    # registered on the tree.
    print(
        "Bot is ready.",
        "Logged in as",
        client.user.name,
        client.user.id,
        "------",
        sep="\n",
    )
    await tree.sync()
# Slash commands for regular users
@tree.command(name="use", description="パソコンの使用登録をします。通常はこのコマンドを使用する必要はありません。")
async def use(interaction: discord.Interaction, pc_number: int, keyboard_number: int, mouse_number: int, detail: str):
    # Registers a PC session for the invoking user through dislocker.register()
    # and answers with the unlock password (ephemeral, only visible to the user).
    # A plain comment is used instead of a docstring so that discord.py does not
    # pick up any text from it as command metadata.
    register = dislocker.register(discord_user_id=interaction.user.id, name=interaction.user.name, display_name=interaction.user.display_name, pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, detail=detail)

    if register["result"] == 0:
        # Values are pulled into locals first: re-using the enclosing quote
        # character inside an f-string expression only parses on Python 3.12+.
        password = register["output_dict"]["password"]
        user_name = register["output_dict"]["name"]
        await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {password}\n## PC番号 | {pc_number}\n## 使用目的 | {detail}", ephemeral=True)
        dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が開始されました。", message=f"名前 | {user_name}, 使用目的 | {detail}", flag=0)
        # get_channel() returns None when the channel is unknown to the client;
        # the user has already been answered, so only the announcement is skipped.
        log_channel = client.get_channel(dislocker.server_config["bot"]["log_channel_id"])
        if log_channel is None:
            dislocker.log(title="[ERROR] ログチャンネルが見つかりませんでした。", flag=1)
        else:
            await log_channel.send(f':white_check_mark: {user_name} さんがPC {pc_number} の使用を開始しました。\n>>> ## PC番号 | {pc_number}\n## 使用目的 | {detail}')
        return

    internal_error = ":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。"
    about = register["about"]
    if about == "pc_already_in_use_by_you":
        history = register["pc_usage_history"]
        reply = f":x: あなたは既にPC {history['pc_number']} を使用中です。\n>>> ## PC番号 | {history['pc_number']}\n## 使用目的 | {history['use_detail']}"
    else:
        # Any outcome without a dedicated message is reported as an internal
        # error, so the interaction never stays unanswered.
        reply = {
            "pc_already_in_use_by_other": ":x: 他の方がそのPCを使用中です。",
            "keyboard_already_in_use": ":x: キーボードは既に使用中です。",
            "mouse_already_in_use": ":x: マウスは既に使用中です。",
            "user_data_not_found": ":x: ユーザーデータが見つかりませんでした。",
        }.get(about, internal_error)
    await interaction.response.send_message(reply, ephemeral=True)
@tree.command(name="stop", description="パソコンの使用を終了します。通常はこのコマンドを使用する必要はありません。")
async def stop(interaction: discord.Interaction):
    # Ends the invoking user's PC session through dislocker.stop() and reports
    # the outcome (ephemeral). The result is kept in `outcome` so the local
    # name does not shadow this handler.
    outcome = dislocker.stop(discord_user_id=interaction.user.id)

    if outcome["result"] == 0:
        pc_number = outcome["output_dict"]["pc_number"]
        user_name = outcome["output_dict"]["name"]
        await interaction.response.send_message(f":white_check_mark: 使用が終了されました。\n>>> ## PC番号 | {pc_number}", ephemeral=True)
        dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が終了されました。", message=f"名前 | {user_name}", flag=0)
        # get_channel() returns None when the channel is unknown to the client;
        # the user has already been answered, so only the announcement is skipped.
        log_channel = client.get_channel(dislocker.server_config["bot"]["log_channel_id"])
        if log_channel is None:
            dislocker.log(title="[ERROR] ログチャンネルが見つかりませんでした。", flag=1)
        else:
            await log_channel.send(f':white_check_mark: {user_name} さんがPC {pc_number} の使用を終了しました。\n>>> ## PC番号 | {pc_number}')
        return

    stop_failed = "# :skull_crossbones: 停止できませんでした。\n内部エラーが発生しています。"
    # Any outcome without a dedicated message is reported as an internal error,
    # so the interaction never stays unanswered.
    reply = {
        "unused": "# :shaking_face: あなたはPCを使用されていないようです...",
        "user_data_not_found": "# :dizzy_face: ユーザーとして登録されていないようです。\n最初にサーバーで登録を行ってください。",
    }.get(outcome["about"], stop_failed)
    await interaction.response.send_message(reply, ephemeral=True)
# Slash commands for administrators
# NOTE(review): no permission check is visible on these commands — confirm
# whether access is restricted elsewhere (e.g. in the server's command settings).
@tree.command(name="userreg", description="ユーザーを登録します。")
async def userreg(interaction: discord.Interaction, discord_user_id: str, discord_user_name: str, name: str):
    # Registers a club member through dislocker.user_register() and reports
    # the outcome to the invoker (ephemeral).
    outcome = dislocker.user_register(discord_user_id=discord_user_id, discord_user_name=discord_user_name, name=name)
    status = outcome["result"]

    if status == 0:
        await interaction.response.send_message(":white_check_mark: ユーザーを登録しました。", ephemeral=True)
        dislocker.log(title=f"[INFO] ユーザーを登録しました。", message=f"名前 | {name}, Discordユーザー名 | {discord_user_name}, DiscordユーザーID | {discord_user_id}", flag=0)
        return
    if status != 1:
        return

    # Failure reasons that have a reply; anything else is left unanswered.
    replies = {
        "already_exists": ":x: 既に登録されているユーザーです。",
        "error": ":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。",
    }
    reply = replies.get(outcome["about"])
    if reply is not None:
        await interaction.response.send_message(reply, ephemeral=True)
@tree.command(name="pcreg", description="PCを登録するためのワンタイムパスワードを発行します。")
async def pcreg(interaction: discord.Interaction):
    # Obtains the PC-registration one-time password from
    # dislocker.pc_onetime_gen() and shows it to the invoker (ephemeral).
    issued = dislocker.pc_onetime_gen()
    status = issued["result"]

    if status == 0:
        code = issued['onetime']
        await interaction.response.send_message(f":white_check_mark: PC登録用のワンタイムパスワードを発行しました。\n>>> # ワンタイムパスワード | {code}\n## 有効期限 | 1回の登録でのみ使用可能です。", ephemeral=True)
        dislocker.log(title=f"[INFO] PC登録用のワンタイムパスワードを発行しました。", message=f"ワンタイムパスワード | {code}", flag=0)
        return

    # Only the "error" outcome has a reply.
    if status == 1 and issued["about"] == "error":
        await interaction.response.send_message(":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。", ephemeral=True)
@tree.command(name="devicereg", description="デバイスを登録するためのワンタイムパスワードを発行します。")
async def devicereg(interaction: discord.Interaction):
    # Obtains the device-registration one-time password from
    # dislocker.device_onetime_gen() and shows it to the invoker (ephemeral).
    issued = dislocker.device_onetime_gen()
    status = issued["result"]

    if status == 0:
        code = issued['onetime']
        await interaction.response.send_message(f":white_check_mark: デバイス登録用のワンタイムパスワードを発行しました。\n>>> # ワンタイムパスワード | {code}\n## 有効期限 | 1回の登録でのみ使用可能です。", ephemeral=True)
        dislocker.log(title=f"[INFO] デバイス登録用のワンタイムパスワードを発行しました。", message=f"ワンタイムパスワード | {code}", flag=0)
        return

    # Only the "error" outcome has a reply.
    if status == 1 and issued["about"] == "error":
        await interaction.response.send_message(":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。", ephemeral=True)
@tree.command(name="fstop", description="PCの使用を強制終了します。")
async def fstop(interaction: discord.Interaction, pc_number: int, bot_about: str):
    # Force-ends the session on the given PC through dislocker.force_stop(),
    # recording bot_about as the reason, and reports the outcome (ephemeral).
    outcome = dislocker.force_stop(pc_number=pc_number, bot_about=bot_about)
    status = outcome["result"]

    if status == 0:
        await interaction.response.send_message(f":white_check_mark: PC {pc_number} の使用を強制終了しました。", ephemeral=True)
        dislocker.log(title=f"[INFO] PC {pc_number} の使用を強制終了しました。", message=f"理由 | {bot_about}", flag=0)
        return
    if status != 1:
        return

    # Failure reasons that have a reply; anything else is left unanswered.
    replies = {
        "not_used": ":x: そのPCは使用されていません。",
        "bot_about_not_found": ":x: 理由が指定されていません。",
        "error": ":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。",
    }
    reply = replies.get(outcome["about"])
    if reply is not None:
        await interaction.response.send_message(reply, ephemeral=True)
@tree.command(name="report", description="PCの使用履歴をエクスポートします。")
async def report(interaction: discord.Interaction):
    # Exports the usage history through dislocker.report_export() and sends the
    # resulting file to the invoker (ephemeral).
    exported = dislocker.report_export()
    status = exported["result"]

    if status == 0:
        attachment = discord.File(exported["file_path"])
        await interaction.response.send_message(":white_check_mark: 使用履歴のレポートです。", file=attachment, ephemeral=True)
        dislocker.log(title="[INFO] PCの使用履歴をエクスポートしました。", message=f"ファイルパス | {exported['file_path']}", flag=0)
        return

    # Only the "error" outcome has a reply.
    if status == 1 and exported["about"] == "error":
        await interaction.response.send_message(":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。", ephemeral=True)
# Start the bot only when DL() finished its start-up successfully. With
# init_result "not_int" or "error" there is no usable database connection
# (and possibly only a placeholder config), so every command would fail.
if dislocker.init_result == "ok":
    client.run(dislocker.server_config["bot"]["token"])
else:
    print(f"初期化に失敗したため、Botを起動しません。 (init_result: {dislocker.init_result})")