Python: Add the Dapr Runtime for Processes (#9642)
### Motivation and Context

In addition to the Python local process runtime, we're adding the
ability to configure the Dapr runtime for Python SK processes.

<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->

### Description

This PR introduces:
- The ability to configure the Dapr runtime for Python processes
- A sample FastAPI app showing how to configure the Dapr runtime
with a sample SK Python process (see the README for details on setup)
- Unit test coverage for the Dapr runtime
- Closes #9355 

<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->

### Contribution Checklist

<!-- Before submitting this PR, please make sure: -->

- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone 😄
moonbox3 authored Nov 15, 2024
1 parent 434db27 commit 29e3e83
Showing 66 changed files with 3,449 additions and 206 deletions.
20 changes: 20 additions & 0 deletions python/.vscode/launch.json
@@ -10,6 +10,26 @@
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": true
},
{
"name": "Python FastAPI app with Dapr",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/samples/demos/process_with_dapr/fastapi_app.py",
"console": "integratedTerminal",
"preLaunchTask": "daprd-debug-python",
"postDebugTask": "daprd-down-python",
"justMyCode": false
},
{
"name": "Python Flask API app with Dapr",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/samples/demos/process_with_dapr/flask_app.py",
"console": "integratedTerminal",
"preLaunchTask": "daprd-debug-python",
"postDebugTask": "daprd-down-python",
"justMyCode": false
}
]
14 changes: 14 additions & 0 deletions python/.vscode/tasks.json
@@ -164,6 +164,20 @@
"panel": "shared"
},
"problemMatcher": []
},
{
"label": "daprd-debug-python",
"type": "daprd",
"appId": "dapr-processes",
"httpPort": 3500,
"appPort": 5001,
"grpcPort": 53317,
"metricsPort": 9091
},
{
"label": "daprd-down-python",
"type": "daprd-down",
"appId": "dapr-processes"
}
],
"inputs": [
5 changes: 5 additions & 0 deletions python/pyproject.toml
@@ -117,6 +117,11 @@ pandas = [
aws = [
"boto3>=1.28.57",
]
dapr = [
"dapr>=1.14.0",
"dapr-ext-fastapi>=1.14.0",
"flask-dapr>=1.14.0"
]

[tool.uv]
prerelease = "if-necessary-or-explicit"
28 changes: 14 additions & 14 deletions python/samples/concepts/processes/cycles_with_fan_in.py
@@ -42,8 +42,8 @@ class KickOffStep(KernelProcessStep):

@kernel_function(name=KICK_OFF_FUNCTION)
async def print_welcome_message(self, context: KernelProcessStepContext):
await context.emit_event(process_event=CommonEvents.StartARequested.value, data="Get Going A")
await context.emit_event(process_event=CommonEvents.StartBRequested.value, data="Get Going B")
await context.emit_event(process_event=CommonEvents.StartARequested, data="Get Going A")
await context.emit_event(process_event=CommonEvents.StartBRequested, data="Get Going B")


# Define a sample `AStep` step that will emit an event after 1 second.
@@ -52,7 +52,7 @@ class AStep(KernelProcessStep):
@kernel_function()
async def do_it(self, context: KernelProcessStepContext):
await asyncio.sleep(1)
await context.emit_event(process_event=CommonEvents.AStepDone.value, data="I did A")
await context.emit_event(process_event=CommonEvents.AStepDone, data="I did A")


# Define a sample `BStep` step that will emit an event after 2 seconds.
@@ -61,7 +61,7 @@ class BStep(KernelProcessStep):
@kernel_function()
async def do_it(self, context: KernelProcessStepContext):
await asyncio.sleep(2)
await context.emit_event(process_event=CommonEvents.BStepDone.value, data="I did B")
await context.emit_event(process_event=CommonEvents.BStepDone, data="I did B")


# Define a sample `CStepState` that will keep track of the current cycle.
@@ -84,9 +84,9 @@ async def do_it(self, context: KernelProcessStepContext, astepdata: str, bstepda
print(f"CStep Current Cycle: {self.state.current_cycle}")
if self.state.current_cycle == 3:
print("CStep Exit Requested")
await context.emit_event(process_event=CommonEvents.ExitRequested.value)
await context.emit_event(process_event=CommonEvents.ExitRequested)
return
await context.emit_event(process_event=CommonEvents.CStepDone.value)
await context.emit_event(process_event=CommonEvents.CStepDone)


kernel = Kernel()
@@ -105,25 +105,25 @@ async def cycles_with_fan_in():
myCStep = process.add_step(step_type=CStep)

# Define the input event and where to send it to
process.on_input_event(event_id=CommonEvents.StartProcess.value).send_event_to(target=kickoff_step)
process.on_input_event(event_id=CommonEvents.StartProcess).send_event_to(target=kickoff_step)

# Define the process flow
kickoff_step.on_event(event_id=CommonEvents.StartARequested.value).send_event_to(target=myAStep)
kickoff_step.on_event(event_id=CommonEvents.StartBRequested.value).send_event_to(target=myBStep)
myAStep.on_event(event_id=CommonEvents.AStepDone.value).send_event_to(target=myCStep, parameter_name="astepdata")
kickoff_step.on_event(event_id=CommonEvents.StartARequested).send_event_to(target=myAStep)
kickoff_step.on_event(event_id=CommonEvents.StartBRequested).send_event_to(target=myBStep)
myAStep.on_event(event_id=CommonEvents.AStepDone).send_event_to(target=myCStep, parameter_name="astepdata")

# Define the fan in behavior once both AStep and BStep are done
myBStep.on_event(event_id=CommonEvents.BStepDone.value).send_event_to(target=myCStep, parameter_name="bstepdata")
myCStep.on_event(event_id=CommonEvents.CStepDone.value).send_event_to(target=kickoff_step)
myCStep.on_event(event_id=CommonEvents.ExitRequested.value).stop_process()
myBStep.on_event(event_id=CommonEvents.BStepDone).send_event_to(target=myCStep, parameter_name="bstepdata")
myCStep.on_event(event_id=CommonEvents.CStepDone).send_event_to(target=kickoff_step)
myCStep.on_event(event_id=CommonEvents.ExitRequested).stop_process()

# Build the process
kernel_process = process.build()

async with await start(
process=kernel_process,
kernel=kernel,
initial_event=KernelProcessEvent(id=CommonEvents.StartProcess.value, data="foo"),
initial_event=KernelProcessEvent(id=CommonEvents.StartProcess, data="foo"),
) as process_context:
process_state = await process_context.get_state()
c_step_state: KernelProcessStepState[CStepState] = next(
12 changes: 6 additions & 6 deletions python/samples/concepts/processes/nested_process.py
@@ -58,12 +58,12 @@ async def repeat(self, message: str, context: KernelProcessStepContext, count: i
print(f"[REPEAT] {output}")

await context.emit_event(
process_event=ProcessEvents.OutputReadyPublic.value,
process_event=ProcessEvents.OutputReadyPublic,
data=output,
visibility=KernelProcessEventVisibility.Public,
)
await context.emit_event(
process_event=ProcessEvents.OutputReadyInternal.value,
process_event=ProcessEvents.OutputReadyInternal,
data=output,
visibility=KernelProcessEventVisibility.Internal,
)
@@ -74,7 +74,7 @@ def create_linear_process(name: str):
echo_step = process_builder.add_step(step_type=EchoStep)
repeat_step = process_builder.add_step(step_type=RepeatStep)

process_builder.on_input_event(event_id=ProcessEvents.StartProcess.value).send_event_to(target=echo_step)
process_builder.on_input_event(event_id=ProcessEvents.StartProcess).send_event_to(target=echo_step)

echo_step.on_function_result(function_name=EchoStep.ECHO).send_event_to(
target=repeat_step, parameter_name="message"
@@ -93,16 +93,16 @@ async def nested_process():

nested_process_step = process_builder.add_step_from_process(create_linear_process("Inner"))

process_builder.steps[1].on_event(ProcessEvents.OutputReadyInternal.value).send_event_to(
nested_process_step.where_input_event_is(ProcessEvents.StartProcess.value)
process_builder.steps[1].on_event(ProcessEvents.OutputReadyInternal).send_event_to(
nested_process_step.where_input_event_is(ProcessEvents.StartProcess)
)

process = process_builder.build()

test_input = "Test"

process_handle = await start(
process=process, kernel=kernel, initial_event=ProcessEvents.StartProcess.value, data=test_input
process=process, kernel=kernel, initial_event=ProcessEvents.StartProcess, data=test_input
)
process_info = await process_handle.get_state()

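
Both diffs above drop `.value` when passing enum events to `emit_event`, `on_event`, and `on_input_event`. A plausible way a runtime accepts both forms is to normalize Enum members to their values internally; the sketch below is illustrative only and does not reflect the actual SK signatures:

```python
from enum import Enum


# Hypothetical stand-in for the sample's CommonEvents enum.
class CommonEvents(Enum):
    StartProcess = "StartProcess"
    AStepDone = "AStepDone"


def normalize_event_id(event) -> str:
    """Accept either an Enum member or its raw string value."""
    return event.value if isinstance(event, Enum) else event


# Callers can now pass the member directly, without `.value`.
assert normalize_event_id(CommonEvents.AStepDone) == "AStepDone"
assert normalize_event_id("AStepDone") == "AStepDone"
```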
180 changes: 180 additions & 0 deletions python/samples/demos/process_with_dapr/README.md
@@ -0,0 +1,180 @@
# Semantic Kernel Processes in Dapr

This demo contains a FastAPI app that uses Dapr to run a Semantic Kernel Process. Dapr is a portable, event-driven runtime that can simplify the process of building resilient, stateful applications that run in the cloud and/or on the edge. Dapr is a natural fit for hosting Semantic Kernel Processes and allows you to scale your processes in size and quantity without sacrificing performance or reliability.

For more information about Semantic Kernel Processes and Dapr, see the following documentation:

#### Semantic Kernel Processes

- [Overview of the Process Framework (docs)](https://learn.microsoft.com/semantic-kernel/frameworks/process/process-framework)
- [Getting Started with Processes (samples)](../../getting_started_with_processes/)
- [Semantic Kernel Dapr Runtime](../../../semantic_kernel/processes/dapr_runtime/)

#### Dapr

- [Dapr documentation](https://docs.dapr.io/)
- [Dapr Actor documentation](https://v1-10.docs.dapr.io/developing-applications/building-blocks/actors/)
- [Dapr local development](https://docs.dapr.io/getting-started/install-dapr-selfhost/)

### Supported Dapr Extensions:

| Extension | Supported |
|--------------------|:----:|
| FastAPI | ✅ |
| Flask | ✅ |
| gRPC | ❌ |
| Dapr Workflow | ❌ |

## Running the Demo

Before running this demo, make sure to configure Dapr for local development by following the links above. The Dapr containers must be running for this demo application to run.

```mermaid
flowchart LR
Kickoff --> A
Kickoff --> B
A --> C
B --> C
C -->|Count < 3| Kickoff
C -->|Count >= 3| End
classDef kickoffClass fill:#f9f,stroke:#333,stroke-width:2px;
class Kickoff kickoffClass;
End((End))
```
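
The flow in the diagram can be sketched as a plain-Python simulation (a simplified stand-in for the SK process runtime, not the real API; step names follow the diagram):

```python
def run_process(cycle: int = 1) -> tuple[list[str], int]:
    """Simulate one run of the Kickoff -> (A, B) -> C cycle.

    Returns the log lines and the final (persisted) cycle count.
    """
    log: list[str] = []
    while True:
        # Fan-out: Kickoff triggers A and B; fan-in: C waits for both.
        log += ["Kickoff ran", "AStep ran", "BStep ran"]
        cycle += 1                  # CStep bumps its state on each activation
        if cycle >= 3:              # terminal condition from the diagram
            log.append(f"CStep run cycle {cycle} - exiting")
            return log, cycle
        log.append(f"CStep run cycle {cycle}")


first_log, persisted = run_process()        # fresh state: two full cycles
second_log, _ = run_process(persisted)      # resumed state: exits immediately
```

Running it reproduces the shape of the console logs shown below: two `Kickoff ran` rounds on a fresh run, one round on a resumed run.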

1. Build and run the sample. Running the Dapr service locally can be done using the Dapr CLI or with the Dapr VS Code extension. The VS Code extension is the recommended approach if you want to debug the code as it runs.
   - If using VS Code to debug, select either the `Python FastAPI app with Dapr` or the `Python Flask API app with Dapr` option from the Run and Debug dropdown list.
1. When the service is up and running, it will expose a single API on localhost port 5001.

#### Invoking the process:

1. Open a web browser and point it to [http://localhost:5001/processes/1234](http://localhost:5001/processes/1234) to invoke a new process with `Id = "1234"`
1. When the process is complete, you should see `{"processId":"1234"}` in the web browser.
1. You should also see console output from the running service with logs that match the following:

```text
##### Kickoff ran.
##### AStep ran.
##### BStep ran.
##### CStep activated with Cycle = '1'.
##### CStep run cycle 2.
##### Kickoff ran.
##### AStep ran.
##### BStep ran.
##### CStep run cycle 3 - exiting.
```

Now refresh the page in your browser to run the same process instance again. This time the logs should look like this:

```text
##### Kickoff ran.
##### AStep ran.
##### BStep ran.
##### CStep run cycle 4 - exiting.
```

Notice that the logs from the two runs are not the same. In the first run, the process had not been run before, so its initial
state came from what we defined in the process:

**_First Run_**

- `CState` is initialized with `Cycle = 1`, which is the initial state that we specified while building the process.
- `CState` is invoked a total of two times before the terminal condition of `Cycle >= 3` is reached.

In the second run however, the process has persisted state from the first run:

**_Second Run_**

- `CState` is initialized with `Cycle = 3`, which is the final state from the first run of the process.
- `CState` is invoked only once and is already in the terminal condition of `Cycle >= 3`.
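
This two-run behavior can be mimicked with a toy state store keyed by process id (illustrative only; in the real app, Dapr actors persist the state for you):

```python
# Stand-in for the Dapr actor state store, keyed by process id.
state_store: dict[str, int] = {}


def invoke(process_id: str) -> int:
    """Run the CStep cycle for a process, resuming any persisted state."""
    cycle = state_store.get(process_id, 1)  # resume, else initial Cycle = 1
    while True:
        cycle += 1                           # one CStep activation
        if cycle >= 3:                       # terminal condition
            break
    state_store[process_id] = cycle          # persist the final state
    return cycle
```

A first call with a new id exits at cycle 3; calling again with the same id resumes at 3 and exits after a single activation at cycle 4, while a fresh id starts from the initial state again.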

If you create a new instance of the process with `Id = "ABCD"` by pointing your browser to [http://localhost:5001/processes/ABCD](http://localhost:5001/processes/ABCD), you will see that it starts with the initial state, as expected.

## Understanding the Code

Below are the key aspects of the code that show how Dapr and Semantic Kernel Processes can be integrated into a FastAPI app:

- Create a new Dapr FastAPI app.
- Add the required Semantic Kernel and Dapr packages to your project:

**_General Imports and Dapr Packages_**

**_FastAPI App_**

```python
import logging
from contextlib import asynccontextmanager

import uvicorn
from dapr.ext.fastapi import DaprActor
from fastapi import FastAPI
from fastapi.responses import JSONResponse
```

**_Flask API App_**

```python
import asyncio
import logging

from flask import Flask, jsonify
from flask_dapr.actor import DaprActor
```

**_Semantic Kernel Process Imports_**

```python
from samples.demos.process_with_dapr.process.process import get_process
from samples.demos.process_with_dapr.process.steps import CommonEvents
from semantic_kernel import Kernel
from semantic_kernel.processes.dapr_runtime import (
register_fastapi_dapr_actors,
start,
)
```

**_Define the FastAPI app, Dapr App, and the DaprActor_**

```python
# Define the kernel that is used throughout the process
kernel = Kernel()


# Define a lifespan method that registers the actors with the Dapr runtime
@asynccontextmanager
async def lifespan(app: FastAPI):
print("## actor startup ##")
await register_fastapi_dapr_actors(actor, kernel)
yield


# Define the FastAPI app along with the DaprActor
app = FastAPI(title="SKProcess", lifespan=lifespan)
actor = DaprActor(app)
```

If using Flask, you will define:

```python
kernel = Kernel()

app = Flask("SKProcess")

# Enable DaprActor Flask extension
actor = DaprActor(app)

# Synchronously register actors
print("## actor startup ##")
register_flask_dapr_actors(actor, kernel)

# Create the global event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
```
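
The global loop exists because Flask handlers are synchronous while the process APIs are coroutines. A minimal sketch of the pattern using only `asyncio` (the route and coroutine names here are hypothetical stand-ins, not the demo's actual code):

```python
import asyncio

# Create one global event loop, as the Flask app above does.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)


async def start_process(process_id: str) -> dict:
    """Stand-in for the async `start(...)` call in the real app."""
    await asyncio.sleep(0)
    return {"processId": process_id}


def handle_get(process_id: str) -> dict:
    # A synchronous Flask view drives the coroutine on the shared loop.
    return loop.run_until_complete(start_process(process_id))


print(handle_get("1234"))  # {'processId': '1234'}
```

Reusing a single shared loop avoids creating and tearing down a loop per request inside sync handlers.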

- Build and run a process as you normally would. For this demo, we run a simple example process from within either a FastAPI or a Flask API method in response to a GET request.
  - [See the FastAPI app here](./fastapi_app.py).
  - [See the Flask API app here](./flask_app.py).
