Plugins
A Plugin is an abstraction that allows you to customize any aspect of your Temporal Worker setup, including registering Workflow and Activity definitions, modifying worker and client options, and more. Using plugins, you can build reusable open-source libraries or build add-ons for engineers at your company.
This guide will teach you how to create plugins and give platform engineers general guidance on using and managing Temporal's primitives.
Here are some common use cases for plugins:
- AI Agent SDKs
- Observability, tracing, or logging middleware
- Adding reliable built-in functionality such as LLM calls, messaging systems, and payments infrastructure
- Encryption or compliance middleware
How to build a Plugin
The recommended way to start building plugins is with a SimplePlugin. This abstraction will tackle the vast majority of plugins people want to write.
For advanced use cases, you can extend the lower-level classes that SimplePlugin is based on without re-implementing what you've already built. See the Advanced Topics section for more information.
Example Plugins
If you prefer to learn by getting hands-on with code, check out some existing plugins.
- Temporal's Python SDK ships with an OpenAI Agents SDK plugin
- Temporal client and Worker plugin for Pydantic AI
What you can provide to users in a plugin
There are a number of features you can give your users with a plugin. Here's a short list of some of the things you can do.
- Built-in Activities
- Workflow-friendly libraries
- Built-in Workflows
- Built-in Nexus Operations
- Custom Data Converters
- Interceptors
Built-in Activity
You can provide built-in Activities in a Plugin for users to call from their Workflows. Check out the Activities doc for more detail on how these work.
Follow the best practices for creating Activities when you build Activity plugins.
Timeouts and retry policies
Temporal's Activity retry mechanism gives applications the benefits of durable execution. See the Activity retry policy explanation for more details.
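As a rough illustration of what a retry policy means for a built-in Activity, the sketch below computes the wait before each retry under exponential backoff. The function `retry_delays` and its parameters are hypothetical and not part of any Temporal SDK. The default values are assumptions that mirror Temporal's documented default retry policy: a 1 second initial interval, a backoff coefficient of 2.0, and a maximum interval of 100 times the initial interval.

```python
from __future__ import annotations

from datetime import timedelta


def retry_delays(
    retries: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: timedelta | None = None,
) -> list[timedelta]:
    """Return the wait before each of the first `retries` retries."""
    # Assumption: when no cap is given, it is 100x the initial interval.
    cap = maximum_interval if maximum_interval is not None else initial_interval * 100
    delays = []
    for n in range(retries):
        # Each retry waits backoff_coefficient times longer than the previous one.
        delay = initial_interval * (backoff_coefficient ** n)
        delays.append(min(delay, cap))
    return delays


seconds = [d.total_seconds() for d in retry_delays(5)]
```

With the defaults, the waits double from 1 second until they reach the cap. When you choose timeouts for a built-in Activity, leave room for the number of retries you expect it to need.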
features/snippets/plugins/plugins.go
func SomeActivity(ctx context.Context) error {
    // Activity implementation
    return nil
}

func createActivityPlugin() (*temporal.SimplePlugin, error) {
    return temporal.NewSimplePlugin(temporal.SimplePluginOptions{
        Name: "PluginName",
        RunContextBefore: func(ctx context.Context, options temporal.SimplePluginRunContextBeforeOptions) error {
            options.Registry.RegisterActivityWithOptions(
                SomeActivity,
                activity.RegisterOptions{Name: "SomeActivity"},
            )
            return nil
        },
    })
}
features/snippets/plugins/plugins.java
@ActivityInterface
public interface SomeActivity {
    @ActivityMethod
    void someActivity();
}

public class SomeActivityImpl implements SomeActivity {
    @Override
    public void someActivity() {
        // Activity implementation
    }
}

SimplePlugin activityPlugin =
    SimplePlugin.newBuilder("PluginName")
        .registerActivitiesImplementations(new SomeActivityImpl())
        .build();
features/snippets/plugins/plugins.py
@activity.defn
async def some_activity() -> None:
    return None


plugin = SimplePlugin("PluginName", activities=[some_activity])
features/snippets/plugins/plugins.ts
const activity = async () => 'activity';

const plugin = new SimplePlugin({
  name: 'plugin-name',
  activities: {
    pluginActivity: activity,
  },
});
features/snippets/plugins/plugins.cs
[Activity]
static void SomeActivity() => throw new NotImplementedException();

SimplePlugin activityPlugin = new SimplePlugin(
    "PluginName",
    new SimplePluginOptions() { }.AddActivity(SomeActivity));
features/snippets/plugins/plugins.rb
def some_activity
  # Activity implementation
end

plugin = Temporalio::SimplePlugin.new(
  name: 'PluginName',
  activities: [method(:some_activity)]
)
Workflow-friendly libraries
You can provide a library for use within a Workflow if you'd like to abstract away some Temporal-specific details for your users. Your library can call anything you include in your Plugin, such as Activities, Child Workflows, Signals, Updates, Queries, Nexus Operations, Interceptors, and Data Converters, as well as any other code, as long as that code meets these requirements:
- It should be deterministic, running the same way every time it’s executed. Non-deterministic code should go in Activities or Nexus Operations.
- See observability to avoid duplicating observation side effects when Workflows replay.
- Put other side effects inside of Activities or Local Activities. This helps your Workflow handle being restarted, resumed, or executed in a different process from where it originally began without losing correctness or state consistency.
- See testing your Plugin to write tests that check for issues with side effects.
- It should run quickly since it may be replayed many times during a long Workflow execution. More expensive code should go in Activities or Nexus Operations.
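The requirements above can be pictured with a toy model of replay. This is a sketch only: `ToyContext`, `send_email`, and `notify_workflow` are hypothetical names, and the history list is a stand-in for Workflow Event History rather than any Temporal API. It shows why side effects belong in Activities: on replay, the recorded result is reused instead of running the side effect again.

```python
from __future__ import annotations

from typing import Callable


class ToyContext:
    """Toy workflow context that memoizes Activity-like results in a history list."""

    def __init__(self, history: list[str]) -> None:
        self.history = history  # survives across replays, like Event History
        self._position = 0      # index of the next Activity-like call

    def execute_activity(self, fn: Callable[[str], str], arg: str) -> str:
        if self._position < len(self.history):
            result = self.history[self._position]  # replay: reuse recorded result
        else:
            result = fn(arg)  # first execution: run the side effect once
            self.history.append(result)
        self._position += 1
        return result


sent: list[str] = []


def send_email(user: str) -> str:
    sent.append(user)  # the side effect that must not be duplicated
    return f"emailed {user}"


def notify_workflow(ctx: ToyContext, users: list[str]) -> list[str]:
    # Deterministic: fixed iteration order, no clocks, randomness, or I/O.
    return [ctx.execute_activity(send_email, user) for user in sorted(users)]


history: list[str] = []
first = notify_workflow(ToyContext(history), ["grace", "ada"])
replayed = notify_workflow(ToyContext(history), ["grace", "ada"])  # e.g. after a restart
```

Both runs return the same list, and `sent` holds each user only once. If the library code itself appended to `sent`, every replay would duplicate the entries.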
A Plugin should allow a user to decompose their Workflows into Activities, as well as Child Workflows and Nexus Calls when needed. This gives users granular control through retries and timeouts, debuggability through the Temporal UI, operability with resets, pauses, and cancels, memoization for efficiency and resumability, and scalability using task queues and Workers.
Users rely on Workflows for:
- Orchestration and decision-making
- Interactivity via message-passing
- Tracing and observability
Making changes to your library
Your users may want to keep their Workflows running across deployments of their Worker code. If their deployment includes a new version of your Plugin, changes to your Plugin could break Workflow code that started before the new version was deployed. This can be due to non-deterministic behavior from code changes in your Plugin.
See Testing your Plugin for how to test for this. If you make substantive changes, you need to use patching.
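To make the idea of patching concrete, here is a toy model of how a patch marker lets old and new executions take different code paths. `ToyPatchContext` and `payment_steps` are hypothetical and are not Temporal SDK classes. The behavior is an approximation of the SDK patching calls (for example `workflow.patched` in Python): a new execution records the marker and takes the new path, while a replay follows the new path only if the marker is already in its history.

```python
from __future__ import annotations


class ToyPatchContext:
    """Toy model of patch markers; not a Temporal SDK class."""

    def __init__(self, markers: set[str], replaying: bool) -> None:
        self.markers = markers      # patch IDs recorded in this execution's history
        self.replaying = replaying  # True when re-running recorded history

    def patched(self, patch_id: str) -> bool:
        if self.replaying:
            # On replay, follow whichever path the original execution took.
            return patch_id in self.markers
        # On first execution, record the marker and take the new path.
        self.markers.add(patch_id)
        return True


def payment_steps(ctx: ToyPatchContext) -> list[str]:
    """Return the Activity-like steps the library would schedule, in order."""
    if ctx.patched("add-fraud-check"):
        return ["check_fraud", "charge_card"]  # new plugin version
    return ["charge_card"]  # path kept for executions started before the change


# Execution started on the old plugin version: its history has no marker.
old_run = payment_steps(ToyPatchContext(markers=set(), replaying=True))

# Execution started on the new version, then replayed later.
new_markers: set[str] = set()
new_run = payment_steps(ToyPatchContext(new_markers, replaying=False))
new_replay = payment_steps(ToyPatchContext(new_markers, replaying=True))
```

The old execution keeps scheduling only `charge_card`, so its replay still matches its history, while new executions pick up the fraud check.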
Example of a Workflow library that uses a Plugin in Python
Built-in Workflows
You can provide a built-in Workflow in a SimplePlugin. It’s callable as a Child Workflow or standalone. When you want to provide a piece of functionality that's more complex than an Activity, you can:
- Use a Workflow Library that runs directly in the end user’s Workflow
- Add a Child Workflow
Consider adding a Child Workflow when one or more of these conditions applies:
- The child should outlive the parent.
- The parent's Workflow Event History would otherwise grow too large to scale.
- You want a separate Workflow ID for the child so that it can be canceled, terminated, or paused independently of the parent.
Any Workflow can be run as a standalone Workflow or as a Child Workflow, so registering a Child Workflow in a SimplePlugin is the same as registering any Workflow.
features/snippets/plugins/plugins.go
func HelloWorkflow(ctx workflow.Context, name string) (string, error) {
    return "Hello, " + name + "!", nil
}

func createWorkflowPlugin() (*temporal.SimplePlugin, error) {
    return temporal.NewSimplePlugin(temporal.SimplePluginOptions{
        Name: "PluginName",
        RunContextBefore: func(ctx context.Context, options temporal.SimplePluginRunContextBeforeOptions) error {
            options.Registry.RegisterWorkflowWithOptions(
                HelloWorkflow,
                workflow.RegisterOptions{Name: "HelloWorkflow"},
            )
            return nil
        },
    })
}
features/snippets/plugins/plugins.java
@WorkflowInterface
public interface HelloWorkflow {
    @WorkflowMethod
    String run(String name);
}

public static class HelloWorkflowImpl implements HelloWorkflow {
    @Override
    public String run(String name) {
        return "Hello, " + name + "!";
    }
}

SimplePlugin workflowPlugin =
    SimplePlugin.newBuilder("PluginName")
        .registerWorkflowImplementationTypes(HelloWorkflowImpl.class)
        .build();
features/snippets/plugins/plugins.py
@workflow.defn
class HelloWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return f"Hello, {name}!"


plugin = SimplePlugin("PluginName", workflows=[HelloWorkflow])
features/snippets/plugins/plugins.cs
[Workflow]
class SimpleWorkflow
{
    [WorkflowRun]
    public Task<string> RunAsync(string name) => Task.FromResult($"Hello, {name}!");
}

SimplePlugin workflowPlugin = new SimplePlugin(
    "PluginName",
    new SimplePluginOptions() { }.AddWorkflow<SimpleWorkflow>());
features/snippets/plugins/plugins.rb
class HelloWorkflow < Temporalio::Workflow::Definition
  def execute(name)
    "Hello, #{name}!"
  end
end

plugin = Temporalio::SimplePlugin.new(
  name: 'PluginName',
  workflows: [HelloWorkflow]
)
Built-in Nexus Operations
Nexus Operations are called from Workflows in much the same way as Activities. See the common Nexus Use Cases for examples. Like Activities, Nexus call arguments and return values must be serializable.
features/snippets/plugins/plugins.go
type WeatherInput struct {
    City string `json:"city"`
}

type Weather struct {
    City             string `json:"city"`
    TemperatureRange string `json:"temperatureRange"`
    Conditions       string `json:"conditions"`
}

var WeatherService = nexus.NewService("weather-service")

var GetWeatherOperation = nexus.NewSyncOperation(
    "get-weather",
    func(ctx context.Context, input WeatherInput, options nexus.StartOperationOptions) (Weather, error) {
        return Weather{
            City:             input.City,
            TemperatureRange: "14-20C",
            Conditions:       "Sunny with wind.",
        }, nil
    },
)

func createNexusPlugin() (*temporal.SimplePlugin, error) {
    return temporal.NewSimplePlugin(temporal.SimplePluginOptions{
        Name: "PluginName",
        RunContextBefore: func(ctx context.Context, options temporal.SimplePluginRunContextBeforeOptions) error {
            options.Registry.RegisterNexusService(WeatherService)
            return nil
        },
    })
}
features/snippets/plugins/plugins.java
// Example Nexus service implementation
public class WeatherService {
    public Weather getWeather(WeatherInput input) {
        return new Weather(input.getCity(), "14-20C", "Sunny with wind.");
    }
}

public static class Weather {
    private final String city;
    private final String temperatureRange;
    private final String conditions;

    public Weather(String city, String temperatureRange, String conditions) {
        this.city = city;
        this.temperatureRange = temperatureRange;
        this.conditions = conditions;
    }

    // Getters...
}

public static class WeatherInput {
    private final String city;

    public WeatherInput(String city) {
        this.city = city;
    }

    public String getCity() {
        return city;
    }
}

SimplePlugin nexusPlugin =
    SimplePlugin.newBuilder("PluginName")
        .registerNexusServiceImplementation(new WeatherService())
        .build();
features/snippets/plugins/plugins.py
@nexusrpc.service
class WeatherService:
    get_weather_nexus_operation: nexusrpc.Operation[WeatherInput, Weather]


@nexusrpc.handler.service_handler(service=WeatherService)
class WeatherServiceHandler:
    @nexusrpc.handler.sync_operation
    async def get_weather_nexus_operation(
        self, ctx: nexusrpc.handler.StartOperationContext, input: WeatherInput
    ) -> Weather:
        return Weather(
            city=input.city,
            temperature_range="14-20C",
            conditions="Sunny with wind.",
        )


plugin = SimplePlugin("PluginName", nexus_service_handlers=[WeatherServiceHandler()])
features/snippets/plugins/plugins.ts
const testServiceHandler = nexus.serviceHandler(
  nexus.service('testService', {
    testSyncOp: nexus.operation<string, string>(),
  }),
  {
    async testSyncOp(_, input) {
      return input;
    },
  },
);

const plugin = new SimplePlugin({
  name: 'plugin-name',
  nexusServices: [testServiceHandler],
});
features/snippets/plugins/plugins.cs
[NexusService]
public interface IStringService
{
    [NexusOperation]
    string DoSomething(string name);
}

[NexusServiceHandler(typeof(IStringService))]
public class HandlerFactoryStringService
{
    private readonly Func<IOperationHandler<string, string>> handlerFactory;

    public HandlerFactoryStringService(Func<IOperationHandler<string, string>> handlerFactory) =>
        this.handlerFactory = handlerFactory;

    [NexusOperationHandler]
    public IOperationHandler<string, string> DoSomething() => handlerFactory();
}

SimplePlugin nexusPlugin = new SimplePlugin(
    "PluginName",
    new SimplePluginOptions() { }.AddNexusService(new HandlerFactoryStringService(() =>
        OperationHandler.Sync<string, string>((ctx, name) => $"Hello, {name}")))
);
Custom Data Converters
A custom Data Converter can alter data formats or provide compression or encryption.
Note that your Plugin can also reuse an existing Data Converter, such as PydanticPayloadConverter in Python.
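As a sketch of the compression case, the toy codec below compresses payload bytes on the way out and restores them on the way in. `ToyPayload`, `ToyCompressionCodec`, and the `toy-codec` metadata key are hypothetical stand-ins chosen for this illustration. A real codec would implement the payload codec interface of your SDK instead.

```python
from __future__ import annotations

import zlib
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyPayload:
    """Hypothetical stand-in for a Temporal payload: metadata plus raw bytes."""

    metadata: dict[str, bytes]
    data: bytes


class ToyCompressionCodec:
    """Compresses payload bytes on encode and restores them on decode."""

    MARKER = "toy-codec"  # hypothetical metadata key marking compressed payloads

    def encode(self, payloads: list[ToyPayload]) -> list[ToyPayload]:
        return [
            ToyPayload(
                metadata={**p.metadata, self.MARKER: b"zlib"},
                data=zlib.compress(p.data),
            )
            for p in payloads
        ]

    def decode(self, payloads: list[ToyPayload]) -> list[ToyPayload]:
        decoded = []
        for p in payloads:
            if p.metadata.get(self.MARKER) != b"zlib":
                # Pass through payloads that were never compressed by this codec.
                decoded.append(p)
                continue
            metadata = {k: v for k, v in p.metadata.items() if k != self.MARKER}
            decoded.append(ToyPayload(metadata=metadata, data=zlib.decompress(p.data)))
        return decoded


codec = ToyCompressionCodec()
original = [ToyPayload(metadata={"encoding": b"json/plain"}, data=b'{"city": "Oslo"}' * 50)]
encoded = codec.encode(original)
round_tripped = codec.decode(encoded)
```

The pass-through branch in `decode` is a design choice worth keeping in a real codec: it lets Workers read payloads that were written before the Plugin was installed.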
features/snippets/plugins/plugins.go
func createConverterPlugin() (*temporal.SimplePlugin, error) {
    customConverter := converter.GetDefaultDataConverter() // Or your custom converter
    return temporal.NewSimplePlugin(temporal.SimplePluginOptions{
        Name:          "PluginName",
        DataConverter: customConverter,
    })
}
features/snippets/plugins/plugins.java
SimplePlugin converterPlugin =
    SimplePlugin.newBuilder("PluginName")
        .customizeDataConverter(
            existingConverter -> {
                // Customize the data converter
                // This example keeps the existing converter unchanged
                // In practice, you might wrap it with additional functionality
                return existingConverter;
            })
        .build();
features/snippets/plugins/plugins.py
def set_converter(converter: DataConverter | None) -> DataConverter:
    if converter is None or converter == DataConverter.default:
        return pydantic_data_converter
    # Should consider interactions with other plugins,
    # as this will override the data converter.
    # This may mean failing, warning, or something else
    return converter


plugin = SimplePlugin("PluginName", data_converter=set_converter)
features/snippets/plugins/plugins.ts
const codec: PayloadCodec = {
  encode(_payloads: Payload[]): Promise<Payload[]> {
    throw new Error();
  },
  decode(_payloads: Payload[]): Promise<Payload[]> {
    throw new Error();
  },
};

const plugin = new SimplePlugin({
  name: 'plugin-name',
  dataConverter: (converter: DataConverter | undefined) => ({
    ...converter,
    payloadCodecs: [...(converter?.payloadCodecs ?? []), codec],
  }),
});
features/snippets/plugins/plugins.cs
private class Codec : IPayloadCodec
{
    public Task<IReadOnlyCollection<Payload>> EncodeAsync(IReadOnlyCollection<Payload> payloads) => throw new NotImplementedException();

    public Task<IReadOnlyCollection<Payload>> DecodeAsync(IReadOnlyCollection<Payload> payloads) => throw new NotImplementedException();
}

SimplePlugin converterPlugin = new SimplePlugin(
    "PluginName",
    new SimplePluginOptions()
    {
        DataConverterOption = new SimplePluginOptions.SimplePluginOption<DataConverter>(
            (converter) => converter with { PayloadCodec = new Codec() }
        ),
    });
features/snippets/plugins/plugins.rb
custom_converter = Temporalio::Converters::DataConverter.new(
  payload_converter: Temporalio::Converters::PayloadConverter.default
)

plugin = Temporalio::SimplePlugin.new(
  name: 'PluginName',
  data_converter: custom_converter
)
Interceptors
Interceptors are middleware that can run before and after various calls such as Activities, Workflows, and Signals. See the interceptors documentation for details on implementing them. They're used to:
- Create side effects such as logging and tracing.
- Modify arguments, such as adding headers for authorization or tracing propagation.
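The two uses above can be sketched as a chain of wrappers, where each interceptor receives the next handler and returns a new one. This is a generic middleware illustration, not the interceptor API of any Temporal SDK. The request shape and the names `add_trace_header`, `log_calls`, `build_chain`, and `greet_activity` are hypothetical.

```python
from __future__ import annotations

from typing import Callable

Request = dict  # hypothetical shape: {"name": str, "headers": dict, "args": tuple}
Handler = Callable[[Request], str]
Interceptor = Callable[[Handler], Handler]


def add_trace_header(next_handler: Handler) -> Handler:
    """Modify the call: attach a tracing header before passing it on."""
    def handle(request: Request) -> str:
        headers = {**request["headers"], "trace-id": "abc123"}
        return next_handler({**request, "headers": headers})
    return handle


def log_calls(log: list[str]) -> Interceptor:
    """Create a side effect: record a line before and after each call."""
    def intercept(next_handler: Handler) -> Handler:
        def handle(request: Request) -> str:
            log.append(f"before {request['name']}")
            result = next_handler(request)
            log.append(f"after {request['name']}")
            return result
        return handle
    return intercept


def build_chain(interceptors: list[Interceptor], final: Handler) -> Handler:
    """Wrap `final` so the first interceptor in the list runs outermost."""
    handler = final
    for interceptor in reversed(interceptors):
        handler = interceptor(handler)
    return handler


def greet_activity(request: Request) -> str:
    return f"Hello, {request['args'][0]}! (trace {request['headers']['trace-id']})"


log: list[str] = []
call = build_chain([log_calls(log), add_trace_header], greet_activity)
result = call({"name": "greet", "headers": {}, "args": ("Ada",)})
```

Because each interceptor only knows about the next handler, a Plugin can add its own interceptors without knowing which others the user has already configured.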
features/snippets/plugins/plugins.go
type SomeWorkerInterceptor struct {
    interceptor.WorkerInterceptorBase
}

type SomeClientInterceptor struct {
    interceptor.ClientInterceptorBase
}

func createInterceptorPlugin() (*temporal.SimplePlugin, error) {
    return temporal.NewSimplePlugin(temporal.SimplePluginOptions{
        Name:               "PluginName",
        WorkerInterceptors: []interceptor.WorkerInterceptor{&SomeWorkerInterceptor{}},
        ClientInterceptors: []interceptor.ClientInterceptor{&SomeClientInterceptor{}},
    })
}
features/snippets/plugins/plugins.java
public class SomeWorkerInterceptor extends WorkerInterceptorBase {
    // Your worker interceptor implementation
}

public class SomeClientInterceptor extends WorkflowClientInterceptorBase {
    // Your client interceptor implementation
}

SimplePlugin interceptorPlugin =
    SimplePlugin.newBuilder("PluginName")
        .addWorkerInterceptors(new SomeWorkerInterceptor())
        .addClientInterceptors(new SomeClientInterceptor())
        .build();
features/snippets/plugins/plugins.py
class SomeWorkerInterceptor(temporalio.worker.Interceptor):
    pass  # Your implementation


class SomeClientInterceptor(temporalio.client.Interceptor):
    pass  # Your implementation


plugin = SimplePlugin(
    "PluginName", interceptors=[SomeWorkerInterceptor(), SomeClientInterceptor()]
)
features/snippets/plugins/plugins.ts
class MyWorkflowClientInterceptor implements WorkflowClientInterceptor {}
class MyActivityInboundInterceptor implements ActivityInboundCallsInterceptor {}
class MyActivityOutboundInterceptor implements ActivityOutboundCallsInterceptor {}

const workflowInterceptorsPath = '';

const plugin = new SimplePlugin({
  name: 'plugin-name',
  clientInterceptors: {
    workflow: [new MyWorkflowClientInterceptor()],
  },
  workerInterceptors: {
    client: {
      workflow: [new MyWorkflowClientInterceptor()],
    },
    workflowModules: [workflowInterceptorsPath],
    activity: [
      (_: Context) => ({
        inbound: new MyActivityInboundInterceptor(),
        outbound: new MyActivityOutboundInterceptor(),
      }),
    ],
  },
});
features/snippets/plugins/plugins.cs
private class SomeClientInterceptor : IClientInterceptor
{
    public ClientOutboundInterceptor InterceptClient(
        ClientOutboundInterceptor nextInterceptor) =>
        throw new NotImplementedException();
}

private class SomeWorkerInterceptor : IWorkerInterceptor
{
    public WorkflowInboundInterceptor InterceptWorkflow(
        WorkflowInboundInterceptor nextInterceptor) =>
        throw new NotImplementedException();

    public ActivityInboundInterceptor InterceptActivity(
        ActivityInboundInterceptor nextInterceptor) =>
        throw new NotImplementedException();
}

SimplePlugin interceptorPlugin = new SimplePlugin(
    "PluginName",
    new SimplePluginOptions()
    {
        ClientInterceptors = new List<IClientInterceptor>() { new SomeClientInterceptor() },
        WorkerInterceptors = new List<IWorkerInterceptor>() { new SomeWorkerInterceptor() },
    });
features/snippets/plugins/plugins.rb
class SomeWorkerInterceptor
  include Temporalio::Worker::Interceptor::Workflow

  def intercept_workflow(next_interceptor)
    # Your interceptor implementation
    next_interceptor
  end
end

class SomeClientInterceptor
  include Temporalio::Client::Interceptor

  def intercept_client(next_interceptor)
    # Your interceptor implementation
    next_interceptor
  end
end

plugin = Temporalio::SimplePlugin.new(
  name: 'PluginName',
  client_interceptors: [SomeClientInterceptor.new],
  worker_interceptors: [SomeWorkerInterceptor.new]
)
Special considerations for different languages
Each SDK has nuances you should be aware of so you can account for them in your code.
Python
In Python, you can choose to run your Workflows in a sandbox, which helps prevent non-determinism errors in your application. To work for users who use sandboxing, your Plugin should specify the Workflow runner that it uses.
features/snippets/plugins/plugins.py
def workflow_runner(runner: WorkflowRunner | None) -> WorkflowRunner:
    if not runner:
        raise ValueError("No WorkflowRunner provided to the plugin.")

    # If in sandbox, add additional passthrough
    if isinstance(runner, SandboxedWorkflowRunner):
        return dataclasses.replace(
            runner,
            restrictions=runner.restrictions.with_passthrough_modules("module"),
        )
    return runner


plugin = SimplePlugin("PluginName", workflow_runner=workflow_runner)
Testing your Plugin
To test your Plugin, write normal Temporal Workflow tests with the Plugin included in your Client.
Two special concerns are versioning tests, for when you're making changes to your Plugin, and tests for unwanted side effects.
Versioning tests
When you make changes to your plugin after it has already shipped to users, it's recommended that you set up replay testing on each important change to make sure that you’re not causing non-determinism errors for your users.
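As a toy picture of what a replay test checks, the sketch below re-runs workflow code and compares the steps it produces against a recorded history. All names here (`ToyNonDeterminismError`, `replay`, `order_workflow_v1`, `order_workflow_v2`) are hypothetical. In a real test you would feed recorded Workflow histories to the replay tooling of your SDK rather than compare lists by hand.

```python
from __future__ import annotations

from typing import Callable


class ToyNonDeterminismError(Exception):
    """Raised when replayed code diverges from the recorded history."""


def replay(workflow_fn: Callable[[], list[str]], recorded: list[str]) -> None:
    """Re-run the code and compare its steps with the recorded ones."""
    produced = workflow_fn()
    if produced != recorded:
        raise ToyNonDeterminismError(f"recorded {recorded}, replay produced {produced}")


def order_workflow_v1() -> list[str]:
    return ["reserve_stock", "charge_card"]


def order_workflow_v2() -> list[str]:
    # A plugin change that reorders steps without patching.
    return ["charge_card", "reserve_stock"]


recorded_history = order_workflow_v1()  # captured from a run on the old version

replay(order_workflow_v1, recorded_history)  # passes: same steps in the same order

try:
    replay(order_workflow_v2, recorded_history)
    diverged = False
except ToyNonDeterminismError:
    diverged = True
```

Running a check like this on every important change surfaces the divergence in your own test suite instead of in a user's running Workflows.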
Side effects tests
Your Plugin should cater to Workflows resuming in different processes than the ones they started on and then replaying from the beginning, which can happen, for example, after an intermittent failure.
You can ensure you're not depending on local side effects by turning Workflow caching off, which will mean that the Workflow replays from the top each time it progresses:
features/snippets/worker/worker.go
worker.SetStickyWorkflowCacheSize(0)
w := worker.New(c, "task-queue", worker.Options{})
features/snippets/worker/worker.java
WorkerFactory factory =
    WorkerFactory.newInstance(
        client, WorkerFactoryOptions.newBuilder().setWorkflowCacheSize(0).build());
Worker worker = factory.newWorker("task-queue");
features/snippets/worker/worker.py
worker = Worker(client, task_queue="task-queue", max_cached_workflows=0)
features/snippets/worker/worker.ts
const worker = await Worker.create({
  connection,
  taskQueue: 'task-queue',
  maxCachedWorkflows: 0,
});
features/snippets/worker/worker.cs
using var worker = new TemporalWorker(
    client,
    new TemporalWorkerOptions("task-queue")
    {
        MaxCachedWorkflows = 0
    });
features/snippets/worker/worker.rb
worker = Temporalio::Worker.new(
  client: client,
  task_queue: 'task-queue',
  max_cached_workflows: 0
)
Check for duplicate side effects or other types of failures.
Side effects on global variables are harder to test for, so it's best to avoid them entirely.