@wodeshijie33941

As described in issue #292, using both 'output_Schema' and 'create_variable_message' triggers validation of the data format.
The problem is that the stream's read() and close() are executed by different goroutines. When the plugin daemon receives an error message from the plugin ("SESSION_MESSAGE_TYPE_ERROR"), it calls the stream's close(). close() internally invokes the stream's before-close callback, while read() invokes the filter callback. Both callbacks access the same shared resource (a map variable). Because this map is accessed from multiple goroutines without any protection, the Go runtime raises a 'concurrent map iteration and map write' error, which crashes the daemon.
Simply put, an error message sent by dify-plugin triggers a crash bug in dify-plugin-daemon.
Solution:
Use locks to protect the shared map.
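
As a rough illustration of the lock-based approach described above (not the actual patch), the sketch below guards a map with a sync.RWMutex. The type `guardedVars` and its methods are hypothetical names introduced only for this example; `Set` stands in for what the read-side filter callback does, and `Snapshot` for what the before-close callback does.

```go
package main

import (
	"fmt"
	"sync"
)

// guardedVars is a hypothetical stand-in for the validator's variable map.
type guardedVars struct {
	mu   sync.RWMutex
	vars map[string]any
}

func newGuardedVars() *guardedVars {
	return &guardedVars{vars: make(map[string]any)}
}

// Set records a variable under the write lock (the "read/filter" side).
func (g *guardedVars) Set(name string, value any) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.vars[name] = value
}

// Snapshot copies the map under the read lock (the "before-close" side),
// so the caller can iterate the copy without holding the lock.
func (g *guardedVars) Snapshot() map[string]any {
	g.mu.RLock()
	defer g.mu.RUnlock()
	out := make(map[string]any, len(g.vars))
	for k, v := range g.vars {
		out[k] = v
	}
	return out
}

func main() {
	g := newGuardedVars()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(2)
		go func(i int) { // writer goroutine
			defer wg.Done()
			g.Set(fmt.Sprintf("var_%d", i), i)
		}(i)
		go func() { // concurrent reader goroutine
			defer wg.Done()
			_ = g.Snapshot()
		}()
	}
	wg.Wait()
	fmt.Println(len(g.Snapshot()))
}
```

Without the mutex, the same program can abort with a fatal concurrent map access error, which cannot be recovered with recover().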

@Yeuoly
Contributor

Yeuoly commented Jun 16, 2025

I've looked into the code. The ideal way to fix this issue is to avoid the race condition on bindToolValidator.variables.

  1. The response is consumed in the goroutine baseSSEService.routine.Submit, which also adds new variables to bindToolValidator.variables.
  2. The response is closed by the goroutine baseSSEService, where a validation is also performed, so bindToolValidator.variables is accessed concurrently.

Unfortunately they run in two different goroutines, but this is not an issue in the stream module, so I'd like to change the implementation of this patch.

I can push directly to your branch if you don't mind. Feel free to tell me your thoughts.

@wodeshijie33941
Author

wodeshijie33941 commented Jun 17, 2025

First, I strongly support a solution that addresses the problem fundamentally rather than a temporary one.

In reality, three goroutines are involved in issue #292, not two:

  1. local_runtime.run.routine.Submit: this goroutine listens to the plugin's stdout.
  2. http_server.srv.ListenAndServe: this goroutine handles HTTP requests.
  3. base_sse.baseSSEService.routine.Submit: this goroutine adds new variables to bindToolValidator.variables.

It should be noted that:
1. Both the local_runtime.run.routine.Submit and http_server.srv.ListenAndServe goroutines call close().
2. According to the stack trace in issue #292, dify_plugin_daemon receives the "SESSION_MESSAGE_TYPE_ERROR" message and then calls close(). Therefore, close() must have been called in the local_runtime.run.routine.Submit goroutine, not in the http_server.srv.ListenAndServe goroutine.

If the problem is solved by merging the local_runtime.run.routine.Submit, http_server.srv.ListenAndServe, and base_sse.baseSSEService.routine.Submit goroutines into one, I personally think the cost is relatively high, and it would also have a significant impact on the performance of dify_plugin_daemon.

The above is just my personal opinion. If anything is unclear, let's keep discussing.

@Yeuoly
Contributor

Yeuoly commented Jun 17, 2025

Actually, local_runtime.run.routine.Submit is designed to receive the stdout stream in the background once the plugin has started. Each plugin needs at least one goroutine to handle this, the number of these goroutines never grows with concurrent requests, and each one only takes care of the IO stream of a subprocess.

So each request creates the two goroutines I mentioned above. One is http_server.srv.ListenAndServe; the other is base_sse.baseSSEService.routine.Submit, which calls response.Next to continuously consume messages and write them to gin.Writer. You will also find that http_server.srv.ListenAndServe simply listens for HTTP events and calls stream.Close at the end, as in the flow below:

flowchart TD

    %% First flow: http_server
    A1[http_server.srv.ListenAndServe]
    A2[createStream]
    A3[setupHooks]
    A4[listenToHttpEvents]

    A1 --> A2 --> A3 --> A4

    %% Second flow: base_sse
    B1[base_sse.baseSSEService.routine.Submit]
    B2[listenToStream]
    B3[writeToSSE]

    B1 --> B2 --> B3

    %% Third flow: local_runtime with multiple requests
    C1[local_runtime.run.routine.Submit]
    C2a[request_1]
    C2b[request_2]
    C2c[request_3]

    C1 --> C2a
    C1 --> C2b
    C1 --> C2c

There is almost no performance issue in this flow: these goroutines simply listen and write. I've also produced a flame graph; almost all CPU time is spent on serialization, and the bottleneck is the GIL inside the plugin itself, not the daemon. This is how goroutines are meant to be used: as with nginx, we can have thousands of coroutines without consuming many resources.

Moreover, we have three types of plugins: Local, Serverless, and Debugging. They have been unified behind the same IO interfaces to keep their behaviour consistent, and the flow above is the result of those unified interfaces. The only routine that could possibly be merged is base_sse.baseSSEService.routine.Submit, but you cannot call response.Next() and select at the same time.

Yes, I agree that we need a fundamental solution; that's why I introduced stream.Stream[T]. As you know, handling multiple channels is hard:

  1. you cannot close a channel more than once, or it will panic
  2. you cannot write to a closed channel, or it will panic
  3. you need another channel to report errors
  4. you never know when a channel will be closed unless you are a repo maintainer who completely understands the code.
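
The first two points in the list above are standard Go channel behaviour and can be reproduced in a few lines. The helper `panics` is a hypothetical name used only for this demonstration; it runs a function and reports whether it panicked.

```go
package main

import "fmt"

// panics runs f and reports whether it panicked.
func panics(f func()) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	f()
	return false
}

func main() {
	doubleClose := panics(func() {
		ch := make(chan int)
		close(ch)
		close(ch) // panic: close of closed channel
	})
	sendAfterClose := panics(func() {
		ch := make(chan int, 1)
		close(ch)
		ch <- 1 // panic: send on closed channel
	})
	fmt.Println(doubleClose, sendAfterClose) // prints "true true"
}
```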

A stream.Stream, by contrast, can be closed from anywhere because Close is idempotent, and you can also write to it after it has been closed. This helps newcomers to Go understand the code and lowers our review cost, with no worries about channel panics or deadlocks.
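
To make those two properties concrete, here is a minimal sketch of a stream with an idempotent Close and a Write that is safe after closing. It is not the real stream.Stream[T] implementation; the type `safeStream`, its fields, and the boolean return values are assumptions made for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// safeStream is a hypothetical, simplified queue-backed stream.
type safeStream[T any] struct {
	mu     sync.Mutex
	cond   *sync.Cond
	queue  []T
	closed bool
}

func newSafeStream[T any]() *safeStream[T] {
	s := &safeStream[T]{}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Write queues an item; on a closed stream it does nothing and returns false.
func (s *safeStream[T]) Write(v T) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return false
	}
	s.queue = append(s.queue, v)
	s.cond.Signal()
	return true
}

// Close is idempotent: the second and later calls are no-ops.
func (s *safeStream[T]) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return
	}
	s.closed = true
	s.cond.Broadcast()
}

// Next blocks until an item is available, or returns false once the
// stream is closed and fully drained.
func (s *safeStream[T]) Next() (T, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for len(s.queue) == 0 && !s.closed {
		s.cond.Wait()
	}
	var zero T
	if len(s.queue) == 0 {
		return zero, false
	}
	v := s.queue[0]
	s.queue = s.queue[1:]
	return v, true
}

func main() {
	s := newSafeStream[string]()
	s.Write("hello")
	s.Close()
	s.Close()                    // safe: no panic on a second close
	fmt.Println(s.Write("late")) // false: the write is dropped, no panic
	v, ok := s.Next()
	fmt.Println(v, ok) // hello true
	_, ok = s.Next()
	fmt.Println(ok) // false
}
```

Note that such a stream removes the channel panics but does not by itself protect state touched inside user-supplied callbacks, which is where the map race in this issue lives.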

Finally, back to the issue itself: this variable is accessed from different goroutines. In my view, avoiding that is not a temporary solution. The problem exists only here; see *.gen.go, where almost all the code is generated from templates and carries no mental burden for us. The exceptions are tools and agents, because of jsonSchemaValidator. The only way to reach what you call a fundamental solution is to use a single goroutine; you are welcome to give it a try.
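
One possible shape of the "avoid the race" idea, sketched under assumptions and not taken from the daemon's code: the consuming goroutine owns the map exclusively and runs the final validation itself once the input ends, so the closing side only signals the end of the stream. The `message` type, the `consume` function, and the `required` list are hypothetical.

```go
package main

import "fmt"

// message is a hypothetical stream item carrying a variable name and value.
type message struct {
	name  string
	value any
}

// consume owns vars exclusively. It records variables as they arrive and
// performs the final check after the input is closed, so no other goroutine
// ever reads or writes the map and no lock is needed.
func consume(in <-chan message, required []string) error {
	vars := make(map[string]any)
	for m := range in {
		vars[m.name] = m.value
	}
	for _, name := range required {
		if _, ok := vars[name]; !ok {
			return fmt.Errorf("missing variable %q", name)
		}
	}
	return nil
}

func main() {
	in := make(chan message)
	done := make(chan error, 1)
	go func() { done <- consume(in, []string{"answer", "score"}) }()

	in <- message{name: "answer", value: "42"}
	close(in) // the closing side only signals end of stream

	fmt.Println(<-done) // missing variable "score"
}
```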

@wodeshijie33941
Author

Okay, I understand what you're saying. Avoiding contention between goroutines is indeed a good solution. What I'm curious about is how the code should be modified.
