Renamed "voucher" to "journal entry".

This commit is contained in:
2023-03-20 22:08:58 +08:00
parent 8f909965a9
commit b1af1d7425
74 changed files with 1956 additions and 1816 deletions

View File

@ -0,0 +1,37 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/2/18
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The journal entry management.
"""
from flask import Flask, Blueprint
def init_app(app: Flask, bp: Blueprint) -> None:
"""Initialize the application.
:param app: The Flask application.
:param bp: The blueprint of the accounting application.
:return: None.
"""
from .converters import JournalEntryConverter, JournalEntryTypeConverter, \
DateConverter
app.url_map.converters["journalEntry"] = JournalEntryConverter
app.url_map.converters["journalEntryType"] = JournalEntryTypeConverter
app.url_map.converters["date"] = DateConverter
from .views import bp as journal_entry_bp
bp.register_blueprint(journal_entry_bp, url_prefix="/journal-entries")

View File

@ -0,0 +1,107 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/2/19
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The path converters for the journal entry management.
"""
from datetime import date
from flask import abort
from sqlalchemy.orm import selectinload
from werkzeug.routing import BaseConverter
from accounting.models import JournalEntry, JournalEntryLineItem
from accounting.utils.journal_entry_types import JournalEntryType
class JournalEntryConverter(BaseConverter):
"""The journal entry converter to convert the journal entry ID from and to
the corresponding journal entry in the routes."""
def to_python(self, value: str) -> JournalEntry:
"""Converts a journal entry ID to a journal entry.
:param value: The journal entry ID.
:return: The corresponding journal entry.
"""
journal_entry: JournalEntry | None = JournalEntry.query\
.join(JournalEntryLineItem)\
.filter(JournalEntry.id == value)\
.options(selectinload(JournalEntry.line_items)
.selectinload(JournalEntryLineItem.offsets)
.selectinload(JournalEntryLineItem.journal_entry))\
.first()
if journal_entry is None:
abort(404)
return journal_entry
def to_url(self, value: JournalEntry) -> str:
"""Converts a journal entry to its ID.
:param value: The journal entry.
:return: The ID.
"""
return str(value.id)
class JournalEntryTypeConverter(BaseConverter):
"""The journal entry converter to convert the journal entry type ID from
and to the corresponding journal entry type in the routes."""
def to_python(self, value: str) -> JournalEntryType:
"""Converts a journal entry ID to a journal entry.
:param value: The journal entry ID.
:return: The corresponding journal entry type.
"""
type_dict: dict[str, JournalEntryType] \
= {x.value: x for x in JournalEntryType}
journal_entry_type: JournalEntryType | None = type_dict.get(value)
if journal_entry_type is None:
abort(404)
return journal_entry_type
def to_url(self, value: JournalEntryType) -> str:
"""Converts a journal entry type to its ID.
:param value: The journal entry type.
:return: The ID.
"""
return str(value.value)
class DateConverter(BaseConverter):
"""The date converter to convert the ISO date from and to the
corresponding date in the routes."""
def to_python(self, value: str) -> date:
"""Converts an ISO date to a date.
:param value: The ISO date.
:return: The corresponding date.
"""
try:
return date.fromisoformat(value)
except ValueError:
abort(404)
def to_url(self, value: date) -> str:
"""Converts a date to its ISO date.
:param value: The date.
:return: The ISO date.
"""
return value.isoformat()

View File

@ -0,0 +1,22 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/3/10
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The forms for the journal entry management.
"""
from .reorder import sort_journal_entries_in, JournalEntryReorderForm
from .journal_entry import JournalEntryForm, CashReceiptJournalEntryForm, \
CashDisbursementJournalEntryForm, TransferJournalEntryForm

View File

@ -0,0 +1,303 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/3/10
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The currency sub-forms for the journal entry management.
"""
from decimal import Decimal
import sqlalchemy as sa
from flask_babel import LazyString
from flask_wtf import FlaskForm
from wtforms import StringField, ValidationError, FieldList, IntegerField, \
BooleanField, FormField
from wtforms.validators import DataRequired
from accounting import db
from accounting.locale import lazy_gettext
from accounting.models import Currency, JournalEntryLineItem
from accounting.journal_entry.utils.offset_alias import offset_alias
from accounting.utils.cast import be
from accounting.utils.strip_text import strip_text
from .line_item import LineItemForm, CreditLineItemForm, DebitLineItemForm
CURRENCY_REQUIRED: DataRequired = DataRequired(
lazy_gettext("Please select the currency."))
"""The validator to check if the currency code is empty."""
class CurrencyExists:
"""The validator to check if the account exists."""
def __call__(self, form: FlaskForm, field: StringField) -> None:
if field.data is None:
return
if db.session.get(Currency, field.data) is None:
raise ValidationError(lazy_gettext(
"The currency does not exist."))
class SameCurrencyAsOriginalLineItems:
"""The validator to check if the currency is the same as the
original line items."""
def __call__(self, form: FlaskForm, field: StringField) -> None:
assert isinstance(form, CurrencyForm)
if field.data is None:
return
original_line_item_id: set[int] \
= {x.original_line_item_id.data
for x in form.line_items
if x.original_line_item_id.data is not None}
if len(original_line_item_id) == 0:
return
original_line_item_currency_codes: set[str] = set(db.session.scalars(
sa.select(JournalEntryLineItem.currency_code)
.filter(JournalEntryLineItem.id.in_(original_line_item_id))).all())
for currency_code in original_line_item_currency_codes:
if field.data != currency_code:
raise ValidationError(lazy_gettext(
"The currency must be the same as the"
" original line item."))
class KeepCurrencyWhenHavingOffset:
"""The validator to check if the currency is the same when there is
offset."""
def __call__(self, form: FlaskForm, field: StringField) -> None:
assert isinstance(form, CurrencyForm)
if field.data is None:
return
offset: sa.Alias = offset_alias()
original_line_items: list[JournalEntryLineItem]\
= JournalEntryLineItem.query\
.join(offset, be(JournalEntryLineItem.id
== offset.c.original_line_item_id),
isouter=True)\
.filter(JournalEntryLineItem.id
.in_({x.eid.data for x in form.line_items
if x.eid.data is not None}))\
.group_by(JournalEntryLineItem.id,
JournalEntryLineItem.currency_code)\
.having(sa.func.count(offset.c.id) > 0).all()
for original_line_item in original_line_items:
if original_line_item.currency_code != field.data:
raise ValidationError(lazy_gettext(
"The currency must not be changed when there is offset."))
class NeedSomeLineItems:
"""The validator to check if there is any line item sub-form."""
def __call__(self, form: FlaskForm, field: FieldList) -> None:
if len(field) == 0:
raise ValidationError(lazy_gettext(
"Please add some line items."))
class IsBalanced:
"""The validator to check that the total amount of the debit and credit
line items are equal."""
def __call__(self, form: FlaskForm, field: BooleanField) -> None:
assert isinstance(form, TransferCurrencyForm)
if len(form.debit) == 0 or len(form.credit) == 0:
return
if form.debit_total != form.credit_total:
raise ValidationError(lazy_gettext(
"The totals of the debit and credit amounts do not match."))
class CurrencyForm(FlaskForm):
"""The form to create or edit a currency in a journal entry."""
no = IntegerField()
"""The order in the journal entry."""
code = StringField()
"""The currency code."""
whole_form = BooleanField()
"""The pseudo field for the whole form validators."""
@property
def line_items(self) -> list[LineItemForm]:
"""Returns the line item sub-forms.
:return: The line item sub-forms.
"""
line_item_forms: list[LineItemForm] = []
if isinstance(self, CashReceiptCurrencyForm):
line_item_forms.extend([x.form for x in self.credit])
elif isinstance(self, CashDisbursementCurrencyForm):
line_item_forms.extend([x.form for x in self.debit])
elif isinstance(self, TransferCurrencyForm):
line_item_forms.extend([x.form for x in self.debit])
line_item_forms.extend([x.form for x in self.credit])
return line_item_forms
@property
def is_code_locked(self) -> bool:
"""Returns whether the currency code should not be changed.
:return: True if the currency code should not be changed, or False
otherwise
"""
line_item_forms: list[LineItemForm] = self.line_items
original_line_item_id: set[int] \
= {x.original_line_item_id.data for x in line_item_forms
if x.original_line_item_id.data is not None}
if len(original_line_item_id) > 0:
return True
line_item_id: set[int] = {x.eid.data for x in line_item_forms
if x.eid.data is not None}
select: sa.Select = sa.select(sa.func.count(JournalEntryLineItem.id))\
.filter(JournalEntryLineItem.original_line_item_id
.in_(line_item_id))
return db.session.scalar(select) > 0
class CashReceiptCurrencyForm(CurrencyForm):
"""The form to create or edit a currency in a
cash receipt journal entry."""
no = IntegerField()
"""The order in the journal entry."""
code = StringField(
filters=[strip_text],
validators=[CURRENCY_REQUIRED,
CurrencyExists(),
SameCurrencyAsOriginalLineItems(),
KeepCurrencyWhenHavingOffset()])
"""The currency code."""
credit = FieldList(FormField(CreditLineItemForm),
validators=[NeedSomeLineItems()])
"""The credit line items."""
whole_form = BooleanField()
"""The pseudo field for the whole form validators."""
@property
def credit_total(self) -> Decimal:
"""Returns the total amount of the credit line items.
:return: The total amount of the credit line items.
"""
return sum([x.amount.data for x in self.credit
if x.amount.data is not None])
@property
def credit_errors(self) -> list[str | LazyString]:
"""Returns the credit line item errors, without the errors in their
sub-forms.
:return:
"""
return [x for x in self.credit.errors
if isinstance(x, str) or isinstance(x, LazyString)]
class CashDisbursementCurrencyForm(CurrencyForm):
"""The form to create or edit a currency in a
cash disbursement journal entry."""
no = IntegerField()
"""The order in the journal entry."""
code = StringField(
filters=[strip_text],
validators=[CURRENCY_REQUIRED,
CurrencyExists(),
SameCurrencyAsOriginalLineItems(),
KeepCurrencyWhenHavingOffset()])
"""The currency code."""
debit = FieldList(FormField(DebitLineItemForm),
validators=[NeedSomeLineItems()])
"""The debit line items."""
whole_form = BooleanField()
"""The pseudo field for the whole form validators."""
@property
def debit_total(self) -> Decimal:
"""Returns the total amount of the debit line items.
:return: The total amount of the debit line items.
"""
return sum([x.amount.data for x in self.debit
if x.amount.data is not None])
@property
def debit_errors(self) -> list[str | LazyString]:
"""Returns the debit line item errors, without the errors in their
sub-forms.
:return:
"""
return [x for x in self.debit.errors
if isinstance(x, str) or isinstance(x, LazyString)]
class TransferCurrencyForm(CurrencyForm):
"""The form to create or edit a currency in a transfer journal entry."""
no = IntegerField()
"""The order in the journal entry."""
code = StringField(
filters=[strip_text],
validators=[CURRENCY_REQUIRED,
CurrencyExists(),
SameCurrencyAsOriginalLineItems(),
KeepCurrencyWhenHavingOffset()])
"""The currency code."""
debit = FieldList(FormField(DebitLineItemForm),
validators=[NeedSomeLineItems()])
"""The debit line items."""
credit = FieldList(FormField(CreditLineItemForm),
validators=[NeedSomeLineItems()])
"""The credit line items."""
whole_form = BooleanField(validators=[IsBalanced()])
"""The pseudo field for the whole form validators."""
@property
def debit_total(self) -> Decimal:
"""Returns the total amount of the debit line items.
:return: The total amount of the debit line items.
"""
return sum([x.amount.data for x in self.debit
if x.amount.data is not None])
@property
def credit_total(self) -> Decimal:
"""Returns the total amount of the credit line items.
:return: The total amount of the credit line items.
"""
return sum([x.amount.data for x in self.credit
if x.amount.data is not None])
@property
def debit_errors(self) -> list[str | LazyString]:
"""Returns the debit line item errors, without the errors in their
sub-forms.
:return:
"""
return [x for x in self.debit.errors
if isinstance(x, str) or isinstance(x, LazyString)]
@property
def credit_errors(self) -> list[str | LazyString]:
"""Returns the credit line item errors, without the errors in their
sub-forms.
:return:
"""
return [x for x in self.credit.errors
if isinstance(x, str) or isinstance(x, LazyString)]

