@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration
2727import org .apache .hadoop .fs .{FileSystem , FSDataOutputStream , Path }
2828import org .apache .hadoop .mapred .{JobConf , JobID }
2929
30- import org .apache .spark .{SparkConf , TaskContext }
30+ import org .apache .spark .{CarmelConcurrentModifiedException , SparkConf , TaskContext }
3131import org .apache .spark .deploy .SparkHadoopUtil
3232import org .apache .spark .executor .OutputMetrics
3333import org .apache .spark .internal .Logging
@@ -102,12 +102,29 @@ object SparkHadoopWriterUtils extends Logging {
102102 new Path (parent, s " _WRITING_ $diff" )
103103 }
104104
105+ def withLock [T ](fs : FileSystem , path : Path , waitTime : Int , info : String )(f : => T ): T = {
106+ var lockFileOut : Option [FSDataOutputStream ] = None
107+ try {
108+ lockFileOut = tryToLockFile(fs,
109+ path, true , waitTime, info)
110+ if (waitTime < 0 || lockFileOut.nonEmpty) {
111+ f
112+ } else {
113+ throw CarmelConcurrentModifiedException (path)
114+ }
115+ } finally {
116+ if (waitTime >= 0 && lockFileOut.nonEmpty) {
117+ releaseLockFile(fs, lockFileOut.get, path)
118+ }
119+ }
120+ }
121+
105122 def tryToLockFile (fs : FileSystem , path : Path , retry : Boolean ,
106123 waitTime : Int , info : String = " " ): Option [FSDataOutputStream ] = {
107124 if (waitTime < 0 ) return null
108125 val startTime = System .currentTimeMillis()
109126 log.info(s " [compact info] $info try to lock path $path" )
110- val loc = tryToLockFileInternal(fs, path, retry, waitTime)
127+ val loc = tryToLockFileInternal(fs, path, retry, waitTime, info )
111128 val durationMS = System .currentTimeMillis() - startTime
112129 if (loc.isDefined) {
113130 log.info(s " [compact info] $info got lock in $durationMS ms for path $path" )
@@ -125,11 +142,15 @@ object SparkHadoopWriterUtils extends Logging {
125142 }
126143
127144 private def tryToLockFileInternal (fs : FileSystem , path : Path , retry : Boolean ,
128- waitTime : Int ): Option [FSDataOutputStream ] = {
145+ waitTime : Int , info : String ): Option [FSDataOutputStream ] = {
129146 if (waitTime < 0 ) return null
130147 val targetPath = getLockFilePath(path)
131148 val sleepTime = 1000
132- var retryNum = waitTime / sleepTime + 1
149+ var retryNum = if (retry) {
150+ waitTime / sleepTime + 1
151+ } else {
152+ 1
153+ }
133154 var ret : Option [FSDataOutputStream ] = None
134155 while (retryNum > 0 ) {
135156 try {
@@ -142,11 +163,13 @@ object SparkHadoopWriterUtils extends Logging {
142163 }
143164 } catch {
144165 case ex : Throwable =>
145- logError(s " Failed to lock file $path in $retryNum attempt: ${ex.getMessage}" )
166+ logError(s " [compact info] $info Failed to lock file $path in $retryNum" +
167+ s " attempt: ${ex.getMessage}" )
146168 }
147169
148170 if (retryNum > 1 ) {
149- logWarning(s " Failed to lock file for $targetPath, ${retryNum - 1 } retries left, " +
171+ logWarning(s " [compact info] $info Failed to lock file for $targetPath, " +
172+ s " ${retryNum - 1 } retries left, " +
150173 s " sleep for $sleepTime ms " )
151174 Thread .sleep(sleepTime)
152175 }
0 commit comments