@@ -240,15 +240,15 @@ defmodule K8s.Client.Mint.HTTPAdapter do
240240
241241 @ impl true
242242 def handle_call ( { :request , method , path , headers , body } , from , state ) do
243- make_request ( state , method , path , headers , body , type: :sync , caller: from )
243+ make_request ( state , method , path , headers , body , from , type: :sync , caller: from )
244244 end
245245
246- def handle_call ( { :stream , method , path , headers , body } , _from , state ) do
247- make_request ( state , method , path , headers , body , type: :stream )
246+ def handle_call ( { :stream , method , path , headers , body } , from , state ) do
247+ make_request ( state , method , path , headers , body , from , type: :stream )
248248 end
249249
250- def handle_call ( { :stream_to , method , path , headers , body , stream_to } , _from , state ) do
251- make_request ( state , method , path , headers , body , type: :stream_to , stream_to: stream_to )
250+ def handle_call ( { :stream_to , method , path , headers , body , stream_to } , from , state ) do
251+ make_request ( state , method , path , headers , body , from , type: :stream_to , stream_to: stream_to )
252252 end
253253
254254 def handle_call ( { :websocket_request , path , headers } , from , state ) do
@@ -326,13 +326,39 @@ defmodule K8s.Client.Mint.HTTPAdapter do
326326 end
327327 end
328328
329+ def handle_info ( { :DOWN , ref , :process , _pid , _reason } , state ) do
330+ state =
331+ state . requests
332+ |> Map . filter ( fn { _request_ref , request } -> request . caller_ref == ref end )
333+ |> Map . keys ( )
334+ |> Enum . reduce_while ( state , fn
335+ request_ref , state ->
336+ case pop_in ( state . requests [ request_ref ] ) do
337+ { % HTTPRequest { } , state } ->
338+ conn = Mint.HTTP2 . cancel_request ( state . conn , request_ref ) |> elem ( 1 )
339+ { :cont , struct! ( state , conn: conn ) }
340+
341+ { _ , state } ->
342+ { :halt , { :stop , state } }
343+ end
344+ end )
345+
346+ case state do
347+ { :stop , state } ->
348+ { :stop , :normal , state }
349+
350+ state ->
351+ { :noreply , state }
352+ end
353+ end
354+
329355 @ impl true
330356 def terminate ( _reason , state ) do
331357 state = flush_buffer ( state )
332358
333359 state
334360 |> Map . get ( :requests )
335- |> Enum . filter ( fn { _ref , request } -> is_map_key ( request , :websocket ) end )
361+ |> Enum . filter ( fn { _ref , request } -> is_struct ( request , WebSocketRequest ) end )
336362 |> Enum . each ( fn { request_ref , request } ->
337363 { :ok , _websocket , data } = Mint.WebSocket . encode ( request . websocket , :close )
338364 Mint.WebSocket . stream_request_body ( state . conn , request_ref , data )
@@ -342,15 +368,25 @@ defmodule K8s.Client.Mint.HTTPAdapter do
342368 :ok
343369 end
344370
345- @ spec make_request ( t ( ) , binary ( ) , binary ( ) , Mint.Types . headers ( ) , binary ( ) , keyword ( ) ) ::
371+ @ spec make_request (
372+ t ( ) ,
373+ binary ( ) ,
374+ binary ( ) ,
375+ Mint.Types . headers ( ) ,
376+ binary ( ) ,
377+ GenServer . from ( ) ,
378+ keyword ( )
379+ ) ::
346380 { :noreply , t ( ) } | { :reply , :ok | { :ok , reference ( ) } | { :error , HTTPError . t ( ) } , t ( ) }
347- defp make_request ( state , method , path , headers , body , extra ) do
381+ defp make_request ( state , method , path , headers , body , caller , extra ) do
382+ caller_ref = caller |> elem ( 0 ) |> Process . monitor ( )
383+
348384 case Mint.HTTP . request ( state . conn , method , path , headers , body ) do
349385 { :ok , conn , request_ref } ->
350386 state =
351387 put_in (
352388 state . requests [ request_ref ] ,
353- HTTPRequest . new ( extra )
389+ extra |> Keyword . put ( :caller_ref , caller_ref ) |> HTTPRequest . new ( )
354390 )
355391
356392 case extra [ :type ] do
@@ -366,15 +402,27 @@ defmodule K8s.Client.Mint.HTTPAdapter do
366402 end
367403 end
368404
369- @ spec upgrade_to_websocket ( t ( ) , binary ( ) , Mint.Types . headers ( ) , pid ( ) , WebSocketRequest . t ( ) ) ::
405+ @ spec upgrade_to_websocket (
406+ t ( ) ,
407+ binary ( ) ,
408+ Mint.Types . headers ( ) ,
409+ GenServer . from ( ) ,
410+ WebSocketRequest . t ( )
411+ ) ::
370412 { :noreply , t ( ) } | { :reply , { :error , HTTPError . t ( ) , t ( ) } }
371413 defp upgrade_to_websocket ( state , path , headers , caller , websocket_request ) do
414+ caller_ref = caller |> elem ( 0 ) |> Process . monitor ( )
415+
372416 case Mint.WebSocket . upgrade ( :wss , state . conn , path , headers ) do
373417 { :ok , conn , request_ref } ->
374418 state =
375419 put_in (
376420 state . requests [ request_ref ] ,
377- UpgradeRequest . new ( caller: caller , websocket_request: websocket_request )
421+ UpgradeRequest . new (
422+ caller: caller ,
423+ caller_ref: caller_ref ,
424+ websocket_request: struct! ( websocket_request , caller_ref: caller_ref )
425+ )
378426 )
379427
380428 { :noreply , struct! ( state , conn: conn ) }
0 commit comments