742 lines
25 KiB
Lua
742 lines
25 KiB
Lua
#!/usr/bin/env lua
|
|
|
|
local socket = require("socket")
|
|
local cjson = require("cjson")
|
|
local db = require("db")
|
|
local log = require("log")
|
|
|
|
-- Optional dependencies
|
|
local redis
|
|
pcall(function() redis = require("redis") end)
|
|
|
|
local digest
|
|
pcall(function() digest = require("openssl.digest") end)
|
|
|
|
-- Module table plus listener settings. API_PORT overrides the default port
-- when it parses as a number.
local app = {}
app.host = "0.0.0.0"
app.port = tonumber(os.getenv("API_PORT")) or 8080

-- Database configuration (from db.lua)
local DB_NAME = os.getenv("DB_NAME") or "handheld_devices"
local DB_HOST = os.getenv("DB_HOST") or "localhost"
local DB_PORT = tonumber(os.getenv("DB_PORT")) or 5432

-- Redis configuration
local REDIS_PORT = tonumber(os.getenv("REDIS_PORT")) or 6379
local REDIS_HOST = os.getenv("REDIS_HOST") or "127.0.0.1"
|
|
|
|
-- Redis client with retry
|
|
-- Open a Redis connection, retrying with a short exponential backoff.
-- Makes up to three attempts and sleeps min(2^n * 0.1, 2) seconds after each
-- failed attempt except the last one.
-- Returns the connected client, or nil when the optional `redis` module is
-- not loaded or every attempt failed.
local function get_redis_connection()
  if not redis then
    return nil
  end

  local max_tries = 3
  local try = 1
  while try <= max_tries do
    local connected, client = pcall(redis.connect, REDIS_HOST, REDIS_PORT)
    if connected and client then
      return client
    end
    if try ~= max_tries then
      socket.sleep(math.min(2 ^ try * 0.1, 2))
    end
    try = try + 1
  end

  return nil
end
|
|
|
|
-- Redis ping for health
|
|
-- Check whether Redis answers a PING (used by the readiness probe).
-- Returns true only when a connection could be opened and the server
-- answered the ping; false otherwise.
local function redis_ping()
  local red = get_redis_connection()
  if not red then
    return false
  end

  local ok, res = pcall(red.ping, red)

  -- Close the probe connection so repeated health checks do not pile up
  -- open sockets. Best-effort: a failing close must not change the result.
  pcall(red.quit, red)

  -- redis-lua post-processes the PING reply into boolean `true`; a client
  -- returning the raw status string "PONG" is accepted as well.
  return ok and (res == true or res == "PONG")
