AkahuSync/FireflySync/firefly_sync.py
2026-06-01 11:18:27 +12:00

514 lines
20 KiB
Python

from __future__ import annotations
import json
import os
from datetime import date
from typing import Any
from config import load_env
from FireflySync.firefly_client import FireflyClient
class FireflySync:
ALLOWED_ACCOUNT_TYPES = {"asset", "liability"}
SKIPPED_ACCOUNT_NAMES = {"emoney"}
LIABILITY_ACCOUNT_NAMES = {
"emoney",
"emoney-loan",
"emoney-forrester",
"emoney-swift",
}
EMONEY_POSITIVE_KEYWORDS = {
"normal payment",
}
EMONEY_NEGATIVE_KEYWORDS = {
"debit interest",
"monthly fee",
"mbi",
"advance",
"insurance",
"late fee",
"establishment fee",
}
EMONEY_SKIP_KEYWORDS = {
"retry normal payment",
}
FORCED_ASSET_ACCOUNT_NAMES = {
"jethro's kiwisaver",
"jethro's investments",
"jethro's rainy day fund",
}
LIABILITY_INTEREST_RATES = {
"emoney-forrester": 9.9,
"emoney-swift": 13.95,
"emoney-loan": 29.9,
}
EXCLUDED_TRANSACTION_ACCOUNTS = {"emoney"}
def __init__(self, dbconnection):
load_env("FireflySync")
self.dbconnection = dbconnection
self.base_url = self._require_env("FIREFLY_BASE_URL")
self.api_token = self._require_env("FIREFLY_API_TOKEN")
self.currency_code = os.getenv("FIREFLY_CURRENCY_CODE", "NZD").strip() or "NZD"
self.default_account_type = os.getenv("FIREFLY_DEFAULT_ACCOUNT_TYPE", "asset").strip() or "asset"
self.default_account_role = os.getenv("FIREFLY_ASSET_ACCOUNT_ROLE", "defaultAsset").strip() or "defaultAsset"
self.default_liability_type = os.getenv("FIREFLY_LIABILITY_TYPE", "loan").strip() or "loan"
self.default_liability_direction = os.getenv("FIREFLY_LIABILITY_DIRECTION", "debit").strip() or "debit"
self.default_liability_interest_period = os.getenv("FIREFLY_LIABILITY_INTEREST_PERIOD", "yearly").strip() or "yearly"
self.default_liability_interest = os.getenv("FIREFLY_LIABILITY_INTEREST", "0").strip() or "0"
self.account_type_overrides = self._parse_json_env("FIREFLY_ACCOUNT_TYPE_OVERRIDES")
self.balance_accounts = self._parse_csv_env("FIREFLY_BALANCE_ACCOUNTS")
self.adjustment_account_name = os.getenv("FIREFLY_ADJUSTMENT_ACCOUNT_NAME", "Balance Adjustment").strip()
self.revenue_account_name = os.getenv("FIREFLY_REVENUE_ACCOUNT_NAME", "Income").strip() or "Income"
self.error_if_duplicate_hash = os.getenv("FIREFLY_ERROR_IF_DUPLICATE_HASH", "true").strip().lower() in {
"1",
"true",
"yes",
}
self.client = FireflyClient(self.base_url, self.api_token)
def sync(self) -> None:
categories = self._load_local_categories()
self._ensure_categories(categories)
firefly_accounts = self._ensure_accounts()
self._sync_transactions(firefly_accounts)
self._sync_balance_adjustments(firefly_accounts)
def _ensure_categories(self, categories: list[dict[str, Any]]) -> dict[str, str]:
existing = self._map_by_name(self.client.list_categories())
for category in categories:
name = category.get("name")
if not name or name in existing:
continue
created = self.client.create_category({"name": name})
created_name = self._extract_attribute(created, "name")
if created_name:
existing[created_name] = self._extract_id(created)
return existing
def _ensure_accounts(self) -> dict[str, str]:
existing = self._map_accounts_by_name(self.client.list_accounts())
for account in self._load_local_accounts():
name = account.get("name")
if not name:
continue
if name.strip().lower() in self.SKIPPED_ACCOUNT_NAMES:
continue
account_type = self._resolve_account_type(account)
existing_entry = existing.get(name)
if existing_entry:
if existing_entry.get("type") == account_type:
continue
if self._should_update_account_type(account, existing_entry):
payload = self._build_account_payload(account, account_type)
try:
self.client.update_account(existing_entry["id"], payload)
existing_entry["type"] = account_type
except RuntimeError:
pass
continue
payload = self._build_account_payload(account, account_type)
created = self.client.create_account(payload)
created_name = self._extract_attribute(created, "name")
if created_name:
existing[created_name] = {
"id": self._extract_id(created),
"type": account_type,
"account_number": account.get("account_num"),
}
return {
name: entry.get("id", "")
for name, entry in existing.items()
if entry.get("type") in self.ALLOWED_ACCOUNT_TYPES
}
def _sync_transactions(self, firefly_accounts: dict[str, str]) -> None:
rows = self._load_unsynced_transactions()
for row in rows:
account_name = row.get("account_name")
if not account_name:
continue
account_id = firefly_accounts.get(account_name)
if not account_id:
continue
amount = float(row.get("amount"))
if amount == 0:
continue
description = row.get("description") or "Transaction"
vendor_name = row.get("vendor_name") or description
txn_date = row.get("date")
if isinstance(txn_date, date):
date_text = txn_date.isoformat()
else:
date_text = str(txn_date)
liability_handled = self._apply_emoney_liability_rules(
account_name,
description,
amount,
account_id,
vendor_name,
)
if liability_handled is None:
continue
amount_value, split = liability_handled
external_id = f"local-transaction:{row['id']}"
split.update({
"date": date_text,
"amount": f"{abs(amount_value):.2f}",
"description": description,
"external_id": external_id,
})
category_name = row.get("category_name")
if category_name:
split["category_name"] = category_name
if vendor_name.strip().lower() == "feijoa":
split["category_name"] = "Savings"
if "source_id" not in split and "destination_id" not in split:
if amount_value < 0:
split["source_id"] = account_id
split["destination_name"] = vendor_name
else:
split["source_name"] = self.revenue_account_name
split["destination_id"] = account_id
if vendor_name and vendor_name != self.revenue_account_name:
split["notes"] = f"Original source: {vendor_name}"
payload = {
"error_if_duplicate_hash": self.error_if_duplicate_hash,
"apply_rules": False,
"fire_webhooks": False,
"transactions": [split],
}
try:
created = self.client.create_transaction(payload)
firefly_id = self._extract_id(created)
except RuntimeError:
firefly_id = self._find_existing_transaction_id(external_id)
if not firefly_id:
continue
self._record_transaction_sync(row["id"], firefly_id, external_id)
def _sync_balance_adjustments(self, firefly_accounts: dict[str, str]) -> None:
if not self.balance_accounts:
return
for account in self._load_balance_accounts():
account_name = account.get("account_name")
account_id = firefly_accounts.get(account_name)
if not account_id:
continue
latest_snapshot = self._latest_snapshot_for_account(account["account_id"])
if not latest_snapshot:
continue
snapshot_date, snapshot_balance = latest_snapshot
if self._balance_adjustment_exists(account["account_id"], snapshot_date):
continue
firefly_balance = self._fetch_firefly_balance(account_id)
if firefly_balance is None:
continue
delta = snapshot_balance - firefly_balance
if abs(delta) < 0.01:
continue
external_id = f"balance-adjust:{account['account_id']}:{snapshot_date.isoformat()}"
split: dict[str, Any] = {
"type": "reconciliation",
"date": snapshot_date.isoformat(),
"amount": f"{abs(delta):.2f}",
"description": f"Balance adjustment ({snapshot_date.isoformat()})",
"external_id": external_id,
}
if delta > 0:
split["source_name"] = self.adjustment_account_name
split["destination_id"] = account_id
else:
split["source_id"] = account_id
split["destination_name"] = self.adjustment_account_name
payload = {
"error_if_duplicate_hash": self.error_if_duplicate_hash,
"apply_rules": False,
"fire_webhooks": False,
"transactions": [split],
}
try:
created = self.client.create_transaction(payload)
firefly_id = self._extract_id(created)
except RuntimeError:
firefly_id = self._find_existing_transaction_id(external_id)
if not firefly_id:
continue
self._record_balance_adjustment(account["account_id"], snapshot_date, firefly_id, external_id)
def _load_local_accounts(self) -> list[dict[str, Any]]:
sql = "SELECT id, accountname, accountnum FROM accounts ORDER BY accountname"
with self.dbconnection.cursor() as cursor:
cursor.execute(sql)
rows = cursor.fetchall()
return [
{
"id": row[0],
"name": row[1],
"account_num": row[2],
}
for row in rows
]
def _load_local_categories(self) -> list[dict[str, Any]]:
sql = "SELECT id, categoryname FROM categories ORDER BY categoryname"
with self.dbconnection.cursor() as cursor:
cursor.execute(sql)
rows = cursor.fetchall()
return [{"id": row[0], "name": row[1]} for row in rows]
def _load_unsynced_transactions(self) -> list[dict[str, Any]]:
excluded = [name.lower() for name in self.EXCLUDED_TRANSACTION_ACCOUNTS]
sql = """
SELECT t.id,
t.datetime,
t.description,
t.amount,
a.accountname,
v.vendorname,
c.categoryname
FROM transactions t
JOIN accounts a ON t.accountid = a.id
LEFT JOIN vendors v ON t.vendorid = v.id
LEFT JOIN categories c ON t.categoryid = c.id
LEFT JOIN firefly_transaction_sync fts ON fts.transactionid = t.id
WHERE fts.transactionid IS NULL
AND NOT (lower(a.accountname) = ANY(%s))
ORDER BY t.datetime ASC
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (excluded,))
rows = cursor.fetchall()
return [
{
"id": row[0],
"date": row[1],
"description": row[2],
"amount": row[3],
"account_name": row[4],
"vendor_name": row[5],
"category_name": row[6],
}
for row in rows
]
def _load_balance_accounts(self) -> list[dict[str, Any]]:
if not self.balance_accounts:
return []
names = tuple(self.balance_accounts)
sql = """
SELECT id, accountname
FROM accounts
WHERE accountname = ANY(%s)
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (list(names),))
rows = cursor.fetchall()
return [
{
"account_id": row[0],
"account_name": row[1],
}
for row in rows
]
def _latest_snapshot_for_account(self, account_id: int) -> tuple[date, float] | None:
sql = """
SELECT datetime, balance
FROM snapshots
WHERE accountid = %s
ORDER BY datetime DESC
LIMIT 1
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (account_id,))
row = cursor.fetchone()
if not row:
return None
return row[0], float(row[1])
def _balance_adjustment_exists(self, account_id: int, snapshot_date: date) -> bool:
sql = """
SELECT 1
FROM firefly_balance_adjustments
WHERE accountid = %s AND snapshot_date = %s
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (account_id, snapshot_date))
return cursor.fetchone() is not None
def _record_transaction_sync(self, transaction_id: int, firefly_id: str | None, external_id: str) -> None:
sql = """
INSERT INTO firefly_transaction_sync (transactionid, firefly_transaction_id, external_id)
VALUES (%s, %s, %s)
ON CONFLICT (transactionid) DO NOTHING
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (transaction_id, firefly_id, external_id))
self.dbconnection.commit()
def _record_balance_adjustment(self, account_id: int, snapshot_date: date, firefly_id: str | None, external_id: str) -> None:
sql = """
INSERT INTO firefly_balance_adjustments (accountid, snapshot_date, firefly_transaction_id, external_id)
VALUES (%s, %s, %s, %s)
ON CONFLICT (accountid, snapshot_date) DO NOTHING
"""
with self.dbconnection.cursor() as cursor:
cursor.execute(sql, (account_id, snapshot_date, firefly_id, external_id))
self.dbconnection.commit()
def _fetch_firefly_balance(self, account_id: str) -> float | None:
payload = self.client.get_account(account_id)
attributes = (payload.get("data") or {}).get("attributes") or {}
balance_text = attributes.get("current_balance")
if balance_text is None:
return None
try:
return float(balance_text)
except ValueError:
return None
def _find_existing_transaction_id(self, external_id: str) -> str | None:
query = f'external_id:"{external_id}"'
try:
payload = self.client.search_transactions(query)
except RuntimeError:
return None
data = payload.get("data") or []
if not data:
return None
return self._extract_id({"data": data[0]})
def _resolve_account_type(self, account: dict[str, Any]) -> str:
overrides = self.account_type_overrides
account_name = account.get("name")
account_num = account.get("account_num")
override = None
if account_name and account_name.strip().lower() in self.LIABILITY_ACCOUNT_NAMES:
return "liability"
if account_name and account_name in overrides:
override = overrides.get(account_name)
if account_num and account_num in overrides:
override = overrides.get(account_num)
selected = (override or self.default_account_type).strip().lower()
if selected not in self.ALLOWED_ACCOUNT_TYPES:
return "asset"
return selected
def _build_account_payload(self, account: dict[str, Any], account_type: str) -> dict[str, Any]:
payload = {
"name": account.get("name"),
"type": account_type,
"account_number": account.get("account_num"),
"currency_code": self.currency_code,
"include_net_worth": True,
}
if account_type == "liability":
account_name = str(account.get("name") or "").strip().lower()
interest_rate = self.LIABILITY_INTEREST_RATES.get(account_name)
payload["liability_type"] = self.default_liability_type
payload["liability_direction"] = self.default_liability_direction
payload["interest_period"] = self.default_liability_interest_period
if interest_rate is None:
payload["interest"] = self.default_liability_interest
else:
payload["interest"] = f"{interest_rate}"
if account_type == "asset":
payload["account_role"] = self.default_account_role
return payload
def _should_update_account_type(self, account: dict[str, Any], existing: dict[str, Any]) -> bool:
existing_type = existing.get("type")
if existing_type in self.ALLOWED_ACCOUNT_TYPES:
return False
existing_number = existing.get("account_number")
account_number = account.get("account_num")
if existing_number and account_number and str(existing_number) != str(account_number):
account_name = str(account.get("name") or "").strip().lower()
if account_name in self.LIABILITY_ACCOUNT_NAMES:
return True
if account_name in self.FORCED_ASSET_ACCOUNT_NAMES:
return True
return False
return True
@staticmethod
def _map_accounts_by_name(items: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
mapping: dict[str, dict[str, Any]] = {}
for item in items:
payload = {"data": item}
name = FireflySync._extract_attribute(payload, "name")
if not name:
continue
mapping[name] = {
"id": FireflySync._extract_id(payload),
"type": FireflySync._extract_attribute(payload, "type"),
"account_number": FireflySync._extract_attribute(payload, "account_number"),
}
return mapping
@staticmethod
def _map_by_name(items: list[dict[str, Any]]) -> dict[str, str]:
mapping: dict[str, str] = {}
for item in items:
name = FireflySync._extract_attribute({"data": item}, "name")
if not name:
continue
mapping[name] = FireflySync._extract_id({"data": item})
return mapping
@staticmethod
def _extract_id(payload: dict[str, Any]) -> str:
data = payload.get("data") or {}
value = data.get("id")
if value is None:
return ""
return str(value)
@staticmethod
def _extract_attribute(payload: dict[str, Any], key: str) -> str | None:
data = payload.get("data") or {}
attributes = data.get("attributes") or {}
value = attributes.get(key)
if value is None:
return None
return str(value)
@staticmethod
def _parse_json_env(name: str) -> dict[str, str]:
raw = os.getenv(name)
if not raw:
return {}
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
return {}
if not isinstance(parsed, dict):
return {}
return {str(k): str(v) for k, v in parsed.items()}
@staticmethod
def _parse_csv_env(name: str) -> list[str]:
raw = os.getenv(name, "")
if not raw:
return []
return [item.strip() for item in raw.split(",") if item.strip()]
@staticmethod
def _require_env(name: str) -> str:
value = os.getenv(name)
if not value:
raise ValueError(f"Please set {name} in your environment.")
return value