csv出力のテスト
This commit is contained in:
parent
dde933fc14
commit
41e85cb6bc
1 changed files with 77 additions and 0 deletions
77
temp/csv_test.py
Normal file
77
temp/csv_test.py
Normal file
|
@ -0,0 +1,77 @@
|
||||||
|
import psycopg2
|
||||||
|
import csv
|
||||||
|
from psycopg2 import sql
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
def format_datetime(value):
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
return value.strftime('%Y-%m-%d %H:%M:%S')
|
||||||
|
return value
|
||||||
|
|
||||||
|
def export_table_to_csv(host, port, database, user, password, main_table, related_table, related_column, csv_file_path):
|
||||||
|
try:
|
||||||
|
# データベースに接続
|
||||||
|
conn = psycopg2.connect(
|
||||||
|
host=host,
|
||||||
|
port=port,
|
||||||
|
database=database,
|
||||||
|
user=user,
|
||||||
|
password=password
|
||||||
|
)
|
||||||
|
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
# メインテーブルの列情報を取得(user_idを除く)
|
||||||
|
cur.execute(sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(main_table)))
|
||||||
|
main_columns = [desc[0] for desc in cur.description if desc[0] != 'member_id']
|
||||||
|
|
||||||
|
# クエリを作成(列名を明確に指定)
|
||||||
|
query = sql.SQL("""
|
||||||
|
SELECT {main_columns}, {related_table}.name
|
||||||
|
FROM {main_table}
|
||||||
|
LEFT JOIN {related_table} ON {main_table}.member_id = {related_table}.id
|
||||||
|
""").format(
|
||||||
|
main_columns=sql.SQL(', ').join([sql.SQL("{}.{}").format(sql.Identifier(main_table), sql.Identifier(col)) for col in main_columns]),
|
||||||
|
main_table=sql.Identifier(main_table),
|
||||||
|
related_table=sql.Identifier(related_table)
|
||||||
|
)
|
||||||
|
|
||||||
|
cur.execute(query)
|
||||||
|
|
||||||
|
# 列名を再構成(nameを2番目に配置)
|
||||||
|
column_names = [main_columns[0], 'name'] + main_columns[1:]
|
||||||
|
|
||||||
|
rows = cur.fetchall()
|
||||||
|
|
||||||
|
with open(csv_file_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
|
||||||
|
csvwriter = csv.writer(csvfile)
|
||||||
|
csvwriter.writerow(column_names)
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
# nameを2番目に移動
|
||||||
|
formatted_row = [format_datetime(row[0])] + [row[-1]] + [format_datetime(field) if field is not None else '' for field in row[1:-1]]
|
||||||
|
csvwriter.writerow(formatted_row)
|
||||||
|
|
||||||
|
print(f"テーブル '{main_table}' の内容を '{csv_file_path}' に出力しました。")
|
||||||
|
|
||||||
|
except (Exception, psycopg2.Error) as error:
|
||||||
|
print("エラーが発生しました:", error)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
|
print("データベース接続を閉じました。")
|
||||||
|
|
||||||
|
# 使用例
|
||||||
|
host = "192.168.1.220"
|
||||||
|
port = "12245"
|
||||||
|
database = "dislocker"
|
||||||
|
user = "suti7"
|
||||||
|
password = "Testing1234"
|
||||||
|
main_table = "pc_usage_history"
|
||||||
|
related_table = "club_member"
|
||||||
|
related_column = "name" # 例: 登録者の名前を表示したい場合
|
||||||
|
csv_file_path = "output.csv"
|
||||||
|
|
||||||
|
export_table_to_csv(host, port, database, user, password, main_table, related_table, related_column, csv_file_path)
|
Loading…
Reference in a new issue