View File

@ -0,0 +1,593 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/2/18
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The journal entry forms for the journal entry management.
"""
import datetime as dt
import typing as t
from abc import ABC, abstractmethod
import sqlalchemy as sa
from flask_babel import LazyString
from flask_wtf import FlaskForm
from wtforms import DateField, FieldList, FormField, TextAreaField, \
BooleanField
from wtforms.validators import DataRequired, ValidationError
from accounting import db
from accounting.locale import lazy_gettext
from accounting.models import JournalEntry, Account, JournalEntryLineItem, \
JournalEntryCurrency
from accounting.journal_entry.utils.account_option import AccountOption
from accounting.journal_entry.utils.original_line_items import \
get_selectable_original_line_items
from accounting.journal_entry.utils.description_editor import DescriptionEditor
from accounting.utils.random_id import new_id
from accounting.utils.strip_text import strip_multiline_text
from accounting.utils.user import get_current_user_pk
from .currency import CurrencyForm, CashReceiptCurrencyForm, \
CashDisbursementCurrencyForm, TransferCurrencyForm
from .line_item import LineItemForm, DebitLineItemForm, CreditLineItemForm
from .reorder import sort_journal_entries_in
DATE_REQUIRED: DataRequired = DataRequired(
lazy_gettext("Please fill in the date."))
"""The validator to check if the date is empty."""
class NotBeforeOriginalLineItems:
"""The validator to check if the date is not before the
original line items."""
def __call__(self, form: FlaskForm, field: DateField) -> None:
assert isinstance(form, JournalEntryForm)
if field.data is None:
return
min_date: dt.date | None = form.min_date
if min_date is None:
return
if field.data < min_date:
raise ValidationError(lazy_gettext(
"The date cannot be earlier than the original line items."))
class NotAfterOffsetItems:
"""The validator to check if the date is not after the offset items."""
def __call__(self, form: FlaskForm, field: DateField) -> None:
assert isinstance(form, JournalEntryForm)
if field.data is None:
return
max_date: dt.date | None = form.max_date
if max_date is None:
return
if field.data > max_date:
raise ValidationError(lazy_gettext(
"The date cannot be later than the offset items."))
class NeedSomeCurrencies:
"""The validator to check if there is any currency sub-form."""
def __call__(self, form: FlaskForm, field: FieldList) -> None:
if len(field) == 0:
raise ValidationError(lazy_gettext("Please add some currencies."))
class CannotDeleteOriginalLineItemsWithOffset:
"""The validator to check the original line items with offset."""
def __call__(self, form: FlaskForm, field: FieldList) -> None:
assert isinstance(form, JournalEntryForm)
if form.obj is None:
return
existing_matched_original_line_item_id: set[int] \
= {x.id for x in form.obj.line_items if len(x.offsets) > 0}
line_item_id_in_form: set[int] \
= {x.eid.data for x in form.line_items if x.eid.data is not None}
for line_item_id in existing_matched_original_line_item_id:
if line_item_id not in line_item_id_in_form:
raise ValidationError(lazy_gettext(
"Line items with offset cannot be deleted."))
class JournalEntryForm(FlaskForm):
"""The base form to create or edit a journal entry."""
date = DateField()
"""The date."""
currencies = FieldList(FormField(CurrencyForm))
"""The line items categorized by their currencies."""
note = TextAreaField()
"""The note."""
def __init__(self, *args, **kwargs):
"""Constructs a base journal entry form.
:param args: The arguments.
:param kwargs: The keyword arguments.
"""
super().__init__(*args, **kwargs)
self.is_modified: bool = False
"""Whether the journal entry is modified during populate_obj()."""
self.collector: t.Type[LineItemCollector] = LineItemCollector
"""The line item collector. The default is the base abstract
collector only to provide the correct type. The subclass forms should
provide their own collectors."""
self.obj: JournalEntry | None = kwargs.get("obj")
"""The journal entry, when editing an existing one."""
self._is_need_payable: bool = False
"""Whether we need the payable original line items."""
self._is_need_receivable: bool = False
"""Whether we need the receivable original line items."""
self.__original_line_item_options: list[JournalEntryLineItem] | None \
= None
"""The options of the original line items."""
self.__net_balance_exceeded: dict[int, LazyString] | None = None
"""The original line items whose net balances were exceeded by the
amounts in the line item sub-forms."""
for line_item in self.line_items:
line_item.journal_entry_form = self
def populate_obj(self, obj: JournalEntry) -> None:
"""Populates the form data into a journal entry object.
:param obj: The journal entry object.
:return: None.
"""
is_new: bool = obj.id is None
if is_new:
obj.id = new_id(JournalEntry)
self.date: DateField
self.__set_date(obj, self.date.data)
obj.note = self.note.data
collector_cls: t.Type[LineItemCollector] = self.collector
collector: collector_cls = collector_cls(self, obj)
collector.collect()
to_delete: set[int] = {x.id for x in obj.line_items
if x.id not in collector.to_keep}
if len(to_delete) > 0:
JournalEntryLineItem.query\
.filter(JournalEntryLineItem.id.in_(to_delete)).delete()
self.is_modified = True
if is_new or db.session.is_modified(obj):
self.is_modified = True
if is_new:
current_user_pk: int = get_current_user_pk()
obj.created_by_id = current_user_pk
obj.updated_by_id = current_user_pk
@property
def line_items(self) -> list[LineItemForm]:
"""Collects and returns the line item sub-forms.
:return: The line item sub-forms.
"""
line_items: list[LineItemForm] = []
for currency in self.currencies:
line_items.extend(currency.line_items)
return line_items
def __set_date(self, obj: JournalEntry, new_date: dt.date) -> None:
"""Sets the journal entry date and number.
:param obj: The journal entry object.
:param new_date: The new date.
:return: None.
"""
if obj.date is None or obj.date != new_date:
if obj.date is not None:
sort_journal_entries_in(obj.date, obj.id)
if self.max_date is not None and new_date == self.max_date:
db_min_no: int | None = db.session.scalar(
sa.select(sa.func.min(JournalEntry.no))
.filter(JournalEntry.date == new_date))
if db_min_no is None:
obj.date = new_date
obj.no = 1
else:
obj.date = new_date
obj.no = db_min_no - 1
sort_journal_entries_in(new_date)
else:
sort_journal_entries_in(new_date, obj.id)
count: int = JournalEntry.query\
.filter(JournalEntry.date == new_date).count()
obj.date = new_date
obj.no = count + 1
@property
def debit_account_options(self) -> list[AccountOption]:
"""The selectable debit accounts.
:return: The selectable debit accounts.
"""
accounts: list[AccountOption] \
= [AccountOption(x) for x in Account.debit()
if not (x.code[0] == "2" and x.is_need_offset)]
in_use: set[int] = set(db.session.scalars(
sa.select(JournalEntryLineItem.account_id)
.filter(JournalEntryLineItem.is_debit)
.group_by(JournalEntryLineItem.account_id)).all())
for account in accounts:
account.is_in_use = account.id in in_use
return accounts
@property
def credit_account_options(self) -> list[AccountOption]:
"""The selectable credit accounts.
:return: The selectable credit accounts.
"""
accounts: list[AccountOption] \
= [AccountOption(x) for x in Account.credit()
if not (x.code[0] == "1" and x.is_need_offset)]
in_use: set[int] = set(db.session.scalars(
sa.select(JournalEntryLineItem.account_id)
.filter(sa.not_(JournalEntryLineItem.is_debit))
.group_by(JournalEntryLineItem.account_id)).all())
for account in accounts:
account.is_in_use = account.id in in_use
return accounts
@property
def currencies_errors(self) -> list[str | LazyString]:
"""Returns the currency errors, without the errors in their sub-forms.
:return: The currency errors, without the errors in their sub-forms.
"""
return [x for x in self.currencies.errors
if isinstance(x, str) or isinstance(x, LazyString)]
@property
def description_editor(self) -> DescriptionEditor:
"""Returns the description editor.
:return: The description editor.
"""
return DescriptionEditor()
@property
def original_line_item_options(self) -> list[JournalEntryLineItem]:
"""Returns the selectable original line items.
:return: The selectable original line items.
"""
if self.__original_line_item_options is None:
self.__original_line_item_options \
= get_selectable_original_line_items(
{x.eid.data for x in self.line_items
if x.eid.data is not None},
self._is_need_payable, self._is_need_receivable)
return self.__original_line_item_options
@property
def min_date(self) -> dt.date | None:
"""Returns the minimal available date.
:return: The minimal available date.
"""
original_line_item_id: set[int] \
= {x.original_line_item_id.data for x in self.line_items
if x.original_line_item_id.data is not None}
if len(original_line_item_id) == 0:
return None
select: sa.Select = sa.select(sa.func.max(JournalEntry.date))\
.join(JournalEntryLineItem)\
.filter(JournalEntryLineItem.id.in_(original_line_item_id))
return db.session.scalar(select)
@property
def max_date(self) -> dt.date | None:
"""Returns the maximum available date.
:return: The maximum available date.
"""
line_item_id: set[int] = {x.eid.data for x in self.line_items
if x.eid.data is not None}
select: sa.Select = sa.select(sa.func.min(JournalEntry.date))\
.join(JournalEntryLineItem)\
.filter(JournalEntryLineItem.original_line_item_id
.in_(line_item_id))
return db.session.scalar(select)
T = t.TypeVar("T", bound=JournalEntryForm)
"""A journal entry form variant."""
class LineItemCollector(t.Generic[T], ABC):
"""The line item collector."""
def __init__(self, form: T, obj: JournalEntry):
"""Constructs the line item collector.
:param form: The journal entry form.
:param obj: The journal entry.
"""
self.form: T = form
"""The journal entry form."""
self.__obj: JournalEntry = obj
"""The journal entry object."""
self.__line_items: list[JournalEntryLineItem] = list(obj.line_items)
"""The existing line items."""
self.__line_items_by_id: dict[int, JournalEntryLineItem] \
= {x.id: x for x in self.__line_items}
"""A dictionary from the line item ID to their line items."""
self.__no_by_id: dict[int, int] \
= {x.id: x.no for x in self.__line_items}
"""A dictionary from the line item number to their line items."""
self.__currencies: list[JournalEntryCurrency] = obj.currencies
"""The currencies in the journal entry."""
self._debit_no: int = 1
"""The number index for the debit line items."""
self._credit_no: int = 1
"""The number index for the credit line items."""
self.to_keep: set[int] = set()
"""The ID of the existing line items to keep."""
@abstractmethod
def collect(self) -> set[int]:
"""Collects the line items.
:return: The ID of the line items to keep.
"""
def _add_line_item(self, form: LineItemForm, currency_code: str, no: int) \
-> None:
"""Composes a line item from the form.
:param form: The line item form.
:param currency_code: The code of the currency.
:param no: The number of the line item.
:return: None.
"""
line_item: JournalEntryLineItem | None \
= self.__line_items_by_id.get(form.eid.data)
if line_item is not None:
line_item.currency_code = currency_code
form.populate_obj(line_item)
line_item.no = no
if db.session.is_modified(line_item):
self.form.is_modified = True
else:
line_item = JournalEntryLineItem()
line_item.currency_code = currency_code
form.populate_obj(line_item)
line_item.no = no
self.__obj.line_items.append(line_item)
self.form.is_modified = True
self.to_keep.add(line_item.id)
def _make_cash_line_item(self, forms: list[LineItemForm], is_debit: bool,
currency_code: str, no: int) -> None:
"""Composes the cash line item at the other debit or credit of the
cash journal entry.
:param forms: The line item forms in the same currency.
:param is_debit: True for a cash receipt journal entry, or False for a
cash disbursement journal entry.
:param currency_code: The code of the currency.
:param no: The number of the line item.
:return: None.
"""
candidates: list[JournalEntryLineItem] \
= [x for x in self.__line_items
if x.is_debit == is_debit and x.currency_code == currency_code]
line_item: JournalEntryLineItem
if len(candidates) > 0:
candidates.sort(key=lambda x: x.no)
line_item = candidates[0]
line_item.account_id = Account.cash().id
line_item.description = None
line_item.amount = sum([x.amount.data for x in forms])
line_item.no = no
if db.session.is_modified(line_item):
self.form.is_modified = True
else:
line_item = JournalEntryLineItem()
line_item.id = new_id(JournalEntryLineItem)
line_item.is_debit = is_debit
line_item.currency_code = currency_code
line_item.account_id = Account.cash().id
line_item.description = None
line_item.amount = sum([x.amount.data for x in forms])
line_item.no = no
self.__obj.line_items.append(line_item)
self.form.is_modified = True
self.to_keep.add(line_item.id)
def _sort_line_item_forms(self, forms: list[LineItemForm]) -> None:
"""Sorts the line item sub-forms.
:param forms: The line item sub-forms.
:return: None.
"""
missing_no: int = 100 if len(self.__no_by_id) == 0 \
else max(self.__no_by_id.values()) + 100
ord_by_form: dict[LineItemForm, int] \
= {forms[i]: i for i in range(len(forms))}
recv_no: set[int] = {x.no.data for x in forms if x.no.data is not None}
missing_recv_no: int = 100 if len(recv_no) == 0 else max(recv_no) + 100
forms.sort(key=lambda x: (x.no.data or missing_recv_no,
missing_no if x.eid.data is None else
self.__no_by_id.get(x.eid.data, missing_no),
ord_by_form.get(x)))
def _sort_currency_forms(self, forms: list[CurrencyForm]) -> None:
"""Sorts the currency forms.
:param forms: The currency forms.
:return: None.
"""
missing_no: int = len(self.__currencies) + 100
no_by_code: dict[str, int] = {self.__currencies[i].code: i
for i in range(len(self.__currencies))}
ord_by_form: dict[CurrencyForm, int] \
= {forms[i]: i for i in range(len(forms))}
recv_no: set[int] = {x.no.data for x in forms if x.no.data is not None}
missing_recv_no: int = 100 if len(recv_no) == 0 else max(recv_no) + 100
forms.sort(key=lambda x: (x.no.data or missing_recv_no,
no_by_code.get(x.code.data, missing_no),
ord_by_form.get(x)))
class CashReceiptJournalEntryForm(JournalEntryForm):
"""The form to create or edit a cash receipt journal entry."""
date = DateField(
validators=[DATE_REQUIRED,
NotBeforeOriginalLineItems(),
NotAfterOffsetItems()])
"""The date."""
currencies = FieldList(FormField(CashReceiptCurrencyForm), name="currency",
validators=[NeedSomeCurrencies()])
"""The line items categorized by their currencies."""
note = TextAreaField(filters=[strip_multiline_text])
"""The note."""
whole_form = BooleanField(
validators=[CannotDeleteOriginalLineItemsWithOffset()])
"""The pseudo field for the whole form validators."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._is_need_receivable = True
class Collector(LineItemCollector[CashReceiptJournalEntryForm]):
"""The line item collector for the cash receipt journal entries."""
def collect(self) -> None:
currencies: list[CashReceiptCurrencyForm] \
= [x.form for x in self.form.currencies]
self._sort_currency_forms(currencies)
for currency in currencies:
# The debit cash line item
self._make_cash_line_item(list(currency.credit), True,
currency.code.data,
self._debit_no)
self._debit_no = self._debit_no + 1
# The credit forms
credit_forms: list[CreditLineItemForm] \
= [x.form for x in currency.credit]
self._sort_line_item_forms(credit_forms)
for credit_form in credit_forms:
self._add_line_item(credit_form, currency.code.data,
self._credit_no)
self._credit_no = self._credit_no + 1
self.collector = Collector
class CashDisbursementJournalEntryForm(JournalEntryForm):
"""The form to create or edit a cash disbursement journal entry."""
date = DateField(
validators=[DATE_REQUIRED,
NotBeforeOriginalLineItems(),
NotAfterOffsetItems()])
"""The date."""
currencies = FieldList(FormField(CashDisbursementCurrencyForm),
name="currency",
validators=[NeedSomeCurrencies()])
"""The line items categorized by their currencies."""
note = TextAreaField(filters=[strip_multiline_text])
"""The note."""
whole_form = BooleanField(
validators=[CannotDeleteOriginalLineItemsWithOffset()])
"""The pseudo field for the whole form validators."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._is_need_payable = True
class Collector(LineItemCollector[CashDisbursementJournalEntryForm]):
"""The line item collector for the cash disbursement journal
entries."""
def collect(self) -> None:
currencies: list[CashDisbursementCurrencyForm] \
= [x.form for x in self.form.currencies]
self._sort_currency_forms(currencies)
for currency in currencies:
# The debit forms
debit_forms: list[DebitLineItemForm] \
= [x.form for x in currency.debit]
self._sort_line_item_forms(debit_forms)
for debit_form in debit_forms:
self._add_line_item(debit_form, currency.code.data,
self._debit_no)
self._debit_no = self._debit_no + 1
# The credit forms
self._make_cash_line_item(list(currency.debit), False,
currency.code.data,
self._credit_no)
self._credit_no = self._credit_no + 1
self.collector = Collector
class TransferJournalEntryForm(JournalEntryForm):
"""The form to create or edit a transfer journal entry."""
date = DateField(
validators=[DATE_REQUIRED,
NotBeforeOriginalLineItems(),
NotAfterOffsetItems()])
"""The date."""
currencies = FieldList(FormField(TransferCurrencyForm), name="currency",
validators=[NeedSomeCurrencies()])
"""The line items categorized by their currencies."""
note = TextAreaField(filters=[strip_multiline_text])
"""The note."""
whole_form = BooleanField(
validators=[CannotDeleteOriginalLineItemsWithOffset()])
"""The pseudo field for the whole form validators."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._is_need_payable = True
self._is_need_receivable = True
class Collector(LineItemCollector[TransferJournalEntryForm]):
"""The line item collector for the transfer journal entries."""
def collect(self) -> None:
currencies: list[TransferCurrencyForm] \
= [x.form for x in self.form.currencies]
self._sort_currency_forms(currencies)
for currency in currencies:
# The debit forms
debit_forms: list[DebitLineItemForm] \
= [x.form for x in currency.debit]
self._sort_line_item_forms(debit_forms)
for debit_form in debit_forms:
self._add_line_item(debit_form, currency.code.data,
self._debit_no)
self._debit_no = self._debit_no + 1
# The credit forms
credit_forms: list[CreditLineItemForm] \
= [x.form for x in currency.credit]
self._sort_line_item_forms(credit_forms)
for credit_form in credit_forms:
self._add_line_item(credit_form, currency.code.data,
self._credit_no)
self._credit_no = self._credit_no + 1
self.collector = Collector

