Skip to content
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
6a8dc6d
Refactor FileWatchSensor to use watchdog instead of our logshipper fork
blag Nov 29, 2020
0a2f6ae
Remove logshipper and pyinotify from requirements files
blag Nov 29, 2020
2f71b2f
Import eventlet since we use it
blag Dec 12, 2020
3f0abb2
Use self.logger instead of self._logger
blag Dec 12, 2020
d175507
Use self.trigger instead of self._trigger
blag Dec 12, 2020
ce6e8ef
Merge branch 'master' into remove-logshipper
blag Dec 15, 2020
6657a25
Merge branch 'master' into remove-logshipper
blag Dec 18, 2020
8add2cc
Refactor for testability and test coverage
blag Dec 22, 2020
169cc5a
Add tests for FileWatchSensor components
blag Dec 22, 2020
fa68bef
Linting
blag Dec 23, 2020
1493656
Merge branch 'master' into remove-logshipper
blag Dec 23, 2020
e932ffe
Add class docstrings
blag Jan 17, 2021
5a22c27
Add logging
blag Jan 17, 2021
490595d
No need for eventlet.sleep() when time.sleep() should do just fine
blag Jan 17, 2021
05603f6
Refactor TailManageer.run() to use time.sleep()
blag Jan 20, 2021
8a05c36
Refactor calculating parent_dir
blag Jan 20, 2021
847d830
Refactor SingleFileTail.close() to keep parent watch open
blag Jan 20, 2021
914b3f5
Refactor follow logic
blag Jan 20, 2021
5caa459
Refactor TailManager.start() to make it idempotent
blag Jan 20, 2021
e6d3911
Refactor FileWatchSensor.run() to use TailManager.run()
blag Jan 20, 2021
456daba
Break out tests into multiple modules
blag Jan 20, 2021
a19eac6
Merge branch 'master' into remove-logshipper
blag Apr 8, 2021
4279986
Add .vagrant/ to .gitignore
blag Apr 8, 2021
d915752
Merge branch 'master' into remove-logshipper
blag Apr 12, 2021
5601743
Ignore a few more virtualenv directories
blag Apr 12, 2021
0c10559
Make pylint happy
blag Apr 12, 2021
f8a3056
Run all tests when run directly
blag Apr 12, 2021
5f3cf9a
Make black happy
blag Apr 12, 2021
1e5e389
Make flake8 happy
blag Apr 12, 2021
5e0b7e6
Make pylint happy
blag Apr 12, 2021
d90be9c
Make black happy
blag Apr 12, 2021
87d32aa
Don't use threading.Thread() after all
blag Apr 12, 2021
e64e547
Do whatever it takes to make all of the linters happy
blag Apr 12, 2021
ec856da
Merge branch 'master' into remove-logshipper
cognifloyd Oct 5, 2021
dca3e4f
Merge branch 'master' into remove-logshipper
cognifloyd Oct 6, 2021
3055413
add changelog entry
cognifloyd Oct 7, 2021
9631524
Merge branch 'master' into remove-logshipper
cognifloyd Jan 20, 2024
83fa1ec
Fix borked merge
cognifloyd Jan 20, 2024
bab2952
pin transitive dep in test-requirements.txt
cognifloyd Jan 20, 2024
f8d1f50
Merge branch 'master' into remove-logshipper
cognifloyd Jan 29, 2024
5b38106
regen st2 lockfile to switch logshipper -> watchdog
cognifloyd Jan 29, 2024
ed32348
linux pack: Drop out-of-date README notes about pack config
cognifloyd Jan 30, 2024
465a8f4
update the linux pack README to explain how to use FileWatchSensor
cognifloyd Jan 30, 2024
36bacae
linux pack: add LinuxFileWatchSensor.update_trigger
cognifloyd Jan 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions contrib/linux/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
# used by file watcher sensor
pyinotify>=0.9.5,<=0.10
-e git+https://github.com/StackStorm/logshipper.git@stackstorm_patched#egg=logshipper
watchdog
242 changes: 207 additions & 35 deletions contrib/linux/sensors/file_watch_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,56 +14,218 @@
# limitations under the License.

import os
import signal
import time
import sys

import eventlet
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from logshipper.tail import Tail
try:
from st2reactor.sensor.base import Sensor
except ImportError:
Sensor = object

from st2reactor.sensor.base import Sensor

