Compare commits

..

No commits in common. "707e2e3ed6886c6a52285d1372fdb20797aae6b9" and "e3d9f73fd737c2cc987c89b8a4d93a602dc8435f" have entirely different histories.

4 changed files with 152 additions and 548 deletions

View file

@ -2,70 +2,14 @@ import json
import discord import discord
import os import os
import psycopg2 import psycopg2
from psycopg2 import sql
import hashlib import hashlib
import string import string
import random import random
from datetime import datetime
from openpyxl import Workbook
class DL():
def __init__(self):
self.config_dir_path = "./config/"
self.export_dir_path = "./export/"
self.server_config_path = self.config_dir_path + "server.json"
try:
if not os.path.isdir(self.config_dir_path):
print("config ディレクトリが見つかりません... 作成します。")
os.mkdir(self.config_dir_path)
if not os.path.isfile(self.server_config_path):
print("config ファイルが見つかりません... 作成します。")
self.server_config = {
"db": {
"host": "localhost",
"port": "5432",
"db_name": "dislocker",
"username": "user",
"password": "password"
},
"bot": {
"token": "TYPE HERE BOTS TOKEN KEY",
"log_channel_id" : "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)",
"config_channel_id": "TYPE HERE CHANNEL ID (YOU MUST USE INT !!!!)"
}
}
with open(self.server_config_path, "w") as w:
json.dump(self.server_config, w, indent=4)
elif os.path.isfile(self.server_config_path):
with open(self.server_config_path, "r") as r:
self.server_config = json.load(r)
print("config ファイルを読み込みました。")
if not os.path.isdir(self.export_dir_path):
print("export ディレクトリが見つかりません... 作成します。")
os.mkdir(self.export_dir_path)
if type(self.server_config["bot"]["log_channel_id"]) is not int or type(self.server_config["bot"]["config_channel_id"]) is not int:
print("config ファイル内でチャンネルIDがint型で記述されていません。int型で記述して、起動してください。")
self.init_result = "not_int"
else:
self.db = psycopg2.connect(f"host={self.server_config['db']['host']} dbname={self.server_config['db']['db_name']} port={self.server_config['db']['port']} user={self.server_config['db']['username']} password={self.server_config['db']['password']}")
self.init_result = "ok"
except (Exception) as error:
print("初回処理でエラーが発生しました。\nエラー内容\n" + str(error))
self.init_result = "error"
finally:
pass
class Bot(discord.Client): class Bot(discord.Client):
def db_connect(self, host, db, port, user, password):
self.db = psycopg2.connect(f"host={host} dbname={db} port={port} user={user} password={password}")
def password_generate(self, length): def password_generate(self, length):
numbers = string.digits # (1) numbers = string.digits # (1)
password = ''.join(random.choice(numbers) for _ in range(length)) # (2) password = ''.join(random.choice(numbers) for _ in range(length)) # (2)
@ -75,51 +19,34 @@ class Bot(discord.Client):
hashed = hashlib.md5(source.encode()) hashed = hashlib.md5(source.encode())
return hashed.hexdigest() return hashed.hexdigest()
def register(self, **kwargs): def register(self, **kwrags):
try: discord_user_id = str(kwrags["user_id"])
discord_user_id = str(kwargs["user_id"]) pc_number = int(kwrags["pc_number"])
pc_number = int(kwargs["pc_number"]) device_number = int(kwrags["device_number"])
device_number = int(kwargs["device_number"]) if "detail" in kwrags:
if "detail" in kwargs: detail = str(kwrags["detail"])
detail = str(kwargs["detail"]) else:
else: detail = None
detail = None #パスワード生成、ハッシュ化
#パスワード生成、ハッシュ化 password = self.password_generate(4)
password = self.password_generate(4) password_hash = self.hash_genarate(password)
password_hash = self.hash_genarate(password) print("password generated")
print("password generated") #メンバーリストと送信者を照合
#メンバーリストと送信者を照合 cursor = self.db.cursor()
cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,))
cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,)) user_record = cursor.fetchall()
user_record = cursor.fetchall() #ユーザーデータがなかったら(未登録の場合)
#ユーザーデータがなかったら(未登録の場合) if not user_record:
if not user_record: result = {"result": "user_data_not_found"}
result = {"result": "user_data_not_found"} #ユーザーデータが見つかった場合(登録済みの場合)
#ユーザーデータが見つかった場合(登録済みの場合) else:
else: print("found user data")
print("found user data") cursor.execute("SELECT * FROM pc_usage_history WHERE member_id=%s ORDER BY id DESC LIMIT 1", (user_record[0][0],))
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id=%s ORDER BY id DESC LIMIT 1", (user_record[0][0],)) pc_usage_history_record = cursor.fetchall()
pc_usage_history_record = cursor.fetchall() if pc_usage_history_record:
if pc_usage_history_record: print("used")
print("used") if pc_usage_history_record[0][5] == None:
if pc_usage_history_record[0][5] == None: result = {"result": "pc_already_in_use_by_you", "pc_number": str(pc_usage_history_record[0][2]), "device_number": str(pc_usage_history_record[0][3]), "start_time": str(pc_usage_history_record[0][4]), "detail": str(pc_usage_history_record[0][6])}
result = {"result": "pc_already_in_use_by_you", "pc_number": str(pc_usage_history_record[0][2]), "device_number": str(pc_usage_history_record[0][3]), "start_time": str(pc_usage_history_record[0][4]), "detail": str(pc_usage_history_record[0][6])}
else:
cursor.execute("SELECT * FROM pc_list WHERE pc_number=%s", (pc_number,))
pc_list_record = cursor.fetchall()
if not pc_list_record[0][1] == None:
result = {"result": "pc_already_in_use_by_other"}
else:
if detail == None:
cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time) VALUES (%s, %s, %s, current_timestamp)", (user_record[0][0], pc_number, device_number))
#使用用途があるとき
else:
cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time, use_detail) VALUES (%s, %s, %s, current_timestamp, %s)", (user_record[0][0], pc_number, device_number, detail))
cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], pc_number))
cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, pc_number))
dislocker.db.commit()
result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])}
else: else:
cursor.execute("SELECT * FROM pc_list WHERE pc_number=%s", (pc_number,)) cursor.execute("SELECT * FROM pc_list WHERE pc_number=%s", (pc_number,))
pc_list_record = cursor.fetchall() pc_list_record = cursor.fetchall()
@ -134,170 +61,64 @@ class Bot(discord.Client):
cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], pc_number)) cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], pc_number))
cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, pc_number)) cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, pc_number))
dislocker.db.commit() self.db.commit()
result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])} result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])}
except: else:
print("登録処理中にエラーが発生しました。") cursor.execute("SELECT * FROM pc_list WHERE pc_number=%s", (pc_number,))
result = {"result": "error"} pc_list_record = cursor.fetchall()
if not pc_list_record[0][1] == None:
finally: result = {"result": "pc_already_in_use_by_other"}
cursor.close()
return result
def stop(self, **kwargs):
try:
discord_user_id = str(kwargs["user_id"])
cursor = dislocker.db.cursor()
cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,))
user_record = cursor.fetchall()
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s ORDER BY id DESC LIMIT 1", (user_record[0][0],))
pc_usage_history_record = cursor.fetchall()
if pc_usage_history_record:
if not pc_usage_history_record[0][5] == None:
result = {"result": "unused"}
else: else:
cursor.execute("UPDATE pc_usage_history SET end_use_time = current_timestamp WHERE id = %s", (pc_usage_history_record[0][0],)) if detail == None:
cursor.execute("UPDATE pc_list SET using_user_id = NULL WHERE pc_number = %s", (pc_usage_history_record[0][2],)) cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time) VALUES (%s, %s, %s, current_timestamp)", (user_record[0][0], pc_number, device_number))
cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_usage_history_record[0][2],)) #使用用途があるとき
dislocker.db.commit() else:
result = {"result": "ok", "pc_number": str(pc_usage_history_record[0][2]), "name": str(user_record[0][1])} cursor.execute("INSERT INTO pc_usage_history (member_id, pc_number, device_number, start_use_time, use_detail) VALUES (%s, %s, %s, current_timestamp, %s)", (user_record[0][0], pc_number, device_number, detail))
except:
print("停止処理にエラーが発生しました。") cursor.execute("UPDATE pc_list SET using_user_id = %s WHERE pc_number = %s", (user_record[0][0], pc_number))
result = {"result": "error"} cursor.execute("UPDATE pc_list SET password_hash = %s WHERE pc_number = %s", (password_hash, pc_number))
self.db.commit()
result = {"result": "ok", "password": str(password), "name": str(user_record[0][1])}
finally: return result
cursor.close()
return result
def user_register(self, **kwargs): def stop(self, **kwrags):
try: discord_user_id = str(kwrags["user_id"])
discord_user_id = str(kwargs["discord_user_id"]) cursor = self.db.cursor()
discord_user_name = str(kwargs["discord_user_name"]) cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,))
name = str(kwargs["name"]) user_record = cursor.fetchall()
cursor = dislocker.db.cursor() cursor.execute("SELECT * FROM pc_usage_history WHERE member_id= %s ORDER BY id DESC LIMIT 1", (user_record[0][0],))
cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,)) pc_usage_history_record = cursor.fetchall()
user_record = cursor.fetchall() if pc_usage_history_record:
if not user_record: if not pc_usage_history_record[0][5] == None:
cursor.execute("INSERT INTO club_member (name, discord_username, discord_userid) VALUES (%s, %s, %s)", (name, discord_user_name, discord_user_id)) result = {"result": "unused"}
dislocker.db.commit()
result = {"result": "ok"}
else: else:
result = {"result": "already_exists"}
except:
print("ユーザー登録中にエラーが発生しました。")
result = {"result": "error"}
finally:
cursor.close()
return result
def format_datetime(self, value):
if isinstance(value, datetime):
return value.strftime('%Y-%m-%d %H:%M:%S')
return value
def report_export(self, **kwargs):
try:
cursor = dislocker.db.cursor()
csv_file_path = dislocker.export_dir_path + "pc_usage_history.csv"
main_table = "pc_usage_history"
related_table = "club_member"
excel_file_path = dislocker.export_dir_path + "pc_usage_history.xlsx"
# メインテーブルの列情報を取得user_idを除く
cursor.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table)))
main_columns = [desc[0] for desc in cursor.description if desc[0] != 'member_id']
# クエリを作成(列名を明確に指定)
query = sql.SQL("""
SELECT {main_columns}, {related_table}.name
FROM {main_table}
LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.id
ORDER BY id
""").format(
main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]),
main_table=sql.Identifier(main_table),
related_table=sql.Identifier(related_table)
)
cursor.execute(query)
# 列名を再構成nameを2番目に配置
column_names = [main_columns[0], 'name'] + main_columns[1:]
rows = cursor.fetchall()
# Excelワークブックを作成
wb = Workbook()
ws = wb.active
# 列名を書き込み
ws.append(column_names)
# データを書き込み
for row in rows:
# nameを2番目に移動
formatted_row = [self.format_datetime(row[0])] + [row[-1]] + [self.format_datetime(field) if field is not None else '' for field in row[1:-1]]
ws.append(formatted_row)
# 列幅を自動調整
for col in ws.columns:
max_length = 0
column = col[0].column_letter
for cell in col:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = (max_length + 2) * 1.2
ws.column_dimensions[column].width = adjusted_width
# Excelファイルを保存
wb.save(excel_file_path)
print(f"テーブル '{main_table}' の内容を '{excel_file_path}' に出力しました。")
result = {"result": "ok", "file_path": excel_file_path}
except (Exception, psycopg2.Error) as error:
print("使用履歴のエクスポート時にエラーが発生しました\nエラー内容\n", str(error))
result = {"result": "export_error"}
finally:
cursor.close()
return result
def force_stop(self, **kwargs):
try:
pc_number = kwargs["pc_number"]
cursor = dislocker.db.cursor()
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_list_record = cursor.fetchall()
if not pc_list_record[0][1] == None:
cursor.execute("UPDATE pc_list SET using_user_id = NULL WHERE pc_number = %s", (pc_number,))
if not pc_list_record[0][2] == None:
cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_number,))
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s AND pc_number = %s ORDER BY id DESC LIMIT 1", (pc_list_record[0][1], pc_number))
pc_usage_history_record = cursor.fetchall()
cursor.execute("UPDATE pc_usage_history SET end_use_time = current_timestamp WHERE id = %s", (pc_usage_history_record[0][0],)) cursor.execute("UPDATE pc_usage_history SET end_use_time = current_timestamp WHERE id = %s", (pc_usage_history_record[0][0],))
dislocker.db.commit() cursor.execute("UPDATE pc_list SET using_user_id = NULL WHERE pc_number = %s", (pc_usage_history_record[0][2],))
result = {"result": "ok"} cursor.execute("UPDATE pc_list SET password_hash = NULL WHERE pc_number = %s", (pc_usage_history_record[0][2],))
self.db.commit()
result = {"result": "ok", "pc_number": str(pc_usage_history_record[0][2]), "name": str(user_record[0][1])}
else: return result
result = {"result": "not_used"}
except: def user_register(self, **kwrags):
result = {"result": "error"} discord_user_id = str(kwrags["discord_user_id"])
discord_user_name = str(kwrags["discord_user_name"])
name = str(kwrags["name"])
cursor = self.db.cursor()
cursor.execute("SELECT * FROM club_member WHERE discord_userid = %s", (discord_user_id,))
user_record = cursor.fetchall()
if not user_record:
cursor.execute("INSERT INTO club_member (name, discord_username, discord_userid) VALUES (%s, %s, %s)", (name, discord_user_name, discord_user_id))
self.db.commit()
result = {"result": "ok"}
else:
result = {"result": "already_exists"}
finally: return result
cursor.close()
return result
async def on_ready(self): async def on_ready(self):
print("DiscordのBotが起動しました。") print("ログイン成功")
async def on_message(self, message): async def on_message(self, message):
if message.author.bot: if message.author.bot:
@ -325,13 +146,13 @@ class Bot(discord.Client):
await message.channel.send(f"使用が開始されました。\nパスワード | {register["password"]}\nPC番号 | {msg_split[1]}\nデバイス番号 | {msg_split[2]}") await message.channel.send(f"使用が開始されました。\nパスワード | {register["password"]}\nPC番号 | {msg_split[1]}\nデバイス番号 | {msg_split[2]}")
elif len(msg_split) == 4: elif len(msg_split) == 4:
await message.channel.send(f"使用が開始されました。\nパスワード | {register["password"]}\nPC番号 | {msg_split[1]}\nデバイス番号 | {msg_split[2]}\n使用目的 | {msg_split[3]}") await message.channel.send(f"使用が開始されました。\nパスワード | {register["password"]}\nPC番号 | {msg_split[1]}\nデバイス番号 | {msg_split[2]}\n使用目的 | {msg_split[3]}")
await self.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f'{register["name"]} さんがPC {msg_split[1]} を使用しています') await self.get_channel(bot_config["log_channel_id"]).send(f'{register["name"]} さんがPC {msg_split[1]} を使用しています')
elif register["result"] == "user_data_not_found": elif register["result"] == "user_data_not_found":
await message.channel.send("ユーザーとして登録されていないようです。管理者に問い合わせてください。") await message.channel.send("ユーザーとして登録されていないようです。管理者に問い合わせてください。")
elif register["result"] == "pc_already_in_use_by_you": elif register["result"] == "pc_already_in_use_by_you":
await message.channel.send(f"あなたはPCをもう使用されているようです。使用状態を解除するには /stop で使用終了をお知らせください。\nPC番号 | {register["pc_number"]}\nデバイス番号 | {register["device_number"]}\n使用開始時刻 | {register["start_time"]}\n使用目的 | {register["detail"]}") await message.channel.send(f"あなたはPCをもう使用されているようです。使用状態を解除するには /stop で使用終了をお知らせください。\nPC番号 | {register["pc_number"]}\nデバイス番号 | {register["device_number"]}\n使用開始時刻 | {register["start_time"]}\n使用目的 | {register["detail"]}")
elif register["result"] == "pc_already_in_use_by_other": elif register["result"] == "pc_already_in_use_by_other":
await message.channel.send(f"そのPCは他のメンバーによって使用されています。別のPC番号を指定して、再度お試しください。") await message.channel.send(f"PCはもう使用されています。別のPC番号を指定して、再度お試しください。")
else: else:
await message.channel.send("番号がおかしいようです。") await message.channel.send("番号がおかしいようです。")
else: else:
@ -343,9 +164,9 @@ class Bot(discord.Client):
await message.channel.send("使用されていないようです...") await message.channel.send("使用されていないようです...")
elif stop["result"] == "ok": elif stop["result"] == "ok":
await message.channel.send(f"PC番号 {stop["pc_number"]} の使用が終了されました。") await message.channel.send(f"PC番号 {stop["pc_number"]} の使用が終了されました。")
await self.get_channel(dislocker.server_config["bot"]["log_channel_id"]).send(f'{stop["name"]} さんがPC {stop["pc_number"]} の使用を終了しました') await self.get_channel(bot_config["log_channel_id"]).send(f'{stop["name"]} さんがPC {stop["pc_number"]} の使用を終了しました')
elif message.channel.id == dislocker.server_config["bot"]["config_channel_id"]: elif message.channel.id == bot_config["config_channel_id"]:
msg_split = message.content.split() msg_split = message.content.split()
if msg_split[0] == "/register": if msg_split[0] == "/register":
if len(msg_split) <= 3: if len(msg_split) <= 3:
@ -362,34 +183,46 @@ class Bot(discord.Client):
else: else:
await message.channel.send("なんでかわからんけど不正です。") await message.channel.send("なんでかわからんけど不正です。")
elif msg_split[0] == "/export":
export = self.report_export()
if export["result"] == "ok":
await message.channel.send("使用履歴のレポートです。", file=discord.File(export["file_path"]))
pass
elif export["result"] == "export_error":
await message.channel.send("エクスポートに失敗しました。")
elif msg_split[0] == "/fstop": config_dir_path = "./config/"
if len(msg_split) == 1: db_config_path = config_dir_path + "db.json"
await message.channel.send("PC番号を指定してください。") if not os.path.isfile(db_config_path):
elif len(msg_split) == 2: if not os.path.isdir(config_dir_path):
fstop = self.force_stop(pc_number=msg_split[1]) os.mkdir(config_dir_path)
if fstop["result"] == "ok":
await message.channel.send(f"PC番号 {msg_split[1]} の使用登録を解除しました。")
elif fstop["result"] == "not_used":
await message.channel.send("PCは使用されていないようです...")
else:
await message.channel.send("エラーが発生しました。")
else:
await message.channel.send("引数が多すぎます。")
db_config = {
"host": "localhost",
"db": "dislocker",
"username": "user",
"password": "example_pass",
"port": "5432"
}
with open(db_config_path, "w") as w:
json.dump(db_config, w, indent=4)
elif os.path.isfile(db_config_path):
with open(db_config_path, "r") as r:
db_config = json.load(r)
dislocker = DL() bot_config_path = config_dir_path + "bot.json"
if dislocker.init_result == "ok": if not os.path.isfile(bot_config_path):
intents = discord.Intents.default() if not os.path.isdir(config_dir_path):
intents.message_content = True os.mkdir(config_dir_path)
bot = Bot(intents=intents)
bot.run(dislocker.server_config['bot']['token']) bot_config = {
else: "token": "TYPE HERE BOTS TOKEN KEY",
pass "log_channel_id" : "TYPE HERE CHANNEL ID",
"config_channel_id": "TYPE HERE CHANNEL ID"
}
with open(bot_config_path, "w") as w:
json.dump(bot_config, w, indent=4)
elif os.path.isfile(bot_config_path):
with open(bot_config_path, "r") as r:
bot_config = json.load(r)
intents = discord.Intents.default()
intents.message_content = True
bot = Bot(intents=intents)
bot.db_connect(db_config["host"], db_config["db"], db_config["port"], db_config["username"], db_config["password"])
bot.run(bot_config["token"])