View File

@ -0,0 +1,537 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/3/10
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The line item sub-forms for the journal entry management.
"""
import re
from datetime import date
from decimal import Decimal
import sqlalchemy as sa
from flask_babel import LazyString
from flask_wtf import FlaskForm
from sqlalchemy.orm import selectinload
from wtforms import StringField, ValidationError, DecimalField, IntegerField
from wtforms.validators import DataRequired, Optional
from accounting import db
from accounting.locale import lazy_gettext
from accounting.models import Account, JournalEntryLineItem
from accounting.template_filters import format_amount
from accounting.utils.cast import be
from accounting.utils.random_id import new_id
from accounting.utils.strip_text import strip_text
from accounting.utils.user import get_current_user_pk
ACCOUNT_REQUIRED: DataRequired = DataRequired(
lazy_gettext("Please select the account."))
"""The validator to check if the account code is empty."""
class OriginalLineItemExists:
"""The validator to check if the original line item exists."""
def __call__(self, form: FlaskForm, field: IntegerField) -> None:
if field.data is None:
return
if db.session.get(JournalEntryLineItem, field.data) is None:
raise ValidationError(lazy_gettext(
"The original line item does not exist."))
class OriginalLineItemOppositeDebitCredit:
"""The validator to check if the original line item is on the opposite
debit or credit."""
def __call__(self, form: FlaskForm, field: IntegerField) -> None:
if field.data is None:
return
original_line_item: JournalEntryLineItem | None \
= db.session.get(JournalEntryLineItem, field.data)
if original_line_item is None:
return
if isinstance(form, CreditLineItemForm) \
and original_line_item.is_debit:
return
if isinstance(form, DebitLineItemForm) \
and not original_line_item.is_debit:
return
raise ValidationError(lazy_gettext(
"The original line item is on the same debit or credit."))
class OriginalLineItemNeedOffset:
"""The validator to check if the original line item needs offset."""
def __call__(self, form: FlaskForm, field: IntegerField) -> None:
if field.data is None:
return
original_line_item: JournalEntryLineItem | None \
= db.session.get(JournalEntryLineItem, field.data)
if original_line_item is None:
return
if not original_line_item.account.is_need_offset:
raise ValidationError(lazy_gettext(
"The original line item does not need offset."))
class OriginalLineItemNotOffset:
"""The validator to check if the original line item is not itself an
offset item."""
def __call__(self, form: FlaskForm, field: IntegerField) -> None:
if field.data is None:
return
original_line_item: JournalEntryLineItem | None \
= db.session.get(JournalEntryLineItem, field.data)
if original_line_item is None:
return
if original_line_item.original_line_item_id is not None:
raise ValidationError(lazy_gettext(
"The original line item cannot be an offset item."))
class AccountExists:
"""The validator to check if the account exists."""
def __call__(self, form: FlaskForm, field: StringField) -> None:
if field.data is None:
return
if Account.find_by_code(field.data) is None:
raise ValidationError(lazy_gettext(
"The account does not exist."))
class IsDebitAccount:
"""The validator to check if the account is for debit line items."""
def __call__(self, form: FlaskForm, field: StringField) -> None:
if field.data is None:
return
if re.match(r"^(?:[1235689]|7[5678])", field.data) \
and not field.data.startswith("3351-") \
and not field.data.startswith("3353-"):
return
raise ValidationError(lazy_gettext(
"This account is not for debit line items."))
class IsCreditAccount:
"""The validator to check if the account is for credit line items."""
def __call__(self, form: FlaskForm, field: StringField) -> None:
if field.data is None:
return
if re.match(r"^(?:[123489]|7[1234])", field.data) \
and not field.data.startswith("3351-") \
and not field.data.startswith("3353-"):
return
raise ValidationError(lazy_gettext(
"This account is not for credit line items."))
class SameAccountAsOriginalLineItem:
"""The validator to check if the account is the same as the
original line item."""
def __call__(self, form: FlaskForm, field: StringField) -> None:
assert isinstance(form, LineItemForm)
if field.data is None or form.original_line_item_id.data is None:
return
original_line_item: JournalEntryLineItem | None \
= db.session.get(JournalEntryLineItem,
form.original_line_item_id.data)
if original_line_item is None:
return
if field.data != original_line_item.account_code:
raise ValidationError(lazy_gettext(
"The account must be the same as the original line item."))
class KeepAccountWhenHavingOffset:
"""The validator to check if the account is the same when having offset."""
def __call__(self, form: FlaskForm, field: StringField) -> None:
assert isinstance(form, LineItemForm)
if field.data is None or form.eid.data is None:
return
line_item: JournalEntryLineItem | None = db.session\
.query(JournalEntryLineItem)\
.filter(JournalEntryLineItem.id == form.eid.data)\
.options(selectinload(JournalEntryLineItem.offsets)).first()
if line_item is None or len(line_item.offsets) == 0:
return
if field.data != line_item.account_code:
raise ValidationError(lazy_gettext(
"The account must not be changed when there is offset."))
class NotStartPayableFromDebit:
"""The validator to check that a payable line item does not start from
debit."""
def __call__(self, form: FlaskForm, field: StringField) -> None:
assert isinstance(form, DebitLineItemForm)
if field.data is None \
or field.data[0] != "2" \
or form.original_line_item_id.data is not None:
return
account: Account | None = Account.find_by_code(field.data)
if account is not None and account.is_need_offset:
raise ValidationError(lazy_gettext(
"A payable line item cannot start from debit."))
class NotStartReceivableFromCredit:
"""The validator to check that a receivable line item does not start
from credit."""
def __call__(self, form: FlaskForm, field: StringField) -> None:
assert isinstance(form, CreditLineItemForm)
if field.data is None \
or field.data[0] != "1" \
or form.original_line_item_id.data is not None:
return
account: Account | None = Account.find_by_code(field.data)
if account is not None and account.is_need_offset:
raise ValidationError(lazy_gettext(
"A receivable line item cannot start from credit."))
class PositiveAmount:
"""The validator to check if the amount is positive."""
def __call__(self, form: FlaskForm, field: DecimalField) -> None:
if field.data is None:
return
if field.data <= 0:
raise ValidationError(lazy_gettext(
"Please fill in a positive amount."))
class NotExceedingOriginalLineItemNetBalance:
"""The validator to check if the amount exceeds the net balance of the
original line item."""
def __call__(self, form: FlaskForm, field: DecimalField) -> None:
assert isinstance(form, LineItemForm)
if field.data is None or form.original_line_item_id.data is None:
return
original_line_item: JournalEntryLineItem | None \
= db.session.get(JournalEntryLineItem,
form.original_line_item_id.data)
if original_line_item is None:
return
is_debit: bool = isinstance(form, DebitLineItemForm)
existing_line_item_id: set[int] = set()
if form.journal_entry_form.obj is not None:
existing_line_item_id \
= {x.id for x in form.journal_entry_form.obj.line_items}
offset_total_func: sa.Function = sa.func.sum(sa.case(
(be(JournalEntryLineItem.is_debit == is_debit),
JournalEntryLineItem.amount),
else_=-JournalEntryLineItem.amount))
offset_total_but_form: Decimal | None = db.session.scalar(
sa.select(offset_total_func)
.filter(be(JournalEntryLineItem.original_line_item_id
== original_line_item.id),
JournalEntryLineItem.id.not_in(existing_line_item_id)))
if offset_total_but_form is None:
offset_total_but_form = Decimal("0")
offset_total_on_form: Decimal = sum(
[x.amount.data for x in form.journal_entry_form.line_items
if x.original_line_item_id.data == original_line_item.id
and x.amount != field and x.amount.data is not None])
net_balance: Decimal = original_line_item.amount \
- offset_total_but_form - offset_total_on_form
if field.data > net_balance:
raise ValidationError(lazy_gettext(
"The amount must not exceed the net balance %(balance)s of the"
" original line item.", balance=format_amount(net_balance)))
class NotLessThanOffsetTotal:
"""The validator to check if the amount is less than the offset total."""
def __call__(self, form: FlaskForm, field: DecimalField) -> None:
assert isinstance(form, LineItemForm)
if field.data is None or form.eid.data is None:
return
is_debit: bool = isinstance(form, DebitLineItemForm)
select_offset_total: sa.Select = sa.select(sa.func.sum(sa.case(
(JournalEntryLineItem.is_debit != is_debit,
JournalEntryLineItem.amount),
else_=-JournalEntryLineItem.amount)))\
.filter(be(JournalEntryLineItem.original_line_item_id
== form.eid.data))
offset_total: Decimal | None = db.session.scalar(select_offset_total)
if offset_total is not None and field.data < offset_total:
raise ValidationError(lazy_gettext(
"The amount must not be less than the offset total %(total)s.",
total=format_amount(offset_total)))
class LineItemForm(FlaskForm):
"""The base form to create or edit a line item."""
eid = IntegerField()
"""The existing line item ID."""
no = IntegerField()
"""The order in the currency."""
original_line_item_id = IntegerField()
"""The Id of the original line item."""
account_code = StringField()
"""The account code."""
description = StringField()
"""The description."""
amount = DecimalField()
"""The amount."""
def __init__(self, *args, **kwargs):
"""Constructs a base line item form.
:param args: The arguments.
:param kwargs: The keyword arguments.
"""
super().__init__(*args, **kwargs)
from .journal_entry import JournalEntryForm
self.journal_entry_form: JournalEntryForm | None = None
"""The source journal entry form."""
@property
def account_text(self) -> str:
"""Returns the text representation of the account.
:return: The text representation of the account.
"""
if self.account_code.data is None:
return ""
account: Account | None = Account.find_by_code(self.account_code.data)
if account is None:
return ""
return str(account)
@property
def __original_line_item(self) -> JournalEntryLineItem | None:
"""Returns the original line item.
:return: The original line item.
"""
if not hasattr(self, "____original_line_item"):
def get_line_item() -> JournalEntryLineItem | None:
if self.original_line_item_id.data is None:
return None
return db.session.get(JournalEntryLineItem,
self.original_line_item_id.data)
setattr(self, "____original_line_item", get_line_item())
return getattr(self, "____original_line_item")
@property
def original_line_item_date(self) -> date | None:
"""Returns the text representation of the original line item.
:return: The text representation of the original line item.
"""
return None if self.__original_line_item is None \
else self.__original_line_item.journal_entry.date
@property
def original_line_item_text(self) -> str | None:
"""Returns the text representation of the original line item.
:return: The text representation of the original line item.
"""
return None if self.__original_line_item is None \
else str(self.__original_line_item)
@property
def is_need_offset(self) -> bool:
"""Returns whether the line item needs offset.
:return: True if the line item needs offset, or False otherwise.
"""
if self.account_code.data is None:
return False
if self.account_code.data[0] == "1":
if isinstance(self, CreditLineItemForm):
return False
elif self.account_code.data[0] == "2":
if isinstance(self, DebitLineItemForm):
return False
else:
return False
account: Account | None = Account.find_by_code(self.account_code.data)
return account is not None and account.is_need_offset
@property
def offsets(self) -> list[JournalEntryLineItem]:
"""Returns the offsets.
:return: The offsets.
"""
if not hasattr(self, "__offsets"):
def get_offsets() -> list[JournalEntryLineItem]:
if not self.is_need_offset or self.eid.data is None:
return []
return JournalEntryLineItem.query\
.filter(JournalEntryLineItem.original_line_item_id
== self.eid.data)\
.options(selectinload(JournalEntryLineItem.journal_entry),
selectinload(JournalEntryLineItem.account),
selectinload(JournalEntryLineItem.offsets)
.selectinload(
JournalEntryLineItem.journal_entry)).all()
setattr(self, "__offsets", get_offsets())
return getattr(self, "__offsets")
@property
def offset_total(self) -> Decimal | None:
"""Returns the total amount of the offsets.
:return: The total amount of the offsets.
"""
if not hasattr(self, "__offset_total"):
def get_offset_total():
if not self.is_need_offset or self.eid.data is None:
return None
is_debit: bool = isinstance(self, DebitLineItemForm)
return sum([x.amount if x.is_debit != is_debit else -x.amount
for x in self.offsets])
setattr(self, "__offset_total", get_offset_total())
return getattr(self, "__offset_total")
@property
def net_balance(self) -> Decimal | None:
"""Returns the net balance.
:return: The net balance.
"""
if not self.is_need_offset or self.eid.data is None \
or self.amount.data is None:
return None
return self.amount.data - self.offset_total
@property
def all_errors(self) -> list[str | LazyString]:
"""Returns all the errors of the form.
:return: All the errors of the form.
"""
all_errors: list[str | LazyString] = []
for key in self.errors:
if key != "csrf_token":
all_errors.extend(self.errors[key])
return all_errors
class DebitLineItemForm(LineItemForm):
"""The form to create or edit a debit line item."""
eid = IntegerField()
"""The existing line item ID."""
no = IntegerField()
"""The order in the currency."""
original_line_item_id = IntegerField(
validators=[Optional(),
OriginalLineItemExists(),
OriginalLineItemOppositeDebitCredit(),
OriginalLineItemNeedOffset(),
OriginalLineItemNotOffset()])
"""The ID of the original line item."""
account_code = StringField(
filters=[strip_text],
validators=[ACCOUNT_REQUIRED,
AccountExists(),
IsDebitAccount(),
SameAccountAsOriginalLineItem(),
KeepAccountWhenHavingOffset(),
NotStartPayableFromDebit()])
"""The account code."""
description = StringField(filters=[strip_text])
"""The description."""
amount = DecimalField(
validators=[PositiveAmount(),
NotExceedingOriginalLineItemNetBalance(),
NotLessThanOffsetTotal()])
"""The amount."""
def populate_obj(self, obj: JournalEntryLineItem) -> None:
"""Populates the form data into a line item object.
:param obj: The line item object.
:return: None.
"""
is_new: bool = obj.id is None
if is_new:
obj.id = new_id(JournalEntryLineItem)
obj.original_line_item_id = self.original_line_item_id.data
obj.account_id = Account.find_by_code(self.account_code.data).id
obj.description = self.description.data
obj.is_debit = True
obj.amount = self.amount.data
if is_new:
current_user_pk: int = get_current_user_pk()
obj.created_by_id = current_user_pk
obj.updated_by_id = current_user_pk
class CreditLineItemForm(LineItemForm):
"""The form to create or edit a credit line item."""
eid = IntegerField()
"""The existing line item ID."""
no = IntegerField()
"""The order in the currency."""
original_line_item_id = IntegerField(
validators=[Optional(),
OriginalLineItemExists(),
OriginalLineItemOppositeDebitCredit(),
OriginalLineItemNeedOffset(),
OriginalLineItemNotOffset()])
"""The ID of the original line item."""
account_code = StringField(
filters=[strip_text],
validators=[ACCOUNT_REQUIRED,
AccountExists(),
IsCreditAccount(),
SameAccountAsOriginalLineItem(),
KeepAccountWhenHavingOffset(),
NotStartReceivableFromCredit()])
"""The account code."""
description = StringField(filters=[strip_text])
"""The description."""
amount = DecimalField(
validators=[PositiveAmount(),
NotExceedingOriginalLineItemNetBalance(),
NotLessThanOffsetTotal()])
"""The amount."""
def populate_obj(self, obj: JournalEntryLineItem) -> None:
"""Populates the form data into a line item object.
:param obj: The line item object.
:return: None.
"""
is_new: bool = obj.id is None
if is_new:
obj.id = new_id(JournalEntryLineItem)
obj.original_line_item_id = self.original_line_item_id.data
obj.account_id = Account.find_by_code(self.account_code.data).id
obj.description = self.description.data
obj.is_debit = False
obj.amount = self.amount.data
if is_new:
current_user_pk: int = get_current_user_pk()
obj.created_by_id = current_user_pk
obj.updated_by_id = current_user_pk

View File

@ -0,0 +1,95 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/3/10
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The reorder forms for the journal entry management.
"""
from datetime import date
import sqlalchemy as sa
from flask import request
from accounting import db
from accounting.models import JournalEntry
def sort_journal_entries_in(journal_entry_date: date,
exclude: int | None = None) -> None:
"""Sorts the journal entries under a date after changing the date or
deleting a journal entry.
:param journal_entry_date: The date of the journal entry.
:param exclude: The journal entry ID to exclude.
:return: None.
"""
conditions: list[sa.BinaryExpression] \
= [JournalEntry.date == journal_entry_date]
if exclude is not None:
conditions.append(JournalEntry.id != exclude)
journal_entries: list[JournalEntry] = JournalEntry.query\
.filter(*conditions)\
.order_by(JournalEntry.no).all()
for i in range(len(journal_entries)):
if journal_entries[i].no != i + 1:
journal_entries[i].no = i + 1
class JournalEntryReorderForm:
"""The form to reorder the journal entries."""
def __init__(self, journal_entry_date: date):
"""Constructs the form to reorder the journal entries in a day.
:param journal_entry_date: The date.
"""
self.date: date = journal_entry_date
self.is_modified: bool = False
def save_order(self) -> None:
"""Saves the order of the account.
:return:
"""
journal_entries: list[JournalEntry] = JournalEntry.query\
.filter(JournalEntry.date == self.date).all()
# Collects the specified order.
orders: dict[JournalEntry, int] = {}
for journal_entry in journal_entries:
if f"{journal_entry.id}-no" in request.form:
try:
orders[journal_entry] \
= int(request.form[f"{journal_entry.id}-no"])
except ValueError:
pass
# Missing and invalid orders are appended to the end.
missing: list[JournalEntry] \
= [x for x in journal_entries if x not in orders]
if len(missing) > 0:
next_no: int = 1 if len(orders) == 0 else max(orders.values()) + 1
for journal_entry in missing:
orders[journal_entry] = next_no
# Sort by the specified order first, and their original order.
journal_entries.sort(key=lambda x: (orders[x], x.no))
# Update the orders.
with db.session.no_autoflush:
for i in range(len(journal_entries)):
if journal_entries[i].no != i + 1:
journal_entries[i].no = i + 1
self.is_modified = True

