備忘録 (original) (raw)

Windows側に入っているNode.jsにパスが通っています。Windows側はアンインストールすればOK

以下のようなExcelデータを使用する際、毎回Excelを読みこんでいるとめちゃ時間がかかるので高速化した。

方法としては、読みこんだデータを.npyで保存しておくようにした。今回はデータが更新されるのでチェックサムで更新を確認して、更新された場合は.npyも更新するようにしている。

import hashlib
import json
import os

import numpy as np
import pandas as pd

def compute_checksum(file_path: str) -> str:
    """Return the SHA-256 hex digest of the file at *file_path*."""
    digest = hashlib.sha256()
    with open(file_path, "rb") as f:
        # Stream in 4 KiB chunks so large workbooks are never fully in memory.
        while True:
            chunk = f.read(4096)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()

def read_prices_from_sheets(file_path: str, sheet_names: list, step: int = 1, use_cache: bool = False) -> list:
    """Read the price column (column 1) of the given Excel sheets, with an .npy cache.

    Column 1 of every sheet in the workbook is cached to ``<file>_cache.npy``.
    A SHA-256 checksum stored in ``<file>_checksum.json`` detects changes to
    the Excel file; when it changes, the cache is rebuilt.

    Args:
        file_path: Path to the Excel workbook.
        sheet_names: Names of the sheets whose data should be returned, in order.
        step: Take every ``step``-th row from each requested sheet.
        use_cache: If True, load from the .npy cache when it is still valid.

    Returns:
        A flat list of the selected values from all requested sheets
        (sheets missing from the workbook are silently skipped).
    """
    # splitext handles any extension, not just '.xlsx'.
    base, _ = os.path.splitext(file_path)
    cache_file = base + '_cache.npy'
    checksum_file = base + '_checksum.json'

    current_checksum = compute_checksum(file_path)
    is_cache_valid = False

    if os.path.exists(checksum_file):
        with open(checksum_file, 'r') as f:
            cached_data = json.load(f)
            if cached_data.get("checksum") == current_checksum:
                is_cache_valid = True

    if use_cache and is_cache_valid and os.path.exists(cache_file):
        print(f"Loading data from cache: {cache_file}")
        # allow_pickle: the cache stores a dict, not a plain ndarray.
        all_data = np.load(cache_file, allow_pickle=True).item()
    else:
        print(f"Reading data from Excel: {file_path}")
        # sheet_name=None reads every sheet in a single pass instead of
        # re-opening the workbook once per sheet.
        sheets = pd.read_excel(file_path, sheet_name=None)
        all_data = {name: df.iloc[:, 1].tolist() for name, df in sheets.items()}

        np.save(cache_file, all_data)
        with open(checksum_file, 'w') as f:
            json.dump({"checksum": current_checksum}, f)
        print(f"Data cached to: {cache_file}")

    all_prices = []
    for sheet_name in sheet_names:
        if sheet_name in all_data:
            all_prices.extend(all_data[sheet_name][::step])

    return all_prices

ビットコインの年間利益計算スクリプト作成した。BitFlyerの取引レポートをそのまま読みこめるようにしてある。

間違っている点あればコメントで教えてください。

import pandas as pd

def calculate_annual_profit_average_method(csv_file):
    """Compute yearly BTC trading profit (JPY) with the total-average-cost method (総平均法).

    Expects a BitFlyer trade report CSV with the columns
    取引日時 / 取引種別 / 取引価格 / 通貨1数量 / 手数料.

    Returns:
        dict mapping year -> realized profit truncated to int (JPY).
    """

    def _num(value):
        # BitFlyer exports numbers either as plain floats or as comma-grouped
        # strings ("1,234,567"); the original str.replace-only parsing crashed
        # on values pandas had already parsed as floats. Accept both.
        return float(str(value).replace(",", ""))

    df = pd.read_csv(csv_file, parse_dates=['取引日時'])
    df = df[['取引日時', '取引種別', '取引価格', '通貨1数量', '手数料']]
    df = df.sort_values('取引日時')
    df['年'] = df['取引日時'].dt.year

    annual_profit = {}
    total_quantity = 0  # BTC held, carried across years
    total_cost = 0      # JPY cost basis of the held BTC

    for year, group in df.groupby('年'):
        profit = 0

        # First pass: add this year's acquisitions to the cost basis.
        for _, row in group.iterrows():
            if row['取引種別'] == '買い':
                price = _num(row['取引価格'])
                quantity = abs(_num(row['通貨1数量']))
                total_cost += price * quantity
                # Fees are charged in BTC, so they reduce the held quantity.
                total_quantity += quantity - abs(_num(row['手数料']))

            if row['取引種別'] == '受取':
                total_quantity += abs(_num(row['通貨1数量']))

        # Yearly average acquisition price (総平均法).
        avg_cost_price = total_cost / total_quantity if total_quantity > 0 else 0

        # Second pass: realize this year's sales against the average price.
        for _, row in group.iterrows():
            if row['取引種別'] == '売り':
                price = _num(row['取引価格'])
                quantity = abs(_num(row['通貨1数量']))
                fee = abs(_num(row['手数料']))
                profit += (price - avg_cost_price) * (quantity - fee)
                total_quantity -= quantity + fee

        annual_profit[year] = int(profit)

        # Carry the remaining position into next year at this year's average.
        total_cost = avg_cost_price * total_quantity

    return annual_profit

