デバイス番号の入力の自動化 (#14) への対応

This commit is contained in:
suti7yk5032 2024-09-06 01:30:44 +09:00
parent 550cf83e86
commit 4713dc7a23
3 changed files with 276 additions and 15 deletions

View file

@ -687,17 +687,51 @@ class Bot(discord.Client):
if os.path.isfile(dislocker.onetime_config_path): if os.path.isfile(dislocker.onetime_config_path):
with open(dislocker.onetime_config_path, "r") as r: with open(dislocker.onetime_config_path, "r") as r:
onetime_config = json.load(r) onetime_config = json.load(r)
onetime = str(onetime_config["onetime"]) onetime = str(onetime_config["onetime"]["pc_register"])
if onetime == None:
onetime = str(self.password_generate(8))
onetime_config["onetime"]["pc_register"] = onetime
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
await message.channel.send(f"# :dizzy_face: PC登録時のワンタイムパスワードを発行します。\n# パスワード | {onetime}")
else:
await message.channel.send(f"# :dizzy_face: 既にワンタイムパスワードは発行されています。\n# パスワード | {onetime}") await message.channel.send(f"# :dizzy_face: 既にワンタイムパスワードは発行されています。\n# パスワード | {onetime}")
else: else:
onetime = str(self.password_generate(8)) onetime = str(self.password_generate(8))
onetime_config = { onetime_config = {
"onetime": str(onetime) "onetime": {
"pc_register": onetime,
"device_register": None
}
} }
with open(dislocker.onetime_config_path, "w") as w: with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4) json.dump(onetime_config, w, indent=4)
await message.channel.send(f"# :dizzy_face: PC登録時のワンタイムパスワードを発行します。\n# パスワード | {onetime}") await message.channel.send(f"# :dizzy_face: PC登録時のワンタイムパスワードを発行します。\n# パスワード | {onetime}")
elif msg_split[0] == "/devreg":
if os.path.isfile(dislocker.onetime_config_path):
with open(dislocker.onetime_config_path, "r") as r:
onetime_config = json.load(r)
onetime = str(onetime_config["onetime"]["device_register"])
if onetime == None:
onetime = str(self.password_generate(8))
onetime_config["onetime"]["device_register"] = onetime
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
await message.channel.send(f"# :dizzy_face: デバイス登録時のワンタイムパスワードを発行します。\n# パスワード | {onetime}")
else:
await message.channel.send(f"# :dizzy_face: 既にワンタイムパスワードは発行されています。\n# パスワード | {onetime}")
else:
onetime = str(self.password_generate(8))
onetime_config = {
"onetime": {
"pc_register": None,
"device_register": onetime
}
}
with open(dislocker.onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
await message.channel.send(f"# :dizzy_face: デバイス登録時のワンタイムパスワードを発行します。\n# パスワード | {onetime}")
else: else:
await message.channel.send("# :warning: DMでの応答は、現在無効化されています。") await message.channel.send("# :warning: DMでの応答は、現在無効化されています。")
else: else:

View file

@ -65,15 +65,39 @@ class Auth():
pc_number = int(kwargs["pc_number"]) pc_number = int(kwargs["pc_number"])
pc_uuid = str(kwargs["pc_uuid"]) pc_uuid = str(kwargs["pc_uuid"])
pc_token = str(kwargs["pc_token"]) pc_token = str(kwargs["pc_token"])
device_list = kwargs["device_list"]
keyboard_number = "own"
mouse_number = "own"
if "password_hash" in kwargs: if "password_hash" in kwargs:
password_hash = str(kwargs["password_hash"]) password_hash = str(kwargs["password_hash"])
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s AND password_hash = %s AND pc_uuid = %s AND pc_token = %s", (pc_number, password_hash, pc_uuid, pc_token)) cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s AND password_hash = %s AND pc_uuid = %s AND pc_token = %s", (pc_number, password_hash, pc_uuid, pc_token))
pc_info = cursor.fetchall() pc_info = cursor.fetchall()
if pc_info:
for device in device_list:
cursor.execute("SELECT * FROM keyboard_list WHERE device_id = %s", (device["device_id"],))
keyboard_record = cursor.fetchall()
if keyboard_record:
keyboard_number = int(keyboard_record[0][0])
break
else:
pass
for device in device_list:
cursor.execute("SELECT * FROM mouse_list WHERE device_id = %s", (device["device_id"],))
mouse_record = cursor.fetchall()
if mouse_record:
mouse_number = int(mouse_record[0][0])
break
else:
pass
return {"result": 0, "about": "ok", "output_dict": {"keyboard_number": keyboard_number, "mouse_number": mouse_number}}
else:
return {"result": 1, "about": "unregistered_pc"}
else: else:
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s AND pc_uuid = %s AND pc_token = %s", (pc_number, pc_uuid, pc_token)) cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s AND pc_uuid = %s AND pc_token = %s", (pc_number, pc_uuid, pc_token))
pc_info = cursor.fetchall() pc_info = cursor.fetchall()
if pc_info: if pc_info:
return {"result": 0, "about": "ok"} return {"result": 0, "about": "ok"}
else: else:
@ -89,6 +113,79 @@ class Auth():
finally: finally:
cursor.close() cursor.close()
def device_use_register(self, **kwargs):
try:
pc_number = int(kwargs["pc_number"])
if "keyboard_number" in kwargs:
keyboard_number = int(kwargs["keyboard_number"])
else:
keyboard_number = None
if "mouse_number" in kwargs:
mouse_number = int(kwargs["mouse_number"])
else:
mouse_number = None
cursor = self.db.cursor()
cursor.execute("SELECT * FROM pc_list WHERE pc_number = %s", (pc_number,))
pc_list_record = cursor.fetchall()
pc_using_member_id = pc_list_record[0][1]
if pc_using_member_id == None:
return {"result": 1, "about": "not_used"}
else:
cursor.execute("SELECT * FROM pc_usage_history WHERE member_id = %s AND pc_number = %s ORDER BY id DESC LIMIT 1", (pc_using_member_id, pc_number))
pc_usage_history_record = cursor.fetchall()
pc_usage_history_record_id = pc_usage_history_record[0][0]
cursor.execute("UPDATE pc_usage_history SET keyboard_number = %s, mouse_number = %s WHERE id = %s", (keyboard_number, mouse_number, pc_usage_history_record_id))
cursor.execute("UPDATE keyboard_list SET using_member_id = %s WHERE keyboard_number = %s", (pc_using_member_id, keyboard_number))
cursor.execute("UPDATE mouse_list SET using_member_id = %s WHERE mouse_number = %s", (pc_using_member_id, mouse_number))
self.db.commit()
return {"result": 0, "about": "ok"}
except Exception as error:
print("デバイスの使用登録中にエラーが発生しました。\nエラー内容")
print(str(error.__class__.__name__))
print(str(error.args))
print(str(error))
return {"result": 1, "about": "error"}
def device_register(self, **kwargs):
try:
cursor = self.db.cursor()
mode = kwargs["mode"]
number = kwargs["number"]
device_id = kwargs["device_id"]
device_name = kwargs["device_name"]
if mode == "keyboard":
keyboard_number = int(kwargs["number"])
cursor.execute("SELECT * FROM keyboard_list WHERE keyboard_number = %s", (keyboard_number,))
keyboard_record = cursor.fetchall()
if keyboard_record:
cursor.execute("UPDATE keyboard_list SET device_id = %s, device_name = %s WHERE keyboard_number = %s", (device_id, device_name, keyboard_number))
self.db.commit()
return {"result": 0, "about": "ok"}
else:
cursor.execute("INSERT INTO keyboard_list (keyboard_number, device_id, device_name) VALUES (%s, %s, %s)", (keyboard_number, device_id, device_name))
return {"result": 0, "about": "ok"}
elif mode == "mouse":
mouse_number = int(kwargs["number"])
cursor.execute("SELECT * FROM mouse_list WHERE mouse_number = %s", (mouse_number,))
mouse_record = cursor.fetchall()
if mouse_record:
cursor.execute("UPDATE mouse_list SET device_id = %s, device_name = %s WHERE mouse_number = %s", (device_id, device_name, mouse_number))
self.db.commit()
return {"result": 0, "about": "ok"}
else:
cursor.execute("INSERT INTO mouse_list (mouse_number, device_id, device_name) VALUES (%s, %s, %s)", (mouse_number, device_id, device_name))
return {"result": 0, "about": "ok"}
except Exception as error:
print("停止処理中にエラーが発生しました。\nエラー内容")
print(str(error.__class__.__name__))
print(str(error.args))
print(str(error))
return {"result": 1, "about": "error"}
def delete(self, pc_number): def delete(self, pc_number):
try: try:
cursor = self.db.cursor() cursor = self.db.cursor()
@ -194,7 +291,6 @@ class Auth():
master_password_hash = self.hash_genarate(master_password) master_password_hash = self.hash_genarate(master_password)
cursor.execute("UPDATE pc_list SET pc_uuid = %s, pc_token = %s, master_password = %s WHERE pc_number = %s", (pc_uuid, pc_token, master_password, pc_number)) cursor.execute("UPDATE pc_list SET pc_uuid = %s, pc_token = %s, master_password = %s WHERE pc_number = %s", (pc_uuid, pc_token, master_password, pc_number))
self.db.commit() self.db.commit()
os.remove(onetime_config_path)
return {"result": 0, "about": "ok", "output_dict": {"pc_token": pc_token, "master_password": master_password, "master_password_hash": master_password_hash}} return {"result": 0, "about": "ok", "output_dict": {"pc_token": pc_token, "master_password": master_password, "master_password_hash": master_password_hash}}
else: else:
return {"result": 1, "about": "exist"} return {"result": 1, "about": "exist"}
@ -223,11 +319,14 @@ def register():
with open(onetime_config_path, "r") as r: with open(onetime_config_path, "r") as r:
onetime_config = json.load(r) onetime_config = json.load(r)
if onetime_password == onetime_config["onetime"]: if onetime_password == onetime_config["onetime"]["pc_register"]:
register_result = auth.register(pc_number=pc_number, pc_uuid=pc_uuid) register_result = auth.register(pc_number=pc_number, pc_uuid=pc_uuid)
pc_token = register_result["output_dict"]["pc_token"] pc_token = register_result["output_dict"]["pc_token"]
master_password = register_result["output_dict"]["master_password"] master_password = register_result["output_dict"]["master_password"]
master_password_hash = register_result["output_dict"]["master_password_hash"] master_password_hash = register_result["output_dict"]["master_password_hash"]
onetime_config["onetime"]["pcregister"] = None
with open(onetime_config_path, "w") as w:
json.dump(onetime_config, w, indent=4)
return jsonify({'message': 'ok', 'pc_token': pc_token, 'master_password': master_password, 'master_password_hash': master_password_hash}), 200 return jsonify({'message': 'ok', 'pc_token': pc_token, 'master_password': master_password, 'master_password_hash': master_password_hash}), 200
else: else:
return jsonify({'message': 'damedesu'}), 401 return jsonify({'message': 'damedesu'}), 401
@ -240,11 +339,14 @@ def verify():
password_hash = request.json.get('password') password_hash = request.json.get('password')
pc_uuid = request.json.get('pc_uuid') pc_uuid = request.json.get('pc_uuid')
pc_token = request.json.get('pc_token') pc_token = request.json.get('pc_token')
devices = request.json.get('devices')
print(str(pc_number) + "の認証処理を開始...") print(str(pc_number) + "の認証処理を開始...")
pc_auth = auth.check(pc_number=pc_number, password_hash=password_hash, pc_uuid=pc_uuid, pc_token=pc_token) pc_auth = auth.check(pc_number=pc_number, password_hash=password_hash, pc_uuid=pc_uuid, pc_token=pc_token, device_list=devices)
if pc_auth["result"] == 0: if pc_auth["result"] == 0:
auth.delete(pc_number) auth.delete(pc_number)
auth.device_use_register(pc_number=pc_number, keyboard_number=pc_auth["output_dict"]["keyboard_number"], mouse_number=pc_auth["output_dict"]["mouse_number"])
print(str(pc_number) + "の認証処理は成功しました.") print(str(pc_number) + "の認証処理は成功しました.")
return jsonify({'message': 'ok'}), 200 return jsonify({'message': 'ok'}), 200
else: else:
@ -271,6 +373,40 @@ def stop():
else: else:
return jsonify({'message': 'damedesu'}), 401 return jsonify({'message': 'damedesu'}), 401
@app.route('/device_register', methods=['POST'])
def device_register():
onetime_password = str(request.json.get('onetime'))
mode = str(request.json.get('mode'))
number = int(request.json.get('number'))
device_id = str(request.json.get('device_id'))
device_name = str(request.json.get('device_name'))
if os.path.isfile(onetime_config_path):
with open(onetime_config_path, "r") as r:
onetime_config = json.load(r)
if onetime_password == onetime_config["onetime"]["device_register"]:
if mode == "keyboard":
print("キーボードの登録処理を開始...")
device_register = auth.device_register(mode="keyboard", number=number, device_id=device_id, device_name=device_name)
if device_register["result"] == 0:
print(f"キーボード {number} 番の登録処理は成功しました.")
return jsonify({'message': 'ok'}), 200
else:
print(f"キーボード {number} 番の登録処理は失敗しました.")
return jsonify({'message': 'error'}), 500
elif mode == "mouse":
print("マウスの登録処理を開始...")
device_register = auth.device_register(mode="mouse", number=number, device_id=device_id, device_name=device_name)
if device_register["result"] == 0:
print(f"マウス {number} 番の登録処理は成功しました.")
return jsonify({'message': 'ok'}), 200
else:
print(f"マウス {number} 番の登録処理は失敗しました.")
return jsonify({'message': 'error'}), 500
else:
return jsonify({'message': 'damedesu'}), 401
if __name__ == '__main__': if __name__ == '__main__':
app.run(host="0.0.0.0", port=5000, debug=False) app.run(host="0.0.0.0", port=5000, debug=False)

