ぼうびろく (original) (raw)

開発初期あたりのとりあえず正常系のみ書いて動かしてみるときなどで、SEGVが発生した場合にバックトレースログを出力したいときのコード。

シグナルハンドラでSEGVなど異常終了のシグナルをハンドリングしてログ出力する。

#define _GNU_SOURCE #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #ifdef DEBUG #include <signal.h> #include <execinfo.h> #endif

#ifdef DEBUG

char exe_path[1024];

void signal_handler(int sig, siginfo_t *info, void *ucontext) { void *buffer[30]; int nptrs; char **symbols; char msg[256]; int len;

nptrs = backtrace(buffer, sizeof(buffer) / sizeof(void *));
symbols = backtrace_symbols(buffer, nptrs);
if (symbols == NULL) {
    const char *msg_fail = "Failed to get backtrace symbols.\n";
    write(STDERR_FILENO, msg_fail, strlen(msg_fail));
    _exit(EXIT_FAILURE);
}


len = snprintf(msg, sizeof(msg), "Received signal %d (%s)\n", sig, strsignal(sig));
write(STDERR_FILENO, msg, len);

if (sig == SIGSEGV || sig == SIGBUS) {
    len = snprintf(msg, sizeof(msg), "Fault address: %p\n", info->si_addr);
    write(STDERR_FILENO, msg, len);
}


len = snprintf(msg, sizeof(msg), "Executable Path: %s\n", exe_path);
write(STDERR_FILENO, msg, len);


write(STDERR_FILENO, "Backtrace:\n", strlen("Backtrace:\n"));
for (int i = 0; i < nptrs; i++) {
    
    
    char *start = strchr(symbols[i], '[');
    char *end = strchr(symbols[i], ']');
    if (start && end && end > start + 1) {
        *end = '\0'; 
        start++; 
        len = snprintf(msg, sizeof(msg), "  [%d] %s - %s:%s\n", i, symbols[i], exe_path, start);
        write(STDERR_FILENO, msg, len);
    } else {
        
        len = snprintf(msg, sizeof(msg), "  [%d] %s\n", i, symbols[i]);
        write(STDERR_FILENO, msg, len);
    }
}

free(symbols);


signal(sig, SIG_DFL);
raise(sig);

}

void setup_signal_handler() { struct sigaction sa; memset(&sa, 0, sizeof(sa)); sa.sa_sigaction = signal_handler; sa.sa_flags = SA_SIGINFO | SA_RESTART;

int signals[] = {SIGSEGV, SIGABRT, SIGFPE, SIGILL, SIGBUS, SIGTERM, SIGINT, SIGQUIT, SIGHUP};
int num_signals = sizeof(signals) / sizeof(signals[0]);

char msg[256];
int len;
for(int i = 0; i < num_signals; i++) {
    if (sigaction(signals[i], &sa, NULL) == -1) {
        perror("sigaction");
        exit(EXIT_FAILURE);
    } else {
        
        len = snprintf(msg, sizeof(msg), "Signal handler set for signal %d (%s)\n", signals[i], strsignal(signals[i]));
        write(STDERR_FILENO, msg, len);
    }
}

}

#endif

int main(int argc, char *argv[]) { #ifdef DEBUG

ssize_t len = readlink("/proc/self/exe", exe_path, sizeof(exe_path) - 1);
if (len != -1) {
    exe_path[len] = '\0';
} else {
    strcpy(exe_path, "unknown");
}


setup_signal_handler();

#endif

fprintf(stderr, "DEBUGモード: セグメンテーションフォルトを発生させます。\n");
fflush(stderr); 
int *p = NULL;
*p = 42;  

fprintf(stdout, "プログラムが正常に終了しました。\n");
return 0;

}

メインスレッドである処理を行いサブスレッドに通知し、サブスレッドでまた別の処理を行う、というマルチスレッドのプログラムを作成した際、結局どのくらい処理時間がかかっているのか調べる必要があったため、LTTng (Linux Tracing Toolkit Next Generation) を使用して計測を行った。

ソースは以下のとおり、tracef()でトレースを行う。

#include #include #include #include #include

#include <lttng/tracef.h>

std::condition_variable cv; std::mutex mtx; bool button_pressed = false;

