#!/usr/bin/env lua
-- Handheld Devices API: HTTP + WebSocket server built on LuaSocket, using the
-- project's `db` module for SQL storage and (optionally) Redis for caching and
-- event publishing.

local socket = require("socket")
local cjson = require("cjson")
local db = require("db")
local log = require("log")

-- Optional dependencies
local redis
pcall(function() redis = require("redis") end)
local digest
pcall(function() digest = require("openssl.digest") end)

-- `unpack` is a global in Lua 5.1/LuaJIT and lives in `table` from 5.2 on.
local unpack = table.unpack or unpack

local app = {}
app.port = tonumber(os.getenv("API_PORT")) or 8080
app.host = "0.0.0.0"

-- Database configuration (from db.lua)
local DB_HOST = os.getenv("DB_HOST") or "localhost"
local DB_PORT = tonumber(os.getenv("DB_PORT")) or 5432
local DB_NAME = os.getenv("DB_NAME") or "handheld_devices"

-- Redis configuration
local REDIS_HOST = os.getenv("REDIS_HOST") or "127.0.0.1"
local REDIS_PORT = tonumber(os.getenv("REDIS_PORT")) or 6379

-- Upper bound on header lines read per request, so a client streaming
-- endless headers cannot pin the single-threaded server loop.
local MAX_HEADER_LINES = 100

-- Largest page number accepted by GET /devices; keeps the computed OFFSET
-- finite even for inputs such as "1e400".
local MAX_PAGE = 1000000

-- Redis client with retry.
-- Returns a connected client, or nil when the Redis library is not installed
-- or all three connection attempts fail. Sleeps with exponential backoff
-- between attempts (0.2s, 0.4s).
local function get_redis_connection()
  if not redis then return nil end
  local attempts = 3
  for i = 1, attempts do
    local ok, red = pcall(redis.connect, REDIS_HOST, REDIS_PORT)
    if ok and red then return red end
    if i < attempts then
      socket.sleep(math.min(2 ^ i * 0.1, 2))
    end
  end
  return nil
end

-- Close a Redis connection obtained from get_redis_connection().
-- Best effort: a failing QUIT on an already-broken connection is ignored.
local function close_redis(red)
  if red then pcall(red.quit, red) end
end

-- Redis ping for health.
-- Returns true when a connection can be opened and PING succeeds.
local function redis_ping()
  local red = get_redis_connection()
  if not red then return false end
  local ok, res = pcall(red.ping, red)
  close_redis(red)
  -- redis-lua post-processes the PING reply into boolean `true`; other
  -- clients hand back the raw "PONG" string. Accept both.
  return ok and (res == true or res == "PONG")
end

