Dislocker/temp/csv.py

69 lines
2.1 KiB
Python

import psycopg2
import csv
from psycopg2 import sql
from datetime import datetime
def format_datetime(value):
if isinstance(value, datetime):
return value.strftime('%Y-%m-%d %H:%M:%S')
return value
def export_table_to_csv(host, port, database, user, password, table_name, csv_file_path):
try:
# データベースに接続
conn = psycopg2.connect(
host=host,
port=port,
database=database,
user=user,
password=password
)
# カーソルを作成
cur = conn.cursor()
# テーブルの全データを取得
cur.execute(sql.SQL("SELECT * FROM {}").format(sql.Identifier(table_name)))
# 列名を取得
column_names = [desc[0] for desc in cur.description]
# 結果を取得
rows = cur.fetchall()
# CSVファイルに書き込み
with open(csv_file_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
csvwriter = csv.writer(csvfile)
# 列名を書き込み
csvwriter.writerow(column_names)
# データを書き込み
for row in rows:
# 各フィールドを適切にフォーマット
formatted_row = [format_datetime(field) if field is not None else '' for field in row]
csvwriter.writerow(formatted_row)
print(f"テーブル '{table_name}' の内容を '{csv_file_path}' に出力しました。")
except (Exception, psycopg2.Error) as error:
print("エラーが発生しました:", error)
finally:
# データベース接続を閉じる
if conn:
cur.close()
conn.close()
print("データベース接続を閉じました。")
# 使用例
# 使用例
host = "192.168.1.220"
port = "12245"
database = "dislocker"
user = "suti7"
password = "Testing1234"
table_name = "pc_usage_history"
csv_file_path = "./output.csv"
export_table_to_csv(host, port, database, user, password, table_name, csv_file_path)