"""Ingestion entry point: pull raw Akahu / Emoney data and store it in Postgres."""

import json
import logging
import os
from datetime import date, datetime, timezone
from pathlib import Path

import psycopg
from playwright.sync_api import sync_playwright, Playwright

from AkahuClient.akahuclient import AkahuClient
from EmoneyScraper.scraper import Scraper
from IngestionService.normalizer import DataNormalizer
from config import load_env

load_env("IngestionService")

logger = logging.getLogger(__name__)

# NOTE: Playwright is started at import time and shared by every Ingester;
# nothing in this module stops it.
pw = sync_playwright().start()

# The INSERT statements are fixed strings (no caller-supplied identifiers);
# only the JSON payload and the source tag are bound as parameters.
_INSERT_RAW_SNAPSHOT_SQL = """
    INSERT INTO rawsnapshots (data, source)
    VALUES (%s, %s)
    ON CONFLICT (raw_sha256) DO NOTHING
"""

_INSERT_RAW_TRANSACTION_SQL = """
    INSERT INTO rawtransactions (data, source)
    VALUES (%s, %s)
    ON CONFLICT (raw_sha256) DO NOTHING
"""

# Keys probed, in order, to find the date of a transaction record.
_TRANSACTION_DATE_FIELDS = (
    "date",
    "datetime",
    "timestamp",
    "created_at",
    "effective_date",
)

# Sort placeholder for records without a parseable date. It is only ever
# compared against other undated records because the leading bool of the sort
# key already separates dated from undated items.
_UNDATED_SORT_VALUE = datetime.max.replace(tzinfo=timezone.utc)


