Files
mia-accounting/src/accounting/journal_entry/forms/journal_entry.py

589 lines
24 KiB
Python

# The Mia! Accounting Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/2/18
# Copyright (c) 2023-2024 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The journal entry forms for the journal entry management.
"""
import datetime as dt
from abc import ABC, abstractmethod
from typing import Type
import sqlalchemy as sa
from flask_babel import LazyString
from flask_wtf import FlaskForm
from wtforms import DateField, FieldList, FormField, TextAreaField, \
BooleanField
from wtforms.validators import DataRequired, ValidationError
from accounting import db
from accounting.journal_entry.utils.account_option import AccountOption
from accounting.journal_entry.utils.description_editor import DescriptionEditor
from accounting.journal_entry.utils.original_line_items import \
get_selectable_original_line_items
from accounting.locale import lazy_gettext
from accounting.models import JournalEntry, Account, JournalEntryLineItem, \
JournalEntryCurrency
from accounting.utils.random_id import new_id
from accounting.utils.strip_text import strip_multiline_text
from accounting.utils.user import get_current_user_pk
from .currency import CurrencyForm, CashReceiptCurrencyForm, \
CashDisbursementCurrencyForm, TransferCurrencyForm
from .line_item import LineItemForm, DebitLineItemForm, CreditLineItemForm
from .reorder import sort_journal_entries_in
DATE_REQUIRED: DataRequired = DataRequired(
    message=lazy_gettext("Please fill in the date."))
"""The shared validator that rejects an empty date field."""
class NotBeforeOriginalLineItems:
    """The date validator that rejects a date earlier than the minimal
    available date of the journal entry form."""

    def __call__(self, form: FlaskForm, field: DateField) -> None:
        """Validates the date field.

        :param form: The journal entry form.
        :param field: The date field.
        :return: None.
        :raise ValidationError: When the date is earlier than the minimal
            available date of the form.
        """
        assert isinstance(form, JournalEntryForm)
        date: dt.date | None = field.data
        # Nothing to compare against when the date is empty.
        if date is None:
            return
        earliest: dt.date | None = form.min_date
        if earliest is not None and date < earliest:
            raise ValidationError(lazy_gettext(
                "The date cannot be earlier than the original line items."))
class NotAfterOffsetItems:
    """The date validator that rejects a date later than the maximum
    available date of the journal entry form."""

    def __call__(self, form: FlaskForm, field: DateField) -> None:
        """Validates the date field.

        :param form: The journal entry form.
        :param field: The date field.
        :return: None.
        :raise ValidationError: When the date is later than the maximum
            available date of the form.
        """
        assert isinstance(form, JournalEntryForm)
        date: dt.date | None = field.data
        # Nothing to compare against when the date is empty.
        if date is None:
            return
        latest: dt.date | None = form.max_date
        if latest is not None and date > latest:
            raise ValidationError(lazy_gettext(
                "The date cannot be later than the offset items."))
class NeedSomeCurrencies:
    """The validator that requires at least one currency sub-form."""

    def __call__(self, form: FlaskForm, field: FieldList) -> None:
        """Validates the list of the currency sub-forms.

        :param form: The form that owns the field.
        :param field: The list of the currency sub-forms.
        :return: None.
        :raise ValidationError: When the list is empty.
        """
        if len(field) > 0:
            return
        raise ValidationError(lazy_gettext("Please add some currencies."))
class CannotDeleteOriginalLineItemsWithOffset:
    """The validator that keeps the existing line items that have offsets
    from being removed from the form."""

    def __call__(self, form: FlaskForm, field: FieldList) -> None:
        """Validates the line items submitted with the form.

        :param form: The journal entry form.
        :param field: The pseudo field that the validator is attached to.
        :return: None.
        :raise ValidationError: When an existing line item that has offsets
            is not among the submitted line items.
        """
        assert isinstance(form, JournalEntryForm)
        journal_entry: JournalEntry | None = form.obj
        # There is nothing to protect without an existing journal entry.
        if journal_entry is None:
            return
        with_offsets: set[int] = {x.id for x in journal_entry.line_items
                                  if len(x.offsets) > 0}
        submitted: set[int] = {x.id.data for x in form.line_items
                               if x.id.data is not None}
        if not with_offsets.issubset(submitted):
            raise ValidationError(lazy_gettext(
                "Line items with offset cannot be deleted."))
class JournalEntryForm(FlaskForm):
    """The base form to create or edit a journal entry."""
    date = DateField()
    """The date."""
    currencies = FieldList(FormField(CurrencyForm))
    """The line items categorized by their currencies."""
    note = TextAreaField()
    """The note."""

    def __init__(self, *args, **kwargs):
        """Constructs a base journal entry form.

        :param args: The arguments.
        :param kwargs: The keyword arguments.  The "obj" keyword argument,
            if any, is kept as the journal entry being edited.
        """
        super().__init__(*args, **kwargs)
        self.is_modified: bool = False
        """Whether the journal entry is modified during populate_obj()."""
        self.collector: Type[LineItemCollector] = LineItemCollector
        """The line item collector.  The default is the base abstract
        collector only to provide the correct type.  The subclass forms should
        provide their own collectors."""
        self.obj: JournalEntry | None = kwargs.get("obj")
        """The journal entry, when editing an existing one."""
        self._is_need_payable: bool = False
        """Whether we need the payable original line items."""
        self._is_need_receivable: bool = False
        """Whether we need the receivable original line items."""
        self.__original_line_item_options: list[JournalEntryLineItem] | None \
            = None
        """The options of the original line items."""
        self.__net_balance_exceeded: dict[int, LazyString] | None = None
        """The original line items whose net balances were exceeded by the
        amounts in the line item sub-forms."""
        # Give every line item sub-form a reference back to this form.
        for line_item in self.line_items:
            line_item.journal_entry_form = self

    def populate_obj(self, obj: JournalEntry) -> None:
        """Populates the form data into a journal entry object.

        The is_modified attribute is set to True when the journal entry is
        new, when it or any of its line items is changed, or when any of its
        line items is deleted.

        :param obj: The journal entry object.
        :return: None.
        """
        is_new: bool = obj.id is None
        if is_new:
            obj.id = new_id(JournalEntry)
        self.__set_date(obj, self.date.data)
        obj.note = self.note.data

        collector: LineItemCollector = self.collector(self, obj)
        collector.collect()

        # Deletes the existing line items that the collector did not keep.
        to_delete: set[int] = {x.id for x in obj.line_items
                               if x.id not in collector.to_keep}
        if len(to_delete) > 0:
            JournalEntryLineItem.query\
                .filter(JournalEntryLineItem.id.in_(to_delete)).delete()
            self.is_modified = True

        if is_new or db.session.is_modified(obj):
            self.is_modified = True

        if is_new:
            current_user_pk: int = get_current_user_pk()
            obj.created_by_id = current_user_pk
            obj.updated_by_id = current_user_pk

    @property
    def line_items(self) -> list[LineItemForm]:
        """Collects and returns the line item sub-forms.

        :return: The line item sub-forms of all the currency sub-forms.
        """
        line_items: list[LineItemForm] = []
        for currency in self.currencies:
            line_items.extend(currency.line_items)
        return line_items

    def __set_date(self, obj: JournalEntry, new_date: dt.date) -> None:
        """Sets the journal entry date and number.

        Nothing is done when the date is not changed.

        :param obj: The journal entry object.
        :param new_date: The new date.
        :return: None.
        """
        if obj.date is None or obj.date != new_date:
            if obj.date is not None:
                sort_journal_entries_in(obj.date, obj.id)
            # Reads the property only once, as each access may run a query.
            max_date: dt.date | None = self.max_date
            if max_date is not None and new_date == max_date:
                # The new date is the maximum available date.  The journal
                # entry is numbered below the current minimum number of
                # that date -- presumably so that it comes before its
                # offset items.
                db_min_no: int | None = db.session.scalar(
                    sa.select(sa.func.min(JournalEntry.no))
                    .filter(JournalEntry.date == new_date))
                obj.date = new_date
                if db_min_no is None:
                    obj.no = 1
                else:
                    obj.no = db_min_no - 1
                    sort_journal_entries_in(new_date)
            else:
                # Otherwise the journal entry is numbered after the count of
                # the journal entries that the query finds on the new date.
                sort_journal_entries_in(new_date, obj.id)
                count: int = JournalEntry.query\
                    .filter(JournalEntry.date == new_date).count()
                obj.date = new_date
                obj.no = count + 1

    @property
    def debit_account_options(self) -> list[AccountOption]:
        """The selectable debit accounts.

        The accounts whose codes start with "2" and that need offset are
        excluded.  Each option is marked with whether the account is used by
        any debit line item.

        :return: The selectable debit accounts.
        """
        accounts: list[AccountOption] \
            = [AccountOption(x) for x in Account.selectable_debit()
               if not (x.code[0] == "2" and x.is_need_offset)]
        in_use: set[int] = set(db.session.scalars(
            sa.select(JournalEntryLineItem.account_id)
            .filter(JournalEntryLineItem.is_debit)
            .group_by(JournalEntryLineItem.account_id)).all())
        for account in accounts:
            account.is_in_use = account.id in in_use
        return accounts

    @property
    def credit_account_options(self) -> list[AccountOption]:
        """The selectable credit accounts.

        The accounts whose codes start with "1" and that need offset are
        excluded.  Each option is marked with whether the account is used by
        any credit line item.

        :return: The selectable credit accounts.
        """
        accounts: list[AccountOption] \
            = [AccountOption(x) for x in Account.selectable_credit()
               if not (x.code[0] == "1" and x.is_need_offset)]
        in_use: set[int] = set(db.session.scalars(
            sa.select(JournalEntryLineItem.account_id)
            .filter(sa.not_(JournalEntryLineItem.is_debit))
            .group_by(JournalEntryLineItem.account_id)).all())
        for account in accounts:
            account.is_in_use = account.id in in_use
        return accounts

    @property
    def currencies_errors(self) -> list[str | LazyString]:
        """Returns the currency errors, without the errors in their sub-forms.

        :return: The currency errors, without the errors in their sub-forms.
        """
        # Only the plain messages are kept.  The other entries of the error
        # list are skipped.
        return [x for x in self.currencies.errors
                if isinstance(x, (str, LazyString))]

    @property
    def description_editor(self) -> DescriptionEditor:
        """Returns the description editor.

        :return: A new description editor.
        """
        return DescriptionEditor()

    @property
    def original_line_item_options(self) -> list[JournalEntryLineItem]:
        """Returns the selectable original line items.

        The result is cached on the form after the first access.

        :return: The selectable original line items.
        """
        if self.__original_line_item_options is None:
            self.__original_line_item_options \
                = get_selectable_original_line_items(
                    {x.id.data for x in self.line_items
                     if x.id.data is not None},
                    self._is_need_payable, self._is_need_receivable)
        return self.__original_line_item_options

    @property
    def min_date(self) -> dt.date | None:
        """Returns the minimal available date.

        It is the latest date of the journal entries of the original line
        items that the line item sub-forms refer to.

        :return: The minimal available date, or None if there is no
            restriction.
        """
        original_line_item_id: set[int] \
            = {x.original_line_item_id.data for x in self.line_items
               if x.original_line_item_id.data is not None}
        if len(original_line_item_id) == 0:
            return None
        select: sa.Select = sa.select(sa.func.max(JournalEntry.date))\
            .join(JournalEntryLineItem)\
            .filter(JournalEntryLineItem.id.in_(original_line_item_id))
        return db.session.scalar(select)

    @property
    def max_date(self) -> dt.date | None:
        """Returns the maximum available date.

        It is the earliest date of the journal entries of the line items
        whose original line items are the line items in this form.

        :return: The maximum available date, or None if there is no
            restriction.
        """
        line_item_id: set[int] = {x.id.data for x in self.line_items
                                  if x.id.data is not None}
        # Without any existing line item there cannot be any offset item,
        # so the query is skipped, in the same way as min_date does.
        if len(line_item_id) == 0:
            return None
        select: sa.Select = sa.select(sa.func.min(JournalEntry.date))\
            .join(JournalEntryLineItem)\
            .filter(JournalEntryLineItem.original_line_item_id
                    .in_(line_item_id))
        return db.session.scalar(select)
class LineItemCollector[T: JournalEntryForm](ABC):
"""The line item collector."""
def __init__(self, form: T, obj: JournalEntry):
"""Constructs the line item collector.
:param form: The journal entry form.
:param obj: The journal entry.
"""
self.form: T = form
"""The journal entry form."""
self.__obj: JournalEntry = obj
"""The journal entry object."""
self.__line_items: list[JournalEntryLineItem] = list(obj.line_items)
"""The existing line items."""
self.__line_items_by_id: dict[int, JournalEntryLineItem] \
= {x.id: x for x in self.__line_items}
"""A dictionary from the line item ID to their line items."""
self.__no_by_id: dict[int, int] \
= {x.id: x.no for x in self.__line_items}
"""A dictionary from the line item number to their line items."""
self.__currencies: list[JournalEntryCurrency] = obj.currencies
"""The currencies in the journal entry."""
self._debit_no: int = 1
"""The number index for the debit line items."""
self._credit_no: int = 1
"""The number index for the credit line items."""
self.to_keep: set[int] = set()
"""The ID of the existing line items to keep."""
@abstractmethod
def collect(self) -> set[int]:
"""Collects the line items.
:return: The ID of the line items to keep.
"""
def _add_line_item(self, form: LineItemForm, currency_code: str, no: int) \
-> None:
"""Composes a line item from the form.
:param form: The line item form.
:param currency_code: The code of the currency.
:param no: The number of the line item.
:return: None.
"""
line_item: JournalEntryLineItem | None \
= self.__line_items_by_id.get(form.id.data)
if line_item is not None:
line_item.currency_code = currency_code
form.populate_obj(line_item)
line_item.no = no
if db.session.is_modified(line_item):
self.form.is_modified = True
else:
line_item = JournalEntryLineItem()
line_item.currency_code = currency_code
form.populate_obj(line_item)
line_item.no = no
self.__obj.line_items.append(line_item)
self.form.is_modified = True
self.to_keep.add(line_item.id)
def _make_cash_line_item(self, forms: list[LineItemForm], is_debit: bool,
currency_code: str, no: int) -> None:
"""Composes the cash line item at the other debit or credit of the
cash journal entry.
:param forms: The line item forms in the same currency.
:param is_debit: True for a cash receipt journal entry, or False for a
cash disbursement journal entry.
:param currency_code: The code of the currency.
:param no: The number of the line item.
:return: None.
"""
candidates: list[JournalEntryLineItem] \
= [x for x in self.__line_items
if x.is_debit == is_debit and x.currency_code == currency_code]
line_item: JournalEntryLineItem
if len(candidates) > 0:
candidates.sort(key=lambda x: x.no)
line_item = candidates[0]
line_item.account_id = Account.cash().id
line_item.description = None
line_item.amount = sum([x.amount.data for x in forms])
line_item.no = no
if db.session.is_modified(line_item):
self.form.is_modified = True
else:
line_item = JournalEntryLineItem()
line_item.id = new_id(JournalEntryLineItem)
line_item.is_debit = is_debit
line_item.currency_code = currency_code
line_item.account_id = Account.cash().id
line_item.description = None
line_item.amount = sum([x.amount.data for x in forms])
line_item.no = no
self.__obj.line_items.append(line_item)
self.form.is_modified = True
self.to_keep.add(line_item.id)
def _sort_line_item_forms(self, forms: list[LineItemForm]) -> None:
"""Sorts the line item sub-forms.
:param forms: The line item sub-forms.
:return: None.
"""
missing_no: int = 100 if len(self.__no_by_id) == 0 \
else max(self.__no_by_id.values()) + 100
ord_by_form: dict[LineItemForm, int] \
= {forms[i]: i for i in range(len(forms))}
recv_no: set[int] = {x.no.data for x in forms if x.no.data is not None}
missing_recv_no: int = 100 if len(recv_no) == 0 else max(recv_no) + 100
forms.sort(key=lambda x: (x.no.data or missing_recv_no,
missing_no if x.id.data is None else
self.__no_by_id.get(x.id.data, missing_no),
ord_by_form.get(x)))
def _sort_currency_forms(self, forms: list[CurrencyForm]) -> None:
"""Sorts the currency forms.
:param forms: The currency forms.
:return: None.
"""
missing_no: int = len(self.__currencies) + 100
no_by_code: dict[str, int] = {self.__currencies[i].code: i
for i in range(len(self.__currencies))}
ord_by_form: dict[CurrencyForm, int] \
= {forms[i]: i for i in range(len(forms))}
recv_no: set[int] = {x.no.data for x in forms if x.no.data is not None}
missing_recv_no: int = 100 if len(recv_no) == 0 else max(recv_no) + 100
forms.sort(key=lambda x: (x.no.data or missing_recv_no,
no_by_code.get(x.code.data, missing_no),
ord_by_form.get(x)))
class CashReceiptJournalEntryForm(JournalEntryForm):
    """The journal entry form for the cash receipts, with the credit line
    items from the form and one debit cash line item for each currency."""
    date = DateField(validators=[
        DATE_REQUIRED,
        NotBeforeOriginalLineItems(),
        NotAfterOffsetItems(),
    ])
    """The date of the journal entry."""
    currencies = FieldList(
        FormField(CashReceiptCurrencyForm),
        name="currency",
        validators=[NeedSomeCurrencies()],
    )
    """The currency sub-forms, each with its own line items."""
    note = TextAreaField(filters=[strip_multiline_text])
    """The note of the journal entry."""
    whole_form = BooleanField(
        validators=[CannotDeleteOriginalLineItemsWithOffset()],
    )
    """The pseudo field that carries the validators for the whole form."""

    def __init__(self, *args, **kwargs):
        """Constructs a cash receipt journal entry form.

        :param args: The arguments.
        :param kwargs: The keyword arguments.
        """
        super().__init__(*args, **kwargs)
        self._is_need_receivable = True

        class Collector(LineItemCollector[CashReceiptJournalEntryForm]):
            """Collects the line items of a cash receipt journal entry."""

            def collect(self) -> None:
                """Collects the line items, currency by currency.

                :return: None.
                """
                currency_forms: list[CashReceiptCurrencyForm] \
                    = [field.form for field in self.form.currencies]
                self._sort_currency_forms(currency_forms)
                for currency_form in currency_forms:
                    code: str = currency_form.code.data

                    # The cash line item at the debit side
                    self._make_cash_line_item(
                        list(currency_form.credit), True, code,
                        self._debit_no)
                    self._debit_no += 1

                    # The line items at the credit side
                    credit_forms: list[CreditLineItemForm] \
                        = [field.form for field in currency_form.credit]
                    self._sort_line_item_forms(credit_forms)
                    for credit_form in credit_forms:
                        self._add_line_item(
                            credit_form, code, self._credit_no)
                        self._credit_no += 1

        self.collector = Collector
class CashDisbursementJournalEntryForm(JournalEntryForm):
    """The journal entry form for the cash disbursements, with the debit line
    items from the form and one credit cash line item for each currency."""
    date = DateField(validators=[
        DATE_REQUIRED,
        NotBeforeOriginalLineItems(),
        NotAfterOffsetItems(),
    ])
    """The date of the journal entry."""
    currencies = FieldList(
        FormField(CashDisbursementCurrencyForm),
        name="currency",
        validators=[NeedSomeCurrencies()],
    )
    """The currency sub-forms, each with its own line items."""
    note = TextAreaField(filters=[strip_multiline_text])
    """The note of the journal entry."""
    whole_form = BooleanField(
        validators=[CannotDeleteOriginalLineItemsWithOffset()],
    )
    """The pseudo field that carries the validators for the whole form."""

    def __init__(self, *args, **kwargs):
        """Constructs a cash disbursement journal entry form.

        :param args: The arguments.
        :param kwargs: The keyword arguments.
        """
        super().__init__(*args, **kwargs)
        self._is_need_payable = True

        class Collector(LineItemCollector[CashDisbursementJournalEntryForm]):
            """Collects the line items of a cash disbursement journal
            entry."""

            def collect(self) -> None:
                """Collects the line items, currency by currency.

                :return: None.
                """
                currency_forms: list[CashDisbursementCurrencyForm] \
                    = [field.form for field in self.form.currencies]
                self._sort_currency_forms(currency_forms)
                for currency_form in currency_forms:
                    code: str = currency_form.code.data

                    # The line items at the debit side
                    debit_forms: list[DebitLineItemForm] \
                        = [field.form for field in currency_form.debit]
                    self._sort_line_item_forms(debit_forms)
                    for debit_form in debit_forms:
                        self._add_line_item(
                            debit_form, code, self._debit_no)
                        self._debit_no += 1

                    # The cash line item at the credit side
                    self._make_cash_line_item(
                        list(currency_form.debit), False, code,
                        self._credit_no)
                    self._credit_no += 1

        self.collector = Collector
class TransferJournalEntryForm(JournalEntryForm):
    """The journal entry form for the transfers, with both the debit and the
    credit line items from the form."""
    date = DateField(validators=[
        DATE_REQUIRED,
        NotBeforeOriginalLineItems(),
        NotAfterOffsetItems(),
    ])
    """The date of the journal entry."""
    currencies = FieldList(
        FormField(TransferCurrencyForm),
        name="currency",
        validators=[NeedSomeCurrencies()],
    )
    """The currency sub-forms, each with its own line items."""
    note = TextAreaField(filters=[strip_multiline_text])
    """The note of the journal entry."""
    whole_form = BooleanField(
        validators=[CannotDeleteOriginalLineItemsWithOffset()],
    )
    """The pseudo field that carries the validators for the whole form."""

    def __init__(self, *args, **kwargs):
        """Constructs a transfer journal entry form.

        :param args: The arguments.
        :param kwargs: The keyword arguments.
        """
        super().__init__(*args, **kwargs)
        self._is_need_payable = True
        self._is_need_receivable = True

        class Collector(LineItemCollector[TransferJournalEntryForm]):
            """Collects the line items of a transfer journal entry."""

            def collect(self) -> None:
                """Collects the line items, currency by currency.

                :return: None.
                """
                currency_forms: list[TransferCurrencyForm] \
                    = [field.form for field in self.form.currencies]
                self._sort_currency_forms(currency_forms)
                for currency_form in currency_forms:
                    code: str = currency_form.code.data

                    # The line items at the debit side
                    debit_forms: list[DebitLineItemForm] \
                        = [field.form for field in currency_form.debit]
                    self._sort_line_item_forms(debit_forms)
                    for debit_form in debit_forms:
                        self._add_line_item(
                            debit_form, code, self._debit_no)
                        self._debit_no += 1

                    # The line items at the credit side
                    credit_forms: list[CreditLineItemForm] \
                        = [field.form for field in currency_form.credit]
                    self._sort_line_item_forms(credit_forms)
                    for credit_form in credit_forms:
                        self._add_line_item(
                            credit_form, code, self._credit_no)
                        self._credit_no += 1

        self.collector = Collector