気圧と健康の気象病予報士@東京

OpenWeatherMapのデータと生成AIを用いて記事を作成しています

main 関数内をシンプルに -ver.0.022-20240602

リーダブルコードを目指し

main 関数を簡略化しました。具体的には、圧縮・展開などの処理をそれぞれの関数にして、main()の中で呼び出すことにしました。読みやすいコードを意味する「リーダブルコード」を目指したいなぁ。

Amazonのアソシエイトとして、このブログの著者が適格販売により収入を得ている場合があります。

キンドルで読める電子版も

リーダブルコードの書籍と言えば、amazon.co.jp で、Kindle版は発売されていません。ですが、オライリーから PDF だったか、EPUB とかのKindle端末に送信できるタイプのフォーマットファイルが販売されていますよ(すみません、上記は何年も昔のことで、Kindle 端末側が EPUB 等のファイルのインポートが可能なのかどうか、現在は不明です)。

import os
import requests
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta, timezone
import smtplib
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from dotenv import load_dotenv
import openai
import logging
import time
import gzip
import shutil

# 環境変数の読み込み
load_dotenv()

# 環境変数から設定を取得
log_dir = os.getenv('LOG_DIR')
api_key = os.getenv('OPENWEATHERMAP_API_KEY')
openai_api_key = os.getenv('OPENAI_API_KEY')
smtp_server = os.getenv('SMTP_SERVER')
smtp_port = int(os.getenv('SMTP_PORT'))
from_email = os.getenv('FROM_EMAIL')
to_addr = os.getenv('TO_ADDR')
email_password = os.getenv('EMAIL_PASSWORD')

# 日付ごとにログファイルを生成
current_date = datetime.now().strftime("%Y-%m-%d")
log_file_path = os.path.join(log_dir, f"{current_date}.log")
compressed_log_file_path = f"{log_file_path}.gz"