class Ingester:
    """Fetch raw data from Akahu and Emoney and persist it to the database.

    Configuration is read from the environment: ``AKAHU_API_TOKEN``,
    ``AKAHU_APP_ID``, ``DB_HOST``, ``DB_NAME``, ``DB_USER`` and ``DB_PASSWORD``
    are required; ``EMONEY_USE_CACHE`` and ``EMONEY_CACHE_PATH`` are optional.

    The instance owns a database connection; call :meth:`close` (or use the
    instance as a context manager) to release it.
    """

    def __init__(self):
        """Read required settings and open the Akahu client and DB connection.

        Raises:
            ValueError: if a required environment variable is unset or empty.
        """
        self.token = self._require_env("AKAHU_API_TOKEN")
        self.app_id = self._require_env("AKAHU_APP_ID")
        db_host = self._require_env("DB_HOST")
        db_name = self._require_env("DB_NAME")
        db_user = self._require_env("DB_USER")
        db_password = self._require_env("DB_PASSWORD")

        self.client = AkahuClient(self.token, self.app_id)
        self.dbconnection = psycopg.connect(
            host=db_host,
            dbname=db_name,
            user=db_user,
            password=db_password,
        )
        # Created lazily by _get_scraper() so cached runs never start one.
        self.scraper: Scraper | None = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def close(self) -> None:
        """Close the database connection owned by this ingester."""
        self.dbconnection.close()

    def __enter__(self) -> "Ingester":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------

    def test_connection(self):
        """Print the accounts returned by the Akahu client."""
        accounts = self.client.get_accounts()
        print("Akahu accounts:", accounts)

    def test_scraper(self):
        """Fetch Emoney data once, print item counts and return both views.

        Returns:
            ``{"snapshot": ..., "transactions": {"transactions": [...]}}``.
        """
        # A single fetch feeds both views; calling the two fetch_emoney_*
        # wrappers would run the whole fetch (and, uncached, the scrape) twice.
        data = self.fetch_emoney_data()
        snapshot = data.get("snapshot") or {"accounts": []}
        transactions = {"transactions": data.get("transactions") or []}
        print("Emoney snapshot accounts:", len(snapshot.get("accounts", [])))
        print("Emoney transactions:", len(transactions.get("transactions", [])))
        return {"snapshot": snapshot, "transactions": transactions}

    def test_db_connection(self):
        """Run ``SELECT 1`` against the database and print the result row."""
        with self.dbconnection.cursor() as cursor:
            cursor.execute("SELECT 1")
            result = cursor.fetchone()
        print("Database connection test result:", result)

    # ------------------------------------------------------------------
    # Akahu
    # ------------------------------------------------------------------

    def get_transactions(self, account_id: str, start_date: str, end_date: str):
        """Fetch, print and return the transactions of one Akahu account."""
        transactions = self.client.get_transactions(account_id, start_date, end_date)
        print(f"Transactions for account {account_id} from {start_date} to {end_date}:", transactions)
        return transactions

    def get_accounts(self, account_names=None):
        """Return the Akahu accounts, passing ``account_names`` to the client."""
        return self.client.get_accounts(account_names)

    def fetch_akahu_snapshot_data(self, account_names=None):
        """Return the Akahu accounts payload used as snapshot data."""
        return self.client.get_accounts(account_names)

    def fetch_akahu_transaction_data(self, account_id: str, start_date: str, end_date: str):
        """Return the Akahu transactions payload for one account and date range."""
        return self.client.get_transactions(account_id, start_date, end_date)

    def fetch_akahu_transactions_backfill(self, start_date: str, end_date: str, account_names=None):
        """Collect transactions for every selected account into one payload.

        Accounts that expose neither ``_id`` nor ``id`` are skipped.

        Returns:
            ``{"items": [...]}`` with the transactions of all accounts, in the
            order the accounts were returned.
        """
        accounts = self.client.get_accounts(account_names)
        combined = {"items": []}
        for account in accounts.get("items") or []:
            account_id = account.get("_id") or account.get("id")
            if not account_id:
                continue
            response = self.client.get_transactions(account_id, start_date, end_date)
            combined["items"].extend(response.get("items") or [])
        return combined

    def backfill_akahu_transactions(self, start_date: str, end_date: str, account_names=None):
        """Fetch the backfill payload, store it and return it."""
        data = self.fetch_akahu_transactions_backfill(start_date, end_date, account_names)
        self.write_akahu_transaction_data(data)
        return data

    # ------------------------------------------------------------------
    # Emoney
    # ------------------------------------------------------------------

    def fetch_emoney_snapshot_data(self):
        """Return the Emoney snapshot, or ``{"accounts": []}`` when there is none."""
        data = self.fetch_emoney_data()
        return data.get("snapshot") or {"accounts": []}

    def fetch_emoney_transaction_data(self):
        """Return ``{"transactions": [...]}`` with the Emoney transactions."""
        data = self.fetch_emoney_data()
        return {"transactions": data.get("transactions") or []}

    def fetch_emoney_data(self) -> dict[str, object]:
        """Return Emoney snapshot and transactions, scraping what the cache lacks.

        Each part is taken from the cache when present; only missing parts are
        scraped, and freshly scraped parts are merged back into the cache
        (a no-op when caching is disabled).

        Returns:
            ``{"snapshot": ..., "transactions": ...}``.
        """
        cache = self._read_emoney_cache() or {}
        snapshot = cache.get("snapshot")
        transactions = cache.get("transactions")

        if snapshot is None or transactions is None:
            scraper = self._get_scraper()
            scraped: dict[str, object] = {}
            if snapshot is None:
                snapshot = scraper.get_snapshot()
                scraped["snapshot"] = snapshot
            if transactions is None:
                transactions = scraper.get_transactions_parsed()
                scraped["transactions"] = transactions
            self._write_emoney_cache(scraped)

        return {
            "snapshot": snapshot,
            "transactions": transactions,
        }

    # ------------------------------------------------------------------
    # Raw writes
    # ------------------------------------------------------------------

    def write_akahu_snapshot_data(self, data):
        """Store every item of an Akahu accounts payload as a raw snapshot."""
        self._insert_raw(_INSERT_RAW_SNAPSHOT_SQL, "akahu", data.get("items") or [])

    def write_akahu_transaction_data(self, data):
        """Store the items of an Akahu transactions payload, oldest first."""
        items = self._sort_transactions_oldest_first(data.get("items") or [])
        self._insert_raw(_INSERT_RAW_TRANSACTION_SQL, "akahu", items)

    def write_emoney_snapshot_data(self, data):
        """Store the accounts of an Emoney snapshot, oldest first."""
        accounts = self._sort_emoney_snapshots_oldest_first(data.get("accounts") or [])
        self._insert_raw(_INSERT_RAW_SNAPSHOT_SQL, "emoney", accounts)

    def write_emoney_transaction_data(self, data):
        """Store Emoney transactions, oldest first.

        Args:
            data: either a list of transactions or a dict holding that list
                under ``"transactions"``.
        """
        transactions = data
        if isinstance(data, dict):
            transactions = data.get("transactions") or []
        items = self._sort_transactions_oldest_first(transactions)
        self._insert_raw(_INSERT_RAW_TRANSACTION_SQL, "emoney", items)

    def _insert_raw(self, query: str, source: str, records) -> None:
        """Insert ``records`` as JSON rows tagged with ``source`` and commit.

        On any failure the transaction is rolled back before the error is
        re-raised; otherwise the shared connection would stay in an aborted
        transaction and reject every later statement.
        """
        try:
            with self.dbconnection.cursor() as cursor:
                for record in records:
                    cursor.execute(query, (json.dumps(record, default=str), source))
            self.dbconnection.commit()
        except Exception:
            self.dbconnection.rollback()
            raise

    # ------------------------------------------------------------------
    # Normalization
    # ------------------------------------------------------------------

    def normalize_pending_data(self, source: str | None = None, limit: int | None = None):
        """Read raw transactions and snapshots and hand them to the normalizer.

        ``source`` and ``limit`` are forwarded unchanged to both read calls.
        """
        normalizer = DataNormalizer(self.dbconnection)
        transactions = normalizer.read_raw_transactions(source=source, limit=limit)
        snapshots = normalizer.read_raw_snapshots(source=source, limit=limit)
        normalizer.normalize_transactions(transactions)
        normalizer.normalize_snapshots(snapshots)

    # ------------------------------------------------------------------
    # Emoney cache
    # ------------------------------------------------------------------

    def _read_emoney_cache(self) -> dict[str, object] | None:
        """Return the cached Emoney payload, or ``None`` if disabled or unusable."""
        if not self._use_emoney_cache():
            return None
        return self._load_cache_file(self._emoney_cache_path())

    def _write_emoney_cache(self, payload: dict[str, object]) -> None:
        """Merge ``payload`` into the cache file (no-op when caching is off)."""
        if not self._use_emoney_cache():
            return
        cache_path = self._emoney_cache_path()
        merged = self._load_cache_file(cache_path) or {}
        merged.update(payload)
        with cache_path.open("w", encoding="utf-8") as handle:
            json.dump(merged, handle, default=str)

    @staticmethod
    def _load_cache_file(cache_path: Path) -> dict[str, object] | None:
        """Load a JSON object from ``cache_path``.

        A missing file, invalid JSON or a non-object document is treated as
        "no cache" so a damaged cache file triggers a re-scrape rather than
        a crash.
        """
        try:
            with cache_path.open("r", encoding="utf-8") as handle:
                loaded = json.load(handle)
        except FileNotFoundError:
            return None
        except json.JSONDecodeError as err:
            logger.warning("Ignoring unreadable Emoney cache at %s: %s", cache_path, err)
            return None
        if not isinstance(loaded, dict):
            logger.warning("Ignoring Emoney cache at %s: not a JSON object", cache_path)
            return None
        return loaded

    def _emoney_cache_path(self) -> Path:
        """Return the cache path from ``EMONEY_CACHE_PATH`` (default ``emoney_cache.json``)."""
        raw_path = os.getenv("EMONEY_CACHE_PATH") or "emoney_cache.json"
        return Path(raw_path)

    def _use_emoney_cache(self) -> bool:
        """Return True when ``EMONEY_USE_CACHE`` is ``1``, ``true`` or ``yes``."""
        value = os.getenv("EMONEY_USE_CACHE", "false").strip().lower()
        return value in {"1", "true", "yes"}

    def _get_scraper(self) -> Scraper:
        """Return the shared headless scraper, creating it on first use."""
        if self.scraper is None:
            self.scraper = Scraper(pw, headless=True)
        return self.scraper

    # ------------------------------------------------------------------
    # Ordering helpers
    # ------------------------------------------------------------------

    def _sort_emoney_snapshots_oldest_first(self, accounts: list[dict[str, object]]) -> list[dict[str, object]]:
        """Return ``accounts`` sorted by their ``date``; undated ones go last."""
        return sorted(
            accounts,
            key=lambda item: self._oldest_first_key(self._parse_datetime(item.get("date"))),
        )

    def _sort_transactions_oldest_first(self, items: list[dict[str, object]]) -> list[dict[str, object]]:
        """Return ``items`` sorted by their first non-empty date field; undated last."""

        def key(item: dict[str, object]) -> tuple[bool, datetime]:
            raw = None
            for field in _TRANSACTION_DATE_FIELDS:
                candidate = item.get(field)
                if candidate:
                    raw = candidate
                    break
            return self._oldest_first_key(self._parse_datetime(raw))

        return sorted(items, key=key)

    @staticmethod
    def _oldest_first_key(value: datetime | None) -> tuple[bool, datetime]:
        """Build a sort key that orders by time and puts undated values last.

        Naive datetimes are interpreted as UTC so they can be compared with
        timezone-aware ones; Python raises ``TypeError`` when a naive and an
        aware datetime are compared directly.
        """
        if value is None:
            return True, _UNDATED_SORT_VALUE
        if value.utcoffset() is None:
            return False, value.replace(tzinfo=timezone.utc)
        return False, value

    @staticmethod
    def _parse_datetime(value: object) -> datetime | None:
        """Best-effort conversion of ``value`` to a ``datetime``.

        Accepts ``datetime`` and ``date`` objects, numeric epoch timestamps and
        strings in ISO-8601 form (a trailing ``Z`` is read as ``+00:00``) or in
        one of ``YYYY-MM-DD``, ``DD-MM-YYYY``, ``YYYY/MM/DD``, ``DD/MM/YYYY``.

        Returns:
            The parsed datetime (naive or aware, as given), or ``None`` when
            the value cannot be interpreted.
        """
        # datetime must be tested before date: datetime is a date subclass.
        if isinstance(value, datetime):
            return value
        if isinstance(value, date):
            return datetime.combine(value, datetime.min.time())
        if isinstance(value, (int, float)):
            try:
                return datetime.fromtimestamp(value)
            except (OverflowError, OSError, ValueError):
                # Out-of-range or non-finite timestamps are simply undated.
                return None
        if isinstance(value, str):
            text = value.strip()
            if text.endswith("Z"):
                text = f"{text[:-1]}+00:00"
            try:
                return datetime.fromisoformat(text)
            except ValueError:
                for fmt in ("%Y-%m-%d", "%d-%m-%Y", "%Y/%m/%d", "%d/%m/%Y"):
                    try:
                        return datetime.strptime(text, fmt)
                    except ValueError:
                        continue
        return None

    @staticmethod
    def _require_env(name: str) -> str:
        """Return the value of environment variable ``name``.

        Raises:
            ValueError: if the variable is unset or empty.
        """
        value = os.getenv(name)
        if not value:
            raise ValueError(f"Please set {name} in your environment.")
        return value