flexget.manager
Covered: 436 lines
Missed: 36 lines
Skipped 143 lines
Percent: 92 %
  1
import os
  2
import sys
  3
import shutil
  4
import logging
  5
import yaml
  6
import atexit
  7
from datetime import datetime, timedelta
  8
import sqlalchemy
  9
from sqlalchemy.orm import sessionmaker
 10
from sqlalchemy.ext.declarative import declarative_base
 11
from sqlalchemy.pool import SingletonThreadPool
 12
from flexget.event import fire_event
 13
from flexget import validator
 15
log = logging.getLogger('manager')
 17
Base = declarative_base()
 18
Session = sessionmaker()
 19
manager = None
 20
DB_CLEANUP_INTERVAL = timedelta(days=7)
 23
_config_validator = validator.factory('dict')
 26
def register_config_key(key, validator, required=False):
 27
    """ Registers a valid root level key for the config.
 29
    :param string key:
 30
      Name of the root level key being registered.
 31
    :param validator:
 32
      Validator for the key.
 33
      Accepts: :class:`flexget.validator.Validator` instance, function returning
 34
      Validator instance, or validator type string.
 35
    :param bool required:
 36
      Specify whether this is a mandatory key.
 37
    """
 38
    _config_validator.accept(validator, key=key, required=required)
 41
def useExecLogging(func):
 43
    def wrapper(self, *args, **kw):
 45
        from flexget import logger
 46
        import time
 47
        logger.set_execution(str(time.time()))
 48
        try:
 49
            return func(self, *args, **kw)
 50
        finally:
 51
            logger.set_execution('')
 53
    return wrapper
 56
class Manager(object):
 58
    """Manager class for FlexGet
 60
    Fires events:
 62
    * manager.startup
 64
      After manager has been initialized. This is when application becomes ready to use
 66
    * manager.upgrade
 68
      Upgrade plugin database schemas etc
 70
    * manager.execute.started
 72
      When execute is about the be started, this happens before any feed phases occur
 73
      including on_process_start
 75
    * manager.execute.completed
 77
      After manager has executed all Feeds
 79
    * manager.shutdown
 81
      When the manager is exiting
 82
    """
 84
    unit_test = False
 85
    options = None
 87
    def __init__(self, options):
 88
        """
 89
        :param options: optparse parsed options object
 90
        """
 91
        global manager
 92
        assert not manager, 'Only one instance of Manager should be created at a time!'
 93
        manager = self
 94
        self.options = options
 95
        self.config_base = None
 96
        self.config_name = None
 97
        self.db_filename = None
 98
        self.engine = None
 99
        self.lockfile = None
100
        self.database_uri = None
102
        self.config = {}
103
        self.feeds = {}
105
        self.initialize()
108
        from flexget.utils.simple_persistence import SimplePersistence
109
        self.persist = SimplePersistence('manager')
111
        log.debug('sys.defaultencoding: %s' % sys.getdefaultencoding())
112
        log.debug('sys.getfilesystemencoding: %s' % sys.getfilesystemencoding())
113
        log.debug('os.path.supports_unicode_filenames: %s' % os.path.supports_unicode_filenames)
115
        fire_event('manager.upgrade', self)
116
        fire_event('manager.startup', self)
117
        self.db_cleanup()
119
    def __del__(self):
120
        global manager
121
        manager = None
123
    def initialize(self):
124
        """Separated from __init__ so that unit tests can modify options before loading config."""
125
        self.setup_yaml()
126
        self.find_config()
127
        self.acquire_lock()
128
        self.init_sqlalchemy()
129
        errors = self.validate_config()
130
        if errors:
131
            for error in errors:
132
                log.critical(error)
133
            return
134
        self.create_feeds()
136
    def setup_yaml(self):
137
        """ Set up the yaml loader to return unicode objects for strings by default
138
        """
140
        def construct_yaml_str(self, node):
143
            return self.construct_scalar(node)
144
        yaml.Loader.add_constructor(u'tag:yaml.org,2002:str', construct_yaml_str)
145
        yaml.SafeLoader.add_constructor(u'tag:yaml.org,2002:str', construct_yaml_str)