void device_thread() { std::unique_lockstd::mutex lock(mtx); while (true) {

    cv.wait(lock, [] { return button_pressed; });

    
    tracef("Audio playback started");

    
    std::cout << "Audio playback started.\n";
    std::this_thread::sleep_for(std::chrono::seconds(1));

    
    button_pressed = false;
}

}

void main_thread() { int event_id = 0;
while (true) { std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    tracef("Button pressed, event_id=%d", event_id);

    
    {
        std::lock_guard<std::mutex> lock(mtx);
        button_pressed = true;
    }
    cv.notify_one();

    
    event_id++;
}

}

int main() { std::thread device(device_thread); std::thread main_t(main_thread);

main_t.join();
device.join();
return 0;

}

ビルド、計測方法は以下のとおり。

build

sudo apt-get update sudo apt-get install lttng-tools lttng-modules-dkms liblttng-ust-dev babeltrace2 g++ -o test test.cpp -llttng-ust -lpthread

セッションの作成

lttng create my_session

ユーザースペースのトレースポイントを有効化

lttng enable-event --userspace 'lttng_ust_tracef:*'

トレースの開始

lttng start

実行

./test

トレースの停止

lttng stop

セッションの破棄(必要に応じて)

lttng destroy

結果の確認

babeltrace2 ~/lttng-traces/my_session-2024*/

結果としては、以下のような出力が得られる。「+0.000251500」などがメインスレッドでtracef()が実行されるのを0秒として、デバイススレッドでtracef()が実行されるまでにかかった時間を計測している。

$ babeltrace2 ~/lttng-traces/my_session-2024*/ [12:59:59.132077740] (+?.?????????) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=0" } [12:59:59.132329240] (+0.000251500) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:00.132408292] (+1.000079052) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=1" } [13:00:00.133103892] (+0.000695600) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:01.133352844] (+1.000248952) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=2" } [13:00:01.133578044] (+0.000225200) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:02.133783396] (+1.000205352) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=3" } [13:00:02.134150696] (+0.000367300) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" }

収集している時系列データを、生データから等間隔に補完するプログラム。GoogleColabで実行すること。

データはGoogleスプレッドシートに保存しておき、1列目がタイムスタンプ、2列目が対象のデータとすること。

from google.colab import auth auth.authenticate_user() from gspread_dataframe import get_as_dataframe, set_with_dataframe import pandas as pd

import gspread from google.auth import default creds, _ = default()

gc = gspread.authorize(creds)

def convert_timestamp_format(timestamp): return pd.to_datetime(timestamp)

def read_all_sheets(input_workbook): combined_df = pd.DataFrame()

for sheet in input_workbook.worksheets():
    df = get_as_dataframe(sheet, header=None, dtype={0: str, 1: float})
    df = df.iloc[:, :2]  
    df.columns = ["Timestamp", "Price"]
    
    
    df["Timestamp"] = df["Timestamp"].apply(convert_timestamp_format)
    
    
    combined_df = pd.concat([combined_df, df])

return combined_df

def interpolate_combined_df(combined_df): combined_df.set_index("Timestamp", inplace=True)

df_resampled = combined_df.resample("T").mean().interpolate(method='linear')
df_resampled['Sheet'] = df_resampled.index.to_series().dt.strftime('%Y%m')
return df_resampled.reset_index()

def save_sheets(output_workbook, interpolated_df): sheets = interpolated_df["Sheet"].unique()

for sheet_name in sheets:
    df_sheet = interpolated_df[interpolated_df["Sheet"] == sheet_name].drop(columns=["Sheet"])
    
    
    try:
        output_sheet = output_workbook.worksheet(sheet_name)
    except gspread.exceptions.WorksheetNotFound:
        output_sheet = output_workbook.add_worksheet(title=sheet_name, rows="1000", cols="2")
    
    set_with_dataframe(output_sheet, df_sheet, include_index=False, include_column_header=False)

def process_workbook(input_filename, output_filename):

input_workbook = gc.open(input_filename)


output_workbook = gc.open(output_filename)


combined_df = read_all_sheets(input_workbook)


interpolated_df = interpolate_combined_df(combined_df)


save_sheets(output_workbook, interpolated_df)


sheet_titles = sorted([sheet.title for sheet in output_workbook.worksheets()])


temp_workbook = gc.create('temp_workbook')
temp_sheet = temp_workbook.sheet1