View file

@ -21,12 +21,12 @@ class App(customtkinter.CTk):
else: else:
self.attributes('-fullscreen', True) self.attributes('-fullscreen', True)
self.attributes('-topmost', True) self.attributes('-topmost', True)
self.block_taskmgr()
self.block_key()
self.frame = customtkinter.CTkFrame(self, corner_radius=0, fg_color='transparent') self.frame = customtkinter.CTkFrame(self, corner_radius=0, fg_color='transparent')
self.frame.grid(row=0, column=0, sticky='nsew') self.frame.grid(row=0, column=0, sticky='nsew')
self.block_taskmgr()
self.block_key()
lock = Lock() lock = Lock()
def exit(self): def exit(self):
@ -65,76 +65,47 @@ class App(customtkinter.CTk):
class Lock(customtkinter.CTkToplevel): class Lock(customtkinter.CTkToplevel):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
if client_config["testing"] == 1: self.title(f'{app_name} | PC番号 {client_config["pc_number"]} | ロックされています')
self.title(f'{app_name} | PC番号 {client_config["pc_number"]} | テストモード') self.geometry('400x200')
else:
self.title(f'{app_name} | PC番号 {client_config["pc_number"]} | ロックされています')
self.geometry("600x400")
self.resizable(height=False, width=False) self.resizable(height=False, width=False)
self.attributes('-topmost', True) self.attributes('-topmost', True)
self.grab_set() self.grab_set()
self.lift() self.lift()
self.protocol("WM_DELETE_WINDOW", self.handler_close) self.protocol("WM_DELETE_WINDOW", self.handler_close)
self.emoji_font = customtkinter.CTkFont(family="Segoe UI Emoji", size=32) self.title_font = customtkinter.CTkFont(family="meiryo", size=24, weight="bold")
self.title_font = customtkinter.CTkFont(family="meiryo", size=32, weight="bold") self.general_font = customtkinter.CTkFont(family="meiryo", size=14)
self.title_small_font = customtkinter.CTkFont(family="meiryo", size=16)
self.general_font = customtkinter.CTkFont(family="meiryo", size=18)
self.general_small_font = customtkinter.CTkFont(family="meiryo", size=12) self.general_small_font = customtkinter.CTkFont(family="meiryo", size=12)
self.textbox_font = customtkinter.CTkFont(family="meiryo", size=14)
self.button_font = customtkinter.CTkFont(family="meiryo", size=14)
self.msg_title_frame = customtkinter.CTkFrame(self, corner_radius=0, fg_color='transparent') self.msg_title_frame = customtkinter.CTkFrame(self, corner_radius=0, fg_color='transparent')
self.msg_title_frame.grid(row=0, column=0, padx=10, pady=10, sticky="nsew") self.msg_title_frame.grid(row=0, column=0, sticky="nsew")
self.icon_title_1 = customtkinter.CTkLabel(self.msg_title_frame, text='😎', font=self.emoji_font, justify="left") self.msg_title_1 = customtkinter.CTkLabel(self.msg_title_frame, text='ちょっと待って!!', font=self.title_font, anchor=tkinter.W)
self.icon_title_1.grid(row=0, column=0, padx=10, sticky="w") self.msg_title_1.grid(row=0, column=0, padx=10, pady=10)
self.msg_title_1 = customtkinter.CTkLabel(self.msg_title_frame, text='ちょっと待って!!', font=self.title_font, justify="left")
self.msg_title_1.grid(row=0, column=1, padx=10, sticky="w")
self.msg_title_2 = customtkinter.CTkLabel(self.msg_title_frame, text="本当にあなたですか?", font=self.title_small_font, justify="left")
self.msg_title_2.grid(row=1, column=1, padx=10, sticky="w")
self.msg_subtitle_frame = customtkinter.CTkFrame(self, corner_radius=0, fg_color='transparent') self.msg_subtitle_frame = customtkinter.CTkFrame(self, corner_radius=0, fg_color='transparent')
self.msg_subtitle_frame.grid(row=1, column=0, padx=10, pady=10, sticky="nsew") self.msg_subtitle_frame.grid(row=1, column=0, sticky="nsew")
self.msg_subtitle_frame.grid_columnconfigure(0, weight=1)
self.msg_subtitle_1 = customtkinter.CTkLabel(self.msg_subtitle_frame, text='サインインするには、Discordのダイレクトメッセージに送信された\nパスワードを入力してください。', font=self.general_font, justify="left") self.msg_subtitle_1 = customtkinter.CTkLabel(self.msg_subtitle_frame, text='サインインするには、パスワードが必要です。', font=self.general_font, anchor=tkinter.W)
self.msg_subtitle_1.grid(row=0, column=0, padx=10, sticky="ew") self.msg_subtitle_1.grid(row=0, column=0, padx=10, pady=10)
self.msg_subtitle_2 = customtkinter.CTkLabel(self.msg_subtitle_frame, text='※ パスワードの有効期限は23:59までです。', font=self.general_small_font, justify="left")
self.msg_subtitle_2.grid(row=1, column=0, padx=10, sticky="w")
self.input_frame = customtkinter.CTkFrame(self, corner_radius=0, fg_color='transparent') self.input_frame = customtkinter.CTkFrame(self, corner_radius=0, fg_color='transparent')
self.input_frame.grid(row=2, column=0, padx=10, pady=10, sticky="nsew") self.input_frame.grid(row=2, column=0, sticky="nsew")
self.input_frame.columnconfigure(0, weight=1)
self.password_entry = customtkinter.CTkEntry(self.input_frame, placeholder_text='パスワード', show='*', font=self.textbox_font) self.password_entry = customtkinter.CTkEntry(self.input_frame, placeholder_text='パスワード', width=380, show='*', font=self.general_small_font)
self.password_entry.grid(row=0, column=0, padx=10, sticky="ew") self.password_entry.grid(row=0, column=0, padx=10, pady=10)
self.button_frame = customtkinter.CTkFrame(self, corner_radius=0, fg_color='transparent') self.button_frame = customtkinter.CTkFrame(self, corner_radius=0, fg_color='transparent')
self.button_frame.grid(row=3, column=0, padx=10, pady=10, sticky="nsew") self.button_frame.grid(row=3, column=0, sticky="nsew")
self.button_frame.columnconfigure(0, weight=3)
self.button_frame.columnconfigure(1, weight=1)
self.button_frame.columnconfigure(2, weight=1)
self.signin_button = customtkinter.CTkButton(self.button_frame, text='サインイン', command=self.auth, font=self.button_font) self.signin_button = customtkinter.CTkButton(self.button_frame, text='サインイン', command=self.auth, font=self.general_font)
self.signin_button.grid(row=0, column=2, padx=10, sticky="e") self.signin_button.grid(row=0, column=1, padx=10, pady=10)
self.signin_button.pack(padx=10, anchor=tkinter.E)
self.logout_button = customtkinter.CTkButton(self.button_frame, text='サインアウト', command=self.logout, font=self.button_font)
self.logout_button.grid(row=0, column=1, padx=10, sticky="e")
self.signin_button = customtkinter.CTkButton(self.button_frame, text='ヘルプ', command=self.help_wakeup, font=self.button_font)
self.signin_button.grid(row=0, column=0, padx=10, sticky="w")
self.keyboard_listener_thread = threading.Thread(target=self.keyboard_listener) self.keyboard_listener_thread = threading.Thread(target=self.keyboard_listener)
self.keyboard_listener_thread.daemon = True self.keyboard_listener_thread.daemon = True
self.keyboard_listener_thread.start() self.keyboard_listener_thread.start()
def help_wakeup(self):
help = Help()
def keyboard_listener(self): def keyboard_listener(self):
keyboard.add_hotkey('ctrl+shift+q', self.exit) keyboard.add_hotkey('ctrl+shift+q', self.exit)
@ -143,7 +114,6 @@ class Lock(customtkinter.CTkToplevel):
return hashed.hexdigest() return hashed.hexdigest()
def auth(self): def auth(self):
print("認証サーバーにアクセスします。")
auth_url = client_config["auth_host_url"] + "/verify" auth_url = client_config["auth_host_url"] + "/verify"
auth_json = { auth_json = {
"pc_number": int(client_config["pc_number"]), "pc_number": int(client_config["pc_number"]),
@ -151,26 +121,22 @@ class Lock(customtkinter.CTkToplevel):
} }
try: try:
responce = requests.post(auth_url, json=auth_json) responce = requests.post(auth_url, json=auth_json)
if responce.status_code == 200:
print("認証サーバー経由で認証しました。")
self.exit()
except: except:
print("認証サーバーにアクセスできません。マスターパスワードで認証を試行します。")
master_password_hash = self.hash_genarate(str(self.password_entry.get())) master_password_hash = self.hash_genarate(str(self.password_entry.get()))
if client_config["master_password_hash"] == master_password_hash: if client_config["master_password_hash"] == master_password_hash:
print("マスターパスワードで認証しました。")
self.exit() self.exit()
else: else:
print("マスターパスワードで認証できませんでした。") self.msg_subtitle_1.configure(text='サインインするには、パスワードが必要です。\nネットワークエラーが発生しています。\n続行するには、マスターパスワードを入力して下さい。')
self.msg_subtitle_1.configure(text='ネットワークエラーが発生しています。\n続行するには、マスターパスワードを入力して下さい。 ')
def logout(self):
self.destroy()
logout_command = subprocess.run(['shutdown', '/l', '/f'])
print(logout_command)
if responce.status_code == 200:
self.exit()
else:
master_password_hash = self.hash_genarate(str(self.password_entry.get()))
if client_config["master_password_hash"] == master_password_hash:
self.exit()
else:
self.msg_subtitle_1.configure(text='サインインするには、パスワードが必要です。\nパスワードが違います!')
def handler_close(self): def handler_close(self):
pass pass
@ -178,27 +144,6 @@ class Lock(customtkinter.CTkToplevel):
self.destroy() self.destroy()
app.exit() app.exit()
class Help(customtkinter.CTkToplevel):
def __init__(self):
super().__init__()
if client_config["testing"] == 1:
self.title(f'{app_name} | ヘルプ | テストモード')
else:
self.title(f'{app_name} | ヘルプ')
self.geometry("600x400")
self.resizable(height=False, width=False)
self.attributes('-topmost', True)
self.grab_set()
self.lift()
self.protocol("WM_DELETE_WINDOW", self.handler_close)
def handler_close(self):
self.destroy()
def master_password_gen(): def master_password_gen():
numbers = string.digits # (1) numbers = string.digits # (1)
password = ''.join(random.choice(numbers) for _ in range(10)) # (2) password = ''.join(random.choice(numbers) for _ in range(10)) # (2)

