This commit is contained in:
Jethro 2026-06-01 11:18:27 +12:00
parent 0433c6aa52
commit 317e4c0097
10 changed files with 819 additions and 40 deletions

View file

@ -5,6 +5,28 @@ import re
from datetime import date, datetime
class Scraper:
ACCOUNT_CONFIGS = [
{
"account_key": "loan",
"account_name": "emoney-loan",
"balance_xpath": "xpath=/html/body/form/div[3]/div[3]/div[2]/div[3]/div[5]/span[2]",
"link_xpath": "xpath=/html/body/form/div[3]/div[3]/div[2]/div[3]/div[1]/span[2]/a",
},
{
"account_key": "forrester",
"account_name": "emoney-forrester",
"balance_xpath": "xpath=/html/body/form/div[3]/div[3]/div[2]/div[1]/div[5]/span[2]",
"link_xpath": "xpath=/html/body/form/div[3]/div[3]/div[2]/div[1]/div[1]/span[2]/a",
},
{
"account_key": "swift",
"account_name": "emoney-swift",
"balance_xpath": "xpath=/html/body/form/div[3]/div[3]/div[2]/div[2]/div[5]/span[2]",
"link_xpath": "xpath=/html/body/form/div[3]/div[3]/div[2]/div[2]/div[1]/span[2]/a",
},
]
TRANSACTIONS_TABLE_XPATH = "xpath=/html/body/form/div[3]/div[3]/div[2]/div/div[2]/div[3]/table/tbody"
def __init__(self, playwright: Playwright, headless: bool = True):
load_env("EmoneyScraper")
self._require_env("SCRAPER_URL")
@ -20,28 +42,52 @@ class Scraper:
self.page.click("input#ctl00_ContentPlaceHolder1_btnLogin")
def get_balance(self):
current_balance = self.page.locator("xpath=/html/body/form/div[3]/div[3]/div[2]/div[3]/div[5]/span[2]").inner_text()
return current_balance
balances: dict[str, str] = {}
for account in self.ACCOUNT_CONFIGS:
balance_text = self.page.locator(account["balance_xpath"]).inner_text()
balances[account["account_key"]] = balance_text
return balances
def get_snapshot(self) -> dict[str, list[dict[str, object]]]:
balance_text = self.get_balance()
balance_value = self._parse_money(balance_text)
snapshot_date = date.today()
return {
"accounts": [
{
balances = self.get_balance()
accounts: list[dict[str, object]] = []
for account in self.ACCOUNT_CONFIGS:
balance_text = balances.get(account["account_key"], "")
if not balance_text:
continue
account_num = self.page.locator(account["link_xpath"]).inner_text().strip()
balance_value = self._parse_money(balance_text)
accounts.append({
"date": snapshot_date,
"balance": balance_value,
}
]
"account_name": account["account_name"],
"account_num": account_num,
"org_name": "Emoney",
})
return {
"accounts": accounts
}
def get_transactions(self):
self.page.click("xpath=/html/body/form/div[3]/div[3]/div[2]/div[3]/div[1]/span[2]/a")
transaction_body = self.page.locator("xpath=/html/body/form/div[3]/div[3]/div[2]/div/div[2]/div[3]/table/tbody").inner_text()
return transaction_body
raw_sets: list[dict[str, object]] = []
for account in self.ACCOUNT_CONFIGS:
account_num = self.page.locator(account["link_xpath"]).inner_text().strip()
self.page.click(account["link_xpath"])
self.page.wait_for_selector(self.TRANSACTIONS_TABLE_XPATH)
transaction_body = self.page.locator(self.TRANSACTIONS_TABLE_XPATH).inner_text()
raw_sets.append({
"account": {
**account,
"account_num": account_num,
},
"raw": transaction_body,
})
self.page.go_back()
self.page.wait_for_selector(account["link_xpath"])
return raw_sets
def parse_transactions(self, raw: str = None) -> list[dict[str, any]]:
def parse_transactions(self, raw: str | list[dict[str, object]] | None = None) -> list[dict[str, object]]:
"""
Parse raw transactions text into a list of dicts:
[{'date': date, 'description': str, 'amount': float, 'balance': float}, ...]
@ -50,11 +96,27 @@ class Scraper:
"""
if raw is None:
raw = self.get_transactions() or ""
if isinstance(raw, list):
combined: list[dict[str, object]] = []
for entry in raw:
if not isinstance(entry, dict):
continue
entry_raw = entry.get("raw") or ""
account_info = entry.get("account") if isinstance(entry.get("account"), dict) else None
combined.extend(self._parse_transactions_for_account(entry_raw, account_info))
return combined
return self._parse_transactions_for_account(raw)
def _parse_transactions_for_account(
self,
raw: str,
account_info: dict[str, object] | None = None,
) -> list[dict[str, object]]:
lines = [ln.strip() for ln in raw.splitlines() if ln.strip()]
pattern = re.compile(
r'^(?P<date>\d{2}-\d{2}-\d{4})\s+(?P<desc>.*?)\s+(?P<amount>[-\$\d,\.]+)\s+(?P<balance>[-\$\d,\.]+)\s*$'
)
parsed: list[dict[str, any]] = []
parsed: list[dict[str, object]] = []
for ln in lines:
m = pattern.match(ln)
if not m:
@ -71,17 +133,21 @@ class Scraper:
date_obj = datetime.strptime(m.group('date'), '%d-%m-%Y').date()
except Exception:
continue
parsed.append({
entry: dict[str, object] = {
'date': date_obj,
'description': desc,
'amount': amount,
'balance': balance,
})
}
if account_info:
entry['account_name'] = account_info.get('account_name')
entry['account_num'] = account_info.get('account_num')
entry['org_name'] = "Emoney"
parsed.append(entry)
return parsed
def get_transactions_parsed(self) -> list[dict[str, any]]:
raw = self.get_transactions()
return self.parse_transactions(raw)
def get_transactions_parsed(self) -> list[dict[str, object]]:
return self.parse_transactions()
def close(self):
self.browser.close()

0
FireflySync/__init__.py Normal file
View file

View file

@ -0,0 +1,79 @@
from __future__ import annotations
from typing import Any, Iterable
from urllib.parse import urljoin
import requests
class FireflyClient:
def __init__(self, base_url: str, api_token: str, page_limit: int = 200):
self.base_url = self._normalize_base_url(base_url)
self.page_limit = page_limit
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_token}",
"Accept": "application/json",
"Content-Type": "application/json",
})
def list_accounts(self, types: list[str] | None = None) -> list[dict[str, Any]]:
params: dict[str, Any] | None = None
if types:
params = {"type": ",".join(types)}
return list(self._list_all("accounts", params=params))
def get_account(self, account_id: str) -> dict[str, Any]:
return self._request("GET", f"accounts/{account_id}")
def create_account(self, payload: dict[str, Any]) -> dict[str, Any]:
return self._request("POST", "accounts", json=payload)
def update_account(self, account_id: str, payload: dict[str, Any]) -> dict[str, Any]:
return self._request("PUT", f"accounts/{account_id}", json=payload)
def list_categories(self) -> list[dict[str, Any]]:
return list(self._list_all("categories"))
def create_category(self, payload: dict[str, Any]) -> dict[str, Any]:
return self._request("POST", "categories", json=payload)
def create_transaction(self, payload: dict[str, Any]) -> dict[str, Any]:
return self._request("POST", "transactions", json=payload)
def search_transactions(self, query: str) -> dict[str, Any]:
return self._request("GET", "search/transactions", params={"query": query})
def _list_all(self, path: str, params: dict[str, Any] | None = None) -> Iterable[dict[str, Any]]:
page = 1
while True:
merged = {"page": page, "limit": self.page_limit}
if params:
merged.update(params)
payload = self._request("GET", path, params=merged)
data = payload.get("data") or []
if not data:
break
for item in data:
yield item
if len(data) < self.page_limit:
break
page += 1
def _request(self, method: str, path: str, params: dict[str, Any] | None = None, json: dict[str, Any] | None = None) -> dict[str, Any]:
url = urljoin(f"{self.base_url}/", path)
response = self.session.request(method, url, params=params, json=json, timeout=30)
if response.status_code >= 400:
detail = response.text.strip()
raise RuntimeError(f"Firefly API {method} {url} failed: {response.status_code} {detail}")
if not response.content:
return {}
return response.json()
@staticmethod
def _normalize_base_url(base_url: str) -> str:
base = base_url.rstrip("/")
if base.endswith("/api/v1"):
return base
if base.endswith("/api"):
return f"{base}/v1"
return f"{base}/api/v1"

