
Conversation

Contributor

@Fokko Fokko commented Aug 27, 2020

https://issues.apache.org/jira/browse/SPARK-32719

What changes were proposed in this pull request?

Add a check to detect missing imports. This ensures that when we use a specific class, it is imported explicitly rather than through a wildcard.
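
As an illustration (a hypothetical snippet, not code from this PR), this is the kind of problem the check surfaces: with a wildcard import, Flake8 cannot verify that a name is defined, so a genuinely missing import only shows up at runtime:

from pyspark.sql.functions import *  # wildcard: Flake8 flags this as F403

def add_one(df):
    # 'col' happens to come in via the wildcard, but 'IntegerType' lives in
    # pyspark.sql.types and is never imported at all. With the wildcard gone,
    # Flake8 reports it as F821 (undefined name); without the check, this
    # would only fail when the function is first called.
    return df.select((col("value") + 1).cast(IntegerType()))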

Why are the changes needed?

To make sure that the quality of the Python code is up to standard.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing unit tests and Flake8 static analysis.

Add Flake8 check to detect missing imports.

While working on SPARK-17333 I noticed that we're
missing some imports. This PR enables a check for
this using Flake8. One side effect is that we can
no longer use wildcard imports, since Flake8 is
unable to resolve them. That is no great loss:
wildcard imports aren't best practice anyway, as it
can be unclear which wildcard import a specific
class comes from.
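
To reproduce the check locally, an invocation along these lines should work (a sketch: the selected codes and target path are illustrative and may not match the PR's exact configuration):

# Illustrative only: run Flake8's import-related checks over pyspark.
# F401 = unused import, F403 = wildcard import used,
# F405 = name possibly from a wildcard import, F821 = undefined name.
import subprocess

subprocess.run(
    ["python3", "-m", "flake8", "--select=F401,F403,F405,F821", "python/pyspark"],
    check=True,
)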

@Fokko Fokko force-pushed the fd-add-check-missing-imports branch from 9346cd9 to a241db9 on August 27, 2020 20:15
Contributor Author

@Fokko Fokko left a comment

Added some pointers

def test_udf_with_aggregate_function(self):
    df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
    from pyspark.sql.functions import col, sum
    from pyspark.sql.types import BooleanType
Contributor Author

Imported at the top level.
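
In other words, the change hoists function-local imports up to module scope, roughly like this (a sketch of the pattern, not the exact diff):

# Before: imports hidden inside the test method.
def test_udf_with_aggregate_function(self):
    from pyspark.sql.functions import col, sum
    from pyspark.sql.types import BooleanType
    ...

# After: the same imports once at module scope, where Flake8 can check
# every use in the file against them.
from pyspark.sql.functions import col, sum
from pyspark.sql.types import BooleanType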


def test_udf_with_decorator(self):
    from pyspark.sql.functions import lit
    from pyspark.sql.types import IntegerType, DoubleType
Contributor Author

Imported at the top level.

)

def test_udf_wrapper(self):
    from pyspark.sql.types import IntegerType
Contributor Author

Imported at the top level.

#

from pyspark import keyword_only
import sys
Contributor Author

This import was missing as well

# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
Contributor Author

Missing import

@dbtsai
Member

dbtsai commented Aug 27, 2020

Jenkins, add to whitelist.

@Fokko Fokko changed the title Add Flake8 check missing imports [SPARK-32719][PYSPARK] Add Flake8 check missing imports Aug 27, 2020
@SparkQA

SparkQA commented Aug 27, 2020

Test build #127966 has finished for PR 29563 at commit bcdf4ea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@srowen srowen left a comment

Seems like a good change to me

     spark = SparkSession.builder.getOrCreate()
-except py4j.protocol.Py4JError:
+except py4j.protocol.Py4JError:  # noqa: F821
     spark = SparkSession(sc)
Member

This was failing?

Contributor Author

I tried to run it, and it seems to work:

MacBook-Pro-van-Fokko:spark fokkodriesprong$ python3 ./python/pyspark/sql/streaming.py
20/08/28 11:41:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/08/28 11:41:33 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/km/xypq2kxs4ys3dt6bwtd4fbj00000gn/T/temporary-ca2e7725-cb1f-4ac9-adc6-5533ab36db58. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
20/08/28 11:41:34 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/km/xypq2kxs4ys3dt6bwtd4fbj00000gn/T/temporary-0c6bf4bd-795f-4a82-ae4c-72f2cd7751ab. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
20/08/28 11:41:34 WARN Shell: Interrupted while joining on: Thread[Thread-48,5,]
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1252)
	at java.lang.Thread.join(Thread.java:1326)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:629)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:580)
	at org.apache.hadoop.util.Shell.run(Shell.java:482)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1017)
	at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:100)
	at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
	at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
	at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:596)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:686)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:682)
	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
	at org.apache.hadoop.fs.FileContext.create(FileContext.java:688)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:310)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:316)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:131)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:118)
	at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:158)
	at org.apache.spark.sql.execution.streaming.FileStreamSourceLog.add(FileStreamSourceLog.scala:69)
	at org.apache.spark.sql.execution.streaming.FileStreamSource.fetchMaxOffset(FileStreamSource.scala:150)
	at org.apache.spark.sql.execution.streaming.FileStreamSource.latestOffset(FileStreamSource.scala:286)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:380)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:368)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:598)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:364)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:208)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
20/08/28 11:41:36 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/km/xypq2kxs4ys3dt6bwtd4fbj00000gn/T/temporary-3c27fa5e-7644-437d-9832-cac5168d4af5. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
20/08/28 11:41:36 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/km/xypq2kxs4ys3dt6bwtd4fbj00000gn/T/temporary-5d2c3db7-4d59-4e0b-953e-429a9d398ef2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
20/08/28 11:41:36 WARN Shell: Interrupted while joining on: Thread[Thread-110,5,]
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1252)
	at java.lang.Thread.join(Thread.java:1326)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:629)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:580)
	at org.apache.hadoop.util.Shell.run(Shell.java:482)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
	at org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:160)
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:826)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileSystem.java:797)
	at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
	at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:717)
	at org.apache.hadoop.fs.FilterFs.renameInternal(FilterFs.java:240)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:690)
	at org.apache.hadoop.fs.FileContext.rename(FileContext.java:958)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:118)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$12(MicroBatchExecution.scala:418)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:416)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:598)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:364)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:208)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
20/08/28 11:41:37 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/km/xypq2kxs4ys3dt6bwtd4fbj00000gn/T/temporary-be1a0476-b88e-4b75-ac37-0486dbfe5a2e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
MacBook-Pro-van-Fokko:spark fokkodriesprong$ echo $?
0

I was under the impression that py4j and sc were made available by the line above, i.e. that they were being pulled in from some global context, but this isn't the case. It seems to run, but maybe it never hits the except. This looks like some lingering test code; maybe move it to the tests directory?
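
For reference, the pattern under discussion looks roughly like this (a sketch of the _test() boilerplate in pyspark/sql/streaming.py, not the verbatim source):

def _test():
    from pyspark.sql import SparkSession

    try:
        spark = SparkSession.builder.getOrCreate()
    # Neither 'py4j' nor 'sc' is defined anywhere in this scope -- both are
    # expected to exist in the doctest globals at runtime -- so Flake8
    # reports F821 for them, and the PR silences it with "# noqa: F821".
    except py4j.protocol.Py4JError:  # noqa: F821
        spark = SparkSession(sc)  # noqa: F821
    return spark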

cc @zsxwing @tdas

@SparkQA

SparkQA commented Aug 28, 2020

Test build #127993 has finished for PR 29563 at commit f039f68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon HyukjinKwon changed the title [SPARK-32719][PYSPARK] Add Flake8 check missing imports [SPARK-32719][PYTHON] Add Flake8 check missing imports Aug 28, 2020
@SparkQA

SparkQA commented Aug 29, 2020

Test build #128021 has finished for PR 29563 at commit 06480a7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging
  • class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan]


import operator
import sys
import uuid
Member

@Fokko, do you mind opening a minor PR to add this uuid import to all other branches? It looks like it has been missing from the very beginning (Fokko@90b46e0).

I checked the other occurrences fixed here, and those look fine to leave unfixed in the other branches.


Member

Oh @Fokko, sorry, I misread. It was fine because we're using the wildcard import from pyspark.ml.util import *, and util imports uuid, so it didn't crash before. Let's just not port this back on its own :-).
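
That is the transitive-import trap in miniature (hypothetical module names for illustration):

# util.py -- imports uuid for its own purposes
import uuid

def new_id():
    return uuid.uuid4()

# tuning.py -- the wildcard drags util's 'uuid' binding along with it
from util import *

print(uuid.uuid4())  # works, but only by accident: drop the wildcard
                     # (or util's import) and this breaks. An explicit
                     # 'import uuid' makes the dependency visible.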

@HyukjinKwon
Member

Merged to master.

@HyukjinKwon
Member

BTW, this is nice. Thanks for working on this!
