From c8e7ddce35d74371e1e596043563d88a63295ce4 Mon Sep 17 00:00:00 2001 From: Jethro Date: Tue, 19 May 2026 10:47:46 +1200 Subject: [PATCH] lots of vibe coded changes --- AkahuClient/akahuclient.py | 4 - AkahuClient/main.py | 4 +- EmoneyScraper/main.py | 8 +- EmoneyScraper/scraper.py | 43 ++- IngestionService/ingester.py | 256 +++++++++++--- IngestionService/normalizer.py | 388 +++++++++++++++++++++ config.py | 19 + emoney_cache.json | 1 + main.py | 24 +- sql/database-init/database definitions.sql | 339 +++++++++++++++++- 10 files changed, 1011 insertions(+), 75 deletions(-) create mode 100644 IngestionService/normalizer.py create mode 100644 config.py create mode 100644 emoney_cache.json diff --git a/AkahuClient/akahuclient.py b/AkahuClient/akahuclient.py index a6f88e6..216277f 100644 --- a/AkahuClient/akahuclient.py +++ b/AkahuClient/akahuclient.py @@ -1,11 +1,7 @@ import requests import os -from dotenv import load_dotenv from typing import Iterable, Optional, Any -load_dotenv() - - class AkahuClient: # Central place to configure which accounts you care about. # You can override this by passing account_names=... to get_accounts(). 
diff --git a/AkahuClient/main.py b/AkahuClient/main.py index c3aab62..08cb8be 100644 --- a/AkahuClient/main.py +++ b/AkahuClient/main.py @@ -1,9 +1,9 @@ from akahuclient import AkahuClient import os -from dotenv import load_dotenv +from config import load_env -load_dotenv() +load_env() TOKEN = os.getenv("AKAHU_API_TOKEN") APP_ID = os.getenv("AKAHU_APP_ID") diff --git a/EmoneyScraper/main.py b/EmoneyScraper/main.py index 3a50f2e..33fd229 100755 --- a/EmoneyScraper/main.py +++ b/EmoneyScraper/main.py @@ -3,9 +3,9 @@ from playwright.sync_api import sync_playwright, Playwright playwright = sync_playwright().start() scraper = Scraper(playwright, True) -print(scraper.get_balance()) -transactions = scraper.get_transactions() -parsed = scraper.parse_transactions(transactions) -print(parsed) +snapshot = scraper.get_snapshot() +transactions = scraper.get_transactions_parsed() +print(snapshot) +print(transactions) scraper.close() diff --git a/EmoneyScraper/scraper.py b/EmoneyScraper/scraper.py index b351ff2..98bc115 100755 --- a/EmoneyScraper/scraper.py +++ b/EmoneyScraper/scraper.py @@ -1,25 +1,41 @@ from playwright.sync_api import sync_playwright, Playwright import os -from dotenv import load_dotenv +from config import load_env import re -from datetime import datetime +from datetime import date, datetime -load_dotenv() class Scraper: def __init__(self, playwright: Playwright, headless: bool = True): + load_env("EmoneyScraper") + self._require_env("SCRAPER_URL") + self._require_env("SCRAPER_USERNAME") + self._require_env("SCRAPER_PASSWORD") self.playwright = playwright self.firefox = self.playwright.firefox # or "firefox" or "webkit". 
self.browser = self.firefox.launch(headless=headless) self.page = self.browser.new_page() - self.response = self.page.goto(os.getenv("URL")) - self.page.fill("input#ctl00_ContentPlaceHolder1_txtLoginID", os.getenv("USERNAME")) - self.page.fill("input#ctl00_ContentPlaceHolder1_txtPassword", os.getenv("PASSWORD")) + self.response = self.page.goto(os.getenv("SCRAPER_URL")) + self.page.fill("input#ctl00_ContentPlaceHolder1_txtLoginID", os.getenv("SCRAPER_USERNAME")) + self.page.fill("input#ctl00_ContentPlaceHolder1_txtPassword", os.getenv("SCRAPER_PASSWORD")) self.page.click("input#ctl00_ContentPlaceHolder1_btnLogin") def get_balance(self): current_balance = self.page.locator("xpath=/html/body/form/div[3]/div[3]/div[2]/div[3]/div[5]/span[2]").inner_text() return current_balance + def get_snapshot(self) -> dict[str, list[dict[str, object]]]: + balance_text = self.get_balance() + balance_value = self._parse_money(balance_text) + snapshot_date = date.today() + return { + "accounts": [ + { + "date": snapshot_date, + "balance": balance_value, + } + ] + } + def get_transactions(self): self.page.click("xpath=/html/body/form/div[3]/div[3]/div[2]/div[3]/div[1]/span[2]/a") transaction_body = self.page.locator("xpath=/html/body/form/div[3]/div[3]/div[2]/div/div[2]/div[3]/table/tbody").inner_text() @@ -63,9 +79,24 @@ class Scraper: }) return parsed + def get_transactions_parsed(self) -> list[dict[str, any]]: + raw = self.get_transactions() + return self.parse_transactions(raw) + def close(self): self.browser.close() + @staticmethod + def _parse_money(value: str) -> float: + return float(value.replace("$", "").replace(",", "").strip()) + + @staticmethod + def _require_env(name: str) -> str: + value = os.getenv(name) + if not value: + raise ValueError(f"Please set {name} in your environment.") + return value + #xpathbody=/html/body/form/div[3]/div[3]/div[2]/div/div[2]/div[3]/table/tbody #xpathaccountbutton = /html/body/form/div[3]/div[3]/div[2]/div[3]/div[1]/span[2]/a #xpath = 
/html/body/form/div[3]/div[3]/div[2]/div[3]/div[5]/span[2] diff --git a/IngestionService/ingester.py b/IngestionService/ingester.py index c71a8f6..f9825d6 100644 --- a/IngestionService/ingester.py +++ b/IngestionService/ingester.py @@ -1,38 +1,44 @@ from AkahuClient.akahuclient import AkahuClient from EmoneyScraper.scraper import Scraper -from dotenv import load_dotenv +from IngestionService.normalizer import DataNormalizer +from config import load_env import os import json import psycopg +from pathlib import Path +from datetime import datetime, date from playwright.sync_api import sync_playwright, Playwright +load_env("IngestionService") pw = sync_playwright().start() - -load_dotenv() - class Ingester: def __init__(self): - self.token = os.getenv("AKAHU_API_TOKEN") - self.app_id = os.getenv("AKAHU_APP_ID") - if not self.token or not self.app_id: - raise ValueError("Please set AKAHU_API_TOKEN and AKAHU_APP_ID in your environment.") + self.token = self._require_env("AKAHU_API_TOKEN") + self.app_id = self._require_env("AKAHU_APP_ID") + db_host = self._require_env("DB_HOST") + db_name = self._require_env("DB_NAME") + db_user = self._require_env("DB_USER") + db_password = self._require_env("DB_PASSWORD") self.client = AkahuClient(self.token, self.app_id) self.dbconnection = psycopg.connect( - host=os.getenv("DB_HOST"), - dbname=os.getenv("DB_NAME"), - user=os.getenv("DB_USER"), - password=os.getenv("DB_PASSWORD") + host=db_host, + dbname=db_name, + user=db_user, + password=db_password, ) - self.scraper = Scraper(pw, headless=False) + self.scraper: Scraper | None = None def test_connection(self): accounts = self.client.get_accounts() print("Akahu accounts:", accounts) def test_scraper(self): - data = self.scraper.get_balance() - print("Scraped data:", data) + snapshot = self.fetch_emoney_snapshot_data() + transactions = self.fetch_emoney_transaction_data() + print("Emoney snapshot accounts:", len(snapshot.get("accounts", []))) + print("Emoney transactions:", 
len(transactions.get("transactions", []))) + return {"snapshot": snapshot, "transactions": transactions} def test_db_connection(self): with self.dbconnection.cursor() as cursor: @@ -48,54 +54,212 @@ class Ingester: accounts = self.client.get_accounts(account_names) return accounts + def fetch_akahu_snapshot_data(self, account_names=None): + return self.client.get_accounts(account_names) + + def fetch_akahu_transaction_data(self, account_id: str, start_date: str, end_date: str): + return self.client.get_transactions(account_id, start_date, end_date) + + def fetch_akahu_transactions_backfill(self, start_date: str, end_date: str, account_names=None): + accounts = self.client.get_accounts(account_names) + combined = {"items": []} + for account in accounts.get("items", []): + account_id = account.get("_id") or account.get("id") + if not account_id: + continue + response = self.client.get_transactions(account_id, start_date, end_date) + combined["items"].extend(response.get("items", [])) + return combined + + def backfill_akahu_transactions(self, start_date: str, end_date: str, account_names=None): + data = self.fetch_akahu_transactions_backfill(start_date, end_date, account_names) + self.write_akahu_transaction_data(data) + return data + + def fetch_emoney_snapshot_data(self): + data = self.fetch_emoney_data() + return data.get("snapshot") or {"accounts": []} + + def fetch_emoney_transaction_data(self): + data = self.fetch_emoney_data() + return {"transactions": data.get("transactions") or []} + + def fetch_emoney_data(self) -> dict[str, object]: + cache = self._read_emoney_cache() or {} + snapshot = cache.get("snapshot") + transactions = cache.get("transactions") + + missing_snapshot = snapshot is None + missing_transactions = transactions is None + + if missing_snapshot or missing_transactions: + scraper = self._get_scraper() + if missing_snapshot: + snapshot = scraper.get_snapshot() + if missing_transactions: + transactions = scraper.get_transactions_parsed() + 
payload: dict[str, object] = {} + if missing_snapshot: + payload["snapshot"] = snapshot + if missing_transactions: + payload["transactions"] = transactions + if payload: + self._write_emoney_cache(payload) + + return { + "snapshot": snapshot, + "transactions": transactions, + } + def write_akahu_snapshot_data(self, data): with self.dbconnection.cursor() as cursor: for account in data.get("items", []): - cursor.execute( - """ - INSERT INTO rawsnapshot (data, source) - VALUES (%s, %s) - ON CONFLICT (raw_sha256) DO NOTHING - """, - (json.dumps(account), "akahu") - ) - self.dbconnection.commit() - - def write_akahu_transaction_data(self, data): - with self.dbconnection.cursor() as cursor: - for transaction in data.get("items", []): - cursor.execute( - """ - INSERT INTO rawtransactions (data, source) - VALUES (%s, %s) - ON CONFLICT (raw_sha256) DO NOTHING - """, - (json.dumps(transaction), "akahu") - ) - self.dbconnection.commit() - - def write_emoney_snapshot_data(self, data): - with self.dbconnection.cursor() as cursor: - for account in data.get("accounts", []): cursor.execute( """ INSERT INTO rawsnapshots (data, source) VALUES (%s, %s) ON CONFLICT (raw_sha256) DO NOTHING """, - (json.dumps(account), "emoney") + (json.dumps(account, default=str), "akahu") ) self.dbconnection.commit() - def write_emoney_transaction_data(self, data): + def write_akahu_transaction_data(self, data): + items = self._sort_transactions_oldest_first(data.get("items", [])) with self.dbconnection.cursor() as cursor: - for transaction in data.get("transactions", []): + for transaction in items: cursor.execute( """ INSERT INTO rawtransactions (data, source) VALUES (%s, %s) ON CONFLICT (raw_sha256) DO NOTHING """, - (json.dumps(transaction), "emoney") + (json.dumps(transaction, default=str), "akahu") ) - self.dbconnection.commit() \ No newline at end of file + self.dbconnection.commit() + + def write_emoney_snapshot_data(self, data): + accounts = 
self._sort_emoney_snapshots_oldest_first(data.get("accounts", [])) + with self.dbconnection.cursor() as cursor: + for account in accounts: + cursor.execute( + """ + INSERT INTO rawsnapshots (data, source) + VALUES (%s, %s) + ON CONFLICT (raw_sha256) DO NOTHING + """, + (json.dumps(account, default=str), "emoney") + ) + self.dbconnection.commit() + + def write_emoney_transaction_data(self, data): + transactions = data + if isinstance(data, dict): + transactions = data.get("transactions", []) + items = self._sort_transactions_oldest_first(transactions) + with self.dbconnection.cursor() as cursor: + for transaction in items: + cursor.execute( + """ + INSERT INTO rawtransactions (data, source) + VALUES (%s, %s) + ON CONFLICT (raw_sha256) DO NOTHING + """, + (json.dumps(transaction, default=str), "emoney") + ) + self.dbconnection.commit() + + def normalize_pending_data(self, source: str | None = None, limit: int | None = None): + normalizer = DataNormalizer(self.dbconnection) + transactions = normalizer.read_raw_transactions(source=source, limit=limit) + snapshots = normalizer.read_raw_snapshots(source=source, limit=limit) + normalizer.normalize_transactions(transactions) + normalizer.normalize_snapshots(snapshots) + + def _read_emoney_cache(self) -> dict[str, object] | None: + if not self._use_emoney_cache(): + return None + cache_path = self._emoney_cache_path() + if not cache_path.exists(): + return None + with cache_path.open("r", encoding="utf-8") as handle: + return json.load(handle) + + def _write_emoney_cache(self, payload: dict[str, object]) -> None: + if not self._use_emoney_cache(): + return + cache_path = self._emoney_cache_path() + existing: dict[str, object] = {} + if cache_path.exists(): + with cache_path.open("r", encoding="utf-8") as handle: + existing = json.load(handle) + existing.update(payload) + with cache_path.open("w", encoding="utf-8") as handle: + json.dump(existing, handle, default=str) + + def _emoney_cache_path(self) -> Path: + raw_path = 
os.getenv("EMONEY_CACHE_PATH") or "emoney_cache.json" + return Path(raw_path) + + def _use_emoney_cache(self) -> bool: + value = os.getenv("EMONEY_USE_CACHE", "false").strip().lower() + return value in {"1", "true", "yes"} + + def _get_scraper(self) -> Scraper: + if self.scraper is None: + self.scraper = Scraper(pw, headless=False) + return self.scraper + + def _sort_emoney_snapshots_oldest_first(self, accounts: list[dict[str, object]]) -> list[dict[str, object]]: + def key(item: dict[str, object]) -> tuple[bool, datetime]: + dt = self._parse_datetime(item.get("date")) + if dt is None: + return True, datetime.max + return False, dt + return sorted(accounts, key=key) + + def _sort_transactions_oldest_first(self, items: list[dict[str, object]]) -> list[dict[str, object]]: + def key(item: dict[str, object]) -> tuple[bool, datetime]: + dt = self._parse_datetime( + item.get("date") + or item.get("datetime") + or item.get("timestamp") + or item.get("created_at") + or item.get("effective_date") + ) + if dt is None: + return True, datetime.max + return False, dt + return sorted(items, key=key) + + @staticmethod + def _parse_datetime(value: object) -> datetime | None: + if isinstance(value, datetime): + return value + if isinstance(value, date): + return datetime.combine(value, datetime.min.time()) + if isinstance(value, (int, float)): + try: + return datetime.fromtimestamp(value) + except Exception: + return None + if isinstance(value, str): + text = value.strip() + if text.endswith("Z"): + text = f"{text[:-1]}+00:00" + try: + return datetime.fromisoformat(text) + except ValueError: + for fmt in ("%Y-%m-%d", "%d-%m-%Y", "%Y/%m/%d", "%d/%m/%Y"): + try: + return datetime.strptime(text, fmt) + except ValueError: + continue + return None + + @staticmethod + def _require_env(name: str) -> str: + value = os.getenv(name) + if not value: + raise ValueError(f"Please set {name} in your environment.") + return value \ No newline at end of file diff --git 
a/IngestionService/normalizer.py b/IngestionService/normalizer.py new file mode 100644 index 0000000..c32af8f --- /dev/null +++ b/IngestionService/normalizer.py @@ -0,0 +1,388 @@ +from __future__ import annotations + +from datetime import date, datetime +from typing import Any, Iterable + + +class DataNormalizer: + def __init__(self, dbconnection): + self.dbconnection = dbconnection + self._akahu_account_cache = self._build_akahu_account_cache() + + def read_raw_transactions(self, source: str | None = None, limit: int | None = None) -> list[dict[str, Any]]: + sql = "SELECT id, data, source FROM rawtransactions WHERE processed = FALSE" + params: list[Any] = [] + if source: + sql += " AND source = %s" + params.append(source) + sql += " ORDER BY received_at ASC" + if limit is not None: + sql += " LIMIT %s" + params.append(limit) + + with self.dbconnection.cursor() as cursor: + cursor.execute(sql, params) + rows = cursor.fetchall() + + return [{"id": row[0], "data": row[1], "source": row[2]} for row in rows] + + def read_raw_snapshots(self, source: str | None = None, limit: int | None = None) -> list[dict[str, Any]]: + sql = "SELECT id, data, source FROM rawsnapshots WHERE processed = FALSE" + params: list[Any] = [] + if source: + sql += " AND source = %s" + params.append(source) + sql += " ORDER BY received_at ASC" + if limit is not None: + sql += " LIMIT %s" + params.append(limit) + + with self.dbconnection.cursor() as cursor: + cursor.execute(sql, params) + rows = cursor.fetchall() + + return [{"id": row[0], "data": row[1], "source": row[2]} for row in rows] + + def normalize_transactions(self, records: Iterable[dict[str, Any]]) -> None: + for record in records: + normalized = self._normalize_transaction(record.get("data"), record.get("source")) + if not normalized: + continue + self._write_transaction(normalized) + self._mark_processed("rawtransactions", record["id"]) + + self.dbconnection.commit() + + def normalize_snapshots(self, records: Iterable[dict[str, 
Any]]) -> None: + for record in records: + normalized = self._normalize_snapshot(record.get("data"), record.get("source")) + if not normalized: + continue + self._write_snapshot(normalized) + self._mark_processed("rawsnapshots", record["id"]) + + self.dbconnection.commit() + + def _normalize_transaction(self, data: Any, source: str | None) -> dict[str, Any] | None: + if not isinstance(data, dict): + return None + + if source == "emoney": + return self._normalize_emoney_transaction(data) + if source == "akahu": + return self._normalize_akahu_transaction(data) + + return None + + def _normalize_snapshot(self, data: Any, source: str | None) -> dict[str, Any] | None: + if not isinstance(data, dict): + return None + + if source == "emoney": + return self._normalize_emoney_snapshot(data) + if source == "akahu": + return self._normalize_akahu_snapshot(data) + + return None + + def _normalize_emoney_transaction(self, data: dict[str, Any]) -> dict[str, Any] | None: + required = {"date", "description", "amount"} + if not required.issubset(data): + return None + + parsed_date = self._parse_date(data.get("date")) + if not parsed_date: + return None + + return { + "datetime": parsed_date, + "description": data.get("description"), + "amount": float(data.get("amount")), + "account_name": "Emoney", + "account_num": "emoney", + "org_name": "Emoney", + "vendor_name": "Finance Now", + } + + def _normalize_emoney_snapshot(self, data: dict[str, Any]) -> dict[str, Any] | None: + if "balance" not in data: + return None + + parsed_date = self._parse_date(data.get("date")) or date.today() + return { + "datetime": parsed_date, + "balance": float(data.get("balance")), + "account_name": "Emoney", + "account_num": "emoney", + "org_name": "Emoney", + } + + def _normalize_akahu_transaction(self, data: dict[str, Any]) -> dict[str, Any] | None: + if "amount" not in data or "description" not in data: + return None + + parsed_date = self._parse_date(data.get("date") or data.get("created_at")) + if 
not parsed_date: + return None + + account_id = self._string_or_none(data.get("_account") or data.get("account_id") or data.get("account")) + account_meta = self._akahu_account_cache.get(account_id or "", {}) + account_num = account_meta.get("account_num") or account_id + account_name = account_meta.get("account_name") or self._string_or_none(data.get("account_name") or data.get("name")) + org_name = account_meta.get("org_name") or "unknown" + + merchant = data.get("merchant") + if isinstance(merchant, dict): + vendor_name = self._string_or_none(merchant.get("name")) + else: + vendor_name = self._string_or_none(merchant) + if not vendor_name: + vendor_name = self._string_or_none(data.get("payee")) + if not vendor_name: + vendor_name = self._string_or_none(data.get("description")) + + description = self._format_akahu_description(data) + + return { + "datetime": parsed_date, + "description": description, + "amount": float(data.get("amount")), + "account_name": account_name, + "account_num": account_num, + "org_name": org_name, + "vendor_name": vendor_name, + } + + def _normalize_akahu_snapshot(self, data: dict[str, Any]) -> dict[str, Any] | None: + if "balance" not in data and "current_balance" not in data: + return None + + parsed_date = self._parse_date(data.get("date") or data.get("updated_at")) or date.today() + balance_value = self._parse_balance_value(data.get("balance") or data.get("current_balance")) + if balance_value is None: + return None + + account_num = self._string_or_none(data.get("formatted_account") or data.get("_id") or data.get("account_id")) + connection = data.get("connection") if isinstance(data.get("connection"), dict) else {} + org_name = self._string_or_none(connection.get("name")) or "unknown" + return { + "datetime": parsed_date, + "balance": balance_value, + "account_name": self._string_or_none(data.get("name") or data.get("account_name")), + "account_num": account_num, + "org_name": org_name, + } + + def _build_akahu_account_cache(self) 
-> dict[str, dict[str, str]]: + cache: dict[str, dict[str, str]] = {} + with self.dbconnection.cursor() as cursor: + cursor.execute( + "SELECT data FROM rawsnapshots WHERE source = %s", + ("akahu",) + ) + rows = cursor.fetchall() + + for (data,) in rows: + if not isinstance(data, dict): + continue + account_id = self._string_or_none(data.get("_id") or data.get("id")) + if not account_id: + continue + account_num = self._string_or_none(data.get("formatted_account") or data.get("_id") or data.get("account_id")) + account_name = self._string_or_none(data.get("name") or data.get("account_name")) + connection = data.get("connection") if isinstance(data.get("connection"), dict) else {} + org_name = self._string_or_none(connection.get("name")) or "unknown" + cache[account_id] = { + "account_num": account_num or account_id, + "account_name": account_name or "unknown", + "org_name": org_name, + } + + return cache + + def _format_akahu_description(self, data: dict[str, Any]) -> str: + description = self._string_or_none(data.get("description")) or "unknown" + meta = data.get("meta") if isinstance(data.get("meta"), dict) else {} + + other_account = self._string_or_none(meta.get("other_account")) + reference = self._string_or_none(meta.get("reference")) + particulars = self._string_or_none(meta.get("particulars")) + code = self._string_or_none(meta.get("code")) + + if "INTERNET XFR" in description: + target = other_account or reference or particulars or code + if target: + description = description.replace("INTERNET XFR", f"-> {target}") + + meta_bits: list[str] = [] + if reference: + meta_bits.append(f"ref={reference}") + if particulars: + meta_bits.append(f"particulars={particulars}") + if code: + meta_bits.append(f"code={code}") + if other_account: + meta_bits.append(f"other={other_account}") + + if meta_bits: + description = f"{description} | " + " | ".join(meta_bits) + + return description + + def _write_transaction(self, normalized: dict[str, Any]) -> None: + org_id = 
self._get_or_create_org(normalized.get("org_name")) + account_id = self._get_or_create_account( + normalized.get("account_num"), + normalized.get("account_name"), + org_id, + ) + vendor_id = None + vendor_name = normalized.get("vendor_name") + if vendor_name: + vendor_id = self._get_or_create_vendor(vendor_name, org_id) + + with self.dbconnection.cursor() as cursor: + cursor.execute( + """ + INSERT INTO transactions (datetime, description, amount, accountid, orgid, vendorid) + VALUES (%s, %s, %s, %s, %s, %s) + """, + ( + normalized.get("datetime"), + normalized.get("description"), + normalized.get("amount"), + account_id, + org_id, + vendor_id, + ), + ) + + def _write_snapshot(self, normalized: dict[str, Any]) -> None: + org_id = self._get_or_create_org(normalized.get("org_name")) + account_id = self._get_or_create_account( + normalized.get("account_num"), + normalized.get("account_name"), + org_id, + ) + + with self.dbconnection.cursor() as cursor: + cursor.execute( + """ + INSERT INTO snapshots (datetime, accountid, balance, orgid) + VALUES (%s, %s, %s, %s) + """, + ( + normalized.get("datetime"), + account_id, + normalized.get("balance"), + org_id, + ), + ) + + def _mark_processed(self, table: str, record_id: int) -> None: + with self.dbconnection.cursor() as cursor: + cursor.execute( + f"UPDATE {table} SET processed = TRUE WHERE id = %s", + (record_id,), + ) + + def _get_or_create_org(self, org_name: str | None) -> int: + org_name = org_name or "unknown" + with self.dbconnection.cursor() as cursor: + cursor.execute("SELECT id FROM organizations WHERE orgname = %s", (org_name,)) + row = cursor.fetchone() + if row: + return row[0] + + cursor.execute( + "INSERT INTO organizations (orgname) VALUES (%s) RETURNING id", + (org_name,), + ) + return cursor.fetchone()[0] + + def _get_or_create_account(self, account_num: str | None, account_name: str | None, org_id: int) -> int: + account_name = account_name or "unknown" + account_num = account_num or 
f"{org_id}:{account_name}" + with self.dbconnection.cursor() as cursor: + cursor.execute("SELECT id FROM accounts WHERE accountnum = %s", (account_num,)) + row = cursor.fetchone() + if row: + return row[0] + + cursor.execute( + """ + INSERT INTO accounts (accountnum, accountname, orgid) + VALUES (%s, %s, %s) + RETURNING id + """, + (account_num, account_name, org_id), + ) + return cursor.fetchone()[0] + + def _get_or_create_vendor(self, vendor_name: str, org_id: int) -> int: + if vendor_name == "Finance Now": + with self.dbconnection.cursor() as cursor: + cursor.execute( + "SELECT id FROM vendors WHERE vendorname = %s", + (vendor_name,), + ) + row = cursor.fetchone() + if row: + return row[0] + with self.dbconnection.cursor() as cursor: + cursor.execute( + "SELECT id FROM vendors WHERE vendorname = %s AND orgid = %s", + (vendor_name, org_id), + ) + row = cursor.fetchone() + if row: + return row[0] + + cursor.execute( + "INSERT INTO vendors (vendorname, orgid) VALUES (%s, %s) RETURNING id", + (vendor_name, org_id), + ) + return cursor.fetchone()[0] + + @staticmethod + def _parse_date(value: Any) -> date | None: + if isinstance(value, date) and not isinstance(value, datetime): + return value + if isinstance(value, datetime): + return value.date() + if isinstance(value, str): + for fmt in ( + "%Y-%m-%d", + "%d-%m-%Y", + "%Y-%m-%dT%H:%M:%S%z", + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S.%fZ", + ): + try: + return datetime.strptime(value, fmt).date() + except ValueError: + continue + return None + + @staticmethod + def _string_or_none(value: Any) -> str | None: + if value is None: + return None + return str(value) + + @staticmethod + def _parse_balance_value(value: Any) -> float | None: + if value is None: + return None + if isinstance(value, (int, float)): + return float(value) + if isinstance(value, str): + try: + return float(value) + except ValueError: + return None + if isinstance(value, dict): + for key in ("amount", "value", "balance"): + if key in value: + 
return DataNormalizer._parse_balance_value(value.get(key)) + return None diff --git a/config.py b/config.py new file mode 100644 index 0000000..a450bcc --- /dev/null +++ b/config.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from pathlib import Path +from dotenv import load_dotenv + + +REPO_ROOT = Path(__file__).resolve().parent + + +def load_env(service: str | None = None) -> None: + root_env = REPO_ROOT / ".env" + if root_env.exists(): + load_dotenv(root_env) + + if service: + service_env = REPO_ROOT / service / ".env" + if service_env.exists(): + # Service-specific values should override repo defaults. + load_dotenv(service_env, override=True) diff --git a/emoney_cache.json b/emoney_cache.json new file mode 100644 index 0000000..8c1f866 --- /dev/null +++ b/emoney_cache.json @@ -0,0 +1 @@ +{"snapshot": {"accounts": [{"date": "2026-05-19", "balance": 12889.76}]}, "transactions": [{"date": "2026-05-15", "description": "Normal Payment", "amount": 215.32, "balance": 12889.76}, {"date": "2026-05-04", "description": "Monthly Fee", "amount": 2.5, "balance": 13105.08}, {"date": "2026-05-04", "description": "Debit Interest", "amount": 321.69, "balance": 13102.58}, {"date": "2026-05-01", "description": "Normal Payment", "amount": 215.32, "balance": 12780.89}, {"date": "2026-04-17", "description": "Normal Payment", "amount": 215.32, "balance": 12996.21}, {"date": "2026-04-04", "description": "Monthly Fee", "amount": 2.5, "balance": 13211.53}, {"date": "2026-04-04", "description": "Debit Interest", "amount": 335.93, "balance": 13209.03}, {"date": "2026-04-03", "description": "Normal Payment", "amount": 215.32, "balance": 12873.1}, {"date": "2026-03-20", "description": "Normal Payment", "amount": 215.32, "balance": 13088.42}, {"date": "2026-03-06", "description": "Normal Payment", "amount": 215.32, "balance": 13303.74}, {"date": "2026-03-04", "description": "Monthly Fee", "amount": 2.5, "balance": 13519.06}, {"date": "2026-03-04", "description": "Debit Interest", 
"amount": 306.68, "balance": 13516.56}, {"date": "2026-02-20", "description": "Normal Payment", "amount": 215.32, "balance": 13209.88}, {"date": "2026-02-06", "description": "Normal Payment", "amount": 215.32, "balance": 13425.2}, {"date": "2026-02-04", "description": "Penalty Interest", "amount": 0.02, "balance": 13640.52}, {"date": "2026-02-04", "description": "Monthly Fee", "amount": 2.5, "balance": 13640.5}, {"date": "2026-02-04", "description": "Debit Interest", "amount": 342.44, "balance": 13638.0}, {"date": "2026-01-23", "description": "Normal Payment", "amount": 215.32, "balance": 13295.56}, {"date": "2026-01-09", "description": "Normal Payment", "amount": 215.32, "balance": 13510.88}, {"date": "2026-01-04", "description": "Monthly Fee", "amount": 2.5, "balance": 13726.2}, {"date": "2026-01-04", "description": "Debit Interest", "amount": 345.94, "balance": 13723.7}, {"date": "2025-12-29", "description": "Retry Normal Payment", "amount": 215.32, "balance": 13377.76}, {"date": "2025-12-26", "description": "Normal Payment", "amount": 215.32, "balance": 13377.76}, {"date": "2025-12-12", "description": "Normal Payment", "amount": 215.32, "balance": 13593.08}, {"date": "2025-12-04", "description": "Monthly Fee", "amount": 2.5, "balance": 13808.4}, {"date": "2025-12-04", "description": "Debit Interest", "amount": 335.42, "balance": 13805.9}, {"date": "2025-11-28", "description": "Normal Payment", "amount": 215.32, "balance": 13470.48}, {"date": "2025-11-19", "description": "Dishonour Fee", "amount": 5.0, "balance": 13685.8}, {"date": "2025-11-17", "description": "Retry Normal Payment", "amount": 215.32, "balance": 13465.48}, {"date": "2025-11-14", "description": "Normal Payment", "amount": 215.32, "balance": 13465.48}, {"date": "2025-11-04", "description": "Insurance Premium", "amount": 1465.8, "balance": 13680.8}, {"date": "2025-11-04", "description": "Booking Fee", "amount": 215.0, "balance": 12215.0}, {"date": "2025-11-04", "description": "Disbursement", 
"amount": 12000.0, "balance": 12000.0}]} \ No newline at end of file diff --git a/main.py b/main.py index 39f5c9b..bd5b4ad 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,27 @@ from IngestionService.ingester import Ingester +from config import load_env +load_env("IngestionService") if __name__ == "__main__": ingester = Ingester() - data = ingester.test_scraper() - ingester.write_emoney_snapshot_data(data) + ingester.test_db_connection() + + akahu_accounts = ingester.fetch_akahu_snapshot_data() + print("Akahu accounts fetched:", len(akahu_accounts.get("items", []))) + ingester.write_akahu_snapshot_data(akahu_accounts) + + backfill_start = "2026-01-01" + backfill_end = "2026-12-31" + akahu_backfill = ingester.backfill_akahu_transactions(backfill_start, backfill_end) + print("Akahu backfill transactions:", len(akahu_backfill.get("items", []))) + + emoney_data = ingester.fetch_emoney_data() + emoney_snapshot = emoney_data.get("snapshot") or {"accounts": []} + emoney_transactions = emoney_data.get("transactions") or [] + print("Emoney snapshot accounts:", len(emoney_snapshot.get("accounts", []))) + ingester.write_emoney_snapshot_data(emoney_snapshot) + print("Emoney transactions:", len(emoney_transactions)) + ingester.write_emoney_transaction_data(emoney_transactions) + + ingester.normalize_pending_data() diff --git a/sql/database-init/database definitions.sql b/sql/database-init/database definitions.sql index 69481b5..7a084d7 100644 --- a/sql/database-init/database definitions.sql +++ b/sql/database-init/database definitions.sql @@ -1,5 +1,7 @@ CREATE DATABASE financial_data; +CREATE EXTENSION IF NOT EXISTS pgcrypto; + DO $$ BEGIN IF NOT EXISTS ( @@ -78,6 +80,28 @@ raw_sha256 CHAR(64) UNIQUE, processed BOOLEAN NOT NULL DEFAULT FALSE ); +CREATE OR REPLACE FUNCTION set_raw_sha256() +RETURNS TRIGGER AS $$ +BEGIN + IF NEW.raw_sha256 IS NULL THEN + NEW.raw_sha256 := encode(digest(convert_to(NEW.data::text, 'UTF8'), 'sha256'), 'hex'); + END IF; + RETURN NEW; +END; +$$ LANGUAGE 
plpgsql; + +DROP TRIGGER IF EXISTS rawtransactions_set_sha256 ON rawtransactions; +CREATE TRIGGER rawtransactions_set_sha256 +BEFORE INSERT OR UPDATE OF data ON rawtransactions +FOR EACH ROW +EXECUTE FUNCTION set_raw_sha256(); + +DROP TRIGGER IF EXISTS rawsnapshots_set_sha256 ON rawsnapshots; +CREATE TRIGGER rawsnapshots_set_sha256 +BEFORE INSERT OR UPDATE OF data ON rawsnapshots +FOR EACH ROW +EXECUTE FUNCTION set_raw_sha256(); + CREATE TABLE IF NOT EXISTS currency( id SERIAL PRIMARY KEY, currencycode CHAR(3) UNIQUE NOT NULL, @@ -86,15 +110,20 @@ currencyname VARCHAR(50) NOT NULL CREATE TABLE IF NOT EXISTS funds( id SERIAL PRIMARY KEY, +fundid VARCHAR(100) UNIQUE NOT NULL, name VARCHAR(100) NOT NULL, +symbol VARCHAR(50), +currencyid INT REFERENCES currency(id) ON DELETE RESTRICT +); + +CREATE TABLE IF NOT EXISTS fund_positions( +id SERIAL PRIMARY KEY, datetime DATE NOT NULL, -amount REAL NOT NULL, accountid INT REFERENCES accounts(id) ON DELETE RESTRICT, orgid INT REFERENCES organizations(id) ON DELETE RESTRICT, -fundid VARCHAR(100) NOT NULL, +fundid INT REFERENCES funds(id) ON DELETE RESTRICT, value REAL NOT NULL, -shares REAL NOT NULL, -currencyid INT REFERENCES currency(id) ON DELETE RESTRICT +shares REAL NOT NULL ); --assets, liabilities, equity, income, expenses @@ -106,7 +135,7 @@ includeinnetworth BOOLEAN NOT NULL CREATE INDEX IF NOT EXISTS rawtransactions_data_idx ON rawtransactions USING GIN (data); CREATE INDEX IF NOT EXISTS rawtransactions_received_at_idx ON rawtransactions (received_at); -CREATE INDEX IF NOT EXISTS rawtransactions_account_org_idx ON rawtransactions (orgid, accountid); +--CREATE INDEX IF NOT EXISTS rawtransactions_account_org_idx ON rawtransactions (orgid, accountid); CREATE INDEX IF NOT EXISTS accounts_orgid_idx ON accounts (orgid); CREATE INDEX IF NOT EXISTS vendors_orgid_idx ON vendors (orgid); @@ -127,11 +156,299 @@ CREATE INDEX IF NOT EXISTS syncs_orgid_idx ON syncs (orgid); CREATE INDEX IF NOT EXISTS syncs_account_datetime_idx ON 
syncs (accountid, datetime);
 CREATE INDEX IF NOT EXISTS syncs_org_datetime_idx ON syncs (orgid, datetime);
 
-CREATE INDEX IF NOT EXISTS rawtransactions_accountid_idx ON rawtransactions (accountid);
-CREATE INDEX IF NOT EXISTS rawtransactions_orgid_idx ON rawtransactions (orgid);
+--CREATE INDEX IF NOT EXISTS rawtransactions_accountid_idx ON rawtransactions (accountid);
+--CREATE INDEX IF NOT EXISTS rawtransactions_orgid_idx ON rawtransactions (orgid);
 
-CREATE INDEX IF NOT EXISTS funds_accountid_idx ON funds (accountid);
-CREATE INDEX IF NOT EXISTS funds_orgid_idx ON funds (orgid);
 CREATE INDEX IF NOT EXISTS funds_currencyid_idx ON funds (currencyid);
-CREATE INDEX IF NOT EXISTS funds_account_datetime_idx ON funds (accountid, datetime);
-CREATE INDEX IF NOT EXISTS funds_org_datetime_idx ON funds (orgid, datetime);
\ No newline at end of file
+
+CREATE INDEX IF NOT EXISTS fund_positions_accountid_idx ON fund_positions (accountid);
+CREATE INDEX IF NOT EXISTS fund_positions_orgid_idx ON fund_positions (orgid);
+CREATE INDEX IF NOT EXISTS fund_positions_fundid_idx ON fund_positions (fundid);
+CREATE INDEX IF NOT EXISTS fund_positions_account_datetime_idx ON fund_positions (accountid, datetime);
+CREATE INDEX IF NOT EXISTS fund_positions_org_datetime_idx ON fund_positions (orgid, datetime);
+
+-- Seed data: reference rows; every INSERT below is guarded by ON CONFLICT ... DO NOTHING or by WHERE NOT EXISTS
+INSERT INTO organizations (orgname)
+VALUES
+    ('BNZ'),
+    ('Sharesies'),
+    ('Emoney')
+ON CONFLICT (orgname) DO NOTHING;
+
+INSERT INTO currency (currencycode, currencyname)
+VALUES
+    ('NZD', 'New Zealand Dollar'),
+    ('USD', 'US Dollar')
+ON CONFLICT (currencycode) DO NOTHING;
+
+INSERT INTO accounttypes (typename, includeinnetworth) -- one guarded INSERT per type; presence is checked by typename
+SELECT 'assets', TRUE
+WHERE NOT EXISTS (SELECT 1 FROM accounttypes WHERE typename = 'assets');
+INSERT INTO accounttypes (typename, includeinnetworth)
+SELECT 'liabilities', TRUE
+WHERE NOT EXISTS (SELECT 1 FROM accounttypes WHERE typename = 'liabilities');
+INSERT INTO accounttypes (typename, includeinnetworth)
+SELECT 'equity', TRUE
+WHERE NOT EXISTS (SELECT 1 FROM accounttypes WHERE typename = 'equity');
+INSERT INTO accounttypes (typename, includeinnetworth)
+SELECT 'income', FALSE
+WHERE NOT EXISTS (SELECT 1 FROM accounttypes WHERE typename = 'income');
+INSERT INTO accounttypes (typename, includeinnetworth)
+SELECT 'expenses', FALSE
+WHERE NOT EXISTS (SELECT 1 FROM accounttypes WHERE typename = 'expenses');
+
+INSERT INTO accounts (accountnum, accountname, orgid) -- orgid is resolved by orgname from organizations
+VALUES
+    ('02-1244-0271514-00', 'Income Account', (SELECT id FROM organizations WHERE orgname = 'BNZ')),
+    ('02-1244-0271514-04', 'Rent', (SELECT id FROM organizations WHERE orgname = 'BNZ')),
+    ('12-3497-0007278-01', 'Jethro''s Investments', (SELECT id FROM organizations WHERE orgname = 'Sharesies')),
+    ('acc_cmoc72wzs000502kz7lnu345l', 'Jethro''s KiwiSaver', (SELECT id FROM organizations WHERE orgname = 'Sharesies')),
+    ('acc_cmoc72x21000a02kz9j3ve87q', 'Jethro''s Rainy day fund', (SELECT id FROM organizations WHERE orgname = 'Sharesies'))
+ON CONFLICT DO NOTHING; -- NOTE(review): no conflict target; duplicates are only skipped if accounts has a unique constraint or index - confirm
+
+INSERT INTO vendors (vendorname, orgid) -- all vendors below are attached to the BNZ organization; each is inserted only if that (vendorname, orgid) pair is absent
+SELECT 'PAK''nSAVE', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'PAK''nSAVE' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'PAK''nSAVE Fuel', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'PAK''nSAVE Fuel' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'BP', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'BP' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Mobil', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Mobil' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Caltex', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Caltex' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Woolworths', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Woolworths' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'New World', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'New World' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Super Liquor', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Super Liquor' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Lotto', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Lotto' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Auckland Transport', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Auckland Transport' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Finance Now', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Finance Now' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Gem Finance', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Gem Finance' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Sharesies', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Sharesies' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Feijoa', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Feijoa' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Spark', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Spark' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Asteron Life', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Asteron Life' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'AA Insurance', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'AA Insurance' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Mercury', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Mercury' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'YouTube', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'YouTube' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Netflix', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Netflix' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'The Warehouse', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'The Warehouse' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Pit Stop', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Pit Stop' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Lucky Star Bakery', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Lucky Star Bakery' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'Google', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'Google' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+INSERT INTO vendors (vendorname, orgid)
+SELECT 'IT Live Limited', (SELECT id FROM organizations WHERE orgname = 'BNZ')
+WHERE NOT EXISTS (SELECT 1 FROM vendors WHERE vendorname = 'IT Live Limited' AND orgid = (SELECT id FROM organizations WHERE orgname = 'BNZ'));
+
+INSERT INTO funds (fundid, name, symbol, currencyid) -- every fund row here resolves currencyid from the 'NZD' currency row
+VALUES
+    ('91232ea0-548b-47a1-93a0-495cbf40fbd9', 'Pie Global Growth 2 Fund', 'SKS.FUND.PGG2', (SELECT id FROM currency WHERE currencycode = 'NZD')),
+    ('d2d85cc8-1884-4512-ab2e-1a7110498247', 'Smartshares Growth Fund', 'SKS.FUND.SLGF', (SELECT id FROM currency WHERE currencycode = 'NZD')),
+    ('b627459d-e626-4afa-9a6f-b980e47120da', 'Sharesies US500 Fund', 'SKS.FUND.SVOO', (SELECT id FROM currency WHERE currencycode = 'NZD')),
+    ('4da3e348-95da-4872-9ac9-00978ab263af', 'VanEck Semiconductor ETF', 'SKS.NASDAQ.SMH', (SELECT id FROM currency WHERE currencycode = 'NZD')),
+    ('fdcfd374-403f-412d-9a38-1a497c3ec36b', 'Smartshares Global Equities Responsible Fund', 'SKS.NZX.ESG', (SELECT id FROM currency WHERE currencycode = 'NZD')),
+    ('8cd3115a-831b-4a5a-9cdd-a5d523c6814f', 'Smart Asia Pacific ETF', 'APA', (SELECT id FROM currency WHERE currencycode = 'NZD')),
+    ('84fb0b94-e7dd-4a48-996d-fd261b781c11', 'Smart Emerging Markets ETF', 'EMF', (SELECT id FROM currency WHERE currencycode = 'NZD')),
+    ('68d32bbf-13e7-46e3-b735-1a7bb91ad481', 'Smart Europe ETF', 'EUF', (SELECT id FROM currency WHERE currencycode = 'NZD')),
+    ('77866efa-d81e-4f71-beaf-371bb210ac8c', 'Smart NZ Top 50 ETF', 'FNZ', (SELECT id FROM currency WHERE currencycode = 'NZD')),
+    ('67a17798-c341-4af3-a06e-5a621d342adb', 'Smart Australian Top 20 ETF', 'OZY', (SELECT id FROM currency WHERE currencycode = 'NZD')),
+    ('af87fb44-ebf6-4239-ba08-ae1cc9a6461c', 'Smart US 500 ETF', 'USF', (SELECT id FROM currency WHERE currencycode = 'NZD'))
+ON CONFLICT (fundid) DO NOTHING;
+
+INSERT INTO fund_positions (datetime, accountid, orgid, fundid, value, shares) -- one position row per fund, dated 2026-05-18; skipped when a row with the same fund, account and date exists
+SELECT
+    '2026-05-18',
+    (SELECT id FROM accounts WHERE accountname = 'Jethro''s KiwiSaver'), -- NOTE(review): scalar subquery; assumes accountname matches exactly one accounts row - confirm
+    (SELECT id FROM organizations WHERE orgname = 'Sharesies'),
+    (SELECT id FROM funds WHERE fundid = '91232ea0-548b-47a1-93a0-495cbf40fbd9'),
+    2818.92,
+    1812.93
+WHERE NOT EXISTS (
+    SELECT 1 FROM fund_positions
+    WHERE fundid = (SELECT id FROM funds WHERE fundid = '91232ea0-548b-47a1-93a0-495cbf40fbd9')
+      AND accountid = (SELECT id FROM accounts WHERE accountname = 'Jethro''s KiwiSaver')
+      AND datetime = '2026-05-18'
+);
+INSERT INTO fund_positions (datetime, accountid, orgid, fundid, value, shares)
+SELECT
+    '2026-05-18',
+    (SELECT id FROM accounts WHERE accountname = 'Jethro''s KiwiSaver'),
+    (SELECT id FROM organizations WHERE orgname = 'Sharesies'),
+    (SELECT id FROM funds WHERE fundid = 'd2d85cc8-1884-4512-ab2e-1a7110498247'),
+    2792.47,
+    1940.03
+WHERE NOT EXISTS (
+    SELECT 1 FROM fund_positions
+    WHERE fundid = (SELECT id FROM funds WHERE fundid = 'd2d85cc8-1884-4512-ab2e-1a7110498247')
+      AND accountid = (SELECT id FROM accounts WHERE accountname = 'Jethro''s KiwiSaver')
+      AND datetime = '2026-05-18'
+);
+INSERT INTO fund_positions (datetime, accountid, orgid, fundid, value, shares)
+SELECT
+    '2026-05-18',
+    (SELECT id FROM accounts WHERE accountname = 'Jethro''s KiwiSaver'),
+    (SELECT id FROM organizations WHERE orgname = 'Sharesies'),
+    (SELECT id FROM funds WHERE fundid = 'b627459d-e626-4afa-9a6f-b980e47120da'),
+    1665.11,
+    1254.6
+WHERE NOT EXISTS (
+    SELECT 1 FROM fund_positions
+    WHERE fundid = (SELECT id FROM funds WHERE fundid = 'b627459d-e626-4afa-9a6f-b980e47120da')
+      AND accountid = (SELECT id FROM accounts WHERE accountname = 'Jethro''s KiwiSaver')
+      AND datetime = '2026-05-18'
+);
+INSERT INTO fund_positions (datetime, accountid, orgid, fundid, value, shares)
+SELECT
+    '2026-05-18',
+    (SELECT id FROM accounts WHERE accountname = 'Jethro''s KiwiSaver'),
+    (SELECT id FROM organizations WHERE orgname = 'Sharesies'),
+    (SELECT id FROM funds WHERE fundid = '4da3e348-95da-4872-9ac9-00978ab263af'),
+    304.41,
+    153.12
+WHERE NOT EXISTS (
+    SELECT 1 FROM fund_positions
+    WHERE fundid = (SELECT id FROM funds WHERE fundid = '4da3e348-95da-4872-9ac9-00978ab263af')
+      AND accountid = (SELECT id FROM accounts WHERE accountname = 'Jethro''s KiwiSaver')
+      AND datetime = '2026-05-18'
+);
+INSERT INTO fund_positions (datetime, accountid, orgid, fundid, value, shares)
+SELECT
+    '2026-05-18',
+    (SELECT id FROM accounts WHERE accountname = 'Jethro''s KiwiSaver'),
+    (SELECT id FROM organizations WHERE orgname = 'Sharesies'),
+    (SELECT id FROM funds WHERE fundid = 'fdcfd374-403f-412d-9a38-1a497c3ec36b'),
+    465.52,
+    283.3
+WHERE NOT EXISTS (
+    SELECT 1 FROM fund_positions
+    WHERE fundid = (SELECT id FROM funds WHERE fundid = 'fdcfd374-403f-412d-9a38-1a497c3ec36b')
+      AND accountid = (SELECT id FROM accounts WHERE accountname = 'Jethro''s KiwiSaver')
+      AND datetime = '2026-05-18'
+);
+
+INSERT INTO fund_positions (datetime, accountid, orgid, fundid, value, shares)
+SELECT
+    '2026-05-18',
+    (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments'),
+    (SELECT id FROM organizations WHERE orgname = 'Sharesies'),
+    (SELECT id FROM funds WHERE fundid = '8cd3115a-831b-4a5a-9cdd-a5d523c6814f'),
+    19.89,
+    5.26
+WHERE NOT EXISTS (
+    SELECT 1 FROM fund_positions
+    WHERE fundid = (SELECT id FROM funds WHERE fundid = '8cd3115a-831b-4a5a-9cdd-a5d523c6814f')
+      AND accountid = (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments')
+      AND datetime = '2026-05-18'
+);
+INSERT INTO fund_positions (datetime, accountid, orgid, fundid, value, shares)
+SELECT
+    '2026-05-18',
+    (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments'),
+    (SELECT id FROM organizations WHERE orgname = 'Sharesies'),
+    (SELECT id FROM funds WHERE fundid = '84fb0b94-e7dd-4a48-996d-fd261b781c11'),
+    14.6,
+    7.33
+WHERE NOT EXISTS (
+    SELECT 1 FROM fund_positions
+    WHERE fundid = (SELECT id FROM funds WHERE fundid = '84fb0b94-e7dd-4a48-996d-fd261b781c11')
+      AND accountid = (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments')
+      AND datetime = '2026-05-18'
+);
+INSERT INTO fund_positions (datetime, accountid, orgid, fundid, value, shares)
+SELECT
+    '2026-05-18',
+    (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments'),
+    (SELECT id FROM organizations WHERE orgname = 'Sharesies'),
+    (SELECT id FROM funds WHERE fundid = '68d32bbf-13e7-46e3-b735-1a7bb91ad481'),
+    19.28,
+    6.6
+WHERE NOT EXISTS (
+    SELECT 1 FROM fund_positions
+    WHERE fundid = (SELECT id FROM funds WHERE fundid = '68d32bbf-13e7-46e3-b735-1a7bb91ad481')
+      AND accountid = (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments')
+      AND datetime = '2026-05-18'
+);
+INSERT INTO fund_positions (datetime, accountid, orgid, fundid, value, shares)
+SELECT
+    '2026-05-18',
+    (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments'),
+    (SELECT id FROM organizations WHERE orgname = 'Sharesies'),
+    (SELECT id FROM funds WHERE fundid = '77866efa-d81e-4f71-beaf-371bb210ac8c'),
+    3.9,
+    1.29
+WHERE NOT EXISTS (
+    SELECT 1 FROM fund_positions
+    WHERE fundid = (SELECT id FROM funds WHERE fundid = '77866efa-d81e-4f71-beaf-371bb210ac8c')
+      AND accountid = (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments')
+      AND datetime = '2026-05-18'
+);
+INSERT INTO fund_positions (datetime, accountid, orgid, fundid, value, shares)
+SELECT
+    '2026-05-18',
+    (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments'),
+    (SELECT id FROM organizations WHERE orgname = 'Sharesies'),
+    (SELECT id FROM funds WHERE fundid = '67a17798-c341-4af3-a06e-5a621d342adb'),
+    5.79,
+    0.95
+WHERE NOT EXISTS (
+    SELECT 1 FROM fund_positions
+    WHERE fundid = (SELECT id FROM funds WHERE fundid = '67a17798-c341-4af3-a06e-5a621d342adb')
+      AND accountid = (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments')
+      AND datetime = '2026-05-18'
+);
+INSERT INTO fund_positions (datetime, accountid, orgid, fundid, value, shares)
+SELECT
+    '2026-05-18',
+    (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments'),
+    (SELECT id FROM organizations WHERE orgname = 'Sharesies'),
+    (SELECT id FROM funds WHERE fundid = 'af87fb44-ebf6-4239-ba08-ae1cc9a6461c'),
+    35.09,
+    1.58
+WHERE NOT EXISTS (
+    SELECT 1 FROM fund_positions
+    WHERE fundid = (SELECT id FROM funds WHERE fundid = 'af87fb44-ebf6-4239-ba08-ae1cc9a6461c')
+      AND accountid = (SELECT id FROM accounts WHERE accountname = 'Jethro''s Investments')
+      AND datetime = '2026-05-18'
+);
\ No newline at end of file