View file

@ -15,6 +15,7 @@ import sys
import shutil import shutil
import uuid import uuid
import time import time
import win32com.client
app_name = "Dislocker" app_name = "Dislocker"
dislocker_dir = os.path.dirname(os.path.abspath(sys.argv[0])) dislocker_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
@ -43,6 +44,50 @@ elif os.path.isfile(client_config_path):
with open(client_config_path, "r") as r: with open(client_config_path, "r") as r:
client_config = json.load(r) client_config = json.load(r)
def get_usb_devices(self):
str_computer = "."
obj_wmi_service = win32com.client.Dispatch("WbemScripting.SWbemLocator")
obj_swem_services = obj_wmi_service.ConnectServer(str_computer, "root\\cimv2")
col_items = obj_swem_services.ExecQuery("Select * from Win32_PnPEntity where PNPDeviceID like 'USB%'")
devices = []
for obj_item in col_items:
devices.append({
"device_id": obj_item.DeviceID,
"PNPDeviceID": obj_item.PNPDeviceID,
"device_name": obj_item.Description,
"name": obj_item.Name,
"Manufacturer": obj_item.Manufacturer,
"Service": obj_item.Service
})
return devices
def device_register(**kwargs):
onetime = str(kwargs["onetime"])
mode = str(kwargs["mode"])
number = int(kwargs["number"])
device_id = str(kwargs["device_id"])
device_name = str(kwargs["device_name"])
device_register_json = {
"number": number,
"device_id": device_id,
"device_name": device_name,
"mode": mode,
"onetime": onetime
}
register_url = client_config["auth_host_url"] + "/device_register"
responce = requests.post(register_url, json=device_register_json)
if responce.status_code == 200:
print("デバイスの登録に成功しました。")
return {"result": 0, "about": "ok"}
elif responce.status_code == 401:
print("認証に失敗しました。")
return {"result": 1, "about": "auth_failed"}
else:
print("内部エラーによりデバイスの登録に失敗しました。")
return {"result": 1, "about": "error"}
def master_password_gen(): def master_password_gen():
numbers = string.digits # (1) numbers = string.digits # (1)
password = ''.join(random.choice(numbers) for _ in range(10)) # (2) password = ''.join(random.choice(numbers) for _ in range(10)) # (2)
@ -270,10 +315,29 @@ class Lock(customtkinter.CTkToplevel):
self.signin_button.configure(state="normal", fg_color="#3c8dd0") self.signin_button.configure(state="normal", fg_color="#3c8dd0")
self.signout_button.configure(state="normal", fg_color="#3c8dd0") self.signout_button.configure(state="normal", fg_color="#3c8dd0")
def get_usb_devices(self):
str_computer = "."
obj_wmi_service = win32com.client.Dispatch("WbemScripting.SWbemLocator")
obj_swem_services = obj_wmi_service.ConnectServer(str_computer, "root\\cimv2")
col_items = obj_swem_services.ExecQuery("Select * from Win32_PnPEntity where PNPDeviceID like 'USB%'")
devices = []
for obj_item in col_items:
devices.append({
"device_id": obj_item.DeviceID,
"PNPDeviceID": obj_item.PNPDeviceID,
"Description": obj_item.Description,
"device_name": obj_item.Name,
"Manufacturer": obj_item.Manufacturer,
"Service": obj_item.Service
})
return devices
def auth(self): def auth(self):
self.button_disable() self.button_disable()
password = str(self.password_entry.get()) password = str(self.password_entry.get())
devices = self.get_usb_devices()
if len(password) == 10: if len(password) == 10:
print("マスターパスワードで認証を試行します。") print("マスターパスワードで認証を試行します。")
@ -289,13 +353,13 @@ class Lock(customtkinter.CTkToplevel):
self.button_enable() self.button_enable()
self.deiconify() self.deiconify()
print("認証サーバーにアクセスします。") print("認証サーバーにアクセスします。")
auth_url = client_config["auth_host_url"] + "/verify" auth_url = client_config["auth_host_url"] + "/verify"
auth_json = { auth_json = {
"pc_number": int(client_config["pc_number"]), "pc_number": int(client_config["pc_number"]),
"pc_uuid": str(client_config["pc_uuid"]), "pc_uuid": str(client_config["pc_uuid"]),
"pc_token": str(client_config["pc_token"]), "pc_token": str(client_config["pc_token"]),
"devices": devices,
"password": self.hash_genarate(str(self.password_entry.get())) "password": self.hash_genarate(str(self.password_entry.get()))
} }
try: try:
@ -483,6 +547,33 @@ if __name__ == '__main__':
pass pass
else: else:
pass pass
elif args[1] == "devicesetup":
init_result = init(pc_number=args[2], onetime=args[3])
if init_result == 1:
warning_msgbox = tkinter.messagebox.showwarning(title=f"{app_name} | 多重起動エラー", message=f"すでに {app_name} は実行されています。\n正常に起動しない場合は、既に起動しているプロセスを終了してから、もう一度起動してみてください。")
elif init_result == 2:
pass
else:
mode = input('登録するデバイスはキーボードとマウスのどちらですか?(keyboard/mouse): ')
usb_devices = get_usb_devices()
i = 0
for device in usb_devices:
print(f"{str(i + 1)} 番目 | デバイス名: {device['Description']} \n製造元: {device['Manufacturer']} \nデバイスID: {device['DeviceID']}")
print("-" * 20)
i += 1
device_num = input('どのデバイスを登録しますか?番号で指定してください: ')
device_register_num = input('そのデバイスは何番目のデバイスとして登録しますか?番号で指定してください: ')
onetime = input('ワンタイムパスワードを入力してください: ')
device_register_result = device_register(onetime=onetime, mode=mode, number=int(device_register_num), device_id=usb_devices[int(device_num) - 1]["device_id"], device_name=usb_devices[int(device_num) - 1]["device_name"])
if device_register_result["result"] == 0:
print("登録されました。")
elif device_register_result["result"] == 1:
if device_register_result["about"] == "auth_failed":
print("認証に失敗しました。")
else:
print("エラーが発生しました。")
else: else:
print("引数エラー。") print("引数エラー。")
else: else: