What is the Worker?
The Worker is a microservice responsible for executing workflow operations in the FluxPrompt system. It processes node and agent executions from Temporal queues, handles AI model interactions, and manages resource-intensive operations. The Worker runs as a separate service to isolate processing workloads from the main API, ensuring scalability and reliability for workflow execution.
How it works
The Worker operates as a Temporal worker that:
- Connects to the Temporal server to receive workflow execution tasks
- Executes workflows defined in the codebase
- Runs activities that perform actual operations (database queries, AI model calls, file generation)
- Reports execution results back to Temporal
- Handles retries and error recovery through Temporal’s built-in mechanisms
Temporal integration
The Worker integrates with Temporal using the Temporal SDK for Node.js. Temporal provides:
- Workflow orchestration: Defines the execution flow and coordinates activities
- State management: Maintains workflow state across executions and failures
- Queue management: Manages task queues for node and agent processing
- Reliability: Ensures workflows complete even if the worker restarts
- History: Tracks all workflow and activity executions for debugging
Connection
The Worker connects to Temporal using a NativeConnection.
Task queue
All workflows are processed through the enhanced-ai-queue task queue, which allows multiple workers to process tasks in parallel.
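A minimal sketch of how the connection and task queue described above might be wired together with the Temporal TypeScript SDK. `NativeConnection.connect` and `Worker.create` are real SDK calls, but the `./workflows` and `./activities` module paths and the shutdown handling are assumptions made for illustration, not the Worker's actual layout.

```typescript
import { NativeConnection, Worker } from '@temporalio/worker';
// Hypothetical module path: wherever the activity functions are exported.
import * as activities from './activities';

async function run(): Promise<void> {
  // Open a connection to the Temporal server.
  const connection = await NativeConnection.connect({
    address: process.env.TEMPORAL_ADDRESS ?? 'localhost:7233',
  });

  try {
    const worker = await Worker.create({
      connection,
      taskQueue: 'enhanced-ai-queue',
      // Hypothetical path: the module that exports the workflow functions.
      workflowsPath: require.resolve('./workflows'),
      activities,
    });
    // Polls the task queue until the worker is shut down.
    await worker.run();
  } finally {
    await connection.close();
  }
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
```

Starting several processes with the same `taskQueue` value is what lets multiple workers share the load.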
Workflows and activities
Workflows
Workflows define the execution logic and orchestrate activities. They are deterministic functions that:
- Cannot make direct external calls (database, HTTP, etc.)
- Must use activities for any non-deterministic operations
- Can handle retries and error scenarios
- Maintain state throughout execution
Available workflows:
- processSingleNode: Executes a single node in a workflow
- processAgent: Executes a complete agent workflow (in development)
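To illustrate the rule that workflows reach the outside world only through activities, here is a hedged sketch using `proxyActivities` from the Temporal TypeScript SDK. The workflow name `exampleNodeWorkflow`, the activity parameter lists, and the timeout value are all assumptions; the document names the activities but does not show their signatures.

```typescript
import { proxyActivities } from '@temporalio/workflow';

// Hypothetical signatures: only the activity names come from this document.
interface SketchActivities {
  getInfoInDatabaseActivity(nodeId: string): Promise<{ prompt: string }>;
  notifyStatusActivity(nodeId: string, status: string): Promise<void>;
}

// proxyActivities returns stubs. Calling one schedules the activity for a
// worker to execute instead of running it inside the workflow function.
const { getInfoInDatabaseActivity, notifyStatusActivity } =
  proxyActivities<SketchActivities>({ startToCloseTimeout: '1 minute' });

// Illustrative workflow, not the actual processSingleNode implementation.
export async function exampleNodeWorkflow(nodeId: string): Promise<string> {
  // No direct database or HTTP access here: the lookup goes through an activity.
  const info = await getInfoInDatabaseActivity(nodeId);
  await notifyStatusActivity(nodeId, 'completed');
  return info.prompt;
}
```

Because the workflow body only awaits activity stubs, Temporal can replay it from history after a worker restart and reach the same state.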
Activities
Activities are the actual operations performed by workflows. They can:
- Make database queries
- Call external APIs
- Generate files
- Send notifications
- Perform any non-deterministic operation
Available activities:
- notifyStatusActivity: Sends status updates to users
- getInfoInDatabaseActivity: Queries the database
- validateModelAccessActivity: Validates user access to AI models
- textGeneratorActivity: Runs the AI Text Generator node (LLM call, persistence, token usage)
- chargeTokensActivity: Records token usage for billing
- generateFileActivity: Generates output files
- generateExecutionLogsActivity: Creates execution logs
- scriptingActivity: Runs user JavaScript or Python via Lambda
- nodeExecutionErrorActivity: Handles node execution errors
Execution flow
When a workflow is triggered:
- Workflow starts: Temporal receives a workflow execution request
- Worker picks up task: An available worker picks up the task from the queue
- Workflow executes: The workflow function runs and orchestrates activities
- Activities execute: Each activity performs its operation (database query, API call, etc.)
- Results returned: Activity results are returned to the workflow
- Workflow completes: The workflow finishes and reports completion status
- Status notification: Users are notified of the execution status
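The steps above can be modeled in plain TypeScript with no Temporal dependency, so the ordering is easy to follow and run on its own. This is a sketch under stated assumptions: the activity names come from this document, but every signature, the `runNodeFlow` function, and the in-memory fakes are hypothetical.

```typescript
// Hypothetical activity signatures; the real ones are not shown in this document.
interface FlowActivities {
  validateModelAccessActivity(userId: string, model: string): Promise<boolean>;
  textGeneratorActivity(prompt: string, model: string): Promise<{ text: string; tokens: number }>;
  chargeTokensActivity(userId: string, tokens: number): Promise<void>;
  notifyStatusActivity(userId: string, status: 'completed' | 'failed'): Promise<void>;
  nodeExecutionErrorActivity(nodeId: string, message: string): Promise<void>;
}

interface NodeRequest {
  userId: string;
  nodeId: string;
  model: string;
  prompt: string;
}

// Orchestrates the activities in order. On any failure it runs the error
// activity, notifies the user, and returns null.
async function runNodeFlow(acts: FlowActivities, req: NodeRequest): Promise<string | null> {
  try {
    const allowed = await acts.validateModelAccessActivity(req.userId, req.model);
    if (!allowed) {
      throw new Error(`no access to model ${req.model}`);
    }
    const result = await acts.textGeneratorActivity(req.prompt, req.model);
    await acts.chargeTokensActivity(req.userId, result.tokens);
    await acts.notifyStatusActivity(req.userId, 'completed');
    return result.text;
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    await acts.nodeExecutionErrorActivity(req.nodeId, message);
    await acts.notifyStatusActivity(req.userId, 'failed');
    return null;
  }
}

// In-memory fakes that record the order in which activities are called.
function makeFakeActivities(calls: string[], allowed: boolean): FlowActivities {
  return {
    async validateModelAccessActivity() {
      calls.push('validateModelAccess');
      return allowed;
    },
    async textGeneratorActivity(prompt) {
      calls.push('textGenerator');
      return { text: prompt.toUpperCase(), tokens: prompt.length };
    },
    async chargeTokensActivity() {
      calls.push('chargeTokens');
    },
    async notifyStatusActivity(_userId, status) {
      calls.push(`notify:${status}`);
    },
    async nodeExecutionErrorActivity() {
      calls.push('nodeExecutionError');
    },
  };
}
```

Passing the activities in as an interface keeps the orchestration logic free of side effects, which mirrors the split between workflows and activities described earlier.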
Error handling
The Worker handles errors at several levels:
- Activity retries: Activities can be retried on failure (configured per activity)
- Workflow error handling: Workflows catch errors and execute error activities
- Status updates: Users are notified of failures through status notifications
- Execution logs: All errors are logged with stack traces for debugging
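To make per-activity retry configuration concrete, the sketch below computes the wait before each retry from exponential backoff settings. The field names mirror Temporal's retry policy (initial interval, backoff coefficient, maximum interval, maximum attempts), but the `RetrySettings` type, the `retryDelays` helper, and the values are illustrative assumptions, not the Worker's actual configuration.

```typescript
// Illustrative settings type; intervals are in milliseconds.
interface RetrySettings {
  initialIntervalMs: number;
  backoffCoefficient: number;
  maximumIntervalMs: number;
  // Total attempts including the first one. This sketch assumes a value >= 1.
  maximumAttempts: number;
}

// Returns the delay before each retry (that is, before attempts 2, 3, ...).
function retryDelays(settings: RetrySettings): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < settings.maximumAttempts - 1; retry++) {
    const raw = settings.initialIntervalMs * Math.pow(settings.backoffCoefficient, retry);
    // Growth is capped at the maximum interval.
    delays.push(Math.min(raw, settings.maximumIntervalMs));
  }
  return delays;
}

const exampleDelays = retryDelays({
  initialIntervalMs: 1000,
  backoffCoefficient: 2,
  maximumIntervalMs: 5000,
  maximumAttempts: 5,
});
// exampleDelays is [1000, 2000, 4000, 5000]
```

With five attempts there are four retries, and the last delay is capped at 5000 ms rather than growing to 8000 ms.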
Configuration
The Worker is configured through the following environment variables:
- TEMPORAL_ADDRESS: Temporal server address (default: localhost:7233)
- URL_MAIN_API: Main API URL for internal API calls
- INTERNAL_API_KEY: API key for authenticating with the main API
- Database connection variables (for direct database access)
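One way to read these variables at startup is a small loader that applies the documented default and fails fast when a value is missing. This is a sketch: `WorkerConfig` and `loadWorkerConfig` are hypothetical names, and the database variables are left out because this document does not name them.

```typescript
interface WorkerConfig {
  temporalAddress: string;
  mainApiUrl: string;
  internalApiKey: string;
}

// Takes the environment as a plain object so it can be exercised without
// touching the real process environment.
function loadWorkerConfig(env: Record<string, string | undefined>): WorkerConfig {
  const required = (name: string): string => {
    const value = env[name];
    if (!value) {
      throw new Error(`Missing required environment variable: ${name}`);
    }
    return value;
  };

  return {
    // Falls back to the documented default when the variable is unset.
    temporalAddress: env['TEMPORAL_ADDRESS'] || 'localhost:7233',
    mainApiUrl: required('URL_MAIN_API'),
    internalApiKey: required('INTERNAL_API_KEY'),
  };
}
```

In a Node.js process this would typically be called once as `loadWorkerConfig(process.env)` before the worker connects to Temporal.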