Many real-world processes require executing multiple tasks in parallel, retrying failures, and combining results into a final output. Implementing this orchestration logic manually can introduce unnecessary complexity.
Argo Workflows, built for Kubernetes, enables you to model these processes declaratively as workflows. In this article, we use a simple example to highlight how it handles parallel execution, retries, and result aggregation.
Why Not Just Use Native Kubernetes Resources?
Kubernetes provides solid primitives for running workloads: Pods, Jobs, and CronJobs. They work well for executing individual tasks, but they don’t provide a built-in way to orchestrate multiple steps.
At a high level:
- Pods run a single unit of work, with no awareness of sequencing or dependencies
- Jobs ensure completion and support retries, but still operate in isolation
- CronJobs add scheduling, yet each run remains independent
This model works for simple tasks. However, once you need to:
- Run steps in parallel
- Retry only failed parts of a process
- Pass data between steps
- Aggregate results at the end
- Integrate with cloud services
you’re no longer dealing with isolated workloads; you’re dealing with a workflow.
At that point, using only native Kubernetes resources often means introducing custom scripts or external logic to coordinate execution, which increases complexity and reduces clarity.
Why Argo Workflows?
Argo Workflows extends Kubernetes by adding a native way to orchestrate multi-step processes. Instead of managing execution logic manually, you define the workflow declaratively and let Argo handle the coordination.
Key capabilities include:
- Declarative workflows: Define the entire process in YAML, making it easy to version and maintain
- Parallel execution: Run independent tasks concurrently without additional logic
- Step-level retries: Configure retry behavior per task without modifying application code
- Dependency management (DAGs): Control execution order by defining relationships between steps
- Result aggregation: Collect outputs from parallel tasks and pass them to subsequent steps
- Artifact storage and integrations: Persist outputs in systems like Amazon S3 and integrate with services such as Amazon SNS, Amazon SQS, or AWS Lambda for downstream processing
Because it runs on Kubernetes, Argo integrates seamlessly with existing cluster capabilities like scheduling and scaling, while also fitting naturally into cloud environments such as AWS.
Together, these features provide a clear and maintainable way to move from running individual tasks to orchestrating complete processes.
A Practical Workflow Example
To demonstrate these capabilities in practice, we’ll build a simple workflow that models a multi-step process with parallel execution, retry handling, and result aggregation.
The use case itself is deliberately simple. The goal is to highlight how Argo Workflows manages coordination between tasks in a clear and declarative way.
What the Workflow Does
The workflow performs the following steps, each of which reflects a pattern that is common in real-world workflows:
- Generates a list of inputs to process
- Fetches data from multiple external sources in parallel
- Applies basic validation and retries failed requests
- Aggregates all responses into a single result
- Stores the final output in Amazon S3
- Notifies admins with Amazon SNS
While the individual steps are simple, together they demonstrate how Argo Workflows can coordinate multiple tasks in a structured and declarative way.
Workflow Breakdown
Dynamic Input Generation
Instead of hardcoding inputs, the workflow generates them dynamically using withSequence. This allows the number of tasks to scale based on a parameter.
    - name: fetch-all
      template: call-api
      withSequence:
        count: "{{workflow.parameters.count}}"
        start: 1
This creates a sequence from 1 → count, where each value becomes an independent unit of work.
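As a rough mental model, the expansion can be pictured in plain Python. This is only a sketch and not Argo's implementation: the function name expand_sequence is hypothetical, and it assumes that items are rendered as strings and that start falls back to 0 when it is omitted.

```python
def expand_sequence(count, start=0):
    """Return the items a count/start sequence would produce (illustrative only)."""
    # Workflow parameters arrive as strings such as "5", so convert them first.
    count, start = int(count), int(start)
    return [str(start + offset) for offset in range(count)]


items = expand_sequence(count="5", start=1)
print(items)  # ['1', '2', '3', '4', '5']
```

Each element of the returned list corresponds to one {{item}} value, and therefore to one task.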
Parallel Execution (Fan-Out)
Each generated item is processed in parallel by the same template. Argo automatically expands this into multiple concurrent tasks.
      arguments:
        parameters:
          - name: url
            value: "https://jsonplaceholder.typicode.com/posts/{{item}}"
Each {{item}} represents one execution, allowing all requests to run simultaneously.
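The fan-out pattern itself can be sketched locally with a thread pool. This is an analogy under stated assumptions rather than how Argo schedules work: call_api is a hypothetical stand-in that makes no network request, and threads take the place of the separate tasks Argo would create.

```python
from concurrent.futures import ThreadPoolExecutor

URL_TEMPLATE = "https://jsonplaceholder.typicode.com/posts/{item}"


def call_api(url):
    """Stand-in for the call-api template; a real task would issue an HTTP GET."""
    return f"fetched {url}"


def fan_out(items):
    """Build one URL per item and run the same function over all of them."""
    urls = [URL_TEMPLATE.format(item=item) for item in items]
    # One worker per item, so every call may start right away.
    with ThreadPoolExecutor(max_workers=max(1, len(urls))) as pool:
        return list(pool.map(call_api, urls))  # map() keeps the input order


results = fan_out(["1", "2", "3"])
print(results[0])  # fetched https://jsonplaceholder.typicode.com/posts/1
```

The same function runs once per item, which is the essence of the pattern: one template, many independent executions.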
Error Handling and Retries
Each task includes a retry strategy to handle transient failures without affecting the rest of the workflow.
    retryStrategy:
      limit: 3
      retryPolicy: OnTransientError
      backoff:
        duration: "5s"
        factor: 2
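Retries are applied at the task level, which keeps the workflow resilient without complicating its logic. Note that OnTransientError retries only errors that Argo classifies as transient, such as network timeouts; if every failed task should be retried, a broader policy such as Always may be needed.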
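To make the backoff settings concrete, the wait times can be worked out by hand. The sketch below assumes the first retry waits for duration and each later retry multiplies the previous wait by factor, with no upper cap configured; backoff_delays is a hypothetical helper and not part of Argo.

```python
def backoff_delays(duration_seconds, factor, limit):
    """Delay before each retry: duration first, then multiplied by factor each time."""
    return [duration_seconds * factor**attempt for attempt in range(limit)]


delays = backoff_delays(duration_seconds=5, factor=2, limit=3)
print(delays)       # [5, 10, 20]
print(sum(delays))  # 35
```

Under these assumptions, a task that keeps failing spends about 35 seconds waiting across its three retries, in addition to the time of the attempts themselves.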
Validation
Validation is enforced directly within the task using a success condition. A response is only considered valid if it meets specific criteria.
successCondition: "response.statusCode == 200 && response.body contains \"userId\" && response.body contains \"id\""
This ensures that only well-formed responses are passed forward in the workflow.
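The same three checks can be written as a small Python predicate. This is only an illustrative equivalent, assuming that contains behaves like a plain substring test; is_valid_response is a hypothetical name.

```python
def is_valid_response(status_code, body):
    """Mirror the three checks in the success condition (substring checks only)."""
    return status_code == 200 and "userId" in body and "id" in body


print(is_valid_response(200, '{"userId": 1, "id": 7, "title": "x"}'))  # True
print(is_valid_response(200, '{"title": "x"}'))                        # False
print(is_valid_response(404, '{"userId": 1, "id": 7}'))                # False
```

Because these are substring checks, a body that merely contains the text "id" somewhere would pass the last condition. That is one reason the save step in the full workflow checks the parsed keys again.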
Result Aggregation (Fan-In)
Once all parallel tasks complete, their outputs are automatically aggregated and passed to the next step.
    - name: save-results
      dependencies: [fetch-all]
      template: save-api-results-artifact
      arguments:
        parameters:
          - name: results
            value: "{{tasks.fetch-all.outputs.result}}"
Argo collects all parallel outputs into a single JSON array, enabling downstream processing.
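A downstream step has to turn that array back into usable objects. The sketch below shows one defensive way to do it, assuming each entry may arrive either as a JSON object or as a string that still needs parsing; parse_aggregated and the sample value are hypothetical.

```python
import json


def parse_aggregated(raw):
    """Parse the aggregated value into a list of dicts, accepting both shapes."""
    entries = json.loads(raw)
    if not isinstance(entries, list):
        raise ValueError("aggregated result must be a JSON array")
    parsed = []
    for entry in entries:
        # A string entry is assumed to hold a JSON document of its own.
        obj = json.loads(entry) if isinstance(entry, str) else entry
        if not isinstance(obj, dict):
            raise ValueError("each entry must be a JSON object")
        parsed.append(obj)
    return parsed


# Hypothetical sample: one entry already an object, one still a JSON string.
sample = '[{"userId": 1, "id": 1}, "{\\"userId\\": 1, \\"id\\": 2}"]'
print([post["id"] for post in parse_aggregated(sample)])  # [1, 2]
```

Handling both shapes keeps the consumer independent of how the individual results were serialized.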
Artifacts and Amazon S3 Integration
The aggregated result is validated, normalized, and stored as an artifact in Amazon S3.
    outputs:
      artifacts:
        - name: api-aggregated-responses
          path: /tmp/api-responses.json
          s3:
            bucket: argo-workflow-data
            key: "{{workflow.name}}/api-responses.json"
Notifications and Amazon SNS Integration
After successfully storing the results, the workflow sends a notification using Amazon SNS.
    - name: notify-sns
      dependencies: [save-results]
      template: notify-sns
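The dependencies fields seen so far form a small directed acyclic graph. As an illustration of how such a graph determines execution order, the sketch below feeds the three task names from this article into Python's standard graphlib module; it models only the ordering and is not how Argo itself is implemented.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Task name -> set of tasks it depends on, copied from the DAG in this article.
dependencies = {
    "fetch-all": set(),
    "save-results": {"fetch-all"},
    "notify-sns": {"save-results"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['fetch-all', 'save-results', 'notify-sns']
```

A task becomes eligible to run only once everything it depends on has finished, which is why the notification always comes last.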
Full Workflow
The following is the complete workflow that brings together all the concepts discussed: dynamic execution, parallel processing, retries, validation, aggregation, and integration with Amazon S3 and SNS.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dynamic-deal-aggregator-
spec:
  entrypoint: main
  serviceAccountName: argo-workflow-sa
  arguments:
    parameters:
      - name: count
        value: "5"
      - name: snsTopicArn
        value: "arn:aws:sns:AWS_REGION:ACCOUNT_ID:TOPIC_NAME"
  templates:
    - name: main
      dag:
        tasks:
          - name: fetch-all
            template: call-api
            withSequence:
              count: "{{workflow.parameters.count}}"
              start: 1
            arguments:
              parameters:
                - name: url
                  value: "https://jsonplaceholder.typicode.com/posts/{{item}}"
          - name: save-results
            dependencies: [fetch-all]
            template: save-api-results-artifact
            arguments:
              parameters:
                - name: results
                  value: "{{tasks.fetch-all.outputs.result}}"
                - name: expectedCount
                  value: "{{workflow.parameters.count}}"
          - name: notify-sns
            dependencies: [save-results]
            template: notify-sns
            arguments:
              parameters:
                - name: topicArn
                  value: "{{workflow.parameters.snsTopicArn}}"
                - name: workflowName
                  value: "{{workflow.name}}"
    - name: call-api
      retryStrategy:
        limit: 3
        retryPolicy: OnTransientError
        backoff:
          duration: "5s"
          factor: 2
      inputs:
        parameters:
          - name: url
      http:
        url: "{{inputs.parameters.url}}"
        method: GET
        timeoutSeconds: 60
        successCondition: "response.statusCode == 200 && response.body contains \"userId\" && response.body contains \"id\""
    - name: save-api-results-artifact
      inputs:
        parameters:
          - name: results
          - name: expectedCount
      outputs:
        artifacts:
          - name: api-aggregated-responses
            path: /tmp/api-responses.json
            archive:
              none: {}
            s3:
              bucket: S3_BUCKET_NAME
              endpoint: s3.amazonaws.com
              region: AWS_REGION
              key: "{{workflow.name}}/api-responses.json"
              accessKeySecret:
                name: argo-s3-credentials
                key: accessKey
              secretKeySecret:
                name: argo-s3-credentials
                key: secretKey
      script:
        image: python:3.9
        command: [python]
        env:
          - name: RESULTS
            value: "{{inputs.parameters.results}}"
          - name: EXPECTED_COUNT
            value: "{{inputs.parameters.expectedCount}}"
        source: |
          import json
          import os
          import sys

          raw = os.environ["RESULTS"]
          expected = int(os.environ["EXPECTED_COUNT"])
          path = "/tmp/api-responses.json"

          # The aggregated value must be a JSON array with one entry per task.
          try:
              data = json.loads(raw)
          except json.JSONDecodeError as e:
              print(f"validation: aggregated results are not valid JSON: {e}", file=sys.stderr)
              sys.exit(1)
          if not isinstance(data, list):
              print("validation: aggregated results must be a JSON array", file=sys.stderr)
              sys.exit(1)
          if len(data) != expected:
              print(f"validation: expected {expected} responses, got {len(data)}", file=sys.stderr)
              sys.exit(1)

          # Normalize every entry to a dict and check the required keys.
          normalized = []
          for i, item in enumerate(data):
              if isinstance(item, dict):
                  obj = item
              else:
                  try:
                      obj = json.loads(item) if isinstance(item, str) else item
                  except (TypeError, json.JSONDecodeError) as e:
                      print(f"validation: item {i} is not a JSON object: {e}", file=sys.stderr)
                      sys.exit(1)
              if not isinstance(obj, dict):
                  print(f"validation: item {i} must be a JSON object", file=sys.stderr)
                  sys.exit(1)
              if "id" not in obj or "userId" not in obj:
                  print(f"validation: item {i} missing required keys id or userId", file=sys.stderr)
                  sys.exit(1)
              normalized.append(obj)

          with open(path, "w", encoding="utf-8") as f:
              json.dump(normalized, f, indent=2, ensure_ascii=False)
          print(f"Wrote artifact to {path}")
    - name: notify-sns
      inputs:
        parameters:
          - name: topicArn
          - name: workflowName
      retryStrategy:
        limit: 3
        retryPolicy: OnTransientError
        backoff:
          duration: "5s"
          factor: 2
      container:
        image: amazon/aws-cli:2.17.59
        env:
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
              secretKeyRef:
                name: argo-s3-credentials
                key: accessKey
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
              secretKeyRef:
                name: argo-s3-credentials
                key: secretKey
          - name: AWS_DEFAULT_REGION
            value: "AWS_REGION"
          - name: SNS_TOPIC_ARN
            value: "{{inputs.parameters.topicArn}}"
          - name: WORKFLOW_NAME
            value: "{{inputs.parameters.workflowName}}"
        command: [sh, -c]
        args:
          - |
            MSG="Workflow ${WORKFLOW_NAME} completed successfully. Artifact: s3://S3_BUCKET_NAME/${WORKFLOW_NAME}/api-responses.json"
            exec aws sns publish --topic-arn "${SNS_TOPIC_ARN}" --message "${MSG}" --region "${AWS_DEFAULT_REGION}"
Running the workflow from the Argo UI shows the output of each step, along with its logs and intermediate metadata, as the screenshot below shows.
Enhancements
The workflow presented is intentionally simple, but it can be extended in several ways to improve reliability, flexibility, and operational maturity without significantly increasing complexity.
Reliability and Operations
Argo provides several built-in controls to improve stability and cluster hygiene:
- TTL strategy (ttlStrategy): Automatically clean up completed workflows after a defined period:
    ttlStrategy:
      secondsAfterCompletion: 86400 # 24 hours
- Pod cleanup (podGC): Remove completed pods to reduce resource usage:
    podGC:
      strategy: OnWorkflowCompletion
- Execution limits (activeDeadlineSeconds): Prevent workflows from running indefinitely:
    activeDeadlineSeconds: 1800 # 30 minutes
- Parallelism control (parallelism): Limit concurrent tasks to avoid overwhelming external systems:
    parallelism: 3
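The effect of a parallelism limit can be illustrated locally. The sketch below is an analogy and not Argo code: it runs ten simulated tasks through a thread pool capped at three workers and records the highest number that ever overlapped. The task function and the state dictionary are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

PARALLELISM = 3  # same value as the parallelism setting above

lock = threading.Lock()
state = {"running": 0, "peak": 0}


def task(item):
    """Pretend to do some work while tracking how many tasks overlap."""
    with lock:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
    time.sleep(0.05)  # simulated work
    with lock:
        state["running"] -= 1
    return item


# The pool never runs more than PARALLELISM tasks at the same time.
with ThreadPoolExecutor(max_workers=PARALLELISM) as pool:
    done = list(pool.map(task, range(10)))

print(len(done), state["peak"] <= PARALLELISM)  # 10 True
```

All ten tasks still complete; the cap only bounds how many are in flight at once, which is what protects a rate-limited external API.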
Notifications and Outcomes
Currently, notifications are only sent on success. This can be extended to cover all outcomes:
- Exit handler (onExit): Run a step regardless of success or failure:
    onExit: notify-exit
  Inside the exit template, you can branch based on status:
    {{workflow.status}}
- Richer Amazon SNS payloads: Include additional context such as workflow ID or duration:
    MSG='{
      "workflow": "{{workflow.name}}",
      "status": "{{workflow.status}}",
      "uid": "{{workflow.uid}}"
    }'
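If the notification step were written in Python instead of shell, the payload could be built with json.dumps so that quoting and escaping are handled automatically. The sketch below is an assumption-laden illustration: build_message and subject_for are hypothetical helpers, the sample workflow name and uid are made up, and the actual publish call to Amazon SNS is left out.

```python
import json


def build_message(name, status, uid):
    """Serialize the notification payload; json.dumps handles quoting and escaping."""
    return json.dumps({"workflow": name, "status": status, "uid": uid})


def subject_for(status):
    """Pick a subject line from the final workflow status."""
    return "Workflow succeeded" if status == "Succeeded" else f"Workflow ended: {status}"


msg = build_message("dynamic-deal-aggregator-abc12", "Failed", "hypothetical-uid")
print(subject_for("Failed"))      # Workflow ended: Failed
print(json.loads(msg)["status"])  # Failed
```

Branching on the status in one place keeps a single exit template usable for both the success and the failure path.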
Security and Configuration
To improve security and avoid hardcoding sensitive values:
- Externalized parameters: Pass values at submission time instead of embedding them:
    argo submit workflow.yaml -p snsTopicArn=<YOUR_TOPIC_ARN>
- IAM roles (IRSA): On AWS (EKS), use IAM Roles for Service Accounts (IRSA) to bind an IAM role to the workflow's service account instead of storing credentials in secrets.
Flexibility and Reuse
To make the workflow reusable across different scenarios:
- Workflow templates (WorkflowTemplate): Extract reusable logic:
    kind: WorkflowTemplate
  Then reference it:
    workflowTemplateRef:
      name: my-template
- Parameterization: Make values configurable:
    parameters:
      - name: baseUrl
      - name: s3Bucket
      - name: awsRegion
Observability
Improve visibility into execution and debugging:
- Log archiving:
    archiveLogs: true
- Metrics (template-level):
    metrics:
      prometheus:
        - name: workflow_success
          help: "Workflow success count"
          when: "{{status}} == Succeeded"
          counter: # a metric type is needed; a counter that adds 1 per success
            value: "1"
Conclusion
Argo Workflows provides a simple yet powerful way to orchestrate multi-step processes directly on Kubernetes. By combining parallel execution, retry handling, result aggregation, and integrations with services like Amazon S3 and SNS, it enables you to model complex workflows in a clear and declarative way.
While the example in this article is intentionally simple, the same patterns can be extended to support more advanced, production-grade use cases with minimal changes.