for title in sheet_titles:
    worksheet = output_workbook.worksheet(title)
    df = get_as_dataframe(worksheet, header=None)
    new_sheet = temp_workbook.add_worksheet(title=title, rows=worksheet.row_count, cols=worksheet.col_count)
    set_with_dataframe(new_sheet, df, include_index=False, include_column_header=False)


temp_workbook.del_worksheet(temp_sheet)


dummy_sheet = output_workbook.add_worksheet(title="dummy_sheet", rows="1", cols="1")


for sheet in output_workbook.worksheets():
    if sheet.title != "dummy_sheet":
        output_workbook.del_worksheet(sheet)


for sheet in temp_workbook.worksheets():
    df = get_as_dataframe(sheet, header=None)
    new_sheet = output_workbook.add_worksheet(title=sheet.title, rows=sheet.row_count, cols=sheet.col_count)
    set_with_dataframe(new_sheet, df, include_index=False, include_column_header=False)


output_workbook.del_worksheet(dummy_sheet)


gc.del_spreadsheet(temp_workbook.id)

input_filename = "BitCoinPrice_backup"
output_filename = "BitCoinPrice_interp"

process_workbook(input_filename, output_filename)

Floatや配列はそのままでは保存できないので、バイナリに変換して保存する。ただしバイナリも400KBという上限があるので、分割して保存する。Noneはそのままでは保存できないので文字列に変換する。

以下にDynamoDBへの保存、リストア用の関数を示す。

import boto3 import base64 import math import numpy as np

def convert_numpy_array_to_dynamodb(np_array, chunk_size_kb=400):

array_bytes = np_array.tobytes()


array_base64 = base64.b64encode(array_bytes) 


chunk_size = chunk_size_kb * 1024
num_chunks = math.ceil(len(array_base64) / chunk_size)


data = {}
for i in range(num_chunks):
    chunk = array_base64[i * chunk_size:(i + 1) * chunk_size]
    data[str(i)] = chunk
return data, np_array.dtype

def revert_numpy_array_from_dynamodb(data, dtype):

combined_base64 = b''.join([data[str(i)].value for i in range(len(data))])


array_bytes = base64.b64decode(combined_base64)


np_array = np.frombuffer(array_bytes, dtype=dtype)

return np_array

def convert_for_dynamodb(data): if data is None: return {'NULL': True} elif isinstance(data, bool): return {'BOOL': data} elif isinstance(data, int): return {'N': str(data)} elif isinstance(data, float): return {'F': str(data)} elif isinstance(data, str): return {'S': data} elif isinstance(data, list) or isinstance(data, np.ndarray): re_data, dtype = convert_numpy_array_to_dynamodb(np.array(data)) if "int" in str(dtype): k = "LI" elif "float" in str(dtype): k = "LF" else: return {'L': [convert_for_dynamodb(item) for item in data]} return {k: re_data} elif isinstance(data, dict): return {'M': {k: convert_for_dynamodb(v) for k, v in data.items()}} else: raise TypeError(f"Type {type(data)} is not supported")

def revert_from_dynamodb(data): if 'NULL' in data: return None elif 'BOOL' in data: return data['BOOL'] elif 'N' in data: return int(data['N']) elif 'F' in data: return float(data['F']) elif 'S' in data: return data['S'] elif 'LI' in data: return revert_numpy_array_from_dynamodb(data["LI"], np.int64) elif 'LF' in data: return revert_numpy_array_from_dynamodb(data["LF"], np.float64) elif "L" in data: return [revert_from_dynamodb(item) for item in data['L']] elif 'M' in data: return {k: revert_from_dynamodb(v) for k, v in data['M'].items()} else: raise TypeError(f"Type {data.keys()} is not supported")

def save_to_dynamodb(table: object, item: dict, partition_key: str): """save to dynamoDB

Args:
    table (object): DynamoDB table
    item (dict): save data
    partition_key (str): partition key of yuor table
"""
item_converted = {k: convert_for_dynamodb(v) for k, v in item.items()}
item_converted[partition_key] = item[partition_key]

table.put_item(Item=item_converted)

def read_from_dynamodb(table: object, key_value, partition_key: str) -> dict: """read from dynamoDB

Args:
    table (object): DynamoDB table
    key_value : partition key value of your data
    partition_key (str): partition key of yuor table

Returns:
    dict: data
"""
key = {partition_key: key_value}
response = table.get_item(Key=key)
item = response.get('Item')
if item:
    return {k: v if k == partition_key else revert_from_dynamodb(v) for k, v in item.items()}
