気圧と健康の気象病予報士@東京

OpenWeatherMapのデータと生成AIを用いて記事を作成しています

GR-M10-RP の出力を文字情報に変換するだけ

準天頂衛星システムの「みちびき」。この、みちびきからは災害危険情報が発せられているようです。それを受信するデバイスを結びつけて、受信データを文字情報に変換するだけのことを書いています。

まずはドキュメントに目を通す

まずは、次のドキュメントに目を通してから。

prioris.jp

バイスを次々と接続していく

バイスを接続していく。

データがスルスルと入ってくる

先のドキュメントの read.py を実行すると、次のようなデータが流れていきます。

$QZQSM,56,c6adf3b4518002cc2659860b30e966222d00e5a030b408969c4000103329954*78
$QZQSM,56,9ab0840de0000000000000000000000000000000000000000000000227cbb28*2e
$QZQSM,56,53adf3b4518002d3f65a8014115a83ed507e4a178544e228a00000105942ed0*76
$QZQSM,56,c6b0840de000000000000000000000000000000000000000000000025283edc*21
$QZQSM,56,9aadf3b4518002c3e8588acb118162352c47e58fa0b2f0965e6000109826a08*7b
$QZQSM,56,53b0840de000000000000000000000000000000000000000000000029879b04*75
$QZQSM,56,c6adf3b4518002cc2659860b30e966222d00e5a030b408969c400010f00fe8c*75
$QZQSM,56,9ab0840de00000000000000000000000000000000000000000000002e4edcf0*27
$QZQSM,56,53adf3b4518002d3f65a8014115a83ed507e4a178544e228a00000111c286b8*78

サンプルスクリプトを参考にし

サンプルスクリプトの、read.py を参考にして。

import sys
import argparse
import operator
from functools import reduce
import serial
import time
import azarashi

VAL_SET_RAM_UBX_RXM_SFRBX_UART1_ON = bytes([0xB5, 0x62, 0x06, 0x8A, 0x09, 0x00, 0x01, 0x01, 0x00, 0x00, 0x32, 0x02, 0x91, 0x20, 0x01, 0x81, 0x30])

satellite_id = {
    184: '56',
    185: '57',
    189: '61',
    183: '55',
    186: '58',
}

def nmea_checksum(sentence):
    data = sentence.strip("$").split('*', 1)[0]
    cksum = reduce(operator.xor, (ord(s) for s in data), 0)
    return cksum

def ubx_checksum(message):
    ck_a = 0
    ck_b = 0
    i = 0
    while i < len(message):
        ck_a = (ck_a + message[i]) & 0xff
        ck_b = (ck_b + ck_a) & 0xff
        i += 1
    return ck_a, ck_b

def ubx2qzqsm(line):
    if line[:7] == b'\xB5\x62\x02\x13\x2C\x00\x05': # UBX-RXM-SFRBX, 44 bytes, QZSS
        satId = satellite_id[line[7] + 182] # PRN -> Satellite ID
        data = b''
        for i in range(9):
            data += bytes((line[14+3+i*4], line[14+2+i*4], line[14+1+i*4], line[14+0+i*4]))
        if data[1] >> 2 == 43 or data[1] >> 2 == 44: # Message Type 43=JMA-DC Report, 44=Other
            dcr_message = (data[:31] + bytes((data[31] & 0xC0,))).hex()[:-1] # 256-4=252 bit
            sentence = '$QZQSM,' + satId + ',' + dcr_message + '*'
            return sentence + format(nmea_checksum(sentence), 'x')

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Print QZQSM NMEA format sentence')  
    parser.add_argument('port', help='serial port. ex: /dev/ttyUSB0')
    parser.add_argument('baudrate', help='baudrate. ex: 115200')
    parser.add_argument('-n', '--nmea', help='print other standard NMEA sentence', action='store_true')
    args = parser.parse_args()

    with serial.Serial(args.port, args.baudrate) as ser:
        print('initializing...')
        ser.write(VAL_SET_RAM_UBX_RXM_SFRBX_UART1_ON) # UBX-RXM-SFRBX Output ON
        time.sleep(1)
        print('start!')

        # 標準出力をファイルにリダイレクト
        with open('output.txt', 'w') as f:
            sys.stdout = f

            while True:
                line = b''
                nmea_flag = False
                ubx_flag = False
                count = 0
                payload_length = 0
                while True:
                    if ubx_flag:
                        if count > 4 and payload_length == 0:
                            payload_length = int.from_bytes(line[4:5], "little")
                        if payload_length > 0 and count == payload_length + 8: # header 6 bytes + checksum 2 bytes
                            break
                    b = ser.read()
                    if b == b'$' and not ubx_flag:
                        nmea_flag = True
                    if b == b'\x62' and line == b'\xB5':
                        ubx_flag = True
                    if b == b'\n':
                        if line.endswith(b'\r'):
                            line += b
                            break
                        else:
                            line += b
                    else:
                        line += b
                    count += 1

                if args.nmea and nmea_flag:
                    sentence = line.decode().strip('\r\n')
                    ck = nmea_checksum(sentence)
                    if format(ck, 'x') == sentence.split('*', 1)[1]:
                        print(sentence)

                if ubx_flag:
                    ck_a, ck_b = ubx_checksum(line[2:payload_length+6])
                    if line[-2] == ck_a and line[-1] == ck_b:
                        sentence = ubx2qzqsm(line)
                        if sentence:
                            try:
                                report = azarashi.decode(sentence, msg_type='spresense')
                                print(report)
                            except azarashi.QzssDcrDecoderException as e:
                                print(f'Error decoding line: {sentence} - {e}')

実行時には引数をつけよう

実行時にには、引数(だっけ?)も添えて。上記のスクリプト名を、"read_V1.py"とします。

python3 read_V1.py /dev/ttyS0 38400

6月以降に購入した人は、"38400"らしいです。それ以前の人とかは、冒頭のドキュメントに数字が書かれています。

データが日本語として出力

そして、実行をしたら、下記の情報が出力されます。

発表時刻: 7131135分

警報等情報要素: 海上濃霧警報
沿海州南部沖

警報等情報要素: 海上濃霧警報
秋田沖

警報等情報要素: 海上濃霧警報
佐渡沖

警報等情報要素: 海上濃霧警報
能登沖

警報等情報要素: 海上濃霧警報
日本海北西部

警報等情報要素: 海上濃霧警報
山陰沖東部及び若狭湾付近

警報等情報要素: 海上濃霧警報
山陰沖西部

警報等情報要素: 海上濃霧警報
対馬海峡
防災気象情報(海上)(発表)(通常)
海上警報が発表されました。

発表時刻: 7131135分

警報等情報要素: 海上濃霧警報
済州島西海上

警報等情報要素: 海上濃霧警報
長崎西海上

警報等情報要素: 海上風警報
北海道東方海上

警報等情報要素: 海上風警報
三陸沖東部

警報等情報要素: 海上風警報
三陸沖西部

警報等情報要素: 海上風警報
関東海域北部

警報等情報要素: 海上風警報
対馬海峡

警報等情報要素: 海上風警報
長崎西海上
### DCX Message - Null Message ###
### DCX Message - Null Message ###

今後の使い方は今後考えます

どういう使い方をしていくかは、今後考えたいと思います。