from __future__ import annotations import json import os from datetime import date from typing import Any from config import load_env from FireflySync.firefly_client import FireflyClient class FireflySync: ALLOWED_ACCOUNT_TYPES = {"asset", "liability"} SKIPPED_ACCOUNT_NAMES = {"emoney"} LIABILITY_ACCOUNT_NAMES = { "emoney", "emoney-loan", "emoney-forrester", "emoney-swift", } EMONEY_POSITIVE_KEYWORDS = { "normal payment", } EMONEY_NEGATIVE_KEYWORDS = { "debit interest", "monthly fee", "mbi", "advance", "insurance", "late fee", "establishment fee", } EMONEY_SKIP_KEYWORDS = { "retry normal payment", } FORCED_ASSET_ACCOUNT_NAMES = { "jethro's kiwisaver", "jethro's investments", "jethro's rainy day fund", } LIABILITY_INTEREST_RATES = { "emoney-forrester": 9.9, "emoney-swift": 13.95, "emoney-loan": 29.9, } EXCLUDED_TRANSACTION_ACCOUNTS = {"emoney"} def __init__(self, dbconnection): load_env("FireflySync") self.dbconnection = dbconnection self.base_url = self._require_env("FIREFLY_BASE_URL") self.api_token = self._require_env("FIREFLY_API_TOKEN") self.currency_code = os.getenv("FIREFLY_CURRENCY_CODE", "NZD").strip() or "NZD" self.default_account_type = os.getenv("FIREFLY_DEFAULT_ACCOUNT_TYPE", "asset").strip() or "asset" self.default_account_role = os.getenv("FIREFLY_ASSET_ACCOUNT_ROLE", "defaultAsset").strip() or "defaultAsset" self.default_liability_type = os.getenv("FIREFLY_LIABILITY_TYPE", "loan").strip() or "loan" self.default_liability_direction = os.getenv("FIREFLY_LIABILITY_DIRECTION", "debit").strip() or "debit" self.default_liability_interest_period = os.getenv("FIREFLY_LIABILITY_INTEREST_PERIOD", "yearly").strip() or "yearly" self.default_liability_interest = os.getenv("FIREFLY_LIABILITY_INTEREST", "0").strip() or "0" self.account_type_overrides = self._parse_json_env("FIREFLY_ACCOUNT_TYPE_OVERRIDES") self.balance_accounts = self._parse_csv_env("FIREFLY_BALANCE_ACCOUNTS") self.adjustment_account_name = os.getenv("FIREFLY_ADJUSTMENT_ACCOUNT_NAME", "Balance Adjustment").strip() self.revenue_account_name = os.getenv("FIREFLY_REVENUE_ACCOUNT_NAME", "Income").strip() or "Income" self.error_if_duplicate_hash = os.getenv("FIREFLY_ERROR_IF_DUPLICATE_HASH", "true").strip().lower() in { "1", "true", "yes", } self.client = FireflyClient(self.base_url, self.api_token) def sync(self) -> None: categories = self._load_local_categories() self._ensure_categories(categories) firefly_accounts = self._ensure_accounts() self._sync_transactions(firefly_accounts) self._sync_balance_adjustments(firefly_accounts) def _ensure_categories(self, categories: list[dict[str, Any]]) -> dict[str, str]: existing = self._map_by_name(self.client.list_categories()) for category in categories: name = category.get("name") if not name or name in existing: continue created = self.client.create_category({"name": name}) created_name = self._extract_attribute(created, "name") if created_name: existing[created_name] = self._extract_id(created) return existing def _ensure_accounts(self) -> dict[str, str]: existing = self._map_accounts_by_name(self.client.list_accounts()) for account in self._load_local_accounts(): name = account.get("name") if not name: continue if name.strip().lower() in self.SKIPPED_ACCOUNT_NAMES: continue account_type = self._resolve_account_type(account) existing_entry = existing.get(name) if existing_entry: if existing_entry.get("type") == account_type: continue if self._should_update_account_type(account, existing_entry): payload = self._build_account_payload(account, account_type) try: self.client.update_account(existing_entry["id"], payload) existing_entry["type"] = account_type except RuntimeError: pass continue payload = self._build_account_payload(account, account_type) created = self.client.create_account(payload) created_name = self._extract_attribute(created, "name") if created_name: existing[created_name] = { "id": self._extract_id(created), "type": account_type, "account_number": account.get("account_num"), } return { name: entry.get("id", "") for name, entry in existing.items() if entry.get("type") in self.ALLOWED_ACCOUNT_TYPES } def _sync_transactions(self, firefly_accounts: dict[str, str]) -> None: rows = self._load_unsynced_transactions() for row in rows: account_name = row.get("account_name") if not account_name: continue account_id = firefly_accounts.get(account_name) if not account_id: continue amount = float(row.get("amount")) if amount == 0: continue description = row.get("description") or "Transaction" vendor_name = row.get("vendor_name") or description txn_date = row.get("date") if isinstance(txn_date, date): date_text = txn_date.isoformat() else: date_text = str(txn_date) liability_handled = self._apply_emoney_liability_rules( account_name, description, amount, account_id, vendor_name, ) if liability_handled is None: continue amount_value, split = liability_handled external_id = f"local-transaction:{row['id']}" split.update({ "date": date_text, "amount": f"{abs(amount_value):.2f}", "description": description, "external_id": external_id, }) category_name = row.get("category_name") if category_name: split["category_name"] = category_name if vendor_name.strip().lower() == "feijoa": split["category_name"] = "Savings" if "source_id" not in split and "destination_id" not in split: if amount_value < 0: split["source_id"] = account_id split["destination_name"] = vendor_name else: split["source_name"] = self.revenue_account_name split["destination_id"] = account_id if vendor_name and vendor_name != self.revenue_account_name: split["notes"] = f"Original source: {vendor_name}" payload = { "error_if_duplicate_hash": self.error_if_duplicate_hash, "apply_rules": False, "fire_webhooks": False, "transactions": [split], } try: created = self.client.create_transaction(payload) firefly_id = self._extract_id(created) except RuntimeError: firefly_id = self._find_existing_transaction_id(external_id) if not firefly_id: continue self._record_transaction_sync(row["id"], firefly_id, external_id) def _sync_balance_adjustments(self, firefly_accounts: dict[str, str]) -> None: if not self.balance_accounts: return for account in self._load_balance_accounts(): account_name = account.get("account_name") account_id = firefly_accounts.get(account_name) if not account_id: continue latest_snapshot = self._latest_snapshot_for_account(account["account_id"]) if not latest_snapshot: continue snapshot_date, snapshot_balance = latest_snapshot if self._balance_adjustment_exists(account["account_id"], snapshot_date): continue firefly_balance = self._fetch_firefly_balance(account_id) if firefly_balance is None: continue delta = snapshot_balance - firefly_balance if abs(delta) < 0.01: continue external_id = f"balance-adjust:{account['account_id']}:{snapshot_date.isoformat()}" split: dict[str, Any] = { "type": "reconciliation", "date": snapshot_date.isoformat(), "amount": f"{abs(delta):.2f}", "description": f"Balance adjustment ({snapshot_date.isoformat()})", "external_id": external_id, } if delta > 0: split["source_name"] = self.adjustment_account_name split["destination_id"] = account_id else: split["source_id"] = account_id split["destination_name"] = self.adjustment_account_name payload = { "error_if_duplicate_hash": self.error_if_duplicate_hash, "apply_rules": False, "fire_webhooks": False, "transactions": [split], } try: created = self.client.create_transaction(payload) firefly_id = self._extract_id(created) except RuntimeError: firefly_id = self._find_existing_transaction_id(external_id) if not firefly_id: continue self._record_balance_adjustment(account["account_id"], snapshot_date, firefly_id, external_id) def _load_local_accounts(self) -> list[dict[str, Any]]: sql = "SELECT id, accountname, accountnum FROM accounts ORDER BY accountname" with self.dbconnection.cursor() as cursor: cursor.execute(sql) rows = cursor.fetchall() return [ { "id": row[0], "name": row[1], "account_num": row[2], } for row in rows ] def _load_local_categories(self) -> list[dict[str, Any]]: sql = "SELECT id, categoryname FROM categories ORDER BY categoryname" with self.dbconnection.cursor() as cursor: cursor.execute(sql) rows = cursor.fetchall() return [{"id": row[0], "name": row[1]} for row in rows] def _load_unsynced_transactions(self) -> list[dict[str, Any]]: excluded = [name.lower() for name in self.EXCLUDED_TRANSACTION_ACCOUNTS] sql = """ SELECT t.id, t.datetime, t.description, t.amount, a.accountname, v.vendorname, c.categoryname FROM transactions t JOIN accounts a ON t.accountid = a.id LEFT JOIN vendors v ON t.vendorid = v.id LEFT JOIN categories c ON t.categoryid = c.id LEFT JOIN firefly_transaction_sync fts ON fts.transactionid = t.id WHERE fts.transactionid IS NULL AND NOT (lower(a.accountname) = ANY(%s)) ORDER BY t.datetime ASC """ with self.dbconnection.cursor() as cursor: cursor.execute(sql, (excluded,)) rows = cursor.fetchall() return [ { "id": row[0], "date": row[1], "description": row[2], "amount": row[3], "account_name": row[4], "vendor_name": row[5], "category_name": row[6], } for row in rows ] def _load_balance_accounts(self) -> list[dict[str, Any]]: if not self.balance_accounts: return [] names = tuple(self.balance_accounts) sql = """ SELECT id, accountname FROM accounts WHERE accountname = ANY(%s) """ with self.dbconnection.cursor() as cursor: cursor.execute(sql, (list(names),)) rows = cursor.fetchall() return [ { "account_id": row[0], "account_name": row[1], } for row in rows ] def _latest_snapshot_for_account(self, account_id: int) -> tuple[date, float] | None: sql = """ SELECT datetime, balance FROM snapshots WHERE accountid = %s ORDER BY datetime DESC LIMIT 1 """ with self.dbconnection.cursor() as cursor: cursor.execute(sql, (account_id,)) row = cursor.fetchone() if not row: return None return row[0], float(row[1]) def _balance_adjustment_exists(self, account_id: int, snapshot_date: date) -> bool: sql = """ SELECT 1 FROM firefly_balance_adjustments WHERE accountid = %s AND snapshot_date = %s """ with self.dbconnection.cursor() as cursor: cursor.execute(sql, (account_id, snapshot_date)) return cursor.fetchone() is not None def _record_transaction_sync(self, transaction_id: int, firefly_id: str | None, external_id: str) -> None: sql = """ INSERT INTO firefly_transaction_sync (transactionid, firefly_transaction_id, external_id) VALUES (%s, %s, %s) ON CONFLICT (transactionid) DO NOTHING """ with self.dbconnection.cursor() as cursor: cursor.execute(sql, (transaction_id, firefly_id, external_id)) self.dbconnection.commit() def _record_balance_adjustment(self, account_id: int, snapshot_date: date, firefly_id: str | None, external_id: str) -> None: sql = """ INSERT INTO firefly_balance_adjustments (accountid, snapshot_date, firefly_transaction_id, external_id) VALUES (%s, %s, %s, %s) ON CONFLICT (accountid, snapshot_date) DO NOTHING """ with self.dbconnection.cursor() as cursor: cursor.execute(sql, (account_id, snapshot_date, firefly_id, external_id)) self.dbconnection.commit() def _fetch_firefly_balance(self, account_id: str) -> float | None: payload = self.client.get_account(account_id) attributes = (payload.get("data") or {}).get("attributes") or {} balance_text = attributes.get("current_balance") if balance_text is None: return None try: return float(balance_text) except ValueError: return None def _find_existing_transaction_id(self, external_id: str) -> str | None: query = f'external_id:"{external_id}"' try: payload = self.client.search_transactions(query) except RuntimeError: return None data = payload.get("data") or [] if not data: return None return self._extract_id({"data": data[0]}) def _resolve_account_type(self, account: dict[str, Any]) -> str: overrides = self.account_type_overrides account_name = account.get("name") account_num = account.get("account_num") override = None if account_name and account_name.strip().lower() in self.LIABILITY_ACCOUNT_NAMES: return "liability" if account_name and account_name in overrides: override = overrides.get(account_name) if account_num and account_num in overrides: override = overrides.get(account_num) selected = (override or self.default_account_type).strip().lower() if selected not in self.ALLOWED_ACCOUNT_TYPES: return "asset" return selected def _build_account_payload(self, account: dict[str, Any], account_type: str) -> dict[str, Any]: payload = { "name": account.get("name"), "type": account_type, "account_number": account.get("account_num"), "currency_code": self.currency_code, "include_net_worth": True, } if account_type == "liability": account_name = str(account.get("name") or "").strip().lower() interest_rate = self.LIABILITY_INTEREST_RATES.get(account_name) payload["liability_type"] = self.default_liability_type payload["liability_direction"] = self.default_liability_direction payload["interest_period"] = self.default_liability_interest_period if interest_rate is None: payload["interest"] = self.default_liability_interest else: payload["interest"] = f"{interest_rate}" if account_type == "asset": payload["account_role"] = self.default_account_role return payload def _should_update_account_type(self, account: dict[str, Any], existing: dict[str, Any]) -> bool: existing_type = existing.get("type") if existing_type in self.ALLOWED_ACCOUNT_TYPES: return False existing_number = existing.get("account_number") account_number = account.get("account_num") if existing_number and account_number and str(existing_number) != str(account_number): account_name = str(account.get("name") or "").strip().lower() if account_name in self.LIABILITY_ACCOUNT_NAMES: return True if account_name in self.FORCED_ASSET_ACCOUNT_NAMES: return True return False return True @staticmethod def _map_accounts_by_name(items: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: mapping: dict[str, dict[str, Any]] = {} for item in items: payload = {"data": item} name = FireflySync._extract_attribute(payload, "name") if not name: continue mapping[name] = { "id": FireflySync._extract_id(payload), "type": FireflySync._extract_attribute(payload, "type"), "account_number": FireflySync._extract_attribute(payload, "account_number"), } return mapping @staticmethod def _map_by_name(items: list[dict[str, Any]]) -> dict[str, str]: mapping: dict[str, str] = {} for item in items: name = FireflySync._extract_attribute({"data": item}, "name") if not name: continue mapping[name] = FireflySync._extract_id({"data": item}) return mapping @staticmethod def _extract_id(payload: dict[str, Any]) -> str: data = payload.get("data") or {} value = data.get("id") if value is None: return "" return str(value) @staticmethod def _extract_attribute(payload: dict[str, Any], key: str) -> str | None: data = payload.get("data") or {} attributes = data.get("attributes") or {} value = attributes.get(key) if value is None: return None return str(value) @staticmethod def _parse_json_env(name: str) -> dict[str, str]: raw = os.getenv(name) if not raw: return {} try: parsed = json.loads(raw) except json.JSONDecodeError: return {} if not isinstance(parsed, dict): return {} return {str(k): str(v) for k, v in parsed.items()} @staticmethod def _parse_csv_env(name: str) -> list[str]: raw = os.getenv(name, "") if not raw: return [] return [item.strip() for item in raw.split(",") if item.strip()] @staticmethod def _require_env(name: str) -> str: value = os.getenv(name) if not value: raise ValueError(f"Please set {name} in your environment.") return value