AkahuSync/IngestionService/normalizer.py

431 lines
16 KiB
Python
Raw Permalink Normal View History

2026-05-18 22:47:46 +00:00
from __future__ import annotations
from datetime import date, datetime
from typing import Any, Iterable
class DataNormalizer:
    """Normalize raw ingested payloads into the relational ledger tables.

    Unprocessed rows (``processed = FALSE``) are read from ``rawtransactions``
    and ``rawsnapshots``, converted into a common dict shape according to
    their ``source`` (``"emoney"`` or ``"akahu"``), written to
    ``transactions`` / ``snapshots`` (creating ``organizations``,
    ``accounts`` and ``vendors`` rows on demand), and then flagged as
    processed.

    ``dbconnection`` must provide ``cursor()`` (usable as a context manager),
    ``commit()`` and ``rollback()`` and accept ``%s`` placeholders. The SQL
    uses ``RETURNING``, which suggests PostgreSQL -- NOTE(review): confirm.
    """

    # Raw tables whose names may be interpolated into SQL. Table names cannot
    # be bound as parameters, so they are restricted to this fixed set.
    _RAW_TABLES = frozenset({"rawtransactions", "rawsnapshots"})

    # Vendor that is shared across organizations: it is looked up by name
    # alone before falling back to the per-organization lookup.
    _SHARED_VENDOR_NAME = "Finance Now"

    # Emoney description keywords (matched case-insensitively as substrings).
    _EMONEY_SKIP_KEYWORD = "retry normal payment"
    _EMONEY_POSITIVE_KEYWORDS = ("normal payment",)
    _EMONEY_NEGATIVE_KEYWORDS = (
        "debit interest",
        "monthly fee",
        "mbi",
        "advance",
        "insurance",
        "late fee",
        "establishment fee",
    )

    # Accepted textual date formats, tried in order.
    _DATE_FORMATS = (
        "%Y-%m-%d",
        "%d-%m-%Y",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%S.%f%z",
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S.%fZ",
    )

    # Keys probed, in order, when a balance arrives as a nested object.
    # "current" matches Akahu's account balance object.
    _BALANCE_KEYS = ("amount", "value", "balance", "current")

    def __init__(self, dbconnection):
        """Store the connection and pre-load the Akahu account lookup cache.

        The cache is built once here from every ``rawsnapshots`` row with
        ``source = 'akahu'``. Snapshots ingested after construction are not
        reflected until a new instance is created.
        """
        self.dbconnection = dbconnection
        self._akahu_account_cache = self._build_akahu_account_cache()

    # ------------------------------------------------------------------ read

    def read_raw_transactions(self, source: str | None = None, limit: int | None = None) -> list[dict[str, Any]]:
        """Return unprocessed ``rawtransactions`` rows, oldest first.

        Args:
            source: when truthy, only rows with this ``source`` are returned.
            limit: maximum number of rows to return; ``None`` means no limit.

        Returns:
            A list of ``{"id", "data", "source"}`` dicts.
        """
        return self._read_raw("rawtransactions", source, limit)

    def read_raw_snapshots(self, source: str | None = None, limit: int | None = None) -> list[dict[str, Any]]:
        """Return unprocessed ``rawsnapshots`` rows, oldest first.

        Takes the same arguments and returns the same shape as
        ``read_raw_transactions``.
        """
        return self._read_raw("rawsnapshots", source, limit)

    def _read_raw(self, table: str, source: str | None, limit: int | None) -> list[dict[str, Any]]:
        """Shared query for the unprocessed rows of one raw table."""
        self._check_raw_table(table)
        sql = f"SELECT id, data, source FROM {table} WHERE processed = FALSE"
        params: list[Any] = []
        if source:
            sql += " AND source = %s"
            params.append(source)
        sql += " ORDER BY received_at ASC"
        if limit is not None:
            sql += " LIMIT %s"
            params.append(limit)
        with self.dbconnection.cursor() as cursor:
            cursor.execute(sql, params)
            rows = cursor.fetchall()
        return [{"id": row[0], "data": row[1], "source": row[2]} for row in rows]

    # ------------------------------------------------------------- normalize

    def normalize_transactions(
        self,
        records: Iterable[dict[str, Any]],
        *,
        mark_skipped: bool = False,
    ) -> None:
        """Normalize, store and mark processed a batch of raw transaction rows.

        Args:
            records: rows shaped like ``read_raw_transactions`` output.
            mark_skipped: when True, rows that cannot be normalized are also
                flagged processed. By default they stay unprocessed and are
                re-read on later runs. With a ``limit``, a backlog of such
                rows could otherwise fill every batch.

        The whole batch is committed once at the end. On any exception the
        connection is rolled back and the exception is re-raised.
        """
        self._normalize_batch(
            records,
            "rawtransactions",
            self._normalize_transaction,
            self._write_transaction,
            mark_skipped,
        )

    def normalize_snapshots(
        self,
        records: Iterable[dict[str, Any]],
        *,
        mark_skipped: bool = False,
    ) -> None:
        """Normalize, store and mark processed a batch of raw snapshot rows.

        Same semantics as ``normalize_transactions``, applied to
        ``rawsnapshots`` / ``snapshots``.
        """
        self._normalize_batch(
            records,
            "rawsnapshots",
            self._normalize_snapshot,
            self._write_snapshot,
            mark_skipped,
        )

    def _normalize_batch(
        self,
        records: Iterable[dict[str, Any]],
        table: str,
        normalize: Any,
        write: Any,
        mark_skipped: bool,
    ) -> None:
        """Run normalize -> write -> mark-processed for each record in one DB transaction."""
        try:
            for record in records:
                normalized = normalize(record.get("data"), record.get("source"))
                if normalized:
                    write(normalized)
                elif not mark_skipped:
                    # Leave the raw row unprocessed so a later run can retry it.
                    continue
                self._mark_processed(table, record["id"])
        except Exception:
            # Do not leave a half-written batch open on the connection.
            self.dbconnection.rollback()
            raise
        self.dbconnection.commit()

    def _normalize_transaction(self, data: Any, source: str | None) -> dict[str, Any] | None:
        """Dispatch a raw transaction payload to its source-specific normalizer.

        Returns ``None`` for non-dict payloads or unknown sources.
        """
        if not isinstance(data, dict):
            return None
        if source == "emoney":
            return self._normalize_emoney_transaction(data)
        if source == "akahu":
            return self._normalize_akahu_transaction(data)
        return None

    def _normalize_snapshot(self, data: Any, source: str | None) -> dict[str, Any] | None:
        """Dispatch a raw snapshot payload to its source-specific normalizer.

        Returns ``None`` for non-dict payloads or unknown sources.
        """
        if not isinstance(data, dict):
            return None
        if source == "emoney":
            return self._normalize_emoney_snapshot(data)
        if source == "akahu":
            return self._normalize_akahu_snapshot(data)
        return None

    def _emoney_account_fields(self, data: dict[str, Any]) -> dict[str, str]:
        """Account/org fields for an emoney payload, with emoney defaults."""
        return {
            "account_name": self._string_or_none(data.get("account_name")) or "Emoney",
            "account_num": self._string_or_none(data.get("account_num")) or "emoney",
            "org_name": self._string_or_none(data.get("org_name")) or "Emoney",
        }

    def _normalize_emoney_transaction(self, data: dict[str, Any]) -> dict[str, Any] | None:
        """Normalize an emoney transaction.

        Requires the ``date``, ``description`` and ``amount`` keys. The amount
        sign is adjusted from the description (see ``_adjust_emoney_amount``).

        Returns ``None`` when:
            - a required key is missing;
            - the date does not parse;
            - the amount is not numeric;
            - the row is a "retry normal payment".
        """
        if not {"date", "description", "amount"}.issubset(data):
            return None
        parsed_date = self._parse_date(data.get("date"))
        if not parsed_date:
            return None
        description = self._string_or_none(data.get("description")) or "unknown"
        amount = self._adjust_emoney_amount(description, data.get("amount"))
        if amount is None:
            return None
        return {
            "datetime": parsed_date,
            "description": description,
            "amount": amount,
            **self._emoney_account_fields(data),
            "vendor_name": self._SHARED_VENDOR_NAME,
        }

    def _normalize_emoney_snapshot(self, data: dict[str, Any]) -> dict[str, Any] | None:
        """Normalize an emoney balance snapshot.

        Returns ``None`` if ``balance`` is absent or cannot be parsed as a
        number. A missing or unparseable date falls back to today.
        """
        if "balance" not in data:
            return None
        balance = self._parse_balance_value(data.get("balance"))
        if balance is None:
            return None
        return {
            "datetime": self._parse_date(data.get("date")) or date.today(),
            "balance": balance,
            **self._emoney_account_fields(data),
        }

    def _normalize_akahu_transaction(self, data: dict[str, Any]) -> dict[str, Any] | None:
        """Normalize an Akahu transaction.

        Account metadata is resolved through the account cache, keyed by the
        first present of ``_account`` / ``account_id`` / ``account``.

        The vendor is the first non-empty of:
            - the merchant name;
            - ``payee``;
            - ``description``.

        Returns ``None`` when:
            - ``amount`` or ``description`` is missing;
            - the date does not parse;
            - the amount is not numeric.
        """
        if "amount" not in data or "description" not in data:
            return None
        parsed_date = self._parse_date(data.get("date") or data.get("created_at"))
        if not parsed_date:
            return None
        amount = self._to_float(data.get("amount"))
        if amount is None:
            return None

        account_id = self._string_or_none(data.get("_account") or data.get("account_id") or data.get("account"))
        account_meta = self._akahu_account_cache.get(account_id or "", {})

        merchant = data.get("merchant")
        if isinstance(merchant, dict):
            merchant = merchant.get("name")
        vendor_name = (
            self._string_or_none(merchant)
            or self._string_or_none(data.get("payee"))
            or self._string_or_none(data.get("description"))
        )

        return {
            "datetime": parsed_date,
            "description": self._format_akahu_description(data),
            "amount": amount,
            "account_name": account_meta.get("account_name")
            or self._string_or_none(data.get("account_name") or data.get("name")),
            "account_num": account_meta.get("account_num") or account_id,
            "org_name": account_meta.get("org_name") or "unknown",
            "vendor_name": vendor_name,
        }

    def _normalize_akahu_snapshot(self, data: dict[str, Any]) -> dict[str, Any] | None:
        """Normalize an Akahu account snapshot.

        ``balance`` is tried first and ``current_balance`` is the fallback.
        Each may be a number, a numeric string, or an object (see
        ``_parse_balance_value``). A balance of zero is kept.

        Returns ``None`` if neither value parses.
        """
        if "balance" not in data and "current_balance" not in data:
            return None
        # Check against None rather than truthiness so a 0 balance is kept.
        balance = self._parse_balance_value(data.get("balance"))
        if balance is None:
            balance = self._parse_balance_value(data.get("current_balance"))
        if balance is None:
            return None
        fields = self._akahu_account_fields(data)
        return {
            "datetime": self._parse_date(data.get("date") or data.get("updated_at")) or date.today(),
            "balance": balance,
            "account_name": fields["account_name"],
            "account_num": fields["account_num"],
            "org_name": fields["org_name"],
        }

    def _akahu_account_fields(self, data: dict[str, Any]) -> dict[str, str | None]:
        """Extract account number, name and organization from an Akahu account payload.

        ``account_num`` and ``account_name`` may be ``None``. ``org_name``
        defaults to ``"unknown"``.
        """
        connection = data.get("connection")
        if not isinstance(connection, dict):
            connection = {}
        return {
            "account_num": self._string_or_none(
                data.get("formatted_account") or data.get("_id") or data.get("account_id")
            ),
            "account_name": self._string_or_none(data.get("name") or data.get("account_name")),
            "org_name": self._string_or_none(connection.get("name")) or "unknown",
        }

    def _build_akahu_account_cache(self) -> dict[str, dict[str, str]]:
        """Map Akahu account id -> {account_num, account_name, org_name}.

        Built from all Akahu ``rawsnapshots`` rows, processed or not. Rows are
        read oldest first, so the most recently received snapshot of an
        account determines its cached fields.
        """
        cache: dict[str, dict[str, str]] = {}
        with self.dbconnection.cursor() as cursor:
            cursor.execute(
                "SELECT data FROM rawsnapshots WHERE source = %s ORDER BY received_at ASC",
                ("akahu",),
            )
            rows = cursor.fetchall()
        for (data,) in rows:
            if not isinstance(data, dict):
                continue
            account_id = self._string_or_none(data.get("_id") or data.get("id"))
            if not account_id:
                continue
            fields = self._akahu_account_fields(data)
            cache[account_id] = {
                "account_num": fields["account_num"] or account_id,
                "account_name": fields["account_name"] or "unknown",
                "org_name": fields["org_name"],
            }
        return cache

    def _format_akahu_description(self, data: dict[str, Any]) -> str:
        """Build a readable description from an Akahu transaction.

        "INTERNET XFR" is replaced by ``-> <target>``, where the target is the
        first of ``other_account`` / ``reference`` / ``particulars`` /
        ``code`` present in ``meta``. Any present meta fields are then
        appended as ``| key=value`` segments.
        """
        description = self._string_or_none(data.get("description")) or "unknown"
        meta = data.get("meta")
        if not isinstance(meta, dict):
            meta = {}
        reference = self._string_or_none(meta.get("reference"))
        particulars = self._string_or_none(meta.get("particulars"))
        code = self._string_or_none(meta.get("code"))
        other_account = self._string_or_none(meta.get("other_account"))

        if "INTERNET XFR" in description:
            target = other_account or reference or particulars or code
            if target:
                description = description.replace("INTERNET XFR", f"-> {target}")

        labelled = (
            ("ref", reference),
            ("particulars", particulars),
            ("code", code),
            ("other", other_account),
        )
        meta_bits = [f"{label}={value}" for label, value in labelled if value]
        if meta_bits:
            description = f"{description} | " + " | ".join(meta_bits)
        return description

    # ----------------------------------------------------------------- write

    def _write_transaction(self, normalized: dict[str, Any]) -> None:
        """Insert one normalized transaction, resolving org/account/vendor ids."""
        org_id = self._get_or_create_org(normalized.get("org_name"))
        account_id = self._get_or_create_account(
            normalized.get("account_num"),
            normalized.get("account_name"),
            org_id,
        )
        vendor_name = normalized.get("vendor_name")
        vendor_id = self._get_or_create_vendor(vendor_name, org_id) if vendor_name else None
        with self.dbconnection.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO transactions (datetime, description, amount, accountid, orgid, vendorid)
                VALUES (%s, %s, %s, %s, %s, %s)
                """,
                (
                    normalized.get("datetime"),
                    normalized.get("description"),
                    normalized.get("amount"),
                    account_id,
                    org_id,
                    vendor_id,
                ),
            )

    def _write_snapshot(self, normalized: dict[str, Any]) -> None:
        """Insert one normalized balance snapshot, resolving org/account ids."""
        org_id = self._get_or_create_org(normalized.get("org_name"))
        account_id = self._get_or_create_account(
            normalized.get("account_num"),
            normalized.get("account_name"),
            org_id,
        )
        with self.dbconnection.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO snapshots (datetime, accountid, balance, orgid)
                VALUES (%s, %s, %s, %s)
                """,
                (
                    normalized.get("datetime"),
                    account_id,
                    normalized.get("balance"),
                    org_id,
                ),
            )

    @classmethod
    def _check_raw_table(cls, table: str) -> None:
        """Raise ValueError unless ``table`` is one of the known raw tables."""
        if table not in cls._RAW_TABLES:
            raise ValueError(f"unexpected raw table: {table!r}")

    def _mark_processed(self, table: str, record_id: int) -> None:
        """Set ``processed = TRUE`` on one raw row (not committed here).

        Raises:
            ValueError: if ``table`` is not a known raw table. The name is
                interpolated into the SQL, so it is validated first.
        """
        self._check_raw_table(table)
        with self.dbconnection.cursor() as cursor:
            cursor.execute(
                f"UPDATE {table} SET processed = TRUE WHERE id = %s",
                (record_id,),
            )

    # NOTE(review): the _get_or_create_* helpers do SELECT-then-INSERT without
    # a lock. Concurrent writers could create duplicates unless a unique
    # constraint exists in the schema -- confirm.

    def _get_or_create_org(self, org_name: str | None) -> int:
        """Return the id of the organization named ``org_name`` (default "unknown"), inserting it if absent."""
        org_name = org_name or "unknown"
        with self.dbconnection.cursor() as cursor:
            cursor.execute("SELECT id FROM organizations WHERE orgname = %s", (org_name,))
            row = cursor.fetchone()
            if row:
                return row[0]
            cursor.execute(
                "INSERT INTO organizations (orgname) VALUES (%s) RETURNING id",
                (org_name,),
            )
            return cursor.fetchone()[0]

    def _get_or_create_account(self, account_num: str | None, account_name: str | None, org_id: int) -> int:
        """Return the id of the account with ``account_num``, inserting it if absent.

        A missing name becomes "unknown". A missing number is synthesized as
        ``"<org_id>:<account_name>"``. Lookup is by account number only.
        """
        account_name = account_name or "unknown"
        account_num = account_num or f"{org_id}:{account_name}"
        with self.dbconnection.cursor() as cursor:
            cursor.execute("SELECT id FROM accounts WHERE accountnum = %s", (account_num,))
            row = cursor.fetchone()
            if row:
                return row[0]
            cursor.execute(
                """
                INSERT INTO accounts (accountnum, accountname, orgid)
                VALUES (%s, %s, %s)
                RETURNING id
                """,
                (account_num, account_name, org_id),
            )
            return cursor.fetchone()[0]

    def _get_or_create_vendor(self, vendor_name: str, org_id: int) -> int:
        """Return the id of ``vendor_name`` within ``org_id``, inserting it if absent.

        The shared vendor ("Finance Now") is first looked up by name across
        all organizations, so one row is reused everywhere once it exists.
        """
        with self.dbconnection.cursor() as cursor:
            if vendor_name == self._SHARED_VENDOR_NAME:
                cursor.execute(
                    "SELECT id FROM vendors WHERE vendorname = %s",
                    (vendor_name,),
                )
                row = cursor.fetchone()
                if row:
                    return row[0]
            cursor.execute(
                "SELECT id FROM vendors WHERE vendorname = %s AND orgid = %s",
                (vendor_name, org_id),
            )
            row = cursor.fetchone()
            if row:
                return row[0]
            cursor.execute(
                "INSERT INTO vendors (vendorname, orgid) VALUES (%s, %s) RETURNING id",
                (vendor_name, org_id),
            )
            return cursor.fetchone()[0]

    # --------------------------------------------------------------- parsing

    @staticmethod
    def _parse_date(value: Any) -> date | None:
        """Coerce a date, datetime or date-like string to a ``date``.

        Strings are stripped and tried against ``_DATE_FORMATS``. Timestamps
        with an offset yield the calendar date in that offset. Returns
        ``None`` when nothing matches.
        """
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            return value
        if not isinstance(value, str):
            return None
        text = value.strip()
        for fmt in DataNormalizer._DATE_FORMATS:
            try:
                return datetime.strptime(text, fmt).date()
            except ValueError:
                continue
        return None

    @staticmethod
    def _string_or_none(value: Any) -> str | None:
        """Return ``str(value)``, or ``None`` when ``value`` is ``None``."""
        if value is None:
            return None
        return str(value)

    @staticmethod
    def _to_float(value: Any) -> float | None:
        """Convert ``value`` to float, or return ``None`` if it is None, a bool, or not numeric."""
        if value is None or isinstance(value, bool):
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _adjust_emoney_amount(description: str, amount: Any) -> float | None:
        """Return the emoney amount with its sign derived from the description.

        Matching is case-insensitive on substrings of ``description``:
            - "retry normal payment" rows are skipped (returns ``None``);
            - fee/interest/insurance-style keywords force a negative amount;
            - "normal payment" forces a positive amount;
            - anything else keeps the original sign.

        Also returns ``None`` if ``amount`` is not numeric.
        """
        value = DataNormalizer._to_float(amount)
        if value is None:
            return None
        desc = description.strip().lower()
        if DataNormalizer._EMONEY_SKIP_KEYWORD in desc:
            return None
        if any(keyword in desc for keyword in DataNormalizer._EMONEY_NEGATIVE_KEYWORDS):
            return -abs(value)
        if any(keyword in desc for keyword in DataNormalizer._EMONEY_POSITIVE_KEYWORDS):
            return abs(value)
        return value

    @staticmethod
    def _parse_balance_value(value: Any) -> float | None:
        """Parse a balance given as a number, numeric string, or nested object.

        For dicts, the first key in ``_BALANCE_KEYS`` that is present is
        parsed recursively; this covers Akahu's ``{"current": ...}`` shape.
        Returns ``None`` when no numeric value can be extracted.
        """
        if isinstance(value, dict):
            for key in DataNormalizer._BALANCE_KEYS:
                if key in value:
                    return DataNormalizer._parse_balance_value(value.get(key))
            return None
        return DataNormalizer._to_float(value)