Flows (Automated Workflows)
Flows are automated workflows that execute a sequence of steps based on triggers. They support scheduled execution (cron) and manual triggers. For event-driven flows, use handlers/hooks with @TRIGGER().
Tables
| Table | Purpose |
|---|---|
| flow_definition | Flow configuration (name, trigger, timeout, maxExecutions) |
| flow_step_definition | Steps within a flow (ordered, typed, configurable) |
| flow_execution_definition | Execution history and state (query separately, not nested) |
Trigger Types
| Type | Config Example | Description |
|---|---|---|
| schedule | {"cron": "0 2 * * *", "timezone": "UTC"} | Cron-based scheduling |
| manual | {} | User-initiated via UI, API, or @TRIGGER() from handlers/hooks |
Step Types
| Type | Config | Description |
|---|---|---|
| script | {"code": "..."} | Execute custom JavaScript with full context |
| condition | {"code": "return ..."} | Evaluate condition using JS truthy/falsy. Truthy branch "true", falsy branch "false" |
| query | {"table": "...", "filter": {...}, "limit": 10} | Query table data |
| create | {"table": "...", "data": {...}} | Create a record |
| update | {"table": "...", "id": 1, "data": {...}} | Update a record by ID |
| delete | {"table": "...", "id": 1} | Delete a record by ID |
| http | {"url": "...", "method": "POST", "body": {...}, "headers": {...}, "timeout": 30000} | Send HTTP request (auto Content-Type: application/json when body present). See HTTP step URL rules. |
| trigger_flow | {"flowId": 2} or {"flowName": "..."} | Trigger another flow |
| sleep | {"ms": 5000} | Pause execution for N ms |
| log | {"message": "..."} | Log a message to execution context |
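The Config column above can be turned into a small pre-flight check before a step is posted. This is a minimal sketch, not the server's validation: `validateStepConfig` and the list of required keys are assumptions inferred from the example configs, and the real engine may require or accept other keys.

```javascript
// Required config keys per step type, inferred from the Config column above.
// Assumption: the real server may accept or require additional keys.
const REQUIRED_KEYS = new Map([
  ["script", ["code"]],
  ["condition", ["code"]],
  ["query", ["table"]],
  ["create", ["table", "data"]],
  ["update", ["table", "id", "data"]],
  ["delete", ["table", "id"]],
  ["http", ["url"]],
  ["sleep", ["ms"]],
  ["log", ["message"]],
]);

// Returns a list of problems; an empty list means the config looks plausible.
function validateStepConfig(type, config) {
  if (type === "trigger_flow") {
    // The table shows either flowId or flowName as the target.
    const hasTarget = config.flowId !== undefined || config.flowName !== undefined;
    return hasTarget ? [] : ["trigger_flow needs flowId or flowName"];
  }
  const required = REQUIRED_KEYS.get(type);
  if (!required) return [`unknown step type: ${type}`];
  return required
    .filter((key) => config[key] === undefined)
    .map((key) => `${type} step is missing "${key}"`);
}
```

For example, `validateStepConfig("update", { table: "user_definition", data: {} })` reports the missing `id`, while a complete `sleep` config returns an empty list.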
HTTP step URL rules (SSRF hardening)
The server only allows http: and https: URLs that are not obvious SSRF targets: localhost and common loopback names, raw private/reserved IP literals, and hostnames that resolve only to private IPs are rejected. Prefer public internet hostnames (e.g. https://api.example.com). Internal callbacks need a deliberate architecture change, not an arbitrary internal URL in this step.
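A rough client-side pre-check can catch the most obvious rejections before a step is saved. The sketch below is an approximation under stated assumptions: `looksAllowed`, `isPrivateIPv4`, and the loopback name list are hypothetical, it inspects only the URL text, and it cannot reproduce the server's DNS-resolution check. A `true` result therefore does not guarantee that the server accepts the URL.

```javascript
// Assumption: this list of loopback names is illustrative, not the server's list.
const LOOPBACK_NAMES = new Set(["localhost", "localhost.localdomain", "ip6-localhost"]);

// True for IPv4 literals in common private, loopback, or link-local ranges.
function isPrivateIPv4(host) {
  const m = /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/.exec(host);
  if (!m) return false;
  const a = Number(m[1]);
  const b = Number(m[2]);
  return (
    a === 0 || a === 10 || a === 127 ||
    (a === 169 && b === 254) ||
    (a === 172 && b >= 16 && b <= 31) ||
    (a === 192 && b === 168)
  );
}

// Text-only check: protocol, loopback names, and raw IP literals.
function looksAllowed(rawUrl) {
  let url;
  try {
    url = new URL(rawUrl);
  } catch {
    return false; // not a parseable absolute URL
  }
  if (url.protocol !== "http:" && url.protocol !== "https:") return false;
  const host = url.hostname.toLowerCase();
  if (LOOPBACK_NAMES.has(host)) return false;
  if (host.startsWith("[")) return false; // IPv6 literal: rejected conservatively
  return !isPrivateIPv4(host);
}
```

Rejecting every IPv6 literal is a deliberately conservative choice for the sketch; a fuller check would classify IPv6 ranges individually.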
Template Syntax
Flow steps support the same template macros as handlers/hooks, plus flow-specific ones. Code is auto-transpiled when cached.
| Macro | Expands to | Description |
|---|---|---|
| @FLOW_PAYLOAD | $ctx.$flow.$payload | Input payload data |
| @TRIGGER | $ctx.$trigger | Trigger flow from handlers/hooks |
| @FLOW_LAST | $ctx.$flow.$last | Last step result |
| @FLOW | $ctx.$flow | Full flow data chain |
| @FLOW_META | $ctx.$flow.$meta | Flow metadata (flowId, flowName, executionId, depth) |
| #table_name | $ctx.$repos.table_name | Table repository |
| @HELPERS | $ctx.$helpers | Helpers (jwt, bcrypt, autoSlug) |
| @USER | $ctx.$user | Current user (null for cron) |
| @THROW400 … @THROW503, @THROW | $ctx.$throw['400'], …, $ctx.$throw | HTTP error helpers (use numeric keys, not '4xx') |
| %package | $ctx.$pkgs.package | Installed packages |
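To make the "Expands to" column concrete, the mapping can be imitated with a naive text substitution. This is only an illustration of the mapping: `expandMacros` is a hypothetical helper, the real transpiler is not shown in this document, and a textual replace like this would also rewrite macros that appear inside string literals. The @THROW and %package forms are left out for brevity.

```javascript
// A subset of the @ macros from the table above.
const AT_MACROS = {
  FLOW_PAYLOAD: "$ctx.$flow.$payload",
  FLOW_LAST: "$ctx.$flow.$last",
  FLOW_META: "$ctx.$flow.$meta",
  FLOW: "$ctx.$flow",
  TRIGGER: "$ctx.$trigger",
  HELPERS: "$ctx.$helpers",
  USER: "$ctx.$user",
};

// Assumption: naive textual expansion, for illustration only.
function expandMacros(code) {
  return code
    // The pattern captures the whole upper-case name, so @FLOW_PAYLOAD
    // is never cut short to @FLOW. Unknown names are left untouched.
    .replace(/@([A-Z_]+)\b/g, (match, name) => AT_MACROS[name] ?? match)
    // #table_name becomes $ctx.$repos.table_name
    .replace(/#([a-z_][a-z0-9_]*)/g, (match, table) => `$ctx.$repos.${table}`);
}
```

With this sketch, `expandMacros("const email = @FLOW_PAYLOAD.email;")` yields `const email = $ctx.$flow.$payload.email;`.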
Data Chain
Each flow execution maintains a shared context ($ctx.$flow / @FLOW) that passes data between steps:
@FLOW
├── @FLOW_PAYLOAD // Input data passed to the flow
├── @FLOW_LAST // Result of the most recent step
├── @FLOW_META // { flowId, flowName, executionId, startedAt }
├── @FLOW.step_1 // Result of step with key "step_1"
└── @FLOW.step_2 // Result of step with key "step_2"
Accessing data in script steps (template syntax recommended)
// Template syntax (recommended)
const email = @FLOW_PAYLOAD.email;
const user = @FLOW.find_user?.data?.[0];
const prev = @FLOW_LAST;
const orders = await #order_definition.find({
filter: { userId: { _eq: user.id } }
});
return orders;
// Equivalent verbose syntax (not recommended)
// $ctx.$flow.$payload.email, $ctx.$repos.order_definition, etc.
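The data chain described above can be modelled in plain JavaScript to show how step results accumulate. This is a simplified sketch: `runFlow` and the `{ key, run }` step shape are invented for illustration, steps are synchronous here for brevity, and the engine's actual context object may carry more fields.

```javascript
// Plain-JavaScript model of the data chain (not the engine's API).
function runFlow(steps, payload) {
  const flow = { $payload: payload, $last: null, $meta: { flowName: "demo" } };
  for (const step of steps) {
    const result = step.run(flow); // each step sees everything produced so far
    flow[step.key] = result;       // readable later as @FLOW.<key>
    flow.$last = result;           // readable by the next step as @FLOW_LAST
  }
  return flow;
}

const flow = runFlow(
  [
    // First step: pretend lookup that echoes the payload email (sample data).
    { key: "find_user", run: (f) => ({ data: [{ id: 7, email: f.$payload.email }] }) },
    // Second step: reads the first step's result through its key.
    { key: "greet", run: (f) => `hello ${f.find_user?.data?.[0]?.email}` },
  ],
  { email: "a@example.com" }
);
```

After the run, `flow.find_user` holds the first step's result and `flow.$last` holds the second step's string, mirroring `@FLOW.find_user` and `@FLOW_LAST`.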
Condition Branching
Condition steps support true/false branching. Child steps reference the condition through the parent relation and select their side with the branch field.
[query_users] ← root step (parent: null)
↓
[check_has_users] ← root step, type: condition
├── true:
│ [process_users] ← parent: check_has_users, branch: "true"
│ [send_report] ← parent: check_has_users, branch: "true"
└── false:
[log_empty] ← parent: check_has_users, branch: "false"
↓
[cleanup] ← root step (continues after condition)
Creating branch steps via API:
POST /api/flow_step_definition
{
"flow": { "id": 1 },
"key": "process_users",
"stepOrder": 1,
"type": "script",
"config": { "code": "return #user_definition.find({ limit: 100 })" },
"parent": { "id": 5 },
"branch": "true"
}
Rules:
- Steps with parent: null are root-level and execute sequentially
- A condition uses JS truthy/falsy: return user (an object, truthy), return null (falsy), return count > 0 (boolean)
- If the result is truthy, children with branch: "true" execute
- If the result is falsy, children with branch: "false" execute
- After the condition and its branch children complete, execution continues with the next root step
- Branch steps can each set onError (skip, stop, or retry) independently
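The branching rules above can be sketched as a small executor that walks root steps in order and descends into the matching branch of each condition. The step shape is simplified and hypothetical: here `parent` holds the parent step's key, whereas the API uses `{ "id": ... }`, and `executeWithBranches` is not the engine's implementation.

```javascript
// Assumption: steps are { key, type, parent, branch, stepOrder, run }.
function executeWithBranches(steps, payload) {
  const flow = { $payload: payload, $last: null };
  const executed = [];
  const byOrder = (a, b) => a.stepOrder - b.stepOrder;

  const runStep = (step) => {
    const result = step.run(flow);
    flow[step.key] = result;
    flow.$last = result;
    executed.push(step.key);
    if (step.type === "condition") {
      const branch = result ? "true" : "false"; // JS truthy/falsy
      steps
        .filter((s) => s.parent === step.key && s.branch === branch)
        .sort(byOrder)
        .forEach((child) => runStep(child));
    }
  };

  // Root steps (parent: null) run sequentially by stepOrder.
  steps
    .filter((s) => s.parent == null)
    .sort(byOrder)
    .forEach((root) => runStep(root));
  return executed;
}

// The diagram above, with an empty user list so the "false" branch runs.
const steps = [
  { key: "query_users", type: "script", parent: null, stepOrder: 1, run: () => [] },
  { key: "check_has_users", type: "condition", parent: null, stepOrder: 2, run: (f) => f.query_users.length > 0 },
  { key: "process_users", type: "script", parent: "check_has_users", branch: "true", stepOrder: 1, run: () => "processed" },
  { key: "log_empty", type: "log", parent: "check_has_users", branch: "false", stepOrder: 1, run: () => "no users" },
  { key: "cleanup", type: "script", parent: null, stepOrder: 3, run: () => "done" },
];
const order = executeWithBranches(steps, {});
```

Because `query_users` returns an empty array, `order` contains `query_users`, `check_has_users`, `log_empty`, and `cleanup`, in that sequence.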
Error Handling
| Strategy | Behavior |
|---|---|
| stop | Halt flow, mark execution as failed (default) |
| skip | Record error, continue to next step |
| retry | Retry with exponential backoff up to retryAttempts times |
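The three strategies can be illustrated with a small wrapper around a step function. This is a sketch under assumptions: `runWithPolicy` and the returned status names are hypothetical, the backoff delays are not awaited so the example stays synchronous, and treating an exhausted retry as a failure is a guess at the engine's behavior.

```javascript
// Assumption: `step` is { run, onError, retryAttempts }; not the engine's API.
function runWithPolicy(step) {
  // "retry" allows the initial attempt plus up to retryAttempts further tries.
  const attempts = step.onError === "retry" ? 1 + (step.retryAttempts ?? 0) : 1;
  let lastError;
  for (let i = 0; i < attempts; i++) {
    try {
      return { status: "completed", result: step.run(), tries: i + 1 };
    } catch (err) {
      lastError = err; // a real engine would wait for the backoff delay here
    }
  }
  if (step.onError === "skip") {
    // Error is recorded and the flow moves on to the next step.
    return { status: "skipped", error: lastError.message };
  }
  // "stop" (the default) and exhausted retries halt the flow.
  return { status: "failed", error: lastError.message };
}
```

A step that throws twice and then succeeds completes on the third try when `onError` is `"retry"` and `retryAttempts` is at least 2.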
Admin Endpoints
| Endpoint | Description |
|---|---|
| POST /admin/flow/trigger/:id | Trigger a flow execution via BullMQ |
| POST /admin/test/run | Test a flow step without saving by sending kind: "flow_step" |
Triggering Flows from Handlers
await @TRIGGER('send-welcome-email', { userId: @USER.id });
await @TRIGGER(5, { orderId: @PARAMS.id, total: 100 });
Execution History
Query execution records separately (not nested under flow):
GET /api/flow_execution_definition?filter={"flow":{"_eq":1}}&sort=-id&limit=10
Each execution record contains:
- status: pending, running, completed, failed, cancelled
- currentStep: which step the flow stopped or is running at
- completedSteps: array of step keys that completed successfully
- error: error details if failed (message + stack)
- duration: total execution time in ms
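When calling the history endpoint from a script, the JSON filter has to be percent-encoded in the query string. The sketch below builds that URL and extracts failures from a list of records. `executionHistoryUrl`, `failedSteps`, and the base URL are assumptions for illustration; the path, query keys, and record fields are taken from the text above, and `currentStep` is assumed to hold a step key.

```javascript
// Builds the history URL with the filter JSON percent-encoded.
// Assumption: baseUrl is a placeholder for wherever the API is hosted.
function executionHistoryUrl(baseUrl, flowId, limit = 10) {
  const url = new URL("/api/flow_execution_definition", baseUrl);
  url.searchParams.set("filter", JSON.stringify({ flow: { _eq: flowId } }));
  url.searchParams.set("sort", "-id");
  url.searchParams.set("limit", String(limit));
  return url.toString();
}

// Picks out failed executions and the step each one stopped at.
function failedSteps(records) {
  return records
    .filter((r) => r.status === "failed")
    .map((r) => ({ step: r.currentStep, message: r.error?.message }));
}
```

`URLSearchParams` handles the encoding, so the braces and quotes in the filter never appear raw in the request line.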
Example: Order Processing Flow
1. Create the flow
POST /api/flow_definition
{
"name": "process-order",
"triggerType": "manual",
"timeout": 30000,
"isEnabled": true
}
Then trigger from a post-hook on /order_definition:
await @TRIGGER('process-order', { data: @DATA });
2. Add steps
POST /api/flow_step_definition
{
"flow": { "id": 1 },
"key": "validate_stock",
"stepOrder": 1,
"type": "script",
"config": {
"code": "const order = @FLOW_PAYLOAD.data; const product = await #product_definition.find({ filter: { id: { _eq: order.productId } }, limit: 1 }); return { inStock: product.data[0]?.stock >= order.quantity }"
},
"timeout": 5000,
"onError": "stop"
}
3. Test a step before saving
POST /api/admin/test/run
{
"kind": "flow_step",
"type": "query",
"config": { "table": "user_definition", "filter": { "status": { "_eq": "active" } }, "limit": 5 },
"timeout": 5000
}
Response: { "success": true, "result": { "data": [...] }, "duration": 42 }
4. Trigger manually
POST /api/admin/flow/trigger/1
{ "payload": { "orderId": 123 } }
5. View execution history
GET /api/flow_execution_definition?filter={"flow":{"_eq":1}}&sort=-id&limit=10
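Step 4 of the walkthrough can be scripted by assembling the request first and sending it separately. This is a hedged sketch: `buildTriggerRequest` and the base URL are hypothetical, the path and body shape follow the example above, and authentication is omitted because this document does not describe it.

```javascript
// Builds the URL and fetch options for triggering a flow by ID.
// Assumption: baseUrl is a placeholder; auth headers are not covered here.
function buildTriggerRequest(baseUrl, flowId, payload) {
  const path = `/api/admin/flow/trigger/${encodeURIComponent(flowId)}`;
  return {
    url: new URL(path, baseUrl).toString(),
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ payload }),
    },
  };
}

const request = buildTriggerRequest("https://app.example.com", 1, { orderId: 123 });
// To send it: await fetch(request.url, request.init)
```

Keeping request construction separate from the network call makes the payload easy to inspect or log before anything is triggered.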
Scheduled Flow Example
POST /api/flow_definition
{
"name": "daily-cleanup",
"triggerType": "schedule",
"triggerConfig": { "cron": "0 2 * * *", "timezone": "Asia/Ho_Chi_Minh" },
"timeout": 60000,
"isEnabled": true
}
Cron schedules are registered automatically via BullMQ Job Schedulers when the flow cache reloads.
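Before posting a scheduled flow, it can help to sanity-check the triggerConfig locally. The sketch below names the five cron fields and validates the timezone with the built-in Intl API. Both helpers are hypothetical, and only the five-field cron form used in this document is handled; the scheduler itself may accept other forms.

```javascript
// Splits a five-field cron expression into named fields.
// Assumption: only the five-field form used in this document is handled.
function describeCron(expr) {
  const parts = expr.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts;
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

// Intl.DateTimeFormat throws a RangeError for an unknown IANA zone name.
function isValidTimeZone(timeZone) {
  try {
    new Intl.DateTimeFormat("en-US", { timeZone });
    return true;
  } catch {
    return false;
  }
}
```

For the example above, `describeCron("0 2 * * *")` gives minute `"0"` and hour `"2"`, meaning 02:00 every day in the configured timezone.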
Safety & Limits
| Feature | Detail |
|---|---|
| Max nesting depth | 10 (flow triggering flow via trigger_flow step or $trigger()) |
| Circular detection | Tracks visited flow IDs in the chain; a cycle such as A → B → A is rejected immediately |
| HTTP timeout | 30s default, configurable via config.timeout per step |
| Execution history | Auto-cleanup per flow via maxExecutions (default 100, oldest deleted) |
| Step timeout | Per-step or inherited from flow timeout (default 5000ms) |
| Retry backoff | Exponential: 1s, 2s, 4s, 8s... capped at 30s |
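The retry backoff row in the table above corresponds to a doubling delay with a ceiling. A minimal sketch of that arithmetic follows; `backoffDelayMs` is a hypothetical name, and the zero-based retry index is an assumption about how attempts are counted.

```javascript
// Delay before retry number `retryIndex` (0-based): 1s, 2s, 4s, 8s, ...
// capped at 30s, as listed in the table above.
function backoffDelayMs(retryIndex) {
  return Math.min(1000 * 2 ** retryIndex, 30000);
}

// Delays for the first six retries.
const delays = [0, 1, 2, 3, 4, 5].map(backoffDelayMs);
```

The sixth value would be 32000 ms without the cap, so it is clamped to 30000 ms.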
Triggering Flows from Flow Steps
Flow steps have $trigger() available, same as handlers/hooks:
// Inside a script step
const result = await @TRIGGER('send-notification', { userId: @FLOW_PAYLOAD.userId });
This respects the same nesting depth and circular detection limits.
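The depth and circular checks can be pictured as a guard that runs before each nested trigger. This is an illustrative sketch only: `checkTriggerAllowed` is hypothetical, and it assumes the chain is a list of flow IDs already visited, including the current flow, with depth measured as the chain length.

```javascript
const MAX_DEPTH = 10; // from the Safety & Limits table

// Decides whether the current chain may trigger `nextFlowId`.
// Assumption: `chain` lists visited flow IDs, including the current flow.
function checkTriggerAllowed(chain, nextFlowId) {
  if (chain.includes(nextFlowId)) {
    return { ok: false, reason: "circular" }; // e.g. A → B → A
  }
  if (chain.length >= MAX_DEPTH) {
    return { ok: false, reason: "max depth" };
  }
  return { ok: true, chain: [...chain, nextFlowId] };
}
```

For instance, a chain of `[1, 2]` may trigger flow 3, but triggering flow 1 again is rejected as circular.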