else:
    return None

def get_dynamodb_table(table_name: str, aws_access_key_id: str, aws_secret_access_key: str, region_name: str = 'ap-northeast-1') -> object: """get dynamobd table

Args:
    table_name (str): table name of dynamodb
    aws_access_key_id (str): access key of aws
    aws_secret_access_key (str): secret access key of aws
    region_name (str, optional): resion name. Defaults to 'ap-northeast-1'.

Returns:
    object: dynamodb table
"""
dynamodb = boto3.resource('dynamodb', region_name=region_name,
                        aws_access_key_id=aws_access_key_id,
                        aws_secret_access_key=aws_secret_access_key)
return dynamodb.Table(table_name)

概要

MT4のような自動売買システムをビットコインでも行いたいということで開発。コンセプトとしては以下のような感じ。

構成

ビットコインのデータ収集や売買はBitflyerAPIを使用する。そのためBitflyerを利用することが前提となる。 構成としては以下の図のように、ほぼすべて無料のサービスを使用して実行できるようにしている。

開発の流れとしては、以下のようなサイクルを回していく。

  1. Google Apps Scriptで価格データを収集し、Google driveに保存する。
  2. Google driveからデータを取り出し、Google Colabで自動売買アルゴリズム開発を行う。
  3. AWS Labmda に自動売買アルゴリズムをデプロイする。
  4. AWS Labmdaを定期実行し自動売買を行う。

データ収集

(参考)過去に収集したデータ

以下のページで公開中。

wooolwoool.hatenablog.com

Google Apps Scriptで価格データを収集

以下のページを参考に、価格データを収集する。

wooolwoool.hatenablog.com

データクリーニング

BitFlyerのメンテナンスや何かしらのエラーがあるため、データは必ずしも1分ごとではない。これでは使いにくいので、データを補完して1分ごとのデータを作成する。以下のページを参照。

wooolwoool.hatenablog.com

アルゴリズム開発

開発用のコードは自作した。Pythonアルゴリズムを作成し、バックテスト、パラメータチューニング、AWSへのデプロイ用のファイル生成ができる。

github.com

以下をGoogleColabで実行していく。まず、コードのCloneとライブラリのインストールを行う。

! git clone https://github.com/wooolwooolwoool/bitbacktest.git ! cd bitbacktest && pip install -U pip &&
pip install -r requirements.txt

アルゴリズムは以下のファイル内のStrategy()クラスを継承して作成する。

bitbacktest/src/bitbacktest/strategy.py at main · wooolwooolwoool/bitbacktest · GitHub

Strategy()クラスのうち、以下のメソッドを実装する。

Method 説明
reset_param() パラメータのリセットを行うメソッド。開始時に1度だけ呼び出される。
generate_signals() 価格に基づいて売買シグナルを生成するメソッド。
execute_trade() トレードを実行するメソッド

これらのメソッドで使用するパラメータは、以下のメンバ変数に辞書形式で保存しておく。

変数名 説明
self.static 静的なパラメータ。動的に変更不可(generate_signals()などで変更した場合もエラーとはならないが、保持されない)
self.dynamic 動的なパラメータ。動的に変更可能。

これらの違いとしてAWS Lambdaで実際にシステムトレードを行うコード上での保持方法が異なる。self.staticはAWS Lambdaの環境変数でセットするようになっており、AWS Lambdaのコードから更新することはできない。例えばMACDのWindowサイズや、一回に売買するBitCoinの数量などユーザで設定する値の利用を想定している。 一方でself.dynamicはAWS DynamoDBに保存するようになっており自動更新が可能である。例えば過去の価格データなど自動で更新される値の利用を想定している。

実際に売買を行うには、以下のメソッドを使用する。

Method 説明
self.market.place_market_order() 成り行き注文
self.market.place_limit_order() 指値注文

以下にStrategy()クラスの実装例として、MACDStrategy()を示す。

class MACDStrategy(Strategy):

def reset_param(self, param):
    super().reset_param(param)
    self.dynamic["count"] = 0
    self.dynamic["prices"] = None
    self.dynamic["emashort_values"] = None
    self.dynamic["emalong_values"] = None
    self.dynamic["macd_values"] = None
    self.dynamic["signal_line_values"] = None

def _calculate_ema(self, current_price, previous_ema, window):
    alpha = 2 / (window + 1.0)
    return alpha * current_price + (1 - alpha) * previous_ema