View file

@ -1,77 +0,0 @@
import psycopg2
import csv
from psycopg2 import sql
from datetime import datetime
def format_datetime(value):
if isinstance(value, datetime):
return value.strftime('%Y-%m-%d %H:%M:%S')
return value
def export_table_to_csv(host, port, database, user, password, main_table, related_table, related_column, csv_file_path):
try:
# データベースに接続
conn = psycopg2.connect(
host=host,
port=port,
database=database,
user=user,
password=password
)
cur = conn.cursor()
# メインテーブルの列情報を取得user_idを除く
cur.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table)))
main_columns = [desc[0] for desc in cur.description if desc[0] != 'member_id']
# クエリを作成(列名を明確に指定)
query = sql.SQL("""
SELECT {main_columns}, {related_table}.name
FROM {main_table}
LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.id
""").format(
main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]),
main_table=sql.Identifier(main_table),
related_table=sql.Identifier(related_table)
)
cur.execute(query)
# 列名を再構成nameを2番目に配置
column_names = [main_columns[0], 'name'] + main_columns[1:]
rows = cur.fetchall()
with open(csv_file_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(column_names)
for row in rows:
# nameを2番目に移動
formatted_row = [format_datetime(row[0])] + [row[-1]] + [format_datetime(field) if field is not None else '' for field in row[1:-1]]
csvwriter.writerow(formatted_row)
print(f"テーブル '{main_table}' の内容を '{csv_file_path}' に出力しました。")
except (Exception, psycopg2.Error) as error:
print("エラーが発生しました:", error)
finally:
if conn:
cur.close()
conn.close()
print("データベース接続を閉じました。")
# 使用例
host = "192.168.1.220"
port = "12245"
database = "dislocker"
user = "suti7"
password = "Testing1234"
main_table = "pc_usage_history"
related_table = "club_member"
related_column = "name" # 例: 登録者の名前を表示したい場合
csv_file_path = "output.csv"
export_table_to_csv(host, port, database, user, password, main_table, related_table, related_column, csv_file_path)

View file

@ -1,97 +0,0 @@
import psycopg2
import csv
from psycopg2 import sql
from datetime import datetime
from openpyxl import Workbook
from openpyxl.utils import get_column_letter
def format_datetime(value):
if isinstance(value, datetime):
return value.strftime('%Y-%m-%d %H:%M:%S')
return value
def export_table_to_excel(host, port, database, user, password, main_table, related_table, excel_file_path):
try:
conn = psycopg2.connect(
host=host,
port=port,
database=database,
user=user,
password=password
)
cur = conn.cursor()
# メインテーブルの列情報を取得member_idを除く
cur.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table)))
main_columns = [desc[0] for desc in cur.description if desc[0] != 'member_id']
# クエリを作成(列名を明確に指定)
query = sql.SQL("""
SELECT {main_columns}, {related_table}.name
FROM {main_table}
LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.id
""").format(
main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]),
main_table=sql.Identifier(main_table),
related_table=sql.Identifier(related_table)
)
cur.execute(query)
# 列名を再構成nameを2番目に配置
column_names = [main_columns[0], 'name'] + main_columns[1:]
rows = cur.fetchall()
# Excelワークブックを作成
wb = Workbook()
ws = wb.active
# 列名を書き込み
ws.append(column_names)
# データを書き込み
for row in rows:
# nameを2番目に移動
formatted_row = [format_datetime(row[0])] + [row[-1]] + [format_datetime(field) if field is not None else '' for field in row[1:-1]]
ws.append(formatted_row)
# 列幅を自動調整
for col in ws.columns:
max_length = 0
column = col[0].column_letter
for cell in col:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = (max_length + 2) * 1.2
ws.column_dimensions[column].width = adjusted_width
# Excelファイルを保存
wb.save(excel_file_path)
print(f"テーブル '{main_table}' の内容を '{excel_file_path}' に出力しました。")
except (Exception, psycopg2.Error) as error:
print("エラーが発生しました:", error)
finally:
if conn:
cur.close()
conn.close()
print("データベース接続を閉じました。")
# 使用例
host = "192.168.1.220"
port = "12245"
database = "dislocker"
user = "suti7"
password = "Testing1234"
main_table = "pc_usage_history"
related_table = "club_member"
excel_file_path = "output.xlsx"
export_table_to_excel(host, port, database, user, password, main_table, related_table, excel_file_path)