flexget.feed
Covered: 401 lines
Missed: 0 lines
Skipped 138 lines
Percent: 100 %
  1
import logging
  2
import copy
  3
import hashlib
  4
from flexget import validator
  5
from flexget.manager import Session, register_config_key
  6
from flexget.plugin import get_plugins_by_phase, get_plugin_by_name, \
  7
    feed_phases, PluginWarning, PluginError, DependencyError, plugins as all_plugins
  8
from flexget.utils.simple_persistence import SimpleFeedPersistence
  9
import flexget.utils.requests as requests
 10
from flexget.event import fire_event
 11
from flexget.entry import Entry, EntryUnicodeError
 12
from functools import wraps
 14
log = logging.getLogger('feed')
 17
def useFeedLogging(func):
    """Decorate a feed method so that logging is tagged with the feed while it runs.

    The wrapped callable must be a method of an object exposing ``name``.
    ``logger.set_feed(self.name)`` is called before the method and
    ``logger.set_feed('')`` afterwards, even when the method raises.
    """

    @wraps(func)
    def feed_logged(self, *args, **kw):
        # Imported at call time rather than at module import time.
        from flexget import logger as feed_logger
        feed_logger.set_feed(self.name)
        try:
            result = func(self, *args, **kw)
        finally:
            feed_logger.set_feed('')
        return result

    return feed_logged
 32
class Feed(object):
 34
    """
 35
    Represents one feed in the configuration.
 37
    **Fires events:**
 39
    * feed.execute.before_plugin
 41
      Before a plugin is about to be executed. Note that since this will also include all
 42
      builtin plugins the amount of calls can be quite high
 44
      ``parameters: feed, keyword``
 46
    * feed.execute.after_plugin
 48
      After a plugin has been executed.
 50
      ``parameters: feed, keyword``
 52
    * feed.execute.completed
 54
      After feed execution has been completed
 56
      ``parameters: feed``
 58
    """
 60
    max_reruns = 5
 62
    def __init__(self, manager, name, config):
        """
        Create a feed and bring it into its initial (reset) state.

        :param Manager manager: Manager instance.
        :param string name: Name of the feed, stored as unicode.
        :param dict config: Feed configuration.
        """
        self.manager = manager
        self.config = config
        self.name = unicode(name)

        # persistence helper bound to this feed
        self.simple_persistence = SimpleFeedPersistence(self)

        # how many times execute() has re-run itself so far
        self._rerun_count = 0

        # stays None until _process_start() has compared the stored config hash
        self.config_modified = None

        self._reset()
 84
    def _reset(self):
        """Put the feed back into a clean state, discarding entries and flags of a previous run."""
        log.debug('resetting %s' % self.name)

        self.enabled = True
        self.session = None
        self.priority = 65535

        # fresh requests session for every run
        self.requests = requests.Session()

        # all entries, plus the accepted (can still be rejected), rejected and failed ones
        self.entries, self.accepted, self.rejected, self.failed = [], [], [], []

        self.disabled_phases = []

        # abort / rerun requests made during execution
        self._abort = self._rerun = False

        # what is being executed right now
        self.current_phase = self.current_plugin = None
112
    def __cmp__(self, other):
113
        return cmp(self.priority, other.priority)
115
    def __str__(self):
116
        return '<Feed(name=%s,aborted=%s)>' % (self.name, str(self._abort))
118
    @property
119
    def undecided(self):
120
        """Iterate over undecided entries"""
121
        return (entry for entry in self.entries if not entry in self.accepted and entry not in self.rejected)
123
    def purge(self):
124
        """
125
        Purges rejected and failed entries.
126
        Failed entries will be removed from :attr:`entries`, :attr:`accepted` and :attr:`rejected`
127
        Rejected entries will be removed from :attr:`entries` and :attr:`accepted`
128
        """
129
        self.__purge_failed()
130
        self.__purge_rejected()
132
    def __purge_failed(self):
133
        """Purge failed entries from feed."""
134
        self.__purge(self.failed, self.entries)
135
        self.__purge(self.failed, self.rejected)
136
        self.__purge(self.failed, self.accepted)
138
    def __purge_rejected(self):
139
        """Purge rejected entries from feed."""
140
        self.__purge(self.rejected, self.entries)
141
        self.__purge(self.rejected, self.accepted)
143
    def __purge(self, purge_what, purge_from):
144
        """Purge entries in list from feed.entries"""
146
        for entry in purge_what:
147
            if entry in purge_from:
148
                purge_from.remove(entry)
150
    def disable_phase(self, phase):
        """Prevent ``phase`` from being executed.

        The phase is recorded in :attr:`disabled_phases`, which
        :meth:`Feed._reset()` empties again. Disabling an already
        disabled phase does nothing.

        :param string phase: Name of ``phase``
        :raises ValueError: *phase* could not be found.
        """
        if phase not in feed_phases:
            raise ValueError('%s is not a valid phase' % phase)
        if phase in self.disabled_phases:
            return
        log.debug('Disabling %s phase' % phase)
        self.disabled_phases.append(phase)
