# AkahuSync/IngestionService/ingester.py
# 2026-05-19 13:07:18 +12:00
#
# 328 lines
# 13 KiB
# Python

from AkahuClient.akahuclient import AkahuClient
from EmoneyScraper.scraper import Scraper
from IngestionService.normalizer import DataNormalizer
from config import load_env
import os
import json
import psycopg
from pathlib import Path
from datetime import datetime, date
from playwright.sync_api import sync_playwright, Playwright
# Load this service's environment configuration at import time, before any
# Ingester is constructed (Ingester.__init__ reads its settings via os.getenv).
# NOTE(review): load_env is project code not visible here - presumably it
# populates os.environ for the "IngestionService" component; confirm.
load_env("IngestionService")
# Start the Playwright sync driver as an import side effect. The handle is
# shared module-wide and handed to Scraper in Ingester._get_scraper.
# NOTE(review): nothing in this file calls pw.stop(), so the driver lives for
# the lifetime of the process - confirm that is intended.
pw = sync_playwright().start()
class Ingester:
def __init__(self):
self.token = self._require_env("AKAHU_API_TOKEN")
self.app_id = self._require_env("AKAHU_APP_ID")
db_host = self._require_env("DB_HOST")
db_name = self._require_env("DB_NAME")
db_user = self._require_env("DB_USER")
db_password = self._require_env("DB_PASSWORD")
self.client = AkahuClient(self.token, self.app_id)
self.dbconnection = psycopg.connect(
host=db_host,
dbname=db_name,
user=db_user,
password=db_password,
)
self.scraper: Scraper | None = None
def test_connection(self):
accounts = self.client.get_accounts()
print("Akahu accounts:", accounts)
def test_scraper(self):
snapshot = self.fetch_emoney_snapshot_data()
transactions = self.fetch_emoney_transaction_data()
print("Emoney snapshot accounts:", len(snapshot.get("accounts", [])))
print("Emoney transactions:", len(transactions.get("transactions", [])))
return {"snapshot": snapshot, "transactions": transactions}
def test_db_connection(self):
with self.dbconnection.cursor() as cursor:
cursor.execute("SELECT 1")
result = cursor.fetchone()
print("Database connection test result:", result)
def get_transactions(self, account_id: str, start_date: str, end_date: str):
transactions = self.client.get_transactions(account_id, start_date, end_date)
print(f"Transactions for account {account_id} from {start_date} to {end_date}:", transactions)
def get_accounts(self, account_names=None):
accounts = self.client.get_accounts(account_names)
return accounts
def fetch_akahu_snapshot_data(self, account_names=None):
return self.client.get_accounts(account_names)
def fetch_akahu_transaction_data(self, account_id: str, start_date: str, end_date: str):
return self.client.get_transactions(account_id, start_date, end_date)
def fetch_akahu_transactions_backfill(self, start_date: str, end_date: str, account_names=None):
accounts = self.client.get_accounts(account_names)
combined = {"items": []}
for account in accounts.get("items", []):
account_id = account.get("_id") or account.get("id")
if not account_id:
continue
response = self.client.get_transactions(account_id, start_date, end_date)
combined["items"].extend(response.get("items", []))
return combined
def backfill_akahu_transactions(self, start_date: str, end_date: str, account_names=None):
data = self.fetch_akahu_transactions_backfill(start_date, end_date, account_names)
self.write_akahu_transaction_data(data)
return data
def fetch_emoney_snapshot_data(self):
data = self.fetch_emoney_data()
return data.get("snapshot") or {"accounts": []}
def fetch_emoney_transaction_data(self):
data = self.fetch_emoney_data()
return {"transactions": data.get("transactions") or []}
def fetch_emoney_data(self) -> dict[str, object]:
cache = self._read_emoney_cache() or {}
snapshot = cache.get("snapshot")
transactions = cache.get("transactions")
missing_snapshot = snapshot is None
missing_transactions = transactions is None
if missing_snapshot or missing_transactions:
scraper = self._get_scraper()
if missing_snapshot:
snapshot = scraper.get_snapshot()
if missing_transactions:
transactions = scraper.get_transactions_parsed()
payload: dict[str, object] = {}
if missing_snapshot:
payload["snapshot"] = snapshot
if missing_transactions:
payload["transactions"] = transactions
if payload:
self._write_emoney_cache(payload)
return {
"snapshot": snapshot,
"transactions": transactions,
}
def write_akahu_snapshot_data(self, data):
with self.dbconnection.cursor() as cursor:
for account in data.get("items", []):
self._record_sync(
account_num=self._string_or_none(account.get("formatted_account") or account.get("_id") or account.get("account_id")),
account_name=self._string_or_none(account.get("name") or account.get("account_name")),
org_name=self._string_or_none(
(account.get("connection") or {}).get("name") if isinstance(account.get("connection"), dict) else None
),
)
cursor.execute(
"""
INSERT INTO rawsnapshots (data, source)
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
(json.dumps(account, default=str), "akahu")
)
self.dbconnection.commit()
def write_akahu_transaction_data(self, data):
items = self._sort_transactions_oldest_first(data.get("items", []))
with self.dbconnection.cursor() as cursor:
for transaction in items:
cursor.execute(
"""
INSERT INTO rawtransactions (data, source)
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
(json.dumps(transaction, default=str), "akahu")
)
self.dbconnection.commit()
def write_emoney_snapshot_data(self, data):
accounts = self._sort_emoney_snapshots_oldest_first(data.get("accounts", []))
with self.dbconnection.cursor() as cursor:
for account in accounts:
self._record_sync(
account_num="emoney",
account_name="Emoney",
org_name="Emoney",
)
cursor.execute(
"""
INSERT INTO rawsnapshots (data, source)
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
(json.dumps(account, default=str), "emoney")
)
self.dbconnection.commit()
def write_emoney_transaction_data(self, data):
transactions = data
if isinstance(data, dict):
transactions = data.get("transactions", [])
items = self._sort_transactions_oldest_first(transactions)
with self.dbconnection.cursor() as cursor:
for transaction in items:
cursor.execute(
"""
INSERT INTO rawtransactions (data, source)
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
(json.dumps(transaction, default=str), "emoney")
)
self.dbconnection.commit()
def normalize_pending_data(self, source: str | None = None, limit: int | None = None):
normalizer = DataNormalizer(self.dbconnection)
transactions = normalizer.read_raw_transactions(source=source, limit=limit)
snapshots = normalizer.read_raw_snapshots(source=source, limit=limit)
normalizer.normalize_transactions(transactions)
normalizer.normalize_snapshots(snapshots)
def _read_emoney_cache(self) -> dict[str, object] | None:
if not self._use_emoney_cache():
return None
cache_path = self._emoney_cache_path()
if not cache_path.exists():
return None
with cache_path.open("r", encoding="utf-8") as handle:
return json.load(handle)
def _write_emoney_cache(self, payload: dict[str, object]) -> None:
if not self._use_emoney_cache():
return
cache_path = self._emoney_cache_path()
existing: dict[str, object] = {}
if cache_path.exists():
with cache_path.open("r", encoding="utf-8") as handle:
existing = json.load(handle)
existing.update(payload)
with cache_path.open("w", encoding="utf-8") as handle:
json.dump(existing, handle, default=str)
def _emoney_cache_path(self) -> Path:
raw_path = os.getenv("EMONEY_CACHE_PATH") or "emoney_cache.json"
return Path(raw_path)
def _use_emoney_cache(self) -> bool:
value = os.getenv("EMONEY_USE_CACHE", "false").strip().lower()
return value in {"1", "true", "yes"}
def _get_scraper(self) -> Scraper:
if self.scraper is None:
self.scraper = Scraper(pw, headless=True)
return self.scraper
def _sort_emoney_snapshots_oldest_first(self, accounts: list[dict[str, object]]) -> list[dict[str, object]]:
def key(item: dict[str, object]) -> tuple[bool, datetime]:
dt = self._parse_datetime(item.get("date"))
if dt is None:
return True, datetime.max
return False, dt
return sorted(accounts, key=key)
def _sort_transactions_oldest_first(self, items: list[dict[str, object]]) -> list[dict[str, object]]:
def key(item: dict[str, object]) -> tuple[bool, datetime]:
dt = self._parse_datetime(
item.get("date")
or item.get("datetime")
or item.get("timestamp")
or item.get("created_at")
or item.get("effective_date")
)
if dt is None:
return True, datetime.max
return False, dt
return sorted(items, key=key)
def _record_sync(self, account_num: str | None, account_name: str | None, org_name: str | None) -> None:
org_id = self._get_or_create_org(org_name)
account_id = self._get_or_create_account(account_num, account_name, org_id)
with self.dbconnection.cursor() as cursor:
cursor.execute(
"""
INSERT INTO syncs (datetime, accountid, orgid)
VALUES (%s, %s, %s)
""",
(date.today(), account_id, org_id),
)
def _get_or_create_org(self, org_name: str | None) -> int:
org_name = org_name or "unknown"
with self.dbconnection.cursor() as cursor:
cursor.execute("SELECT id FROM organizations WHERE orgname = %s", (org_name,))
row = cursor.fetchone()
if row:
return row[0]
cursor.execute(
"INSERT INTO organizations (orgname) VALUES (%s) RETURNING id",
(org_name,),
)
return cursor.fetchone()[0]
def _get_or_create_account(self, account_num: str | None, account_name: str | None, org_id: int) -> int:
account_name = account_name or "unknown"
account_num = account_num or f"{org_id}:{account_name}"
with self.dbconnection.cursor() as cursor:
cursor.execute("SELECT id FROM accounts WHERE accountnum = %s", (account_num,))
row = cursor.fetchone()
if row:
return row[0]
cursor.execute(
"""
INSERT INTO accounts (accountnum, accountname, orgid)
VALUES (%s, %s, %s)
RETURNING id
""",
(account_num, account_name, org_id),
)
return cursor.fetchone()[0]
@staticmethod
def _string_or_none(value: object) -> str | None:
if value is None:
return None
return str(value)
@staticmethod
def _parse_datetime(value: object) -> datetime | None:
if isinstance(value, datetime):
return value
if isinstance(value, date):
return datetime.combine(value, datetime.min.time())
if isinstance(value, (int, float)):
try:
return datetime.fromtimestamp(value)
except Exception:
return None
if isinstance(value, str):
text = value.strip()
if text.endswith("Z"):
text = f"{text[:-1]}+00:00"
try:
return datetime.fromisoformat(text)
except ValueError:
for fmt in ("%Y-%m-%d", "%d-%m-%Y", "%Y/%m/%d", "%d/%m/%Y"):
try:
return datetime.strptime(text, fmt)
except ValueError:
continue
return None
@staticmethod
def _require_env(name: str) -> str:
value = os.getenv(name)
if not value:
raise ValueError(f"Please set {name} in your environment.")
return value