def generate_signals(self, price):
    if self.dynamic["prices"] is None:
        
        emashort = emalong = price
        macd = signal_line = 0.0
    else:
        
        emashort = self._calculate_ema(price,
                                       self.dynamic["emashort_values"],
                                       self.static["short_window"])
        emalong = self._calculate_ema(price,
                                      self.dynamic["emalong_values"],
                                      self.static["long_window"])

        
        macd = emashort - emalong

        
        if self.dynamic["macd_values"] == 0:
            signal_line = macd
        else:
            signal_line = self._calculate_ema(
                macd, self.dynamic["signal_line_values"],
                self.static["signal_window"])

    self.dynamic["prices"] = price

    self.dynamic["emashort_values"] = emashort
    self.dynamic["emalong_values"] = emalong
    self.dynamic["macd_values_old"] = self.dynamic["macd_values"]
    self.dynamic["macd_values"] = macd
    self.dynamic["signal_line_values_old"] = self.dynamic[
        "signal_line_values"]
    self.dynamic["signal_line_values"] = signal_line

    
    signal = ""
    if self.dynamic["macd_values_old"] is not None:
        if self.dynamic["macd_values_old"] <= self.dynamic[
                "signal_line_values_old"] and macd > signal_line:
            signal = "Buy"
        elif self.dynamic["macd_values_old"] >= self.dynamic[
                "signal_line_values_old"] and macd < signal_line:
            signal = "Sell"
    return signal

def execute_trade(self, price, signal):
    if signal in ['Buy', "Sell"]:
        self.market.place_market_order(signal,
                                       self.static["one_order_quantity"])

このMACDStrategy()を改造する場合は、以下のようにする。以下ではexecute_trade()のみを改造しており、買いシグナルが出た場合のみ買い注文を行い、買い注文に成功した場合現在価格にself.static["profit"]分利益を乗せて、指値で売り注文を出している。

import sys sys.path.append("/content/bitbacktest")

from src.bitbacktest.strategy import MACDStrategy from src.bitbacktest.market import BacktestMarket from src.bitbacktest.data_generater import random_data from src.bitbacktest.backtester import Backtester

class MACDForcusBuyStrategy(MACDStrategy): def reset_params(self, param, start_cash): super().reset_params(param, start_cash)

def execute_trade(self, price: float, signal: str):
    if signal == 'Buy':
        success = self.market.place_market_order(
            'Buy', self.static["one_order_quantity"])
        if success:
            self.market.place_limit_order(
                "Sell", self.static["one_order_quantity"],
                price * self.static["profit"])
補足

実際にAWS Lambdaにデプロイするコードのベースは以下。

bitbacktest/app/aws_build/_lambda_base.py at main · wooolwooolwoool/bitbacktest · GitHub

トレードを行っているのは以下の箇所で、(現在出している注文で制限をかけているが)generate_signals()、execute_trade()、の順に実行している。

signals = strategy.generate_signals(current_price)

orders = market.get_open_orders() if os.environ["TRADE_ENABLE"] == "1" and int(os.environ["ORDER_NUM_MAX"]) > len(orders): strategy.execute_trade(current_price, signals)

バックテスト

自動売買アルゴリズムが実装出来たらバックテストを行う。バックテストでは主に売買や注文の管理を行うMarketクラスと、先ほど自動売買アルゴリズムを実装したStrategyクラスを使用する。 参考↓

github.com

まずはデータを用意する。以下のようにランダムなデータを生成することも可能。

seed = 111 start_price = 1e7
price_range = 0.001
length = 60 * 24 * 7 * 4
price_data = random_data(start_price, price_range, length, seed)

次にバックテスト用のMarketクラスと、先ほど開発したStrategyクラスを準備する。

market = BacktestMarket(price_data) strategy = MACDStrategy(market)

次にパラメータと開始時の現金をセットする。

param = { "short_window": 12, "long_window": 26, "signal_window": 9, "one_order_quantity": 0.01 } start_cash = 1e6

strategy.reset_all(param, start_cash)

最後にバックテストを実行する。

portfolio_result = strategy.backtest() print(portfolio_result)

バックテスト結果としては、以下のようにバックテスト終了時のトレード回数(trade_count)、現金(cash)、ビットコイン数(position)、総資産(total_value)が出力される。

