
Commit a26d7d3

expose HadoopConfiguration in SparkContext
1 parent 4944319 commit a26d7d3

14 files changed (+1192 −945 lines)

csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj

Lines changed: 3 additions & 0 deletions
@@ -62,6 +62,7 @@
     <Compile Include="Configuration\IConfigurationService.cs" />
     <Compile Include="Core\Accumulator.cs" />
     <Compile Include="Core\Broadcast.cs" />
+    <Compile Include="Core\HadoopConfiguration.cs" />
     <Compile Include="Core\Option.cs" />
     <Compile Include="Core\Partitioner.cs" />
     <Compile Include="Core\RDDCollector.cs" />

@@ -105,11 +106,13 @@
     <Compile Include="Proxy\IDataFrameReaderProxy.cs" />
     <Compile Include="Proxy\IDataFrameWriterProxy.cs" />
     <Compile Include="Proxy\IDStreamProxy.cs" />
+    <Compile Include="Proxy\IHadoopConfigurationProxy.cs" />
     <Compile Include="Proxy\Ipc\DataFrameIpcProxy.cs" />
     <Compile Include="Proxy\Ipc\DataFrameNaFunctionsIpcProxy.cs" />
     <Compile Include="Proxy\Ipc\DataFrameReaderIpcProxy.cs" />
     <Compile Include="Proxy\Ipc\DataFrameWriterIpcProxy.cs" />
     <Compile Include="Proxy\Ipc\DStreamIpcProxy.cs" />
+    <Compile Include="Proxy\Ipc\HadoopConfigurationIpcProxy.cs" />
     <Compile Include="Proxy\Ipc\RDDIpcProxy.cs" />
     <Compile Include="Proxy\Ipc\SparkCLRIpcProxy.cs" />
     <Compile Include="Proxy\Ipc\SqlContextIpcProxy.cs" />

csharp/Adapter/Microsoft.Spark.CSharp/Core/HadoopConfiguration.cs

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+using Microsoft.Spark.CSharp.Proxy;
+
+namespace Microsoft.Spark.CSharp.Core
+{
+    /// <summary>
+    /// Configuration for Hadoop operations
+    /// </summary>
+    public class HadoopConfiguration
+    {
+        private readonly IHadoopConfigurationProxy hadoopConfigurationProxy;
+        internal HadoopConfiguration(IHadoopConfigurationProxy hadoopConfProxy)
+        {
+            hadoopConfigurationProxy = hadoopConfProxy;
+        }
+
+        /// <summary>
+        /// Sets a property value in the Hadoop configuration
+        /// </summary>
+        /// <param name="name">Name of the property</param>
+        /// <param name="value">Value of the property</param>
+        public void Set(string name, string value)
+        {
+            hadoopConfigurationProxy.Set(name, value);
+        }
+
+        /// <summary>
+        /// Gets the value of a property from the Hadoop configuration
+        /// </summary>
+        /// <param name="name">Name of the property</param>
+        /// <param name="defaultValue">Default value if the property is not available in the configuration</param>
+        /// <returns>The configured value, or the given default when the property is not set</returns>
+        public string Get(string name, string defaultValue)
+        {
+            return hadoopConfigurationProxy.Get(name, defaultValue);
+        }
+    }
+}

csharp/Adapter/Microsoft.Spark.CSharp/Core/SparkContext.cs

Lines changed: 5 additions & 0 deletions
@@ -88,6 +88,11 @@ public int DefaultMinPartitions
         /// </summary>
         public StatusTracker StatusTracker { get { return new StatusTracker(SparkContextProxy.StatusTracker); } }

+        /// <summary>
+        /// Configuration for Hadoop usage in Spark
+        /// </summary>
+        public HadoopConfiguration HadoopConfiguration { get { return new HadoopConfiguration(SparkContextProxy.HadoopConfiguration); }}
+
         /// <summary>
         /// Initializes a SparkContext instance with a specific master, application name, and spark home
         /// </summary>
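
Taken together, the new HadoopConfiguration class and this SparkContext property let driver-side C# code read and write Hadoop properties before running Hadoop-backed I/O. A minimal usage sketch follows; the SparkConf setup and the specific property names are illustrative assumptions, not part of this commit.

    // Illustrative sketch only; assumes a Mobius SparkContext created in the usual way.
    var sparkContext = new SparkContext(new SparkConf().SetAppName("HadoopConfigurationExample"));

    // Write a Hadoop property on the driver before running Hadoop-backed I/O.
    sparkContext.HadoopConfiguration.Set("fs.s3a.access.key", "<access key>");

    // Read a property back, supplying a fallback for when it is not set.
    string defaultFs = sparkContext.HadoopConfiguration.Get("fs.defaultFS", "file:///");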

csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IHadoopConfigurationProxy.cs

Lines changed: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+namespace Microsoft.Spark.CSharp.Proxy
+{
+    interface IHadoopConfigurationProxy
+    {
+        void Set(string name, string value);
+        string Get(string name, string defaultValue);
+    }
+}

csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkContextProxy.cs

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ internal interface ISparkContextProxy
        long StartTime { get; }
        int DefaultParallelism { get; }
        int DefaultMinPartitions { get; }
+       IHadoopConfigurationProxy HadoopConfiguration { get; }
        void Stop();
        IRDDProxy EmptyRDD();
        IRDDProxy Parallelize(IEnumerable<byte[]> values, int numSlices);
csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/HadoopConfigurationIpcProxy.cs

Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+using System.Diagnostics.CodeAnalysis;
+using Microsoft.Spark.CSharp.Interop.Ipc;
+
+namespace Microsoft.Spark.CSharp.Proxy.Ipc
+{
+    [ExcludeFromCodeCoverage] // IPC calls to JVM validated using validation-enabled samples - unit test coverage not required
+    internal class HadoopConfigurationIpcProxy : IHadoopConfigurationProxy
+    {
+        private readonly JvmObjectReference jvmHadoopConfigurationReference;
+        public HadoopConfigurationIpcProxy(JvmObjectReference jHadoopConf)
+        {
+            jvmHadoopConfigurationReference = jHadoopConf;
+        }
+
+        public void Set(string name, string value)
+        {
+            SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmHadoopConfigurationReference, "set", new object[] { name, value });
+        }
+
+        public string Get(string name, string defaultValue)
+        {
+            return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmHadoopConfigurationReference, "get", new object[] { name, defaultValue }).ToString();
+        }
+    }
+}
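
Because the IPC proxy is excluded from coverage and the public HadoopConfiguration class depends only on the IHadoopConfigurationProxy interface, callers can be unit tested without a JVM by substituting a trivial in-memory proxy. The sketch below is hypothetical and not part of the commit; it assumes the test code can see the internal interface (for example via InternalsVisibleTo).

    using System.Collections.Generic;
    using Microsoft.Spark.CSharp.Proxy;

    // Hypothetical dictionary-backed stand-in for IHadoopConfigurationProxy,
    // intended only for unit tests of code that consumes HadoopConfiguration.
    internal class InMemoryHadoopConfigurationProxy : IHadoopConfigurationProxy
    {
        private readonly Dictionary<string, string> properties = new Dictionary<string, string>();

        public void Set(string name, string value)
        {
            properties[name] = value;
        }

        public string Get(string name, string defaultValue)
        {
            string value;
            return properties.TryGetValue(name, out value) ? value : defaultValue;
        }
    }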

csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs

Lines changed: 16 additions & 0 deletions
@@ -111,6 +111,22 @@ public int DefaultMinPartitions
         {
             get { if (defaultMinPartitions == null) { defaultMinPartitions = (int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "defaultMinPartitions"); } return (int)defaultMinPartitions; }
         }
+
+        private IHadoopConfigurationProxy hadoopConfiguration;
+        public IHadoopConfigurationProxy HadoopConfiguration
+        {
+            get
+            {
+                return hadoopConfiguration ??
+                       (hadoopConfiguration =
+                           new HadoopConfigurationIpcProxy(
+                               new JvmObjectReference(
+                                   (string)
+                                       SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference,
+                                           "hadoopConfiguration"))));
+            }
+        }
+
         public void Accumulator(int port)
         {
             jvmAccumulatorReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "accumulator",
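
One behavioural note: SparkContext.HadoopConfiguration constructs a fresh HadoopConfiguration wrapper on every access, but the IHadoopConfigurationProxy behind it is created once and cached here, so repeated accesses address the same JVM-side Hadoop Configuration object. An illustrative snippet (the property name is an example only):

    // Two separate property accesses; both reach the same cached proxy and JVM object.
    sparkContext.HadoopConfiguration.Set("mapreduce.output.fileoutputformat.compress", "true");
    string compress = sparkContext.HadoopConfiguration.Get("mapreduce.output.fileoutputformat.compress", "false");
    // compress is "true" because the Set above wrote to the same underlying configuration.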