165
    def accept(self, entry, reason=None, **kwargs):
        """
        Accept *entry* immediately, with an optional but highly
        recommended *reason*.

        An entry that is already rejected or already accepted is left as it is.

        :param Entry entry: To be accepted
        :param string reason: Optional reason
        :param kwargs: Optional kwargs will be passed to plugins hooking action
        """
        if not isinstance(entry, Entry):
            raise Exception('Trying to accept non entry, %r' % entry)

        if entry in self.rejected:
            # rejection wins over acceptance
            log.debug('tried to accept rejected %r' % entry)
            return
        if entry in self.accepted:
            return

        self.accepted.append(entry)
        # let plugins hooking the accept action know
        self.__run_entry_phase('accept', entry, reason=reason, **kwargs)
183
    def reject(self, entry, reason=None, **kwargs):
        """
        Reject *entry* immediately and permanently, with an optional but
        highly recommended *reason*.

        Entries with a truthy ``immortal`` field are never rejected; the
        attempt is only logged and traced.

        :param Entry entry: To be rejected
        :param string reason: Optional reason
        :param kwargs: Optional kwargs will be passed to plugins hooking action
        """
        if not isinstance(entry, Entry):
            raise Exception('Trying to reject non entry, %r' % entry)

        if entry.get('immortal'):
            if reason:
                reason_str = '(%s)' % reason
            else:
                reason_str = ''
            log.info('Tried to reject immortal %s %s' % (entry['title'], reason_str))
            self.trace(entry, 'Tried to reject immortal %s' % reason_str)
            return

        if entry in self.rejected:
            return

        self.rejected.append(entry)
        # let plugins hooking the reject action know
        self.__run_entry_phase('reject', entry, reason=reason, **kwargs)
206
    def fail(self, entry, reason=None, **kwargs):
        """
        Mark *entry* as failed immediately, with an optional but
        highly recommended *reason*.

        An entry that has already failed is not processed a second time.

        :param Entry entry: To be failed
        :param string reason: Optional reason
        :param kwargs: Optional kwargs will be passed to plugins hooking action
        """
        title = entry['title']
        log.debug('Marking entry \'%s\' as failed' % title)
        if entry in self.failed:
            return

        self.failed.append(entry)
        log.error('Failed %s (%s)' % (entry['title'], reason))
        # let plugins hooking the fail action know
        self.__run_entry_phase('fail', entry, reason=reason, **kwargs)
222
    def trace(self, entry, message):
223
        """Add tracing message to entry.
225
        .. note:: Not yet supported in any meaningful way
226
        """
227
        entry.trace.append((self.current_plugin, message))
229
    def abort(self, **kwargs):
        """Abort this feed execution, no more plugins will be executed after the current one exits.

        Aborting an already aborted feed does nothing.

        :param kwargs: ``silent=True`` logs the abort on debug level instead of info.
        """
        if self._abort:
            return

        announce = log.debug if kwargs.get('silent', False) else log.info
        announce('Aborting feed (plugin: %s)' % self.current_plugin)

        # flag is set before the abort phase handlers are run
        self._abort = True
        self.__run_feed_phase('abort')
241
    def find_entry(self, category='entries', **values):
242
        """
243
        Find and return :class:`~flexget.entry.Entry` with given attributes from feed or None
245
        :param string category: entries, accepted, rejected or failed. Defaults to entries.
246
        :param values: Key values of entries to be searched
247
        :return: Entry or None
248
        """
249
        cat = getattr(self, category)
250
        if not isinstance(cat, list):
251
            raise TypeError('category must be a list')
252
        for entry in cat:
253
            match = 0
254
            for k, v in values.iteritems():
255
                if k in entry:
256
                    if entry.get(k) == v:
257
                        match += 1
258
            if match == len(values):
259
                return entry
260
        return None
262
    def plugins(self, phase=None):
        """Get currently enabled plugins.

        A plugin counts as enabled when its name is a key of the feed config
        or when it is builtin.

        :param string phase:
          Optional, limits to plugins currently configured on given phase, sorted in phase order.
        :return:
          An iterator over configured :class:`flexget.plugin.PluginInfo` instances enabled on this feed.
        """
        if phase:
            def phase_handler(plugin_info):
                return plugin_info.phase_handlers[phase]

            # ordered by the plugins' handlers for this phase, highest first
            candidates = sorted(get_plugins_by_phase(phase), key=phase_handler, reverse=True)
        else:
            candidates = all_plugins.itervalues()
        return (info for info in candidates if info.name in self.config or info.builtin)
