/*
 * Copyright (c) 2021-2024, Andreas Kling <andreas@ladybird.org>
 * Copyright (c) 2025, Aliaksandr Kalenik <kalenik.aliaksandr@gmail.com>
 * Copyright (c) 2022, the SerenityOS developers.
 *
 * SPDX-License-Identifier: BSD-2-Clause
 */

#include <AK/Vector.h>
#include <LibIPC/Connection.h>
#include <LibIPC/Message.h>
#include <LibIPC/Stub.h>

namespace IPC {

ConnectionBase::ConnectionBase(IPC::Stub& local_stub, NonnullOwnPtr<Transport> transport, u32 local_endpoint_magic)
    : m_local_stub(local_stub)
    , m_transport(move(transport))
    , m_local_endpoint_magic(local_endpoint_magic)
{
    m_transport->set_up_read_hook([this] {
        NonnullRefPtr protect = *this;
        drain_messages_from_peer();
        handle_messages();
    });
}

ConnectionBase::~ConnectionBase() = default;

bool ConnectionBase::is_open() const
{
    return m_transport->is_open();
}

ErrorOr<void> ConnectionBase::post_message(Message const& message)
{
    auto buffer = TRY(message.encode());
    return post_message(buffer);
}

ErrorOr<void> ConnectionBase::post_message(MessageBuffer& buffer)
{
    // NOTE: If this connection is being shut down, but has not yet been destroyed,
    //       the socket will be closed. Don't try to send more messages.
    if (!m_transport->is_open())
        return Error::from_string_literal("Trying to post_message during IPC shutdown");

    TRY(buffer.transfer_message(*m_transport));

    return {};
}

void ConnectionBase::shutdown()
{
    m_transport->close();
    die();
}

void ConnectionBase::shutdown_with_error(Error const& error)
{
    dbgln("IPC::ConnectionBase ({:p}) had an error ({}), disconnecting.", this, error);
    shutdown();
}

void ConnectionBase::handle_messages()
{
    auto messages = move(m_unprocessed_messages);
    for (auto& message : messages) {
        if (message->endpoint_magic() != m_local_endpoint_magic)
            continue;

        if (!is_open())
            dbgln("Handling message while connection closed: {}", message->message_name());

        auto handler_result = m_local_stub.handle(move(message));
        if (handler_result.is_error()) {
            dbgln("IPC::ConnectionBase::handle_messages: {}", handler_result.error());
            continue;
        }

        if (!is_open())
            continue;

        if (auto response = handler_result.release_value()) {
            if (auto post_result = post_message(*response); post_result.is_error())
                dbgln("IPC::ConnectionBase::handle_messages: {}", post_result.error());
        }
    }
}

void ConnectionBase::wait_for_transport_to_become_readable()
{
    m_transport->wait_until_readable();
}

ConnectionBase::PeerEOF ConnectionBase::drain_messages_from_peer()
{
    bool parse_error = false;
    auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([&](auto&& raw_message) {
        if (auto message = try_parse_message(raw_message.bytes, raw_message.attachments)) {
            m_unprocessed_messages.append(message.release_nonnull());
        } else {
            dbgln("Failed to parse IPC message {:hex-dump}", raw_message.bytes);
            parse_error = true;
        }
    });

    if (parse_error) {
        dbgln("IPC::ConnectionBase ({:p}): Disconnecting misbehaving peer due to malformed message", this);
        schedule_shutdown = Transport::ShouldShutdown::Yes;
    }

    if (!m_unprocessed_messages.is_empty()) {
        deferred_invoke([this] {
            handle_messages();
        });
    }

    if (schedule_shutdown == Transport::ShouldShutdown::Yes) {
        deferred_invoke([this] {
            shutdown();
        });
        return PeerEOF::Yes;
    }

    return PeerEOF::No;
}

OwnPtr<IPC::Message> ConnectionBase::wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id)
{
    for (;;) {
        // Double check we don't already have the event waiting for us.
        // Otherwise we might end up blocked for a while for no reason.
        for (size_t i = 0; i < m_unprocessed_messages.size(); ++i) {
            auto& message = m_unprocessed_messages[i];
            if (message->endpoint_magic() != endpoint_magic)
                continue;
            if (message->message_id() == message_id)
                return m_unprocessed_messages.take(i);
        }

        if (!is_open())
            break;

        wait_for_transport_to_become_readable();
        if (drain_messages_from_peer() == PeerEOF::Yes)
            break;
    }

    dbgln("Failed to receive message_id: {}", message_id);

    if (!m_unprocessed_messages.is_empty()) {
        m_transport->close();

        dbgln("Transport shutdown with unprocessed messages left: {}", m_unprocessed_messages.size());
        for (size_t i = 0; i < m_unprocessed_messages.size(); ++i) {
            auto& message = m_unprocessed_messages[i];
            dbgln(" Message {:03} is: {:2}-{}", i, message->message_id(), message->message_name());
        }

        dbgln("Handling remaining messages before returning to caller");
        handle_messages();
        dbgln("Messages handled, returning to caller");
    }

    return {};
}

}
