514 lines
20 KiB
Python
514 lines
20 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
from datetime import date
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
from config import load_env
|
||
|
|
from FireflySync.firefly_client import FireflyClient
|
||
|
|
|
||
|
|
|
||
|
|
class FireflySync:
|
||
|
|
ALLOWED_ACCOUNT_TYPES = {"asset", "liability"}
|
||
|
|
SKIPPED_ACCOUNT_NAMES = {"emoney"}
|
||
|
|
LIABILITY_ACCOUNT_NAMES = {
|
||
|
|
"emoney",
|
||
|
|
"emoney-loan",
|
||
|
|
"emoney-forrester",
|
||
|
|
"emoney-swift",
|
||
|
|
}
|
||
|
|
EMONEY_POSITIVE_KEYWORDS = {
|
||
|
|
"normal payment",
|
||
|
|
}
|
||
|
|
EMONEY_NEGATIVE_KEYWORDS = {
|
||
|
|
"debit interest",
|
||
|
|
"monthly fee",
|
||
|
|
"mbi",
|
||
|
|
"advance",
|
||
|
|
"insurance",
|
||
|
|
"late fee",
|
||
|
|
"establishment fee",
|
||
|
|
}
|
||
|
|
EMONEY_SKIP_KEYWORDS = {
|
||
|
|
"retry normal payment",
|
||
|
|
}
|
||
|
|
FORCED_ASSET_ACCOUNT_NAMES = {
|
||
|
|
"jethro's kiwisaver",
|
||
|
|
"jethro's investments",
|
||
|
|
"jethro's rainy day fund",
|
||
|
|
}
|
||
|
|
LIABILITY_INTEREST_RATES = {
|
||
|
|
"emoney-forrester": 9.9,
|
||
|
|
"emoney-swift": 13.95,
|
||
|
|
"emoney-loan": 29.9,
|
||
|
|
}
|
||
|
|
EXCLUDED_TRANSACTION_ACCOUNTS = {"emoney"}
|
||
|
|
|
||
|
|
def __init__(self, dbconnection):
|
||
|
|
load_env("FireflySync")
|
||
|
|
self.dbconnection = dbconnection
|
||
|
|
self.base_url = self._require_env("FIREFLY_BASE_URL")
|
||
|
|
self.api_token = self._require_env("FIREFLY_API_TOKEN")
|
||
|
|
self.currency_code = os.getenv("FIREFLY_CURRENCY_CODE", "NZD").strip() or "NZD"
|
||
|
|
self.default_account_type = os.getenv("FIREFLY_DEFAULT_ACCOUNT_TYPE", "asset").strip() or "asset"
|
||
|
|
self.default_account_role = os.getenv("FIREFLY_ASSET_ACCOUNT_ROLE", "defaultAsset").strip() or "defaultAsset"
|
||
|
|
self.default_liability_type = os.getenv("FIREFLY_LIABILITY_TYPE", "loan").strip() or "loan"
|
||
|
|
self.default_liability_direction = os.getenv("FIREFLY_LIABILITY_DIRECTION", "debit").strip() or "debit"
|
||
|
|
self.default_liability_interest_period = os.getenv("FIREFLY_LIABILITY_INTEREST_PERIOD", "yearly").strip() or "yearly"
|
||
|
|
self.default_liability_interest = os.getenv("FIREFLY_LIABILITY_INTEREST", "0").strip() or "0"
|
||
|
|
self.account_type_overrides = self._parse_json_env("FIREFLY_ACCOUNT_TYPE_OVERRIDES")
|
||
|
|
self.balance_accounts = self._parse_csv_env("FIREFLY_BALANCE_ACCOUNTS")
|
||
|
|
self.adjustment_account_name = os.getenv("FIREFLY_ADJUSTMENT_ACCOUNT_NAME", "Balance Adjustment").strip()
|
||
|
|
self.revenue_account_name = os.getenv("FIREFLY_REVENUE_ACCOUNT_NAME", "Income").strip() or "Income"
|
||
|
|
self.error_if_duplicate_hash = os.getenv("FIREFLY_ERROR_IF_DUPLICATE_HASH", "true").strip().lower() in {
|
||
|
|
"1",
|
||
|
|
"true",
|
||
|
|
"yes",
|
||
|
|
}
|
||
|
|
self.client = FireflyClient(self.base_url, self.api_token)
|
||
|
|
|
||
|
|
def sync(self) -> None:
|
||
|
|
categories = self._load_local_categories()
|
||
|
|
self._ensure_categories(categories)
|
||
|
|
firefly_accounts = self._ensure_accounts()
|
||
|
|
self._sync_transactions(firefly_accounts)
|
||
|
|
self._sync_balance_adjustments(firefly_accounts)
|
||
|
|
|
||
|
|
def _ensure_categories(self, categories: list[dict[str, Any]]) -> dict[str, str]:
|
||
|
|
existing = self._map_by_name(self.client.list_categories())
|
||
|
|
for category in categories:
|
||
|
|
name = category.get("name")
|
||
|
|
if not name or name in existing:
|
||
|
|
continue
|
||
|
|
created = self.client.create_category({"name": name})
|
||
|
|
created_name = self._extract_attribute(created, "name")
|
||
|
|
if created_name:
|
||
|
|
existing[created_name] = self._extract_id(created)
|
||
|
|
return existing
|
||
|
|
|
||
|
|
def _ensure_accounts(self) -> dict[str, str]:
|
||
|
|
existing = self._map_accounts_by_name(self.client.list_accounts())
|
||
|
|
for account in self._load_local_accounts():
|
||
|
|
name = account.get("name")
|
||
|
|
if not name:
|
||
|
|
continue
|
||
|
|
if name.strip().lower() in self.SKIPPED_ACCOUNT_NAMES:
|
||
|
|
continue
|
||
|
|
account_type = self._resolve_account_type(account)
|
||
|
|
existing_entry = existing.get(name)
|
||
|
|
if existing_entry:
|
||
|
|
if existing_entry.get("type") == account_type:
|
||
|
|
continue
|
||
|
|
if self._should_update_account_type(account, existing_entry):
|
||
|
|
payload = self._build_account_payload(account, account_type)
|
||
|
|
try:
|
||
|
|
self.client.update_account(existing_entry["id"], payload)
|
||
|
|
existing_entry["type"] = account_type
|
||
|
|
except RuntimeError:
|
||
|
|
pass
|
||
|
|
continue
|
||
|
|
payload = self._build_account_payload(account, account_type)
|
||
|
|
created = self.client.create_account(payload)
|
||
|
|
created_name = self._extract_attribute(created, "name")
|
||
|
|
if created_name:
|
||
|
|
existing[created_name] = {
|
||
|
|
"id": self._extract_id(created),
|
||
|
|
"type": account_type,
|
||
|
|
"account_number": account.get("account_num"),
|
||
|
|
}
|
||
|
|
return {
|
||
|
|
name: entry.get("id", "")
|
||
|
|
for name, entry in existing.items()
|
||
|
|
if entry.get("type") in self.ALLOWED_ACCOUNT_TYPES
|
||
|
|
}
|
||
|
|
|
||
|
|
def _sync_transactions(self, firefly_accounts: dict[str, str]) -> None:
|
||
|
|
rows = self._load_unsynced_transactions()
|
||
|
|
for row in rows:
|
||
|
|
account_name = row.get("account_name")
|
||
|
|
if not account_name:
|
||
|
|
continue
|
||
|
|
account_id = firefly_accounts.get(account_name)
|
||
|
|
if not account_id:
|
||
|
|
continue
|
||
|
|
amount = float(row.get("amount"))
|
||
|
|
if amount == 0:
|
||
|
|
continue
|
||
|
|
description = row.get("description") or "Transaction"
|
||
|
|
vendor_name = row.get("vendor_name") or description
|
||
|
|
txn_date = row.get("date")
|
||
|
|
if isinstance(txn_date, date):
|
||
|
|
date_text = txn_date.isoformat()
|
||
|
|
else:
|
||
|
|
date_text = str(txn_date)
|
||
|
|
|
||
|
|
liability_handled = self._apply_emoney_liability_rules(
|
||
|
|
account_name,
|
||
|
|
description,
|
||
|
|
amount,
|
||
|
|
account_id,
|
||
|
|
vendor_name,
|
||
|
|
)
|
||
|
|
if liability_handled is None:
|
||
|
|
continue
|
||
|
|
amount_value, split = liability_handled
|
||
|
|
|
||
|
|
external_id = f"local-transaction:{row['id']}"
|
||
|
|
split.update({
|
||
|
|
"date": date_text,
|
||
|
|
"amount": f"{abs(amount_value):.2f}",
|
||
|
|
"description": description,
|
||
|
|
"external_id": external_id,
|
||
|
|
})
|
||
|
|
|
||
|
|
category_name = row.get("category_name")
|
||
|
|
if category_name:
|
||
|
|
split["category_name"] = category_name
|
||
|
|
if vendor_name.strip().lower() == "feijoa":
|
||
|
|
split["category_name"] = "Savings"
|
||
|
|
|
||
|
|
if "source_id" not in split and "destination_id" not in split:
|
||
|
|
if amount_value < 0:
|
||
|
|
split["source_id"] = account_id
|
||
|
|
split["destination_name"] = vendor_name
|
||
|
|
else:
|
||
|
|
split["source_name"] = self.revenue_account_name
|
||
|
|
split["destination_id"] = account_id
|
||
|
|
if vendor_name and vendor_name != self.revenue_account_name:
|
||
|
|
split["notes"] = f"Original source: {vendor_name}"
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"error_if_duplicate_hash": self.error_if_duplicate_hash,
|
||
|
|
"apply_rules": False,
|
||
|
|
"fire_webhooks": False,
|
||
|
|
"transactions": [split],
|
||
|
|
}
|
||
|
|
|
||
|
|
try:
|
||
|
|
created = self.client.create_transaction(payload)
|
||
|
|
firefly_id = self._extract_id(created)
|
||
|
|
except RuntimeError:
|
||
|
|
firefly_id = self._find_existing_transaction_id(external_id)
|
||
|
|
if not firefly_id:
|
||
|
|
continue
|
||
|
|
self._record_transaction_sync(row["id"], firefly_id, external_id)
|
||
|
|
|
||
|
|
def _sync_balance_adjustments(self, firefly_accounts: dict[str, str]) -> None:
|
||
|
|
if not self.balance_accounts:
|
||
|
|
return
|
||
|
|
for account in self._load_balance_accounts():
|
||
|
|
account_name = account.get("account_name")
|
||
|
|
account_id = firefly_accounts.get(account_name)
|
||
|
|
if not account_id:
|
||
|
|
continue
|
||
|
|
latest_snapshot = self._latest_snapshot_for_account(account["account_id"])
|
||
|
|
if not latest_snapshot:
|
||
|
|
continue
|
||
|
|
snapshot_date, snapshot_balance = latest_snapshot
|
||
|
|
if self._balance_adjustment_exists(account["account_id"], snapshot_date):
|
||
|
|
continue
|
||
|
|
firefly_balance = self._fetch_firefly_balance(account_id)
|
||
|
|
if firefly_balance is None:
|
||
|
|
continue
|
||
|
|
delta = snapshot_balance - firefly_balance
|
||
|
|
if abs(delta) < 0.01:
|
||
|
|
continue
|
||
|
|
|
||
|
|
external_id = f"balance-adjust:{account['account_id']}:{snapshot_date.isoformat()}"
|
||
|
|
split: dict[str, Any] = {
|
||
|
|
"type": "reconciliation",
|
||
|
|
"date": snapshot_date.isoformat(),
|
||
|
|
"amount": f"{abs(delta):.2f}",
|
||
|
|
"description": f"Balance adjustment ({snapshot_date.isoformat()})",
|
||
|
|
"external_id": external_id,
|
||
|
|
}
|
||
|
|
|
||
|
|
if delta > 0:
|
||
|
|
split["source_name"] = self.adjustment_account_name
|
||
|
|
split["destination_id"] = account_id
|
||
|
|
else:
|
||
|
|
split["source_id"] = account_id
|
||
|
|
split["destination_name"] = self.adjustment_account_name
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"error_if_duplicate_hash": self.error_if_duplicate_hash,
|
||
|
|
"apply_rules": False,
|
||
|
|
"fire_webhooks": False,
|
||
|
|
"transactions": [split],
|
||
|
|
}
|
||
|
|
|
||
|
|
try:
|
||
|
|
created = self.client.create_transaction(payload)
|
||
|
|
firefly_id = self._extract_id(created)
|
||
|
|
except RuntimeError:
|
||
|
|
firefly_id = self._find_existing_transaction_id(external_id)
|
||
|
|
if not firefly_id:
|
||
|
|
continue
|
||
|
|
self._record_balance_adjustment(account["account_id"], snapshot_date, firefly_id, external_id)
|
||
|
|
|
||
|
|
def _load_local_accounts(self) -> list[dict[str, Any]]:
|
||
|
|
sql = "SELECT id, accountname, accountnum FROM accounts ORDER BY accountname"
|
||
|
|
with self.dbconnection.cursor() as cursor:
|
||
|
|
cursor.execute(sql)
|
||
|
|
rows = cursor.fetchall()
|
||
|
|
return [
|
||
|
|
{
|
||
|
|
"id": row[0],
|
||
|
|
"name": row[1],
|
||
|
|
"account_num": row[2],
|
||
|
|
}
|
||
|
|
for row in rows
|
||
|
|
]
|
||
|
|
|
||
|
|
def _load_local_categories(self) -> list[dict[str, Any]]:
|
||
|
|
sql = "SELECT id, categoryname FROM categories ORDER BY categoryname"
|
||
|
|
with self.dbconnection.cursor() as cursor:
|
||
|
|
cursor.execute(sql)
|
||
|
|
rows = cursor.fetchall()
|
||
|
|
return [{"id": row[0], "name": row[1]} for row in rows]
|
||
|
|
|
||
|
|
def _load_unsynced_transactions(self) -> list[dict[str, Any]]:
|
||
|
|
excluded = [name.lower() for name in self.EXCLUDED_TRANSACTION_ACCOUNTS]
|
||
|
|
sql = """
|
||
|
|
SELECT t.id,
|
||
|
|
t.datetime,
|
||
|
|
t.description,
|
||
|
|
t.amount,
|
||
|
|
a.accountname,
|
||
|
|
v.vendorname,
|
||
|
|
c.categoryname
|
||
|
|
FROM transactions t
|
||
|
|
JOIN accounts a ON t.accountid = a.id
|
||
|
|
LEFT JOIN vendors v ON t.vendorid = v.id
|
||
|
|
LEFT JOIN categories c ON t.categoryid = c.id
|
||
|
|
LEFT JOIN firefly_transaction_sync fts ON fts.transactionid = t.id
|
||
|
|
WHERE fts.transactionid IS NULL
|
||
|
|
AND NOT (lower(a.accountname) = ANY(%s))
|
||
|
|
ORDER BY t.datetime ASC
|
||
|
|
"""
|
||
|
|
with self.dbconnection.cursor() as cursor:
|
||
|
|
cursor.execute(sql, (excluded,))
|
||
|
|
rows = cursor.fetchall()
|
||
|
|
return [
|
||
|
|
{
|
||
|
|
"id": row[0],
|
||
|
|
"date": row[1],
|
||
|
|
"description": row[2],
|
||
|
|
"amount": row[3],
|
||
|
|
"account_name": row[4],
|
||
|
|
"vendor_name": row[5],
|
||
|
|
"category_name": row[6],
|
||
|
|
}
|
||
|
|
for row in rows
|
||
|
|
]
|
||
|
|
|
||
|
|
def _load_balance_accounts(self) -> list[dict[str, Any]]:
|
||
|
|
if not self.balance_accounts:
|
||
|
|
return []
|
||
|
|
names = tuple(self.balance_accounts)
|
||
|
|
sql = """
|
||
|
|
SELECT id, accountname
|
||
|
|
FROM accounts
|
||
|
|
WHERE accountname = ANY(%s)
|
||
|
|
"""
|
||
|
|
with self.dbconnection.cursor() as cursor:
|
||
|
|
cursor.execute(sql, (list(names),))
|
||
|
|
rows = cursor.fetchall()
|
||
|
|
return [
|
||
|
|
{
|
||
|
|
"account_id": row[0],
|
||
|
|
"account_name": row[1],
|
||
|
|
}
|
||
|
|
for row in rows
|
||
|
|
]
|
||
|
|
|
||
|
|
def _latest_snapshot_for_account(self, account_id: int) -> tuple[date, float] | None:
|
||
|
|
sql = """
|
||
|
|
SELECT datetime, balance
|
||
|
|
FROM snapshots
|
||
|
|
WHERE accountid = %s
|
||
|
|
ORDER BY datetime DESC
|
||
|
|
LIMIT 1
|
||
|
|
"""
|
||
|
|
with self.dbconnection.cursor() as cursor:
|
||
|
|
cursor.execute(sql, (account_id,))
|
||
|
|
row = cursor.fetchone()
|
||
|
|
if not row:
|
||
|
|
return None
|
||
|
|
return row[0], float(row[1])
|
||
|
|
|
||
|
|
def _balance_adjustment_exists(self, account_id: int, snapshot_date: date) -> bool:
|
||
|
|
sql = """
|
||
|
|
SELECT 1
|
||
|
|
FROM firefly_balance_adjustments
|
||
|
|
WHERE accountid = %s AND snapshot_date = %s
|
||
|
|
"""
|
||
|
|
with self.dbconnection.cursor() as cursor:
|
||
|
|
cursor.execute(sql, (account_id, snapshot_date))
|
||
|
|
return cursor.fetchone() is not None
|
||
|
|
|
||
|
|
def _record_transaction_sync(self, transaction_id: int, firefly_id: str | None, external_id: str) -> None:
|
||
|
|
sql = """
|
||
|
|
INSERT INTO firefly_transaction_sync (transactionid, firefly_transaction_id, external_id)
|
||
|
|
VALUES (%s, %s, %s)
|
||
|
|
ON CONFLICT (transactionid) DO NOTHING
|
||
|
|
"""
|
||
|
|
with self.dbconnection.cursor() as cursor:
|
||
|
|
cursor.execute(sql, (transaction_id, firefly_id, external_id))
|
||
|
|
self.dbconnection.commit()
|
||
|
|
|
||
|
|
def _record_balance_adjustment(self, account_id: int, snapshot_date: date, firefly_id: str | None, external_id: str) -> None:
|
||
|
|
sql = """
|
||
|
|
INSERT INTO firefly_balance_adjustments (accountid, snapshot_date, firefly_transaction_id, external_id)
|
||
|
|
VALUES (%s, %s, %s, %s)
|
||
|
|
ON CONFLICT (accountid, snapshot_date) DO NOTHING
|
||
|
|
"""
|
||
|
|
with self.dbconnection.cursor() as cursor:
|
||
|
|
cursor.execute(sql, (account_id, snapshot_date, firefly_id, external_id))
|
||
|
|
self.dbconnection.commit()
|
||
|
|
|
||
|
|
def _fetch_firefly_balance(self, account_id: str) -> float | None:
|
||
|
|
payload = self.client.get_account(account_id)
|
||
|
|
attributes = (payload.get("data") or {}).get("attributes") or {}
|
||
|
|
balance_text = attributes.get("current_balance")
|
||
|
|
if balance_text is None:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
return float(balance_text)
|
||
|
|
except ValueError:
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _find_existing_transaction_id(self, external_id: str) -> str | None:
|
||
|
|
query = f'external_id:"{external_id}"'
|
||
|
|
try:
|
||
|
|
payload = self.client.search_transactions(query)
|
||
|
|
except RuntimeError:
|
||
|
|
return None
|
||
|
|
data = payload.get("data") or []
|
||
|
|
if not data:
|
||
|
|
return None
|
||
|
|
return self._extract_id({"data": data[0]})
|
||
|
|
|
||
|
|
def _resolve_account_type(self, account: dict[str, Any]) -> str:
|
||
|
|
overrides = self.account_type_overrides
|
||
|
|
account_name = account.get("name")
|
||
|
|
account_num = account.get("account_num")
|
||
|
|
override = None
|
||
|
|
if account_name and account_name.strip().lower() in self.LIABILITY_ACCOUNT_NAMES:
|
||
|
|
return "liability"
|
||
|
|
if account_name and account_name in overrides:
|
||
|
|
override = overrides.get(account_name)
|
||
|
|
if account_num and account_num in overrides:
|
||
|
|
override = overrides.get(account_num)
|
||
|
|
selected = (override or self.default_account_type).strip().lower()
|
||
|
|
if selected not in self.ALLOWED_ACCOUNT_TYPES:
|
||
|
|
return "asset"
|
||
|
|
return selected
|
||
|
|
|
||
|
|
def _build_account_payload(self, account: dict[str, Any], account_type: str) -> dict[str, Any]:
|
||
|
|
payload = {
|
||
|
|
"name": account.get("name"),
|
||
|
|
"type": account_type,
|
||
|
|
"account_number": account.get("account_num"),
|
||
|
|
"currency_code": self.currency_code,
|
||
|
|
"include_net_worth": True,
|
||
|
|
}
|
||
|
|
if account_type == "liability":
|
||
|
|
account_name = str(account.get("name") or "").strip().lower()
|
||
|
|
interest_rate = self.LIABILITY_INTEREST_RATES.get(account_name)
|
||
|
|
payload["liability_type"] = self.default_liability_type
|
||
|
|
payload["liability_direction"] = self.default_liability_direction
|
||
|
|
payload["interest_period"] = self.default_liability_interest_period
|
||
|
|
if interest_rate is None:
|
||
|
|
payload["interest"] = self.default_liability_interest
|
||
|
|
else:
|
||
|
|
payload["interest"] = f"{interest_rate}"
|
||
|
|
if account_type == "asset":
|
||
|
|
payload["account_role"] = self.default_account_role
|
||
|
|
return payload
|
||
|
|
|
||
|
|
def _should_update_account_type(self, account: dict[str, Any], existing: dict[str, Any]) -> bool:
|
||
|
|
existing_type = existing.get("type")
|
||
|
|
if existing_type in self.ALLOWED_ACCOUNT_TYPES:
|
||
|
|
return False
|
||
|
|
existing_number = existing.get("account_number")
|
||
|
|
account_number = account.get("account_num")
|
||
|
|
if existing_number and account_number and str(existing_number) != str(account_number):
|
||
|
|
account_name = str(account.get("name") or "").strip().lower()
|
||
|
|
if account_name in self.LIABILITY_ACCOUNT_NAMES:
|
||
|
|
return True
|
||
|
|
if account_name in self.FORCED_ASSET_ACCOUNT_NAMES:
|
||
|
|
return True
|
||
|
|
return False
|
||
|
|
return True
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _map_accounts_by_name(items: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
|
||
|
|
mapping: dict[str, dict[str, Any]] = {}
|
||
|
|
for item in items:
|
||
|
|
payload = {"data": item}
|
||
|
|
name = FireflySync._extract_attribute(payload, "name")
|
||
|
|
if not name:
|
||
|
|
continue
|
||
|
|
mapping[name] = {
|
||
|
|
"id": FireflySync._extract_id(payload),
|
||
|
|
"type": FireflySync._extract_attribute(payload, "type"),
|
||
|
|
"account_number": FireflySync._extract_attribute(payload, "account_number"),
|
||
|
|
}
|
||
|
|
return mapping
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _map_by_name(items: list[dict[str, Any]]) -> dict[str, str]:
|
||
|
|
mapping: dict[str, str] = {}
|
||
|
|
for item in items:
|
||
|
|
name = FireflySync._extract_attribute({"data": item}, "name")
|
||
|
|
if not name:
|
||
|
|
continue
|
||
|
|
mapping[name] = FireflySync._extract_id({"data": item})
|
||
|
|
return mapping
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _extract_id(payload: dict[str, Any]) -> str:
|
||
|
|
data = payload.get("data") or {}
|
||
|
|
value = data.get("id")
|
||
|
|
if value is None:
|
||
|
|
return ""
|
||
|
|
return str(value)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _extract_attribute(payload: dict[str, Any], key: str) -> str | None:
|
||
|
|
data = payload.get("data") or {}
|
||
|
|
attributes = data.get("attributes") or {}
|
||
|
|
value = attributes.get(key)
|
||
|
|
if value is None:
|
||
|
|
return None
|
||
|
|
return str(value)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _parse_json_env(name: str) -> dict[str, str]:
|
||
|
|
raw = os.getenv(name)
|
||
|
|
if not raw:
|
||
|
|
return {}
|
||
|
|
try:
|
||
|
|
parsed = json.loads(raw)
|
||
|
|
except json.JSONDecodeError:
|
||
|
|
return {}
|
||
|
|
if not isinstance(parsed, dict):
|
||
|
|
return {}
|
||
|
|
return {str(k): str(v) for k, v in parsed.items()}
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _parse_csv_env(name: str) -> list[str]:
|
||
|
|
raw = os.getenv(name, "")
|
||
|
|
if not raw:
|
||
|
|
return []
|
||
|
|
return [item.strip() for item in raw.split(",") if item.strip()]
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _require_env(name: str) -> str:
|
||
|
|
value = os.getenv(name)
|
||
|
|
if not value:
|
||
|
|
raise ValueError(f"Please set {name} in your environment.")
|
||
|
|
return value
|