def calculate_annual_profit_moving_average(csv_file):
    """Compute yearly BTC trading profit (JPY) with the moving-average-cost method (移動平均法).

    Same CSV format as calculate_annual_profit_average_method. The average
    acquisition price is recomputed after every buy/receive, and each sale
    realizes profit against the current average.

    Returns:
        dict mapping year -> realized profit truncated to int (JPY).
    """

    def _num(value):
        # Accept both plain floats and comma-grouped strings ("1,234,567").
        return float(str(value).replace(",", ""))

    df = pd.read_csv(csv_file, parse_dates=['取引日時'])
    df = df[['取引日時', '取引種別', '取引価格', '通貨1数量', '手数料']]
    df = df.sort_values('取引日時')
    df['年'] = df['取引日時'].dt.year

    annual_profit = {}
    total_quantity = 0   # BTC held, carried across years
    total_cost = 0       # JPY cost basis of the held BTC
    avg_cost_price = 0   # guards against a sale appearing before any buy (was a NameError)

    for year, group in df.groupby('年'):
        profit = 0

        for _, row in group.iterrows():
            kind = row['取引種別']
            if kind not in ('買い', '売り', '受取'):
                continue

            price = _num(row['取引価格'])
            quantity = abs(_num(row['通貨1数量']))

            if kind == '買い':
                total_cost += price * quantity
                # Fees are charged in BTC and reduce the held quantity.
                total_quantity += quantity - abs(_num(row['手数料']))
                avg_cost_price = total_cost / total_quantity

            elif kind == '売り':
                profit += (price - avg_cost_price) * quantity
                # Remove the sold coins from the basis at the current average.
                total_cost -= avg_cost_price * quantity
                total_quantity -= quantity + abs(_num(row['手数料']))

            elif kind == '受取':
                # Received coins arrive at zero cost, lowering the average.
                total_quantity += quantity
                avg_cost_price = total_cost / total_quantity

        annual_profit[year] = int(profit)

    return annual_profit

# Run both costing methods over the BitFlyer trade report and print them.
csv_file = 'TradeHistory.csv'

annual_profit = calculate_annual_profit_average_method(csv_file)
print("総平均法")
print(" 年間利益:", annual_profit)

annual_profit = calculate_annual_profit_moving_average(csv_file)
print("移動平均法")
print(" 年間利益:", annual_profit)

開発初期あたりのとりあえず正常系のみ書いて動かしてみるときなどで、SEGVが発生した場合にバックトレースログを出力したいときのコード。

シグナルハンドラでSEGVなど異常終了のシグナルをハンドリングしてログ出力する。

/* Debug-only crash reporter; _GNU_SOURCE is needed for strsignal(). */
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#ifdef DEBUG
#include <signal.h>
#include <execinfo.h>
#endif

#ifdef DEBUG

/* Absolute path of the running executable (resolved in main() from
 * /proc/self/exe); printed so backtrace addresses can be fed to addr2line. */
char exe_path[1024];

/*
 * DEBUG-only fatal-signal handler: dumps the signal name, the faulting
 * address (for SIGSEGV/SIGBUS), the executable path and a backtrace to
 * stderr, then restores the default disposition and re-raises the signal so
 * the process still terminates with the original signal (and can dump core).
 *
 * NOTE(review): snprintf(), strsignal() and backtrace_symbols() (which
 * malloc()s) are not async-signal-safe, so this can misbehave if the signal
 * interrupts malloc — acceptable for a debug aid, not for production paths.
 */
