Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
eb461f2
Improve apps logs streaming helpers
Nov 10, 2025
141643b
Refactor AcquireToken arguments
Nov 13, 2025
8660fe6
Move persistent auth opts test to libs
Nov 13, 2025
c24350d
Relocate loadToken tests to libs
Nov 13, 2025
f9e9ce5
Move logstream skill under claude directory
Nov 13, 2025
8dbae1a
Use constants for log source filters
Nov 13, 2025
b076eef
Register apps logs command in apps.go
Nov 13, 2025
32a7332
Clarify --source flag help text
Nov 13, 2025
7a4755f
Validate log source filters against allowed list
Nov 13, 2025
20ebdb5
Add helper tests for apps logs command
Nov 13, 2025
4b335e3
Combine log stream dialer tests
Nov 13, 2025
fa934a9
Document apps logs command inline
Nov 13, 2025
1cdef0a
Remove leftover logstream skill file
Nov 13, 2025
3699181
Extract log stream backoff constants
Nov 13, 2025
21be263
Name websocket close codes for auth refresh
Nov 13, 2025
3594a6c
Extract connectAndConsume helper for logstream run loop
Nov 13, 2025
022c6a0
Defer websocket close and move context watcher
Nov 13, 2025
e9e21aa
Guard websocket close with sync.Once
Nov 13, 2025
7671c2d
Document logStreamer.Run thread-safety
Nov 13, 2025
251a56a
Add colorized output to apps logs command
Nov 13, 2025
7cde43b
Revert "Add colorized output to apps logs command"
Nov 13, 2025
6684f01
Add colorized output to apps logs command
Nov 13, 2025
a0b91f6
Refactor token handling to use SDK's TokenSource
Nov 18, 2025
95ff0cb
Move logstream package to libs/apps
Nov 18, 2025
259f550
Format constants into const block in logstream
Nov 18, 2025
a8fef98
Fix tail-lines flag to work correctly with follow mode
Nov 18, 2025
b8310ac
Extract handshake timeout to named constant
Nov 18, 2025
ad5a905
apps: register logs via overrides instead of generated file
Nov 18, 2025
0cff914
apps: refine logstream backoff, timers, and connection closing
Nov 18, 2025
9e2670b
apps: make logstream backoff timer safe for zero value
Nov 18, 2025
759dcc7
apps: document logstream backoff timer initialization
Nov 18, 2025
5c0315b
apps: exit logs command when app stops during follow
Nov 18, 2025
59e1abc
Lint and format code
pkosiec Dec 1, 2025
438f019
Remove scratch PR-related files
pkosiec Dec 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 281 additions & 0 deletions cmd/workspace/apps/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
package apps

import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path"
"slices"
"strings"
"time"

"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/apps/logstream"
"github.com/databricks/cli/libs/cmdctx"
"github.com/databricks/cli/libs/cmdgroup"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/service/apps"
"github.com/gorilla/websocket"
"github.com/spf13/cobra"
)

const (
defaultTailLines = 200
defaultPrefetchWindow = 2 * time.Second
defaultHandshakeTimeout = 30 * time.Second
)

var allowedSources = []string{"APP", "SYSTEM"}

func newLogsCommand() *cobra.Command {
var (
tailLines int
follow bool
outputPath string
streamTimeout time.Duration
searchTerm string
sourceFilters []string
)

cmd := &cobra.Command{
Use: "logs NAME",
Short: "Show Databricks app logs",
Long: `Stream stdout/stderr logs for a Databricks app via its log stream.

By default the command fetches the most recent logs (up to --tail-lines, default 200) and exits.
Use --follow to continue streaming logs until cancelled, optionally bounding the duration with --timeout.
Server-side filtering is available through --search (same semantics as the Databricks UI) and client-side filtering
via --source APP|SYSTEM. Use --output-file to mirror the stream to a local file (created with 0600 permissions).`,
Example: ` # Fetch the last 50 log lines
databricks apps logs my-app --tail-lines 50

# Follow logs until interrupted, searching for "ERROR" messages from app sources only
databricks apps logs my-app --follow --search ERROR --source APP

# Mirror streamed logs to a local file while following for up to 5 minutes
databricks apps logs my-app --follow --timeout 5m --output-file /tmp/my-app.log`,
Args: root.ExactArgs(1),
PreRunE: root.MustWorkspaceClient,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

if tailLines < 0 {
return errors.New("--tail-lines cannot be negative")
}

if follow && streamTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, streamTimeout)
defer cancel()
}

name := args[0]
w := cmdctx.WorkspaceClient(ctx)
app, err := w.Apps.Get(ctx, apps.GetAppRequest{Name: name})
if err != nil {
return err
}
if app.Url == "" {
return fmt.Errorf("app %s does not have a public URL; deploy and start it before streaming logs", name)
}

