112 lines
3.5 KiB
Lua
112 lines
3.5 KiB
Lua
local redis = require("redis")
|
|
local cjson = require("cjson")
|
|
local socket = require("socket")
|
|
local log = require("log")
|
|
|
|
local REDIS_HOST = os.getenv("REDIS_HOST") or "127.0.0.1"
|
|
local REDIS_PORT = tonumber(os.getenv("REDIS_PORT")) or 6379
|
|
local MAX_RETRIES = tonumber(os.getenv("WORKER_MAX_RETRIES")) or 3
|
|
local DLQ_KEY = "devices:events:dlq"
|
|
|
|
local device_handler = require("handlers.device_handler")
|
|
local rating_handler = require("handlers.rating_handler")
|
|
local review_handler = require("handlers.review_handler")
|
|
|
|
local handlers = {
|
|
DevicePublished = device_handler.handle,
|
|
DeviceDeleted = device_handler.handle,
|
|
DeviceUpdated = device_handler.handle,
|
|
RatingPublished = rating_handler.handle,
|
|
ReviewPublished = review_handler.handle,
|
|
UserCreated = function(event)
|
|
log.info("User created", { component = "worker", user_id = event.user_id })
|
|
end,
|
|
}
|
|
|
|
local function move_to_dlq(red, event_json, reason)
|
|
red:lpush(DLQ_KEY, cjson.encode({
|
|
event = event_json,
|
|
reason = reason,
|
|
failed_at = os.time(),
|
|
}))
|
|
log.warn("Event moved to DLQ", { reason = reason, dlq_key = DLQ_KEY })
|
|
end
|
|
|
|
local function process_event(red, event_json)
|
|
local ok_decode, event = pcall(cjson.decode, event_json)
|
|
if not ok_decode or not event then
|
|
return false, "decode_failed"
|
|
end
|
|
|
|
local handler = handlers[event.event_type]
|
|
if not handler then
|
|
return true, nil -- Acknowledge unknown event types
|
|
end
|
|
|
|
local attempt = 0
|
|
local last_err
|
|
|
|
while attempt < MAX_RETRIES do
|
|
attempt = attempt + 1
|
|
local success, handler_err = pcall(handler, event)
|
|
if success then
|
|
return true, nil
|
|
end
|
|
last_err = tostring(handler_err)
|
|
if attempt < MAX_RETRIES then
|
|
local delay = math.min(2 ^ attempt * 0.5, 10)
|
|
log.warn("Handler failed, retrying", {
|
|
component = "worker",
|
|
attempt = attempt,
|
|
max_retries = MAX_RETRIES,
|
|
delay = delay,
|
|
err = last_err,
|
|
})
|
|
socket.sleep(delay)
|
|
end
|
|
end
|
|
|
|
return false, last_err
|
|
end
|
|
|
|
local function run_worker()
|
|
while true do
|
|
local ok, err = pcall(function()
|
|
log.info("Connecting to Redis", { component = "worker" })
|
|
|
|
local red = redis.connect(REDIS_HOST, REDIS_PORT)
|
|
log.info("Connected to Redis, waiting for events", { component = "worker" })
|
|
|
|
while true do
|
|
local event_json = red:brpoplpush("devices:events:queue", "devices:events:processing", 0)
|
|
|
|
if event_json then
|
|
local success, reason = process_event(red, event_json)
|
|
|
|
if success then
|
|
red:lrem("devices:events:processing", 1, event_json)
|
|
log.info("Event processed", { component = "worker" })
|
|
else
|
|
-- Move to DLQ and remove from processing
|
|
move_to_dlq(red, event_json, reason)
|
|
red:lrem("devices:events:processing", 1, event_json)
|
|
end
|
|
end
|
|
end
|
|
end)
|
|
|
|
if not ok then
|
|
log.error("Worker error", { component = "worker", err = tostring(err) })
|
|
log.info("Reconnecting in 5 seconds", { component = "worker" })
|
|
socket.sleep(5)
|
|
end
|
|
end
|
|
end
|
|
|
|
log.info("Handheld Devices Worker starting", {
|
|
component = "worker",
|
|
redis = REDIS_HOST .. ":" .. REDIS_PORT,
|
|
})
|
|
|
|
run_worker()
|