気圧と健康の気象病予報士@東京

OpenWeatherMapのデータと生成AIを用いて記事を作成しています

リトライ機能の追加 -ver.0.02-20240531

OpenWheatherMap から気圧データを取得(fetching)する、get_pressure_data 関数でエラーが発生しました。

$ cat python_script.err 

"Error fetching data: 502 Server Error: Bad Gateway for url: http://api.openweathermap...."

リトライ機能で修正

get_pressure_data 関数でリクエストが失敗した場合に、3回までリトライする機能を追加しました。リトライ間隔は5秒です。これで、APIリクエストが一時的なエラーで失敗した場合でも、リトライを試みることで成功する可能性が高まるはずです。

リーナスさんのこと

ところで、このような愚かなブログ(愚ログ<ぐろぐ>)で、ソースコードを公開していると、なんだか、お世話になったLinus Benedict Torvaldsさんを思い出します。もちろん、彼と私に直接的な関係はないのですが。Linux(当時は、Red Hat Linux, もういまは無償版って無いんですね) でだいぶ勉強をさせてもらいました。


get_pressure_data 関数だけのフローチャート


スクリプト全体のフローチャート


import os
import requests
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta, timezone
import smtplib
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from dotenv import load_dotenv
import openai
import logging
import time

# 環境変数の読み込み
load_dotenv()

# 環境変数から設定を取得
log_file_path = os.getenv('LOG_FILE_PATH')
api_key = os.getenv('OPENWEATHERMAP_API_KEY')
openai_api_key = os.getenv('OPENAI_API_KEY')
smtp_server = os.getenv('SMTP_SERVER')
smtp_port = int(os.getenv('SMTP_PORT'))
from_email = os.getenv('FROM_EMAIL')
to_addr = os.getenv('TO_ADDR')
email_password = os.getenv('EMAIL_PASSWORD')