View File

@ -0,0 +1,82 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/2/25
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The template filters for the journal entry management.
"""
from decimal import Decimal
from html import escape
from urllib.parse import ParseResult, urlparse, parse_qsl, urlencode, \
urlunparse
from flask import request
def with_type(uri: str) -> str:
"""Adds the journal entry type to the URI, if it is specified.
:param uri: The URI.
:return: The result URL, optionally with the journal entry type added.
"""
if "as" not in request.args:
return uri
uri_p: ParseResult = urlparse(uri)
params: list[tuple[str, str]] = parse_qsl(uri_p.query)
params = [x for x in params if x[0] != "as"]
params.append(("as", request.args["as"]))
parts: list[str] = list(uri_p)
parts[4] = urlencode(params)
return urlunparse(parts)
def to_transfer(uri: str) -> str:
"""Adds the transfer journal entry type to the URI.
:param uri: The URI.
:return: The result URL, with the transfer journal entry type added.
"""
uri_p: ParseResult = urlparse(uri)
params: list[tuple[str, str]] = parse_qsl(uri_p.query)
params = [x for x in params if x[0] != "as"]
params.append(("as", "transfer"))
parts: list[str] = list(uri_p)
parts[4] = urlencode(params)
return urlunparse(parts)
def format_amount_input(value: Decimal | None) -> str:
"""Format an amount for an input value.
:param value: The amount.
:return: The formatted amount text for an input value.
"""
if value is None:
return ""
whole: int = int(value)
frac: Decimal = (value - whole).normalize()
return str(whole) + str(frac)[1:]
def text2html(value: str) -> str:
"""Converts plain text into HTML.
:param value: The plain text.
:return: The HTML.
"""
s: str = escape(value)
s = s.replace("\n", "<br>")
s = s.replace(" ", " &nbsp;")
return s

View File

@ -0,0 +1,19 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/3/10
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The utilities for the journal entry management.
"""

