Skip to content
21 changes: 19 additions & 2 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,25 @@ def create_harness(environment, dry_run=False):
else:
fn_log_handler = None

pipeline_options_dict = _load_pipeline_options(
environment.get('PIPELINE_OPTIONS'))
options_json = environment.get('PIPELINE_OPTIONS')

#We check if options are stored in the file.
if 'PIPELINE_OPTIONS_FILE' in environment:
options_file = environment['PIPELINE_OPTIONS_FILE']
try:
with open(options_file, 'r') as f:
options_json = f.read()
_LOGGER.info('Load pipeline options from file: %s', options_file)
except:
_LOGGER.error(
'Failed to load pipeline options from file: %s', options_file)
raise

pipeline_options_dict = _load_pipeline_options(options_json)

#pipeline_options_dict = _load_pipeline_options(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove unnecessary comments.

# environment.get('PIPELINE_OPTIONS'))

default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
logging.getLogger().setLevel(default_log_level)
_set_log_level_overrides(pipeline_options_dict)
Expand Down
31 changes: 30 additions & 1 deletion sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,18 @@ func launchSDKProcess() error {

// (3) Invoke python

os.Setenv("PIPELINE_OPTIONS", options)
//Commented out -->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean up comments

//os.Setenv("PIPELINE_OPTIONS", options)

//To prevent crashes if options > ARG_MAX -->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you follow the comment style of this file to polish your comment here?

Something like:

// Write the JSON string of pipeline options into a file to prevent `argument list too long` error.

// --> Write the large JSON content into a file on disk
// --> environment only receives the short filename
if optionsFile, err := MakePipelineOptionsFileAndEnvVar(options); err != nil {
logger.Fatalf(ctx, "Failed to create a pipeline options file: %v", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency, could we use the same error message as Java and GO Sdks?

if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err)

if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err)

} else {
os.Setenv("PIPELINE_OPTIONS_FILE", optionsFile)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In either the original MakePipelineOptionsFileAndEnvVar or your own copy (which seems to be unnecessary), the environment variable is already set. I think we don't need to do that again here.

}
os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
Expand Down Expand Up @@ -503,3 +514,21 @@ func logSubmissionEnvDependencies(ctx context.Context, bufLogger *tools.Buffered
bufLogger.Printf(ctx, "%s", string(content))
return nil
}

// MakePipelineOptionsFileAndEnvVar writes the pipeline options to a file.
// Assumes the options string is JSON formatted.
//
// Stores the file name in question in PIPELINE_OPTIONS_FILE for access by the SDK.
func MakePipelineOptionsFileAndEnvVar(options string) (string, error) {
Copy link
Collaborator

@shunping shunping Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to redefine this function? I see we already have that at

func MakePipelineOptionsFileAndEnvVar(options string) error {
, and we already imported the Go package in this boot.go at
"github.com/apache/beam/sdks/v2/go/container/tools"

fn := "pipeline_options.json"
f, err := os.Create(fn)
if err != nil {
return "", fmt.Errorf("unable to create %v: %w", fn, err)
}
defer f.Close()
if _, err := f.WriteString(options); err != nil {
return "", fmt.Errorf("error writing %v: %w", f.Name(), err)
}
os.Setenv("PIPELINE_OPTIONS_FILE", f.Name())
return fn, nil
}
Loading