698 lines
No EOL
37 KiB
Python
698 lines
No EOL
37 KiB
Python
import asyncio
import datetime
import hashlib
import json
import os
import random
import secrets
import string

import discord
import openpyxl
import psycopg2
from psycopg2 import sql
class DL():
    """Backend of the Dislocker bot: configuration, logging and PC-rental bookkeeping.

    The constructor loads (or creates) ``./config/server.json``, makes sure the
    ``export`` and ``log`` directories exist, connects to PostgreSQL and creates
    the tables on first run.  The outcome is stored in ``self.init_result``
    (``"ok"``, ``"not_int"`` or ``"error"``).

    Every public database method takes keyword arguments only and returns a
    dict with at least ``"result"`` (0 = success, 1 = failure) and ``"about"``
    (a short machine-readable reason).
    """

    def __init__(self):
        """Load the configuration, prepare directories and initialise the database."""
        self.config_dir_path = "./config/"
        self.export_dir_path = "./export/"
        self.log_dir_path = "./log/"
        self.server_config_path = self.config_dir_path + "server.json"
        self.onetime_config_path = self.config_dir_path + "onetime.json"
        self.log_path = self.log_dir_path + "dislocker.txt"
        # Defaults so that log() and the query methods fail gracefully
        # (returning an "error" result) when initialisation aborts early.
        self.debug = False
        self.db = None

        try:
            self._ensure_dir(self.config_dir_path, "config")
            self._load_or_create_server_config()
            self._ensure_dir(self.export_dir_path, "export")
            self._ensure_dir(self.log_dir_path, "log")

            bot_config = self.server_config["bot"]
            if not (self._is_plain_int(bot_config["log_channel_id"]) and self._is_plain_int(bot_config["config_channel_id"])):
                print("config ファイル内でチャンネルIDがint型で記述されていません。int型で記述して、起動してください。")
                self.init_result = "not_int"
                return

            db_config = self.server_config["db"]
            # Keyword arguments instead of an interpolated DSN string, so values
            # containing spaces or quotes cannot corrupt the connection string.
            self.db = psycopg2.connect(
                host=db_config["host"],
                dbname=db_config["db_name"],
                port=db_config["port"],
                user=db_config["username"],
                password=db_config["password"],
            )

            self.pc_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
            self.keyboard_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
            self.mouse_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
            self.preset_games = bot_config["preset_games"]
            self.debug = bot_config["debug"]

            cursor = self.db.cursor()
            try:
                self._ensure_tables(cursor)
            finally:
                cursor.close()
            self.init_result = "ok"

        except Exception as error:
            print("初回処理でエラーが発生しました。\nエラー内容\n" + str(error))
            self.init_result = "error"

    # ------------------------------------------------------------------
    # Initialisation helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_plain_int(value):
        """Return True for real ints only (bool is rejected, like ``type(v) is int``)."""
        return isinstance(value, int) and not isinstance(value, bool)

    @staticmethod
    def _ensure_dir(path, label):
        """Create directory *path* if it is missing, announcing it on stdout."""
        if not os.path.isdir(path):
            print(f"{label} ディレクトリが見つかりません... 作成します。")
            os.mkdir(path)

    @staticmethod
    def _default_server_config():
        """Return the template configuration written on first start."""
        return {
            "db": {
                "host": "localhost",
                "port": "5432",
                "db_name": "dislocker",
                "username": "user",
                "password": "password"
            },
            "bot": {
                "token": "TYPE HERE BOTS TOKEN KEY",
                "activity": {
                    "name": "Dislocker",
                    "details": "ロック中...",
                    "type": "playing",
                    "state": "ロック中..."
                },
                "log_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
                "config_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
                "config_public_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
                "monitor": {
                    "search_frequency": 1,
                    "allowable_time": 180,
                    "fstop_time": "21:00:00"
                },
                "preset_games": ["TEST1", "TEST2", "TEST3", "TEST4", "TEST5"],
                "admin_user_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
                "debug": False
            }
        }

    def _load_or_create_server_config(self):
        """Populate ``self.server_config`` from disk, writing the template if absent."""
        if not os.path.isfile(self.server_config_path):
            print("config ファイルが見つかりません... 作成します。")
            self.server_config = self._default_server_config()
            with open(self.server_config_path, "w", encoding="utf-8") as w:
                json.dump(self.server_config, w, indent=4, ensure_ascii=False)
        else:
            with open(self.server_config_path, "r", encoding="utf-8") as r:
                self.server_config = json.load(r)
            print("config ファイルを読み込みました。")

    def _table_exists(self, cursor, table_name):
        """Return whether *table_name* exists in the ``public`` schema."""
        cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = %s)", (table_name,))
        found = cursor.fetchall()
        print(found)
        return bool(found[0][0])

    def _ensure_tables(self, cursor):
        """Create every missing table and seed the numbered device tables.

        Order matters: later tables hold foreign keys to earlier ones.
        """
        schema = (
            (
                "club_member",
                "CREATE TABLE club_member (member_id SERIAL NOT NULL, name TEXT NOT NULL, discord_user_name TEXT NOT NULL, discord_user_id TEXT NOT NULL, PRIMARY KEY (member_id))",
                None,
                (),
            ),
            (
                "pc_list",
                "CREATE TABLE pc_list (pc_number INTEGER NOT NULL, using_member_id INTEGER, password_hash VARCHAR(64), pc_uuid VARCHAR(36), pc_token VARCHAR(36), master_password VARCHAR(16), detail TEXT, PRIMARY KEY (pc_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))",
                "INSERT INTO pc_list (pc_number) VALUES (%s)",
                self.pc_list,
            ),
            (
                "keyboard_list",
                "CREATE TABLE keyboard_list (keyboard_number INTEGER NOT NULL, using_member_id INTEGER, device_instance_path TEXT, device_name TEXT, detail TEXT, PRIMARY KEY (keyboard_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))",
                "INSERT INTO keyboard_list (keyboard_number) VALUES (%s)",
                self.keyboard_list,
            ),
            (
                "mouse_list",
                "CREATE TABLE mouse_list (mouse_number INTEGER NOT NULL, using_member_id INTEGER, device_instance_path TEXT, device_name TEXT, detail TEXT, PRIMARY KEY (mouse_number), FOREIGN KEY (using_member_id) REFERENCES club_member(member_id))",
                "INSERT INTO mouse_list (mouse_number) VALUES (%s)",
                self.mouse_list,
            ),
            (
                "pc_usage_history",
                "CREATE TABLE pc_usage_history (id SERIAL NOT NULL, member_id INTEGER NOT NULL, pc_number INTEGER NOT NULL, keyboard_number INTEGER, mouse_number INTEGER, start_use_time TIMESTAMP NOT NULL, end_use_time TIMESTAMP, use_detail TEXT, bot_about TEXT, PRIMARY KEY (id), FOREIGN KEY (member_id) REFERENCES club_member(member_id), FOREIGN KEY (pc_number) REFERENCES pc_list(pc_number), FOREIGN KEY (keyboard_number) REFERENCES keyboard_list(keyboard_number), FOREIGN KEY (mouse_number) REFERENCES mouse_list(mouse_number))",
                None,
                (),
            ),
        )
        for table_name, create_sql, seed_sql, seed_numbers in schema:
            if self._table_exists(cursor, table_name):
                continue
            cursor.execute(create_sql)
            for number in seed_numbers:
                print(number)
                cursor.execute(seed_sql, (number,))
            self.db.commit()

    # ------------------------------------------------------------------
    # Logging and small utilities
    # ------------------------------------------------------------------

    def log(self, **kwargs):
        """Print a timestamped entry and append it to the log file.

        Keyword Args:
            title: Headline of the entry (required whenever the entry is emitted).
            message: Optional second line.
            flag: ``1`` forces the entry out; otherwise it is emitted only
                while ``self.debug`` is enabled.
        """
        # "== True" is deliberate: the value comes from JSON and only a real
        # true (or 1) should switch debug output on.
        debug_enabled = getattr(self, "debug", False) == True  # noqa: E712
        if not (debug_enabled or kwargs.get("flag") == 1):
            return

        title = str(kwargs["title"])
        current_datetime = str(datetime.datetime.now())
        if "message" in kwargs:
            detail = f"{current_datetime} | {title}\n{str(kwargs['message'])}\n"
        else:
            detail = f"{current_datetime} | {title}\n"

        print(detail)

        # Logging is best-effort: a failure to write must never break the caller.
        mode = "a" if os.path.isfile(self.log_path) else "w"
        try:
            with open(self.log_path, mode, encoding="utf-8") as log_file:
                log_file.write(detail)
        except Exception:
            print(f"LOGGING ERROR mode {mode}")

    def _rollback_quietly(self):
        """Roll back the current transaction, ignoring any failure to do so.

        After a failed statement the connection refuses further work until a
        rollback is issued; this is cleanup on an already-failing path, so a
        secondary error here is intentionally not propagated.
        """
        connection = getattr(self, "db", None)
        if connection is None:
            return
        try:
            connection.rollback()
        except Exception:
            pass

    def _log_failure(self, context, error):
        """Roll back, log *error* under *context* and return the generic error result."""
        self._rollback_quietly()
        self.log(title=f"[ERROR] {context} {str(error.__class__.__name__)}", message=str(error.args), flag=1)
        return {"result": 1, "about": "error"}

    def password_generate(self, length):
        """Return a random string of *length* decimal digits.

        Uses ``secrets`` because the value is handed out as a PC unlock password.
        """
        return ''.join(secrets.choice(string.digits) for _ in range(length))

    def hash_genarate(self, source):
        """Return the hexadecimal SHA-256 digest of the string *source*."""
        return hashlib.sha256(source.encode()).hexdigest()

    def format_datetime(self, value):
        """Format ``datetime`` values as ``YYYY-MM-DD HH:MM:SS``; return anything else unchanged."""
        if isinstance(value, datetime.datetime):
            return value.strftime('%Y-%m-%d %H:%M:%S')
        return value

    # ------------------------------------------------------------------
    # Read-only checks
    # ------------------------------------------------------------------

    def user_register_check(self, **kwargs):
        """Look up a club member by Discord user id.

        Keyword Args:
            discord_user_id: Discord user id (converted to ``str``).

        Returns:
            ``{"result": 0, "about": "exist", "user_info": {...}}`` when found,
            ``{"result": 1, "about": "user_data_not_found"}`` when not, or
            ``{"result": 1, "about": "error"}`` on failure.
        """
        cursor = None
        try:
            discord_user_id = str(kwargs["discord_user_id"])
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
            user_record = cursor.fetchall()
            if not user_record:
                # Not registered yet.
                return {"result": 1, "about": "user_data_not_found"}
            member_id, name, discord_user_name = user_record[0][0], user_record[0][1], user_record[0][2]
            return {"result": 0, "about": "exist", "user_info": {"member_id": member_id, "name": name, "discord_user_name": discord_user_name}}

        except Exception as error:
            return self._log_failure("ユーザーの登録状態を調査中にエラーが発生しました。", error)

        finally:
            if cursor is not None:
                cursor.close()

    def _latest_usage_status(self, cursor, member_id):
        """Report whether the member's newest usage record is still open."""
        cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s ORDER BY id DESC LIMIT 1", (member_id,))
        history = cursor.fetchall()
        # Column 6 is end_use_time; NULL means the session has not been stopped.
        if history and history[0][6] is None:
            latest = history[0]
            return {"result": 1, "about": "used_by_you", "pc_usage_history": {"pc_number": str(latest[2]), "keyboard_number": latest[3], "mouse_number": latest[4], "start_time": str(latest[5]), "use_detail": str(latest[7])}}
        return {"result": 0, "about": "vacent"}

    def pc_used_check(self, **kwargs):
        """Check PC usage by PC number, Discord user id or member id.

        Exactly one selector is used, in this priority order.

        Keyword Args:
            pc_number: Is this PC occupied by anyone?
            discord_user_id: Does this Discord user have an open session?
            member_id: Does this member have an open session?

        Returns:
            ``about`` is one of ``"vacent"`` (result 0), ``"used_by_other"``,
            ``"used_by_you"`` (with ``"pc_usage_history"``),
            ``"user_data_not_found"``, ``"search_options_error"`` or ``"error"``.
        """
        cursor = None
        try:
            pc_number = int(kwargs["pc_number"]) if "pc_number" in kwargs else None
            discord_user_id = str(kwargs["discord_user_id"]) if "discord_user_id" in kwargs else None
            member_id = int(kwargs["member_id"]) if "member_id" in kwargs else None

            cursor = self.db.cursor()

            if pc_number is not None:
                # Search pc_list by PC number.
                cursor.execute("SELECT * FROM pc_list WHERE pc_number= %s", (pc_number,))
                pc_list_record = cursor.fetchall()
                if pc_list_record[0][1] is None:
                    return {"result": 0, "about": "vacent"}
                return {"result": 1, "about": "used_by_other"}

            if discord_user_id is not None:
                # Resolve the member first, then inspect the usage history.
                cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
                user_record = cursor.fetchall()
                if not user_record:
                    return {"result": 1, "about": "user_data_not_found"}
                return self._latest_usage_status(cursor, user_record[0][0])

            if member_id is not None:
                return self._latest_usage_status(cursor, member_id)

            return {"result": 1, "about": "search_options_error"}

        except Exception as error:
            return self._log_failure("PCの使用状況を調査中にエラーが発生しました。", error)

        finally:
            if cursor is not None:
                cursor.close()

    def _peripheral_used_check(self, kwargs, key, query, busy_about, label):
        """Shared implementation of the keyboard and mouse availability checks.

        ``kwargs[key]`` being ``None`` means the user brings their own device,
        which is always allowed.
        """
        cursor = None
        try:
            cursor = self.db.cursor()
            if kwargs[key] is None:
                return {"result": 0, "about": "ok"}
            cursor.execute(query, (int(kwargs[key]),))
            record = cursor.fetchall()
            if record[0][1] is None:
                return {"result": 0, "about": "ok"}
            return {"result": 1, "about": busy_about}

        except Exception as error:
            return self._log_failure(f"{label}の使用状況を調査中にエラーが発生しました。", error)

        finally:
            if cursor is not None:
                cursor.close()

    def keyboard_used_check(self, **kwargs):
        """Check whether the keyboard ``keyboard_number`` is free (``None`` = own device)."""
        return self._peripheral_used_check(
            kwargs,
            "keyboard_number",
            "SELECT * FROM keyboard_list WHERE keyboard_number=%s",
            "keyboard_already_in_use_by_other",
            "キーボード",
        )

    def mouse_used_check(self, **kwargs):
        """Check whether the mouse ``mouse_number`` is free (``None`` = own device)."""
        return self._peripheral_used_check(
            kwargs,
            "mouse_number",
            "SELECT * FROM mouse_list WHERE mouse_number=%s",
            "mouse_already_in_use_by_other",
            "マウス",
        )

    # ------------------------------------------------------------------
    # State-changing operations
    # ------------------------------------------------------------------

    def register(self, **kwargs):
        """Start a PC session for a registered member.

        Keyword Args:
            discord_user_id, name, display_name: Identity of the caller.
            pc_number: PC to reserve.
            keyboard_number, mouse_number: Device numbers, or ``"own"``.
            detail: Optional purpose of use.

        Returns:
            On success ``{"result": 0, "about": "ok", "output_dict":
            {"password", "name"}}``.  On failure ``about`` is one of
            ``"user_data_not_found"``, ``"pc_already_in_use_by_you"`` (with
            ``"pc_usage_history"``), ``"pc_already_in_use_by_other"``,
            ``"keyboard_already_in_use"``, ``"mouse_already_in_use"`` or ``"error"``.
        """
        cursor = None
        try:
            cursor = self.db.cursor()
            user_info = {
                "id": str(kwargs["discord_user_id"]),
                "name": str(kwargs["name"]),
                "display_name": str(kwargs["display_name"]),
                "pc_number": int(kwargs["pc_number"]),
                "keyboard_number": None,
                "mouse_number": None,
                "detail": None
            }
            if "detail" in kwargs:
                user_info["detail"] = str(kwargs["detail"])
            # "own" keeps the number at None, i.e. nothing is borrowed.
            if kwargs["keyboard_number"] != "own":
                user_info["keyboard_number"] = int(kwargs["keyboard_number"])
            if kwargs["mouse_number"] != "own":
                user_info["mouse_number"] = int(kwargs["mouse_number"])

            # Is the caller a registered member?
            user_register = self.user_register_check(discord_user_id=user_info["id"])
            if user_register["result"] != 0:
                return {"result": 1, "about": "user_data_not_found"}
            member_id = user_register["user_info"]["member_id"]
            name = user_register["user_info"]["name"]

            # Does the caller already have an open session?
            pc_check_self = self.pc_used_check(member_id=member_id)
            if pc_check_self["result"] != 0:
                current = pc_check_self["pc_usage_history"]
                return {"result": 1, "about": "pc_already_in_use_by_you", "pc_usage_history": {"pc_number": current["pc_number"], "keyboard_number": current["keyboard_number"], "mouse_number": current["mouse_number"], "start_time": current["start_time"], "use_detail": current["use_detail"]}}

            # Is the requested PC taken by somebody else?
            if self.pc_used_check(pc_number=user_info["pc_number"])["result"] != 0:
                return {"result": 1, "about": "pc_already_in_use_by_other"}

            # Are the requested keyboard and mouse free?
            if self.keyboard_used_check(keyboard_number=user_info["keyboard_number"])["result"] != 0:
                return {"result": 1, "about": "keyboard_already_in_use"}
            if self.mouse_used_check(mouse_number=user_info["mouse_number"])["result"] != 0:
                return {"result": 1, "about": "mouse_already_in_use"}

            # Only the hash is stored; the plain password goes back to the caller.
            password = self.password_generate(4)
            password_hash = self.hash_genarate(password)

            cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, keyboard_number, mouse_number, start_use_time, use_detail) VALUES (%s, %s, %s, %s, clock_timestamp(), %s)", (member_id, user_info["pc_number"], user_info["keyboard_number"], user_info["mouse_number"], user_info["detail"]))
            cursor.execute("UPDATE pc_list SET using_member_id = %s, password_hash = %s WHERE pc_number = %s", (member_id, password_hash, user_info["pc_number"]))
            # Borrowed devices are marked as in use; own devices (None) are skipped.
            if user_info["keyboard_number"] is not None:
                cursor.execute("UPDATE keyboard_list SET using_member_id = %s WHERE keyboard_number = %s", (member_id, user_info["keyboard_number"]))
            if user_info["mouse_number"] is not None:
                cursor.execute("UPDATE mouse_list SET using_member_id = %s WHERE mouse_number = %s", (member_id, user_info["mouse_number"]))
            self.db.commit()
            return {"result": 0, "about": "ok", "output_dict": {"password": str(password), "name": str(name)}}

        except Exception as error:
            return self._log_failure("PCの使用登録中にエラーが発生しました。", error)

        finally:
            if cursor is not None:
                cursor.close()

    def stop(self, **kwargs):
        """End the caller's open PC session.

        Keyword Args:
            discord_user_id: Discord user id of the caller.
            bot_about: Optional reason recorded on the history row.

        Returns:
            On success ``{"result": 0, "about": "ok", "output_dict":
            {"pc_number", "name"}}``; otherwise ``about`` is ``"unused"``,
            ``"user_data_not_found"`` or ``"error"``.
        """
        cursor = None
        try:
            cursor = self.db.cursor()
            discord_user_id = str(kwargs["discord_user_id"])
            bot_about = kwargs.get("bot_about")

            # Needed mainly to obtain the member id and name.
            user_register = self.user_register_check(discord_user_id=discord_user_id)
            if user_register["result"] != 0:
                # Check the result BEFORE touching "user_info": it is absent on failure.
                if user_register["about"] == "error":
                    return {"result": 1, "about": "error"}
                return {"result": 1, "about": "user_data_not_found"}
            member_id = user_register["user_info"]["member_id"]
            name = user_register["user_info"]["name"]

            cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s ORDER BY id DESC LIMIT 1", (member_id,))
            pc_usage_history_record = cursor.fetchall()
            # No history at all, or the newest session already has an end time.
            if not pc_usage_history_record or pc_usage_history_record[0][6] is not None:
                return {"result": 1, "about": "unused"}

            latest = pc_usage_history_record[0]
            pc_usage_history_id, pc_number, keyboard_number, mouse_number = latest[0], latest[2], latest[3], latest[4]

            # Record the stop reason only when one was supplied.
            if bot_about is None:
                cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp() WHERE id = %s", (pc_usage_history_id,))
            else:
                cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp(), bot_about = %s WHERE id = %s", (bot_about, pc_usage_history_id))
            # Release the PC and any borrowed devices.
            cursor.execute("UPDATE pc_list SET using_member_id = NULL, password_hash = NULL WHERE pc_number = %s", (pc_number,))
            if keyboard_number is not None:
                cursor.execute("UPDATE keyboard_list SET using_member_id = NULL WHERE keyboard_number = %s", (keyboard_number,))
            if mouse_number is not None:
                cursor.execute("UPDATE mouse_list SET using_member_id = NULL WHERE mouse_number = %s", (mouse_number,))
            self.db.commit()
            return {"result": 0, "about": "ok", "output_dict": {"pc_number": str(pc_number), "name": str(name)}}

        except Exception as error:
            return self._log_failure("PCの使用停止処理中にエラーが発生しました。", error)

        finally:
            if cursor is not None:
                cursor.close()

    def user_register(self, **kwargs):
        """Register a new club member.

        Keyword Args:
            discord_user_id, discord_user_name, name: Values stored on the new row.

        Returns:
            ``about`` is ``"ok"`` (result 0), ``"already_exists"`` or ``"error"``.
        """
        cursor = None
        try:
            discord_user_id = str(kwargs["discord_user_id"])
            discord_user_name = str(kwargs["discord_user_name"])
            name = str(kwargs["name"])
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM club_member WHERE discord_user_id = %s", (discord_user_id,))
            if cursor.fetchall():
                return {"result": 1, "about": "already_exists"}
            cursor.execute("INSERT INTO club_member (name, discord_user_name, discord_user_id) VALUES (%s, %s, %s)", (name, discord_user_name, discord_user_id))
            self.db.commit()
            return {"result": 0, "about": "ok"}

        except Exception as error:
            return self._log_failure("ユーザー情報の登録中にエラーが発生しました。", error)

        finally:
            if cursor is not None:
                cursor.close()

    def pc_register(self, **kwargs):
        """Add the PC ``pc_number`` to ``pc_list``.

        Returns:
            ``about`` is ``"ok"`` (result 0), ``"already_exists"`` or ``"error"``.
        """
        cursor = None
        try:
            pc_number = int(kwargs["pc_number"])
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
            if cursor.fetchall():
                return {"result": 1, "about": "already_exists"}
            cursor.execute("INSERT INTO pc_list (pc_number) VALUES (%s)", (pc_number,))
            self.db.commit()
            return {"result": 0, "about": "ok"}

        except Exception as error:
            return self._log_failure("PCの情報を登録中にエラーが発生しました。", error)

        finally:
            if cursor is not None:
                cursor.close()

    def report_export(self, **kwargs):
        """Export the whole usage history, joined with member names, to an Excel file.

        Returns:
            ``{"result": 0, "about": "ok", "file_path": <xlsx path>}`` or
            ``{"result": 1, "about": "error"}``.
        """
        cursor = None
        try:
            cursor = self.db.cursor()
            main_table = "pc_usage_history"
            related_table = "club_member"
            excel_file_path = self.export_dir_path + "pc_usage_history.xlsx"

            # Discover the main table's columns (member_id is replaced by the name).
            cursor.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table)))
            main_columns = [desc[0] for desc in cursor.description if desc[0] != 'member_id']

            # Build the query with every column named explicitly.
            query = sql.SQL("""
                SELECT {main_columns}, {related_table}.name
                FROM {main_table}
                LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.member_id
                ORDER BY id
            """).format(
                main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]),
                main_table=sql.Identifier(main_table),
                related_table=sql.Identifier(related_table)
            )
            cursor.execute(query)

            # The name is selected last but shown as the second column.
            column_names = [main_columns[0], 'name'] + main_columns[1:]
            rows = cursor.fetchall()

            wb = openpyxl.Workbook()
            ws = wb.active
            ws.append(column_names)
            for row in rows:
                formatted_row = [self.format_datetime(row[0])] + [row[-1]] + [self.format_datetime(field) if field is not None else '' for field in row[1:-1]]
                ws.append(formatted_row)

            # Size each column to its longest rendered value.
            for col in ws.columns:
                max_length = max((len(str(cell.value)) for cell in col), default=0)
                ws.column_dimensions[col[0].column_letter].width = (max_length + 2) * 1.2

            wb.save(excel_file_path)
            self.log(title=f"[SUCCESS] PCの使用履歴をエクスポートしました。", message=f"ファイルパス | {excel_file_path}", flag=0)
            return {"result": 0, "about": "ok", "file_path": excel_file_path}

        except Exception as error:
            return self._log_failure("PCの使用履歴をエクスポート中にエラーが発生しました。", error)

        finally:
            if cursor is not None:
                cursor.close()

    def force_stop(self, **kwargs):
        """Forcibly end whatever session is running on a PC.

        Keyword Args:
            pc_number: PC to release.
            bot_about: Reason recorded on the history row (required).

        Returns:
            ``about`` is ``"ok"`` (result 0), ``"not_used"``,
            ``"bot_about_not_found"`` or ``"error"``.
        """
        cursor = None
        try:
            pc_number = kwargs["pc_number"]
            cursor = self.db.cursor()
            if "bot_about" not in kwargs:
                return {"result": 1, "about": "bot_about_not_found"}
            bot_about = kwargs["bot_about"]

            cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
            pc_list_record = cursor.fetchall()
            pc_using_member_id = pc_list_record[0][1]
            if pc_using_member_id is None:
                return {"result": 1, "about": "not_used"}

            # Release the PC (user and password hash in one statement).
            cursor.execute("UPDATE pc_list SET using_member_id = NULL, password_hash = NULL WHERE pc_number = %s", (pc_number,))

            cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s AND pc_number = %s ORDER BY id DESC LIMIT 1", (pc_using_member_id, pc_number))
            pc_usage_history_record = cursor.fetchall()
            pc_usage_history_record_id = pc_usage_history_record[0][0]
            keyboard_number = pc_usage_history_record[0][3]
            mouse_number = pc_usage_history_record[0][4]

            # Release borrowed devices, then close the history row.
            if keyboard_number is not None:
                cursor.execute("UPDATE keyboard_list SET using_member_id = NULL WHERE keyboard_number = %s", (keyboard_number,))
            if mouse_number is not None:
                cursor.execute("UPDATE mouse_list SET using_member_id = NULL WHERE mouse_number = %s", (mouse_number,))
            cursor.execute("UPDATE pc_usage_history SET end_use_time = clock_timestamp(), bot_about = %s WHERE id = %s", (bot_about, pc_usage_history_record_id))
            self.db.commit()
            return {"result": 0, "about": "ok"}

        except Exception as error:
            return self._log_failure("fstop中にエラーが発生しました。", error)

        finally:
            if cursor is not None:
                cursor.close()