148
        def unicode_representer(dumper, uni):
149
            node = yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=uni)
150
            return node
151
        yaml.add_representer(unicode, unicode_representer)
154
        def increase_indent_wrapper(func):
156
            def increase_indent(self, flow=False, indentless=False):
157
                func(self, flow, False)
158
            return increase_indent
160
        yaml.Dumper.increase_indent = increase_indent_wrapper(yaml.Dumper.increase_indent)
161
        yaml.SafeDumper.increase_indent = increase_indent_wrapper(yaml.SafeDumper.increase_indent)
163
    def find_config(self):
164
        """Find the configuration file and then call :meth:`.load_config` to load it"""
165
        startup_path = os.path.dirname(os.path.abspath(sys.path[0]))
166
        home_path = os.path.join(os.path.expanduser('~'), '.flexget')
167
        current_path = os.getcwd()
168
        exec_path = sys.path[0]
170
        config_path = os.path.dirname(self.options.config)
171
        path_given = config_path != ''
173
        possible = []
174
        if path_given:
176
            possible.append(self.options.config)
177
        else:
178
            log.debug('Figuring out config load paths')
180
            possible.append(startup_path)
181
            possible.append(home_path)
183
            from flexget import __version__ as version
184
            if version == '{subversion}':
185
                log.debug('Running subversion, adding virtualenv / sandbox paths')
186
                possible.append(os.path.join(exec_path, '..'))
187
                possible.append(current_path)
188
                possible.append(exec_path)
190
        for path in possible:
191
            config = os.path.join(path, self.options.config)
192
            if os.path.exists(config):
193
                log.debug('Found config: %s' % config)
194
                self.load_config(config)
195
                return
196
        log.info('Tried to read from: %s' % ', '.join(possible))
197
        raise IOError('Failed to find configuration file %s' % self.options.config)
199
    def load_config(self, config):
200
        """
201
        .. warning::
203
           Calls sys.exit(1) if configuration file could not be loaded.
204
           This is something we probably want to change.
206
        :param string config: Path to configuration file
207
        """
208
        if not self.options.quiet:
210
            self.pre_check_config(config)
211
        try:
212
            self.config = yaml.safe_load(file(config)) or {}
213
        except Exception, e:
214
            log.critical(e)
215
            print ''
216
            print '-' * 79
217
            print ' Malformed configuration file, common reasons:'
218
            print '-' * 79
219
            print ''
220
            print ' o Indentation error'
221
            print ' o Missing : from end of the line'
222
            print ' o Non ASCII characters (use UTF8)'
223
            print ' o If text contains any of :[]{}% characters it must be single-quoted (eg. value{1} should be \'value{1}\')\n'
227
            if hasattr(e, 'problem') and hasattr(e, 'context_mark') and hasattr(e, 'problem_mark'):
228
                lines = 0
229
                if e.problem is not None:
230
                    print ' Reason: %s\n' % e.problem
231
                    if e.problem == 'mapping values are not allowed here':
232
                        print ' ----> MOST LIKELY REASON: Missing : from end of the line!'
233
                        print ''
234
                if e.context_mark is not None:
235
                    print ' Check configuration near line %s, column %s' % (e.context_mark.line, e.context_mark.column)
236
                    lines += 1
237
                if e.problem_mark is not None:
238
                    print ' Check configuration near line %s, column %s' % (e.problem_mark.line, e.problem_mark.column)
239
                    lines += 1
240
                if lines:
241
                    print ''
242
                if lines == 1:
243
                    print ' Fault is almost always in this or previous line\n'
244
                if lines == 2:
245
                    print ' Fault is almost always in one of these lines or previous ones\n'
248
            if self.options.debug:
249
                raise
250
            sys.exit(1)
253
        self.config_name = os.path.splitext(os.path.basename(config))[0]
254
        self.config_base = os.path.normpath(os.path.dirname(config))
255
        self.lockfile = os.path.join(self.config_base, '.%s-lock' % self.config_name)
256
        log.debug('config_name: %s' % self.config_name)
257
        log.debug('config_base: %s' % self.config_base)
259
    def save_config(self):
260
        """Dumps current config to yaml config file"""
