Skip to content

Commit d0652d1

Browse files
authored
KTOR-3156 KTOR-438 update pipeline merging algorithm (#2618)
1 parent 91442b1 commit d0652d1

File tree

2 files changed

+113
-32
lines changed

2 files changed

+113
-32
lines changed

ktor-utils/common/src/io/ktor/util/pipeline/Pipeline.kt

Lines changed: 71 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package io.ktor.util.pipeline
66

77
import io.ktor.util.*
88
import io.ktor.util.collections.*
9-
import io.ktor.utils.io.*
109
import io.ktor.utils.io.concurrent.*
1110
import kotlinx.atomicfu.*
1211
import kotlin.coroutines.*
@@ -90,7 +89,15 @@ public open class Pipeline<TSubject : Any, TContext : Any>(
9089
}
9190

9291
/**
93-
* Inserts [phase] after the [reference] phase
92+
* Inserts [phase] after the [reference] phase. If there are other phases inserted after [reference], then [phase]
93+
* will be inserted after them.
94+
* Example:
95+
* ```
96+
* val pipeline = Pipeline<String, String>(a)
97+
* pipeline.insertPhaseAfter(a, b)
98+
* pipeline.insertPhaseAfter(a, c)
99+
* assertEquals(listOf(a, b, c), pipeline.items)
100+
* ```
94101
*/
95102
public fun insertPhaseAfter(reference: PipelinePhase, phase: PipelinePhase) {
96103
if (hasPhase(phase)) return
@@ -99,12 +106,29 @@ public open class Pipeline<TSubject : Any, TContext : Any>(
99106
if (index == -1) {
100107
throw InvalidPhaseException("Phase $reference was not registered for this pipeline")
101108
}
109+
// insert after the last phase that has Relation.After on [reference]
110+
var lastRelatedPhaseIndex = index
111+
for (i in index + 1..phasesRaw.lastIndex) {
112+
val relation = (phasesRaw[i] as? PhaseContent<*, *>)?.relation ?: break
113+
val relatedTo = (relation as? PipelinePhaseRelation.After)?.relativeTo ?: continue
114+
lastRelatedPhaseIndex = if (relatedTo == reference) i else lastRelatedPhaseIndex
115+
}
102116

103-
phasesRaw.add(index + 1, PhaseContent<TSubject, TContext>(phase, PipelinePhaseRelation.After(reference)))
117+
phasesRaw.add(
118+
lastRelatedPhaseIndex + 1,
119+
PhaseContent<TSubject, TContext>(phase, PipelinePhaseRelation.After(reference))
120+
)
104121
}
105122

106123
/**
107-
* Inserts [phase] before the [reference] phase
124+
* Inserts [phase] before the [reference] phase.
125+
* Example:
126+
* ```
127+
* val pipeline = Pipeline<String, String>(c)
128+
* pipeline.insertPhaseBefore(c, a)
129+
* pipeline.insertPhaseBefore(c, b)
130+
* assertEquals(listOf(a, b, c), pipeline.items)
131+
* ```
108132
*/
109133
public fun insertPhaseBefore(reference: PipelinePhase, phase: PipelinePhase) {
110134
if (hasPhase(phase)) return
@@ -157,37 +181,34 @@ public open class Pipeline<TSubject : Any, TContext : Any>(
157181
}
158182

159183
val fromPhases = from.phasesRaw
160-
for (index in 0..fromPhases.lastIndex) {
161-
val fromPhaseOrContent = fromPhases[index]
162-
163-
val fromPhase =
164-
(fromPhaseOrContent as? PipelinePhase) ?: (fromPhaseOrContent as PhaseContent<*, *>).phase
165-
166-
if (!hasPhase(fromPhase)) {
167-
val fromPhaseRelation = when {
168-
fromPhaseOrContent === fromPhase -> PipelinePhaseRelation.Last
169-
else -> (fromPhaseOrContent as PhaseContent<*, *>).relation
184+
val toInsert = fromPhases.toMutableList()
185+
// the worst case is O(n^2), but it will happen only
186+
// when all phases were inserted before each other into the second pipeline
187+
// (see test testDependantPhasesLastCommon).
188+
// in practice, it will be linear time for most cases
189+
while (toInsert.isNotEmpty()) {
190+
val iterator = toInsert.iterator()
191+
while (iterator.hasNext()) {
192+
val fromPhaseOrContent = iterator.next()
193+
194+
val fromPhase = (fromPhaseOrContent as? PipelinePhase)
195+
?: (fromPhaseOrContent as PhaseContent<*, *>).phase
196+
197+
if (hasPhase(fromPhase)) {
198+
iterator.remove()
199+
} else {
200+
val inserted = insertRelativePhase(fromPhaseOrContent, fromPhase)
201+
if (!inserted) continue
202+
iterator.remove()
170203
}
171204

172-
when (fromPhaseRelation) {
173-
is PipelinePhaseRelation.Last -> addPhase(fromPhase)
174-
is PipelinePhaseRelation.Before -> insertPhaseBefore(
175-
fromPhaseRelation.relativeTo,
176-
fromPhase
177-
)
178-
is PipelinePhaseRelation.After -> insertPhaseAfter(
179-
fromPhaseRelation.relativeTo,
180-
fromPhase
181-
)
182-
}
183-
}
205+
if (fromPhaseOrContent is PhaseContent<*, *> && !fromPhaseOrContent.isEmpty) {
206+
@Suppress("UNCHECKED_CAST")
207+
fromPhaseOrContent as PhaseContent<TSubject, TContext>
184208

185-
if (fromPhaseOrContent is PhaseContent<*, *> && !fromPhaseOrContent.isEmpty) {
186-
@Suppress("UNCHECKED_CAST")
187-
fromPhaseOrContent as PhaseContent<TSubject, TContext>
188-
189-
fromPhaseOrContent.addTo(findPhase(fromPhase)!!)
190-
interceptorsQuantity += fromPhaseOrContent.size
209+
fromPhaseOrContent.addTo(findPhase(fromPhase)!!)
210+
interceptorsQuantity += fromPhaseOrContent.size
211+
}
191212
}
192213
}
193214
}
@@ -383,6 +404,24 @@ public open class Pipeline<TSubject : Any, TContext : Any>(
383404

384405
return false
385406
}
407+
408+
private fun insertRelativePhase(fromPhaseOrContent: Any, fromPhase: PipelinePhase): Boolean {
409+
val fromPhaseRelation = when {
410+
fromPhaseOrContent === fromPhase -> PipelinePhaseRelation.Last
411+
else -> (fromPhaseOrContent as PhaseContent<*, *>).relation
412+
}
413+
414+
when {
415+
fromPhaseRelation is PipelinePhaseRelation.Last ->
416+
addPhase(fromPhase)
417+
fromPhaseRelation is PipelinePhaseRelation.Before && hasPhase(fromPhaseRelation.relativeTo) ->
418+
insertPhaseBefore(fromPhaseRelation.relativeTo, fromPhase)
419+
fromPhaseRelation is PipelinePhaseRelation.After ->
420+
insertPhaseAfter(fromPhaseRelation.relativeTo, fromPhase)
421+
else -> return false
422+
}
423+
return true
424+
}
386425
}
387426

388427
/**

ktor-utils/common/test/io/ktor/util/PipelinePhasesTest.kt

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,46 @@ class PipelinePhasesTest {
4646
phases1.merge(phases2)
4747
assertEquals(listOf(b, c, a), phases1.items)
4848
}
49+
50+
@Test
51+
fun testDependantPhasesNoCommon() {
52+
val pipeline1 = Pipeline<String, String>(a)
53+
val pipeline2 = Pipeline<String, String>(c)
54+
pipeline2.insertPhaseBefore(c, b)
55+
pipeline1.merge(pipeline2)
56+
assertEquals(listOf(a, b, c), pipeline1.items)
57+
}
58+
59+
@Test
60+
fun testDependantPhasesLastCommon() {
61+
val pipeline1 = Pipeline<String, String>(c)
62+
val pipeline2 = Pipeline<String, String>(c)
63+
pipeline2.insertPhaseBefore(c, b)
64+
pipeline2.insertPhaseBefore(b, a)
65+
assertEquals(listOf(a, b, c), pipeline2.items)
66+
pipeline1.merge(pipeline2)
67+
assertEquals(listOf(a, b, c), pipeline1.items)
68+
}
69+
70+
@Test
71+
fun testDependantPhasesOrderAfter() {
72+
val pipeline1 = Pipeline<String, String>(a)
73+
val pipeline2 = Pipeline<String, String>(a)
74+
pipeline2.insertPhaseAfter(a, b)
75+
pipeline2.insertPhaseAfter(a, c)
76+
assertEquals(listOf(a, b, c), pipeline2.items)
77+
pipeline1.merge(pipeline2)
78+
assertEquals(listOf(a, b, c), pipeline1.items)
79+
}
80+
81+
@Test
82+
fun testDependantPhasesOrderBefore() {
83+
val pipeline1 = Pipeline<String, String>(c)
84+
val pipeline2 = Pipeline<String, String>(c)
85+
pipeline2.insertPhaseBefore(c, a)
86+
pipeline2.insertPhaseBefore(c, b)
87+
assertEquals(listOf(a, b, c), pipeline2.items)
88+
pipeline1.merge(pipeline2)
89+
assertEquals(listOf(a, b, c), pipeline1.items)
90+
}
4991
}

0 commit comments

Comments
 (0)