ドキュメント

View Categories

Batchサーバー

11 min read

バッチサーバーは、バッチ処理のリクエストを http://localhost:<port=5678><path=/batches/compile> で待ち受けます。

// Create the shared Caches instance for the compile batch, handing it the
// event loop together with the configured compile path and port.
auto caches = std::make_shared<Caches>(
    &io_loop,
    spa::config::batch::path::compile,
    spa::config::batch::port::compile);

// Start listening on the configured batch address and compile port.
caches->io_loop->addListener(
    spa::config::batch::ip,
    spa::config::batch::port::compile);


Cachesクラスはio_loopを使ってメッセージを送信したり、バッチ処理を待ち受けたりするほか、バッチ処理ハンドラで作成されるデータを保存します。

	// Register the compile-batch handler. The lambda holds its own copy of the
	// `caches` shared_ptr and passes it to handler::dispatch, awaiting the
	// whole dispatch coroutine before finishing.
	// NOTE(review): if registerBatchHandler stores this callable inside
	// `*caches`, the by-value capture forms a shared_ptr ownership cycle;
	// presumably acceptable for a process-lifetime object — confirm.
	caches->registerBatchHandler(drogon::async_func([caches]() -> drogon::Task<> {
		co_await spa::batch::compile::handler::dispatch(caches);
		co_return;
	}));


ハンドラはバッチ処理を行い、その結果をCachesオブジェクトに保存します。

// Coroutine entry point of the compile batch: runs every compile step in
// sequence against the shared Caches object.
//
// caches: shared state of the run. It is taken by value, so the coroutine
//         holds its own reference while the steps below work on the raw
//         pointer obtained from caches.get().
//
// A std::exception or drogon::HttpStatusCode thrown by any step is caught,
// logged with LOG_INFO and recorded through caches->err(); other exception
// types are not handled here. caches->free() and caches->connected() are
// reached after the try block both on success and after one of those two
// handlers has run.
drogon::Task<> spa::batch::compile::handler::dispatch(std::shared_ptr<Caches> caches)
{
	try
	{
		caches->log_info_user();

		LOG_INFO << caches->args();

		co_await caches->start_track();

		co_await caches->processing();

		caches->clear();

		// Build caches->closed from the request arguments first: the caching
		// and compiling steps below read closed.year / closed.id / created_at.
		spa::batch::compile::closed::compiling(caches.get());

		co_await spa::batch::compile::closed::save(caches.get());

		// NOTE(review): account/description caching are not visible here;
		// presumably they fill caches->accounts and related lookups — confirm.
		co_await spa::batch::compile::account::caching(caches.get());

		co_await spa::batch::compile::description::caching(caches.get());

		// Group the closing year's journals by debit and by credit account code.
		co_await spa::batch::compile::debit::caching(caches.get());

		co_await spa::batch::compile::credit::caching(caches.get());

		// Ledgers must be compiled before financials: financial::compiling
		// iterates caches->ledgers.
		spa::batch::compile::ledger::compiling(caches.get());

		spa::batch::compile::financial::compiling(caches.get());

		co_await spa::batch::compile::ledger::save(caches.get());

		co_await spa::batch::compile::financial::save(caches.get());

		co_await caches->processed();

	}
	catch (const std::exception& e)
	{
		// Record the failure text on the Caches object as well as in the log.
		LOG_INFO << e.what();
		caches->err(e.what());
	}
	catch (const drogon::HttpStatusCode& code)
	{
		// A bare HTTP status code was thrown; report its message text.
		LOG_INFO << spa::web::message::status(code);
		caches->err(spa::web::message::status(code));
	}
	// Runs on both the success path and after a handled failure.
	caches->free();
	co_await caches->connected();
	co_return;
}


caching処理はデータを取得してCachesオブジェクトに保存します。

// Fetches the journals of the closing year and groups them in caches->debits
// under each journal's debit account code.
//
// caches: non-owning pointer. Supplies the HTTP client and request, the
//         closing year (caches->closed.year), and receives the grouped journals.
drogon::Task<> spa::batch::compile::debit::caching(Caches* caches)
{
  auto fetched = co_await spa::web::request::pplx::get_format_request<Caches::JOURNAL>(
      caches->client(),
      caches->req(),
      "/journals/json?year=" + std::to_string(caches->closed.year));

  // Run the fetched list through the callable held in `.second` of the
  // "compile" filter built for the closing year.
  auto keep_compilable
    = spa::format::journal::filter::compile(caches->closed.year).second;
  fetched = keep_compilable(fetched);

  for (auto const& entry : fetched)
  {
    caches->debits[entry.debit_account_code].push_back(entry);
  }
}

