AkahuSync/IngestionService/ingester.py

265 lines
10 KiB
Python
Raw Normal View History

2026-05-18 09:10:09 +00:00
from AkahuClient.akahuclient import AkahuClient
from EmoneyScraper.scraper import Scraper
2026-05-18 22:47:46 +00:00
from IngestionService.normalizer import DataNormalizer
from config import load_env
2026-05-13 06:49:03 +00:00
import os
2026-05-18 09:10:09 +00:00
import json
import psycopg
2026-05-18 22:47:46 +00:00
from pathlib import Path
from datetime import datetime, date
2026-05-18 09:10:09 +00:00
from playwright.sync_api import sync_playwright, Playwright
2026-05-18 22:47:46 +00:00
# Import-time side effect: runs before the Ingester class below is defined.
# Presumably loads the "IngestionService" settings into the process
# environment that Ingester later reads through os.getenv -- confirm against
# config.load_env.
load_env("IngestionService")
2026-05-18 09:10:09 +00:00
# Import-time side effect: starts the synchronous Playwright driver as soon as
# this module is imported and keeps the handle in a module global.
# Ingester._get_scraper passes this handle to Scraper. Nothing in the visible
# part of this file stops it again.
pw = sync_playwright().start()
class Ingester:
def __init__(self):
2026-05-18 22:47:46 +00:00
self.token = self._require_env("AKAHU_API_TOKEN")
self.app_id = self._require_env("AKAHU_APP_ID")
db_host = self._require_env("DB_HOST")
db_name = self._require_env("DB_NAME")
db_user = self._require_env("DB_USER")
db_password = self._require_env("DB_PASSWORD")
2026-05-18 09:10:09 +00:00
self.client = AkahuClient(self.token, self.app_id)
self.dbconnection = psycopg.connect(
2026-05-18 22:47:46 +00:00
host=db_host,
dbname=db_name,
user=db_user,
password=db_password,
2026-05-18 09:10:09 +00:00
)
2026-05-18 22:47:46 +00:00
self.scraper: Scraper | None = None
2026-05-18 09:10:09 +00:00
def test_connection(self):
accounts = self.client.get_accounts()
print("Akahu accounts:", accounts)
def test_scraper(self):
2026-05-18 22:47:46 +00:00
snapshot = self.fetch_emoney_snapshot_data()
transactions = self.fetch_emoney_transaction_data()
print("Emoney snapshot accounts:", len(snapshot.get("accounts", [])))
print("Emoney transactions:", len(transactions.get("transactions", [])))
return {"snapshot": snapshot, "transactions": transactions}
2026-05-18 09:10:09 +00:00
def test_db_connection(self):
with self.dbconnection.cursor() as cursor:
cursor.execute("SELECT 1")
result = cursor.fetchone()
print("Database connection test result:", result)
def get_transactions(self, account_id: str, start_date: str, end_date: str):
transactions = self.client.get_transactions(account_id, start_date, end_date)
print(f"Transactions for account {account_id} from {start_date} to {end_date}:", transactions)
def get_accounts(self, account_names=None):
accounts = self.client.get_accounts(account_names)
return accounts
2026-05-18 22:47:46 +00:00
def fetch_akahu_snapshot_data(self, account_names=None):
return self.client.get_accounts(account_names)
def fetch_akahu_transaction_data(self, account_id: str, start_date: str, end_date: str):
return self.client.get_transactions(account_id, start_date, end_date)
def fetch_akahu_transactions_backfill(self, start_date: str, end_date: str, account_names=None):
accounts = self.client.get_accounts(account_names)
combined = {"items": []}
for account in accounts.get("items", []):
account_id = account.get("_id") or account.get("id")
if not account_id:
continue
response = self.client.get_transactions(account_id, start_date, end_date)
combined["items"].extend(response.get("items", []))
return combined
def backfill_akahu_transactions(self, start_date: str, end_date: str, account_names=None):
data = self.fetch_akahu_transactions_backfill(start_date, end_date, account_names)
self.write_akahu_transaction_data(data)
return data
def fetch_emoney_snapshot_data(self):
data = self.fetch_emoney_data()
return data.get("snapshot") or {"accounts": []}
def fetch_emoney_transaction_data(self):
data = self.fetch_emoney_data()
return {"transactions": data.get("transactions") or []}
def fetch_emoney_data(self) -> dict[str, object]:
cache = self._read_emoney_cache() or {}
snapshot = cache.get("snapshot")
transactions = cache.get("transactions")
missing_snapshot = snapshot is None
missing_transactions = transactions is None
if missing_snapshot or missing_transactions:
scraper = self._get_scraper()
if missing_snapshot:
snapshot = scraper.get_snapshot()
if missing_transactions:
transactions = scraper.get_transactions_parsed()
payload: dict[str, object] = {}
if missing_snapshot:
payload["snapshot"] = snapshot
if missing_transactions:
payload["transactions"] = transactions
if payload:
self._write_emoney_cache(payload)
return {
"snapshot": snapshot,
"transactions": transactions,
}
2026-05-18 09:10:09 +00:00
def write_akahu_snapshot_data(self, data):
with self.dbconnection.cursor() as cursor:
for account in data.get("items", []):
cursor.execute(
"""
2026-05-18 22:47:46 +00:00
INSERT INTO rawsnapshots (data, source)
2026-05-18 09:10:09 +00:00
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
2026-05-18 22:47:46 +00:00
(json.dumps(account, default=str), "akahu")
2026-05-18 09:10:09 +00:00
)
self.dbconnection.commit()
def write_akahu_transaction_data(self, data):
2026-05-18 22:47:46 +00:00
items = self._sort_transactions_oldest_first(data.get("items", []))
2026-05-18 09:10:09 +00:00
with self.dbconnection.cursor() as cursor:
2026-05-18 22:47:46 +00:00
for transaction in items:
2026-05-18 09:10:09 +00:00
cursor.execute(
"""
INSERT INTO rawtransactions (data, source)
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
2026-05-18 22:47:46 +00:00
(json.dumps(transaction, default=str), "akahu")
2026-05-18 09:10:09 +00:00
)
self.dbconnection.commit()
2026-05-13 06:49:03 +00:00
2026-05-18 09:10:09 +00:00
def write_emoney_snapshot_data(self, data):
2026-05-18 22:47:46 +00:00
accounts = self._sort_emoney_snapshots_oldest_first(data.get("accounts", []))
2026-05-18 09:10:09 +00:00
with self.dbconnection.cursor() as cursor:
2026-05-18 22:47:46 +00:00
for account in accounts:
2026-05-18 09:10:09 +00:00
cursor.execute(
"""
INSERT INTO rawsnapshots (data, source)
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
2026-05-18 22:47:46 +00:00
(json.dumps(account, default=str), "emoney")
2026-05-18 09:10:09 +00:00
)
self.dbconnection.commit()
2026-05-13 06:49:03 +00:00
2026-05-18 09:10:09 +00:00
def write_emoney_transaction_data(self, data):
2026-05-18 22:47:46 +00:00
transactions = data
if isinstance(data, dict):
transactions = data.get("transactions", [])
items = self._sort_transactions_oldest_first(transactions)
2026-05-18 09:10:09 +00:00
with self.dbconnection.cursor() as cursor:
2026-05-18 22:47:46 +00:00
for transaction in items:
2026-05-18 09:10:09 +00:00
cursor.execute(
"""
INSERT INTO rawtransactions (data, source)
VALUES (%s, %s)
ON CONFLICT (raw_sha256) DO NOTHING
""",
2026-05-18 22:47:46 +00:00
(json.dumps(transaction, default=str), "emoney")
2026-05-18 09:10:09 +00:00
)
2026-05-18 22:47:46 +00:00
self.dbconnection.commit()
def normalize_pending_data(self, source: str | None = None, limit: int | None = None):
normalizer = DataNormalizer(self.dbconnection)
transactions = normalizer.read_raw_transactions(source=source, limit=limit)
snapshots = normalizer.read_raw_snapshots(source=source, limit=limit)
normalizer.normalize_transactions(transactions)
normalizer.normalize_snapshots(snapshots)
def _read_emoney_cache(self) -> dict[str, object] | None:
if not self._use_emoney_cache():
return None
cache_path = self._emoney_cache_path()
if not cache_path.exists():
return None
with cache_path.open("r", encoding="utf-8") as handle:
return json.load(handle)
def _write_emoney_cache(self, payload: dict[str, object]) -> None:
if not self._use_emoney_cache():
return
cache_path = self._emoney_cache_path()
existing: dict[str, object] = {}
if cache_path.exists():
with cache_path.open("r", encoding="utf-8") as handle:
existing = json.load(handle)
existing.update(payload)
with cache_path.open("w", encoding="utf-8") as handle:
json.dump(existing, handle, default=str)
def _emoney_cache_path(self) -> Path:
raw_path = os.getenv("EMONEY_CACHE_PATH") or "emoney_cache.json"
return Path(raw_path)
def _use_emoney_cache(self) -> bool:
value = os.getenv("EMONEY_USE_CACHE", "false").strip().lower()
return value in {"1", "true", "yes"}
def _get_scraper(self) -> Scraper:
if self.scraper is None:
self.scraper = Scraper(pw, headless=False)
return self.scraper
def _sort_emoney_snapshots_oldest_first(self, accounts: list[dict[str, object]]) -> list[dict[str, object]]:
def key(item: dict[str, object]) -> tuple[bool, datetime]:
dt = self._parse_datetime(item.get("date"))
if dt is None:
return True, datetime.max
return False, dt
return sorted(accounts, key=key)
def _sort_transactions_oldest_first(self, items: list[dict[str, object]]) -> list[dict[str, object]]:
def key(item: dict[str, object]) -> tuple[bool, datetime]:
dt = self._parse_datetime(
item.get("date")
or item.get("datetime")
or item.get("timestamp")
or item.get("created_at")
or item.get("effective_date")
)
if dt is None:
return True, datetime.max
return False, dt
return sorted(items, key=key)
@staticmethod
def _parse_datetime(value: object) -> datetime | None:
if isinstance(value, datetime):
return value
if isinstance(value, date):
return datetime.combine(value, datetime.min.time())
if isinstance(value, (int, float)):
try:
return datetime.fromtimestamp(value)
except Exception:
return None
if isinstance(value, str):
text = value.strip()
if text.endswith("Z"):
text = f"{text[:-1]}+00:00"
try:
return datetime.fromisoformat(text)
except ValueError:
for fmt in ("%Y-%m-%d", "%d-%m-%Y", "%Y/%m/%d", "%d/%m/%Y"):
try:
return datetime.strptime(text, fmt)
except ValueError:
continue
return None
@staticmethod
def _require_env(name: str) -> str:
value = os.getenv(name)
if not value:
raise ValueError(f"Please set {name} in your environment.")
return value