from flask import Flask, request, jsonify from datetime import datetime from misskey import Misskey import json import os class Bot(): def __init__(self) -> None: self.config_dir_path = "./config/" self.server_config_path = self.config_dir_path + "config.json" try: if not os.path.isdir(self.config_dir_path): print("config ディレクトリが見つかりません... 作成します。") os.mkdir(self.config_dir_path) if not os.path.isfile(self.server_config_path): print("config ファイルが見つかりません... 作成します。") self.server_config = { "bot": { "host": "", "token": "" } } with open(self.server_config_path, "w") as w: json.dump(self.server_config, w, indent=4) elif os.path.isfile(self.server_config_path): with open(self.server_config_path, "r") as r: self.server_config = json.load(r) print("config ファイルを読み込みました。") self.mis = Misskey(self.server_config["bot"]["host"], i=self.server_config["bot"]["token"]) except (Exception) as error: print("初回処理でエラーが発生しました。\nエラー内容\n" + str(error)) self.init_result = "error" def format_earthquake_info(self, data): if data['type'] == 'eew': event_time = datetime.fromtimestamp(int(data['time']) / 1000).strftime("%Y年%m月%d日 %H時%M分%S秒") if data['report'] == 'final': report_number = '最終' else: report_number = '第' + data['report'] intensity = data['intensity'] epicenter = data['epicenter'] epicenter_depth = data['depth'] magnitude = data['magnitude'] result = {"result": 0, "detail": f"震源地 {epicenter} 予想震度 {intensity}\nマグニチュード {magnitude} 震源の深さ {epicenter_depth}\n緊急地震速報 {report_number}報\n時刻 | {event_time}"} elif data['type'] == 'pga_alert_cancel': print(data) result = {"result": 0, "detail": f"誤報を検出。一つ前のノートは誤報です。"} elif data['type'] == 'pga_alert': result = {"result": 1, "detail": "pga_alert_detected"} elif data['type'] == 'intensity_report': #event_time = datetime.fromtimestamp(int(data['time']) / 1000).strftime("%Y年%m月%d日 %H時%M分%S秒") #intensity_info = data['intensity_list'][0] #intensity = intensity_info['intensity'] #regions = intensity_info['region_list'][0] #result = {"result": 0, "detail": f"{regions} で地震 震度 {intensity}\n時刻 | {event_time}"} result = {"result": 1, "detail": "intensity_report_detected"} else: result = {"result": 1, "detail": "unknown_data_detected"} return result web = Flask(__name__) @web.route('/earthquake', methods=['POST']) def receive_earthquake_data(): earthquake_data = request.json #この時点でjsonは辞書に変換されている info = bot.format_earthquake_info(earthquake_data) current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: if info["result"] == 0: detail = info["detail"] bot.mis.notes_create(text=detail, local_only=True) print(f"{str(current_time)} jsonを受信") print(f"json_dict\n{earthquake_data}") print(f"投稿内容\n{detail}") return jsonify({"status": "success", "message": "Posted to Misskey"}), 200 elif info["result"] == 1: if info["detail"] == "pga_alert_detected": print(f"{str(current_time)} pga_alert のため、無視されました。") return jsonify({"status": "passed", "message": "pga_alert"}), 500 elif info["detail"] == "intensity_report_detected": print(f"{str(current_time)} intensity_report のため、無視されました。") return jsonify({"status": "passed", "message": "intensity_report"}), 500 else: print(f"{str(current_time)} 不明なデータ のため、無視されました。\njsonの内容") print(earthquake_data) return jsonify({"status": "passed", "message": "unknown_data"}), 500 except Exception as e: print(e) return jsonify({"status": "error", "message": str(e)}), 500 if __name__ == '__main__': bot = Bot() web.run(host='127.0.0.1', port=5000, debug=True)