봇을 만들려고 할 때 가장 궁금한 것이 '어떻게 중단되지 않고 항상 봇을 돌릴 수 있느냐?'이다. 항상 숨을 쉬지만 아무 의식없이, 노력없이 사는 것처럼, 메모리누수없이 각종 런타임에러없이 중단되지 않고 실행가능한 상태로 만드는 게 중요하다.
다음의 코드는 이런 나의 고민을 제미나이가 듣고 제시한 기본 코드다. 코드의 핵심은 asyncio다. 비동기처리를 위한 라이브러리. 아래의 코드는 일정시간마다 시세를 다운 받아 모종의 처리를 한 후 지표를 만들고 csv로 저장하는 예이다. 여러 가지의 동작이 순차적으로 진행된다(그러나 실세 돌리면 이전 코드에 비해 업타임이 짧다는 느낌)
import asyncio
import signal
from fetcher import fetch_all_ohlcv
from processor import process_data
from storage import save_to_csv
from config import INTERVAL_SEC
from indicators import indicators
async def periodic_task():
try:
while True:
print(f"🔄 Starting Myramid...")
raw_data = await fetch_all_ohlcv()
dfs = process_data(raw_data)
indicators(dfs)
save_to_csv(dfs)
await asyncio.sleep(INTERVAL_SEC)
except asyncio.CancelledError:
print("🛑 Shutting down Myramid...")
async def main():
task = asyncio.create_task(periodic_task())
# Handle shutdown signals
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, task.cancel)
try:
await task
except asyncio.CancelledError:
pass
if __name__=='__main__':
asyncio.run(main())
위의 코드는 나중에 커서 봇의 뼈대가 된다.