276
    def __run_feed_phase(self, phase):
        """Run one feed phase by calling every enabled plugin's handler for it.

        Fires events:

        * feed.execute.before_plugin
        * feed.execute.after_plugin

        :param string phase: Name of the phase
        """
        valid_phases = feed_phases + ['abort', 'process_start', 'process_end']
        if phase not in valid_phases:
            raise Exception('%s is not a valid feed phase' % phase)

        if phase in ['input', 'filter', 'output'] and not self.manager.unit_test:
            # warn when nothing but builtin plugins (or no plugin at all) handles this phase
            if all(candidate.builtin for candidate in self.plugins(phase)):
                log.warning('Feed doesn\'t have any %s plugins, you should add (at least) one!' % phase)

        for plugin in self.plugins(phase):
            # checked on every iteration, so a phase disabled while it is
            # running stops before the next plugin
            if phase in self.disabled_phases:
                return

            self.current_phase = phase
            self.current_plugin = plugin.name

            # api version 1 handlers receive only the feed, later versions
            # also get a shallow copy of the plugin's config
            plugin_args = (self,) if plugin.api_ver == 1 else (self, copy.copy(self.config.get(plugin.name)))

            try:
                fire_event('feed.execute.before_plugin', self, plugin.name)
                response = self.__run_plugin(plugin, phase, plugin_args)
                if phase == 'input' and response:
                    # whatever an input plugin returns is added to the entries
                    self.entries.extend(response)
                self.purge()
            finally:
                fire_event('feed.execute.after_plugin', self, plugin.name)

            # an aborted feed runs no further plugins, except in the abort phase itself
            if self._abort and phase != 'abort':
                return
329
    def __run_entry_phase(self, phase, entry, **kwargs):
331
        if phase not in ['accept', 'reject', 'fail']:
332
            raise Exception('Not a valid entry phase')
333
        phase_plugins = self.plugins(phase)
334
        for plugin in phase_plugins:
335
            self.__run_plugin(plugin, phase, (self, entry), kwargs)
337
    def __run_plugin(self, plugin, phase, args=None, kwargs=None):
338
        """
339
        Execute given plugins phase method, with supplied args and kwargs.
340
        If plugin throws unexpected exceptions :meth:`abort` will be called.
342
        :param PluginInfo plugin: Plugin to be executed
343
        :param string phase: Name of the phase to be executed
344
        :param args: Passed to the plugin
345
        :param kwargs: Passed to the plugin
346
        """
347
        keyword = plugin.name
348
        method = plugin.phase_handlers[phase]
349
        if args is None:
350
            args = []
351
        if kwargs is None:
352
            kwargs = {}
356
        try:
357
            return method(*args, **kwargs)
358
        except PluginWarning, warn:
360
            if warn.kwargs.get('log_once', False):
361
                from flexget.utils.log import log_once
362
                log_once(warn.value, warn.log)
363
            else:
364
                warn.log.warning(warn)
365
        except EntryUnicodeError, eue:
366
            log.critical('Plugin %s tried to create non-unicode compatible entry (key: %s, value: %r)' %
367
                (keyword, eue.key, eue.value))
368
            self.abort()
369
        except PluginError, err:
370
            err.log.critical(err)
371
            self.abort()
372
        except DependencyError, e:
373
            log.critical('Plugin `%s` cannot be used because dependency `%s` is missing.' %
374
                (keyword, e.missing))
375
            log.debug(e.message)
376
            self.abort()
377
        except Exception, e:
378
            log.exception('BUG: Unhandled error in plugin %s: %s' % (keyword, e))
379
            self.abort()
381
            if self.manager.unit_test:
382
                raise
384
    def rerun(self):
        """Request another execution of the feed right after the current one has completed.

        A feed can be re-run up to :attr:`.max_reruns` times."""
        requester = self.current_plugin
        self._rerun = True
        log.info('Plugin %s has requested feed to be ran again after execution has completed.' % requester)