513
FireflySync/firefly_sync.py Normal file
View file

@ -0,0 +1,513 @@
from __future__ import annotations
import json
import os
from datetime import date
from typing import Any
from config import load_env
from FireflySync.firefly_client import FireflyClient
class FireflySync:
ALLOWED_ACCOUNT_TYPES = {"asset", "liability"}
SKIPPED_ACCOUNT_NAMES = {"emoney"}
LIABILITY_ACCOUNT_NAMES = {
"emoney",
"emoney-loan",
"emoney-forrester",
"emoney-swift",
}
EMONEY_POSITIVE_KEYWORDS = {
"normal payment",
}
EMONEY_NEGATIVE_KEYWORDS = {
"debit interest",
"monthly fee",
"mbi",
"advance",
"insurance",
"late fee",
"establishment fee",
}
EMONEY_SKIP_KEYWORDS = {
"retry normal payment",
}
FORCED_ASSET_ACCOUNT_NAMES = {
"jethro's kiwisaver",
"jethro's investments",
"jethro's rainy day fund",
}
LIABILITY_INTEREST_RATES = {
"emoney-forrester": 9.9,
"emoney-swift": 13.95,
"emoney-loan": 29.9,
}
EXCLUDED_TRANSACTION_ACCOUNTS = {"emoney"}
def __init__(self, dbconnection):
load_env("FireflySync")
self.dbconnection = dbconnection
self.base_url = self._require_env("FIREFLY_BASE_URL")
self.api_token = self._require_env("FIREFLY_API_TOKEN")
self.currency_code = os.getenv("FIREFLY_CURRENCY_CODE", "NZD").strip() or "NZD"
self.default_account_type = os.getenv("FIREFLY_DEFAULT_ACCOUNT_TYPE", "asset").strip() or "asset"
self.default_account_role = os.getenv("FIREFLY_ASSET_ACCOUNT_ROLE", "defaultAsset").strip() or "defaultAsset"
self.default_liability_type = os.getenv("FIREFLY_LIABILITY_TYPE", "loan").strip() or "loan"
self.default_liability_direction = os.getenv("FIREFLY_LIABILITY_DIRECTION", "debit").strip() or "debit"
self.default_liability_interest_period = os.getenv("FIREFLY_LIABILITY_INTEREST_PERIOD", "yearly").strip() or "yearly"
self.default_liability_interest = os.getenv("FIREFLY_LIABILITY_INTEREST", "0").strip() or "0"
self.account_type_overrides = self._parse_json_env("FIREFLY_ACCOUNT_TYPE_OVERRIDES")
self.balance_accounts = self._parse_csv_env("FIREFLY_BALANCE_ACCOUNTS")
self.adjustment_account_name = os.getenv("FIREFLY_ADJUSTMENT_ACCOUNT_NAME", "Balance Adjustment").strip()
self.revenue_account_name = os.getenv("FIREFLY_REVENUE_ACCOUNT_NAME", "Income").strip() or "Income"
self.error_if_duplicate_hash = os.getenv("FIREFLY_ERROR_IF_DUPLICATE_HASH", "true").strip().lower() in {
"1",
"true",
"yes",
}
self.client = FireflyClient(self.base_url, self.api_token)
def sync(self) -> None:
categories = self._load_local_categories()
self._ensure_categories(categories)
firefly_accounts = self._ensure_accounts()
self._sync_transactions(firefly_accounts)
self._sync_balance_adjustments(firefly_accounts)
def _ensure_categories(self, categories: list[dict[str, Any]]) -> dict[str, str]:
existing = self._map_by_name(self.client.list_categories())
for category in categories:
name = category.get("name")
if not name or name in existing:
continue
created = self.client.create_category({"name": name})
created_name = self._extract_attribute(created, "name")
if created_name:
existing[created_name] = self._extract_id(created)
return existing
def _ensure_accounts(self) -> dict[str, str]:
existing = self._map_accounts_by_name(self.client.list_accounts())
for account in self._load_local_accounts():
name = account.get("name")
if not name:
continue
if name.strip().lower() in self.SKIPPED_ACCOUNT_NAMES:
continue
account_type = self._resolve_account_type(account)
existing_entry = existing.get(name)
if existing_entry:
if existing_entry.get("type") == account_type:
continue
if self._should_update_account_type(account, existing_entry):
payload = self._build_account_payload(account, account_type)
try:
self.client.update_account(existing_entry["id"], payload)
existing_entry["type"] = account_type
except RuntimeError:
pass
continue
payload = self._build_account_payload(account, account_type)
created = self.client.create_account(payload)
created_name = self._extract_attribute(created, "name")
if created_name:
existing[created_name] = {
"id": self._extract_id(created),
"type": account_type,
"account_number": account.get("account_num"),
}
return {
name: entry.get("id", "")
for name, entry in existing.items()
if entry.get("type") in self.ALLOWED_ACCOUNT_TYPES
}
def _sync_transactions(self, firefly_accounts: dict[str, str]) -> None:
rows = self._load_unsynced_transactions()
for row in rows:
account_name = row.get("account_name")
if not account_name:
continue
account_id = firefly_accounts.get(account_name)
if not account_id:
continue
amount = float(row.get("amount"))
if amount == 0:
continue
description = row.get("description") or "Transaction"
vendor_name = row.get("vendor_name") or description
txn_date = row.get("date")
if isinstance(txn_date, date):
date_text = txn_date.isoformat()
else:
date_text = str(txn_date)
liability_handled = self._apply_emoney_liability_rules(
account_name,
description,
amount,
account_id,
vendor_name,
)
if liability_handled is None:
continue
amount_value, split = liability_handled
external_id = f"local-transaction:{row['id']}"
split.update({
"date": date_text,
"amount": f"{abs(amount_value):.2f}",
"description": description,
"external_id": external_id,
})
category_name = row.get("category_name")
if category_name:
split["category_name"] = category_name
if vendor_name.strip().lower() == "feijoa":
split["category_name"] = "Savings"
if "source_id" not in split and "destination_id" not in split:
if amount_value < 0:
split["source_id"] = account_id
split["destination_name"] = vendor_name
else:
split["source_name"] = self.revenue_account_name
split["destination_id"] = account_id
if vendor_name and vendor_name != self.revenue_account_name:
split["notes"] = f"Original source: {vendor_name}"
payload = {
"error_if_duplicate_hash": self.error_if_duplicate_hash,
"apply_rules": False,
"fire_webhooks": False,
"transactions": [split],
}
try:
created = self.client.create_transaction(payload)
firefly_id = self._extract_id(created)
except RuntimeError:
firefly_id = self._find_existing_transaction_id(external_id)
if not firefly_id:
continue
self._record_transaction_sync(row["id"], firefly_id, external_id)
def _sync_balance_adjustments(self, firefly_accounts: dict[str, str]) -> None:
if not self.balance_accounts:
return
for account in self._load_balance_accounts():
account_name = account.get("account_name")
account_id = firefly_accounts.get(account_name)
if not account_id:
continue
latest_snapshot = self._latest_snapshot_for_account(account["account_id"])
if not latest_snapshot:
continue
snapshot_date, snapshot_balance = latest_snapshot
if self._balance_adjustment_exists(account["account_id"], snapshot_date):
continue
firefly_balance = self._fetch_firefly_balance(account_id)
if firefly_balance is None:
continue
delta = snapshot_balance - firefly_balance
if abs(delta) < 0.01:
continue
external_id = f"balance-adjust:{account['account_id']}:{snapshot_date.isoformat()}"
split: dict[str, Any] = {
"type": "reconciliation",
"date": snapshot_date.isoformat(),
"amount": f"{abs(delta):.2f}",
"description": f"Balance adjustment ({snapshot_date.isoformat()})",
"external_id": external_id,
}
if delta > 0:
split["source_name"] = self.adjustment_account_name
split["destination_id"] = account_id
else:
split["source_id"] = account_id
split["destination_name"] = self.adjustment_account_name
payload = {
"error_if_duplicate_hash": self.error_if_duplicate_hash,
"apply_rules": False,
"fire_webhooks": False,
"transactions": [split],
}
try:
created = self.client.create_transaction(payload)
firefly_id = self._extract_id(created)
except RuntimeError:
firefly_id = self._find_existing_transaction_id(external_id)
if not firefly_id:
continue
self._record_balance_adjustment(account["account_id"], snapshot_date, firefly_id, external_id)
def _load_local_accounts(self) -> list[dict[str, Any]]:
sql = "SELECT id, accountname, accountnum FROM accounts ORDER BY accountname"
with self.dbconnection.cursor() as cursor:
cursor.execute(sql)
rows = cursor.fetchall()
return [
{
"id": row[0],
"name": row[1],
"account_num": row[2],
}
for row in rows
]
def _load_local_categories(self) -> list[dict[str, Any]]:
sql = "SELECT id, categoryname FROM categories ORDER BY categoryname"
with self.dbconnection.cursor() as cursor:
cursor.execute(sql)
rows = cursor.fetchall()
return [{"id": row[0], "name": row[1]} for row in rows]
def _load_unsynced_transactions(self) -> list[dict[str, Any]]:
excluded = [name.lower() for name in self.EXCLUDED_TRANSACTION_ACCOUNTS]
sql = """
SELECT t.id,
t.datetime,
t.description,
t.amount,
a.accountname,
v.vendorname,
c.categoryname
FROM transactions t
JOIN accounts a ON t.accountid = a.id
LEFT JOIN vendors v ON t.vendorid = v.id
LEFT JOIN categories c ON t.categoryid = c.id
LEFT JOIN firefly_transaction_sync fts ON fts.transactionid = t.id
WHERE fts.transactionid IS NULL
AND NOT (lower(a.accountname) = ANY(%s))
ORDER BY t.datetime ASC
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (excluded,))
rows = cursor.fetchall()
return [
{
"id": row[0],
"date": row[1],
"description": row[2],
"amount": row[3],
"account_name": row[4],
"vendor_name": row[5],
"category_name": row[6],
}
for row in rows
]
def _load_balance_accounts(self) -> list[dict[str, Any]]:
if not self.balance_accounts:
return []
names = tuple(self.balance_accounts)
sql = """
SELECT id, accountname
FROM accounts
WHERE accountname = ANY(%s)
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (list(names),))
rows = cursor.fetchall()
return [
{
"account_id": row[0],
"account_name": row[1],
}
for row in rows
]
def _latest_snapshot_for_account(self, account_id: int) -> tuple[date, float] | None:
sql = """
SELECT datetime, balance
FROM snapshots
WHERE accountid = %s
ORDER BY datetime DESC
LIMIT 1
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (account_id,))
row = cursor.fetchone()
if not row:
return None
return row[0], float(row[1])
def _balance_adjustment_exists(self, account_id: int, snapshot_date: date) -> bool:
sql = """
SELECT 1
FROM firefly_balance_adjustments
WHERE accountid = %s AND snapshot_date = %s
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (account_id, snapshot_date))
return cursor.fetchone() is not None
def _record_transaction_sync(self, transaction_id: int, firefly_id: str | None, external_id: str) -> None:
sql = """
INSERT INTO firefly_transaction_sync (transactionid, firefly_transaction_id, external_id)
VALUES (%s, %s, %s)
ON CONFLICT (transactionid) DO NOTHING
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (transaction_id, firefly_id, external_id))
self.dbconnection.commit()
def _record_balance_adjustment(self, account_id: int, snapshot_date: date, firefly_id: str | None, external_id: str) -> None:
sql = """
INSERT INTO firefly_balance_adjustments (accountid, snapshot_date, firefly_transaction_id, external_id)
VALUES (%s, %s, %s, %s)
ON CONFLICT (accountid, snapshot_date) DO NOTHING
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (account_id, snapshot_date, firefly_id, external_id))
self.dbconnection.commit()
def _fetch_firefly_balance(self, account_id: str) -> float | None:
payload = self.client.get_account(account_id)
attributes = (payload.get("data") or {}).get("attributes") or {}
balance_text = attributes.get("current_balance")
if balance_text is None:
return None
try:
return float(balance_text)
except ValueError:
return None
def _find_existing_transaction_id(self, external_id: str) -> str | None:
query = f'external_id:"{external_id}"'
try:
payload = self.client.search_transactions(query)
except RuntimeError:
return None
data = payload.get("data") or []
if not data:
return None
return self._extract_id({"data": data[0]})
def _resolve_account_type(self, account: dict[str, Any]) -> str:
overrides = self.account_type_overrides
account_name = account.get("name")
account_num = account.get("account_num")
override = None
if account_name and account_name.strip().lower() in self.LIABILITY_ACCOUNT_NAMES:
return "liability"
if account_name and account_name in overrides:
override = overrides.get(account_name)
if account_num and account_num in overrides:
override = overrides.get(account_num)
selected = (override or self.default_account_type).strip().lower()
if selected not in self.ALLOWED_ACCOUNT_TYPES:
return "asset"
return selected
def _build_account_payload(self, account: dict[str, Any], account_type: str) -> dict[str, Any]:
payload = {
"name": account.get("name"),
"type": account_type,
"account_number": account.get("account_num"),
"currency_code": self.currency_code,
"include_net_worth": True,
}
if account_type == "liability":
account_name = str(account.get("name") or "").strip().lower()
interest_rate = self.LIABILITY_INTEREST_RATES.get(account_name)
payload["liability_type"] = self.default_liability_type
payload["liability_direction"] = self.default_liability_direction
payload["interest_period"] = self.default_liability_interest_period
if interest_rate is None:
payload["interest"] = self.default_liability_interest
else:
payload["interest"] = f"{interest_rate}"
if account_type == "asset":
payload["account_role"] = self.default_account_role
return payload
def _should_update_account_type(self, account: dict[str, Any], existing: dict[str, Any]) -> bool:
existing_type = existing.get("type")
if existing_type in self.ALLOWED_ACCOUNT_TYPES:
return False
existing_number = existing.get("account_number")
account_number = account.get("account_num")
if existing_number and account_number and str(existing_number) != str(account_number):
account_name = str(account.get("name") or "").strip().lower()
if account_name in self.LIABILITY_ACCOUNT_NAMES:
return True
if account_name in self.FORCED_ASSET_ACCOUNT_NAMES:
return True
return False
return True
@staticmethod
def _map_accounts_by_name(items: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
mapping: dict[str, dict[str, Any]] = {}
for item in items:
payload = {"data": item}
name = FireflySync._extract_attribute(payload, "name")
if not name:
continue
mapping[name] = {
"id": FireflySync._extract_id(payload),
"type": FireflySync._extract_attribute(payload, "type"),
"account_number": FireflySync._extract_attribute(payload, "account_number"),
}
return mapping
@staticmethod
def _map_by_name(items: list[dict[str, Any]]) -> dict[str, str]:
mapping: dict[str, str] = {}
for item in items:
name = FireflySync._extract_attribute({"data": item}, "name")
if not name:
continue
mapping[name] = FireflySync._extract_id({"data": item})
return mapping
@staticmethod
def _extract_id(payload: dict[str, Any]) -> str:
data = payload.get("data") or {}
value = data.get("id")
if value is None:
return ""
return str(value)
@staticmethod
def _extract_attribute(payload: dict[str, Any], key: str) -> str | None:
data = payload.get("data") or {}
attributes = data.get("attributes") or {}
value = attributes.get(key)
if value is None:
return None
return str(value)
@staticmethod
def _parse_json_env(name: str) -> dict[str, str]:
raw = os.getenv(name)
if not raw:
return {}
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
return {}
if not isinstance(parsed, dict):
return {}
return {str(k): str(v) for k, v in parsed.items()}
@staticmethod
def _parse_csv_env(name: str) -> list[str]:
raw = os.getenv(name, "")
if not raw:
return []
return [item.strip() for item in raw.split(",") if item.strip()]
@staticmethod
def _require_env(name: str) -> str:
value = os.getenv(name)
if not value:
raise ValueError(f"Please set {name} in your environment.")
return value

View file

@ -5,6 +5,7 @@ from config import load_env
import os
import json
import psycopg
from psycopg.errors import UniqueViolation
from pathlib import Path
from datetime import datetime, date
from playwright.sync_api import sync_playwright, Playwright
@ -149,10 +150,13 @@ class Ingester:
accounts = self._sort_emoney_snapshots_oldest_first(data.get("accounts", []))
with self.dbconnection.cursor() as cursor:
for account in accounts:
account_num = self._string_or_none(account.get("account_num")) or "emoney"
account_name = self._string_or_none(account.get("account_name")) or "Emoney"
org_name = self._string_or_none(account.get("org_name")) or "Emoney"
self._record_sync(
account_num="emoney",
account_name="Emoney",
org_name="Emoney",
account_num=account_num,
account_name=account_name,
org_name=org_name,
)
cursor.execute(
"""
@ -279,6 +283,15 @@ class Ingester:
if row:
return row[0]
cursor.execute(
"SELECT id FROM accounts WHERE accountname = %s",
(account_name,),
)
row = cursor.fetchone()
if row:
return row[0]
try:
cursor.execute(
"""
INSERT INTO accounts (accountnum, accountname, orgid)
@ -288,6 +301,15 @@ class Ingester:
(account_num, account_name, org_id),
)
return cursor.fetchone()[0]
except UniqueViolation:
cursor.execute(
"SELECT id FROM accounts WHERE accountname = %s",
(account_name,),
)
row = cursor.fetchone()
if row:
return row[0]
raise
@staticmethod
def _string_or_none(value: object) -> str | None:

View file

@ -94,13 +94,22 @@ class DataNormalizer:
if not parsed_date:
return None
account_name = self._string_or_none(data.get("account_name")) or "Emoney"
account_num = self._string_or_none(data.get("account_num")) or "emoney"
org_name = self._string_or_none(data.get("org_name")) or "Emoney"
description = self._string_or_none(data.get("description")) or "unknown"
adjusted_amount = self._adjust_emoney_amount(description, data.get("amount"))
if adjusted_amount is None:
return None
return {
"datetime": parsed_date,
"description": data.get("description"),
"amount": float(data.get("amount")),
"account_name": "Emoney",
"account_num": "emoney",
"org_name": "Emoney",
"description": description,
"amount": float(adjusted_amount),
"account_name": account_name,
"account_num": account_num,
"org_name": org_name,
"vendor_name": "Finance Now",
}
@ -109,12 +118,15 @@ class DataNormalizer:
return None
parsed_date = self._parse_date(data.get("date")) or date.today()
account_name = self._string_or_none(data.get("account_name")) or "Emoney"
account_num = self._string_or_none(data.get("account_num")) or "emoney"
org_name = self._string_or_none(data.get("org_name")) or "Emoney"
return {
"datetime": parsed_date,
"balance": float(data.get("balance")),
"account_name": "Emoney",
"account_num": "emoney",
"org_name": "Emoney",
"account_name": account_name,
"account_num": account_num,
"org_name": org_name,
}
def _normalize_akahu_transaction(self, data: dict[str, Any]) -> dict[str, Any] | None:
@ -370,6 +382,36 @@ class DataNormalizer:
return None
return str(value)
@staticmethod
def _adjust_emoney_amount(description: str, amount: Any) -> float | None:
try:
value = float(amount)
except (TypeError, ValueError):
return None
desc = description.strip().lower()
if "retry normal payment" in desc:
return None
positive_keywords = {
"normal payment",
}
negative_keywords = {
"debit interest",
"monthly fee",
"mbi",
"advance",
"insurance",
"late fee",
"establishment fee",
}
if any(keyword in desc for keyword in negative_keywords):
return -abs(value)
if any(keyword in desc for keyword in positive_keywords):
return abs(value)
return value
@staticmethod
def _parse_balance_value(value: Any) -> float | None:
if value is None:

View file

@ -16,6 +16,7 @@ These env vars let you split Akahu vs Emoney runs (useful for separate k8s cron
- RUN_AKAHU (default true)
- RUN_EMONEY (default true)
- RUN_NORMALIZE (default true)
- RUN_FIREFLY (default false)
Example:
```
@ -37,6 +38,8 @@ Required:
- SCRAPER_URL
- SCRAPER_USERNAME
- SCRAPER_PASSWORD
- FIREFLY_BASE_URL
- FIREFLY_API_TOKEN
Optional:
- EMONEY_USE_CACHE (true/false, default false)
@ -44,6 +47,19 @@ Optional:
- RUN_AKAHU (default true)
- RUN_EMONEY (default true)
- RUN_NORMALIZE (default true)
- RUN_FIREFLY (default false)
- FIREFLY_CURRENCY_CODE (default NZD)
- FIREFLY_DEFAULT_ACCOUNT_TYPE (default asset)
- FIREFLY_LIABILITY_TYPE (default loan)
- FIREFLY_LIABILITY_DIRECTION (default debit)
- FIREFLY_LIABILITY_INTEREST_PERIOD (default yearly)
- FIREFLY_LIABILITY_INTEREST (default 0)
- FIREFLY_ASSET_ACCOUNT_ROLE (default defaultAsset)
- FIREFLY_ACCOUNT_TYPE_OVERRIDES (JSON mapping of account name/number to Firefly account type)
- FIREFLY_BALANCE_ACCOUNTS (comma-separated account names to adjust by balance)
- FIREFLY_ADJUSTMENT_ACCOUNT_NAME (default Balance Adjustment)
- FIREFLY_REVENUE_ACCOUNT_NAME (default Income)
- FIREFLY_ERROR_IF_DUPLICATE_HASH (default true)
## Build and test the Docker image
Build:

File diff suppressed because one or more lines are too long

View file

@ -1,4 +1,5 @@
from IngestionService.ingester import Ingester
from FireflySync.firefly_sync import FireflySync
from config import load_env
import os
@ -11,6 +12,7 @@ if __name__ == "__main__":
run_akahu = os.getenv("RUN_AKAHU", "true").strip().lower() in {"1", "true", "yes"}
run_emoney = os.getenv("RUN_EMONEY", "true").strip().lower() in {"1", "true", "yes"}
run_normalize = os.getenv("RUN_NORMALIZE", "true").strip().lower() in {"1", "true", "yes"}
run_firefly = os.getenv("RUN_FIREFLY", "false").strip().lower() in {"1", "true", "yes"}
if run_akahu:
akahu_accounts = ingester.fetch_akahu_snapshot_data()
@ -33,3 +35,7 @@ if __name__ == "__main__":
if run_normalize:
ingester.normalize_pending_data()
if run_firefly:
firefly = FireflySync(ingester.dbconnection)
firefly.sync()

View file

@ -32,6 +32,11 @@ vendorname VARCHAR NOT NULL,
orgid INT REFERENCES organizations(id) ON DELETE RESTRICT
);
CREATE TABLE IF NOT EXISTS categories(
id SERIAL PRIMARY KEY,
categoryname VARCHAR(100) UNIQUE NOT NULL
);
-- how can I normalize this further? going to end up with lots of nulls in the transactions table, but not sure if it's worth it to have separate tables for each transaction type (e.g. fund transactions, card transactions, etc.)
CREATE TABLE IF NOT EXISTS transactions(
id SERIAL PRIMARY KEY,
@ -40,9 +45,13 @@ description VARCHAR NOT NULL,
amount REAL NOT NULL,
accountid INT REFERENCES accounts(id) ON DELETE RESTRICT,
orgid INT REFERENCES organizations(id) ON DELETE RESTRICT,
vendorid INT REFERENCES vendors(id) ON DELETE RESTRICT
vendorid INT REFERENCES vendors(id) ON DELETE RESTRICT,
categoryid INT REFERENCES categories(id) ON DELETE RESTRICT
);
ALTER TABLE transactions
ADD COLUMN IF NOT EXISTS categoryid INT REFERENCES categories(id) ON DELETE RESTRICT;
CREATE TABLE IF NOT EXISTS snapshots(
id SERIAL PRIMARY KEY,
datetime DATE NOT NULL,
@ -80,6 +89,24 @@ raw_sha256 CHAR(64) UNIQUE,
processed BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE TABLE IF NOT EXISTS firefly_transaction_sync(
id SERIAL PRIMARY KEY,
transactionid INT UNIQUE REFERENCES transactions(id) ON DELETE CASCADE,
firefly_transaction_id VARCHAR(100),
external_id VARCHAR(120) UNIQUE,
synced_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS firefly_balance_adjustments(
id SERIAL PRIMARY KEY,
accountid INT REFERENCES accounts(id) ON DELETE CASCADE,
snapshot_date DATE NOT NULL,
firefly_transaction_id VARCHAR(100),
external_id VARCHAR(120) UNIQUE,
synced_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (accountid, snapshot_date)
);
CREATE OR REPLACE FUNCTION set_raw_sha256()
RETURNS TRIGGER AS $$
BEGIN
@ -143,6 +170,7 @@ CREATE INDEX IF NOT EXISTS vendors_orgid_idx ON vendors (orgid);
CREATE INDEX IF NOT EXISTS transactions_accountid_idx ON transactions (accountid);
CREATE INDEX IF NOT EXISTS transactions_orgid_idx ON transactions (orgid);
CREATE INDEX IF NOT EXISTS transactions_vendorid_idx ON transactions (vendorid);
CREATE INDEX IF NOT EXISTS transactions_categoryid_idx ON transactions (categoryid);
CREATE INDEX IF NOT EXISTS transactions_account_datetime_idx ON transactions (accountid, datetime);
CREATE INDEX IF NOT EXISTS transactions_org_datetime_idx ON transactions (orgid, datetime);
@ -209,6 +237,13 @@ ON CONFLICT DO NOTHING;
INSERT INTO vendors (vendorname, orgid)
SELECT 'PAK''nSAVE', (SELECT id FROM organizations WHERE orgname = 'BNZ')
WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'PAK''nSAVE' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
INSERT INTO categories (categoryname)
VALUES
('Groceries'),
('Debt'),
('Fuel/Transport'),
('Entertainment')
ON CONFLICT (categoryname) DO NOTHING;
INSERT INTO vendors (vendorname, orgid)
SELECT 'PAK''nSAVE Fuel', (SELECT id FROM organizations WHERE orgname = 'BNZ')
WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'PAK''nSAVE Fuel' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));