AkahuSync/IngestionService/ingester.py

101 lines
3.6 KiB
Python
Raw Normal View History

2026-05-18 09:10:09 +00:00
from AkahuClient.akahuclient import AkahuClient
from EmoneyScraper.scraper import Scraper
2026-05-13 06:49:03 +00:00
from dotenv import load_dotenv
import os
2026-05-18 09:10:09 +00:00
import json
import psycopg
from playwright.sync_api import sync_playwright, Playwright
pw = sync_playwright().start()
2026-05-13 06:49:03 +00:00
load_dotenv()
2026-05-18 09:10:09 +00:00
class Ingester:
def __init__(self):
self.token = os.getenv("AKAHU_API_TOKEN")
self.app_id = os.getenv("AKAHU_APP_ID")
if not self.token or not self.app_id:
raise ValueError("Please set AKAHU_API_TOKEN and AKAHU_APP_ID in your environment.")
self.client = AkahuClient(self.token, self.app_id)
self.dbconnection = psycopg.connect(
host=os.getenv("DB_HOST"),
dbname=os.getenv("DB_NAME"),
user=os.getenv("DB_USER"),
password=os.getenv("DB_PASSWORD")
)
self.scraper = Scraper(pw, headless=False)
def test_connection(self):
accounts = self.client.get_accounts()
print("Akahu accounts:", accounts)
def test_scraper(self):
data = self.scraper.get_balance()
print("Scraped data:", data)
def test_db_connection(self):
with self.dbconnection.cursor() as cursor:
cursor.execute("SELECT 1")
result = cursor.fetchone()
print("Database connection test result:", result)
def get_transactions(self, account_id: str, start_date: str, end_date: str):
transactions = self.client.get_transactions(account_id, start_date, end_date)
print(f"Transactions for account {account_id} from {start_date} to {end_date}:", transactions)
def get_accounts(self, account_names=None):
accounts = self.client.get_accounts(account_names)
return accounts
def write_akahu_snapshot_data(self, data):
with self.dbconnection.cursor() as cursor:
for account in data.get("items", []):
cursor.execute(
"""
INSERT INTO rawsnapshot (data, source)
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
(json.dumps(account), "akahu")
)
self.dbconnection.commit()
def write_akahu_transaction_data(self, data):
with self.dbconnection.cursor() as cursor:
for transaction in data.get("items", []):
cursor.execute(
"""
INSERT INTO rawtransactions (data, source)
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
(json.dumps(transaction), "akahu")
)
self.dbconnection.commit()
2026-05-13 06:49:03 +00:00
2026-05-18 09:10:09 +00:00
def write_emoney_snapshot_data(self, data):
with self.dbconnection.cursor() as cursor:
for account in data.get("accounts", []):
cursor.execute(
"""
INSERT INTO rawsnapshots (data, source)
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
(json.dumps(account), "emoney")
)
self.dbconnection.commit()
2026-05-13 06:49:03 +00:00
2026-05-18 09:10:09 +00:00
def write_emoney_transaction_data(self, data):
with self.dbconnection.cursor() as cursor:
for transaction in data.get("transactions", []):
cursor.execute(
"""
INSERT INTO rawtransactions (data, source)
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
(json.dumps(transaction), "emoney")
)
self.dbconnection.commit()