void signal_handler(int sig, siginfo_t *info, void *ucontext) {
    void *buffer[30];
    int nptrs;
    char **symbols;
    char msg[256];
    int len;

    /* Capture up to 30 frames of the current call stack. */
    nptrs = backtrace(buffer, sizeof(buffer) / sizeof(void *));
    symbols = backtrace_symbols(buffer, nptrs);
    if (symbols == NULL) {
        const char *msg_fail = "Failed to get backtrace symbols.\n";
        write(STDERR_FILENO, msg_fail, strlen(msg_fail));
        _exit(EXIT_FAILURE);
    }

    len = snprintf(msg, sizeof(msg), "Received signal %d (%s)\n", sig, strsignal(sig));
    write(STDERR_FILENO, msg, len);

    /* Only memory-fault signals carry a meaningful fault address. */
    if (sig == SIGSEGV || sig == SIGBUS) {
        len = snprintf(msg, sizeof(msg), "Fault address: %p\n", info->si_addr);
        write(STDERR_FILENO, msg, len);
    }

    /* The executable path lets the addresses below be resolved offline,
     * e.g. `addr2line -e <exe_path> <addr>`. */
    len = snprintf(msg, sizeof(msg), "Executable Path: %s\n", exe_path);
    write(STDERR_FILENO, msg, len);

    write(STDERR_FILENO, "Backtrace:\n", strlen("Backtrace:\n"));
    for (int i = 0; i < nptrs; i++) {
        /* backtrace_symbols() lines look like "binary(func+0x12) [0xaddr]";
         * extract the raw address between '[' and ']'. */
        char *start = strchr(symbols[i], '[');
        char *end = strchr(symbols[i], ']');
        if (start && end && end > start + 1) {
            *end = '\0';
            start++;
            len = snprintf(msg, sizeof(msg), "  [%d] %s - %s:%s\n", i, symbols[i], exe_path, start);
            write(STDERR_FILENO, msg, len);
        } else {
            /* Fall back to printing the unparsed symbol line. */
            len = snprintf(msg, sizeof(msg), "  [%d] %s\n", i, symbols[i]);
            write(STDERR_FILENO, msg, len);
        }
    }

    free(symbols);

    /* Re-raise with the default action so the exit status reflects the
     * original signal. */
    signal(sig, SIG_DFL);
    raise(sig);
}

void setup_signal_handler() { struct sigaction sa; memset(&sa, 0, sizeof(sa)); sa.sa_sigaction = signal_handler; sa.sa_flags = SA_SIGINFO | SA_RESTART;

int signals[] = {SIGSEGV, SIGABRT, SIGFPE, SIGILL, SIGBUS, SIGTERM, SIGINT, SIGQUIT, SIGHUP};
int num_signals = sizeof(signals) / sizeof(signals[0]);

char msg[256];
int len;
for(int i = 0; i < num_signals; i++) {
    if (sigaction(signals[i], &sa, NULL) == -1) {
        perror("sigaction");
        exit(EXIT_FAILURE);
    } else {
        
        len = snprintf(msg, sizeof(msg), "Signal handler set for signal %d (%s)\n", signals[i], strsignal(signals[i]));
        write(STDERR_FILENO, msg, len);
    }
}

}

#endif

int main(int argc, char *argv[]) {
#ifdef DEBUG
    /* Resolve our own executable path for the crash report; fall back to a
     * placeholder when /proc is unavailable. */
    ssize_t n = readlink("/proc/self/exe", exe_path, sizeof(exe_path) - 1);
    if (n == -1) {
        strcpy(exe_path, "unknown");
    } else {
        exe_path[n] = '\0';
    }

    setup_signal_handler();
#endif

    fprintf(stderr, "DEBUGモード: セグメンテーションフォルトを発生させます。\n");
    fflush(stderr);

    /* Deliberate null-pointer write: triggers SIGSEGV to exercise the
     * handler, so the line below is never reached. */
    int *p = NULL;
    *p = 42;

    fprintf(stdout, "プログラムが正常に終了しました。\n");
    return 0;
}

メインスレッドである処理を行いサブスレッドに通知し、サブスレッドでまた別の処理を行う、というマルチスレッドのプログラムを作成した際、結局どのくらい処理時間がかかっているのか調べる必要があったため、LTTng (Linux Tracing Toolkit Next Generation) を使用して計測を行った。