261
        config_file = file(os.path.join(self.config_base, self.config_name) + '.yml', 'w')
262
        try:
263
            config_file.write(yaml.dump(self.config, default_flow_style=False))
264
        finally:
265
            config_file.close()
267
    def pre_check_config(self, fn):
268
        """Checks configuration file for common mistakes that are easily detectable"""
270
        def get_indentation(line):
271
            i, n = 0, len(line)
272
            while i < n and line[i] == ' ':
273
                i += 1
274
            return i
276
        def isodd(n):
277
            return bool(n % 2)
279
        file = open(fn)
280
        line_num = 0
281
        duplicates = {}
283
        prev_indentation = 0
284
        prev_mapping = False
285
        prev_list = True
286
        prev_scalar = True
287
        list_open = False # multiline list with [
289
        for line in file:
290
            line_num += 1
292
            line = line.rstrip()
294
            if line.strip() == '':
295
                continue
297
            if line.strip().startswith('#'):
298
                continue
299
            indentation = get_indentation(line)
301
            if prev_scalar:
302
                if indentation <= prev_indentation:
303
                    prev_scalar = False
304
                else:
305
                    continue
307
            cur_list = line.strip().startswith('-')
310
            if list_open:
311
                if line.strip().endswith(']'):
312
                    list_open = False
314
                continue
315
            else:
316
                list_open = line.strip().endswith(': [') or line.strip().endswith(':[')
317
                if list_open:
319
                    continue
325
            if '\t' in line:
326
                log.warning('Line %s has tabs, use only spaces!' % line_num)
327
            if isodd(indentation):
328
                log.warning('Config line %s has odd (uneven) indentation' % line_num)
329
            if indentation > prev_indentation and not prev_mapping:
331
                log.warning('Config line %s is likely missing \':\' at the end' % (line_num - 1))
332
            if indentation > prev_indentation + 2 and prev_mapping and not prev_list:
334
                log.warning('Config line %s is indented too much' % line_num)
335
            if indentation <= prev_indentation + (2 * (not cur_list)) and prev_mapping and prev_list:
336
                log.warning('Config line %s is not indented enough' % line_num)
337
            if prev_mapping and cur_list:
339
                if indentation < prev_indentation or indentation > prev_indentation + 2 + (2 * prev_list):
340
                    log.warning('Config line %s containing list element is indented incorrectly' % line_num)
341
            elif prev_mapping and indentation <= prev_indentation:
343
                log.warning('Config line %s is indented incorrectly (previous line ends with \':\')' % line_num)
346
            for level in duplicates.iterkeys():
348
                if indentation < level:
349
                    duplicates[level] = {}
350
            if ':' in line:
351
                name = line.split(':', 1)[0].strip()
352
                ns = duplicates.setdefault(indentation, {})
353
                if name in ns:
354
                    log.warning('Trying to set value for `%s` in line %s, but it is already defined in line %s!' % (name, line_num, ns[name]))
355
                ns[name] = line_num
357
            prev_indentation = indentation
359
            prev_mapping = line[-1] == ':'
360
            prev_scalar = line[-1] in '|>'
362
            prev_list = line.strip()[0] == '-'
363
            if prev_list:
365
                duplicates[indentation] = {}
367
        file.close()
368
        log.debug('Pre-checked %s configuration lines' % line_num)
370
    def validate_config(self):
371
        """Check all root level keywords are valid."""
372
        _config_validator.validate(self.config)
373
        return  _config_validator.errors.messages
375
    def init_sqlalchemy(self):
376
        """Initialize SQLAlchemy"""
377
        try:
378
            if [int(part) for part in sqlalchemy.__version__.split('.')] < [0, 7, 0]:
379
                print >> sys.stderr, 'FATAL: SQLAlchemy 0.7.0 or newer required. Please upgrade your SQLAlchemy.'
380
                sys.exit(1)
381
        except ValueError, e:
382
            log.critical('Failed to check SQLAlchemy version, you may need to upgrade it')
385
        if self.database_uri is None:
386
            self.db_filename = os.path.join(self.config_base, 'db-%s.sqlite' % self.config_name)
387
            if self.options.test:
388
                db_test_filename = os.path.join(self.config_base, 'test-%s.sqlite' % self.config_name)
389
                log.info('Test mode, creating a copy from database ...')
390
                if os.path.exists(self.db_filename):
391
                    shutil.copy(self.db_filename, db_test_filename)
392
                self.db_filename = db_test_filename
393
                log.info('Test database created')
396
            filename = self.db_filename.replace('\\', '\\\\')
397
            self.database_uri = 'sqlite:///%s' % filename
400
        log.debug('Connecting to: %s' % self.database_uri)
401
        try:
402
            self.engine = sqlalchemy.create_engine(self.database_uri,
403
                                                   echo=self.options.debug_sql,
404
                                                   poolclass=SingletonThreadPool)
405
        except ImportError:
406
            print >> sys.stderr, ('FATAL: Unable to use SQLite. Are you running Python 2.5 - 2.7 ?\n'
407
            'Python should normally have SQLite support built in.\n'
408
            'If you\'re running correct version of Python then it is not equipped with SQLite.\n'
409
            'You can try installing `pysqlite`. If you have compiled python yourself, recompile it with SQLite support.')
410
            sys.exit(1)
411
        Session.configure(bind=self.engine)
413
        from sqlalchemy.exc import OperationalError
414
        try:
415
            if self.options.reset or self.options.del_db:
416
                Base.metadata.drop_all(bind=self.engine)
417
            Base.metadata.create_all(bind=self.engine)
418
        except OperationalError, e:
419
            if os.path.exists(self.db_filename):
420
                print >> sys.stderr, '%s - make sure you have write permissions to file %s' % (e.message, self.db_filename)
421
            else:
422
                print >> sys.stderr, '%s - make sure you have write permissions to directory %s' % (e.message, self.config_base)
423
            raise Exception(e.message)
425
    def check_lock(self):
426
        """Checks if there is already a lock, returns True if there is."""
427
        if os.path.exists(self.lockfile):
429
            lock_time = datetime.fromtimestamp(os.path.getmtime(self.lockfile))
430
            if (datetime.now() - lock_time).seconds > 36000:
431
                log.warning('Lock file over 10 hour in age, ignoring it ...')
432
            else:
433
                return True
434
        return False
436
    def acquire_lock(self):
437
        if self.options.log_start:
438
            log.info('FlexGet started (PID: %s)' % os.getpid())
441
        if self.check_lock():
442
            if not self.options.quiet:
443
                f = file(self.lockfile)
444
                pid = f.read()
445
                f.close()
446
                print >> sys.stderr, 'Another process (%s) is running, will exit.' % pid.strip()
447
                print >> sys.stderr, 'If you\'re sure there is no other instance running, delete %s' % self.lockfile
448
            sys.exit(1)
450
        f = file(self.lockfile, 'w')
451
        f.write('PID: %s\n' % os.getpid())
452
        f.close()
453
        atexit.register(self.release_lock)
455
    def release_lock(self):
456
        if self.options.log_start:
457
            log.info('FlexGet stopped (PID: %s)' % os.getpid())
458
        if os.path.exists(self.lockfile):
459
            os.remove(self.lockfile)
460
            log.debug('Removed %s' % self.lockfile)
461
        else:
462
            log.debug('Lockfile %s not found' % self.lockfile)
464
    def create_feeds(self):
465
        """Creates instances of all configured feeds"""
466
        from flexget.feed import Feed
468
        self.feeds = {}
471
        feeds = self.config.get('feeds', {}).keys()
472
        for name in feeds:
474
            feed = Feed(self, name, self.config['feeds'][name])
476
            if name.startswith('_'):
477
                feed.enabled = False
478
            self.feeds[name] = feed
480
    def disable_feeds(self):
481
        """Disables all feeds."""
482
        for feed in self.feeds.itervalues():
483
            feed.enabled = False
485
    def enable_feeds(self):
486
        """Enables all feeds."""
487
        for feed in self.feeds.itervalues():
488
            feed.enabled = True
490
    def process_start(self, feeds=None):
491
        """Execute process_start for feeds.
493
        :param list feeds: Optional list of :class:`~flexget.feed.Feed` instances, defaults to all.
494
        """