class FileEventHandler(FileSystemEventHandler):
    """Route watchdog file events to optional per-event-type callbacks.

    ``callbacks`` maps an event kind (``'created'``, ``'modified'``,
    ``'moved'`` or ``'deleted'``) to a callable that is invoked as
    ``callback(event=event)``. Kinds that are missing from the mapping, or
    mapped to a falsy value such as ``None``, are silently ignored.
    """

    def __init__(self, *args, callbacks=None, **kwargs):
        # Extra positional/keyword arguments are accepted for call-site
        # compatibility but intentionally not forwarded.
        super().__init__()
        self.callbacks = callbacks or {}

    def dispatch(self, event):
        """Forward ``event`` to the ``on_*`` hooks unless it is synthetic or
        refers to a directory."""
        # ``is_synthetic`` is read defensively so that events which do not
        # carry the attribute are treated as real events instead of raising.
        if getattr(event, 'is_synthetic', False) or event.is_directory:
            return
        super().dispatch(event)

    def _fire(self, kind, event):
        """Invoke the callback registered for ``kind``, if there is one."""
        cb = self.callbacks.get(kind)
        if cb:
            cb(event=event)

    def on_created(self, event):
        self._fire('created', event)

    def on_modified(self, event):
        self._fire('modified', event)

    def on_moved(self, event):
        self._fire('moved', event)

    def on_deleted(self, event):
        self._fire('deleted', event)


class SingleFileTail(object):
    """Follow a single file and pass every complete line to ``handler``.

    :param path: Path of the file to follow.
    :param handler: Callable invoked as ``handler(path, line)`` for each
        line, with the trailing line terminator character removed.
    :param read_all: When true the file is read from the beginning,
        otherwise reading starts at the current end of the file.
    :param observer: Observer used to schedule the file watch; a new
        ``Observer`` is created when omitted.
    """

    def __init__(self, path, handler, read_all=False, observer=None):
        self.path = path
        self.handler = handler
        self.read_all = read_all
        # Holds the trailing, not yet newline-terminated fragment between
        # reads.
        self.buffer = ''
        self.observer = observer or Observer()

        self.open()

    def read(self, event=None):
        """Read everything currently available and dispatch complete lines.

        ``event`` is accepted so the method can be used directly as an
        event callback; it is not inspected.
        """
        while True:
            # Pull 1024 bytes at a time
            chunk = os.read(self.fd, 1024)
            if not chunk:
                return

            # The incremental decoder keeps the leading bytes of a UTF-8
            # sequence that is cut off at the chunk boundary and completes
            # it on the next call, so such characters are no longer turned
            # into U+FFFD. Genuinely invalid bytes are still replaced.
            text = self.buffer + self._decoder.decode(chunk)
            self.buffer = ''
            if not text:
                # The chunk held only the start of a multi-byte character.
                continue

            lines = text.splitlines(True)
            # An unterminated last element is an incomplete line: keep it
            # for the next read instead of dispatching it.
            if lines and not lines[-1].endswith('\n'):
                self.buffer = lines.pop()

            for line in lines:
                self.handler(self.path, line[:-1])

    def reopen(self, event=None, skip_to_end=False):
        """Drain the current descriptor and switch to a replaced file.

        Used as the callback for ``moved`` and ``deleted`` events.

        :param event: Triggering event; not inspected.
        :param skip_to_end: When true, start at the end of the replacement
            file instead of reading it from the beginning.
        """
        # Where we are in the file we still hold open
        pos = os.lseek(self.fd, 0, os.SEEK_CUR)
        if os.fstat(self.fd).st_size > pos:
            # More data to read - read as normal
            self.read()

        try:
            file_stat = os.stat(self.path)
        except FileNotFoundError:
            # Nothing exists at the path (yet). Keep the old descriptor
            # rather than raising out of the event callback.
            return

        # The file on disk is smaller than the one we opened (e.g. a
        # freshly rotated log) or it is a different inode altogether.
        shrunk = self.stat.st_size > file_stat.st_size
        replaced = self.stat.st_ino != file_stat.st_ino
        if shrunk or replaced:
            self.close()
            # The previous file was read to its end above, so by default
            # nothing of the new file is skipped.
            self.open(seek_to_end=skip_to_end)
            self.read()

    def open(self, seek_to_end=None):
        """Open the file, position the read offset and schedule the watch.

        :param seek_to_end: ``True`` starts at the end of the file,
            ``False`` at the beginning. ``None`` (the default) starts at
            the end unless ``read_all`` was requested.
        """
        # Function-scope import keeps this class self-contained.
        import codecs

        if seek_to_end is None:
            seek_to_end = not self.read_all

        self.fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
        # stat through the descriptor so the recorded size/inode describe
        # the file that was actually opened.
        self.stat = os.fstat(self.fd)
        # Fresh decoder state for every opened file.
        self._decoder = codecs.getincrementaldecoder('utf8')(errors='replace')

        if seek_to_end:
            os.lseek(self.fd, 0, os.SEEK_END)

        file_event_handler = FileEventHandler(callbacks={
            'created': None,
            'modified': self.read,
            'moved': self.reopen,
            'deleted': self.reopen,
        })
        self.watch = self.observer.schedule(file_event_handler, self.path)

    def close(self):
        """Close the descriptor, remove the watch and flush any partial line."""
        os.close(self.fd)
        self.observer.unschedule(self.watch)

        # Include bytes of an unfinished multi-byte character, if any.
        remainder = self.buffer + self._decoder.decode(b'', final=True)
        # Clear before dispatching so the fragment cannot be emitted a
        # second time after a reopen.
        self.buffer = ''
        if remainder:
            self.handler(self.path, remainder)