end
|
|
|
|
-- Ensure tables exist
|
|
-- Create the devices, ratings and reviews tables when they do not exist yet.
-- The DDL runs through db.with_retry / db.with_connection; an error is raised
-- when the overall operation reports failure, which aborts startup.
local function init_db()
  local schema_sql = [[
    CREATE TABLE IF NOT EXISTS devices (
      id SERIAL PRIMARY KEY,
      name VARCHAR(255) NOT NULL,
      manufacturer VARCHAR(255) NOT NULL,
      release_year INTEGER,
      cpu VARCHAR(255),
      ram_mb INTEGER,
      storage_mb INTEGER,
      display_size VARCHAR(50),
      battery_hours REAL,
      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
      updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );

    CREATE TABLE IF NOT EXISTS ratings (
      id SERIAL PRIMARY KEY,
      device_id INTEGER NOT NULL REFERENCES devices(id) ON DELETE CASCADE,
      user_id VARCHAR(255) NOT NULL,
      score INTEGER CHECK (score >= 1 AND score <= 5),
      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );

    CREATE TABLE IF NOT EXISTS reviews (
      id SERIAL PRIMARY KEY,
      device_id INTEGER NOT NULL REFERENCES devices(id) ON DELETE CASCADE,
      user_id VARCHAR(255) NOT NULL,
      content TEXT NOT NULL,
      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
  ]]

  local ok, err = db.with_retry(function()
    return db.with_connection(function(conn)
      local res, query_err = conn:query(schema_sql)
      -- NOTE(review): assumes conn:query follows the pgmoon convention of
      -- returning nil plus an error message on failure - confirm against db.lua.
      -- Only an explicit (nil, err) pair is treated as failure, so a wrapper
      -- that returns nothing for DDL statements keeps working.
      if res == nil and query_err ~= nil then
        error("schema creation failed: " .. tostring(query_err))
      end
      return true
    end)
  end)

  if not ok then
    error("Failed to init DB: " .. tostring(err))
  end
end
|
|
|
|
-- Seed initial data
|
|
-- Insert three sample devices when the devices table is empty.
-- Does nothing when the row count is non-zero.
local function seed_db()
  db.with_connection(function(conn)
    local rows = conn:query("SELECT COUNT(*) as total FROM devices")
    local existing = 0
    if rows and rows[1] then
      existing = tonumber(rows[1].total) or 0
    end

    -- Guard clause: only an empty table gets seeded.
    if existing ~= 0 then
      return
    end

    log.info("Seeding initial devices", { component = "seed" })

    local insert_sql = "INSERT INTO devices (name, manufacturer, release_year, cpu, ram_mb, storage_mb, "
      .. "display_size, battery_hours) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"

    local samples = {
      {
        name = "Steam Deck", manufacturer = "Valve", release_year = 2022,
        cpu = "AMD Zen 2", ram_mb = 16384, storage_mb = 524288,
        display_size = "7-inch", battery_hours = 4.0,
      },
      {
        name = "Nintendo Switch", manufacturer = "Nintendo", release_year = 2017,
        cpu = "Nvidia Tegra X1", ram_mb = 4096, storage_mb = 32768,
        display_size = "6.2-inch", battery_hours = 5.5,
      },
      {
        name = "ROG Ally", manufacturer = "ASUS", release_year = 2023,
        cpu = "AMD Ryzen Z1 Extreme", ram_mb = 16384, storage_mb = 524288,
        display_size = "7-inch", battery_hours = 3.5,
      },
    }

    for idx = 1, #samples do
      local d = samples[idx]
      conn:query(
        insert_sql,
        d.name,
        d.manufacturer,
        d.release_year,
        d.cpu,
        d.ram_mb,
        d.storage_mb,
        d.display_size,
        d.battery_hours
      )
    end

    log.info("Seeding completed", { component = "seed" })
  end)
end
|
|
|
|
-- Publish to Redis if available (with retry)
|
|
-- Publish a domain event to Redis on a best-effort basis.
--
-- The JSON-encoded event is pushed onto the "devices:events:queue" list and
-- published on the "devices:events" channel. Up to three attempts are made
-- with a short backoff. Redis failures are logged and never raised, so a
-- completed database write is not turned into a failed request.
--
-- event_type: event name stored in the `event_type` field
-- data:       table of extra fields copied into the event (may be nil)
-- request_id: correlation id stored in the event and the log lines
local function publish_event(event_type, data, request_id)
  -- Without the optional redis module no attempt can ever succeed; return at
  -- once instead of sleeping through the whole backoff schedule.
  if not redis then
    return
  end

  local event = {
    event_type = event_type,
    timestamp = os.time(),
    request_id = request_id,
  }
  for k, v in pairs(data or {}) do
    event[k] = v
  end
  local event_json = cjson.encode(event)

  local max_attempts = 3
  -- Remember a successful queue push so a retry after a failed PUBLISH does
  -- not enqueue the same event twice.
  local queued = false

  for attempt = 1, max_attempts do
    local red = get_redis_connection()
    if red then
      local ok, result = pcall(function()
        if not queued then
          -- Push to queue (worker will consume from this using reliable pattern)
          red:lpush("devices:events:queue", event_json)
          queued = true
        end
        -- Also publish to Pub/Sub for immediate processing (WebSockets)
        return red:publish("devices:events", event_json)
      end)

      -- Always release the connection, whether or not the commands worked.
      pcall(red.quit, red)

      if ok then
        log.info("Event published", { event_type = event_type, subscribers = result, request_id = request_id })
        return
      end

      log.warn("Event publish attempt failed", {
        event_type = event_type,
        attempt = attempt,
        err = tostring(result),
        request_id = request_id,
      })
    end

    if attempt < max_attempts then
      socket.sleep(math.min(2 ^ attempt * 0.1, 2))
    end
  end

  log.warn("Failed to publish event after retries", { event_type = event_type, request_id = request_id })
end
|
|
|
|
|
|
-- Helper to check if a value is JSON null
|
|
-- Report whether a decoded JSON value is missing or an explicit JSON null.
-- Returns true for Lua nil and for the cjson.null sentinel, false otherwise.
local function is_json_null(val)
  if val == nil then
    return true
  end
  return val == cjson.null
end
|
|
|
|
-- WebSocket Utils
|
|
-- Compute the binary SHA-1 digest of `data` via openssl.digest.
--
-- When the optional `digest` module was not loaded there is no real
-- fallback: a warning is printed and `data` is returned unchanged, which is
-- NOT a SHA-1 hash, so a WebSocket handshake built from it will fail.
local function sha1(data)
  if not digest then
    print("[WS] Warning: openssl.digest not available, SHA1 handshake will fail")
    return data
  end

  local hasher = digest.new("sha1")
  return hasher:final(data)
end
|
|
|
|
-- Encode a byte string as standard Base64 with '=' padding.
-- Works on groups of three input bytes, emitting four alphabet characters
-- per group and padding the final group when it is short.
local function b64(data)
  local alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/'
  local floor = math.floor
  local out = {}

  for pos = 1, #data, 3 do
    local b1, b2, b3 = data:byte(pos, pos + 2)
    -- Pack up to three bytes into one 24-bit number; missing bytes count as 0.
    local n = b1 * 65536 + (b2 or 0) * 256 + (b3 or 0)

    local i1 = floor(n / 262144)
    local i2 = floor(n / 4096) % 64
    local i3 = floor(n / 64) % 64
    local i4 = n % 64

    out[#out + 1] = alphabet:sub(i1 + 1, i1 + 1)
    out[#out + 1] = alphabet:sub(i2 + 1, i2 + 1)
    out[#out + 1] = b2 and alphabet:sub(i3 + 1, i3 + 1) or '='
    out[#out + 1] = b3 and alphabet:sub(i4 + 1, i4 + 1) or '='
  end

  return table.concat(out)
end
|
|
|
|
-- Build an unmasked WebSocket text frame (FIN set, opcode 1) around `payload`.
--
-- The length is encoded in one of three forms:
--   * up to 125 bytes: a single length byte
--   * up to 65535 bytes: marker 126 followed by a 16-bit big-endian length
--   * larger: marker 127 followed by a 64-bit big-endian length
-- Returns the header concatenated with the payload.
local function encode_ws_frame(payload)
  local len = #payload
  local header

  if len <= 125 then
    header = string.char(0x81, len)
  elseif len <= 65535 then
    header = string.char(0x81, 126, math.floor(len / 256), len % 256)
  else
    -- Emit all eight length bytes, most significant first, instead of
    -- hard-coding the upper four to zero.
    local bytes = {}
    local rest = len
    for i = 8, 1, -1 do
      bytes[i] = rest % 256
      rest = math.floor(rest / 256)
    end
    header = string.char(
      0x81, 127,
      bytes[1], bytes[2], bytes[3], bytes[4],
      bytes[5], bytes[6], bytes[7], bytes[8]
    )
  end

  return header .. payload
end
|
|
|
|
-- Device Model
|
|
local Device = {}

-- Fetch one page of devices ordered by id, newest first.
-- limit:  maximum number of rows (defaults to 10)
-- offset: number of rows to skip (defaults to 0)
-- Returns the query result, or an empty table when the query yields nothing.
function Device.all(limit, offset)
  local page_size = limit or 10
  local skip = offset or 0

  local rows = db.with_connection(function(conn)
    return conn:query("SELECT * FROM devices ORDER BY id DESC LIMIT $1 OFFSET $2", page_size, skip)
  end)

  if rows then
    return rows
  end
  return {}
end
|
|
|
|
-- Look up one device by id, using Redis as a read-through cache.
--
-- A cached copy under "device:<id>" is returned when present and decodable.
-- Otherwise the row is read from the database and cached for 300 seconds.
-- The cache is strictly best-effort: Redis errors or a corrupt cache entry
-- fall back to the database instead of failing the lookup, and the Redis
-- connection is released before returning.
-- Returns the device row, or nil when the id is nil or no row matches.
function Device.find(id)
  if id == nil then
    return nil
  end

  local cache_key = "device:" .. id
  local red = get_redis_connection()

  if red then
    local got, cached = pcall(red.get, red, cache_key)
    if got and type(cached) == "string" then
      local decoded, device = pcall(cjson.decode, cached)
      if decoded then
        pcall(red.quit, red)
        return device
      end
    end
  end

  local res = db.with_connection(function(conn)
    return conn:query("SELECT * FROM devices WHERE id = $1", tonumber(id))
  end)
  local row = res and res[1] or nil

  if red then
    if row then
      -- Encoding happens inside the protected call so an unencodable row
      -- cannot fail a lookup that already succeeded.
      pcall(function()
        red:setex(cache_key, 300, cjson.encode(row))
      end)
    end
    pcall(red.quit, red)
  end

  return row
end
|
|
|
|
-- Insert a new device and return the stored row.
--
-- `name` and `manufacturer` are passed through as given; every other column
-- is sent as nil when the field is missing or JSON null, and numeric columns
-- go through tonumber. On success a "DevicePublished" event is emitted and
-- the row is re-read with Device.find. Returns nil when the insert yields no id.
function Device.create(data, request_id)
  -- Both helpers return exactly one value so the argument list keeps its shape.
  local function optional_number(value)
    if is_json_null(value) then
      return nil
    end
    return (tonumber(value))
  end

  local function optional_value(value)
    if is_json_null(value) then
      return nil
    end
    return value or nil
  end

  local insert_sql = "INSERT INTO devices (name, manufacturer, release_year, cpu, ram_mb, storage_mb, "
    .. "display_size, battery_hours) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id"

  local res = db.with_connection(function(conn)
    return conn:query(
      insert_sql,
      data.name,
      data.manufacturer,
      optional_number(data.release_year),
      optional_value(data.cpu),
      optional_number(data.ram_mb),
      optional_number(data.storage_mb),
      optional_value(data.display_size),
      optional_number(data.battery_hours)
    )
  end)

  local inserted = res and res[1]
  local device_id = inserted and tonumber(inserted.id) or nil
  if not device_id then
    return nil
  end

  publish_event("DevicePublished", { device_id = device_id, device_name = data.name }, request_id)
  return Device.find(device_id)
end
|
|
|
|
-- Apply a partial update to a device and return the refreshed row.
--
-- Only fields that are present and not JSON null are written. Text fields
-- are quoted with conn:escape_literal; numeric fields must convert to a
-- finite number and are otherwise skipped with a warning (previously a
-- non-numeric value raised a concatenation error). The cached copy is
-- invalidated, the row is re-read, and a "DeviceUpdated" event is published
-- when at least one column was written and the device exists.
-- Returns the device row, or nil when it does not exist.
function Device.update(id, data, request_id)
  local text_fields = { "name", "manufacturer", "cpu", "display_size" }
  local numeric_fields = { "release_year", "ram_mb", "storage_mb", "battery_hours" }

  local updated = false
  db.with_connection(function(conn)
    local updates = {}

    for _, field in ipairs(text_fields) do
      local value = data[field]
      if not is_json_null(value) then
        updates[#updates + 1] = field .. " = " .. conn:escape_literal(value)
      end
    end

    for _, field in ipairs(numeric_fields) do
      local value = data[field]
      if not is_json_null(value) then
        local num = tonumber(value)
        -- Reject nil, NaN and +/-infinity: none of them renders as a valid
        -- SQL numeric literal.
        if num and num == num and num ~= math.huge and num ~= -math.huge then
          updates[#updates + 1] = field .. " = " .. tostring(num)
        else
          log.warn("Ignoring non-numeric value in device update", { field = field, request_id = request_id })
        end
      end
    end

    if #updates > 0 then
      updates[#updates + 1] = "updated_at = CURRENT_TIMESTAMP"
      conn:query("UPDATE devices SET " .. table.concat(updates, ", ") .. " WHERE id = $1", tonumber(id))
      updated = true
    end
  end)

  -- Drop the cached copy (best-effort) and release the connection.
  local red = get_redis_connection()
  if red then
    pcall(red.del, red, "device:" .. id)
    pcall(red.quit, red)
  end

  local device = Device.find(id)
  if device and updated then
    publish_event("DeviceUpdated", { device_id = tonumber(id), device_name = device.name }, request_id)
  end
  return device
end
|
|
|
|
-- Delete a device by id.
--
-- The row is looked up first so its name can be included in the
-- "DeviceDeleted" event, which is only published when the device existed.
-- The cached copy is removed on a best-effort basis and the Redis
-- connection is released afterwards.
-- Always returns true, whether or not a row was deleted.
function Device.delete(id, request_id)
  local device = Device.find(id)

  db.with_connection(function(conn)
    conn:query("DELETE FROM devices WHERE id = $1", tonumber(id))
  end)

  local red = get_redis_connection()
  if red then
    -- A cache failure must not fail a delete that already succeeded.
    pcall(red.del, red, "device:" .. id)
    pcall(red.quit, red)
  end

  if device then
    publish_event("DeviceDeleted", { device_id = tonumber(id), device_name = device.name }, request_id)
  end

  return true
end
|
|
|
|
-- Count the rows in the devices table.
-- Returns 0 when the query yields no first row or the count is not numeric.
function Device.get_count()
  local rows = db.with_connection(function(conn)
    return conn:query("SELECT COUNT(*) as total FROM devices")
  end)

  local first = rows and rows[1]
  if not first then
    return 0
  end
  return tonumber(first.total) or 0
end
|
|
|
|
-- HTTP Request Parser
|
|
-- Split an HTTP request line into its three whitespace-separated parts.
-- Returns method, path, version; all three are nil when the line does not
-- have that shape.
local function parse_request(request_line)
  local request_pattern = "^(%w+)%s+(%S+)%s+(%S+)"
  local _, _, method, path, version = request_line:find(request_pattern)
  return method, path, version
end
|
|
|
|
-- Read header lines from `client` until an empty line or a failed read.
--
-- Each read uses a 0.1 second timeout so slow or partially sent headers do
-- not stall the caller; the socket is switched back to non-blocking before
-- returning. Header names are lower-cased; lines without a colon are skipped.
-- Returns a table mapping lower-case header names to values.
local function parse_headers(client)
  local headers = {}
  local line

  repeat
    client:settimeout(0.1)
    line = client:receive("*l")
    if line and line ~= "" then
      local name, value = line:match("^([^:]+):%s*(.*)$")
      if name then
        headers[name:lower()] = value
      end
    end
  until not line or line == ""

  client:settimeout(0) -- Back to non-blocking
  return headers
end
|
|
|
|
-- Parse a URL query string into a table of parameters.
--
-- Pairs are separated by '&' and split on the first '='. Keys and values are
-- URL-decoded: '+' becomes a space and %XX sequences become the byte they
-- encode. Pairs without '=' are ignored and later duplicates overwrite
-- earlier ones.
-- Returns an empty table for nil or empty input.
local function parse_query_string(query_str)
  local params = {}
  if not query_str or query_str == "" then
    return params
  end

  local function url_decode(text)
    local spaced = text:gsub("%+", " ")
    local decoded = spaced:gsub("%%(%x%x)", function(hex)
      return string.char(tonumber(hex, 16))
    end)
    return decoded
  end

  for key, value in query_str:gmatch("([^=&]+)=([^&]*)") do
    params[url_decode(key)] = url_decode(value)
  end

  return params
end
|
|
|
|
-- Split a request target into its path and query string at the first '?'.
-- Returns path, query. The query is "" when there is no '?'; a target that
-- begins with '?' is returned whole as the path with an empty query.
local function parse_path(full_path)
  local mark = full_path:find("?", 1, true)
  if mark == nil or mark == 1 then
    return full_path, ""
  end
  return full_path:sub(1, mark - 1), full_path:sub(mark + 1)
end
|
|
|
|
-- Build a request id from two random 16-bit values and the current time,
-- rendered as three lower-case hexadecimal groups joined by '-'.
local function generate_request_id()
  local first = math.random(0, 0xffff)
  local second = math.random(0, 0xffff)
  local stamp = os.time()
  return ("%x-%x-%x"):format(first, second, stamp)
end
|
|
|
|
-- HTTP Response Builder
|
|
-- Serialise a complete HTTP/1.1 response.
--
-- status:       status line text, e.g. "200 OK"
-- body:         response body string; its byte length becomes Content-Length
-- content_type: defaults to "application/json"
-- request_id:   when given, echoed back in an X-Request-ID header
-- The response always carries the permissive CORS headers and
-- "Connection: close". Returns the full response string.
local function build_response(status, body, content_type, request_id)
  local head = string.format(
    "HTTP/1.1 %s\r\nContent-Type: %s\r\nContent-Length: %d\r\n",
    status,
    content_type or "application/json",
    #body
  )

  local trace_header = ""
  if request_id then
    trace_header = "X-Request-ID: " .. request_id .. "\r\n"
  end

  local fixed_headers = "Access-Control-Allow-Origin: *\r\n"
    .. "Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS\r\n"
    .. "Access-Control-Allow-Headers: Content-Type, X-Request-ID\r\n"
    .. "Connection: close\r\n"

  return head .. trace_header .. fixed_headers .. "\r\n" .. body
end
|
|
|
|
-- Route Handlers
|
|
-- GET /devices: return one page of devices plus paging metadata.
--
-- The `page` query parameter is 1-based. Missing, non-numeric, fractional,
-- non-finite or non-positive values are normalised to a whole page number
-- of at least 1 so the computed OFFSET is never negative or fractional.
-- Returns the status line and the JSON body.
local function handle_get_devices(query_params)
  local per_page = 10

  local page = math.floor(tonumber(query_params.page) or 1)
  -- The negated form also catches NaN, for which every comparison is false.
  if not (page >= 1 and page < math.huge) then
    page = 1
  end

  local offset = (page - 1) * per_page

  local devices = Device.all(per_page, offset)
  local total = Device.get_count()

  return "200 OK", cjson.encode({
    data = devices,
    total = total,
    page = page,
    per_page = per_page
  })
end
|
|
|
|
-- POST /devices: validate the payload and create a device.
--
-- `name` and `manufacturer` are required. A JSON null counts as missing:
-- cjson decodes it to the truthy cjson.null sentinel, so a plain `not x`
-- check would let it through to the insert.
-- Returns the status line and the JSON body.
local function handle_post_devices(json_data, request_id)
  if type(json_data) ~= "table"
    or is_json_null(json_data.name)
    or is_json_null(json_data.manufacturer) then
    return "400 Bad Request", cjson.encode({ error = "Missing required fields: name, manufacturer" })
  end

  local device = Device.create(json_data, request_id)
  if device then
    return "201 Created", cjson.encode(device)
  end
  return "500 Internal Server Error", cjson.encode({ error = "Failed to create device" })
end
|
|
|
|
-- GET /devices/<id>: return the device as JSON, or a 404 error body when
-- Device.find yields nothing.
local function handle_get_device(id)
  local found = Device.find(id)
  if not found then
    return "404 Not Found", cjson.encode({ error = "Device not found" })
  end
  return "200 OK", cjson.encode(found)
end
|
|
|
|
-- PUT /devices/<id>: apply the update and return the refreshed device as
-- JSON, or a 404 error body when Device.update yields nothing.
local function handle_put_device(id, json_data, request_id)
  local refreshed = Device.update(id, json_data, request_id)
  if not refreshed then
    return "404 Not Found", cjson.encode({ error = "Device not found" })
  end
  return "200 OK", cjson.encode(refreshed)
end
|
|
|
|
-- DELETE /devices/<id>: delete the device and report success.
-- The response is 200 regardless of whether the device existed.
local function handle_delete_device(id, request_id)
  Device.delete(id, request_id)
  local payload = cjson.encode({ success = true })
  return "200 OK", payload
end
|
|
|
|
-- Health: liveness (process alive)
|
|
-- Liveness probe: answers 200 without consulting any dependency.
local function handle_health_live()
  local payload = cjson.encode({ status = "ok" })
  return "200 OK", payload
end
|
|
|
|
-- Health: readiness (DB + Redis OK)
|
|
-- Readiness probe: 200 when both the database and Redis respond, otherwise
-- 503 with a per-dependency "ok"/"fail" breakdown.
local function handle_health_ready()
  local db_ok = db.ping()
  local redis_ok = redis_ping()

  if not (db_ok and redis_ok) then
    local details = {
      db = db_ok and "ok" or "fail",
      redis = redis_ok and "ok" or "fail",
    }
    return "503 Service Unavailable", cjson.encode({ status = "degraded", details = details })
  end

  return "200 OK", cjson.encode({ status = "ok", db = "ok", redis = "ok" })
end
|
|
|
|
-- Main Request Handler
|
|
-- Handle one HTTP request whose request line and headers are already read.
--
-- Reads and decodes the JSON body for POST/PUT, dispatches to the matching
-- route handler, logs the outcome, sends the response and closes the socket.
--
-- Failure handling:
--   * malformed request line          -> 400
--   * body larger than MAX_BODY_BYTES -> 413
--   * body not fully received         -> 400
--   * body is not a JSON object/array -> 400
--   * error raised by a route handler -> 500 (logged; the server keeps running)
--
-- client:       connected socket (normally in non-blocking mode on entry)
-- request_line: first line of the request
-- headers:      table of lower-cased header names to values
local function handle_request_with_headers(client, request_line, headers)
  local MAX_BODY_BYTES = 1024 * 1024
  local IO_TIMEOUT_SECONDS = 1

  local request_id = headers["x-request-id"] or generate_request_id()

  -- Send a response and close the connection. The write gets a bounded
  -- timeout because a non-blocking send may stop after a partial write.
  local function respond(status, response_body)
    client:settimeout(IO_TIMEOUT_SECONDS)
    client:send(build_response(status, response_body, nil, request_id))
    client:close()
  end

  -- Log and answer a request that is refused before routing.
  local function reject(status, message)
    log.warn("Request rejected", {
      status = status:match("^(%d+)") or "?",
      reason = message,
      request_id = request_id,
    })
    respond(status, cjson.encode({ error = message }))
  end

  local method, full_path = parse_request(request_line)
  if not method or not full_path then
    reject("400 Bad Request", "Malformed request line")
    return
  end

  local path, query_str = parse_path(full_path)
  local query_params = parse_query_string(query_str)

  -- Handle CORS preflight
  if method == "OPTIONS" then
    respond("200 OK", "")
    return
  end

  -- Read and decode the body if POST/PUT
  local json_data = {}
  if method == "POST" or method == "PUT" then
    local content_length = tonumber(headers["content-length"]) or 0
    if content_length > MAX_BODY_BYTES then
      reject("413 Payload Too Large", "Request body too large")
      return
    end
    if content_length > 0 then
      -- The body may not have arrived yet; wait briefly rather than reading
      -- in non-blocking mode and losing it.
      client:settimeout(IO_TIMEOUT_SECONDS)
      local body = client:receive(content_length)
      client:settimeout(0)
      if not body then
        reject("400 Bad Request", "Incomplete request body")
        return
      end

      local ok, data = pcall(cjson.decode, body)
      if not ok or type(data) ~= "table" then
        reject("400 Bad Request", "Invalid JSON body")
        return
      end
      json_data = data
    end
  end

  -- Route handling
  local function dispatch()
    if path == "/devices" then
      if method == "GET" then
        return handle_get_devices(query_params)
      elseif method == "POST" then
        return handle_post_devices(json_data, request_id)
      end
    end

    local id = path:match("^/devices/(%d+)$")
    if id then
      if method == "GET" then
        return handle_get_device(id)
      elseif method == "PUT" then
        return handle_put_device(id, json_data, request_id)
      elseif method == "DELETE" then
        return handle_delete_device(id, request_id)
      end
    end

    if method == "GET" then
      if path == "/health" or path == "/health/live" then
        return handle_health_live()
      elseif path == "/health/ready" then
        return handle_health_ready()
      end
    end

    return "404 Not Found", cjson.encode({ error = "Not found" })
  end

  -- A raised error inside a handler becomes a 500 response instead of
  -- propagating into the accept loop.
  local ok, status, response_body = pcall(dispatch)
  if not ok then
    log.error("Unhandled error while handling request", {
      method = method,
      path = path,
      err = tostring(status),
      request_id = request_id,
    })
    status = "500 Internal Server Error"
    response_body = cjson.encode({ error = "Internal server error" })
  end

  log.info("Request", {
    method = method,
    path = path,
    status = status:match("^(%d+)") or "?",
    request_id = request_id,
  })

  -- Send response
  respond(status, response_body)
end
|
|
|
|
-- Start server
|
|
-- Start the server: prepare the database, bind the listening socket and run
-- the polling loop forever.
--
-- Each loop iteration:
--   1. accepts at most one new connection (non-blocking),
--   2. reconnects the Redis subscriber when it is down (at most every 5 s),
--   3. polls pending clients for a request line and either upgrades them to
--      WebSocket or handles the HTTP request,
--   4. reads at most one Redis pub/sub message and broadcasts its payload to
--      all WebSocket clients,
--   5. sleeps 10 ms.
--
-- Raises when the database cannot be initialised or the port cannot be bound.
function app.start()
  log.info("Handheld Devices API Server starting", {
    component = "api",
    port = app.port,
    db = DB_NAME .. "@" .. DB_HOST .. ":" .. DB_PORT,
    redis = redis and "enabled" or "disabled",
  })

  init_db()
  seed_db()

  local server, bind_err = socket.bind(app.host, app.port)
  if not server then
    error("Failed to bind to " .. app.host .. ":" .. app.port .. ": " .. tostring(bind_err))
  end
  server:settimeout(0) -- Non-blocking

  log.info("Server started successfully", { component = "api" })

  -- Upper bound on a buffered, still incomplete request line.
  local MAX_REQUEST_LINE_BYTES = 8192

  local clients = {}
  local ws_clients = {}

  -- Best-effort close of a Redis client's underlying socket.
  local function close_redis(red)
    pcall(function()
      if red and red.network and red.network.socket then
        red.network.socket:close()
      end
    end)
  end

  -- Background Redis subscriber
  local function connect_subscriber()
    local red = get_redis_connection()
    if not red then
      return nil
    end

    -- Note: redis-lua's subscribe() sets the connection into a subscription state.
    local ok, err = pcall(function() return red:subscribe("devices:events") end)
    if not ok then
      log.error("Redis subscriber failed", { component = "redis", err = tostring(err) })
      close_redis(red)
      return nil
    end

    -- Set the underlying socket to a very small timeout for non-blocking feel
    pcall(function()
      if red.network and red.network.socket then
        red.network.socket:settimeout(0.001)
      end
    end)
    log.info("Redis subscriber connected", { component = "redis" })
    return red
  end

  -- Send `payload` as a text frame to every WebSocket client; clients whose
  -- send fails are closed and dropped from the list.
  local function broadcast(payload)
    log.info("WebSocket broadcast", { component = "ws", payload_len = #tostring(payload) })
    local frame = encode_ws_frame(payload)
    local failed = {}
    for idx, ws in ipairs(ws_clients) do
      local _, send_err = ws:send(frame)
      if send_err then
        failed[#failed + 1] = idx
      end
    end
    for k = #failed, 1, -1 do
      local ws = table.remove(ws_clients, failed[k])
      pcall(ws.close, ws)
    end
  end

  -- Bytes of a pub/sub header line that arrived only partially.
  local sub_partial = ""

  -- Read at most one message from the subscriber socket and broadcast it.
  -- redis-lua has no read_reply; pub/sub messages come as multibulk:
  -- [kind, channel, payload]. Raises on any socket failure other than
  -- "nothing to read yet".
  local function poll_subscriber(red)
    local sock = red.network.socket
    if not sock then
      return
    end

    sock:settimeout(0.001)
    local line, serr, partial = sock:receive("*l", sub_partial)
    if not line then
      if serr ~= "timeout" then
        error(serr or "socket read failed")
      end
      -- Keep whatever arrived so the next poll can complete the line.
      sub_partial = partial or sub_partial
      return
    end
    sub_partial = ""

    -- Parse RESP: *3\r\n means 3-element array
    if line:sub(1, 1) ~= "*" then
      return
    end
    local count = tonumber(line:sub(2))
    if not count or count < 3 then
      return
    end

    -- The rest of the message is already in flight; allow more time so a
    -- message split across packets is not torn in half.
    sock:settimeout(1)
    local parts = {}
    for i = 1, count do
      local bline, berr = sock:receive("*l")
      if not bline then
        error(berr or "incomplete")
      end
      if bline:sub(1, 1) == "$" then
        local len = tonumber(bline:sub(2))
        if len and len >= 0 then
          -- Bulk data is always followed by CRLF, including for empty
          -- strings, so the terminator is consumed in every case.
          local bulk, rerr = sock:receive(len + 2)
          if not bulk then
            error(rerr or "incomplete")
          end
          parts[i] = bulk:sub(1, len)
        end
      end
    end

    -- Pub/sub message: ["message", channel, payload]
    if parts[1] == "message" and parts[3] then
      broadcast(parts[3])
    end
  end

  -- Serve a client whose request line has just been received: complete the
  -- WebSocket handshake or handle the plain HTTP request.
  local function serve_client(sock, line)
    local headers = parse_headers(sock)
    local upgrade = headers["upgrade"]

    -- The Upgrade token is compared case-insensitively.
    if upgrade and upgrade:lower() == "websocket" then
      local key = headers["sec-websocket-key"]
      if key then
        local sha1_key = sha1(key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
        local accept = b64(sha1_key)
        local response = "HTTP/1.1 101 Switching Protocols\r\n" ..
          "Upgrade: websocket\r\n" ..
          "Connection: Upgrade\r\n" ..
          "Sec-WebSocket-Accept: " .. accept .. "\r\n\r\n"
        sock:send(response)
        table.insert(ws_clients, sock)
        log.info("WebSocket client connected", { component = "ws", total = #ws_clients })
      else
        sock:close()
      end
    else
      handle_request_with_headers(sock, line, headers)
    end
  end

  local red_sub = connect_subscriber()
  local last_sub_reconnect = os.time()

  while true do
    local client = server:accept()
    if client then
      client:settimeout(0)
      table.insert(clients, { socket = client, partial = "" })
    end

    -- Reconnect subscriber if lost
    if not red_sub and os.time() - last_sub_reconnect > 5 then
      sub_partial = ""
      red_sub = connect_subscriber()
      last_sub_reconnect = os.time()
    end

    -- Handle HTTP clients
    local to_remove = {}
    for i, c in ipairs(clients) do
      -- Pass previously buffered bytes as the prefix so a request line that
      -- arrives in several pieces is reassembled rather than lost.
      local line, err, partial = c.socket:receive("*l", c.partial or "")
      if line then
        local ok, serve_err = pcall(serve_client, c.socket, line)
        if not ok then
          -- One failing request must not take the whole server down.
          log.error("Client handling failed", { component = "api", err = tostring(serve_err) })
          pcall(c.socket.close, c.socket)
        end
        table.insert(to_remove, i)
      elseif err == "closed" then
        pcall(c.socket.close, c.socket)
        table.insert(to_remove, i)
      else
        c.partial = partial or c.partial
        if c.partial and #c.partial > MAX_REQUEST_LINE_BYTES then
          pcall(c.socket.close, c.socket)
          table.insert(to_remove, i)
        end
      end
    end
    for i = #to_remove, 1, -1 do table.remove(clients, to_remove[i]) end

    -- Handle Redis Messages -> Send to WS
    if red_sub then
      local ok, err = pcall(poll_subscriber, red_sub)
      if not ok then
        -- Any raised error (closed socket, torn message, protocol problem)
        -- leaves the stream unusable: drop it and let the reconnect logic
        -- above open a fresh subscription.
        log.error("Redis subscriber error", { component = "redis", err = tostring(err) })
        close_redis(red_sub)
        red_sub = nil
        sub_partial = ""
      end
    end

    socket.sleep(0.01)
  end
end
|
|
|
|
-- Run if executed directly
|
|
-- `arg` is only set by the standalone interpreter; guard it so requiring
-- this module from a host without `arg` does not raise on a nil index.
if arg and arg[0] and arg[0]:match("app%-standalone") then
  app.start()
end
|
|
|
|
return app
|