391
    @useFeedLogging
    def execute(self, disable_phases=None, entries=None):
        """Executes the feed.

        The feed is reset and its config validated; a database session is then
        opened and every phase in ``feed_phases`` that is not disabled is run in
        order. Unless the feed was aborted, the session is committed and
        ``feed.execute.completed`` is fired. The session is always closed. If a
        rerun was requested, the feed executes itself again with the same
        arguments, at most :attr:`max_reruns` times in a row.

        :param list disable_phases: Disable given phases names during execution
        :param list entries: Entries to be used in execution instead
            of using the input. Disables input phase.
        :raises ValueError: a name in ``disable_phases`` is not a valid phase.
        """
        log.debug('executing %s' % self.name)

        self._reset()

        # An explicit loop instead of map(): map() is lazy on Python 3 and
        # would never call disable_phase there.
        for phase_name in disable_phases or ():
            self.disable_phase(phase_name)
        if entries:
            # externally supplied entries replace the input phase
            self.disable_phase('input')
            self.entries.extend(entries)

        # validate configuration
        errors = self.validate()
        if self._abort:  # todo: bad practice
            return
        # NOTE(review): validate() aborts the feed whenever it finds errors, so
        # the return above appears to make this raise unreachable - confirm intent.
        if errors and self.manager.unit_test:  # todo: bad practice
            raise Exception('configuration errors')
        if self.manager.options.validate:
            if not errors:
                log.info('Feed \'%s\' passed' % self.name)
            return

        log.debug('starting session')
        self.session = Session()

        try:
            for phase in feed_phases:
                if phase in self.disabled_phases:
                    # tell the user which configured plugins are being skipped
                    for plugin in self.plugins(phase):
                        if plugin.name in self.config:
                            log.info('Plugin %s is not executed because %s phase is disabled' %
                                     (plugin.name, phase))
                    continue

                self.__run_feed_phase(phase)

                # an aborted feed returns without committing
                if self._abort:
                    return

            log.debug('committing session, abort=%s' % self._abort)
            self.session.commit()
            fire_event('feed.execute.completed', self)
        finally:
            # the session is closed on every exit path
            self.session.close()

        # rerun the feed if a plugin asked for it
        if self._rerun:
            if self._rerun_count >= self.max_reruns:
                log.info('Feed has been rerunning already %s times, stopping for now' % self._rerun_count)
                # reset the counter for the next execution
                self._rerun_count = 0
            else:
                log.info('Rerunning the feed in case better resolution can be achieved.')
                self._rerun_count += 1
                self.execute(disable_phases=disable_phases, entries=entries)
462
    def _process_start(self):
        """Run the process_start phase and detect feed config changes.

        An md5 hash of the config items is compared with the one kept in
        :attr:`simple_persistence`; :attr:`config_modified` is set accordingly
        and a changed hash is stored.
        """
        self.__run_feed_phase('process_start')
        current_hash = hashlib.md5(str(self.config.items())).hexdigest()
        stored_hash = self.simple_persistence.get('feed_config_hash')
        if stored_hash == current_hash:
            self.config_modified = False
            return
        self.config_modified = True
        self.simple_persistence['feed_config_hash'] = current_hash
472
    def _process_end(self):
        """Run the process_end phase for this feed, unless the validate option is set."""
        if not self.manager.options.validate:
            self.__run_feed_phase('process_end')
            return
        log.debug('No process_end phase with --check')
479
    def validate(self):
480
        """Called during feed execution. Validates config, prints errors and aborts feed if invalid."""
481
        errors = self.validate_config(self.config)
483
        if errors:
484
            log.critical('Feed \'%s\' has configuration errors:' % self.name)
485
            for error in errors:
486
                log.error(error)
488
            self.abort()
489
        return errors
491
    @staticmethod
492
    def validate_config(config):
493
        """Plugin configuration validation. Return list of error messages that were detected."""
494
        validate_errors = []
496
        if not isinstance(config, dict):
497
            validate_errors.append('Config is not a dictionary.')
498
            return validate_errors
500
        for keyword in config:
501
            if keyword.startswith('_'):
502
                continue
503
            try:
504
                plugin = get_plugin_by_name(keyword)
505
            except:
506
                validate_errors.append('Unknown plugin \'%s\'' % keyword)
507
                continue
508
            if hasattr(plugin.instance, 'validator'):
509
                try:
510
                    validator = plugin.instance.validator()
511
                except TypeError, e:
512
                    log.critical('Invalid validator method in plugin %s' % keyword)
513
                    log.exception(e)
514
                    continue
515
                if not validator.name == 'root':
517
                    validator = validator.add_root_parent()
518
                if not validator.validate(config[keyword]):
519
                    for msg in validator.errors.messages:
520
                        validate_errors.append('%s %s' % (keyword, msg))
521
            else:
522
                log.warning('Used plugin %s does not support validating. Please notify author!' % keyword)
524
        return validate_errors
527
def root_config_validator():
    """Build and return the validator for the 'feeds' key of config.

    Names of plugins that provide a ``validator`` are rejected as keys; any
    other key is accepted and must hold a dict, which may itself hold any keys.
    """
    plugin_names = [name for name in all_plugins if hasattr(all_plugins[name].instance, 'validator')]
    feeds_validator = validator.factory('dict')
    feeds_validator.reject_keys(
        plugin_names,
        message='plugins should go under a specific feed. '
                '(and feeds are not allowed to be named the same as any plugins)')
    single_feed = feeds_validator.accept_any_key('dict')
    single_feed.accept_any_key('any')
    return feeds_validator
538
register_config_key('feeds', root_config_validator)