// Fetches the journals of the closing year and groups them in caches->credits
// under each journal's credit account code.
//
// caches: non-owning pointer. Supplies the HTTP client and request, the
//         closing year (caches->closed.year), and receives the grouped journals.
drogon::Task<> spa::batch::compile::credit::caching(Caches* caches)
{
  auto fetched = co_await spa::web::request::pplx::get_format_request<Caches::JOURNAL>(
      caches->client(),
      caches->req(),
      "/journals/json?year=" + std::to_string(caches->closed.year));

  // Run the fetched list through the callable held in `.second` of the
  // "compile" filter built for the closing year.
  auto keep_compilable
    = spa::format::journal::filter::compile(caches->closed.year).second;
  fetched = keep_compilable(fetched);

  for (auto const& entry : fetched)
  {
    caches->credits[entry.credit_account_code].push_back(entry);
  }
}

// format/journalite/journal.hpp
// Builds the "compile:<year>" filter: a named callable that keeps only the
// entries that are flagged valid and whose date begins with `year`
// (the first four characters of `date` are parsed as an integer).
inline Specific compile(unsigned int year)
{
  DecorationFc keep_year = [year](DATA data)
  {
    DATA kept;
    for (auto const& entry : data)
    {
      // The year is parsed before `valid` is checked, so a date that
      // std::stoi cannot parse throws even for an entry flagged invalid.
      const bool same_year
        = static_cast<unsigned int>(std::stoi(entry.date.substr(0, 4))) == year;
      if (same_year && entry.valid)
        kept.push_back(entry);
    }
    return kept;
  };
  return Specific("compile:" + std::to_string(year), keep_year);
}


closed::compiling処理はリクエストデータから締データを作成します。

// Creates the closing record of this run from the request arguments and
// stores it in caches->closed.
//
// The parsed arguments get "id" forced to 0 and "created_at" set to a
// "%Y-%m-%d %H:%M:%S" string produced by spa::chrono::Timer before the
// record is constructed.
void spa::batch::compile::closed::compiling(Caches* caches)
{
  Json::Value payload = spa::utils::json::read(caches->args());

  payload["id"] = 0;
  payload["created_at"] = spa::chrono::Timer{}.datetime("%Y-%m-%d %H:%M:%S");

  caches->closed = Caches::CLOSED{payload};
}


ledger::compiling処理は仕訳データから総勘定元帳データを作成します。

// Builds general-ledger rows from the cached journals.
//
// For every account in caches->accounts, each journal cached under that code
// in caches->debits yields a row carrying "debit_money", and each journal in
// caches->credits yields a row carrying "credit_money". The row's account_*
// fields come from the journal's own side and the corresponed_account_* fields
// from the opposite side. Rows are appended to caches->ledgers[code], debit
// rows before credit rows.
//
// caches: non-owning pointer; caches->closed.id and caches->closed.created_at
//         are stamped on every row.
void spa::batch::compile::ledger::compiling(Caches* caches)
{
  // One row builder shared by the debit and the credit pass so the two sides
  // cannot drift apart. `debit_side` selects which half of the journal is the
  // row's own account and which half is the corresponding account.
  auto const make_ledger = [caches](auto const& journal, bool const debit_side)
  {
    // Description is "name[-classify][-free text]"; empty parts are skipped.
    std::string description = journal.journal_description_name;
    if (!journal.journal_description_classify.empty())
      description += "-" + journal.journal_description_classify;
    if (!journal.journal_description.empty())
      description += "-" + journal.journal_description;

    Caches::LEDGER draft;
    draft.json["journal_closed_id"] = caches->closed.id;
    draft.json["date"] = journal.date;
    draft.json["account_code"]
      = debit_side ? journal.debit_account_code : journal.credit_account_code;
    draft.json["account_name"]
      = debit_side ? journal.debit_account_name : journal.credit_account_name;
    draft.json["account_category"]
      = debit_side ? journal.debit_account_category : journal.credit_account_category;
    draft.json["corresponed_account_code"]
      = debit_side ? journal.credit_account_code : journal.debit_account_code;
    draft.json["corresponed_account_name"]
      = debit_side ? journal.credit_account_name : journal.debit_account_name;
    draft.json["corresponed_account_category"]
      = debit_side ? journal.credit_account_category : journal.debit_account_category;
    draft.json["journal_description"] = description;
    if (debit_side)
      draft.json["debit_money"] = journal.debit_money;
    else
      draft.json["credit_money"] = journal.credit_money;
    draft.json["created_at"] = caches->closed.created_at;

    // Re-create the row from the finished json so its typed members are
    // populated from it.
    // NOTE(review): assumes constructing LEDGER from its json is idempotent,
    // so a single construction is enough for both sides — confirm.
    return Caches::LEDGER{draft.json};
  };

  for (auto const& [code, account] : caches->accounts)
  {
    for (auto const& journal : caches->debits[code])
      caches->ledgers[code].push_back(make_ledger(journal, true));

    for (auto const& journal : caches->credits[code])
      caches->ledgers[code].push_back(make_ledger(journal, false));
  }
}