495
        if feeds is None:
496
            feeds = self.feeds.values()
498
        for feed in feeds:
499
            if not feed.enabled:
500
                continue
501
            try:
502
                log.trace('calling process_start on a feed %s' % feed.name)
503
                feed._process_start()
504
            except Exception, e:
505
                feed.enabled = False
506
                log.exception('Feed %s process_start: %s' % (feed.name, e))
508
    def process_end(self, feeds=None):
509
        """Execute process_end for all feeds.
511
        :param list feeds: Optional list of :class:`~flexget.feed.Feed` instances, defaults to all.
512
        """
513
        if feeds is None:
514
            feeds = self.feeds.values()
516
        for feed in feeds:
517
            if not feed.enabled:
518
                continue
519
            if feed._abort:
520
                continue
521
            try:
522
                log.trace('calling process_end on a feed %s' % feed.name)
523
                feed._process_end()
524
            except Exception, e:
525
                log.exception('Feed %s process_end: %s' % (feed.name, e))
527
    @useExecLogging
528
    def execute(self, feeds=None, disable_phases=None, entries=None):
529
        """
530
        Iterate trough feeds and run them. If --learn is used download and output
531
        phases are disabled.
533
        :param list feeds: Optional list of feed names to run, all feeds otherwise.
534
        :param list disable_phases: Optional list of phases to disabled
535
        :param list entries: Optional list of entries to pass into feed(s).
536
            This will also cause feed to disable input phase.
537
        """
539
        if feeds is None:
541
            run_feeds = self.feeds.values()
542
        else:
544
            run_feeds = []
545
            for feed in feeds:
546
                if isinstance(feed, basestring):
547
                    if feed in self.feeds:
548
                        run_feeds.append(self.feeds[feed])
549
                    else:
550
                        log.error('Feed `%s` does not exist.' % feed)
551
                else:
552
                    run_feeds.append(feed)
554
        if not run_feeds:
555
            log.warning('There are no feeds to execute, please add some feeds')
556
            return
558
        disable_phases = disable_phases or []
560
        if self.options.learn:
561
            log.info('Disabling download and output phases because of %s' %
562
                     ('--reset' if self.options.reset else '--learn'))
563
            disable_phases.extend(['download', 'output'])
565
        fire_event('manager.execute.started', self)
566
        self.process_start(feeds=run_feeds)
568
        for feed in sorted(run_feeds):
569
            if not feed.enabled or feed._abort:
570
                continue
571
            try:
572
                feed.execute(disable_phases=disable_phases, entries=entries)
573
            except Exception, e:
574
                feed.enabled = False
575
                log.exception('Feed %s: %s' % (feed.name, e))
576
            except KeyboardInterrupt:
578
                if self.options.debug:
579
                    raise
580
                print '**** Keyboard Interrupt ****'
581
                return
583
        self.process_end(feeds=run_feeds)
584
        fire_event('manager.execute.completed', self)
586
    def db_cleanup(self):
587
        """ Perform database cleanup if cleanup interval has been met.
588
        """
589
        if (self.options.db_cleanup or not self.persist.get('last_cleanup') or
590
            self.persist['last_cleanup'] < datetime.now() - DB_CLEANUP_INTERVAL):
591
            log.info('Running database cleanup.')
592
            session = Session()
593
            fire_event('manager.db_cleanup', session)
594
            session.commit()
595
            session.close()
596
            self.persist['last_cleanup'] = datetime.now()
597
        else:
598
            log.debug('Not running db cleanup, last run %s' % self.persist.get('last_cleanup'))
600
    def shutdown(self):
601
        """ Application is being exited
602
        """
603
        fire_event('manager.shutdown', self)
604
        if not self.unit_test: # don't scroll "nosetests" summary results when logging is enabled
605
            log.debug('Shutting down')
606
        self.engine.dispose()
608
        if self.options.test:
609
            if not 'test' in self.db_filename:
610
                raise Exception('trying to delete non test database?')
611
            os.remove(self.db_filename)
612
            log.info('Removed test database')
613
        if not self.unit_test: # don't scroll "nosetests" summary results when logging is enabled
614
            log.debug('Shutdown completed')