394 lines
No EOL
21 KiB
Python
394 lines
No EOL
21 KiB
Python
import json
|
||
import discord
|
||
import os
|
||
import psycopg2
|
||
from psycopg2 import sql
|
||
import hashlib
|
||
import string
|
||
import random
|
||
from datetime import datetime
|
||
from openpyxl import Workbook
|
||
|
||
class DL():
|
||
def __init__(self):
|
||
self.config_dir_path = "./config/"
|
||
self.export_dir_path = "./export/"
|
||
self.server_config_path = self.config_dir_path + "server.json"
|
||
try:
|
||
if not os.path.isdir(self.config_dir_path):
|
||
print("config ディレクトリが見つかりません... 作成します。")
|
||
os.mkdir(self.config_dir_path)
|
||
|
||
if not os.path.isfile(self.server_config_path):
|
||
print("config ファイルが見つかりません... 作成します。")
|
||
self.server_config = {
|
||
"db": {
|
||
"host": "localhost",
|
||
"port": "5432",
|
||
"db_name": "dislocker",
|
||
"username": "user",
|
||
"password": "password"
|
||
},
|
||
"bot": {
|
||
"token": "TYPE HERE BOTS TOKEN KEY",
|
||
"log_channel_id" : "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
|
||
"config_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)"
|
||
}
|
||
}
|
||
|
||
with open(self.server_config_path, "w") as w:
|
||
json.dump(self.server_config, w, indent=4)
|
||
|
||
|
||
elif os.path.isfile(self.server_config_path):
|
||
with open(self.server_config_path, "r") as r:
|
||
self.server_config = json.load(r)
|
||
print("config ファイルを読み込みました。")
|
||
|
||
|
||
if not os.path.isdir(self.export_dir_path):
|
||
print("export ディレクトリが見つかりません... 作成します。")
|
||
os.mkdir(self.export_dir_path)
|
||
|
||
if type(self.server_config["bot"]["log_channel_id"]) is not int or type(self.server_config["bot"]["config_channel_id"]) is not int:
|
||
print("config ファイル内でチャンネルIDがint型で記述されていません。int型で記述して、起動してください。")
|
||
self.init_result = "not_int"
|
||
else:
|
||
self.db = psycopg2.connect(f"host={self.server_config['db']['host']} dbname={self.server_config['db']['db_name']} port={self.server_config['db']['port']} user={self.server_config['db']['username']} password={self.server_config['db']['password']}")
|
||
self.init_result = "ok"
|
||
|
||
except (Exception) as error:
|
||
print("初回処理でエラーが発生しました。\nエラー内容\n" + str(error))
|
||
self.init_result = "error"
|
||
|
||
finally:
|
||
pass
|
||
|
||
|
||
class Bot(discord.Client):
|
||
def password_generate(self, length):
|
||
numbers = string.digits # (1)
|
||
password = ''.join(random.choice(numbers) for _ in range(length)) # (2)
|
||
return password
|
||
|
||
def hash_genarate(self, source):
|
||
hashed = hashlib.md5(source.encode())
|
||
return hashed.hexdigest()
|
||
|
||
def register(self, **kwargs):
|
||
try:
|
||
discord_user_id = str(kwargs["user_id"])
|
||
pc_number = int(kwargs["pc_number"])
|
||
device_number = int(kwargs["device_number"])
|
||
if "detail" in kwargs:
|
||
detail = str(kwargs["detail"])
|
||
else:
|
||
detail = None
|
||
#パスワード生成、ハッシュ化
|
||
password = self.password_generate(4)
|
||
password_hash = self.hash_genarate(password)
|
||
print("password generated")
|
||
#メンバーリストと送信者を照合
|
||
cursor = dislocker.db.cursor()
|
||
cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,))
|
||
user_record = cursor.fetchall()
|
||
#ユーザーデータがなかったら(未登録の場合)
|
||
if not user_record:
|
||
result = {"result": "user_data_not_found"}
|
||
#ユーザーデータが見つかった場合(登録済みの場合)
|
||
else:
|
||
print("found user data")
|
||
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id=%s ORDER BY id DESC LIMIT 1", (user_record[0][0],))
|
||
pc_usage_history_record = cursor.fetchall()
|
||
if pc_usage_history_record:
|
||
print("used")
|
||
if pc_usage_history_record[0][5] == None:
|
||
result = {"result": "pc_already_in_use_by_you", "pc_number": str(pc_usage_history_record[0][2]), "device_number": str(pc_usage_history_record[0][3]), "start_time": str(pc_usage_history_record[0][4]), "detail": str(pc_usage_history_record[0][6])}
|
||
else:
|
||
cursor.execute("SELECT * FROM pc_list WHERE pc_number=%s", (pc_number,))
|
||
pc_list_record = cursor.fetchall()
|
||
if not pc_list_record[0][1] == None:
|
||
result = {"result": "pc_already_in_use_by_other"}
|
||
else:
|
||
if detail == None:
|
||
cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time) VALUES (%s, %s, %s, current_timestamp)", (user_record[0][0], pc_number, device_number))
|
||
#使用用途があるとき
|
||
else:
|
||
cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time, use_detail) VALUES (%s, %s, %s, current_timestamp, %s)", (user_record[0][0], pc_number, device_number, detail))
|
||
|
||
cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], pc_number))
|
||
cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, pc_number))
|
||
dislocker.db.commit()
|
||
result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])}
|
||
else:
|
||
cursor.execute("SELECT * FROM pc_list WHERE pc_number=%s", (pc_number,))
|
||
pc_list_record = cursor.fetchall()
|
||
if not pc_list_record[0][1] == None:
|
||
result = {"result": "pc_already_in_use_by_other"}
|
||
else:
|
||
if detail == None:
|
||
cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time) VALUES (%s, %s, %s, current_timestamp)", (user_record[0][0], pc_number, device_number))
|
||
#使用用途があるとき
|
||
else:
|
||
cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time, use_detail) VALUES (%s, %s, %s, current_timestamp, %s)", (user_record[0][0], pc_number, device_number, detail))
|
||
|
||
cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], pc_number))
|
||
cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, pc_number))
|
||
dislocker.db.commit()
|
||
result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])}
|
||
except:
|
||
print("登録処理中にエラーが発生しました。")
|
||
result = {"result": "error"}
|
||
|
||
finally:
|
||
cursor.close()
|
||
return result
|
||
|
||
def stop(self, **kwargs):
|
||
try:
|
||
discord_user_id = str(kwargs["user_id"])
|
||
cursor = dislocker.db.cursor()
|
||
cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,))
|
||
user_record = cursor.fetchall()
|
||
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s ORDER BY id DESC LIMIT 1", (user_record[0][0],))
|
||
pc_usage_history_record = cursor.fetchall()
|
||
if pc_usage_history_record:
|
||
if not pc_usage_history_record[0][5] == None:
|
||
result = {"result": "unused"}
|
||
else:
|
||
cursor.execute("UPDATE pc_usage_history SET end_use_time = current_timestamp WHERE id = %s", (pc_usage_history_record[0][0],))
|
||
cursor.execute("UPDATE pc_list SET using_user_id = NULL WHERE pc_number = %s", (pc_usage_history_record[0][2],))
|
||
cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_usage_history_record[0][2],))
|
||
dislocker.db.commit()
|
||
result = {"result": "ok", "pc_number": str(pc_usage_history_record[0][2]), "name": str(user_record[0][1])}
|
||
except:
|
||
print("停止処理にエラーが発生しました。")
|
||
result = {"result": "error"}
|
||
|
||
finally:
|
||
cursor.close()
|
||
return result
|
||
|
||
def user_register(self, **kwargs):
|
||
try:
|
||
discord_user_id = str(kwargs["discord_user_id"])
|
||
discord_user_name = str(kwargs["discord_user_name"])
|
||
name = str(kwargs["name"])
|
||
cursor = dislocker.db.cursor()
|
||
cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,))
|
||
user_record = cursor.fetchall()
|
||
if not user_record:
|
||
cursor.execute("INSERT INTO club_member (name, discord_username, discord_userid) VALUES (%s, %s, %s)", (name, discord_user_name, discord_user_id))
|
||
dislocker.db.commit()
|
||
result = {"result": "ok"}
|
||
else:
|
||
result = {"result": "already_exists"}
|
||
|
||
except:
|
||
print("ユーザー登録中にエラーが発生しました。")
|
||
result = {"result": "error"}
|
||
|
||
finally:
|
||
cursor.close()
|
||
return result
|
||
|
||
def format_datetime(self, value):
|
||
if isinstance(value, datetime):
|
||
return value.strftime('%Y-%m-%d %H:%M:%S')
|
||
return value
|
||
|
||
def report_export(self, **kwargs):
|
||
try:
|
||
cursor = dislocker.db.cursor()
|
||
csv_file_path = dislocker.export_dir_path + "pc_usage_history.csv"
|
||
main_table = "pc_usage_history"
|
||
related_table = "club_member"
|
||
excel_file_path = dislocker.export_dir_path + "pc_usage_history.xlsx"
|
||
|
||
# メインテーブルの列情報を取得(user_idを除く)
|
||
cursor.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table)))
|
||
main_columns = [desc[0] for desc in cursor.description if desc[0] != 'member_id']
|
||
|
||
# クエリを作成(列名を明確に指定)
|
||
query = sql.SQL("""
|
||
SELECT {main_columns}, {related_table}.name
|
||
FROM {main_table}
|
||
LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.id
|
||
ORDER BY id
|
||
""").format(
|
||
main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]),
|
||
main_table=sql.Identifier(main_table),
|
||
related_table=sql.Identifier(related_table)
|
||
)
|
||
|
||
cursor.execute(query)
|
||
|
||
# 列名を再構成(nameを2番目に配置)
|
||
column_names = [main_columns[0], 'name'] + main_columns[1:]
|
||
|
||
rows = cursor.fetchall()
|
||
|
||
# Excelワークブックを作成
|
||
wb = Workbook()
|
||
ws = wb.active
|
||
|
||
# 列名を書き込み
|
||
ws.append(column_names)
|
||
|
||
# データを書き込み
|
||
for row in rows:
|
||
# nameを2番目に移動
|
||
formatted_row = [self.format_datetime(row[0])] + [row[-1]] + [self.format_datetime(field) if field is not None else '' for field in row[1:-1]]
|
||
ws.append(formatted_row)
|
||
|
||
# 列幅を自動調整
|
||
for col in ws.columns:
|
||
max_length = 0
|
||
column = col[0].column_letter
|
||
for cell in col:
|
||
try:
|
||
if len(str(cell.value)) > max_length:
|
||
max_length = len(str(cell.value))
|
||
except:
|
||
pass
|
||
adjusted_width = (max_length + 2) * 1.2
|
||
ws.column_dimensions[column].width = adjusted_width
|
||
|
||
# Excelファイルを保存
|
||
wb.save(excel_file_path)
|
||
|
||
print(f"テーブル '{main_table}' の内容を '{excel_file_path}' に出力しました。")
|
||
|
||
except (Exception, psycopg2.Error) as error:
|
||
print("使用履歴のエクスポート時にエラーが発生しました\nエラー内容\n", str(error))
|
||
result = {"result": "export_error"}
|
||
|
||
finally:
|
||
cursor.close()
|
||
return result
|
||
|
||
def force_stop(self, **kwargs):
|
||
try:
|
||
pc_number = kwargs["pc_number"]
|
||
cursor = dislocker.db.cursor()
|
||
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
|
||
pc_list_record = cursor.fetchall()
|
||
if not pc_list_record[0][1] == None:
|
||
cursor.execute("UPDATE pc_list SET using_user_id = NULL WHERE pc_number = %s", (pc_number,))
|
||
|
||
if not pc_list_record[0][2] == None:
|
||
cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_number,))
|
||
|
||
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s AND pc_number = %s ORDER BY id DESC LIMIT 1", (pc_list_record[0][1], pc_number))
|
||
pc_usage_history_record = cursor.fetchall()
|
||
cursor.execute("UPDATE pc_usage_history SET end_use_time = current_timestamp WHERE id = %s", (pc_usage_history_record[0][0],))
|
||
dislocker.db.commit()
|
||
result = {"result": "ok"}
|
||
|
||
else:
|
||
result = {"result": "not_used"}
|
||
|
||
except:
|
||
result = {"result": "error"}
|
||
|
||
finally:
|
||
cursor.close()
|
||
return result
|
||
|
||
async def on_ready(self):
|
||
print("DiscordのBotが起動しました。")
|
||
|
||
async def on_message(self, message):
|
||
if message.author.bot:
|
||
pass
|
||
|
||
elif isinstance(message.channel, discord.DMChannel):
|
||
msg_split = message.content.split()
|
||
if msg_split[0] == "/password" or msg_split[0] == "/start":
|
||
#メッセージの要素が2つ以下の場合は拒否
|
||
if len(msg_split) <= 2:
|
||
await message.channel.send("PC番号、もしくはデバイス番号が入力されていません。")
|
||
#メッセージの要素が3つ以上の場合
|
||
elif len(msg_split) >= 3:
|
||
#番号が数字であることを確認
|
||
if msg_split[1].isdigit() and msg_split[2].isdigit():
|
||
#PC番号が1以上10以下であることを確認
|
||
if int(msg_split[1]) <= 10 and int(msg_split[1]) >= 1:
|
||
if len(msg_split) == 3:
|
||
register = self.register(user_id=message.author.id, pc_number=msg_split[1], device_number=msg_split[2])
|
||
elif len(msg_split) == 4:
|
||
register = self.register(user_id=message.author.id, pc_number=msg_split[1], device_number=msg_split[2], detail=msg_split[3])
|
||
|
||
if register["result"] == "ok":
|
||
if len(msg_split) == 3:
|
||
await message.channel.send(f"使用が開始されました。\nパスワード | {register["password"]}\nPC番号 | {msg_split[1]}\nデバイス番号 | {msg_split[2]}")
|
||
elif len(msg_split) == 4:
|
||
await message.channel.send(f"使用が開始されました。\nパスワード | {register["password"]}\nPC番号 | {msg_split[1]}\nデバイス番号 | {msg_split[2]}\n使用目的 | {msg_split[3]}")
|
||
await self.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f'{register["name"]} さんがPC {msg_split[1]} を使用しています')
|
||
elif register["result"] == "user_data_not_found":
|
||
await message.channel.send("ユーザーとして登録されていないようです。管理者に問い合わせてください。")
|
||
elif register["result"] == "pc_already_in_use_by_you":
|
||
await message.channel.send(f"あなたはPCをもう使用されているようです。使用状態を解除するには /stop で使用終了をお知らせください。\nPC番号 | {register["pc_number"]}\nデバイス番号 | {register["device_number"]}\n使用開始時刻 | {register["start_time"]}\n使用目的 | {register["detail"]}")
|
||
elif register["result"] == "pc_already_in_use_by_other":
|
||
await message.channel.send(f"そのPCは他のメンバーによって使用されています。別のPC番号を指定して、再度お試しください。")
|
||
else:
|
||
await message.channel.send("番号がおかしいようです。")
|
||
else:
|
||
await message.channel.send("指定された番号は不正です。")
|
||
|
||
elif msg_split[0] == "/stop":
|
||
stop = self.stop(user_id=message.author.id)
|
||
if stop["result"] == "unused":
|
||
await message.channel.send("使用されていないようです...")
|
||
elif stop["result"] == "ok":
|
||
await message.channel.send(f"PC番号 {stop["pc_number"]} の使用が終了されました。")
|
||
await self.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f'{stop["name"]} さんがPC {stop["pc_number"]} の使用を終了しました')
|
||
|
||
elif message.channel.id == dislocker.server_config["bot"]["config_channel_id"]:
|
||
msg_split = message.content.split()
|
||
if msg_split[0] == "/register":
|
||
if len(msg_split) <= 3:
|
||
await message.channel.send("名前、Discordのユーザー名、DiscordのユーザーIDのいずれかが入力されていません。")
|
||
elif len(msg_split) == 4:
|
||
if msg_split[3].isdigit():
|
||
register = self.user_register(name=msg_split[1], discord_user_name=msg_split[2], discord_user_id=msg_split[3])
|
||
if register["result"] == "ok":
|
||
await message.channel.send(f"登録が完了しました。\n名前 | {msg_split[1]}\nDiscordのユーザー名 | {msg_split[2]}\nDiscordのユーザーID | {msg_split[3]}")
|
||
elif register["result"] == "already_exists":
|
||
await message.channel.send("そのDiscordアカウントはすでに登録されています。")
|
||
else:
|
||
await message.channel.send("DiscordのユーザーIDが不正です。")
|
||
else:
|
||
await message.channel.send("なんでかわからんけど不正です。")
|
||
|
||
elif msg_split[0] == "/export":
|
||
export = self.report_export()
|
||
if export["result"] == "ok":
|
||
await message.channel.send("使用履歴のレポートです。", file=discord.File(export["file_path"]))
|
||
pass
|
||
elif export["result"] == "export_error":
|
||
await message.channel.send("エクスポートに失敗しました。")
|
||
|
||
elif msg_split[0] == "/fstop":
|
||
if len(msg_split) == 1:
|
||
await message.channel.send("PC番号を指定してください。")
|
||
elif len(msg_split) == 2:
|
||
fstop = self.force_stop(pc_number=msg_split[1])
|
||
if fstop["result"] == "ok":
|
||
await message.channel.send(f"PC番号 {msg_split[1]} の使用登録を解除しました。")
|
||
elif fstop["result"] == "not_used":
|
||
await message.channel.send("PCは使用されていないようです...")
|
||
else:
|
||
await message.channel.send("エラーが発生しました。")
|
||
else:
|
||
await message.channel.send("引数が多すぎます。")
|
||
|
||
|
||
dislocker = DL()
|
||
if dislocker.init_result == "ok":
|
||
intents = discord.Intents.default()
|
||
intents.message_content = True
|
||
bot = Bot(intents=intents)
|
||
bot.run(dislocker.server_config['bot']['token'])
|
||
else:
|
||
pass |