🎉 initial commit
This commit is contained in:
724
devices-api/app-standalone.lua
Normal file
724
devices-api/app-standalone.lua
Normal file
@@ -0,0 +1,724 @@
|
||||
#!/usr/bin/env lua
|
||||
|
||||
local socket = require("socket")
|
||||
local cjson = require("cjson")
|
||||
local db = require("db")
|
||||
local log = require("log")
|
||||
|
||||
-- Optional dependencies
|
||||
local redis
|
||||
pcall(function() redis = require("redis") end)
|
||||
|
||||
local digest
|
||||
pcall(function() digest = require("openssl.digest") end)
|
||||
|
||||
local app = {}
|
||||
app.port = tonumber(os.getenv("API_PORT")) or 8080
|
||||
app.host = "0.0.0.0"
|
||||
|
||||
-- Database configuration (from db.lua)
|
||||
local DB_HOST = os.getenv("DB_HOST") or "localhost"
|
||||
local DB_PORT = tonumber(os.getenv("DB_PORT")) or 5432
|
||||
local DB_NAME = os.getenv("DB_NAME") or "handheld_devices"
|
||||
|
||||
-- Redis configuration
|
||||
local REDIS_HOST = os.getenv("REDIS_HOST") or "127.0.0.1"
|
||||
local REDIS_PORT = tonumber(os.getenv("REDIS_PORT")) or 6379
|
||||
|
||||
-- Redis client with retry
|
||||
local function get_redis_connection()
|
||||
if not redis then return nil end
|
||||
local attempts = 3
|
||||
for i = 1, attempts do
|
||||
local ok, red = pcall(redis.connect, REDIS_HOST, REDIS_PORT)
|
||||
if ok and red then return red end
|
||||
if i < attempts then
|
||||
socket.sleep(math.min(2 ^ i * 0.1, 2))
|
||||
end
|
||||
end
|
||||
return nil
|
||||
end
|
||||
|
||||
-- Redis ping for health
|
||||
local function redis_ping()
|
||||
local red = get_redis_connection()
|
||||
if not red then return false end
|
||||
local ok, res = pcall(red.ping, red)
|
||||
if ok and res == "PONG" then return true end
|
||||
return false
|
||||
end
|
||||
|
||||
-- Ensure tables exist
|
||||
local function init_db()
|
||||
local ok, err = db.with_retry(function()
|
||||
return db.with_connection(function(conn)
|
||||
conn:query([[
|
||||
CREATE TABLE IF NOT EXISTS devices (
|
||||
id SERIAL PRIMARY KEY,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
manufacturer VARCHAR(255) NOT NULL,
|
||||
release_year INTEGER,
|
||||
cpu VARCHAR(255),
|
||||
ram_mb INTEGER,
|
||||
storage_mb INTEGER,
|
||||
display_size VARCHAR(50),
|
||||
battery_hours REAL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS ratings (
|
||||
id SERIAL PRIMARY KEY,
|
||||
device_id INTEGER NOT NULL REFERENCES devices(id) ON DELETE CASCADE,
|
||||
user_id VARCHAR(255) NOT NULL,
|
||||
score INTEGER CHECK (score >= 1 AND score <= 5),
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS reviews (
|
||||
id SERIAL PRIMARY KEY,
|
||||
device_id INTEGER NOT NULL REFERENCES devices(id) ON DELETE CASCADE,
|
||||
user_id VARCHAR(255) NOT NULL,
|
||||
content TEXT NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
]])
|
||||
return true
|
||||
end)
|
||||
end)
|
||||
if not ok then
|
||||
error("Failed to init DB: " .. tostring(err))
|
||||
end
|
||||
end
|
||||
|
||||
-- Seed initial data
|
||||
local function seed_db()
|
||||
db.with_connection(function(conn)
|
||||
local res, err = conn:query("SELECT COUNT(*) as total FROM devices")
|
||||
local count = 0
|
||||
if res and res[1] then
|
||||
count = tonumber(res[1].total) or 0
|
||||
end
|
||||
|
||||
if count == 0 then
|
||||
log.info("Seeding initial devices", { component = "seed" })
|
||||
local devices = {
|
||||
{
|
||||
name = "Steam Deck",
|
||||
manufacturer = "Valve",
|
||||
release_year = 2022,
|
||||
cpu = "AMD Zen 2",
|
||||
ram_mb = 16384,
|
||||
storage_mb = 524288,
|
||||
display_size = "7-inch",
|
||||
battery_hours = 4.0
|
||||
},
|
||||
{
|
||||
name = "Nintendo Switch",
|
||||
manufacturer = "Nintendo",
|
||||
release_year = 2017,
|
||||
cpu = "Nvidia Tegra X1",
|
||||
ram_mb = 4096,
|
||||
storage_mb = 32768,
|
||||
display_size = "6.2-inch",
|
||||
battery_hours = 5.5
|
||||
},
|
||||
{
|
||||
name = "ROG Ally",
|
||||
manufacturer = "ASUS",
|
||||
release_year = 2023,
|
||||
cpu = "AMD Ryzen Z1 Extreme",
|
||||
ram_mb = 16384,
|
||||
storage_mb = 524288,
|
||||
display_size = "7-inch",
|
||||
battery_hours = 3.5
|
||||
}
|
||||
}
|
||||
|
||||
for _, device in ipairs(devices) do
|
||||
conn:query(
|
||||
"INSERT INTO devices (name, manufacturer, release_year, cpu, ram_mb, storage_mb, display_size, battery_hours) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
|
||||
device.name,
|
||||
device.manufacturer,
|
||||
device.release_year,
|
||||
device.cpu,
|
||||
device.ram_mb,
|
||||
device.storage_mb,
|
||||
device.display_size,
|
||||
device.battery_hours
|
||||
)
|
||||
end
|
||||
log.info("Seeding completed", { component = "seed" })
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
-- Publish to Redis if available (with retry)
|
||||
local function publish_event(event_type, data, request_id)
|
||||
for attempt = 1, 3 do
|
||||
local red = get_redis_connection()
|
||||
if red then
|
||||
local event = {
|
||||
event_type = event_type,
|
||||
timestamp = os.time(),
|
||||
request_id = request_id,
|
||||
}
|
||||
for k, v in pairs(data) do
|
||||
event[k] = v
|
||||
end
|
||||
|
||||
local event_json = cjson.encode(event)
|
||||
|
||||
-- Push to queue (worker will consume from this using reliable pattern)
|
||||
red:lpush("devices:events:queue", event_json)
|
||||
|
||||
-- Also publish to Pub/Sub for immediate processing (WebSockets)
|
||||
local count = red:publish("devices:events", event_json)
|
||||
|
||||
log.info("Event published", { event_type = event_type, subscribers = count, request_id = request_id })
|
||||
red:quit()
|
||||
return
|
||||
end
|
||||
if attempt < 3 then socket.sleep(math.min(2 ^ attempt * 0.1, 2)) end
|
||||
end
|
||||
log.warn("Failed to publish event after retries", { event_type = event_type, request_id = request_id })
|
||||
end
|
||||
|
||||
|
||||
-- Helper to check if a value is JSON null
|
||||
local function is_json_null(val)
|
||||
return val == nil or val == cjson.null
|
||||
end
|
||||
|
||||
-- WebSocket Utils
|
||||
local function sha1(data)
|
||||
if digest then
|
||||
return digest.new("sha1"):final(data)
|
||||
end
|
||||
-- Fallback: if luaossl is not available, we can't do a proper handshake
|
||||
-- In a production app, we should ensure it is available.
|
||||
-- NOTE: This fallback doesn't actually produce a SHA1 hash, it just returns data.
|
||||
-- The WebSocket handshake will fail if digest is not available.
|
||||
print("[WS] Warning: openssl.digest not available, SHA1 handshake will fail")
|
||||
return data
|
||||
end
|
||||
|
||||
local function b64(data)
|
||||
-- Minimal Base64
|
||||
local b='ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/'
|
||||
return ((data:gsub('.', function(x)
|
||||
local r,b='',x:byte()
|
||||
for i=8,1,-1 do r=r..(b%2^i-b%2^(i-1)>0 and '1' or '0') end
|
||||
return r;
|
||||
end)..'0000'):gsub('%d%d%d?%d?%d?%d?', function(x)
|
||||
if (#x < 6) then return '' end
|
||||
local c=0
|
||||
for i=1,6 do c=c+(x:sub(i,i)=='1' and 2^(6-i) or 0) end
|
||||
return b:sub(c+1,c+1)
|
||||
end)..({ '', '==', '=' })[#data%3+1])
|
||||
end
|
||||
|
||||
local function encode_ws_frame(payload)
|
||||
local header = string.char(0x81) -- FIN + Opcode 1 (text)
|
||||
local len = #payload
|
||||
if len <= 125 then
|
||||
header = header .. string.char(len)
|
||||
elseif len <= 65535 then
|
||||
header = header .. string.char(126) .. string.char(math.floor(len / 256)) .. string.char(len % 256)
|
||||
else
|
||||
-- 64-bit length not implemented for simplicity
|
||||
header = header .. string.char(127) .. string.rep(string.char(0), 4) ..
|
||||
string.char(math.floor(len / 16777216) % 256) ..
|
||||
string.char(math.floor(len / 65536) % 256) ..
|
||||
string.char(math.floor(len / 256) % 256) ..
|
||||
string.char(len % 256)
|
||||
end
|
||||
return header .. payload
|
||||
end
|
||||
|
||||
-- Device Model
|
||||
local Device = {}
|
||||
|
||||
function Device.all(limit, offset)
|
||||
limit = limit or 10
|
||||
offset = offset or 0
|
||||
|
||||
local res = db.with_connection(function(conn)
|
||||
return conn:query("SELECT * FROM devices ORDER BY id DESC LIMIT $1 OFFSET $2", limit, offset)
|
||||
end)
|
||||
return res or {}
|
||||
end
|
||||
|
||||
function Device.find(id)
|
||||
local cache_key = "device:" .. id
|
||||
local red = get_redis_connection()
|
||||
|
||||
if red then
|
||||
local cached = red:get(cache_key)
|
||||
if cached then
|
||||
return cjson.decode(cached)
|
||||
end
|
||||
end
|
||||
|
||||
local res = db.with_connection(function(conn)
|
||||
return conn:query("SELECT * FROM devices WHERE id = $1", tonumber(id))
|
||||
end)
|
||||
local row = res and res[1] or nil
|
||||
|
||||
if row and red then
|
||||
red:setex(cache_key, 300, cjson.encode(row))
|
||||
end
|
||||
|
||||
return row
|
||||
end
|
||||
|
||||
function Device.create(data, request_id)
|
||||
local res = db.with_connection(function(conn)
|
||||
return conn:query(
|
||||
"INSERT INTO devices (name, manufacturer, release_year, cpu, ram_mb, storage_mb, display_size, battery_hours) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id",
|
||||
data.name,
|
||||
data.manufacturer,
|
||||
(not is_json_null(data.release_year)) and tonumber(data.release_year) or nil,
|
||||
(not is_json_null(data.cpu)) and data.cpu or nil,
|
||||
(not is_json_null(data.ram_mb)) and tonumber(data.ram_mb) or nil,
|
||||
(not is_json_null(data.storage_mb)) and tonumber(data.storage_mb) or nil,
|
||||
(not is_json_null(data.display_size)) and data.display_size or nil,
|
||||
(not is_json_null(data.battery_hours)) and tonumber(data.battery_hours) or nil
|
||||
)
|
||||
end)
|
||||
local row = res and res[1]
|
||||
local device_id = row and tonumber(row.id) or nil
|
||||
|
||||
if device_id then
|
||||
publish_event("DevicePublished", { device_id = device_id, device_name = data.name }, request_id)
|
||||
return Device.find(device_id)
|
||||
end
|
||||
|
||||
return nil
|
||||
end
|
||||
|
||||
function Device.update(id, data, request_id)
|
||||
local updated = false
|
||||
db.with_connection(function(conn)
|
||||
local updates = {}
|
||||
local pg = conn
|
||||
if not is_json_null(data.name) then table.insert(updates, "name = " .. pg:escape_literal(data.name)) end
|
||||
if not is_json_null(data.manufacturer) then table.insert(updates, "manufacturer = " .. pg:escape_literal(data.manufacturer)) end
|
||||
if not is_json_null(data.release_year) then table.insert(updates, "release_year = " .. tonumber(data.release_year)) end
|
||||
if not is_json_null(data.cpu) then table.insert(updates, "cpu = " .. pg:escape_literal(data.cpu)) end
|
||||
if not is_json_null(data.ram_mb) then table.insert(updates, "ram_mb = " .. tonumber(data.ram_mb)) end
|
||||
if not is_json_null(data.storage_mb) then table.insert(updates, "storage_mb = " .. tonumber(data.storage_mb)) end
|
||||
if not is_json_null(data.display_size) then table.insert(updates, "display_size = " .. pg:escape_literal(data.display_size)) end
|
||||
if not is_json_null(data.battery_hours) then table.insert(updates, "battery_hours = " .. tonumber(data.battery_hours)) end
|
||||
|
||||
if #updates > 0 then
|
||||
table.insert(updates, "updated_at = CURRENT_TIMESTAMP")
|
||||
conn:query("UPDATE devices SET " .. table.concat(updates, ", ") .. " WHERE id = $1", tonumber(id))
|
||||
updated = true
|
||||
end
|
||||
end)
|
||||
|
||||
local red = get_redis_connection()
|
||||
if red then
|
||||
red:del("device:" .. id)
|
||||
end
|
||||
|
||||
local device = Device.find(id)
|
||||
if device and updated then
|
||||
publish_event("DeviceUpdated", { device_id = tonumber(id), device_name = device.name }, request_id)
|
||||
end
|
||||
return device
|
||||
end
|
||||
|
||||
function Device.delete(id, request_id)
|
||||
local device = Device.find(id)
|
||||
db.with_connection(function(conn)
|
||||
conn:query("DELETE FROM devices WHERE id = $1", tonumber(id))
|
||||
end)
|
||||
|
||||
local red = get_redis_connection()
|
||||
if red then
|
||||
red:del("device:" .. id)
|
||||
end
|
||||
|
||||
if device then
|
||||
publish_event("DeviceDeleted", { device_id = tonumber(id), device_name = device.name }, request_id)
|
||||
end
|
||||
|
||||
return true
|
||||
end
|
||||
|
||||
function Device.get_count()
|
||||
local res = db.with_connection(function(conn)
|
||||
return conn:query("SELECT COUNT(*) as total FROM devices")
|
||||
end)
|
||||
if res and res[1] then
|
||||
return tonumber(res[1].total) or 0
|
||||
end
|
||||
return 0
|
||||
end
|
||||
|
||||
-- HTTP Request Parser
|
||||
local function parse_request(request_line)
|
||||
local method, path, version = request_line:match("^(%w+)%s+(%S+)%s+(%S+)")
|
||||
return method, path, version
|
||||
end
|
||||
|
||||
local function parse_headers(client)
|
||||
local headers = {}
|
||||
while true do
|
||||
-- Use a small timeout for individual header lines to handle slow clients
|
||||
-- or cases where headers are partially sent.
|
||||
client:settimeout(0.1)
|
||||
local line, err = client:receive("*l")
|
||||
if not line or line == "" then break end
|
||||
local key, value = line:match("^([^:]+):%s*(.*)$")
|
||||
if key then
|
||||
headers[key:lower()] = value
|
||||
end
|
||||
end
|
||||
client:settimeout(0) -- Back to non-blocking
|
||||
return headers
|
||||
end
|
||||
|
||||
local function parse_query_string(query_str)
|
||||
local params = {}
|
||||
if query_str and query_str ~= "" then
|
||||
for key, value in query_str:gmatch("([^=&]+)=([^&]*)") do
|
||||
params[key] = value
|
||||
end
|
||||
end
|
||||
return params
|
||||
end
|
||||
|
||||
local function parse_path(full_path)
|
||||
local path, query = full_path:match("^([^?]+)%??(.*)$")
|
||||
return path or full_path, query or ""
|
||||
end
|
||||
|
||||
local function generate_request_id()
|
||||
return string.format("%x-%x-%x", math.random(0, 0xffff), math.random(0, 0xffff), os.time())
|
||||
end
|
||||
|
||||
-- HTTP Response Builder
|
||||
local function build_response(status, body, content_type, request_id)
|
||||
content_type = content_type or "application/json"
|
||||
local extra_headers = ""
|
||||
if request_id then
|
||||
extra_headers = "X-Request-ID: " .. request_id .. "\r\n"
|
||||
end
|
||||
local response = string.format(
|
||||
"HTTP/1.1 %s\r\n" ..
|
||||
"Content-Type: %s\r\n" ..
|
||||
"Content-Length: %d\r\n" ..
|
||||
"%s" ..
|
||||
"Access-Control-Allow-Origin: *\r\n" ..
|
||||
"Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS\r\n" ..
|
||||
"Access-Control-Allow-Headers: Content-Type, X-Request-ID\r\n" ..
|
||||
"Connection: close\r\n" ..
|
||||
"\r\n" ..
|
||||
"%s",
|
||||
status, content_type, #body, extra_headers, body
|
||||
)
|
||||
return response
|
||||
end
|
||||
|
||||
-- Route Handlers
|
||||
local function handle_get_devices(query_params)
|
||||
local page = tonumber(query_params.page) or 1
|
||||
local per_page = 10
|
||||
local offset = (page - 1) * per_page
|
||||
|
||||
local devices = Device.all(per_page, offset)
|
||||
local total = Device.get_count()
|
||||
|
||||
return "200 OK", cjson.encode({
|
||||
data = devices,
|
||||
total = total,
|
||||
page = page,
|
||||
per_page = per_page
|
||||
})
|
||||
end
|
||||
|
||||
local function handle_post_devices(json_data, request_id)
|
||||
if not json_data.name or not json_data.manufacturer then
|
||||
return "400 Bad Request", cjson.encode({ error = "Missing required fields: name, manufacturer" })
|
||||
end
|
||||
|
||||
local device = Device.create(json_data, request_id)
|
||||
if device then
|
||||
return "201 Created", cjson.encode(device)
|
||||
else
|
||||
return "500 Internal Server Error", cjson.encode({ error = "Failed to create device" })
|
||||
end
|
||||
end
|
||||
|
||||
local function handle_get_device(id)
|
||||
local device = Device.find(id)
|
||||
if device then
|
||||
return "200 OK", cjson.encode(device)
|
||||
else
|
||||
return "404 Not Found", cjson.encode({ error = "Device not found" })
|
||||
end
|
||||
end
|
||||
|
||||
local function handle_put_device(id, json_data, request_id)
|
||||
local device = Device.update(id, json_data, request_id)
|
||||
if device then
|
||||
return "200 OK", cjson.encode(device)
|
||||
else
|
||||
return "404 Not Found", cjson.encode({ error = "Device not found" })
|
||||
end
|
||||
end
|
||||
|
||||
local function handle_delete_device(id, request_id)
|
||||
Device.delete(id, request_id)
|
||||
return "200 OK", cjson.encode({ success = true })
|
||||
end
|
||||
|
||||
-- Health: liveness (process alive)
|
||||
local function handle_health_live()
|
||||
return "200 OK", cjson.encode({ status = "ok" })
|
||||
end
|
||||
|
||||
-- Health: readiness (DB + Redis OK)
|
||||
local function handle_health_ready()
|
||||
local db_ok = db.ping()
|
||||
local redis_ok = redis_ping()
|
||||
if db_ok and redis_ok then
|
||||
return "200 OK", cjson.encode({ status = "ok", db = "ok", redis = "ok" })
|
||||
end
|
||||
local details = { db = db_ok and "ok" or "fail", redis = redis_ok and "ok" or "fail" }
|
||||
return "503 Service Unavailable", cjson.encode({ status = "degraded", details = details })
|
||||
end
|
||||
|
||||
-- Main Request Handler
|
||||
local function handle_request_with_headers(client, request_line, headers)
|
||||
local method, full_path = parse_request(request_line)
|
||||
local path, query_str = parse_path(full_path)
|
||||
local query_params = parse_query_string(query_str)
|
||||
|
||||
local request_id = headers["x-request-id"] or generate_request_id()
|
||||
|
||||
-- Read body if POST/PUT
|
||||
local body = ""
|
||||
if method == "POST" or method == "PUT" then
|
||||
local content_length = tonumber(headers["content-length"]) or 0
|
||||
if content_length > 0 then
|
||||
body = client:receive(content_length)
|
||||
end
|
||||
end
|
||||
|
||||
-- Parse JSON
|
||||
local json_data = {}
|
||||
if body ~= "" then
|
||||
local ok, data = pcall(cjson.decode, body)
|
||||
if ok then json_data = data end
|
||||
end
|
||||
|
||||
-- Handle CORS preflight
|
||||
if method == "OPTIONS" then
|
||||
client:send(build_response("200 OK", "", nil, request_id))
|
||||
client:close()
|
||||
return
|
||||
end
|
||||
|
||||
-- Route handling
|
||||
local status, response_body = "404 Not Found", cjson.encode({ error = "Not found" })
|
||||
|
||||
if method == "GET" and path == "/devices" then
|
||||
status, response_body = handle_get_devices(query_params)
|
||||
elseif method == "POST" and path == "/devices" then
|
||||
status, response_body = handle_post_devices(json_data, request_id)
|
||||
elseif method == "GET" and path:match("^/devices/%d+$") then
|
||||
local id = path:match("/devices/(%d+)")
|
||||
status, response_body = handle_get_device(id)
|
||||
elseif method == "PUT" and path:match("^/devices/%d+$") then
|
||||
local id = path:match("/devices/(%d+)")
|
||||
status, response_body = handle_put_device(id, json_data, request_id)
|
||||
elseif method == "DELETE" and path:match("^/devices/%d+$") then
|
||||
local id = path:match("/devices/(%d+)")
|
||||
status, response_body = handle_delete_device(id, request_id)
|
||||
elseif method == "GET" and path == "/health" then
|
||||
status, response_body = "200 OK", cjson.encode({ status = "ok" })
|
||||
elseif method == "GET" and path == "/health/live" then
|
||||
status, response_body = handle_health_live()
|
||||
elseif method == "GET" and path == "/health/ready" then
|
||||
status, response_body = handle_health_ready()
|
||||
end
|
||||
|
||||
log.info("Request", {
|
||||
method = method,
|
||||
path = path,
|
||||
status = status:match("^(%d+)") or "?",
|
||||
request_id = request_id,
|
||||
})
|
||||
|
||||
-- Send response
|
||||
local response = build_response(status, response_body, nil, request_id)
|
||||
client:send(response)
|
||||
client:close()
|
||||
end
|
||||
|
||||
-- Start server
|
||||
function app.start()
|
||||
log.info("Handheld Devices API Server starting", {
|
||||
component = "api",
|
||||
port = app.port,
|
||||
db = DB_NAME .. "@" .. DB_HOST .. ":" .. DB_PORT,
|
||||
redis = redis and "enabled" or "disabled",
|
||||
})
|
||||
|
||||
init_db()
|
||||
seed_db()
|
||||
|
||||
local server = socket.bind(app.host, app.port)
|
||||
if not server then
|
||||
error("Failed to bind to " .. app.host .. ":" .. app.port)
|
||||
end
|
||||
server:settimeout(0) -- Non-blocking
|
||||
|
||||
log.info("Server started successfully", { component = "api" })
|
||||
|
||||
local clients = {}
|
||||
local ws_clients = {}
|
||||
|
||||
-- Background Redis subscriber
|
||||
local function connect_subscriber()
|
||||
local red = get_redis_connection()
|
||||
if red then
|
||||
-- Note: redis-lua's subscribe() sets the connection into a subscription state.
|
||||
local ok, err = pcall(function() return red:subscribe("devices:events") end)
|
||||
if ok then
|
||||
-- Set the underlying socket to a very small timeout for non-blocking feel
|
||||
pcall(function()
|
||||
if red.network and red.network.socket then
|
||||
red.network.socket:settimeout(0.001)
|
||||
end
|
||||
end)
|
||||
log.info("Redis subscriber connected", { component = "redis" })
|
||||
else
|
||||
log.error("Redis subscriber failed", { component = "redis", err = tostring(err) })
|
||||
red = nil
|
||||
end
|
||||
end
|
||||
return red
|
||||
end
|
||||
|
||||
local red_sub = connect_subscriber()
|
||||
local last_sub_reconnect = os.time()
|
||||
|
||||
while true do
|
||||
local client = server:accept()
|
||||
if client then
|
||||
client:settimeout(0)
|
||||
table.insert(clients, { socket = client })
|
||||
end
|
||||
|
||||
-- Reconnect subscriber if lost
|
||||
if not red_sub and os.time() - last_sub_reconnect > 5 then
|
||||
red_sub = connect_subscriber()
|
||||
last_sub_reconnect = os.time()
|
||||
end
|
||||
|
||||
-- Handle HTTP clients
|
||||
local to_remove = {}
|
||||
for i, c in ipairs(clients) do
|
||||
local line, err = c.socket:receive("*l")
|
||||
if line then
|
||||
local method, full_path = parse_request(line)
|
||||
local headers = parse_headers(c.socket)
|
||||
|
||||
if headers["upgrade"] == "websocket" then
|
||||
-- Handle WebSocket Handshake
|
||||
local key = headers["sec-websocket-key"]
|
||||
if key then
|
||||
local sha1_key = sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
|
||||
local accept = b64(sha1_key)
|
||||
local response = "HTTP/1.1 101 Switching Protocols\r\n" ..
|
||||
"Upgrade: websocket\r\n" ..
|
||||
"Connection: Upgrade\r\n" ..
|
||||
"Sec-WebSocket-Accept: " .. accept .. "\r\n\r\n"
|
||||
c.socket:send(response)
|
||||
table.insert(ws_clients, c.socket)
|
||||
log.info("WebSocket client connected", { component = "ws", total = #ws_clients })
|
||||
else
|
||||
c.socket:close()
|
||||
end
|
||||
table.insert(to_remove, i)
|
||||
else
|
||||
handle_request_with_headers(c.socket, line, headers)
|
||||
table.insert(to_remove, i)
|
||||
end
|
||||
elseif err == "closed" then
|
||||
table.insert(to_remove, i)
|
||||
end
|
||||
end
|
||||
for i = #to_remove, 1, -1 do table.remove(clients, to_remove[i]) end
|
||||
|
||||
-- Handle Redis Messages -> Send to WS
|
||||
-- redis-lua has no read_reply; pub/sub messages come as multibulk: [kind, channel, payload]
|
||||
if red_sub then
|
||||
local ok, err = pcall(function()
|
||||
local sock = red_sub.network.socket
|
||||
if not sock then return end
|
||||
sock:settimeout(0.001)
|
||||
local line, serr = sock:receive("*l")
|
||||
if not line then
|
||||
if serr ~= "timeout" then
|
||||
error(serr or "socket read failed")
|
||||
end
|
||||
return
|
||||
end
|
||||
-- Parse RESP: *3\r\n means 3-element array
|
||||
local prefix = line:sub(1, 1)
|
||||
if prefix == "*" then
|
||||
local count = tonumber(line:sub(2))
|
||||
if count and count >= 3 then
|
||||
local parts = {}
|
||||
for i = 1, count do
|
||||
local bline, berr = sock:receive("*l")
|
||||
if not bline then error(berr or "incomplete") end
|
||||
if bline:sub(1, 1) == "$" then
|
||||
local len = tonumber(bline:sub(2))
|
||||
local bulk = len > 0 and sock:receive(len + 2) or ""
|
||||
if bulk then
|
||||
parts[i] = len > 0 and bulk:sub(1, -3) or ""
|
||||
end
|
||||
end
|
||||
end
|
||||
-- Pub/sub message: ["message", channel, payload]
|
||||
if parts[1] == "message" and parts[3] then
|
||||
local payload = parts[3]
|
||||
log.info("WebSocket broadcast", { component = "ws", payload_len = #tostring(payload) })
|
||||
local frame = encode_ws_frame(payload)
|
||||
local closed_ws = {}
|
||||
for i, ws in ipairs(ws_clients) do
|
||||
local _, send_err = ws:send(frame)
|
||||
if send_err then table.insert(closed_ws, i) end
|
||||
end
|
||||
for i = #closed_ws, 1, -1 do table.remove(ws_clients, closed_ws[i]) end
|
||||
end
|
||||
end
|
||||
end
|
||||
end)
|
||||
|
||||
if not ok then
|
||||
local err_str = tostring(err)
|
||||
if not string.find(err_str, "timeout") and not string.find(err_str, "closed") then
|
||||
log.error("Redis subscriber error", { component = "redis", err = err_str })
|
||||
red_sub = nil
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
socket.sleep(0.01)
|
||||
end
|
||||
end
|
||||
|
||||
-- Run if executed directly
|
||||
if arg[0]:match("app%-standalone") then
|
||||
app.start()
|
||||
end
|
||||
|
||||
return app
|
||||
Reference in New Issue
Block a user