View File

@ -0,0 +1,49 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/3/10
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The account option for the journal entry management.
"""
from accounting.models import Account
class AccountOption:
"""An account option."""
def __init__(self, account: Account):
"""Constructs an account option.
:param account: The account.
"""
self.id: str = account.id
"""The account ID."""
self.code: str = account.code
"""The account code."""
self.query_values: list[str] = account.query_values
"""The values to be queried."""
self.__str: str = str(account)
"""The string representation of the account option."""
self.is_in_use: bool = False
"""True if this account is in use, or False otherwise."""
self.is_need_offset: bool = account.is_need_offset
"""True if this account needs offset, or False otherwise."""
def __str__(self) -> str:
"""Returns the string representation of the account option.
:return: The string representation of the account option.
"""
return self.__str

View File

@ -0,0 +1,261 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/2/27
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The description editor.
"""
import typing as t
import sqlalchemy as sa
from accounting import db
from accounting.models import Account, JournalEntryLineItem
class DescriptionAccount:
"""An account for a description tag."""
def __init__(self, account: Account, freq: int):
"""Constructs an account for a description tag.
:param account: The account.
:param freq: The frequency of the tag with the account.
"""
self.account: Account = account
"""The account."""
self.id: int = account.id
"""The account ID."""
self.code: str = account.code
"""The account code."""
self.freq: int = freq
"""The frequency of the tag with the account."""
def __str__(self) -> str:
"""Returns the string representation of the account.
:return: The string representation of the account.
"""
return str(self.account)
def add_freq(self, freq: int) -> None:
"""Adds the frequency of an account.
:param freq: The frequency of the tag name with the account.
:return: None.
"""
self.freq = self.freq + freq
class DescriptionTag:
"""A description tag."""
def __init__(self, name: str):
"""Constructs a description tag.
:param name: The tag name.
"""
self.name: str = name
"""The tag name."""
self.__account_dict: dict[int, DescriptionAccount] = {}
"""The accounts that come with the tag, in the order of their
frequency."""
self.freq: int = 0
"""The frequency of the tag."""
def __str__(self) -> str:
"""Returns the string representation of the tag.
:return: The string representation of the tag.
"""
return self.name
def add_account(self, account: Account, freq: int):
"""Adds an account.
:param account: The associated account.
:param freq: The frequency of the tag name with the account.
:return: None.
"""
self.__account_dict[account.id] = DescriptionAccount(account, freq)
self.freq = self.freq + freq
@property
def accounts(self) -> list[DescriptionAccount]:
"""Returns the accounts by the order of their frequencies.
:return: The accounts by the order of their frequencies.
"""
return sorted(self.__account_dict.values(), key=lambda x: -x.freq)
@property
def account_codes(self) -> list[str]:
"""Returns the account codes by the order of their frequencies.
:return: The account codes by the order of their frequencies.
"""
return [x.code for x in self.accounts]
class DescriptionType:
"""A description type"""
def __init__(self, type_id: t.Literal["general", "travel", "bus"]):
"""Constructs a description type.
:param type_id: The type ID, either "general", "travel", or "bus".
"""
self.id: t.Literal["general", "travel", "bus"] = type_id
"""The type ID."""
self.__tag_dict: dict[str, DescriptionTag] = {}
"""A dictionary from the tag name to their corresponding tag."""
def add_tag(self, name: str, account: Account, freq: int) -> None:
"""Adds a tag.
:param name: The tag name.
:param account: The associated account.
:param freq: The frequency of the tag name with the account.
:return: None.
"""
if name not in self.__tag_dict:
self.__tag_dict[name] = DescriptionTag(name)
self.__tag_dict[name].add_account(account, freq)
@property
def tags(self) -> list[DescriptionTag]:
"""Returns the tags by the order of their frequencies.
:return: The tags by the order of their frequencies.
"""
return sorted(self.__tag_dict.values(), key=lambda x: -x.freq)
class DescriptionDebitCredit:
"""The description on debit or credit."""
def __init__(self, debit_credit: t.Literal["debit", "credit"]):
"""Constructs the description on debit or credit.
:param debit_credit: Either "debit" or "credit".
"""
self.debit_credit: t.Literal["debit", "credit"] = debit_credit
"""Either debit or credit."""
self.general: DescriptionType = DescriptionType("general")
"""The general tags."""
self.travel: DescriptionType = DescriptionType("travel")
"""The travel tags."""
self.bus: DescriptionType = DescriptionType("bus")
"""The bus tags."""
self.__type_dict: dict[t.Literal["general", "travel", "bus"],
DescriptionType] \
= {x.id: x for x in {self.general, self.travel, self.bus}}
"""A dictionary from the type ID to the corresponding tags."""
def add_tag(self, tag_type: t.Literal["general", "travel", "bus"],
name: str, account: Account, freq: int) -> None:
"""Adds a tag.
:param tag_type: The tag type, either "general", "travel", or "bus".
:param name: The name.
:param account: The associated account.
:param freq: The frequency of the tag name with the account.
:return: None.
"""
self.__type_dict[tag_type].add_tag(name, account, freq)
@property
def accounts(self) -> list[DescriptionAccount]:
"""Returns the suggested accounts of all tags in the description editor
in debit or credit, in their frequency order.
:return: The suggested accounts of all tags, in their frequency order.
"""
accounts: dict[int, DescriptionAccount] = {}
freq: dict[int, int] = {}
for tag_type in self.__type_dict.values():
for tag in tag_type.tags:
for account in tag.accounts:
accounts[account.id] = account
if account.id not in freq:
freq[account.id] = 0
freq[account.id] \
= freq[account.id] + account.freq
return [accounts[y] for y in sorted(freq.keys(),
key=lambda x: -freq[x])]
class DescriptionEditor:
"""The description editor."""
def __init__(self):
"""Constructs the description editor."""
self.debit: DescriptionDebitCredit = DescriptionDebitCredit("debit")
"""The debit tags."""
self.credit: DescriptionDebitCredit = DescriptionDebitCredit("credit")
"""The credit tags."""
debit_credit: sa.Label = sa.case(
(JournalEntryLineItem.is_debit, "debit"),
else_="credit").label("debit_credit")
tag_type: sa.Label = sa.case(
(JournalEntryLineItem.description.like("_%—_%—_%→_%"), "bus"),
(sa.or_(JournalEntryLineItem.description.like("_%—_%→_%"),
JournalEntryLineItem.description.like("_%—_%↔_%")),
"travel"),
else_="general").label("tag_type")
tag: sa.Label = get_prefix(JournalEntryLineItem.description, "")\
.label("tag")
select: sa.Select = sa.Select(debit_credit, tag_type, tag,
JournalEntryLineItem.account_id,
sa.func.count().label("freq"))\
.filter(JournalEntryLineItem.description.is_not(None),
JournalEntryLineItem.description.like("_%—_%"),
JournalEntryLineItem.original_line_item_id.is_(None))\
.group_by(debit_credit, tag_type, tag,
JournalEntryLineItem.account_id)
result: list[sa.Row] = db.session.execute(select).all()
accounts: dict[int, Account] \
= {x.id: x for x in Account.query
.filter(Account.id.in_({x.account_id for x in result})).all()}
debit_credit_dict: dict[t.Literal["debit", "credit"],
DescriptionDebitCredit] \
= {x.debit_credit: x for x in {self.debit, self.credit}}
for row in result:
debit_credit_dict[row.debit_credit].add_tag(
row.tag_type, row.tag, accounts[row.account_id], row.freq)
def get_prefix(string: str | sa.Column, separator: str | sa.Column) \
-> sa.Function:
"""Returns the SQL function to find the prefix of a string.
:param string: The string.
:param separator: The separator.
:return: The position of the substring, starting from 1.
"""
return sa.func.substr(string, 0, get_position(string, separator))
def get_position(string: str | sa.Column, substring: str | sa.Column) \
-> sa.Function:
"""Returns the SQL function to find the position of a substring.
:param string: The string.
:param substring: The substring.
:return: The position of the substring, starting from 1.
"""
if db.engine.name == "postgresql":
return sa.func.strpos(string, substring)
return sa.func.instr(string, substring)

