ぼうびろく (original) (raw)
Windows側に入っているNode.jsにパスが通っています。Windows側はアンインストールすればOK
以下のようなExcelデータを使用する際、毎回Excelを読みこんでいるとめちゃ時間がかかるので高速化した。
方法としては、読みこんだデータを.npyで保存しておくようにした。今回はデータが更新されるのでチェックサムで更新を確認して、更新された場合は.npyも更新するようにしている。
import pandas as pd import numpy as np import os import hashlib import json
def compute_checksum(file_path: str) -> str:
    """Return the SHA-256 hex digest of the file at *file_path*.

    The file is streamed in 4 KiB chunks so arbitrarily large Excel
    files can be hashed without loading them into memory.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        while True:
            chunk = stream.read(4096)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
def read_prices_from_sheets(file_path: str, sheet_names: list, step: int = 1, use_cache: bool = False) -> list:
    """Collect the price column from the requested Excel sheets, with an .npy cache.

    The whole workbook is read once and stored as ``<file>_cache.npy``
    (a pickled dict of sheet name -> list of values). A SHA-256 checksum
    of the Excel file is kept in ``<file>_checksum.json`` so the cache is
    invalidated whenever the workbook changes.

    Parameters
    ----------
    file_path : str
        Path to the ``.xlsx`` workbook.
    sheet_names : list
        Sheets whose values should be returned, in this order.
    step : int
        Keep every *step*-th row (1 = all rows).
    use_cache : bool
        If True and the cached data matches the current checksum, skip
        reading the Excel file.

    Returns
    -------
    list
        Concatenated values from the selected sheets.
    """
    cache_file = file_path.replace('.xlsx', '_cache.npy')
    checksum_file = file_path.replace('.xlsx', '_checksum.json')

    current_checksum = compute_checksum(file_path)

    # The cache is valid only when the stored checksum matches the
    # current contents of the Excel file.
    is_cache_valid = False
    if os.path.exists(checksum_file):
        with open(checksum_file, 'r') as f:
            cached_data = json.load(f)
            if cached_data.get("checksum") == current_checksum:
                is_cache_valid = True

    if use_cache and is_cache_valid and os.path.exists(cache_file):
        print(f"Loading data from cache: {cache_file}")
        all_data = np.load(cache_file, allow_pickle=True).item()
    else:
        print(f"Reading data from Excel: {file_path}")
        all_data = {}
        # Open the workbook once and parse every sheet from the same
        # handle: the original re-opened the file for each sheet via
        # pd.read_excel and never closed the pd.ExcelFile it used for
        # the sheet listing.
        with pd.ExcelFile(file_path) as excel:
            for sheet_name in excel.sheet_names:
                df = excel.parse(sheet_name)
                # Column 1 (the second column) is taken as the value
                # column — assumes column 0 is a timestamp/index; TODO
                # confirm against the actual workbook layout.
                all_data[sheet_name] = df.iloc[:, 1].tolist()
        np.save(cache_file, all_data)
        with open(checksum_file, 'w') as f:
            json.dump({"checksum": current_checksum}, f)
        print(f"Data cached to: {cache_file}")

    all_prices = []
    for sheet_name in sheet_names:
        if sheet_name in all_data:
            all_prices.extend(all_data[sheet_name][::step])
    return all_prices
ビットコインの年間利益計算スクリプト作成した。BitFlyerの取引レポートをそのまま読みこめるようにしてある。
間違っている点あればコメントで教えてください。
import pandas as pd
def calculate_annual_profit_average_method(csv_file):
    """Compute yearly BTC trading profit using the total-average (総平均法) method.

    Parameters
    ----------
    csv_file : str or file-like
        Trade-history CSV in the BitFlyer report layout with columns
        取引日時, 取引種別, 取引価格, 通貨1数量, 手数料.

    Returns
    -------
    dict
        Mapping of year -> realized profit (int, JPY).
    """

    def _to_float(value):
        # The report may hold numbers either as plain numerics or as
        # comma-grouped strings such as "1,234,567". The original parsed
        # the fee column without stripping commas, unlike price/quantity.
        return float(str(value).replace(",", ""))

    df = pd.read_csv(csv_file, parse_dates=['取引日時'])
    df = df[['取引日時', '取引種別', '取引価格', '通貨1数量', '手数料']]
    df = df.sort_values('取引日時')
    df['年'] = df['取引日時'].dt.year

    annual_profit = {}
    total_quantity = 0.0
    total_cost = 0.0
    for year, group in df.groupby('年'):
        profit = 0.0
        # First pass: accumulate buys/receipts to fix this year's
        # average acquisition price.
        for _, row in group.iterrows():
            if row['取引種別'] == '買い':
                price = _to_float(row['取引価格'])
                quantity = abs(_to_float(row['通貨1数量']))
                total_cost += price * quantity
                # Fee appears to be denominated in BTC here (it is
                # subtracted from the held quantity) — TODO confirm.
                total_quantity += quantity - abs(_to_float(row['手数料']))
            elif row['取引種別'] == '受取':
                total_quantity += abs(_to_float(row['通貨1数量']))

        avg_cost_price = total_cost / total_quantity if total_quantity > 0 else 0

        # Second pass: realize profit on sales against the yearly
        # average acquisition price.
        for _, row in group.iterrows():
            if row['取引種別'] == '売り':
                price = _to_float(row['取引価格'])
                quantity = abs(_to_float(row['通貨1数量']))
                profit += (price - avg_cost_price) * (quantity - abs(_to_float(row['手数料'])))
                total_quantity -= quantity
                total_quantity -= abs(_to_float(row['手数料']))

        annual_profit[year] = int(profit)
        # Carry the remaining position into the next year at this
        # year's average price.
        total_cost = avg_cost_price * total_quantity
    return annual_profit
def calculate_annual_profit_moving_average(csv_file):
    """Compute yearly BTC trading profit using the moving-average (移動平均法) method.

    The average acquisition price is updated after every buy/receipt and
    each sale realizes profit against the average at the moment of sale.

    Parameters
    ----------
    csv_file : str or file-like
        Trade-history CSV in the BitFlyer report layout with columns
        取引日時, 取引種別, 取引価格, 通貨1数量, 手数料.

    Returns
    -------
    dict
        Mapping of year -> realized profit (int, JPY).
    """

    def _to_float(value):
        # Tolerate both plain numerics and comma-grouped strings such as
        # "1,234,567" (the original did not strip commas from the fee).
        return float(str(value).replace(",", ""))

    df = pd.read_csv(csv_file, parse_dates=['取引日時'])
    df = df[['取引日時', '取引種別', '取引価格', '通貨1数量', '手数料']]
    df = df.sort_values('取引日時')
    df['年'] = df['取引日時'].dt.year

    annual_profit = {}
    total_quantity = 0.0
    total_cost = 0.0
    # Initialized up front: the original raised UnboundLocalError when
    # the very first trade was a sale.
    avg_cost_price = 0.0
    for year, group in df.groupby('年'):
        profit = 0.0
        for _, row in group.iterrows():
            kind = row['取引種別']
            if kind not in ('買い', '売り', '受取'):
                continue
            price = _to_float(row['取引価格'])
            quantity = abs(_to_float(row['通貨1数量']))
            fee = abs(_to_float(row['手数料']))
            if kind == '買い':
                total_cost += price * quantity
                # Fee appears to be denominated in BTC (subtracted from
                # the held quantity) — TODO confirm.
                total_quantity += quantity - fee
                # Guard against a zero position (the original divided
                # unconditionally).
                if total_quantity > 0:
                    avg_cost_price = total_cost / total_quantity
            elif kind == '売り':
                profit += (price - avg_cost_price) * quantity
                total_cost -= avg_cost_price * quantity
                total_quantity -= quantity + fee
            else:  # 受取 (receipt: quantity in, no cost)
                total_quantity += quantity
                if total_quantity > 0:
                    avg_cost_price = total_cost / total_quantity
        annual_profit[year] = int(profit)
    return annual_profit
# Entry point: run both cost-basis methods on the same BitFlyer report
# so the results can be compared side by side.
csv_file = 'TradeHistory.csv'

annual_profit = calculate_annual_profit_average_method(csv_file)
print("総平均法")
print(" 年間利益:", annual_profit)

annual_profit = calculate_annual_profit_moving_average(csv_file)
print("移動平均法")
print(" 年間利益:", annual_profit)
開発初期あたりのとりあえず正常系のみ書いて動かしてみるときなどで、SEGVが発生した場合にバックトレースログを出力したいときのコード。
シグナルハンドラでSEGVなど異常終了のシグナルをハンドリングしてログ出力する。
#define _GNU_SOURCE #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #ifdef DEBUG #include <signal.h> #include <execinfo.h> #endif
#ifdef DEBUG
/* Absolute path of the running executable, filled in by main() via
 * /proc/self/exe. Printed in the crash report so the raw backtrace
 * addresses can be resolved with addr2line against the right binary. */
char exe_path[1024];
/*
 * Crash/termination handler: writes the signal number/name, the fault
 * address (for SIGSEGV/SIGBUS) and a backtrace to stderr, then restores
 * the default disposition and re-raises so the process still dies with
 * the original signal.
 *
 * NOTE(review): backtrace_symbols() (malloc), snprintf() and
 * strsignal() are not async-signal-safe, so this handler can deadlock
 * or corrupt state if the signal arrives inside malloc() etc. That is
 * an accepted risk for a DEBUG-only build — do not ship as-is.
 */
void signal_handler(int sig, siginfo_t *info, void *ucontext)
{
    void *buffer[30];
    int nptrs;
    char **symbols;
    char msg[256];
    int len;

    /* Capture up to 30 return addresses from the current stack. */
    nptrs = backtrace(buffer, sizeof(buffer) / sizeof(void *));
    symbols = backtrace_symbols(buffer, nptrs);
    if (symbols == NULL) {
        const char *msg_fail = "Failed to get backtrace symbols.\n";
        write(STDERR_FILENO, msg_fail, strlen(msg_fail));
        _exit(EXIT_FAILURE);
    }

    len = snprintf(msg, sizeof(msg), "Received signal %d (%s)\n", sig, strsignal(sig));
    write(STDERR_FILENO, msg, len);
    if (sig == SIGSEGV || sig == SIGBUS) {
        /* si_addr is only meaningful for memory-fault signals. */
        len = snprintf(msg, sizeof(msg), "Fault address: %p\n", info->si_addr);
        write(STDERR_FILENO, msg, len);
    }
    len = snprintf(msg, sizeof(msg), "Executable Path: %s\n", exe_path);
    write(STDERR_FILENO, msg, len);

    write(STDERR_FILENO, "Backtrace:\n", strlen("Backtrace:\n"));
    for (int i = 0; i < nptrs; i++) {
        /* Each symbol line ends in "[0xADDR]"; split out the raw
         * address so it can be fed to addr2line together with
         * exe_path. */
        char *start = strchr(symbols[i], '[');
        char *end = strchr(symbols[i], ']');
        if (start && end && end > start + 1) {
            *end = '\0';
            start++;
            len = snprintf(msg, sizeof(msg), " [%d] %s - %s:%s\n", i, symbols[i], exe_path, start);
            write(STDERR_FILENO, msg, len);
        } else {
            len = snprintf(msg, sizeof(msg), " [%d] %s\n", i, symbols[i]);
            write(STDERR_FILENO, msg, len);
        }
    }
    free(symbols);

    /* Re-raise with the default handler so the default action (core
     * dump / termination with this signal) still occurs. */
    signal(sig, SIG_DFL);
    raise(sig);
}
void setup_signal_handler() { struct sigaction sa; memset(&sa, 0, sizeof(sa)); sa.sa_sigaction = signal_handler; sa.sa_flags = SA_SIGINFO | SA_RESTART;
int signals[] = {SIGSEGV, SIGABRT, SIGFPE, SIGILL, SIGBUS, SIGTERM, SIGINT, SIGQUIT, SIGHUP};
int num_signals = sizeof(signals) / sizeof(signals[0]);
char msg[256];
int len;
for(int i = 0; i < num_signals; i++) {
if (sigaction(signals[i], &sa, NULL) == -1) {
perror("sigaction");
exit(EXIT_FAILURE);
} else {
len = snprintf(msg, sizeof(msg), "Signal handler set for signal %d (%s)\n", signals[i], strsignal(signals[i]));
write(STDERR_FILENO, msg, len);
}
}
}
#endif
/* Demo entry point: in DEBUG builds, resolves our executable path and
 * installs the crash handler, then deliberately dereferences NULL to
 * trigger SIGSEGV and exercise the backtrace output. */
int main(int argc, char *argv[])
{
#ifdef DEBUG
    /* Resolve our own binary's path for the crash report; readlink()
     * does not NUL-terminate, so terminate manually. */
    ssize_t n = readlink("/proc/self/exe", exe_path, sizeof(exe_path) - 1);
    if (n == -1) {
        strcpy(exe_path, "unknown");
    } else {
        exe_path[n] = '\0';
    }
    setup_signal_handler();
#endif

    fprintf(stderr, "DEBUGモード: セグメンテーションフォルトを発生させます。\n");
    fflush(stderr);

    int *null_ptr = NULL;
    *null_ptr = 42; /* intentional SIGSEGV */

    fprintf(stdout, "プログラムが正常に終了しました。\n");
    return 0;
}
メインスレッドである処理を行いサブスレッドに通知し、サブスレッドでまた別の処理を行う、というマルチスレッドのプログラムを作成した際、結局どのくらい処理時間がかかっているのか調べる必要があったため、LTTng (Linux Tracing Toolkit Next Generation) を使用して計測を行った。
ソースは以下のとおり、tracef()でトレースを行う。
// Standard headers. The original post's include list was mangled by the
// blog's markdown rendering (the <...> header names were swallowed);
// the names below are reconstructed from what the code actually uses.
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

#include <lttng/tracef.h>

// Shared state between main_thread (producer) and device_thread
// (consumer): button_pressed is the condition predicate, guarded by mtx
// and signalled through cv.
std::condition_variable cv;
std::mutex mtx;
bool button_pressed = false;
void device_thread() { std::unique_lockstd::mutex lock(mtx); while (true) {
cv.wait(lock, [] { return button_pressed; });
tracef("Audio playback started");
std::cout << "Audio playback started.\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
button_pressed = false;
}
}
// Producer thread: once a second, simulates a button press by emitting
// a trace point, setting the shared flag under the mutex, and waking
// the device thread.
void main_thread() {
    for (int event_id = 0; ; ++event_id) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        tracef("Button pressed, event_id=%d", event_id);
        {
            // Scope the lock to the flag update only; notify outside it.
            std::lock_guard<std::mutex> lock(mtx);
            button_pressed = true;
        }
        cv.notify_one();
    }
}
// Entry point: spawn the consumer (device) and producer (main) threads.
// Both run infinite loops, so the join() calls below never return.
int main() {
    std::thread consumer(device_thread);
    std::thread producer(main_thread);
    producer.join();
    consumer.join();
    return 0;
}
ビルド、計測方法は以下のとおり。
build
sudo apt-get update sudo apt-get install lttng-tools lttng-modules-dkms liblttng-ust-dev babeltrace2 g++ -o test test.cpp -llttng-ust -lpthread
セッションの作成
lttng create my_session
ユーザースペースのトレースポイントを有効化
lttng enable-event --userspace 'lttng_ust_tracef:*'
トレースの開始
lttng start
実行
./test
トレースの停止
lttng stop
セッションの破棄(必要に応じて)
lttng destroy
結果の確認
babeltrace2 ~/lttng-traces/my_session-2024*/
結果としては、以下のような出力が得られる。「+0.000251500」などがメインスレッドでtracef()が実行されるのを0秒として、デバイススレッドでtracef()が実行されるまでにかかった時間を計測している。
$ babeltrace2 ~/lttng-traces/my_session-2024*/ [12:59:59.132077740] (+?.?????????) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=0" } [12:59:59.132329240] (+0.000251500) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:00.132408292] (+1.000079052) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=1" } [13:00:00.133103892] (+0.000695600) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:01.133352844] (+1.000248952) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=2" } [13:00:01.133578044] (+0.000225200) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:02.133783396] (+1.000205352) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=3" } [13:00:02.134150696] (+0.000367300) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" }
収集している時系列データを、生データから等間隔に補間するプログラム。GoogleColabで実行すること。
データはGoogleスプレッドシートに保存しておき、1列目がタイムスタンプ、2列目が対象のデータとすること。
# Google Colab authentication: grants this notebook access to the
# user's Google Sheets.
from google.colab import auth
auth.authenticate_user()

from gspread_dataframe import get_as_dataframe, set_with_dataframe
import pandas as pd
import gspread
from google.auth import default

# gspread client authorized with the Colab user's default credentials.
creds, _ = default()
gc = gspread.authorize(creds)
def convert_timestamp_format(timestamp):
    """Parse *timestamp* (string or datetime-like) into a pandas Timestamp."""
    parsed = pd.to_datetime(timestamp)
    return parsed
def read_all_sheets(input_workbook):
    """Read every worksheet of *input_workbook* into one DataFrame.

    Each sheet is expected to hold the timestamp in column 0 and the
    value in column 1 with no header row (per the post's data layout).

    Parameters
    ----------
    input_workbook : gspread spreadsheet object

    Returns
    -------
    pd.DataFrame
        Columns ["Timestamp", "Price"], rows from all sheets appended.
    """
    frames = []
    for sheet in input_workbook.worksheets():
        df = get_as_dataframe(sheet, header=None, dtype={0: str, 1: float})
        df = df.iloc[:, :2]
        df.columns = ["Timestamp", "Price"]
        df["Timestamp"] = df["Timestamp"].apply(convert_timestamp_format)
        frames.append(df)
    # Concatenate once at the end: the original called pd.concat inside
    # the loop, copying all accumulated rows on every iteration.
    if not frames:
        return pd.DataFrame(columns=["Timestamp", "Price"])
    return pd.concat(frames)
def interpolate_combined_df(combined_df):
    """Resample the series to 1-minute intervals with linear interpolation.

    Parameters
    ----------
    combined_df : pd.DataFrame
        Must contain a "Timestamp" column plus numeric value column(s).

    Returns
    -------
    pd.DataFrame
        Minute-spaced rows plus a "Sheet" column (YYYYMM) used later to
        split the result back into monthly worksheets.
    """
    # Index a copy instead of mutating the caller's frame (the original
    # used set_index(..., inplace=True), altering the argument as a
    # side effect).
    indexed = combined_df.set_index("Timestamp")
    # "min" is the current minute-frequency alias; "T" is deprecated.
    df_resampled = indexed.resample("min").mean().interpolate(method='linear')
    df_resampled['Sheet'] = df_resampled.index.to_series().dt.strftime('%Y%m')
    return df_resampled.reset_index()
def save_sheets(output_workbook, interpolated_df):
    """Write each month's rows of *interpolated_df* to its own worksheet.

    Sheets are keyed by the "Sheet" (YYYYMM) column; an existing
    worksheet with that title is reused, otherwise one is created.
    """
    for sheet_name in interpolated_df["Sheet"].unique():
        monthly = interpolated_df[interpolated_df["Sheet"] == sheet_name]
        monthly = monthly.drop(columns=["Sheet"])
        try:
            target = output_workbook.worksheet(sheet_name)
        except gspread.exceptions.WorksheetNotFound:
            target = output_workbook.add_worksheet(title=sheet_name, rows="1000", cols="2")
        set_with_dataframe(target, monthly, include_index=False, include_column_header=False)
def process_workbook(input_filename, output_filename):
    """Interpolate the input spreadsheet and rebuild the output spreadsheet.

    Reads every sheet of *input_filename*, resamples/interpolates the
    combined series to 1-minute spacing, writes the result to monthly
    (YYYYMM) sheets of *output_filename*, and finally rewrites the
    output workbook so its sheets end up in sorted title order.
    """
    input_workbook = gc.open(input_filename)
    output_workbook = gc.open(output_filename)
    combined_df = read_all_sheets(input_workbook)
    interpolated_df = interpolate_combined_df(combined_df)
    save_sheets(output_workbook, interpolated_df)
    # Reorder the output sheets by title: copy them, in sorted order,
    # into a temporary workbook and then back.
    # NOTE(review): gspread's reorder_worksheets() would presumably do
    # this without the copy round-trip — worth verifying.
    sheet_titles = sorted([sheet.title for sheet in output_workbook.worksheets()])
    temp_workbook = gc.create('temp_workbook')
    temp_sheet = temp_workbook.sheet1
    for title in sheet_titles:
        worksheet = output_workbook.worksheet(title)
        df = get_as_dataframe(worksheet, header=None)
        new_sheet = temp_workbook.add_worksheet(title=title, rows=worksheet.row_count, cols=worksheet.col_count)
        set_with_dataframe(new_sheet, df, include_index=False, include_column_header=False)
    # Drop the default "Sheet1" that gc.create() made.
    temp_workbook.del_worksheet(temp_sheet)
    # A spreadsheet must always keep at least one sheet, so park a dummy
    # sheet while deleting the originals.
    dummy_sheet = output_workbook.add_worksheet(title="dummy_sheet", rows="1", cols="1")
    for sheet in output_workbook.worksheets():
        if sheet.title != "dummy_sheet":
            output_workbook.del_worksheet(sheet)
    # Copy the sorted sheets back into the output workbook.
    for sheet in temp_workbook.worksheets():
        df = get_as_dataframe(sheet, header=None)
        new_sheet = output_workbook.add_worksheet(title=sheet.title, rows=sheet.row_count, cols=sheet.col_count)
        set_with_dataframe(new_sheet, df, include_index=False, include_column_header=False)
    output_workbook.del_worksheet(dummy_sheet)
    # Clean up the temporary spreadsheet from Drive.
    gc.del_spreadsheet(temp_workbook.id)
# Spreadsheet titles on the user's Google Drive: raw backup in,
# evenly-spaced interpolated series out.
input_filename = "BitCoinPrice_backup"
output_filename = "BitCoinPrice_interp"
process_workbook(input_filename, output_filename)