diff --git a/AkahuClient/main.py b/AkahuClient/main.py index dfcf466..c3aab62 100644 --- a/AkahuClient/main.py +++ b/AkahuClient/main.py @@ -8,9 +8,9 @@ load_dotenv() TOKEN = os.getenv("AKAHU_API_TOKEN") APP_ID = os.getenv("AKAHU_APP_ID") -if not TOKEN or not APP_ID: - print("Please set AKAHU_API_TOKEN and AKAHU_APP_ID in your environment.") - exit(1) -client = AkahuClient(TOKEN, APP_ID) -accounts = client.get_accounts() -print(accounts) \ No newline at end of file +# if not TOKEN or not APP_ID: +# print("Please set AKAHU_API_TOKEN and AKAHU_APP_ID in your environment.") +# exit(1) +# client = AkahuClient(TOKEN, APP_ID) +# accounts = client.get_accounts() +#print(accounts) \ No newline at end of file diff --git a/IngestionService/ingester.py b/IngestionService/ingester.py index 8bb358d..c71a8f6 100644 --- a/IngestionService/ingester.py +++ b/IngestionService/ingester.py @@ -1,15 +1,101 @@ -from ..EmoneyScraper import Scraper -from ..AkahuClient import AkahuClient +from AkahuClient.akahuclient import AkahuClient +from EmoneyScraper.scraper import Scraper from dotenv import load_dotenv import os +import json +import psycopg +from playwright.sync_api import sync_playwright, Playwright + +pw = sync_playwright().start() + load_dotenv() -TOKEN = os.getenv("AKAHU_API_TOKEN") -APP_ID = os.getenv("AKAHU_APP_ID") +class Ingester: + def __init__(self): + self.token = os.getenv("AKAHU_API_TOKEN") + self.app_id = os.getenv("AKAHU_APP_ID") + if not self.token or not self.app_id: + raise ValueError("Please set AKAHU_API_TOKEN and AKAHU_APP_ID in your environment.") + self.client = AkahuClient(self.token, self.app_id) + self.dbconnection = psycopg.connect( + host=os.getenv("DB_HOST"), + dbname=os.getenv("DB_NAME"), + user=os.getenv("DB_USER"), + password=os.getenv("DB_PASSWORD") + ) + self.scraper = Scraper(pw, headless=False) -if not TOKEN or not APP_ID: - print("Please set AKAHU_API_TOKEN and AKAHU_APP_ID in your environment.") - exit(1) + def test_connection(self): + accounts = self.client.get_accounts() + print("Akahu accounts:", accounts) -client = AkahuClient(TOKEN, APP_ID) + def test_scraper(self): + data = self.scraper.get_balance() + print("Scraped data:", data) + + def test_db_connection(self): + with self.dbconnection.cursor() as cursor: + cursor.execute("SELECT 1") + result = cursor.fetchone() + print("Database connection test result:", result) + + def get_transactions(self, account_id: str, start_date: str, end_date: str): + transactions = self.client.get_transactions(account_id, start_date, end_date) + print(f"Transactions for account {account_id} from {start_date} to {end_date}:", transactions) + + def get_accounts(self, account_names=None): + accounts = self.client.get_accounts(account_names) + return accounts + + def write_akahu_snapshot_data(self, data): + with self.dbconnection.cursor() as cursor: + for account in data.get("items", []): + cursor.execute( + """ + INSERT INTO rawsnapshot (data, source) + VALUES (%s, %s) + ON CONFLICT (raw_sha256) DO NOTHING + """, + (json.dumps(account), "akahu") + ) + self.dbconnection.commit() + + def write_akahu_transaction_data(self, data): + with self.dbconnection.cursor() as cursor: + for transaction in data.get("items", []): + cursor.execute( + """ + INSERT INTO rawtransactions (data, source) + VALUES (%s, %s) + ON CONFLICT (raw_sha256) DO NOTHING + """, + (json.dumps(transaction), "akahu") + ) + self.dbconnection.commit() + + def write_emoney_snapshot_data(self, data): + with self.dbconnection.cursor() as cursor: + for account in data.get("accounts", []): + cursor.execute( + """ + INSERT INTO rawsnapshots (data, source) + VALUES (%s, %s) + ON CONFLICT (raw_sha256) DO NOTHING + """, + (json.dumps(account), "emoney") + ) + self.dbconnection.commit() + + def write_emoney_transaction_data(self, data): + with self.dbconnection.cursor() as cursor: + for transaction in data.get("transactions", []): + cursor.execute( + """ + INSERT INTO rawtransactions (data, source) + VALUES (%s, %s) + ON CONFLICT (raw_sha256) DO NOTHING + """, + (json.dumps(transaction), "emoney") + ) + self.dbconnection.commit() \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..39f5c9b --- /dev/null +++ b/main.py @@ -0,0 +1,7 @@ +from IngestionService.ingester import Ingester + + +if __name__ == "__main__": + ingester = Ingester() + data = ingester.test_scraper() + ingester.write_emoney_snapshot_data(data) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f7731ca --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +playwright +requests +dotenv +psycopg[binary] diff --git a/sql/database-init/database definitions.sql b/sql/database-init/database definitions.sql index 2f953f8..69481b5 100644 --- a/sql/database-init/database definitions.sql +++ b/sql/database-init/database definitions.sql @@ -61,8 +61,19 @@ id SERIAL PRIMARY KEY, data JSONB NOT NULL, received_at TIMESTAMPTZ NOT NULL DEFAULT now(), source VARCHAR(100), -accountid INT REFERENCES accounts(id) ON DELETE RESTRICT, -orgid INT REFERENCES organizations(id) ON DELETE RESTRICT, +--accountid INT REFERENCES accounts(id) ON DELETE RESTRICT, +--orgid INT REFERENCES organizations(id) ON DELETE RESTRICT, +raw_sha256 CHAR(64) UNIQUE, +processed BOOLEAN NOT NULL DEFAULT FALSE +); + +CREATE TABLE IF NOT EXISTS rawsnapshots( +id SERIAL PRIMARY KEY, +data JSONB NOT NULL, +received_at TIMESTAMPTZ NOT NULL DEFAULT now(), +source VARCHAR(100), +--accountid INT REFERENCES accounts(id) ON DELETE RESTRICT, +--orgid INT REFERENCES organizations(id) ON DELETE RESTRICT, raw_sha256 CHAR(64) UNIQUE, processed BOOLEAN NOT NULL DEFAULT FALSE );