Changed the sample data format from JSON to CSV for the test site live demonstration.

This commit is contained in:
2023-04-12 21:32:21 +08:00
parent 621020b0f0
commit 165e28441a
6 changed files with 2429 additions and 64 deletions

View File

@@ -19,17 +19,15 @@
"""
from __future__ import annotations
import json
import re
import typing as t
from abc import ABC, abstractmethod
from datetime import date, timedelta
from decimal import Decimal
from secrets import randbelow
from decimal import Decimal
import sqlalchemy as sa
import httpx
import sqlalchemy as sa
from flask import Flask, render_template_string
from test_site import create_app, db
@@ -305,14 +303,14 @@ class BaseTestData(ABC):
:param app: The Flask application.
:param username: The username.
"""
self.__app: Flask = app
with self.__app.app_context():
self._app: Flask = app
with self._app.app_context():
current_user: User | None = User.query\
.filter(User.username == username).first()
assert current_user is not None
self.__current_user_id: int = current_user.id
self.__journal_entries: list[dict[str, t.Any]] = []
self.__line_items: list[dict[str, t.Any]] = []
self._journal_entries: list[dict[str, t.Any]] = []
self._line_items: list[dict[str, t.Any]] = []
self._init_data()
@abstractmethod
@@ -328,21 +326,23 @@ class BaseTestData(ABC):
:return: None
"""
from accounting.models import JournalEntry, JournalEntryLineItem
with self.__app.app_context():
db.session.execute(sa.insert(JournalEntry), self.__journal_entries)
with self._app.app_context():
db.session.execute(sa.insert(JournalEntry), self._journal_entries)
db.session.execute(sa.insert(JournalEntryLineItem),
self.__line_items)
self._line_items)
db.session.commit()
def json(self) -> str:
"""Returns the data as JSON.
@property
def csv_data(self) -> tuple[list[dict[str, t.Any]],
list[dict[str, t.Any]]]:
"""Returns the data for CSV.
:return: The JSON string.
:return: The data for CSV.
"""
from accounting.models import Account
today: date = date.today()
def filter_journal_entry(data: dict[str, t.Any]) -> list[t.Any]:
def filter_journal_entry(data: dict[str, t.Any]) -> dict[str, t.Any]:
"""Filters the journal entry data for JSON encoding.
:param data: The journal entry data.
@@ -352,30 +352,24 @@ class BaseTestData(ABC):
data["date"] = (today - data["date"]).days
del data["created_by_id"]
del data["updated_by_id"]
return [data[x] for x in ["id", "date", "no", "note"]]
return data
def filter_line_item(data: dict[str, t.Any]) -> list[t.Any]:
def filter_line_item(data: dict[str, t.Any]) -> dict[str, t.Any]:
"""Filters the journal entry line item data for JSON encoding.
:param data: The journal entry line item data.
:return: The journal entry line item data for JSON encoding.
"""
data = data.copy()
with self.__app.app_context():
with self._app.app_context():
data["account_id"] \
= db.session.get(Account, data["account_id"]).code
data["amount"] = str(data["amount"])
if "original_line_item_id" not in data:
data["original_line_item_id"] = None
return [data[x] for x in ["id", "journal_entry_id",
"original_line_item_id", "is_debit",
"no", "account_id", "currency_code",
"description", "amount"]]
return data
return json.dumps(
[[filter_journal_entry(x) for x in self.__journal_entries],
[filter_line_item(x) for x in self.__line_items]],
ensure_ascii=False, separators=(",", ":"))
return [filter_journal_entry(x) for x in self._journal_entries], \
[filter_line_item(x) for x in self._line_items]
@staticmethod
def _couple(description: str, amount: str, debit: str, credit: str) \
@@ -398,11 +392,11 @@ class BaseTestData(ABC):
:return: None.
"""
from accounting.models import Account
existing_j_id: set[int] = {x["id"] for x in self.__journal_entries}
existing_l_id: set[int] = {x["id"] for x in self.__line_items}
existing_j_id: set[int] = {x["id"] for x in self._journal_entries}
existing_l_id: set[int] = {x["id"] for x in self._line_items}
journal_entry_data.id = self.__new_id(existing_j_id)
j_date: date = date.today() - timedelta(days=journal_entry_data.days)
self.__journal_entries.append(
self._journal_entries.append(
{"id": journal_entry_data.id,
"date": j_date,
"no": self.__next_j_no(j_date),
@@ -430,7 +424,7 @@ class BaseTestData(ABC):
if line_item.original_line_item is not None:
data["original_line_item_id"] \
= line_item.original_line_item.id
self.__line_items.append(data)
self._line_items.append(data)
for line_item in currency.credit:
account: Account | None \
= Account.find_by_code(line_item.account)
@@ -449,7 +443,7 @@ class BaseTestData(ABC):
if line_item.original_line_item is not None:
data["original_line_item_id"] \
= line_item.original_line_item.id
self.__line_items.append(data)
self._line_items.append(data)
@staticmethod
def __new_id(existing_id: set[int]) -> int:
@@ -470,7 +464,7 @@ class BaseTestData(ABC):
:param j_date: The journal entry date.
:return: The next journal entry number.
"""
existing: set[int] = {x["no"] for x in self.__journal_entries
existing: set[int] = {x["no"] for x in self._journal_entries
if x["date"] == j_date}
return 1 if len(existing) == 0 else max(existing) + 1