financial::compiling処理は総勘定元帳データから決算書データを作成します。

// Aggregates each account's ledger rows into one financial-statement row.
//
// For every (code, rows) entry of caches->ledgers this sums debit_money minus
// credit_money, negates the total for the Debt, Capital and Profit categories,
// and appends the resulting FINANCIAL to caches->financials keyed by category.
//
// All rows of one account are expected to share the account code, name and
// category. A name or category mismatch is reported through caches->message()
// and LOG_WARN before the matching spa::utils::check::boolean() call
// (presumably that call aborts the run on a false condition — confirm).
void spa::batch::compile::financial::compiling(Caches* caches)
{
  for (auto const& [code, ledgers] : caches->ledgers)
  {
    Caches::FINANCIAL financial;
    financial.json["journal_closed_id"] = caches->closed.id;
    financial.json["year"] = caches->closed.year;
    financial.json["created_at"] = caches->closed.created_at;
    unsigned int account_code = code;
    // Empty name / zero category mean "no row seen yet" for this account.
    std::string account_name = "";
    unsigned int financial_category = 0;
    int money = 0;
    Caches::LEDGER prev_ledger;
    for (auto const& ledger : ledgers)
    {
      spa::utils::check::boolean(
          account_code == ledger.account_code
        , __FILE__
        , __LINE__);
      if (!account_name.empty())
      {
        if (account_name != ledger.account_name)
        {
          // same account code, different name
          // A real newline separates the header from the JSON dumps.
          std::string message
            = prev_ledger.account_name + " != " + ledger.account_name + "\n";
          message += prev_ledger.json.toStyledString();
          message += ledger.json.toStyledString();
          caches->message(message);
          LOG_WARN << prev_ledger.account_name << " != " << ledger.account_name;
          LOG_WARN << prev_ledger.json.toStyledString();
          LOG_WARN << ledger.json.toStyledString();
        }
        spa::utils::check::boolean(
            account_name == ledger.account_name
          , __FILE__
          , __LINE__);
      }
      account_name = ledger.account_name;
      if (financial_category != 0)
      {
        // same account code, different financial category
        if (financial_category != ledger.account_category)
        {
          std::string message
            = std::to_string(prev_ledger.account_category)
            + " != " + std::to_string(ledger.account_category)
            + "\n";
          message += prev_ledger.json.toStyledString();
          message += ledger.json.toStyledString();
          caches->message(message);
          LOG_WARN << prev_ledger.account_category << " != " << ledger.account_category;
          LOG_WARN << prev_ledger.json.toStyledString();
          LOG_WARN << ledger.json.toStyledString();
        }
        spa::utils::check::boolean(
            financial_category == ledger.account_category
          , __FILE__
          , __LINE__);
      }
      financial_category = ledger.account_category;

      // Running balance is debit minus credit.
      money += ledger.debit_money;
      money -= ledger.credit_money;

      prev_ledger = ledger;
    }
    // Debt, Capital and Profit totals are stored with the opposite sign.
    Caches::CATEGORY financial_category_
      = static_cast<Caches::CATEGORY>(financial_category);
    if ( financial_category_ == Caches::CATEGORY::Debt
      || financial_category_ == Caches::CATEGORY::Capital
      || financial_category_ == Caches::CATEGORY::Profit)
      money *= -1;

    financial.json["financial_category"] = financial_category;
    financial.json["account_code"] = account_code;
    financial.json["account_name"] = account_name;
    financial.json["money"] = money;
    // Re-create the object from the completed json before caching it.
    financial = Caches::FINANCIAL{financial.json};
    caches->financials[financial_category].push_back(financial);
  }
}


save処理は、作成した総勘定元帳データと決算書データを保存します。

// Sends every cached general-ledger row to "/general_ledgers/json", one
// insert request per row, awaiting each request before issuing the next.
//
// caches: non-owning pointer providing the HTTP client, the request and the
//         rows in caches->ledgers.
drogon::Task<> spa::batch::compile::ledger::save(Caches* caches)
{
  for (auto const& [account_code, rows] : caches->ledgers)
  {
    for (auto const& row : rows)
    {
      co_await spa::web::request::pplx::insert_format_request(
          caches->client(), caches->req(), "/general_ledgers/json", row);
    }
  }
}

// Sends every cached financial-statement row to "/financial_statements/json",
// one insert request per row, awaiting each request before issuing the next.
//
// caches: non-owning pointer providing the HTTP client, the request and the
//         rows in caches->financials.
drogon::Task<> spa::batch::compile::financial::save(Caches* caches)
{
  for (auto const& [category_key, rows] : caches->financials)
  {
    for (auto const& row : rows)
    {
      co_await spa::web::request::pplx::insert_format_request(
          caches->client(), caches->req(), "/financial_statements/json", row);
    }
  }
}

コメントを残す

メールアドレスが公開されることはありません。※ が付いている欄は必須項目です。