def decompress_log_file():
    if os.path.exists(compressed_log_file_path):
        try:
            with gzip.open(compressed_log_file_path, 'rb') as f_in:
                with open(log_file_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
        except gzip.BadGzipFile:
            shutil.copy(compressed_log_file_path, log_file_path)
    else:
        open(log_file_path, 'w').close()

def compress_log_file():
    try:
        with open(log_file_path, 'rb') as f_in:
            with gzip.open(compressed_log_file_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.remove(log_file_path)
        logging.debug("Log file compressed successfully")
    except Exception as e:
        logging.error(f"Error compressing log file: {e}")

# gzファイルの展開または新規作成
decompress_log_file()

# ログの設定
logging.basicConfig(filename=log_file_path, level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(message)s')

logging.debug("Starting script")

if log_file_path:
    # ログの記録
    with open(log_file_path, "a") as log:
        log.write(f"Script executed at {datetime.now()}\n")
else:
    logging.error("LOG_FILE_PATH environment variable not set.")
    print("LOG_FILE_PATH environment variable not set.")

# 定数
CITY_NAME = 'Shibuya,JP'
API_URL = f'http://api.openweathermap.org/data/2.5/forecast?q={CITY_NAME}&appid={api_key}&units=metric'
THRESHOLD = -0.1  # 極端な下降トレンドとみなす閾値(調整可能)
MIN_INTERVAL = 2 * 3600  # 強調表示する最小間隔(2時間)

# OpenAI APIキーの設定
openai.api_key = openai_api_key

logging.debug("Environment variables loaded")

# 気圧データ取得関数
def get_pressure_data():
    logging.debug("Starting to fetch pressure data")
    max_retries = 3
    retries = 0
    while retries < max_retries:
        try:
            response = requests.get(API_URL)
            response.raise_for_status()
            data = response.json()
            logging.debug("Pressure data fetched successfully")
            return data
        except requests.RequestException as e:
            retries += 1
            logging.error(f"Error fetching data: {e}, retrying ({retries}/{max_retries})...")
            time.sleep(5)  # 5秒待ってからリトライ
    return None

# 気圧データの解析関数 (OpenAI API)
def analyze_pressure_data(timestamps, pressures):
    logging.debug("Starting to analyze pressure data")
    try:
        # 気圧データを簡略化してテキスト形式で整形
        data_text = "\n".join([f"{t.strftime('%Y-%m-%d %H:%M')}: {p}" for t, p in zip(timestamps, pressures)])

        # プロンプトに気圧データを説明する
        prompt = (
            "以下は東京渋谷の気圧データです。このデータに基づいて、気圧と気象病についてコメントを生成してください。コメントの最初に結論をやや長めに述べてください。その後、コメントは短いパラグラフを用い、見出しや箇条書き、番号付きリストなどを活用して読みやすくしてください。\n\n"
            + data_text
        )

        # テキストプロンプトに基づいた解析を行う
        response = openai.ChatCompletion.create(
            model="gpt-4o",  # 使用するモデル
            messages=[
                {"role": "system", "content": "You are an assistant that analyzes pressure data trends."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=1000  # 生成されるコメントの最大トークン数
        )

        # 分析結果のコメントを取得
        comment = response['choices'][0]['message']['content'].strip()
        logging.debug("Comment from OpenAI: %s", comment)
        return format_comment(comment)

    except Exception as e:
        logging.error(f"Error analyzing pressure data: {e}")
        return None

# コメントの形式を整える関数
def format_comment(comment):
    # "@category: 日記\n"
    # "@tag: タグ名1, タグ名2\n"
    return f"{comment}"

# 気圧グラフ作成関数 (極端な下降トレンドをハイライト)
def highlight_extreme_downward_trend(timestamps, pressures, threshold, min_interval):
    logging.debug("Starting to create pressure graph")
    plt.figure(figsize=(12, 6))
    plt.plot(timestamps, pressures, marker='o', linestyle='-')

    highlight_start = None
    for i in range(len(pressures) - 1):
        rate_of_change = (pressures[i + 1] - pressures[i]) / ((timestamps[i + 1] - timestamps[i]).total_seconds() / 3600)
        if rate_of_change < threshold:
            if highlight_start is None:
                highlight_start = i
        else:
            if highlight_start is not None:
                interval = (timestamps[i] - timestamps[highlight_start]).total_seconds()
                if interval >= min_interval:
                    alpha = max(0.1, min(1.0, abs(rate_of_change)/2))
                    color = (1.0, 0.0, 0.0, alpha)

                    plt.axvspan(timestamps[highlight_start], timestamps[i], color=color)
                highlight_start = None

    plt.title('Pressure Data in Shibuya, Tokyo')
    plt.xlabel('Date')
    plt.ylabel('Pressure (hPa)')
    plt.xticks(rotation=45)

    # 横軸の日付フォーマットを変更
    ax = plt.gca()
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    ax.xaxis.set_major_locator(mdates.DayLocator())
    ax.xaxis.set_minor_locator(mdates.HourLocator(interval=6))

    plt.grid(True)
    plt.tight_layout()

    # ファイル名に日付と時刻を含めて保存
    output_dir = os.getenv('OUTPUT_DIR', os.path.join(os.path.dirname(__file__), "png"))
    os.makedirs(output_dir, exist_ok=True)

    current_datetime = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_name = os.path.join(output_dir, f"{current_datetime}_fig.png")
    plt.savefig(file_name)
    logging.debug("Pressure graph saved as %s", file_name)
    return file_name

# メール送信関数
def send_email(file_path, comment):
    logging.debug("Starting to send email")
    current_time = datetime.now().strftime("%I:%M %p")

    msg = MIMEMultipart()
    msg['Subject'] = f'Pressure data in Tokyo at {current_time}'
    msg['From'] = from_email
    msg['To'] = to_addr

    # メール本文をMarkdown形式で作成
    if comment:
        email_body = f"""
**Current time:** {current_time}

{comment}

        """
        msg.attach(MIMEText(email_body, 'plain'))  # Markdown形式で送信
    else:
        email_body = f"""
**Current time:** {current_time}

No analysis comment available.

        """
        msg.attach(MIMEText(email_body, 'plain'))  # Markdown形式で送信

    with open(file_path, 'rb') as f:
        img_data = f.read()
        image = MIMEImage(img_data, name=os.path.basename(file_path))
        msg.attach(image)

    try:
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(from_email, email_password)
            server.send_message(msg)
            logging.debug("Email sent successfully")
    except smtplib.SMTPException as e:
        logging.error(f"Error sending email: {e}")

# メイン関数
def main():
    logging.debug("Main function started")
    try:
        data = get_pressure_data()
        if data:
            timestamps, pressures = [], []
            jst = timezone(timedelta(hours=9), 'JST')
            current_time = datetime.now(jst)
            start_time = current_time - timedelta(hours=24)
            for item in data.get('list', []):
                timestamp = datetime.utcfromtimestamp(item.get('dt', 0)).replace(tzinfo=timezone.utc).astimezone(jst)
                if timestamp >= start_time:
                    timestamps.append(timestamp)
                    pressures.append(item.get('main', {}).get('pressure', 0))

            # 画像データのBase64エンコード
            file_name = highlight_extreme_downward_trend(timestamps, pressures, THRESHOLD, MIN_INTERVAL)

            # 気圧データの解析コメント取得
            comment = analyze_pressure_data(timestamps, pressures)

            # メール送信
            send_email(file_name, comment)
        else:
            logging.error("Failed to fetch pressure data after retries.")
    except Exception as e:
        logging.error(f"Error in main function: {e}")

    # ログファイルをgzipで再圧縮
    compress_log_file()

if __name__ == "__main__":
    main()