Skip to content

Commit bfff466

Browse files
committed
latest changes from master branch
2 parents 59efcfc + a26d7d3 commit bfff466

33 files changed

+3866
-2102
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<img src='logo/mobius-star-200.png' width='125px' alt='Mobius logo' />
22
# Mobius: C# API for Spark
33

4-
[Mobius](https://github.com/Microsoft/Mobius) adds C# language binding to [Apache Spark](https://spark.apache.org/), enabling the implementation of Spark driver code and data processing operations in C#.
4+
[Mobius](https://github.com/Microsoft/Mobius) provides C# language binding to [Apache Spark](https://spark.apache.org/), enabling the implementation of Spark driver code and data processing operations in the languages supported in the .NET framework like C# or F#.
55

66
For example, the word count sample in Apache Spark can be implemented in C# as follows:
77

@@ -79,7 +79,7 @@ StreamingContext sparkStreamingContext = StreamingContext.GetOrCreate(checkpoint
7979
sparkStreamingContext.Start();
8080
sparkStreamingContext.AwaitTermination();
8181
```
82-
Refer to [Mobius\csharp\Samples](./csharp/Samples) directory and [sample usage](./csharp/Samples/Microsoft.Spark.CSharp/samplesusage.md) for complete samples.
82+
For more code samples, refer to [Mobius\examples](./examples) directory or [Mobius\csharp\Samples](./csharp/Samples) directory.
8383

8484
## API Documentation
8585

@@ -89,7 +89,7 @@ Refer to [Mobius C# API documentation](./csharp/Adapter/documentation/Mobius_API
8989

9090
Mobius API usage samples are available at:
9191

92-
* [Examples folder](./examples) which contains standalone [C# projects](./notes/running-mobius-app.md#running-mobius-examples-in-local-mode) that can be used as templates to start developing Mobius applications
92+
* [Examples folder](./examples) which contains standalone [C#/F# projects](./notes/running-mobius-app.md#running-mobius-examples-in-local-mode) that can be used as templates to start developing Mobius applications
9393

9494
* [Samples project](./csharp/Samples/Microsoft.Spark.CSharp/) which uses a comprehensive set of Mobius APIs to implement samples that are also used for functional validation of APIs
9595

build/Build.cmd

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -247,13 +247,14 @@ popd
247247

248248
if %CppSkipped% EQU 1 (
249249
@echo.
250-
@echo ============================================================================================
251-
@echo.
252-
@echo Note!!! Skipped to build Mobius C++ components due to missing VC++ Build Toolset.
253-
@echo If you want to compile C++ components, please enalble VC++ language from
254-
@echo Visual Studio. You can either download "Visual C++ Build Tools" availabe at
255-
@echo "http://landinghub.visualstudio.com/visual-cpp-build-tools"
250+
@echo ===============================================================================================================
251+
@echo. !!! Note !!!
252+
@echo Skipped building Mobius C++ component ^(RIOSock.dll^) due to missing VC++ Build Toolset.
253+
@echo Mobius uses this component to leverage socket optimization available in Windows
254+
@echo This is an optional component and Mobius will be fully functional even without this component
255+
@echo If you want to build this component, enable VC++ project in Visual Studio
256+
@echo or download "Visual C++ Build Tools" from "http://landinghub.visualstudio.com/visual-cpp-build-tools"
256257
@echo.
257-
@echo ============================================================================================
258+
@echo ===============================================================================================================
258259
@echo.
259-
)
260+
)

csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
<Compile Include="Configuration\IConfigurationService.cs" />
6363
<Compile Include="Core\Accumulator.cs" />
6464
<Compile Include="Core\Broadcast.cs" />
65+
<Compile Include="Core\HadoopConfiguration.cs" />
6566
<Compile Include="Core\Option.cs" />
6667
<Compile Include="Core\Partitioner.cs" />
6768
<Compile Include="Core\RDDCollector.cs" />
@@ -70,6 +71,7 @@
7071
<Compile Include="Core\OrderedRDDFunctions.cs" />
7172
<Compile Include="Core\PairRDDFunctions.cs" />
7273
<Compile Include="Core\PipelinedRDD.cs" />
74+
<Compile Include="Core\PriorityQueue.cs" />
7375
<Compile Include="Core\Profiler.cs" />
7476
<Compile Include="Core\RDD.cs" />
7577
<Compile Include="Core\SparkConf.cs" />
@@ -104,11 +106,13 @@
104106
<Compile Include="Proxy\IDataFrameReaderProxy.cs" />
105107
<Compile Include="Proxy\IDataFrameWriterProxy.cs" />
106108
<Compile Include="Proxy\IDStreamProxy.cs" />
109+
<Compile Include="Proxy\IHadoopConfigurationProxy.cs" />
107110
<Compile Include="Proxy\Ipc\DataFrameIpcProxy.cs" />
108111
<Compile Include="Proxy\Ipc\DataFrameNaFunctionsIpcProxy.cs" />
109112
<Compile Include="Proxy\Ipc\DataFrameReaderIpcProxy.cs" />
110113
<Compile Include="Proxy\Ipc\DataFrameWriterIpcProxy.cs" />
111114
<Compile Include="Proxy\Ipc\DStreamIpcProxy.cs" />
115+
<Compile Include="Proxy\Ipc\HadoopConfigurationIpcProxy.cs" />
112116
<Compile Include="Proxy\Ipc\RDDIpcProxy.cs" />
113117
<Compile Include="Proxy\Ipc\SparkCLRIpcProxy.cs" />
114118
<Compile Include="Proxy\Ipc\SqlContextIpcProxy.cs" />
@@ -154,12 +158,12 @@
154158
<Compile Include="Streaming\TransformedDStream.cs" />
155159
</ItemGroup>
156160
<ItemGroup Condition=" '$(CppDll)' == 'HasCpp' ">
157-
<ContentWithTargetPath Include="$(SolutionDir)..\cpp\x64\$(ConfigurationName)\Riosock.dll">
161+
<ContentWithTargetPath Include="..\..\..\cpp\x64\$(ConfigurationName)\Riosock.dll">
158162
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
159163
<Link>Cpp\Riosock.dll</Link>
160164
<TargetPath>Riosock.dll</TargetPath>
161165
</ContentWithTargetPath>
162-
<ContentWithTargetPath Include="$(SolutionDir)..\cpp\x64\$(ConfigurationName)\Riosock.pdb">
166+
<ContentWithTargetPath Include="..\..\..\cpp\x64\$(ConfigurationName)\Riosock.pdb">
163167
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
164168
<Link>Cpp\Riosock.dll</Link>
165169
<TargetPath>Riosock.pdb</TargetPath>

csharp/Adapter/Microsoft.Spark.CSharp/Configuration/ConfigurationService.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ internal class ConfigurationService : IConfigurationService
2121
public const string CSharpWorkerPathSettingKey = "CSharpWorkerPath";
2222
public const string CSharpBackendPortNumberSettingKey = "CSharpBackendPortNumber";
2323
public const string CSharpSocketTypeEnvName = "spark.mobius.CSharp.socketType";
24+
public const string CSharpWorkerReadBufferSizeEnvName = "spark.mobius.CSharpWorker.readBufferSize";
25+
public const string CSharpWorkerWriteBufferSizeEnvName = "spark.mobius.CSharpWorker.writeBufferSize";
2426
public const string SPARKCLR_HOME = "SPARKCLR_HOME";
2527
public const string SPARK_MASTER = "spark.master";
2628
public const string CSHARPBACKEND_PORT = "CSHARPBACKEND_PORT";
@@ -208,7 +210,7 @@ internal override string GetCSharpWorkerExePath()
208210
logger.LogInfo("Worker path read from setting {0} in app config", CSharpWorkerPathSettingKey);
209211
return workerPathConfig.Value;
210212
}
211-
213+
212214
var path = GetSparkCLRArtifactsPath("bin", ProcFileName);
213215
logger.LogInfo("Worker path {0} constructed using {1} environment variable", path, SPARKCLR_HOME);
214216

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
using Microsoft.Spark.CSharp.Proxy;
5+
6+
namespace Microsoft.Spark.CSharp.Core
7+
{
8+
/// <summary>
9+
/// Configuration for Hadoop operations
10+
/// </summary>
11+
public class HadoopConfiguration
12+
{
13+
private readonly IHadoopConfigurationProxy hadoopConfigurationProxy;
14+
internal HadoopConfiguration(IHadoopConfigurationProxy hadoopConfProxy)
15+
{
16+
hadoopConfigurationProxy = hadoopConfProxy;
17+
}
18+
19+
/// <summary>
20+
/// Sets a property value to HadoopConfiguration
21+
/// </summary>
22+
/// <param name="name">Name of the property</param>
23+
/// <param name="value">Value of the property</param>
24+
public void Set(string name, string value)
25+
{
26+
hadoopConfigurationProxy.Set(name, value);
27+
}
28+
29+
/// <summary>
30+
/// Gets the value of a property from HadoopConfiguration
31+
/// </summary>
32+
/// <param name="name">Name of the property</param>
33+
/// <param name="defaultValue">Default value if the property is not available in the configuration</param>
34+
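/// <returns>Value of the property, or <paramref name="defaultValue"/> if the property is not available in the configuration</returns>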
35+
public string Get(string name, string defaultValue)
36+
{
37+
return hadoopConfigurationProxy.Get(name, defaultValue);
38+
}
39+
}
40+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
using System;
5+
using System.Collections;
6+
using System.Collections.Generic;
7+
8+
namespace Microsoft.Spark.CSharp.Core
9+
{
10+
/// <summary>
11+
/// A bounded priority queue implemented with max binary heap.
12+
///
13+
/// Construction steps:
14+
/// 1. Build a Max Heap of the first k elements.
15+
/// 2. For each element after the kth element, compare it with the root of the max heap,
16+
/// a. If the element is less than the root, replace root with this element, heapify.
17+
/// b. Else ignore it.
18+
/// </summary>
19+
[Serializable]
20+
internal class PriorityQueue<T> : IEnumerable<T> where T : IComparable<T>
21+
{
22+
// The number of elements in the priority queue.
23+
private int elementCount;
24+
private T[] queue;
25+
private Comparer<T> comparer;
26+
27+
/// <summary>
28+
/// Constructor of PriorityQueue type.
29+
/// </summary>
30+
internal PriorityQueue(int queueSize, Comparer<T> comparer)
31+
{
32+
this.comparer = comparer;
33+
queue = new T[queueSize];
34+
}
35+
36+
/// <summary>
37+
/// Inserts the specified element into this priority queue.
38+
/// </summary>
39+
internal void Offer(T e)
40+
{
41+
if (ReferenceEquals(null, e))
42+
{
43+
throw new NullReferenceException();
44+
}
45+
46+
var i = elementCount;
47+
if (i >= queue.Length)
48+
{
49+
if (GT(queue[0], e)) // compare it with root of the heap
50+
{
51+
queue[0] = e;
52+
SiftDownHeapRoot();
53+
}
54+
55+
return;
56+
}
57+
58+
elementCount = i + 1;
59+
if (i == 0)
60+
{
61+
queue[0] = e;
62+
}
63+
else
64+
{
65+
SiftUp(i, e);
66+
}
67+
}
68+
69+
private void SiftDownHeapRoot()
70+
{
71+
var x = queue[0];
72+
var half = (int)((uint)elementCount >> 1);
73+
var k = 0;
74+
75+
while (k < half)
76+
{
77+
var child = (k << 1) + 1;
78+
var c = queue[child];
79+
var right = child + 1;
80+
if (right < elementCount && GT(queue[right], c))
81+
{
82+
c = queue[child = right];
83+
}
84+
85+
if (GE(x, c))
86+
{
87+
break;
88+
}
89+
90+
queue[k] = c;
91+
k = child;
92+
}
93+
94+
queue[k] = x;
95+
}
96+
97+
private void SiftUp(int k, T x)
98+
{
99+
while (k > 0)
100+
{
101+
var parent = (int)((uint)(k - 1) >> 1);
102+
var e = queue[parent];
103+
if (GE(e, x)) // if parent >= child, stop
104+
{
105+
break;
106+
}
107+
108+
queue[k] = e;
109+
k = parent;
110+
}
111+
112+
queue[k] = x;
113+
}
114+
115+
// greater than, helper method for comparison
116+
private bool GT(T a, T b)
117+
{
118+
return comparer.Compare(a, b) > 0;
119+
}
120+
121+
// greater than or equal, helper method for comparison
122+
private bool GE(T a, T b)
123+
{
124+
return comparer.Compare(a, b) >= 0;
125+
}
126+
127+
public IEnumerator<T> GetEnumerator()
128+
{
129+
for (var i = 0; i < elementCount; i++)
130+
{
131+
yield return queue[i];
132+
}
133+
}
134+
135+
IEnumerator IEnumerable.GetEnumerator()
136+
{
137+
return GetEnumerator();
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)