View File

@ -0,0 +1,39 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/3/15
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The SQLAlchemy alias for the offset items.
"""
import typing as t
import sqlalchemy as sa
from accounting.models import JournalEntryLineItem
def offset_alias() -> sa.Alias:
"""Returns the SQLAlchemy alias for the offset items.
:return: The SQLAlchemy alias for the offset items.
"""
def as_from(model_cls: t.Any) -> sa.FromClause:
return model_cls
def as_alias(alias: t.Any) -> sa.Alias:
return alias
return as_alias(sa.alias(as_from(JournalEntryLineItem), name="offset"))

View File

@ -0,0 +1,336 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/2/19
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The operators for different journal entry types.
"""
import typing as t
from abc import ABC, abstractmethod
from flask import render_template, request, abort
from flask_wtf import FlaskForm
from accounting.models import JournalEntry
from accounting.template_globals import default_currency_code
from accounting.utils.journal_entry_types import JournalEntryType
from accounting.journal_entry.forms import JournalEntryForm, \
CashReceiptJournalEntryForm, CashDisbursementJournalEntryForm, \
TransferJournalEntryForm
from accounting.journal_entry.forms.line_item import LineItemForm
class JournalEntryOperator(ABC):
"""The base journal entry operator."""
CHECK_ORDER: int = -1
"""The order when checking the journal entry operator."""
@property
@abstractmethod
def form(self) -> t.Type[JournalEntryForm]:
"""Returns the form class.
:return: The form class.
"""
@abstractmethod
def render_create_template(self, form: FlaskForm) -> str:
"""Renders the template for the form to create a journal entry.
:param form: The journal entry form.
:return: the form to create a journal entry.
"""
@abstractmethod
def render_detail_template(self, journal_entry: JournalEntry) -> str:
"""Renders the template for the detail page.
:param journal_entry: The journal entry.
:return: the detail page.
"""
@abstractmethod
def render_edit_template(self, journal_entry: JournalEntry,
form: FlaskForm) -> str:
"""Renders the template for the form to edit a journal entry.
:param journal_entry: The journal entry.
:param form: The form.
:return: the form to edit a journal entry.
"""
@abstractmethod
def is_my_type(self, journal_entry: JournalEntry) -> bool:
"""Checks and returns whether the journal entry belongs to the type.
:param journal_entry: The journal entry.
:return: True if the journal entry belongs to the type, or False
otherwise.
"""
@property
def _line_item_template(self) -> str:
"""Renders and returns the template for the line item sub-form.
:return: The template for the line item sub-form.
"""
return render_template(
"accounting/journal-entry/include/form-line-item.html",
currency_index="CURRENCY_INDEX",
debit_credit="DEBIT_CREDIT",
line_item_index="LINE_ITEM_INDEX",
form=LineItemForm())
class CashReceiptJournalEntry(JournalEntryOperator):
"""A cash receipt journal entry."""
CHECK_ORDER: int = 2
"""The order when checking the journal entry operator."""
@property
def form(self) -> t.Type[JournalEntryForm]:
"""Returns the form class.
:return: The form class.
"""
return CashReceiptJournalEntryForm
def render_create_template(self, form: CashReceiptJournalEntryForm) -> str:
"""Renders the template for the form to create a journal entry.
:param form: The journal entry form.
:return: the form to create a journal entry.
"""
return render_template(
"accounting/journal-entry/receipt/create.html",
form=form,
journal_entry_type=JournalEntryType.CASH_RECEIPT,
currency_template=self.__currency_template,
line_item_template=self._line_item_template)
def render_detail_template(self, journal_entry: JournalEntry) -> str:
"""Renders the template for the detail page.
:param journal_entry: The journal entry.
:return: the detail page.
"""
return render_template("accounting/journal-entry/receipt/detail.html",
obj=journal_entry)
def render_edit_template(self, journal_entry: JournalEntry,
form: CashReceiptJournalEntryForm) -> str:
"""Renders the template for the form to edit a journal entry.
:param journal_entry: The journal entry.
:param form: The form.
:return: the form to edit a journal entry.
"""
return render_template("accounting/journal-entry/receipt/edit.html",
journal_entry=journal_entry, form=form,
currency_template=self.__currency_template,
line_item_template=self._line_item_template)
def is_my_type(self, journal_entry: JournalEntry) -> bool:
"""Checks and returns whether the journal entry belongs to the type.
:param journal_entry: The journal entry.
:return: True if the journal entry belongs to the type, or False
otherwise.
"""
return journal_entry.is_cash_receipt
@property
def __currency_template(self) -> str:
"""Renders and returns the template for the currency sub-form.
:return: The template for the currency sub-form.
"""
return render_template(
"accounting/journal-entry/receipt/include/form-currency.html",
currency_index="CURRENCY_INDEX",
currency_code_data=default_currency_code(),
credit_total="-")
class CashDisbursementJournalEntry(JournalEntryOperator):
"""A cash disbursement journal entry."""
CHECK_ORDER: int = 1
"""The order when checking the journal entry operator."""
@property
def form(self) -> t.Type[JournalEntryForm]:
"""Returns the form class.
:return: The form class.
"""
return CashDisbursementJournalEntryForm
def render_create_template(self, form: CashDisbursementJournalEntryForm) \
-> str:
"""Renders the template for the form to create a journal entry.
:param form: The journal entry form.
:return: the form to create a journal entry.
"""
return render_template(
"accounting/journal-entry/disbursement/create.html",
form=form,
journal_entry_type=JournalEntryType.CASH_DISBURSEMENT,
currency_template=self.__currency_template,
line_item_template=self._line_item_template)
def render_detail_template(self, journal_entry: JournalEntry) -> str:
"""Renders the template for the detail page.
:param journal_entry: The journal entry.
:return: the detail page.
"""
return render_template(
"accounting/journal-entry/disbursement/detail.html",
obj=journal_entry)
def render_edit_template(self, journal_entry: JournalEntry,
form: CashDisbursementJournalEntryForm) -> str:
"""Renders the template for the form to edit a journal entry.
:param journal_entry: The journal entry.
:param form: The form.
:return: the form to edit a journal entry.
"""
return render_template(
"accounting/journal-entry/disbursement/edit.html",
journal_entry=journal_entry, form=form,
currency_template=self.__currency_template,
line_item_template=self._line_item_template)
def is_my_type(self, journal_entry: JournalEntry) -> bool:
"""Checks and returns whether the journal entry belongs to the type.
:param journal_entry: The journal entry.
:return: True if the journal entry belongs to the type, or False
otherwise.
"""
return journal_entry.is_cash_disbursement
@property
def __currency_template(self) -> str:
"""Renders and returns the template for the currency sub-form.
:return: The template for the currency sub-form.
"""
return render_template(
"accounting/journal-entry/disbursement/include/form-currency.html",
currency_index="CURRENCY_INDEX",
currency_code_data=default_currency_code(),
debit_total="-")
class TransferJournalEntry(JournalEntryOperator):
"""A transfer journal entry."""
CHECK_ORDER: int = 3
"""The order when checking the journal entry operator."""
@property
def form(self) -> t.Type[JournalEntryForm]:
"""Returns the form class.
:return: The form class.
"""
return TransferJournalEntryForm
def render_create_template(self, form: TransferJournalEntryForm) -> str:
"""Renders the template for the form to create a journal entry.
:param form: The journal entry form.
:return: the form to create a journal entry.
"""
return render_template(
"accounting/journal-entry/transfer/create.html",
form=form,
journal_entry_type=JournalEntryType.TRANSFER,
currency_template=self.__currency_template,
line_item_template=self._line_item_template)
def render_detail_template(self, journal_entry: JournalEntry) -> str:
"""Renders the template for the detail page.
:param journal_entry: The journal entry.
:return: the detail page.
"""
return render_template("accounting/journal-entry/transfer/detail.html",
obj=journal_entry)
def render_edit_template(self, journal_entry: JournalEntry,
form: TransferJournalEntryForm) -> str:
"""Renders the template for the form to edit a journal entry.
:param journal_entry: The journal entry.
:param form: The form.
:return: the form to edit a journal entry.
"""
return render_template("accounting/journal-entry/transfer/edit.html",
journal_entry=journal_entry, form=form,
currency_template=self.__currency_template,
line_item_template=self._line_item_template)
def is_my_type(self, journal_entry: JournalEntry) -> bool:
"""Checks and returns whether the journal entry belongs to the type.
:param journal_entry: The journal entry.
:return: True if the journal entry belongs to the type, or False
otherwise.
"""
return True
@property
def __currency_template(self) -> str:
"""Renders and returns the template for the currency sub-form.
:return: The template for the currency sub-form.
"""
return render_template(
"accounting/journal-entry/transfer/include/form-currency.html",
currency_index="CURRENCY_INDEX",
currency_code_data=default_currency_code(),
debit_total="-", credit_total="-")
JOURNAL_ENTRY_TYPE_TO_OP: dict[JournalEntryType, JournalEntryOperator] \
= {JournalEntryType.CASH_RECEIPT: CashReceiptJournalEntry(),
JournalEntryType.CASH_DISBURSEMENT: CashDisbursementJournalEntry(),
JournalEntryType.TRANSFER: TransferJournalEntry()}
"""The map from the journal entry types to their operators."""
def get_journal_entry_op(journal_entry: JournalEntry,
is_check_as: bool = False) -> JournalEntryOperator:
"""Returns the journal entry operator that may be specified in the "as"
query parameter. If it is not specified, check the journal entry type from
the journal entry.
:param journal_entry: The journal entry.
:param is_check_as: True to check the "as" parameter, or False otherwise.
:return: None.
"""
if is_check_as and "as" in request.args:
type_dict: dict[str, JournalEntryType] \
= {x.value: x for x in JournalEntryType}
if request.args["as"] not in type_dict:
abort(404)
return JOURNAL_ENTRY_TYPE_TO_OP[type_dict[request.args["as"]]]
for journal_entry_type in sorted(JOURNAL_ENTRY_TYPE_TO_OP.values(),
key=lambda x: x.CHECK_ORDER):
if journal_entry_type.is_my_type(journal_entry):
return journal_entry_type