100%|█| 40320/40320 [00:00<00:00, 186697. {'trade_count': 63, 'cash': np.float64(989527.8085676738), 'position': 0.0009054999999999949, 'total_value': np.float64(998356.0314502214)}

パラメータチューニング

アルゴリズムができたらチューニングを行う。target_paramsにパラメータをセットし、BayesianBacktester()でバックテストを行うことで、ベイズ最適化を行ってくれる。

target_paramsには固定化したいパラメータはその値、チューニングしたいパラメータはscikit-optimizeのInteger、Real、Categoricalでセットする。

from skopt.space import Integer, Real, Categorical

start_cash = 2e5 market = BacktestMarket(back_data) strategy = MACDForcusBuyStrategy(market) backtester = BayesianBacktester(strategy)

target_params = { "short_window": Integer(32, 360, name='short_window'), "long_window": Integer(240, 720, name='long_window'), "signal_window": Integer(8, 360, name='signal_window'), "profit": 1.01, "one_order_quantity": 0.001 }

backtester.backtest(target_params, start_cash, n_calls=100)

デプロイ

作成した自動売買アルゴリズムAWS Lambdaへデプロイする。

GoogleColabで以下を実行する。オプションは以下の通り。

! python3 app/aws_build/build_all.py
-d your_src/ -s MACDForcusBuyStrategy -o CloudFormation.yaml

上記を実行すると、CloudFormation.yamlが作成される。

このCloudFormation.yamlAWS CloudFormationで指定してデプロイすることで、Lamda関数や定期実行用のEventBridgeが作成される。CloudFormationでのデプロイ時に設定するパラメータは以下のとおり。

名前 説明
LambdaFunctionName Lamda関数名。自由に設定可能。
LambdaRoleArn Lamdaを実行するロール。DynamoDBのRW権限など必要な権限を設定したロールを指定する。
実行準備

Lamda関数の環境変数に移動し、以下を設定する。

名前 説明
API_KEY BitflyerAPI_KEY
API_SECRET BitflyerAPI_SECRET
TABLE_NAME DynamoDBのテーブル名
PARAMS_KEY DynamoDB上でパラメータを保存するキー
TRADE_ENABLE トレードを実行するか。1の場合はexecute_trade()が実行され、それ以外場合は実行しない。(EventBridgeを止めるとDynamoDBに保存しているデータも更新されなくなり不都合が生じるので、止める際はこのフラグを使用する。)
ORDER_NUM_MAX 最大注文数。

このほかに作成したアルゴリズム内でself.staticで新たなパラメータを使用している場合環境変数が追加されるので、必要に応じてセットする。

実行

EventBridgeをEnableにすると実行が開始される。

以下のコードをGoogle Colabで実行する。ファイルごとにシート名を分けて、複数行ずつ書き込んでくれる。

from itertools import count import csv import gspread from datetime import datetime import glob import tqdm

from google.colab import auth auth.authenticate_user()

import gspread from google.auth import default creds, _ = default()

gc = gspread.authorize(creds)

filename = "BitCoinPrice"
ss = gc.open(filename) worksheet_list = ss.worksheets()

files = glob.glob("/content/*.csv") print(files) for csv_file_path in files: with open(csv_file_path, 'r') as file: reader = csv.reader(file) csv_data = list(reader)

sheet_name = csv_file_path.split("/")[-1].replace(".csv", "").replace("-", "")
print(sheet_name)


try:
    worksheet = ss.worksheet(sheet_name)
except gspread.exceptions.WorksheetNotFound:
    worksheet = ss.add_worksheet(title=sheet_name, rows=f"{60 * 24 * 32}", cols="2")

count = 1
ones_rows = 60 * 24 
ones_cols = 2 

ds = worksheet.range(f'A1:B{ones_rows}')

for row_index, row in tqdm.tqdm(enumerate(csv_data), total=len(csv_data)):
    if row_index == 0:
        continue
    for col_index, cell in enumerate(row):
        if col_index == ones_cols:
            break  
        idx = ((row_index - 1) * ones_cols + col_index) % (ones_cols * ones_rows)
        ds[idx].value = cell
    if (row_index) % ones_rows == 0:
        worksheet.update_cells(ds)
        count += 1
        ds = worksheet.range(f'A{ones_rows * count + 1}:B{ones_rows * (count+1)}')
worksheet.update_cells(ds)

print(f"Data from {csv_file_path} has been written to the sheet '{sheet_name}' in the spreadsheet.")