# ログの設定
logging.basicConfig(filename=log_file_path, level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(message)s')

logging.debug("Starting script")

if log_file_path:
    # ログの記録
    with open(log_file_path, "a") as log:
        log.write(f"Script executed at {datetime.now()}\n")
else:
    logging.error("LOG_FILE_PATH environment variable not set.")
    print("LOG_FILE_PATH environment variable not set.")

# 定数
CITY_NAME = 'Shibuya,JP'
API_URL = f'http://api.openweathermap.org/data/2.5/forecast?q={CITY_NAME}&appid={api_key}&units=metric'
THRESHOLD = -0.1  # 極端な下降トレンドとみなす閾値(調整可能)
MIN_INTERVAL = 2 * 3600  # 強調表示する最小間隔(2時間)

# OpenAI APIキーの設定
openai.api_key = openai_api_key

logging.debug("Environment variables loaded")

# 気圧データ取得関数
def get_pressure_data():
    logging.debug("Starting to fetch pressure data")
    max_retries = 3
    retries = 0
    while retries < max_retries:
        try:
            response = requests.get(API_URL)
            response.raise_for_status()
            data = response.json()
            logging.debug("Pressure data fetched successfully")
            return data
        except requests.RequestException as e:
            retries += 1
            logging.error(f"Error fetching data: {e}, retrying ({retries}/{max_retries})...")
            time.sleep(5)  # 5秒待ってからリトライ
    return None

# 気圧データの解析関数 (OpenAI API)
def analyze_pressure_data(timestamps, pressures):
    logging.debug("Starting to analyze pressure data")
    try:
        # 気圧データを簡略化してテキスト形式で整形
        data_text = "\n".join([f"{t.strftime('%Y-%m-%d %H:%M')}: {p}" for t, p in zip(timestamps, pressures)])

        # プロンプトに気圧データを説明する
        prompt = (
            "以下は東京渋谷の気圧データです。このデータに基づいて、気圧と気象病についてコメントを生成してください。コメントの最初に結論をやや長めに述べてください。その後、コメントは短いパラグラフを用い、見出しや箇条書き、番号付きリストなどを活用して読みやすくしてください。\n\n"
            + data_text
        )

        # テキストプロンプトに基づいた解析を行う
        response = openai.ChatCompletion.create(
            model="gpt-4o",  # 使用するモデル
            messages=[
                {"role": "system", "content": "You are an assistant that analyzes pressure data trends."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=1000  # 生成されるコメントの最大トークン数
        )

        # 分析結果のコメントを取得
        comment = response['choices'][0]['message']['content'].strip()
        logging.debug("Comment from OpenAI: %s", comment)
        return format_comment(comment)

    except Exception as e:
        logging.error(f"Error analyzing pressure data: {e}")
        return None

# コメントの形式を整える関数
def format_comment(comment):
    return f"{comment}"

# 気圧グラフ作成関数 (極端な下降トレンドをハイライト)
def highlight_extreme_downward_trend(timestamps, pressures, threshold, min_interval):
    logging.debug("Starting to create pressure graph")
    plt.figure(figsize=(12, 6))
    plt.plot(timestamps, pressures, marker='o', linestyle='-')

    highlight_start = None
    for i in range(len(pressures) - 1):
        rate_of_change = (pressures[i + 1] - pressures[i]) / ((timestamps[i + 1] - timestamps[i]).total_seconds() / 3600)
        if rate_of_change < threshold:
            if highlight_start is None:
                highlight_start = i
        else:
            if highlight_start is not None:
                interval = (timestamps[i] - timestamps[highlight_start]).total_seconds()
                if interval >= min_interval:
                    alpha = max(0.1, min(1.0, abs(rate_of_change)/2))
                    color = (1.0, 0.0, 0.0, alpha)

                    plt.axvspan(timestamps[highlight_start], timestamps[i], color=color)
                highlight_start = None

    plt.title('Pressure Data in Shibuya, Tokyo')
    plt.xlabel('Date')
    plt.ylabel('Pressure (hPa)')
    plt.xticks(rotation=45)

    # 横軸の日付フォーマットを変更
    ax = plt.gca()
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    ax.xaxis.set_major_locator(mdates.DayLocator())
    ax.xaxis.set_minor_locator(mdates.HourLocator(interval=6))

    plt.grid(True)
    plt.tight_layout()

    # ファイル名に日付と時刻を含めて保存
    output_dir = os.getenv('OUTPUT_DIR', os.path.join(os.path.dirname(__file__), "png"))
    os.makedirs(output_dir, exist_ok=True)

    current_datetime = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_name = os.path.join(output_dir, f"{current_datetime}_fig.png")
    plt.savefig(file_name)
    logging.debug("Pressure graph saved as %s", file_name)
    return file_name

# メール送信関数
def send_email(file_path, comment):
    logging.debug("Starting to send email")
    current_time = datetime.now().strftime("%I:%M %p")

    msg = MIMEMultipart()
    msg['Subject'] = f'Pressure data in Tokyo at {current_time}'
    msg['From'] = from_email
    msg['To'] = to_addr

    # メール本文をMarkdown形式で作成
    if comment:
        email_body = f"""
**Current time:** {current_time}

{comment}
        """
        msg.attach(MIMEText(email_body, 'plain'))  # Markdown形式で送信
    else:
        email_body = f"""
**Current time:** {current_time}

No analysis comment available.
        """
        msg.attach(MIMEText(email_body, 'plain'))  # Markdown形式で送信

    with open(file_path, 'rb') as f:
        img_data = f.read()
        image = MIMEImage(img_data, name=os.path.basename(file_path))
        msg.attach(image)

    try:
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(from_email, email_password)
            server.send_message(msg)
            logging.debug("Email sent successfully")
    except smtplib.SMTPException as e:
        logging.error(f"Error sending email: {e}")

# メイン関数
def main():
    logging.debug("Main function started")
    try:
        data = get_pressure_data()
        if data:
            timestamps, pressures = [], []
            jst = timezone(timedelta(hours=9), 'JST')
            current_time = datetime.now(jst)
            start_time = current_time - timedelta(hours=24)
            for item in data.get('list', []):
                timestamp = datetime.utcfromtimestamp(item.get('dt', 0)).replace(tzinfo=timezone.utc).astimezone(jst)
                if timestamp >= start_time:
                    timestamps.append(timestamp)
                    pressures.append(item.get('main', {}).get('pressure', 0))

            # 画像データのBase64エンコード
            file_name = highlight_extreme_downward_trend(timestamps, pressures, THRESHOLD, MIN_INTERVAL)

            # 気圧データの解析コメント取得
            comment = analyze_pressure_data(timestamps, pressures)

            # メール送信
            send_email(file_name, comment)
        else:
            logging.error("Failed to fetch pressure data after retries.")
    except Exception as e:
        logging.error(f"Error in main function: {e}")

if __name__ == "__main__":
    main()