データ集計を高速化したい

投稿者: | 2022-06-25

https://teratail.com/questions/6awlnff8pa23e7

  • とても参考になる
  • deque:両端のアクセスだけであればリストより高速(参考URL
  • 辞書の使い方

目次

メイン

  • やりがちなのが、2000万行などサイズが大きいデータをDataFrameで読み込んで、if word in row:などしてしまうこと。とんでもなく遅くなる

  • データは1行ずつしか読み込まない
  • ほしい情報を含むようなサイズで deque ≒ リスト を用意
  • 1行ずつ読み込み、5行分を deque で保持する
  • 1行ずつ読み込み、キーワードが含まれる場合、deque から(何行か前の)必要なデータを取得する
  • 取得したら用意した辞書にappendする

  • ログファイルは全読込せずに、行毎に必要なものだけ処理する。
  • (カメラ→日付リスト辞書のように)必要なデータのみ保持する。

deque
import pandas as pd
from datetime import datetime
from collections import deque

def read_log(path):

    cams = dict() # キー=カメラ, 値=日時リスト

    with open(path, 'r', encoding='shift_jis', errors='ignore') as f:
        read_state = 0
     # いったんqueはリストみたいに考えておく(5行しか格納しない)
        que = deque(maxlen=5) 
        while True:
            line = f.readline()
            if not line:
                break

            que.append(line) # いったん流れた日時行を取得するため直近5行をキューに保持

            if read_state == 0:   # 開始行の探索中=「動体検知開始」
                if line.startswith('サブリスト: 動体検知開始'):
                    line = que[-4] # 日時行
                    pos = line.find(' ')
                    dt = datetime.strptime(line[pos+1:].strip(), '%Y-%m-%d %H:%M:%S')
                    # フラグを切り替え(次のカメラ行を探索するため)
                    read_state = 1

            elif read_state == 1: # カメラ行の探索中
                if line.startswith('カメラ No.:'):
                    cam = line.split(':')[1].strip()
                    if cam not in cams:
                        cams[cam] = []
                    cams[cam].append(dt)
            # フラグを切り替え(開始行の探索のため)
                    read_state = 0

    return cams

cams = read_log('logTest.txt')
print('read_log end.')

# 結果をカメラ毎にCSV出力
for cam, dts in cams.items():
    df = pd.DataFrame({'Date':dts})
    df['cnt'] = 1
    df = df.set_index('Date')
    df = df.resample('H').sum()
    df.to_csv(f'{cam}.csv')

テストデータ作成用

from datetime import datetime, timedelta
import random

def make_test_log(path, n=1):
    random.seed(110)
    dt_cur = datetime(2022,6,3,12)
    dt_delta = timedelta(seconds=1)
    with open(path, 'w', encoding='shift_jis') as f:
        for i in range(n):
            log_id = i+1
            log_time = dt_cur.strftime('%Y-%m-%d %H:%M:%S')
            mes = '開始' if random.randint(0,1) == 1 else '終了'
            cam_no = f'D{random.randint(1,10)}'
            dt_cur += dt_delta

            lines = f"""----------------------------
{log_id}    {log_time}
----------------------------
メインリスト: アラーム録画
サブリスト: 動体検知{mes}
ローカルユーザー: N/A
ホストIPアドレス: N/A
パラメータタイプ: N/A
カメラ No.: {cam_no}

"""
            f.write(lines)

make_test_log('logTest.txt', 2000000)