ぼうびろく (original) (raw)
開発初期あたりのとりあえず正常系のみ書いて動かしてみるときなどで、SEGVが発生した場合にバックトレースログを出力したいときのコード。
シグナルハンドラでSEGVなど異常終了のシグナルをハンドリングしてログ出力する。
#define _GNU_SOURCE #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #ifdef DEBUG #include <signal.h> #include <execinfo.h> #endif
#ifdef DEBUG
char exe_path[1024];
void signal_handler(int sig, siginfo_t *info, void *ucontext) { void *buffer[30]; int nptrs; char **symbols; char msg[256]; int len;
nptrs = backtrace(buffer, sizeof(buffer) / sizeof(void *));
symbols = backtrace_symbols(buffer, nptrs);
if (symbols == NULL) {
const char *msg_fail = "Failed to get backtrace symbols.\n";
write(STDERR_FILENO, msg_fail, strlen(msg_fail));
_exit(EXIT_FAILURE);
}
len = snprintf(msg, sizeof(msg), "Received signal %d (%s)\n", sig, strsignal(sig));
write(STDERR_FILENO, msg, len);
if (sig == SIGSEGV || sig == SIGBUS) {
len = snprintf(msg, sizeof(msg), "Fault address: %p\n", info->si_addr);
write(STDERR_FILENO, msg, len);
}
len = snprintf(msg, sizeof(msg), "Executable Path: %s\n", exe_path);
write(STDERR_FILENO, msg, len);
write(STDERR_FILENO, "Backtrace:\n", strlen("Backtrace:\n"));
for (int i = 0; i < nptrs; i++) {
char *start = strchr(symbols[i], '[');
char *end = strchr(symbols[i], ']');
if (start && end && end > start + 1) {
*end = '\0';
start++;
len = snprintf(msg, sizeof(msg), " [%d] %s - %s:%s\n", i, symbols[i], exe_path, start);
write(STDERR_FILENO, msg, len);
} else {
len = snprintf(msg, sizeof(msg), " [%d] %s\n", i, symbols[i]);
write(STDERR_FILENO, msg, len);
}
}
free(symbols);
signal(sig, SIG_DFL);
raise(sig);
}
void setup_signal_handler() { struct sigaction sa; memset(&sa, 0, sizeof(sa)); sa.sa_sigaction = signal_handler; sa.sa_flags = SA_SIGINFO | SA_RESTART;
int signals[] = {SIGSEGV, SIGABRT, SIGFPE, SIGILL, SIGBUS, SIGTERM, SIGINT, SIGQUIT, SIGHUP};
int num_signals = sizeof(signals) / sizeof(signals[0]);
char msg[256];
int len;
for(int i = 0; i < num_signals; i++) {
if (sigaction(signals[i], &sa, NULL) == -1) {
perror("sigaction");
exit(EXIT_FAILURE);
} else {
len = snprintf(msg, sizeof(msg), "Signal handler set for signal %d (%s)\n", signals[i], strsignal(signals[i]));
write(STDERR_FILENO, msg, len);
}
}
}
#endif
int main(int argc, char *argv[]) { #ifdef DEBUG
ssize_t len = readlink("/proc/self/exe", exe_path, sizeof(exe_path) - 1);
if (len != -1) {
exe_path[len] = '\0';
} else {
strcpy(exe_path, "unknown");
}
setup_signal_handler();
#endif
fprintf(stderr, "DEBUGモード: セグメンテーションフォルトを発生させます。\n");
fflush(stderr);
int *p = NULL;
*p = 42;
fprintf(stdout, "プログラムが正常に終了しました。\n");
return 0;
}
メインスレッドである処理を行いサブスレッドに通知し、サブスレッドでまた別の処理を行う、というマルチスレッドのプログラムを作成した際、結局どのくらい処理時間がかかっているのか調べる必要があったため、LTTng (Linux Tracing Toolkit Next Generation) を使用して計測を行った。
ソースは以下のとおり、tracef()でトレースを行う。
#include #include #include #include #include
#include <lttng/tracef.h>
std::condition_variable cv; std::mutex mtx; bool button_pressed = false;
void device_thread() { std::unique_lockstd::mutex lock(mtx); while (true) {
cv.wait(lock, [] { return button_pressed; });
tracef("Audio playback started");
std::cout << "Audio playback started.\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
button_pressed = false;
}
}
void main_thread() {
int event_id = 0;
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
tracef("Button pressed, event_id=%d", event_id);
{
std::lock_guard<std::mutex> lock(mtx);
button_pressed = true;
}
cv.notify_one();
event_id++;
}
}
int main() { std::thread device(device_thread); std::thread main_t(main_thread);
main_t.join();
device.join();
return 0;
}
ビルド、計測方法は以下のとおり。
build
sudo apt-get update sudo apt-get install lttng-tools lttng-modules-dkms liblttng-ust-dev babeltrace2 g++ -o test test.cpp -llttng-ust -lpthread
セッションの作成
lttng create my_session
ユーザースペースのトレースポイントを有効化
lttng enable-event --userspace 'lttng_ust_tracef:*'
トレースの開始
lttng start
実行
./test
トレースの停止
lttng stop
セッションの破棄(必要に応じて)
lttng destroy
結果の確認
babeltrace2 ~/lttng-traces/my_session-2024*/
結果としては、以下のような出力が得られる。「+0.000251500」などがメインスレッドでtracef()が実行されるのを0秒として、デバイススレッドでtracef()が実行されるまでにかかった時間を計測している。
$ babeltrace2 ~/lttng-traces/my_session-2024*/ [12:59:59.132077740] (+?.?????????) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=0" } [12:59:59.132329240] (+0.000251500) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:00.132408292] (+1.000079052) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=1" } [13:00:00.133103892] (+0.000695600) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:01.133352844] (+1.000248952) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=2" } [13:00:01.133578044] (+0.000225200) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" } [13:00:02.133783396] (+1.000205352) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 7 }, { _msg_length = 26, msg = "Button pressed, event_id=3" } [13:00:02.134150696] (+0.000367300) DESKTOP-JRJKTJO lttng_ust_tracef:event: { cpu_id = 2 }, { _msg_length = 22, msg = "Audio playback started" }
収集している時系列データを、生データから等間隔に補完するプログラム。GoogleColabで実行すること。
データはGoogleスプレッドシートに保存しておき、1列目がタイムスタンプ、2列目が対象のデータとすること。
from google.colab import auth auth.authenticate_user() from gspread_dataframe import get_as_dataframe, set_with_dataframe import pandas as pd
import gspread from google.auth import default creds, _ = default()
gc = gspread.authorize(creds)
def convert_timestamp_format(timestamp): return pd.to_datetime(timestamp)
def read_all_sheets(input_workbook): combined_df = pd.DataFrame()
for sheet in input_workbook.worksheets():
df = get_as_dataframe(sheet, header=None, dtype={0: str, 1: float})
df = df.iloc[:, :2]
df.columns = ["Timestamp", "Price"]
df["Timestamp"] = df["Timestamp"].apply(convert_timestamp_format)
combined_df = pd.concat([combined_df, df])
return combined_df
def interpolate_combined_df(combined_df): combined_df.set_index("Timestamp", inplace=True)
df_resampled = combined_df.resample("T").mean().interpolate(method='linear')
df_resampled['Sheet'] = df_resampled.index.to_series().dt.strftime('%Y%m')
return df_resampled.reset_index()
def save_sheets(output_workbook, interpolated_df): sheets = interpolated_df["Sheet"].unique()
for sheet_name in sheets:
df_sheet = interpolated_df[interpolated_df["Sheet"] == sheet_name].drop(columns=["Sheet"])
try:
output_sheet = output_workbook.worksheet(sheet_name)
except gspread.exceptions.WorksheetNotFound:
output_sheet = output_workbook.add_worksheet(title=sheet_name, rows="1000", cols="2")
set_with_dataframe(output_sheet, df_sheet, include_index=False, include_column_header=False)
def process_workbook(input_filename, output_filename):
input_workbook = gc.open(input_filename)
output_workbook = gc.open(output_filename)
combined_df = read_all_sheets(input_workbook)
interpolated_df = interpolate_combined_df(combined_df)
save_sheets(output_workbook, interpolated_df)
sheet_titles = sorted([sheet.title for sheet in output_workbook.worksheets()])
temp_workbook = gc.create('temp_workbook')
temp_sheet = temp_workbook.sheet1
for title in sheet_titles:
worksheet = output_workbook.worksheet(title)
df = get_as_dataframe(worksheet, header=None)
new_sheet = temp_workbook.add_worksheet(title=title, rows=worksheet.row_count, cols=worksheet.col_count)
set_with_dataframe(new_sheet, df, include_index=False, include_column_header=False)
temp_workbook.del_worksheet(temp_sheet)
dummy_sheet = output_workbook.add_worksheet(title="dummy_sheet", rows="1", cols="1")
for sheet in output_workbook.worksheets():
if sheet.title != "dummy_sheet":
output_workbook.del_worksheet(sheet)
for sheet in temp_workbook.worksheets():
df = get_as_dataframe(sheet, header=None)
new_sheet = output_workbook.add_worksheet(title=sheet.title, rows=sheet.row_count, cols=sheet.col_count)
set_with_dataframe(new_sheet, df, include_index=False, include_column_header=False)
output_workbook.del_worksheet(dummy_sheet)
gc.del_spreadsheet(temp_workbook.id)
input_filename = "BitCoinPrice_backup"
output_filename = "BitCoinPrice_interp"
process_workbook(input_filename, output_filename)
Floatや配列はそのままでは保存できないので、バイナリに変換して保存する。ただしバイナリも400KBという上限があるので、分割して保存する。Noneはそのままでは保存できないので文字列に変換する。
以下にDynamoDBへの保存、リストア用の関数を示す。
import boto3 import base64 import math import numpy as np
def convert_numpy_array_to_dynamodb(np_array, chunk_size_kb=400):
array_bytes = np_array.tobytes()
array_base64 = base64.b64encode(array_bytes)
chunk_size = chunk_size_kb * 1024
num_chunks = math.ceil(len(array_base64) / chunk_size)
data = {}
for i in range(num_chunks):
chunk = array_base64[i * chunk_size:(i + 1) * chunk_size]
data[str(i)] = chunk
return data, np_array.dtype
def revert_numpy_array_from_dynamodb(data, dtype):
combined_base64 = b''.join([data[str(i)].value for i in range(len(data))])
array_bytes = base64.b64decode(combined_base64)
np_array = np.frombuffer(array_bytes, dtype=dtype)
return np_array
def convert_for_dynamodb(data): if data is None: return {'NULL': True} elif isinstance(data, bool): return {'BOOL': data} elif isinstance(data, int): return {'N': str(data)} elif isinstance(data, float): return {'F': str(data)} elif isinstance(data, str): return {'S': data} elif isinstance(data, list) or isinstance(data, np.ndarray): re_data, dtype = convert_numpy_array_to_dynamodb(np.array(data)) if "int" in str(dtype): k = "LI" elif "float" in str(dtype): k = "LF" else: return {'L': [convert_for_dynamodb(item) for item in data]} return {k: re_data} elif isinstance(data, dict): return {'M': {k: convert_for_dynamodb(v) for k, v in data.items()}} else: raise TypeError(f"Type {type(data)} is not supported")
def revert_from_dynamodb(data): if 'NULL' in data: return None elif 'BOOL' in data: return data['BOOL'] elif 'N' in data: return int(data['N']) elif 'F' in data: return float(data['F']) elif 'S' in data: return data['S'] elif 'LI' in data: return revert_numpy_array_from_dynamodb(data["LI"], np.int64) elif 'LF' in data: return revert_numpy_array_from_dynamodb(data["LF"], np.float64) elif "L" in data: return [revert_from_dynamodb(item) for item in data['L']] elif 'M' in data: return {k: revert_from_dynamodb(v) for k, v in data['M'].items()} else: raise TypeError(f"Type {data.keys()} is not supported")
def save_to_dynamodb(table: object, item: dict, partition_key: str): """save to dynamoDB
Args:
table (object): DynamoDB table
item (dict): save data
partition_key (str): partition key of yuor table
"""
item_converted = {k: convert_for_dynamodb(v) for k, v in item.items()}
item_converted[partition_key] = item[partition_key]
table.put_item(Item=item_converted)
def read_from_dynamodb(table: object, key_value, partition_key: str) -> dict: """read from dynamoDB
Args:
table (object): DynamoDB table
key_value : partition key value of your data
partition_key (str): partition key of yuor table
Returns:
dict: data
"""
key = {partition_key: key_value}
response = table.get_item(Key=key)
item = response.get('Item')
if item:
return {k: v if k == partition_key else revert_from_dynamodb(v) for k, v in item.items()}
else:
return None
def get_dynamodb_table(table_name: str, aws_access_key_id: str, aws_secret_access_key: str, region_name: str = 'ap-northeast-1') -> object: """get dynamobd table
Args:
table_name (str): table name of dynamodb
aws_access_key_id (str): access key of aws
aws_secret_access_key (str): secret access key of aws
region_name (str, optional): resion name. Defaults to 'ap-northeast-1'.
Returns:
object: dynamodb table
"""
dynamodb = boto3.resource('dynamodb', region_name=region_name,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key)
return dynamodb.Table(table_name)
概要
MT4のような自動売買システムをビットコインでも行いたいということで開発。コンセプトとしては以下のような感じ。
- 運用コストはかけない(AWSの無料枠で動かせるような構成にする)
- 開発に時間をかけない(自動売買のアルゴリズムなど必要な部分のみ開発するだけで動かせるようにする)
- 言語はPythonとする(データ分析と相性がいいため)
構成
ビットコインのデータ収集や売買はBitflyerのAPIを使用する。そのためBitflyerを利用することが前提となる。 構成としては以下の図のように、ほぼすべて無料のサービスを使用して実行できるようにしている。
開発の流れとしては、以下のようなサイクルを回していく。
- Google Apps Scriptで価格データを収集し、Google driveに保存する。
- Google driveからデータを取り出し、Google Colabで自動売買アルゴリズム開発を行う。
- AWS Labmda に自動売買アルゴリズムをデプロイする。
- AWS Labmdaを定期実行し自動売買を行う。
データ収集
(参考)過去に収集したデータ
以下のページで公開中。
Google Apps Scriptで価格データを収集
以下のページを参考に、価格データを収集する。
データクリーニング
BitFlyerのメンテナンスや何かしらのエラーがあるため、データは必ずしも1分ごとではない。これでは使いにくいので、データを補完して1分ごとのデータを作成する。以下のページを参照。
アルゴリズム開発
開発用のコードは自作した。Pythonでアルゴリズムを作成し、バックテスト、パラメータチューニング、AWSへのデプロイ用のファイル生成ができる。
以下をGoogleColabで実行していく。まず、コードのCloneとライブラリのインストールを行う。
! git clone https://github.com/wooolwooolwoool/bitbacktest.git
! cd bitbacktest && pip install -U pip &&
pip install -r requirements.txt
アルゴリズムは以下のファイル内のStrategy()クラスを継承して作成する。
bitbacktest/src/bitbacktest/strategy.py at main · wooolwooolwoool/bitbacktest · GitHub
Strategy()クラスのうち、以下のメソッドを実装する。
Method | 説明 |
---|---|
reset_param() | パラメータのリセットを行うメソッド。開始時に1度だけ呼び出される。 |
generate_signals() | 価格に基づいて売買シグナルを生成するメソッド。 |
execute_trade() | トレードを実行するメソッド |
これらのメソッドで使用するパラメータは、以下のメンバ変数に辞書形式で保存しておく。
変数名 | 説明 |
---|---|
self.static | 静的なパラメータ。動的に変更不可(generate_signals()などで変更した場合もエラーとはならないが、保持されない) |
self.dynamic | 動的なパラメータ。動的に変更可能。 |
これらの違いとしてAWS Lambdaで実際にシステムトレードを行うコード上での保持方法が異なる。self.staticはAWS Lambdaの環境変数でセットするようになっており、AWS Lambdaのコードから更新することはできない。例えばMACDのWindowサイズや、一回に売買するBitCoinの数量などユーザで設定する値の利用を想定している。 一方でself.dynamicはAWS DynamoDBに保存するようになっており自動更新が可能である。例えば過去の価格データなど自動で更新される値の利用を想定している。
実際に売買を行うには、以下のメソッドを使用する。
Method | 説明 |
---|---|
self.market.place_market_order() | 成り行き注文 |
self.market.place_limit_order() | 指値注文 |
以下にStrategy()クラスの実装例として、MACDStrategy()を示す。
class MACDStrategy(Strategy):
def reset_param(self, param):
super().reset_param(param)
self.dynamic["count"] = 0
self.dynamic["prices"] = None
self.dynamic["emashort_values"] = None
self.dynamic["emalong_values"] = None
self.dynamic["macd_values"] = None
self.dynamic["signal_line_values"] = None
def _calculate_ema(self, current_price, previous_ema, window):
alpha = 2 / (window + 1.0)
return alpha * current_price + (1 - alpha) * previous_ema
def generate_signals(self, price):
if self.dynamic["prices"] is None:
emashort = emalong = price
macd = signal_line = 0.0
else:
emashort = self._calculate_ema(price,
self.dynamic["emashort_values"],
self.static["short_window"])
emalong = self._calculate_ema(price,
self.dynamic["emalong_values"],
self.static["long_window"])
macd = emashort - emalong
if self.dynamic["macd_values"] == 0:
signal_line = macd
else:
signal_line = self._calculate_ema(
macd, self.dynamic["signal_line_values"],
self.static["signal_window"])
self.dynamic["prices"] = price
self.dynamic["emashort_values"] = emashort
self.dynamic["emalong_values"] = emalong
self.dynamic["macd_values_old"] = self.dynamic["macd_values"]
self.dynamic["macd_values"] = macd
self.dynamic["signal_line_values_old"] = self.dynamic[
"signal_line_values"]
self.dynamic["signal_line_values"] = signal_line
signal = ""
if self.dynamic["macd_values_old"] is not None:
if self.dynamic["macd_values_old"] <= self.dynamic[
"signal_line_values_old"] and macd > signal_line:
signal = "Buy"
elif self.dynamic["macd_values_old"] >= self.dynamic[
"signal_line_values_old"] and macd < signal_line:
signal = "Sell"
return signal
def execute_trade(self, price, signal):
if signal in ['Buy', "Sell"]:
self.market.place_market_order(signal,
self.static["one_order_quantity"])
このMACDStrategy()を改造する場合は、以下のようにする。以下ではexecute_trade()のみを改造しており、買いシグナルが出た場合のみ買い注文を行い、買い注文に成功した場合現在価格にself.static["profit"]分利益を乗せて、指値で売り注文を出している。
import sys sys.path.append("/content/bitbacktest")
from src.bitbacktest.strategy import MACDStrategy from src.bitbacktest.market import BacktestMarket from src.bitbacktest.data_generater import random_data from src.bitbacktest.backtester import Backtester
class MACDForcusBuyStrategy(MACDStrategy): def reset_params(self, param, start_cash): super().reset_params(param, start_cash)
def execute_trade(self, price: float, signal: str):
if signal == 'Buy':
success = self.market.place_market_order(
'Buy', self.static["one_order_quantity"])
if success:
self.market.place_limit_order(
"Sell", self.static["one_order_quantity"],
price * self.static["profit"])
補足
実際にAWS Lambdaにデプロイするコードのベースは以下。
bitbacktest/app/aws_build/_lambda_base.py at main · wooolwooolwoool/bitbacktest · GitHub
トレードを行っているのは以下の箇所で、(現在出している注文で制限をかけているが)generate_signals()、execute_trade()、の順に実行している。
signals = strategy.generate_signals(current_price)
orders = market.get_open_orders() if os.environ["TRADE_ENABLE"] == "1" and int(os.environ["ORDER_NUM_MAX"]) > len(orders): strategy.execute_trade(current_price, signals)
バックテスト
自動売買アルゴリズムが実装出来たらバックテストを行う。バックテストでは主に売買や注文の管理を行うMarketクラスと、先ほど自動売買アルゴリズムを実装したStrategyクラスを使用する。 参考↓
まずはデータを用意する。以下のようにランダムなデータを生成することも可能。
seed = 111
start_price = 1e7
price_range = 0.001
length = 60 * 24 * 7 * 4
price_data = random_data(start_price, price_range, length, seed)
次にバックテスト用のMarketクラスと、先ほど開発したStrategyクラスを準備する。
market = BacktestMarket(price_data) strategy = MACDStrategy(market)
次にパラメータと開始時の現金をセットする。
param = { "short_window": 12, "long_window": 26, "signal_window": 9, "one_order_quantity": 0.01 } start_cash = 1e6
strategy.reset_all(param, start_cash)
最後にバックテストを実行する。
portfolio_result = strategy.backtest() print(portfolio_result)
バックテスト結果としては、以下のようにバックテスト終了時のトレード回数(trade_count)、現金(cash)、ビットコイン数(position)、総資産(total_value)が出力される。
100%|█| 40320/40320 [00:00<00:00, 186697. {'trade_count': 63, 'cash': np.float64(989527.8085676738), 'position': 0.0009054999999999949, 'total_value': np.float64(998356.0314502214)}
パラメータチューニング
アルゴリズムができたらチューニングを行う。target_paramsにパラメータをセットし、BayesianBacktester()でバックテストを行うことで、ベイズ最適化を行ってくれる。
target_paramsには固定化したいパラメータはその値、チューニングしたいパラメータはscikit-optimizeのInteger、Real、Categoricalでセットする。
from skopt.space import Integer, Real, Categorical
start_cash = 2e5 market = BacktestMarket(back_data) strategy = MACDForcusBuyStrategy(market) backtester = BayesianBacktester(strategy)
target_params = { "short_window": Integer(32, 360, name='short_window'), "long_window": Integer(240, 720, name='long_window'), "signal_window": Integer(8, 360, name='signal_window'), "profit": 1.01, "one_order_quantity": 0.001 }
backtester.backtest(target_params, start_cash, n_calls=100)
デプロイ
作成した自動売買アルゴリズムをAWS Lambdaへデプロイする。
GoogleColabで以下を実行する。オプションは以下の通り。
- -d: 作成したソースコードがおいてあるディレクトリを指定する。
- -s: 作成した自動売買アルゴリズムのクラス名を指定する。このクラスが見つからない場合はエラーになる。
- -o: 出力するCloudFormation用yamlファイルのファイル名。
! python3 app/aws_build/build_all.py
-d your_src/ -s MACDForcusBuyStrategy -o CloudFormation.yaml
上記を実行すると、CloudFormation.yamlが作成される。
このCloudFormation.yamlをAWS CloudFormationで指定してデプロイすることで、Lamda関数や定期実行用のEventBridgeが作成される。CloudFormationでのデプロイ時に設定するパラメータは以下のとおり。
名前 | 説明 |
---|---|
LambdaFunctionName | Lamda関数名。自由に設定可能。 |
LambdaRoleArn | Lamdaを実行するロール。DynamoDBのRW権限など必要な権限を設定したロールを指定する。 |
実行準備
Lamda関数の環境変数に移動し、以下を設定する。
名前 | 説明 |
---|---|
API_KEY | BitflyerのAPI_KEY |
API_SECRET | BitflyerのAPI_SECRET |
TABLE_NAME | DynamoDBのテーブル名 |
PARAMS_KEY | DynamoDB上でパラメータを保存するキー |
TRADE_ENABLE | トレードを実行するか。1の場合はexecute_trade()が実行され、それ以外場合は実行しない。(EventBridgeを止めるとDynamoDBに保存しているデータも更新されなくなり不都合が生じるので、止める際はこのフラグを使用する。) |
ORDER_NUM_MAX | 最大注文数。 |
このほかに作成したアルゴリズム内でself.staticで新たなパラメータを使用している場合環境変数が追加されるので、必要に応じてセットする。
実行
EventBridgeをEnableにすると実行が開始される。
以下のコードをGoogle Colabで実行する。ファイルごとにシート名を分けて、複数行ずつ書き込んでくれる。
from itertools import count import csv import gspread from datetime import datetime import glob import tqdm
from google.colab import auth auth.authenticate_user()
import gspread from google.auth import default creds, _ = default()
gc = gspread.authorize(creds)
filename = "BitCoinPrice"
ss = gc.open(filename)
worksheet_list = ss.worksheets()
files = glob.glob("/content/*.csv") print(files) for csv_file_path in files: with open(csv_file_path, 'r') as file: reader = csv.reader(file) csv_data = list(reader)
sheet_name = csv_file_path.split("/")[-1].replace(".csv", "").replace("-", "")
print(sheet_name)
try:
worksheet = ss.worksheet(sheet_name)
except gspread.exceptions.WorksheetNotFound:
worksheet = ss.add_worksheet(title=sheet_name, rows=f"{60 * 24 * 32}", cols="2")
count = 1
ones_rows = 60 * 24
ones_cols = 2
ds = worksheet.range(f'A1:B{ones_rows}')
for row_index, row in tqdm.tqdm(enumerate(csv_data), total=len(csv_data)):
if row_index == 0:
continue
for col_index, cell in enumerate(row):
if col_index == ones_cols:
break
idx = ((row_index - 1) * ones_cols + col_index) % (ones_cols * ones_rows)
ds[idx].value = cell
if (row_index) % ones_rows == 0:
worksheet.update_cells(ds)
count += 1
ds = worksheet.range(f'A{ones_rows * count + 1}:B{ones_rows * (count+1)}')
worksheet.update_cells(ds)
print(f"Data from {csv_file_path} has been written to the sheet '{sheet_name}' in the spreadsheet.")