wsURL, err := buildLogsURL(app.Url)
if err != nil {
return err
}

cfg := cmdctx.ConfigUsed(ctx)
if cfg == nil {
return errors.New("missing workspace configuration")
}

tokenSource := cfg.GetTokenSource()
if tokenSource == nil {
return errors.New("configuration does not support OAuth tokens")
}

initialToken, err := tokenSource.Token(ctx)
if err != nil {
return err
}

tokenProvider := func(ctx context.Context) (string, error) {
tok, err := tokenSource.Token(ctx)
if err != nil {
return "", err
}
return tok.AccessToken, nil
}

appStatusChecker := func(ctx context.Context) error {
app, err := w.Apps.Get(ctx, apps.GetAppRequest{Name: name})
if err != nil {
return err
}
if app.ComputeStatus == nil {
return errors.New("app status unavailable")
}
// Check if app is in a terminal/stopped state
switch app.ComputeStatus.State {
case apps.ComputeStateStopped, apps.ComputeStateDeleting, apps.ComputeStateError:
return fmt.Errorf("app is %s", app.ComputeStatus.State)
default:
// App is running or transitioning - continue streaming
return nil
}
}

writer := cmd.OutOrStdout()
var file *os.File
if outputPath != "" {
file, err = os.OpenFile(outputPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
if err != nil {
return err
}
defer file.Close()
writer = io.MultiWriter(writer, file)
}
colorizeLogs := outputPath == "" && cmdio.IsTTY(cmd.OutOrStdout())

sourceMap, err := buildSourceFilter(sourceFilters)
if err != nil {
return err
}

log.Infof(ctx, "Streaming logs for %s (%s)", name, wsURL)
return logstream.Run(ctx, logstream.Config{
Dialer: newLogStreamDialer(cfg),
URL: wsURL,
Origin: normalizeOrigin(app.Url),
Token: initialToken.AccessToken,
TokenProvider: tokenProvider,
AppStatusChecker: appStatusChecker,
Search: searchTerm,
Sources: sourceMap,
Tail: tailLines,
Follow: follow,
Prefetch: defaultPrefetchWindow,
Writer: writer,
UserAgent: "databricks-cli apps logs",
Colorize: colorizeLogs,
})
},
}

streamGroup := cmdgroup.NewFlagGroup("Streaming")
streamGroup.FlagSet().IntVar(&tailLines, "tail-lines", defaultTailLines, "Number of recent log lines to show before streaming. Set to 0 to show everything.")
streamGroup.FlagSet().BoolVarP(&follow, "follow", "f", false, "Continue streaming logs until interrupted.")
streamGroup.FlagSet().DurationVar(&streamTimeout, "timeout", 0, "Maximum time to stream when --follow is set. 0 disables the timeout.")

filterGroup := cmdgroup.NewFlagGroup("Filtering")
filterGroup.FlagSet().StringVar(&searchTerm, "search", "", "Send a search term to the log service before streaming.")
filterGroup.FlagSet().StringSliceVar(&sourceFilters, "source", nil, "Restrict logs to APP and/or SYSTEM sources.")

wrappedCmd := cmdgroup.NewCommandWithGroupFlag(cmd)
wrappedCmd.AddFlagGroup(streamGroup)
wrappedCmd.AddFlagGroup(filterGroup)

cmd.Flags().StringVar(&outputPath, "output-file", "", "Optional file path to write logs in addition to stdout.")

return cmd
}

func buildLogsURL(appURL string) (string, error) {
parsed, err := url.Parse(appURL)
if err != nil {
return "", err
}

switch strings.ToLower(parsed.Scheme) {
case "https":
parsed.Scheme = "wss"
case "http":
parsed.Scheme = "ws"
case "wss", "ws":
default:
return "", fmt.Errorf("unsupported app URL scheme: %s", parsed.Scheme)
}

parsed.Path = path.Join(parsed.Path, "logz/stream")
if !strings.HasPrefix(parsed.Path, "/") {
parsed.Path = "/" + parsed.Path
}

return parsed.String(), nil
}

func normalizeOrigin(appURL string) string {
parsed, err := url.Parse(appURL)
if err != nil {
return ""
}
switch strings.ToLower(parsed.Scheme) {
case "http", "https":
return parsed.Scheme + "://" + parsed.Host
case "ws":
parsed.Scheme = "http"
case "wss":
parsed.Scheme = "https"
default:
return ""
}
parsed.Path = ""
parsed.RawQuery = ""
parsed.Fragment = ""
return parsed.String()
}