View File

@ -0,0 +1,84 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/3/10
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The selectable original line items.
"""
from decimal import Decimal
import sqlalchemy as sa
from sqlalchemy.orm import selectinload
from accounting import db
from accounting.models import Account, JournalEntry, JournalEntryLineItem
from accounting.utils.cast import be
from .offset_alias import offset_alias
def get_selectable_original_line_items(
line_item_id_on_form: set[int], is_payable: bool,
is_receivable: bool) -> list[JournalEntryLineItem]:
"""Queries and returns the selectable original line items, with their net
balances. The offset amounts of the form is excluded.
:param line_item_id_on_form: The ID of the line items on the form.
:param is_payable: True to check the payable original line items, or False
otherwise.
:param is_receivable: True to check the receivable original line items, or
False otherwise.
:return: The selectable original line items, with their net balances.
"""
assert is_payable or is_receivable
offset: sa.Alias = offset_alias()
net_balance: sa.Label = (JournalEntryLineItem.amount + sa.func.sum(sa.case(
(offset.c.id.in_(line_item_id_on_form), 0),
(be(offset.c.is_debit == JournalEntryLineItem.is_debit),
offset.c.amount),
else_=-offset.c.amount))).label("net_balance")
conditions: list[sa.BinaryExpression] = [Account.is_need_offset]
sub_conditions: list[sa.BinaryExpression] = []
if is_payable:
sub_conditions.append(sa.and_(Account.base_code.startswith("2"),
sa.not_(JournalEntryLineItem.is_debit)))
if is_receivable:
sub_conditions.append(sa.and_(Account.base_code.startswith("1"),
JournalEntryLineItem.is_debit))
conditions.append(sa.or_(*sub_conditions))
select_net_balances: sa.Select \
= sa.select(JournalEntryLineItem.id, net_balance)\
.join(Account)\
.join(offset, be(JournalEntryLineItem.id
== offset.c.original_line_item_id),
isouter=True)\
.filter(*conditions)\
.group_by(JournalEntryLineItem.id)\
.having(sa.or_(sa.func.count(offset.c.id) == 0, net_balance != 0))
net_balances: dict[int, Decimal] \
= {x.id: x.net_balance
for x in db.session.execute(select_net_balances).all()}
line_items: list[JournalEntryLineItem] = JournalEntryLineItem.query\
.filter(JournalEntryLineItem.id.in_({x for x in net_balances}))\
.join(JournalEntry)\
.order_by(JournalEntry.date, JournalEntryLineItem.is_debit,
JournalEntryLineItem.no)\
.options(selectinload(JournalEntryLineItem.currency),
selectinload(JournalEntryLineItem.account),
selectinload(JournalEntryLineItem.journal_entry)).all()
for line_item in line_items:
line_item.net_balance = line_item.amount \
if net_balances[line_item.id] is None \
else net_balances[line_item.id]
return line_items

View File

@ -0,0 +1,235 @@
# The Mia! Accounting Flask Project.
# Author: imacat@mail.imacat.idv.tw (imacat), 2023/2/18
# Copyright (c) 2023 imacat.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The views for the journal entry management.
"""
from datetime import date
from urllib.parse import parse_qsl, urlencode
import sqlalchemy as sa
from flask import Blueprint, render_template, session, redirect, request, \
flash, url_for
from werkzeug.datastructures import ImmutableMultiDict
from accounting import db
from accounting.locale import lazy_gettext
from accounting.models import JournalEntry
from accounting.utils.cast import s
from accounting.utils.flash_errors import flash_form_errors
from accounting.utils.next_uri import inherit_next, or_next
from accounting.utils.permission import has_permission, can_view, can_edit
from accounting.utils.journal_entry_types import JournalEntryType
from accounting.utils.user import get_current_user_pk
from .forms import sort_journal_entries_in, JournalEntryReorderForm
from .template_filters import with_type, to_transfer, format_amount_input, \
text2html
from .utils.operators import JournalEntryOperator, JOURNAL_ENTRY_TYPE_TO_OP, \
get_journal_entry_op
bp: Blueprint = Blueprint("journal-entry", __name__)
"""The view blueprint for the journal entry management."""
bp.add_app_template_filter(with_type, "accounting_journal_entry_with_type")
bp.add_app_template_filter(to_transfer, "accounting_journal_entry_to_transfer")
bp.add_app_template_filter(format_amount_input,
"accounting_journal_entry_format_amount_input")
bp.add_app_template_filter(text2html, "accounting_journal_entry_text2html")
@bp.get("/create/<journalEntryType:journal_entry_type>", endpoint="create")
@has_permission(can_edit)
def show_add_journal_entry_form(journal_entry_type: JournalEntryType) -> str:
"""Shows the form to add a journal entry.
:param journal_entry_type: The journal entry type.
:return: The form to add a journal entry.
"""
journal_entry_op: JournalEntryOperator \
= JOURNAL_ENTRY_TYPE_TO_OP[journal_entry_type]
form: journal_entry_op.form
if "form" in session:
form = journal_entry_op.form(
ImmutableMultiDict(parse_qsl(session["form"])))
del session["form"]
form.validate()
else:
form = journal_entry_op.form()
form.date.data = date.today()
return journal_entry_op.render_create_template(form)
@bp.post("/store/<journalEntryType:journal_entry_type>", endpoint="store")
@has_permission(can_edit)
def add_journal_entry(journal_entry_type: JournalEntryType) -> redirect:
"""Adds a journal entry.
:param journal_entry_type: The journal entry type.
:return: The redirection to the journal entry detail on success, or the
journal entry creation form on error.
"""
journal_entry_op: JournalEntryOperator \
= JOURNAL_ENTRY_TYPE_TO_OP[journal_entry_type]
form: journal_entry_op.form = journal_entry_op.form(request.form)
if not form.validate():
flash_form_errors(form)
session["form"] = urlencode(list(request.form.items()))
return redirect(inherit_next(with_type(
url_for("accounting.journal-entry.create",
journal_entry_type=journal_entry_type))))
journal_entry: JournalEntry = JournalEntry()
form.populate_obj(journal_entry)
db.session.add(journal_entry)
db.session.commit()
flash(s(lazy_gettext("The journal entry is added successfully")),
"success")
return redirect(inherit_next(__get_detail_uri(journal_entry)))
@bp.get("/<journalEntry:journal_entry>", endpoint="detail")
@has_permission(can_view)
def show_journal_entry_detail(journal_entry: JournalEntry) -> str:
"""Shows the journal entry detail.
:param journal_entry: The journal entry.
:return: The detail.
"""
journal_entry_op: JournalEntryOperator \
= get_journal_entry_op(journal_entry)
return journal_entry_op.render_detail_template(journal_entry)
@bp.get("/<journalEntry:journal_entry>/edit", endpoint="edit")
@has_permission(can_edit)
def show_journal_entry_edit_form(journal_entry: JournalEntry) -> str:
"""Shows the form to edit a journal entry.
:param journal_entry: The journal entry.
:return: The form to edit the journal entry.
"""
journal_entry_op: JournalEntryOperator \
= get_journal_entry_op(journal_entry, is_check_as=True)
form: journal_entry_op.form
if "form" in session:
form = journal_entry_op.form(
ImmutableMultiDict(parse_qsl(session["form"])))
del session["form"]
form.obj = journal_entry
form.validate()
else:
form = journal_entry_op.form(obj=journal_entry)
return journal_entry_op.render_edit_template(journal_entry, form)
@bp.post("/<journalEntry:journal_entry>/update", endpoint="update")
@has_permission(can_edit)
def update_journal_entry(journal_entry: JournalEntry) -> redirect:
"""Updates a journal entry.
:param journal_entry: The journal entry.
:return: The redirection to the journal entry detail on success, or the
journal entry edit form on error.
"""
journal_entry_op: JournalEntryOperator \
= get_journal_entry_op(journal_entry, is_check_as=True)
form: journal_entry_op.form = journal_entry_op.form(request.form)
form.obj = journal_entry
if not form.validate():
flash_form_errors(form)
session["form"] = urlencode(list(request.form.items()))
return redirect(inherit_next(with_type(
url_for("accounting.journal-entry.edit",
journal_entry=journal_entry))))
with db.session.no_autoflush:
form.populate_obj(journal_entry)
if not form.is_modified:
flash(s(lazy_gettext("The journal entry was not modified.")),
"success")
return redirect(inherit_next(__get_detail_uri(journal_entry)))
journal_entry.updated_by_id = get_current_user_pk()
journal_entry.updated_at = sa.func.now()
db.session.commit()
flash(s(lazy_gettext("The journal entry is updated successfully.")),
"success")
return redirect(inherit_next(__get_detail_uri(journal_entry)))
@bp.post("/<journalEntry:journal_entry>/delete", endpoint="delete")
@has_permission(can_edit)
def delete_journal_entry(journal_entry: JournalEntry) -> redirect:
"""Deletes a journal entry.
:param journal_entry: The journal entry.
:return: The redirection to the journal entry list on success, or the
journal entry detail on error.
"""
journal_entry.delete()
sort_journal_entries_in(journal_entry.date, journal_entry.id)
db.session.commit()
flash(s(lazy_gettext("The journal entry is deleted successfully.")),
"success")
return redirect(or_next(__get_default_page_uri()))
@bp.get("/dates/<date:journal_entry_date>", endpoint="order")
@has_permission(can_view)
def show_journal_entry_order(journal_entry_date: date) -> str:
"""Shows the order of the journal entries in a same date.
:param journal_entry_date: The date.
:return: The order of the journal entries in the date.
"""
journal_entries: list[JournalEntry] = JournalEntry.query \
.filter(JournalEntry.date == journal_entry_date) \
.order_by(JournalEntry.no).all()
return render_template("accounting/journal-entry/order.html",
date=journal_entry_date, list=journal_entries)
@bp.post("/dates/<date:journal_entry_date>", endpoint="sort")
@has_permission(can_edit)
def sort_journal_entries(journal_entry_date: date) -> redirect:
"""Reorders the journal entries in a date.
:param journal_entry_date: The date.
:return: The redirection to the incoming account or the account list. The
reordering operation does not fail.
"""
form: JournalEntryReorderForm = JournalEntryReorderForm(journal_entry_date)
form.save_order()
if not form.is_modified:
flash(s(lazy_gettext("The order was not modified.")), "success")
return redirect(or_next(__get_default_page_uri()))
db.session.commit()
flash(s(lazy_gettext("The order is updated successfully.")), "success")
return redirect(or_next(__get_default_page_uri()))
def __get_detail_uri(journal_entry: JournalEntry) -> str:
"""Returns the detail URI of a journal entry.
:param journal_entry: The journal entry.
:return: The detail URI of the journal entry.
"""
return url_for("accounting.journal-entry.detail",
journal_entry=journal_entry)
def __get_default_page_uri() -> str:
"""Returns the URI for the default page.
:return: The URI for the default page.
"""
return url_for("accounting.report.default")