class TailManager(object):
    """Own one observer and the ``SingleFileTail`` objects that share it.

    ``tails`` maps ``path -> {handler: SingleFileTail}`` so the same file
    can be followed on behalf of several handlers.
    """

    def __init__(self, *args, **kwargs):
        # Extra arguments are accepted but not used.
        self.observer = Observer()
        self.tails = {}
        self._started = False
        self._stopped = False

    def tail_file(self, path, handler, read_all=False):
        """Start following ``path`` for ``handler`` unless already doing so.

        :param path: File to follow.
        :param handler: Callable invoked as ``handler(path, line)``.
        :param read_all: Read the file from the beginning when true.
        """
        if handler in self.tails.get(path, {}):
            return

        # Build the tail first so a failure to open the file does not
        # leave an empty entry behind in ``self.tails``.
        sft = SingleFileTail(path, handler,
                             read_all=read_all, observer=self.observer)
        self.tails.setdefault(path, {})[handler] = sft

    def stop_tailing_file(self, path, handler):
        """Stop following ``path`` for ``handler``.

        Does nothing when that combination is not currently followed.
        """
        handlers = self.tails.get(path)
        if handlers:
            tailed_file = handlers.pop(handler, None)
            if tailed_file is not None:
                tailed_file.close()

        # Amortize some cleanup while we're at it
        if not handlers:
            self.tails.pop(path, None)

    def run(self):
        """Start the observer and block until ``stop()`` has been called."""
        self.start()
        while not self._stopped:
            time.sleep(1)

    def start(self):
        """Start the observer once; later calls are no-ops.

        A manager that has been stopped is not started again.
        """
        if self._started or self._stopped:
            return
        self.observer.start()
        self._started = True

    def stop(self):
        """Close every tail and shut the observer down."""
        self._stopped = True

        for handlers in self.tails.values():
            for tailed_file in handlers.values():
                tailed_file.close()
        # Forget the closed tails so a repeated stop() does not close the
        # same descriptors again.
        self.tails.clear()

        # Only an observer that was actually started can be joined.
        if self._started:
            self.observer.stop()
            self.observer.join()


class FileWatchSensor(Sensor):
def __init__(self, sensor_service, config=None):
super(FileWatchSensor, self).__init__(sensor_service=sensor_service,
config=config)
self._trigger = None
self._logger = self._sensor_service.get_logger(__name__)
self._tail = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._stop = False
self.trigger = None
self.logger = self.sensor_service.get_logger(__name__)

def setup(self):
self._tail = Tail(filenames=[])
self._tail.handler = self._handle_line
self._tail.should_run = True
self.tail_manager = TailManager()

def run(self):
self._tail.run()
self.tail_manager.run()
while not self._stop:
eventlet.sleep(60)

def cleanup(self):
if self._tail:
self._tail.should_run = False

try:
self._tail.notifier.stop()
except Exception:
pass
self._stop = True
self.tail_manager.stop()

def add_trigger(self, trigger):
file_path = trigger['parameters'].get('file_path', None)

if not file_path:
self._logger.error('Received trigger type without "file_path" field.')
self.logger.error('Received trigger type without "file_path" field.')
return

self._trigger = trigger.get('ref', None)
self.trigger = trigger.get('ref', None)

if not self._trigger:
if not self.trigger:
raise Exception('Trigger %s did not contain a ref.' % trigger)