# Backend instance; constructing it loads the config and initialises the database.
dislocker = DL()

# Default gateway intents plus message content.
intents = discord.Intents.default()
intents.message_content = True

# Discord client and the slash-command tree attached to it.
client = discord.Client(intents=intents)
tree = discord.app_commands.CommandTree(client)
# Runs once the client is ready: prints a login banner and syncs the slash commands.
@client.event
async def on_ready():
    print("Bot is ready.", "Logged in as", sep="\n")
    bot_user = client.user
    print(bot_user.name, bot_user.id, "------", sep="\n")
    await tree.sync()
# /use: start a PC session for the caller. On success the password is shown to the
# caller only (ephemeral) and an announcement is posted to the log channel; on
# failure the reason is reported back to the caller.
# NOTE: documented with comments rather than a docstring so the slash-command
# metadata taken from the decorator stays exactly as it is.
@tree.command(name="use", description="パソコンの使用登録をします。")
async def use(interaction: discord.Interaction, pc_number: int, keyboard_number: int, mouse_number: int, detail: str):
    register = dislocker.register(discord_user_id=interaction.user.id, name=interaction.user.name, display_name=interaction.user.display_name, pc_number=pc_number, keyboard_number=keyboard_number, mouse_number=mouse_number, detail=detail)

    if register["result"] == 0:
        # Pull values into locals: reusing the enclosing quote character inside an
        # f-string replacement field is a SyntaxError before Python 3.12.
        password = register["output_dict"]["password"]
        member_name = register["output_dict"]["name"]
        await interaction.response.send_message(f":white_check_mark: 使用が開始されました。\n>>> # パスワード | {password}\n## PC番号 | {pc_number}\n## 使用目的 | {detail}", ephemeral=True)
        dislocker.log(title=f"[INFO] PC番号{pc_number} の使用が開始されました。", message=f"名前 | {member_name}, 使用目的 | {detail}", flag=0)
        await client.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f':white_check_mark: {member_name} さんがPC {pc_number} の使用を開始しました。\n>>> ## PC番号 | {pc_number}\n## 使用目的 | {detail}')

    elif register["result"] == 1:
        about = register["about"]
        if about == "pc_already_in_use_by_you":
            current = register["pc_usage_history"]
            reply = f":x: あなたは既にPC {current['pc_number']} を使用中です。\n>>> ## PC番号 | {current['pc_number']}\n## 使用目的 | {current['use_detail']}"
        else:
            # Fixed replies per failure reason; unknown reasons get no reply.
            fixed_replies = {
                "pc_already_in_use_by_other": ":x: 他の方がそのPCを使用中です。",
                "keyboard_already_in_use": ":x: キーボードは既に使用中です。",
                "mouse_already_in_use": ":x: マウスは既に使用中です。",
                "user_data_not_found": ":x: ユーザーデータが見つかりませんでした。",
                "error": ":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。",
            }
            reply = fixed_replies.get(about)
        if reply is not None:
            await interaction.response.send_message(reply, ephemeral=True)