-- Ensure tables exist. Raises an error when the schema cannot be created.
local function init_db()
  local ok, err = db.with_retry(function()
    return db.with_connection(function(conn)
      conn:query([[
        CREATE TABLE IF NOT EXISTS devices (
          id SERIAL PRIMARY KEY,
          name VARCHAR(255) NOT NULL,
          manufacturer VARCHAR(255) NOT NULL,
          release_year INTEGER,
          cpu VARCHAR(255),
          ram_mb INTEGER,
          storage_mb INTEGER,
          display_size VARCHAR(50),
          battery_hours REAL,
          created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
          updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS ratings (
          id SERIAL PRIMARY KEY,
          device_id INTEGER NOT NULL REFERENCES devices(id) ON DELETE CASCADE,
          user_id VARCHAR(255) NOT NULL,
          score INTEGER CHECK (score >= 1 AND score <= 5),
          created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS reviews (
          id SERIAL PRIMARY KEY,
          device_id INTEGER NOT NULL REFERENCES devices(id) ON DELETE CASCADE,
          user_id VARCHAR(255) NOT NULL,
          content TEXT NOT NULL,
          created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
      ]])
      return true
    end)
  end)
  if not ok then
    error("Failed to init DB: " .. tostring(err))
  end
end

-- Seed initial data: inserts three sample devices when the table is empty.
local function seed_db()
  db.with_connection(function(conn)
    local res = conn:query("SELECT COUNT(*) as total FROM devices")
    local count = 0
    if res and res[1] then
      count = tonumber(res[1].total) or 0
    end
    if count ~= 0 then return end

    log.info("Seeding initial devices", { component = "seed" })
    local devices = {
      {
        name = "Steam Deck", manufacturer = "Valve", release_year = 2022,
        cpu = "AMD Zen 2", ram_mb = 16384, storage_mb = 524288,
        display_size = "7-inch", battery_hours = 4.0
      },
      {
        name = "Nintendo Switch", manufacturer = "Nintendo", release_year = 2017,
        cpu = "Nvidia Tegra X1", ram_mb = 4096, storage_mb = 32768,
        display_size = "6.2-inch", battery_hours = 5.5
      },
      {
        name = "ROG Ally", manufacturer = "ASUS", release_year = 2023,
        cpu = "AMD Ryzen Z1 Extreme", ram_mb = 16384, storage_mb = 524288,
        display_size = "7-inch", battery_hours = 3.5
      }
    }
    for _, device in ipairs(devices) do
      conn:query(
        "INSERT INTO devices (name, manufacturer, release_year, cpu, ram_mb, storage_mb, " ..
        "display_size, battery_hours) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        device.name, device.manufacturer, device.release_year, device.cpu,
        device.ram_mb, device.storage_mb, device.display_size, device.battery_hours
      )
    end
    log.info("Seeding completed", { component = "seed" })
  end)
end

-- Publish to Redis if available (with retry).
-- The event is pushed onto the "devices:events:queue" list (consumed by a
-- worker) and published on the "devices:events" channel (for WebSockets).
-- @param event_type string stored in the event's `event_type` field
-- @param data table of extra fields merged into the event (may be nil)
-- @param request_id correlation id stored in the event and in log lines
local function publish_event(event_type, data, request_id)
  -- Without the client library there is nothing to retry against; return
  -- before the backoff sleeps so write requests are not stalled for nothing.
  if not redis then return end

  local event = {
    event_type = event_type,
    timestamp = os.time(),
    request_id = request_id,
  }
  for k, v in pairs(data or {}) do event[k] = v end
  local event_json = cjson.encode(event)

  local max_attempts = 3
  local queued = false -- set once LPUSH succeeded, so a retry never double-queues
  for attempt = 1, max_attempts do
    local red = get_redis_connection()
    if red then
      -- The client raises on network/protocol failures; catch them so a
      -- Redis hiccup cannot abort the HTTP request that triggered the event.
      local ok, result = pcall(function()
        if not queued then
          -- Push to queue (worker will consume from this using reliable pattern)
          red:lpush("devices:events:queue", event_json)
          queued = true
        end
        -- Also publish to Pub/Sub for immediate processing (WebSockets)
        return red:publish("devices:events", event_json)
      end)
      close_redis(red)
      if ok then
        log.info("Event published", {
          event_type = event_type,
          subscribers = result,
          request_id = request_id
        })
        return
      end
      log.warn("Redis publish attempt failed", {
        event_type = event_type,
        attempt = attempt,
        err = tostring(result),
        request_id = request_id
      })
    end
    if attempt < max_attempts then
      socket.sleep(math.min(2 ^ attempt * 0.1, 2))
    end
  end
  log.warn("Failed to publish event after retries", {
    event_type = event_type,
    request_id = request_id
  })
end

-- Helper to check if a value is JSON null (Lua nil or the cjson.null sentinel).
local function is_json_null(val)
  return val == nil or val == cjson.null
end

-- WebSocket Utils

-- SHA-1 of `data` via openssl.digest when that module loaded.
-- Fallback: without the module no real hash can be produced, so the input is
-- returned unchanged and the WebSocket handshake built from it will fail.
local function sha1(data)
  if digest then
    return digest.new("sha1"):final(data)
  end
  print("[WS] Warning: openssl.digest not available, SHA1 handshake will fail")
  return data
end

-- Minimal Base64 encoder.
-- Expands every byte into an 8-character bit string, regroups the bits in
-- sixes, maps each group to the alphabet and appends `=` padding.
local function b64(data)
  local alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/'
  return ((data:gsub('.', function(x)
    local r, byte = '', x:byte()
    for i = 8, 1, -1 do
      r = r .. (byte % 2 ^ i - byte % 2 ^ (i - 1) > 0 and '1' or '0')
    end
    return r
  end) .. '0000'):gsub('%d%d%d?%d?%d?%d?', function(x)
    -- A trailing group shorter than six bits is only padding from '0000'.
    if (#x < 6) then return '' end
    local c = 0
    for i = 1, 6 do
      c = c + (x:sub(i, i) == '1' and 2 ^ (6 - i) or 0)
    end
    return alphabet:sub(c + 1, c + 1)
  end) .. ({ '', '==', '=' })[#data % 3 + 1])
end

-- Build an unmasked WebSocket text frame (FIN set, opcode 1) around `payload`.
-- Uses the 7-bit, 16-bit or 64-bit length encoding depending on payload size.
local function encode_ws_frame(payload)
  local len = #payload
  local header
  if len <= 125 then
    header = string.char(0x81, len)
  elseif len <= 65535 then
    header = string.char(0x81, 126, math.floor(len / 256), len % 256)
  else
    -- 64-bit big-endian length: emit all eight bytes, most significant first.
    local bytes = {}
    for shift = 7, 0, -1 do
      bytes[#bytes + 1] = math.floor(len / 256 ^ shift) % 256
    end
    header = string.char(0x81, 127, unpack(bytes))
  end
  return header .. payload
end

-- Device Model
local Device = {}

-- Columns that Device.update may change; `numeric` ones go through tonumber().
local UPDATABLE_FIELDS = {
  { name = "name", numeric = false },
  { name = "manufacturer", numeric = false },
  { name = "release_year", numeric = true },
  { name = "cpu", numeric = false },
  { name = "ram_mb", numeric = true },
  { name = "storage_mb", numeric = true },
  { name = "display_size", numeric = false },
  { name = "battery_hours", numeric = true },
}

-- Redis key under which a device row is cached.
local function device_cache_key(id)
  return "device:" .. tostring(id)
end

-- Drop the cached copy of a device. Best effort: failures are logged only.
local function invalidate_device_cache(id)
  local red = get_redis_connection()
  if not red then return end
  local ok, err = pcall(red.del, red, device_cache_key(id))
  if not ok then
    log.warn("Device cache invalidation failed", { device_id = id, err = tostring(err) })
  end
  close_redis(red)
end

-- nil for JSON null, otherwise tonumber(val) (nil when not numeric).
local function optional_number(val)
  if is_json_null(val) then return nil end
  return tonumber(val)
end

-- nil for JSON null or false, otherwise the value unchanged.
local function optional_value(val)
  if is_json_null(val) or val == false then return nil end
  return val
end

-- Convert a submitted update value to what is bound into the SQL statement.
-- Returns nil when the value is unusable for the field.
local function coerce_update_value(field, value)
  if field.numeric then return tonumber(value) end
  local kind = type(value)
  if kind == "string" then return value end
  if kind == "number" then return tostring(value) end
  return nil
end

-- List devices, newest id first.
-- @param limit page size (default 10)
-- @param offset rows to skip (default 0)
-- @return array of rows; empty table when the query yields nothing
function Device.all(limit, offset)
  limit = limit or 10
  offset = offset or 0
  local res = db.with_connection(function(conn)
    return conn:query("SELECT * FROM devices ORDER BY id DESC LIMIT $1 OFFSET $2", limit, offset)
  end)
  return res or {}
end

-- Fetch one device by id, consulting the Redis cache first (5 minute TTL).
-- @param id numeric id, as number or numeric string
-- @return the device row, or nil when the id is invalid or no row exists
function Device.find(id)
  local numeric_id = tonumber(id)
  if not numeric_id then return nil end

  local cache_key = device_cache_key(id)
  local red = get_redis_connection()
  if red then
    -- Cache problems (network error, corrupt entry) fall through to the DB.
    local got, cached = pcall(red.get, red, cache_key)
    if got and cached then
      local decoded, device = pcall(cjson.decode, cached)
      if decoded and type(device) == "table" then
        close_redis(red)
        return device
      end
    end
  end

  local res = db.with_connection(function(conn)
    return conn:query("SELECT * FROM devices WHERE id = $1", numeric_id)
  end)
  local row = res and res[1] or nil

  if red then
    if row then
      local stored, err = pcall(red.setex, red, cache_key, 300, cjson.encode(row))
      if not stored then
        log.warn("Device cache write failed", { device_id = id, err = tostring(err) })
      end
    end
    close_redis(red)
  end
  return row
end

-- Insert a device and publish a "DevicePublished" event.
-- @param data decoded JSON object; `name` and `manufacturer` are required,
--   the remaining columns are optional (JSON null is stored as NULL)
-- @param request_id correlation id forwarded to the event
-- @return the stored device row, or nil when the insert returned no id
function Device.create(data, request_id)
  local res = db.with_connection(function(conn)
    return conn:query(
      "INSERT INTO devices (name, manufacturer, release_year, cpu, ram_mb, storage_mb, " ..
      "display_size, battery_hours) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id",
      data.name,
      data.manufacturer,
      optional_number(data.release_year),
      optional_value(data.cpu),
      optional_number(data.ram_mb),
      optional_number(data.storage_mb),
      optional_value(data.display_size),
      optional_number(data.battery_hours)
    )
  end)
  local row = res and res[1]
  local device_id = row and tonumber(row.id) or nil
  if not device_id then return nil end

  publish_event("DevicePublished", { device_id = device_id, device_name = data.name }, request_id)
  return Device.find(device_id)
end

-- Apply a partial update and publish "DeviceUpdated" when something changed.
-- Only fields present (and not JSON null) in `data` are written. Values are
-- bound as query parameters, like every other statement in this file.
-- @param id numeric id, as number or numeric string
-- @param data decoded JSON object with the fields to change
-- @param request_id correlation id forwarded to the event
-- @return the device row after the update, or nil when it does not exist
function Device.update(id, data, request_id)
  local numeric_id = tonumber(id)
  if not numeric_id then return nil end
  if type(data) ~= "table" then data = {} end

  local assignments, params = {}, {}
  for _, field in ipairs(UPDATABLE_FIELDS) do
    local raw = data[field.name]
    if not is_json_null(raw) then
      local value = coerce_update_value(field, raw)
      if value == nil then
        -- Previously a non-numeric value raised a nil-concatenation error
        -- and took the whole request down; skip the field instead.
        log.warn("Ignoring invalid value in device update", {
          field = field.name,
          request_id = request_id
        })
      else
        params[#params + 1] = value
        assignments[#assignments + 1] = field.name .. " = $" .. #params
      end
    end
  end

  local updated = false
  if #assignments > 0 then
    assignments[#assignments + 1] = "updated_at = CURRENT_TIMESTAMP"
    params[#params + 1] = numeric_id
    local sql = "UPDATE devices SET " .. table.concat(assignments, ", ") ..
      " WHERE id = $" .. #params
    db.with_connection(function(conn)
      conn:query(sql, unpack(params, 1, #params))
    end)
    updated = true
  end

  invalidate_device_cache(id)

  local device = Device.find(id)
  if device and updated then
    publish_event("DeviceUpdated", { device_id = numeric_id, device_name = device.name }, request_id)
  end
  return device
end

-- Delete a device and publish "DeviceDeleted" when it existed beforehand.
-- @return true (always; a missing row is not treated as an error)
function Device.delete(id, request_id)
  local device = Device.find(id)
  db.with_connection(function(conn)
    conn:query("DELETE FROM devices WHERE id = $1", tonumber(id))
  end)

  invalidate_device_cache(id)

  if device then
    publish_event("DeviceDeleted", { device_id = tonumber(id), device_name = device.name }, request_id)
  end
  return true
end

-- Total number of rows in `devices` (0 when the query yields nothing).
function Device.get_count()
  local res = db.with_connection(function(conn)
    return conn:query("SELECT COUNT(*) as total FROM devices")
  end)
  if res and res[1] then
    return tonumber(res[1].total) or 0
  end
  return 0
end

-- HTTP Request Parser

-- Split a request line into method, path and version.
-- All three are nil when the line does not have that shape.
local function parse_request(request_line)
  local method, path, version = request_line:match("^(%w+)%s+(%S+)%s+(%S+)")
  return method, path, version
end

-- Read header lines from `client` until a blank line, an error/timeout, or
-- MAX_HEADER_LINES. Keys are lower-cased. The socket is left non-blocking.
local function parse_headers(client)
  local headers = {}
  -- Use a small timeout for individual header lines to handle slow clients
  -- or cases where headers are partially sent.
  client:settimeout(0.1)
  for _ = 1, MAX_HEADER_LINES do
    local line = client:receive("*l")
    if not line or line == "" then break end
    local key, value = line:match("^([^:]+):%s*(.*)$")
    if key then
      -- Trailing whitespace is not part of a header value.
      headers[key:lower()] = (value:gsub("%s+$", ""))
    end
  end
  client:settimeout(0) -- Back to non-blocking
  return headers
end

-- Decode application/x-www-form-urlencoded text: '+' -> space, %XX -> byte.
local function url_decode(text)
  text = text:gsub("%+", " ")
  text = text:gsub("%%(%x%x)", function(hex)
    return string.char(tonumber(hex, 16))
  end)
  return text
end

-- Parse "a=1&b=2" into a table of decoded keys and values.
-- Pairs without '=' are ignored; a repeated key keeps its last value.
local function parse_query_string(query_str)
  local params = {}
  if query_str and query_str ~= "" then
    for key, value in query_str:gmatch("([^=&]+)=([^&]*)") do
      params[url_decode(key)] = url_decode(value)
    end
  end
  return params
end

-- Split a request target into path and query string (without the '?').
-- A nil target yields two empty strings.
local function parse_path(full_path)
  full_path = full_path or ""
  local path, query = full_path:match("^([^?]+)%??(.*)$")
  return path or full_path, query or ""
end

-- Build a request id from two random 16-bit values and the current time.
local function generate_request_id()
  return string.format("%x-%x-%x", math.random(0, 0xffff), math.random(0, 0xffff), os.time())
end

-- HTTP Response Builder.
-- @param status status line text such as "200 OK"
-- @param body response body (nil is treated as empty)
-- @param content_type defaults to "application/json"
-- @param request_id when given, echoed back in an X-Request-ID header
-- @return the full HTTP/1.1 response as a string
local function build_response(status, body, content_type, request_id)
  content_type = content_type or "application/json"
  body = body or ""
  local extra_headers = ""
  if request_id then
    -- The id may come straight from a client header; control characters
    -- must never reach the response head.
    local safe_id = tostring(request_id):gsub("%c", "")
    extra_headers = "X-Request-ID: " .. safe_id .. "\r\n"
  end

  return table.concat({
    "HTTP/1.1 ", status, "\r\n",
    "Content-Type: ", content_type, "\r\n",
    "Content-Length: ", tostring(#body), "\r\n",
    extra_headers,
    "Access-Control-Allow-Origin: *\r\n",
    "Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS\r\n",
    "Access-Control-Allow-Headers: Content-Type, X-Request-ID\r\n",
    "Connection: close\r\n",
    "\r\n",
    body,
  })
end

-- Route Handlers
-- Each handler returns two values: the status text and the JSON body.

-- GET /devices: one page of ten devices plus paging metadata.
local function handle_get_devices(query_params)
  local per_page = 10
  -- Clamp to [1, MAX_PAGE]: page <= 0 would produce a negative OFFSET and a
  -- non-finite page cannot be bound into the query or serialised as JSON.
  local page = math.floor(tonumber(query_params.page) or 1)
  if page < 1 then
    page = 1
  elseif page > MAX_PAGE then
    page = MAX_PAGE
  end
  local offset = (page - 1) * per_page

  local devices = Device.all(per_page, offset)
  local total = Device.get_count()
  return "200 OK", cjson.encode({
    data = devices,
    total = total,
    page = page,
    per_page = per_page
  })
end

-- True when a required field is absent, JSON null or false.
local function is_missing(val)
  return is_json_null(val) or val == false
end

-- POST /devices: create a device from the decoded JSON body.
local function handle_post_devices(json_data, request_id)
  -- A body such as `5` or `null` decodes to a non-table; indexing it would
  -- raise. cjson.null is truthy, so required fields are checked explicitly.
  if type(json_data) ~= "table"
    or is_missing(json_data.name)
    or is_missing(json_data.manufacturer) then
    return "400 Bad Request", cjson.encode({ error = "Missing required fields: name, manufacturer" })
  end

  local device = Device.create(json_data, request_id)
  if device then
    return "201 Created", cjson.encode(device)
  end
  return "500 Internal Server Error", cjson.encode({ error = "Failed to create device" })
end

-- GET /devices/:id
local function handle_get_device(id)
  local device = Device.find(id)
  if device then
    return "200 OK", cjson.encode(device)
  end
  return "404 Not Found", cjson.encode({ error = "Device not found" })
end

-- PUT /devices/:id: partial update from the decoded JSON body.
local function handle_put_device(id, json_data, request_id)
  if type(json_data) ~= "table" then
    return "400 Bad Request", cjson.encode({ error = "Request body must be a JSON object" })
  end
  local device = Device.update(id, json_data, request_id)
  if device then
    return "200 OK", cjson.encode(device)
  end
  return "404 Not Found", cjson.encode({ error = "Device not found" })
end

-- DELETE /devices/:id: always answers 200, whether or not the row existed.
local function handle_delete_device(id, request_id)
  Device.delete(id, request_id)
  return "200 OK", cjson.encode({ success = true })
end

-- Health: liveness (process alive)
local function handle_health_live()
  return "200 OK", cjson.encode({ status = "ok" })
end

-- Health: readiness (DB + Redis OK)
local function handle_health_ready()
  local db_ok = db.ping()
  local redis_ok = redis_ping()
  if db_ok and redis_ok then
    return "200 OK", cjson.encode({ status = "ok", db = "ok", redis = "ok" })
  end
  local details = {
    db = db_ok and "ok" or "fail",
    redis = redis_ok and "ok" or "fail"
  }
  return "503 Service Unavailable", cjson.encode({ status = "degraded", details = details })
end

-- Main
-- Request Handler

-- Largest request body accepted, in bytes; larger bodies are answered with 413.
local MAX_BODY_BYTES = 1024 * 1024
-- Seconds to wait for a request body to arrive in full.
local BODY_READ_TIMEOUT = 1
-- Seconds allowed for writing a response to the client.
local SEND_TIMEOUT = 1
-- Seconds an accepted connection may stay silent before it is dropped.
local CLIENT_IDLE_TIMEOUT = 30
-- Longest request line buffered while waiting for its newline.
local MAX_REQUEST_LINE_BYTES = 8192
-- Seconds allowed for the rest of a pub/sub message once its header arrived.
local PUBSUB_MESSAGE_TIMEOUT = 1

-- Write `response` and close the connection.
-- Client sockets are normally non-blocking; a short blocking timeout lets
-- responses larger than the socket buffer be written completely.
local function send_and_close(client, response)
  client:settimeout(SEND_TIMEOUT)
  client:send(response)
  client:close()
end

-- Read and decode the JSON body of a POST/PUT request.
-- @return the decoded object ({} when there is no body), or
--   nil, status, message when the body is too large, incomplete, not valid
--   JSON, or not a JSON object
local function read_json_body(client, method, headers)
  if method ~= "POST" and method ~= "PUT" then return {} end

  local content_length = math.floor(tonumber(headers["content-length"]) or 0)
  if content_length <= 0 then return {} end
  if content_length > MAX_BODY_BYTES then
    return nil, "413 Payload Too Large", "Request body too large"
  end

  -- With a zero timeout receive() fails whenever the body is not already
  -- buffered in full, so wait briefly for it instead.
  client:settimeout(BODY_READ_TIMEOUT)
  local body = client:receive(content_length)
  client:settimeout(0)
  if not body then
    return nil, "400 Bad Request", "Incomplete request body"
  end

  local ok, decoded = pcall(cjson.decode, body)
  if not ok then
    return nil, "400 Bad Request", "Invalid JSON body"
  end
  -- Handlers index the body by field name, so anything but an object
  -- (number, string, null, ...) is rejected here.
  if type(decoded) ~= "table" then
    return nil, "400 Bad Request", "JSON body must be an object"
  end
  return decoded
end

-- Map method + path onto a route handler.
-- @return status text and JSON body; 404 when nothing matches
local function route_request(method, path, query_params, json_data, request_id)
  local device_id = path:match("^/devices/(%d+)$")

  if method == "GET" then
    if path == "/devices" then return handle_get_devices(query_params) end
    if device_id then return handle_get_device(device_id) end
    if path == "/health" then return "200 OK", cjson.encode({ status = "ok" }) end
    if path == "/health/live" then return handle_health_live() end
    if path == "/health/ready" then return handle_health_ready() end
  elseif method == "POST" then
    if path == "/devices" then return handle_post_devices(json_data, request_id) end
  elseif method == "PUT" then
    if device_id then return handle_put_device(device_id, json_data, request_id) end
  elseif method == "DELETE" then
    if device_id then return handle_delete_device(device_id, request_id) end
  end

  return "404 Not Found", cjson.encode({ error = "Not found" })
end

-- Handle one HTTP request whose request line and headers were already read,
-- then send the response and close the connection.
-- @param client connected LuaSocket TCP client
-- @param request_line first line of the request, e.g. "GET /devices HTTP/1.1"
-- @param headers table of lower-cased header names to values
local function handle_request_with_headers(client, request_line, headers)
  local request_id = headers["x-request-id"] or generate_request_id()

  local function reply(status, response_body)
    send_and_close(client, build_response(status, response_body, nil, request_id))
  end

  -- A line that is not "<METHOD> <target> <version>" used to leave the path
  -- nil and crash the server on the first string operation.
  local method, full_path = parse_request(request_line)
  if not method or not full_path then
    reply("400 Bad Request", cjson.encode({ error = "Malformed request line" }))
    return
  end

  local path, query_str = parse_path(full_path)
  local query_params = parse_query_string(query_str)

  -- Handle CORS preflight
  if method == "OPTIONS" then
    reply("200 OK", "")
    return
  end

  local status, response_body
  local json_data, body_status, body_error = read_json_body(client, method, headers)
  if not json_data then
    status, response_body = body_status, cjson.encode({ error = body_error })
  else
    -- A failing handler (e.g. a database error) must produce a 500 for this
    -- request rather than unwind through the server loop.
    local ok, routed_status, routed_body =
      pcall(route_request, method, path, query_params, json_data, request_id)
    if ok then
      status, response_body = routed_status, routed_body
    else
      log.error("Request handler failed", {
        method = method,
        path = path,
        err = tostring(routed_status),
        request_id = request_id,
      })
      status = "500 Internal Server Error"
      response_body = cjson.encode({ error = "Internal server error" })
    end
  end

  log.info("Request", {
    method = method,
    path = path,
    status = status:match("^(%d+)") or "?",
    request_id = request_id,
  })

  -- Send response
  reply(status, response_body)
end

-- Open a Redis connection subscribed to "devices:events".
-- @return the subscribed client, or nil when connecting or subscribing failed
local function connect_subscriber()
  local red = get_redis_connection()
  if not red then return nil end

  -- Note: redis-lua's subscribe() sets the connection into a subscription state.
  local ok, err = pcall(function() return red:subscribe("devices:events") end)
  if not ok then
    log.error("Redis subscriber failed", { component = "redis", err = tostring(err) })
    pcall(function() red:quit() end)
    return nil
  end

  -- Set the underlying socket to a very small timeout for non-blocking feel
  pcall(function()
    if red.network and red.network.socket then
      red.network.socket:settimeout(0.001)
    end
  end)
  log.info("Redis subscriber connected", { component = "redis" })
  return red
end

-- Poll the subscriber socket for one pub/sub message.
-- Replies arrive as RESP arrays; a message is ["message", channel, payload].
-- @return the payload string, or nil when nothing (or a non-message reply)
--   was available
-- Raises on socket errors and on truncated or malformed replies, so the
-- caller can drop the connection instead of reading a desynchronised stream.
local function read_pubsub_payload(sock)
  sock:settimeout(0.001)
  local line, serr = sock:receive("*l")
  if not line then
    if serr ~= "timeout" then error(serr or "socket read failed") end
    return nil
  end

  -- Parse RESP: *3\r\n means 3-element array
  if line:sub(1, 1) ~= "*" then return nil end
  local count = tonumber(line:sub(2))
  if not count or count < 3 then return nil end

  -- The header is in, so the rest of the reply is in flight; give it time
  -- to arrive rather than abandoning the message half-read.
  sock:settimeout(PUBSUB_MESSAGE_TIMEOUT)
  local parts = {}
  for i = 1, count do
    local bline, berr = sock:receive("*l")
    if not bline then error(berr or "incomplete") end
    if bline:sub(1, 1) == "$" then
      local len = tonumber(bline:sub(2))
      if not len then error("malformed bulk length") end
      if len >= 0 then
        -- Every non-null bulk, including an empty one, is followed by CRLF.
        local bulk, bulk_err = sock:receive(len + 2)
        if not bulk then error(bulk_err or "incomplete") end
        parts[i] = bulk:sub(1, -3)
      else
        parts[i] = "" -- "$-1" is a null bulk and carries no data
      end
    end
  end
  sock:settimeout(0.001)

  if parts[1] == "message" and parts[3] then
    return parts[3]
  end
  return nil
end

-- Send `payload` as a text frame to every WebSocket client, closing and
-- removing the ones whose send fails.
local function broadcast_to_websockets(ws_clients, payload)
  log.info("WebSocket broadcast", { component = "ws", payload_len = #tostring(payload) })
  local frame = encode_ws_frame(payload)
  -- Walk backwards so table.remove does not shift unvisited entries.
  for i = #ws_clients, 1, -1 do
    local ws = ws_clients[i]
    local _, send_err = ws:send(frame)
    if send_err then
      ws:close()
      table.remove(ws_clients, i)
    end
  end
end

-- True when the request asks for a WebSocket upgrade (case-insensitive).
local function is_websocket_upgrade(headers)
  local upgrade = headers["upgrade"]
  return type(upgrade) == "string" and upgrade:lower() == "websocket"
end

-- Advance one pending HTTP connection.
-- @param c entry from the pending list: { socket, buffer, accepted_at }
-- @param ws_clients list that upgraded WebSocket sockets are appended to
-- @return true when the entry is finished and must leave the pending list
local function service_client(c, ws_clients)
  -- Passing the buffered prefix keeps bytes of a request line that arrived
  -- split across several reads; the partial result includes the prefix.
  local line, err, partial = c.socket:receive("*l", c.buffer)
  if not line then
    if err ~= "timeout" then
      c.socket:close()
      return true
    end
    c.buffer = partial or c.buffer
    if #c.buffer > MAX_REQUEST_LINE_BYTES
      or os.time() - c.accepted_at > CLIENT_IDLE_TIMEOUT then
      c.socket:close()
      return true
    end
    return false
  end

  local headers = parse_headers(c.socket)
  if is_websocket_upgrade(headers) then
    -- Handle WebSocket Handshake
    local key = headers["sec-websocket-key"]
    if key then
      local sha1_key = sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
      local accept = b64(sha1_key)
      local response = "HTTP/1.1 101 Switching Protocols\r\n" ..
        "Upgrade: websocket\r\n" ..
        "Connection: Upgrade\r\n" ..
        "Sec-WebSocket-Accept: " .. accept .. "\r\n\r\n"
      c.socket:send(response)
      table.insert(ws_clients, c.socket)
      log.info("WebSocket client connected", { component = "ws", total = #ws_clients })
    else
      c.socket:close()
    end
    return true
  end

  -- Last line of defence: one bad request must not stop the server.
  local ok, handler_err = pcall(handle_request_with_headers, c.socket, line, headers)
  if not ok then
    log.error("Unhandled request error", { component = "api", err = tostring(handler_err) })
    pcall(function() c.socket:close() end)
  end
  return true
end

-- Start server: initialise and seed the database, bind the listening socket
-- and run the single-threaded event loop. Does not return.
-- Raises when the database cannot be initialised or the port cannot be bound.
function app.start()
  log.info("Handheld Devices API Server starting", {
    component = "api",
    port = app.port,
    db = DB_NAME .. "@" .. DB_HOST .. ":" .. DB_PORT,
    redis = redis and "enabled" or "disabled",
  })

  init_db()
  seed_db()

  local server, bind_err = socket.bind(app.host, app.port)
  if not server then
    error("Failed to bind to " .. app.host .. ":" .. app.port .. ": " .. tostring(bind_err))
  end
  server:settimeout(0) -- Non-blocking
  log.info("Server started successfully", { component = "api" })

  local clients = {}
  local ws_clients = {}

  -- Background Redis subscriber
  local red_sub = connect_subscriber()
  local last_sub_reconnect = os.time()

  while true do
    local client = server:accept()
    if client then
      client:settimeout(0)
      table.insert(clients, { socket = client, buffer = "", accepted_at = os.time() })
    end

    -- Reconnect subscriber if lost
    if not red_sub and os.time() - last_sub_reconnect > 5 then
      red_sub = connect_subscriber()
      last_sub_reconnect = os.time()
    end

    -- Handle HTTP clients: keep only the ones still waiting for input.
    local still_pending = {}
    for _, c in ipairs(clients) do
      if not service_client(c, ws_clients) then
        still_pending[#still_pending + 1] = c
      end
    end
    clients = still_pending

    -- Handle Redis Messages -> Send to WS
    -- redis-lua has no read_reply; pub/sub messages come as multibulk: [kind, channel, payload]
    if red_sub then
      local ok, payload = pcall(function()
        local sock = red_sub.network.socket
        if not sock then return nil end
        return read_pubsub_payload(sock)
      end)
      if ok then
        if payload then
          broadcast_to_websockets(ws_clients, payload)
        end
      else
        -- Idle timeouts return nil rather than raising, so every error that
        -- lands here (closed socket, truncated reply) means the connection
        -- is unusable: drop it and let the reconnect timer replace it.
        log.error("Redis subscriber error", { component = "redis", err = tostring(payload) })
        pcall(function() red_sub.network.socket:close() end)
        red_sub = nil
        last_sub_reconnect = os.time()
      end
    end

    socket.sleep(0.01)
  end
end

-- Run if executed directly.
-- `arg` only exists when a standalone interpreter set it up, so check it
-- before indexing; otherwise require()-ing this module elsewhere would fail.
if type(arg) == "table" and type(arg[0]) == "string" and arg[0]:match("app%-standalone") then
  app.start()
end

return app