Skip to content

Commit b57fef9

Browse files
yaooqinnsrowen
authored andcommitted
[SPARK-27301][DSTREAM] Shorten the FileSystem cached life cycle to the cleanup method inner scope
## What changes were proposed in this pull request? The cached FileSystem's token will expire if no tokens explicitly are add into it. ```scala 19/03/28 13:40:16 INFO storage.BlockManager: Removing RDD 83189 19/03/28 13:40:16 INFO rdd.MapPartitionsRDD: Removing RDD 82860 from persistence list 19/03/28 13:40:16 INFO spark.ContextCleaner: Cleaned shuffle 6005 19/03/28 13:40:16 INFO storage.BlockManager: Removing RDD 82860 19/03/28 13:40:16 INFO scheduler.ReceivedBlockTracker: Deleting batches: 19/03/28 13:40:16 INFO scheduler.InputInfoTracker: remove old batch metadata: 1553750250000 ms 19/03/28 13:40:17 WARN security.UserGroupInformation: PriviledgedActionException as:ursHADOOP.HZ.NETEASE.COM (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800 19/03/28 13:40:17 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800 19/03/28 13:40:17 WARN security.UserGroupInformation: PriviledgedActionException as:ursHADOOP.HZ.NETEASE.COM (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800 19/03/28 13:40:17 WARN hdfs.LeaseRenewer: Failed to renew lease for [DFSClient_NONMAPREDUCE_-1396157959_1] for 53 seconds. Will retry shortly ... org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800 at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy11.renewLease(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewLease(ClientNamenodeProtocolTranslatorPB.java:571) at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy12.renewLease(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:878) at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:417) at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:442) at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71) at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:298) at java.lang.Thread.run(Thread.java:748) ``` This PR shorten the FileSystem cached life cycle to the cleanup method inner scope in case of token expiry. ## How was this patch tested? existing ut Closes #24235 from yaooqinn/SPARK-27301. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Sean Owen <sean.owen@databricks.com> (cherry picked from commit f4c73b7) Signed-off-by: Sean Owen <sean.owen@databricks.com>
1 parent 0975fe9 commit b57fef9

1 file changed

Lines changed: 1 addition & 1 deletion

File tree

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
3939
// in that batch's checkpoint data
4040
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
4141

42-
@transient private var fileSystem: FileSystem = null
4342
protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
4443

4544
/**
@@ -80,6 +79,7 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
8079
// even after master fails, as the checkpoint data of `time` does not refer to those files
8180
val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
8281
logDebug("Files to delete:\n" + filesToDelete.mkString(","))
82+
var fileSystem: FileSystem = null
8383
filesToDelete.foreach {
8484
case (time, file) =>
8585
try {

0 commit comments

Comments
 (0)