# /stop: end the caller's PC session. On success the caller gets an ephemeral
# confirmation and the log channel gets an announcement; on failure the reason
# is reported back to the caller.
@tree.command(name="stop", description="パソコンの使用を終了します。")
async def stop(interaction: discord.Interaction):
    outcome = dislocker.stop(discord_user_id=interaction.user.id)
    status = outcome["result"]

    if status == 0:
        released = outcome["output_dict"]
        await interaction.response.send_message(f":white_check_mark: 使用が終了されました。\n>>> ## PC番号 | {released['pc_number']}", ephemeral=True)
        dislocker.log(title=f"[INFO] PC番号{released['pc_number']} の使用が終了されました。", message=f"名前 | {released['name']}", flag=0)
        log_channel_id = dislocker.server_config["bot"]["log_channel_id"]
        await client.get_channel(log_channel_id).send(f':white_check_mark: {released["name"]} さんがPC {released["pc_number"]} の使用を終了しました。\n>>> ## PC番号 | {released["pc_number"]}')
        return

    if status != 1:
        return

    # One fixed reply per known failure reason; anything else gets no reply.
    failure_replies = {
        "unused": ":x: あなたはPCを使用していません。",
        "user_data_not_found": ":x: ユーザーデータが見つかりませんでした。",
        "error": ":x: 内部エラーが発生しました。\nサーバーでエラーが発生しています。管理者に問い合わせてください。",
    }
    reply = failure_replies.get(outcome["about"])
    if reply is not None:
        await interaction.response.send_message(reply, ephemeral=True)
# Start the bot with the token from the server configuration (blocks until shutdown).
client.run(dislocker.server_config["bot"]["token"])