/*
 * Decompiled with CFR 0.152.
 */
package zeph.core;

import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import zeph.core.Connection;
import zeph.uring.IoUring;
import zeph.uring.Socket;

public class EventLoop
implements Runnable,
AutoCloseable {
    private static final long OP_ACCEPT = 0x100000000000000L;
    private static final long OP_READ = 0x200000000000000L;
    private static final long OP_WRITE = 0x300000000000000L;
    private static final long OP_CLOSE = 0x400000000000000L;
    private static final long OP_MASK = -72057594037927936L;
    private static final long FD_MASK = 0xFFFFFFFFFFFFFFL;
    private final IoUring ring;
    private final Arena arena;
    private final int serverFd;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicLong connectionIdGenerator = new AtomicLong(0L);
    private final ConcurrentHashMap<Integer, Connection> connections = new ConcurrentHashMap();
    private final BiConsumer<Connection, byte[]> dataHandler;
    private static final int BUFFER_SIZE = 8192;
    private volatile int pendingSubmits = 0;

    public EventLoop(String ip, int port, int ringSize, BiConsumer<Connection, byte[]> dataHandler) throws Exception {
        this.arena = Arena.ofShared();
        this.dataHandler = dataHandler;
        this.ring = new IoUring(ringSize, 0);
        this.serverFd = Socket.createServerSocket();
        try {
            Socket.setReuseAddr(this.serverFd, this.arena);
            Socket.setReusePort(this.serverFd, this.arena);
            Socket.bind(this.serverFd, ip, port, this.arena);
            Socket.listen(this.serverFd, 4096);
            System.out.println("Server listening on " + ip + ":" + port);
            this.submitAccept();
        }
        catch (Exception e) {
            Socket.close(this.serverFd);
            this.ring.close();
            this.arena.close();
            throw e;
        }
    }

    private void submitAccept() {
        MemorySegment sqe = this.ring.getSqe();
        if (sqe != null) {
            this.ring.prepareAccept(sqe, this.serverFd, null, null, 526336, 0x100000000000000L | (long)this.serverFd);
            this.ring.submit();
            ++this.pendingSubmits;
        }
    }

    private void submitRead(Connection conn) {
        MemorySegment sqe = this.ring.getSqe();
        if (sqe != null) {
            this.ring.prepareRecv(sqe, conn.getFd(), conn.getReadBuffer(), 8192, 0, 0x200000000000000L | (long)conn.getFd());
            this.ring.submit();
            ++this.pendingSubmits;
        }
    }

    private void submitWrite(Connection conn, int len) {
        MemorySegment sqe = this.ring.getSqe();
        if (sqe != null) {
            this.ring.prepareSend(sqe, conn.getFd(), conn.getWriteBuffer(), len, 0, 0x300000000000000L | (long)conn.getFd());
            this.ring.submit();
            ++this.pendingSubmits;
        }
    }

    private void submitClose(int fd) {
        MemorySegment sqe = this.ring.getSqe();
        if (sqe != null) {
            this.ring.prepareClose(sqe, fd, 0x400000000000000L | (long)fd);
            this.ring.submit();
            ++this.pendingSubmits;
        }
    }

    @Override
    public void run() {
        System.out.println("Event loop started");
        while (this.running.get()) {
            try {
                MemorySegment cqe;
                int toSubmit = this.pendingSubmits;
                this.pendingSubmits = 0;
                int submitted = this.ring.enter(toSubmit, 1, 1);
                System.out.println("Submitted: " + submitted + ", waiting for completions...");
                while ((cqe = this.ring.peekCqe()) != null) {
                    long userData = IoUring.getUserData(cqe);
                    int result = IoUring.getResult(cqe);
                    this.ring.advanceCq();
                    long op = userData & 0xFF00000000000000L;
                    int fd = (int)(userData & 0xFFFFFFFFFFFFFFL);
                    System.out.println("CQE: op=" + (op >> 56) + ", fd=" + fd + ", result=" + result);
                    if (op == 0x100000000000000L) {
                        this.handleAccept(result);
                        continue;
                    }
                    if (op == 0x200000000000000L) {
                        this.handleRead(fd, result);
                        continue;
                    }
                    if (op == 0x300000000000000L) {
                        this.handleWrite(fd, result);
                        continue;
                    }
                    if (op != 0x400000000000000L) continue;
                    this.handleClose(fd, result);
                }
            }
            catch (Exception e) {
                if (!this.running.get()) continue;
                System.err.println("Event loop error: " + e.getMessage());
                e.printStackTrace();
            }
        }
        System.out.println("Event loop stopped");
    }

    private void handleAccept(int result) {
        if (result >= 0) {
            int clientFd = result;
            Connection conn = new Connection(clientFd, this.connectionIdGenerator.incrementAndGet(), this.arena);
            this.connections.put(clientFd, conn);
            System.out.println("Accepted connection: fd=" + clientFd + ", id=" + conn.getId());
            this.submitRead(conn);
            this.submitAccept();
        } else {
            System.err.println("Accept failed: " + -result);
            this.submitAccept();
        }
    }

    private void handleRead(int fd, int result) {
        Connection conn = this.connections.get(fd);
        if (conn == null) {
            return;
        }
        if (result > 0) {
            byte[] data = new byte[result];
            conn.getReadBuffer().asSlice(0L, result).asByteBuffer().get(data);
            if (this.dataHandler != null) {
                this.dataHandler.accept(conn, data);
            }
            this.submitRead(conn);
        } else if (result == 0) {
            System.out.println("Connection closed by peer: fd=" + fd);
            this.closeConnection(fd);
        } else {
            int err = -result;
            if (err != 11 && err != 35) {
                System.err.println("Read error on fd=" + fd + ": " + err);
                this.closeConnection(fd);
            } else {
                this.submitRead(conn);
            }
        }
    }

    private void handleWrite(int fd, int result) {
        int err;
        Connection conn = this.connections.get(fd);
        if (conn == null) {
            return;
        }
        if (result < 0 && (err = -result) != 11 && err != 35) {
            System.err.println("Write error on fd=" + fd + ": " + err);
            this.closeConnection(fd);
        }
    }

    private void handleClose(int fd, int result) {
        Connection conn = this.connections.remove(fd);
        if (conn != null) {
            conn.close();
        }
    }

    private void closeConnection(int fd) {
        Connection conn = this.connections.get(fd);
        if (conn != null) {
            this.submitClose(fd);
        }
    }

    public void write(Connection conn, byte[] data) {
        if (data.length > 8192) {
            throw new IllegalArgumentException("Data too large: " + data.length + " > 8192");
        }
        conn.getWriteBuffer().asSlice(0L, data.length).asByteBuffer().put(data);
        this.submitWrite(conn, data.length);
    }

    public void stop() {
        this.running.set(false);
        try {
            MemorySegment sqe = this.ring.getSqe();
            if (sqe != null) {
                this.ring.prepareNop(sqe, 0L);
                this.ring.submit();
                this.ring.enter(1, 0, 0);
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public void close() {
        this.stop();
        for (Connection conn : this.connections.values()) {
            try {
                Socket.close(conn.getFd());
            }
            catch (Exception exception) {
                // empty catch block
            }
            conn.close();
        }
        this.connections.clear();
        try {
            Socket.close(this.serverFd);
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.ring.close();
        this.arena.close();
    }

    public int getServerFd() {
        return this.serverFd;
    }

    public int getConnectionCount() {
        return this.connections.size();
    }
}