ソースは以下のとおり、tracef()でトレースを行う。

// Headers required below (the original <...> names were stripped by the
// markdown export; reconstructed from usage).
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

#include <lttng/tracef.h>

// Shared state: main_thread sets button_pressed under mtx and signals cv;
// device_thread waits on cv for it.
std::condition_variable cv;
std::mutex mtx;
bool button_pressed = false;

void device_thread() { std::unique_lockstd::mutex lock(mtx); while (true) {

    cv.wait(lock, [] { return button_pressed; });

    
    tracef("Audio playback started");

    
    std::cout << "Audio playback started.\n";
    std::this_thread::sleep_for(std::chrono::seconds(1));

    
    button_pressed = false;
}

}

// Simulates a button press once per second, traces it, and wakes the device
// thread. Loops forever; never returns.
void main_thread() {
    int event_id = 0;
    while (true) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));

        // "Start" timestamp for the latency measurement.
        tracef("Button pressed, event_id=%d", event_id);

        // Publish the press under the lock, then notify outside it.
        {
            std::lock_guard<std::mutex> lock(mtx);
            button_pressed = true;
        }
        cv.notify_one();

        ++event_id;
    }
}

int main() {
    // Both worker loops run forever, so join() never returns and the
    // process runs until interrupted.
    std::thread device(device_thread);
    std::thread main_t(main_thread);

    main_t.join();
    device.join();
    return 0;
}

ビルド、計測方法は以下のとおり。

build

sudo apt-get update sudo apt-get install lttng-tools lttng-modules-dkms liblttng-ust-dev babeltrace2 g++ -o test test.cpp -llttng-ust -lpthread

セッションの作成

lttng create my_session

ユーザースペースのトレースポイントを有効化

lttng enable-event --userspace 'lttng_ust_tracef:*'

トレースの開始

lttng start

実行

./test

トレースの停止

lttng stop

セッションの破棄(必要に応じて)

lttng destroy

結果の確認

babeltrace2 ~/lttng-traces/my_session-2024*/

結果としては、以下のような出力が得られる。「+0.000251500」などがメインスレッドでtracef()が実行されるのを0秒として、デバイススレッドでtracef()が実行されるまでにかかった時間を計測している。

$ babeltrace2 ~/lttng-traces/my_session-2024*/ [12:59:59.132077740] (+?.?????????) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=0" } [12:59:59.132329240] (+0.000251500) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:00.132408292] (+1.000079052) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=1" } [13:00:00.133103892] (+0.000695600) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:01.133352844] (+1.000248952) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=2" } [13:00:01.133578044] (+0.000225200) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:02.133783396] (+1.000205352) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=3" } [13:00:02.134150696] (+0.000367300) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" }

収集している時系列データを、生データから等間隔に補間するプログラム。GoogleColabで実行すること。

データはGoogleスプレッドシートに保存しておき、1列目がタイムスタンプ、2列目が対象のデータとすること。

# Google Colab boilerplate: authenticate the notebook user and build an
# authorized gspread client from the default credentials.
from google.colab import auth
auth.authenticate_user()

from gspread_dataframe import get_as_dataframe, set_with_dataframe
import pandas as pd
import gspread
from google.auth import default

creds, _ = default()
gc = gspread.authorize(creds)

def convert_timestamp_format(timestamp):
    """Parse *timestamp* (string or datetime-like) into a pandas Timestamp."""
    return pd.to_datetime(timestamp)

def read_all_sheets(input_workbook):
    """Read every worksheet of *input_workbook* into one DataFrame.

    Only the first two columns are kept and renamed to Timestamp / Price;
    timestamps are parsed to pandas Timestamps.

    Args:
        input_workbook: gspread Spreadsheet object.

    Returns:
        DataFrame with columns Timestamp and Price, all sheets stacked.
    """
    frames = []
    for sheet in input_workbook.worksheets():
        df = get_as_dataframe(sheet, header=None, dtype={0: str, 1: float})
        df = df.iloc[:, :2]
        df.columns = ["Timestamp", "Price"]
        df["Timestamp"] = df["Timestamp"].apply(convert_timestamp_format)
        frames.append(df)

    # Single concat at the end avoids the quadratic re-copying caused by
    # concatenating into the accumulator on every loop iteration.
    if not frames:
        return pd.DataFrame(columns=["Timestamp", "Price"])
    return pd.concat(frames)