# Wait a bit to avoid initialization race in logshipper library
eventlet.sleep(1.0)

self._tail.add_file(filename=file_path)
self._logger.info('Added file "%s"' % (file_path))
self.tail_manager.tail_file(file_path, self._handle_line)
self.logger.info('Added file "%s"' % (file_path))

def update_trigger(self, trigger):
pass
Expand All @@ -72,21 +234,31 @@ def remove_trigger(self, trigger):
file_path = trigger['parameters'].get('file_path', None)

if not file_path:
self._logger.error('Received trigger type without "file_path" field.')
self.logger.error('Received trigger type without "file_path" field.')
return

self._tail.remove_file(filename=file_path)
self._trigger = None
self.tail_manager.stop_tailing_file(file_path, self._handle_line)
self.trigger = None

self._logger.info('Removed file "%s"' % (file_path))
self.logger.info('Removed file "%s"' % (file_path))

def _handle_line(self, file_path, line):
trigger = self._trigger
payload = {
'file_path': file_path,
'file_name': os.path.basename(file_path),
'line': line
}
self._logger.debug('Sending payload %s for trigger %s to sensor_service.',
payload, trigger)
self.sensor_service.dispatch(trigger=trigger, payload=payload)
self.logger.debug('Sending payload %s for trigger %s to sensor_service.',
payload, self.trigger)
self.sensor_service.dispatch(trigger=self.trigger, payload=payload)


if __name__ == '__main__':
    # Manual smoke test: follow a file (first command-line argument,
    # default 'test.py') and print every new line until interrupted.
    tm = TailManager()
    tm.tail_file(sys.argv[1] if len(sys.argv) > 1 else 'test.py',
                 handler=print)

    def halt(sig, frame):
        tm.stop()
        sys.exit(0)

    # Install the handler before entering the blocking loop; registering it
    # after tm.run() would never be reached.
    signal.signal(signal.SIGINT, halt)
    tm.run()
1 change: 0 additions & 1 deletion fixed-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ paramiko==2.7.1
passlib==1.7.1
prance==0.9.0
prompt-toolkit==1.0.15
pyinotify==0.9.6; platform_system=="Linux"
pymongo==3.10.0
python-editor==1.0.4
python-gnupg==0.4.5
Expand Down
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ cryptography==3.2
dnspython<2.0.0,>=1.16.0
eventlet==0.25.1
flex==6.14.0
git+https://github.com/StackStorm/logshipper.git@stackstorm_patched#egg=logshipper
git+https://github.com/StackStorm/[email protected]#egg=orquesta
git+https://github.com/StackStorm/st2-auth-backend-flat-file.git@master#egg=st2-auth-backend-flat-file
git+https://github.com/StackStorm/st2-auth-ldap.git@master#egg=st2-auth-ldap
Expand Down Expand Up @@ -43,7 +42,6 @@ passlib==1.7.1
prettytable
prompt-toolkit==1.0.15
psutil==5.6.6
pyinotify==0.9.6 ; platform_system == "Linux"
pymongo==3.10.0
pyrabbit
python-dateutil==2.8.0
Expand All @@ -66,6 +64,7 @@ stevedore==1.30.1
tooz==1.66.1
ujson==1.35
unittest2
watchdog
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably pin it in fixed-requirements.txt, right?

webob==1.8.5
webtest
zake==0.2.2
Expand Down
3 changes: 1 addition & 2 deletions st2actions/in-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ python-json-logger
gitpython
lockfile
# needed by core "linux" pack - TODO: create virtualenv for linux pack on postinst
pyinotify
git+https://github.com/StackStorm/logshipper.git@stackstorm_patched#egg=logshipper
watchdog
# required by pack_mgmt/setup_virtualenv.py#L135
virtualenv
# needed by requests
Expand Down
3 changes: 1 addition & 2 deletions st2actions/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@
apscheduler==3.6.3
chardet<3.1.0
eventlet==0.25.1
git+https://github.com/StackStorm/logshipper.git@stackstorm_patched#egg=logshipper
gitpython==2.1.15
jinja2==2.10.3
kombu==4.6.6
lockfile==0.12.2
oslo.config<1.13,>=1.12.1
oslo.utils<=3.37.0,>=3.36.2
pyinotify==0.9.6 ; platform_system == "Linux"
python-dateutil==2.8.0
python-json-logger
pyyaml==5.1.2
requests[security]==2.23.0
six==1.13.0
watchdog