func buildSourceFilter(values []string) (map[string]struct{}, error) {
if len(values) == 0 {
return nil, nil
}
filter := make(map[string]struct{})
for _, v := range values {
trimmed := strings.ToUpper(strings.TrimSpace(v))
if trimmed == "" {
continue
}
if !slices.Contains(allowedSources, trimmed) {
return nil, fmt.Errorf("invalid --source value %q (valid: %s)", v, strings.Join(allowedSources, ", "))
}
filter[trimmed] = struct{}{}
}
if len(filter) == 0 {
return nil, nil
}
return filter, nil
}

func newLogStreamDialer(cfg *config.Config) *websocket.Dialer {
dialer := &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: defaultHandshakeTimeout,
}

if cfg == nil {
return dialer
}

if transport, ok := cfg.HTTPTransport.(*http.Transport); ok && transport != nil {
clone := transport.Clone()
dialer.Proxy = clone.Proxy
dialer.NetDialContext = clone.DialContext
if clone.TLSClientConfig != nil {
dialer.TLSClientConfig = clone.TLSClientConfig.Clone()
}
return dialer
}

if cfg.InsecureSkipVerify {
dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}

return dialer
}
85 changes: 85 additions & 0 deletions cmd/workspace/apps/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package apps

import (
"crypto/tls"
"net/http"
"net/url"
"testing"

"github.com/databricks/databricks-sdk-go/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewLogStreamDialerConfiguresProxyAndTLS(t *testing.T) {
t.Run("clones HTTP transport when provided", func(t *testing.T) {
proxyURL, err := url.Parse("http://localhost:8080")
require.NoError(t, err)

transport := &http.Transport{
Proxy: http.ProxyURL(proxyURL),
TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12},
}

cfg := &config.Config{
HTTPTransport: transport,
}

dialer := newLogStreamDialer(cfg)
require.NotNil(t, dialer)

req := &http.Request{URL: &url.URL{Scheme: "https", Host: "example.com"}}
actualProxy, err := dialer.Proxy(req)
require.NoError(t, err)
assert.Equal(t, proxyURL.String(), actualProxy.String())

require.NotNil(t, dialer.TLSClientConfig)
assert.NotSame(t, transport.TLSClientConfig, dialer.TLSClientConfig, "TLS config should be cloned")
assert.Equal(t, transport.TLSClientConfig.MinVersion, dialer.TLSClientConfig.MinVersion)
})

t.Run("honors insecure skip verify when no transport is supplied", func(t *testing.T) {
cfg := &config.Config{
InsecureSkipVerify: true,
}
dialer := newLogStreamDialer(cfg)
require.NotNil(t, dialer)
require.NotNil(t, dialer.TLSClientConfig, "expected TLS config when insecure skip verify is set")
assert.True(t, dialer.TLSClientConfig.InsecureSkipVerify)
})
}

func TestBuildLogsURLConvertsSchemes(t *testing.T) {
url, err := buildLogsURL("https://example.com/foo")
require.NoError(t, err)
assert.Equal(t, "wss://example.com/foo/logz/stream", url)

url, err = buildLogsURL("http://example.com/foo")
require.NoError(t, err)
assert.Equal(t, "ws://example.com/foo/logz/stream", url)
}

func TestBuildLogsURLRejectsUnknownScheme(t *testing.T) {
_, err := buildLogsURL("ftp://example.com/foo")
require.Error(t, err)
}

func TestNormalizeOrigin(t *testing.T) {
assert.Equal(t, "https://example.com", normalizeOrigin("https://example.com/foo"))
assert.Equal(t, "http://example.com", normalizeOrigin("ws://example.com/foo"))
assert.Equal(t, "https://example.com", normalizeOrigin("wss://example.com/foo"))
assert.Equal(t, "", normalizeOrigin("://invalid"))
}

func TestBuildSourceFilter(t *testing.T) {
filters, err := buildSourceFilter([]string{"app", "system", ""})
require.NoError(t, err)
assert.Equal(t, map[string]struct{}{"APP": {}, "SYSTEM": {}}, filters)

filters, err = buildSourceFilter(nil)
require.NoError(t, err)
assert.Nil(t, filters)

_, err = buildSourceFilter([]string{"foo"})
require.Error(t, err)
}
3 changes: 3 additions & 0 deletions cmd/workspace/apps/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func listDeploymentsOverride(listDeploymentsCmd *cobra.Command, listDeploymentsR
}

func init() {
cmdOverrides = append(cmdOverrides, func(cmd *cobra.Command) {
cmd.AddCommand(newLogsCommand())
})
listOverrides = append(listOverrides, listOverride)
listDeploymentsOverrides = append(listDeploymentsOverrides, listDeploymentsOverride)
}
Loading