def interpolate_combined_df(combined_df):
    """Resample *combined_df* to 1-minute intervals with linear interpolation.

    Args:
        combined_df: DataFrame with Timestamp and Price columns.

    Returns:
        DataFrame with Timestamp, Price and a 'Sheet' column (YYYYMM of each
        row) used later to split the result into monthly worksheets.
    """
    # Index a copy instead of mutating the caller's frame with inplace=True.
    indexed = combined_df.set_index("Timestamp")
    # "min" is the non-deprecated spelling of the old "T" minute alias.
    df_resampled = indexed.resample("min").mean().interpolate(method='linear')
    df_resampled['Sheet'] = df_resampled.index.to_series().dt.strftime('%Y%m')
    return df_resampled.reset_index()

def save_sheets(output_workbook, interpolated_df):
    """Write every 'Sheet' group of *interpolated_df* to the same-named
    worksheet in *output_workbook*, creating missing worksheets on demand."""
    for sheet_name in interpolated_df["Sheet"].unique():
        group = interpolated_df[interpolated_df["Sheet"] == sheet_name]
        df_sheet = group.drop(columns=["Sheet"])

        # Reuse an existing worksheet when present, otherwise create one.
        try:
            output_sheet = output_workbook.worksheet(sheet_name)
        except gspread.exceptions.WorksheetNotFound:
            output_sheet = output_workbook.add_worksheet(title=sheet_name, rows="1000", cols="2")

        set_with_dataframe(output_sheet, df_sheet, include_index=False, include_column_header=False)

def process_workbook(input_filename, output_filename):
    """Read every sheet of *input_filename*, interpolate to 1-minute
    intervals, and write the result to *output_filename* with its worksheets
    sorted by title (YYYYMM).

    gspread has no direct worksheet-reorder API, so the sort is done by
    copying everything to a temporary workbook in sorted order and then
    rebuilding the output workbook from it.
    """
    input_workbook = gc.open(input_filename)

    output_workbook = gc.open(output_filename)

    # Stack all input sheets into one Timestamp/Price frame.
    combined_df = read_all_sheets(input_workbook)

    # Equal-interval (1-minute) linear interpolation; adds a 'Sheet' column.
    interpolated_df = interpolate_combined_df(combined_df)

    # Write each monthly group to its worksheet (created if missing).
    save_sheets(output_workbook, interpolated_df)

    # Target order: worksheet titles sorted ascending.
    sheet_titles = sorted([sheet.title for sheet in output_workbook.worksheets()])

    # Stage the sheets in sorted order inside a temporary workbook.
    temp_workbook = gc.create('temp_workbook')
    temp_sheet = temp_workbook.sheet1

    for title in sheet_titles:
        worksheet = output_workbook.worksheet(title)
        df = get_as_dataframe(worksheet, header=None)
        new_sheet = temp_workbook.add_worksheet(title=title, rows=worksheet.row_count, cols=worksheet.col_count)
        set_with_dataframe(new_sheet, df, include_index=False, include_column_header=False)

    # Drop the default first sheet that gc.create() made.
    temp_workbook.del_worksheet(temp_sheet)

    # A spreadsheet must always keep at least one sheet, so park a dummy one
    # while the old sheets are deleted.
    dummy_sheet = output_workbook.add_worksheet(title="dummy_sheet", rows="1", cols="1")

    for sheet in output_workbook.worksheets():
        if sheet.title != "dummy_sheet":
            output_workbook.del_worksheet(sheet)

    # Copy the sorted sheets back into the output workbook.
    for sheet in temp_workbook.worksheets():
        df = get_as_dataframe(sheet, header=None)
        new_sheet = output_workbook.add_worksheet(title=sheet.title, rows=sheet.row_count, cols=sheet.col_count)
        set_with_dataframe(new_sheet, df, include_index=False, include_column_header=False)

    output_workbook.del_worksheet(dummy_sheet)

    # Clean up the temporary staging workbook.
    gc.del_spreadsheet(temp_workbook.id)

# Input: raw price-log workbook; output: 1-minute interpolated workbook
# (both opened by title via gc.open()).
input_filename = "BitCoinPrice_backup"
output_filename = "BitCoinPrice_interp"

process_workbook(input_filename, output_filename)