4
from flexget import validator
5
from flexget.manager import Session, register_config_key
6
from flexget.plugin import get_plugins_by_phase, get_plugin_by_name, \
7
feed_phases, PluginWarning, PluginError, DependencyError, plugins as all_plugins
8
from flexget.utils.simple_persistence import SimpleFeedPersistence
9
import flexget.utils.requests as requests
10
from flexget.event import fire_event
11
from flexget.entry import Entry, EntryUnicodeError
12
from functools import wraps
14
log = logging.getLogger('feed')
17
# Decorator for feed methods: the inner ``wrapper`` tells flexget's logger which
# feed is active (``logger.set_feed(self.name)``) and then delegates to ``func``
# with the original arguments.
# NOTE(review): the embedded numbering skips 18-19, 24 and everything after 25,
# so parts of this definition (e.g. how ``wrapper`` is handed back to the
# caller) are not visible in this view -- confirm against the complete file.
def useFeedLogging(func):
20
def wrapper(self, *args, **kw):
21
# Set the feed name in the logger
22
from flexget import logger
23
logger.set_feed(self.name)
25
return func(self, *args, **kw)
35
Represents one feed in the configuration.
39
* feed.execute.before_plugin
41
Before a plugin is about to be executed. Note that since this will also include all
42
builtin plugins, the number of calls can be quite high
44
``parameters: feed, keyword``
46
* feed.execute.after_plugin
48
After a plugin has been executed.
50
``parameters: feed, keyword``
52
* feed.execute.completed
54
After feed execution has been completed
62
# Constructor: stores the feed name (coerced with ``unicode``), the owning
# manager, a SimpleFeedPersistence bound to this feed, and initialises
# ``config_modified`` to None.
# NOTE(review): the embedded numbering has gaps (63, 67, 69, 71-72, 74-77, 80,
# 82+), so the docstring delimiters and some statements are not visible in
# this view -- confirm the remaining attributes against the complete file.
def __init__(self, manager, name, config):
64
:param Manager manager: Manager instance.
65
:param string name: Name of the feed.
66
:param dict config: Feed configuration.
68
self.name = unicode(name)
70
self.manager = manager
73
self.simple_persistence = SimpleFeedPersistence(self)
78
# This should not be used until after process_start, when it is evaluated
79
self.config_modified = None
81
# use reset to init variables when creating
85
"""Reset feed state"""
86
log.debug('resetting %s' % self.name)
91
self.requests = requests.Session()
93
# undecided entries in the feed (created by input)
96
# You should NOT change these arrays, use reject, accept and fail methods!
97
self.accepted = [] # accepted entries, can still be rejected
98
self.rejected = [] # rejected entries
99
self.failed = [] # failed entries
101
self.disabled_phases = []
103
# TODO: feed.abort() should be done by using exception? not a flag that has to be checked everywhere
109
self.current_phase = None
110
self.current_plugin = None
112
def __cmp__(self, other):
    """Order feeds by their ``priority`` attribute.

    :param other: Object exposing a ``priority`` attribute to compare against.
    :return: Negative, zero or positive integer, following the ``cmp()`` convention.
    """
    mine, theirs = self.priority, other.priority
    # Same result as cmp(mine, theirs), without depending on the ``cmp``
    # builtin, which newer interpreters no longer provide.
    return (mine > theirs) - (mine < theirs)
116
return '<Feed(name=%s,aborted=%s)>' % (self.name, str(self._abort))
120
"""Iterate over undecided entries"""
121
return (entry for entry in self.entries if not entry in self.accepted and entry not in self.rejected)
125
Purges rejected and failed entries.
126
Failed entries will be removed from :attr:`entries`, :attr:`accepted` and :attr:`rejected`
127
Rejected entries will be removed from :attr:`entries` and :attr:`accepted`
129
self.__purge_failed()
130
self.__purge_rejected()
132
def __purge_failed(self):
    """Drop every failed entry from the entries, rejected and accepted lists."""
    for bucket in (self.entries, self.rejected, self.accepted):
        self.__purge(self.failed, bucket)
138
def __purge_rejected(self):
    """Drop every rejected entry from the entries and accepted lists."""
    for bucket in (self.entries, self.accepted):
        self.__purge(self.rejected, bucket)
143
def __purge(self, purge_what, purge_from):
    """Remove from *purge_from* one occurrence of each entry listed in *purge_what*.

    :param list purge_what: Entries that should disappear
    :param list purge_from: List that is modified in place
    """
    # TODO: there is probably more efficient way to do this now that I got rid of __count
    for unwanted in purge_what:
        try:
            purge_from.remove(unwanted)
        except ValueError:
            # not present in purge_from, nothing to drop
            continue
150
def disable_phase(self, phase):
    """Prevent ``phase`` from running during this execution.

    Every disabled phase is enabled again by :meth:`Feed._reset()` once the
    feed execution has been completed.

    :param string phase: Name of the phase to switch off
    :raises ValueError: *phase* is not a known feed phase.
    """
    if phase not in feed_phases:
        raise ValueError('%s is not a valid phase' % phase)
    if phase in self.disabled_phases:
        # already disabled, nothing more to do
        return
    log.debug('Disabling %s phase' % phase)
    self.disabled_phases.append(phase)
165
def accept(self, entry, reason=None, **kwargs):
    """
    Mark *entry* as accepted right away; giving a *reason* is optional
    but strongly encouraged.

    Entries that were already rejected or accepted are left untouched.

    :param Entry entry: Entry to be accepted
    :param string reason: Optional reason
    :param kwargs: Optional kwargs, handed over to plugins hooking the action
    :raises Exception: *entry* is not an :class:`Entry` instance
    """
    if not isinstance(entry, Entry):
        raise Exception('Trying to accept non entry, %r' % entry)
    if entry in self.rejected:
        # a rejection is final, so only note the attempt
        log.debug('tried to accept rejected %r' % entry)
        return
    if entry in self.accepted:
        return
    self.accepted.append(entry)
    # Run on_entry_accept phase
    self.__run_entry_phase('accept', entry, reason=reason, **kwargs)
183
def reject(self, entry, reason=None, **kwargs):
    """
    Reject *entry* immediately and permanently with optional but
    highly recommendable *reason*.

    Entries whose ``immortal`` field is truthy are never rejected; the
    attempt is only logged and traced.

    :param Entry entry: To be rejected
    :param string reason: Optional reason
    :param kwargs: Optional kwargs will be passed to plugins hooking action
    :raises Exception: *entry* is not an :class:`Entry` instance
    """
    if not isinstance(entry, Entry):
        raise Exception('Trying to reject non entry, %r' % entry)
    # ignore rejections on immortal entries
    if entry.get('immortal'):
        reason_str = '(%s)' % reason if reason else ''
        log.info('Tried to reject immortal %s %s' % (entry['title'], reason_str))
        self.trace(entry, 'Tried to reject immortal %s' % reason_str)
        # Stop here: without this the immortal entry would still be rejected below.
        return

    if entry not in self.rejected:
        self.rejected.append(entry)
        # Run on_entry_reject phase
        self.__run_entry_phase('reject', entry, reason=reason, **kwargs)
206
def fail(self, entry, reason=None, **kwargs):
    """
    Mark *entry* as failed right away; giving a *reason* is optional
    but strongly encouraged.

    An entry that is already in the failed list is only noted in the debug log.

    :param Entry entry: Entry to be failed
    :param string reason: Optional reason
    :param kwargs: Optional kwargs, handed over to plugins hooking the action
    """
    log.debug('Marking entry \'%s\' as failed' % entry['title'])
    if entry in self.failed:
        return
    self.failed.append(entry)
    log.error('Failed %s (%s)' % (entry['title'], reason))
    # Run on_entry_fail phase
    self.__run_entry_phase('fail', entry, reason=reason, **kwargs)
222
def trace(self, entry, message):
    """Record a tracing message on the entry.

    The message is stored in ``entry.trace`` together with the name of the
    plugin that is currently running.

    .. note:: Not yet supported in any meaningful way

    :param entry: Entry whose ``trace`` list receives the record
    :param message: Text to record
    """
    history = entry.trace
    history.append((self.current_plugin, message))
229
# Abort handling: logs the abort (mentioning ``self.current_plugin``) and runs
# the 'abort' feed phase. The ``silent`` keyword argument is read from
# ``kwargs`` and defaults to False.
# NOTE(review): the embedded numbering skips 231-232, 235 and 238, so the
# branch structure around the two log calls and the place where ``_abort`` is
# set are not visible in this view -- confirm against the complete file.
def abort(self, **kwargs):
230
"""Abort this feed execution, no more plugins will be executed after the current one exists."""
233
if not kwargs.get('silent', False):
234
log.info('Aborting feed (plugin: %s)' % self.current_plugin)
236
log.debug('Aborting feed (plugin: %s)' % self.current_plugin)
237
# Run the abort phase before we set the _abort flag
239
self.__run_feed_phase('abort')
241
# Entry lookup: ``category`` names the attribute of this feed to search (it
# must resolve to a list, otherwise TypeError is raised) and ``values`` holds
# the key/value pairs that are compared against ``entry.get(k)``.
# NOTE(review): the embedded numbering skips 242, 244, 248, 252-253, 255, 257
# and 259+, so the docstring delimiters, the loop over the entries, the match
# counter and the return statements are not visible in this view.
def find_entry(self, category='entries', **values):
243
Find and return :class:`~flexget.entry.Entry` with given attributes from feed or None
245
:param string category: entries, accepted, rejected or failed. Defaults to entries.
246
:param values: Key values of entries to be searched
247
:return: Entry or None
249
cat = getattr(self, category)
250
if not isinstance(cat, list):
251
raise TypeError('category must be a list')
254
for k, v in values.iteritems():
256
if entry.get(k) == v:
258
if match == len(values):
262
def plugins(self, phase=None):
    """Get currently enabled plugins.

    :param string phase:
      Optional, limits to plugins currently configured on given phase, sorted in phase order.
    :return:
      An iterator over configured :class:`flexget.plugin.PluginInfo` instances enabled on this feed.
    """
    if phase:
        # Only plugins hooking this phase, ordered by their phase handler
        candidates = sorted(get_plugins_by_phase(phase), key=lambda p: p.phase_handlers[phase], reverse=True)
    else:
        # No phase given: consider every registered plugin
        candidates = all_plugins.itervalues()
    # A plugin is enabled when it is named in the feed config or is builtin
    return (p for p in candidates if p.name in self.config or p.builtin)
276
# Runs one feed phase: validates the phase name, warns about input/filter/output
# phases without plugins (outside unit tests), then for each enabled plugin
# records the current phase/plugin, fires 'feed.execute.before_plugin', runs the
# plugin through __run_plugin, extends self.entries with the response of input
# plugins and fires 'feed.execute.after_plugin'.
# NOTE(review): the embedded numbering has many gaps (278-280, 283, 285,
# 293-295, 297, 301, 305, 309-310, 313-314, 321-322, 324, 327+), so the
# docstring end, several branch bodies and the early-exit statements are not
# visible in this view -- confirm against the complete file.
def __run_feed_phase(self, phase):
277
"""Executes feed phase, ie. call all enabled plugins on the feed.
281
* feed.execute.before_plugin
282
* feed.execute.after_plugin
284
:param string phase: Name of the phase
286
if phase not in feed_phases + ['abort', 'process_start', 'process_end']:
287
raise Exception('%s is not a valid feed phase' % phase)
288
# warn if no inputs, filters or outputs in the feed
289
if phase in ['input', 'filter', 'output']:
290
if not self.manager.unit_test:
291
# Check that there is at least one manually configured plugin for these phases
292
for p in self.plugins(phase):
296
log.warning('Feed doesn\'t have any %s plugins, you should add (at least) one!' % phase)
298
for plugin in self.plugins(phase):
299
# Abort this phase if one of the plugins disables it
300
if phase in self.disabled_phases:
302
# store execute info, except during entry events
303
self.current_phase = phase
304
self.current_plugin = plugin.name
306
if plugin.api_ver == 1:
307
# backwards compatibility
308
# pass method only feed (old behaviour)
311
# pass method feed, copy of config (so plugin cannot modify it)
312
args = (self, copy.copy(self.config.get(plugin.name)))
315
fire_event('feed.execute.before_plugin', self, plugin.name)
316
response = self.__run_plugin(plugin, phase, args)
317
if phase == 'input' and response:
318
# add entries returned by input to self.entries
319
self.entries.extend(response)
320
# purge entries between plugins
323
fire_event('feed.execute.after_plugin', self, plugin.name)
325
# Make sure we abort if any plugin sets our abort flag
326
if self._abort and phase != 'abort':
329
def __run_entry_phase(self, phase, entry, **kwargs):
    """Run the handlers of an entry phase ('accept', 'reject' or 'fail') for *entry*.

    :param string phase: Name of the entry phase
    :param entry: Entry the action was applied to
    :param kwargs: Passed on to every plugin handler
    :raises Exception: *phase* is not one of the entry phases
    """
    # TODO: entry events are not very elegant, refactor into real (new) events or something ...
    if phase not in ('accept', 'reject', 'fail'):
        raise Exception('Not a valid entry phase')
    for handler_plugin in self.plugins(phase):
        self.__run_plugin(handler_plugin, phase, (self, entry), kwargs)
337
# Calls the handler that ``plugin`` registered for ``phase`` with the supplied
# args/kwargs and returns its result. PluginWarning, EntryUnicodeError,
# PluginError and DependencyError are logged in the handlers below; a final
# handler logs any other exception as a bug.
# NOTE(review): the embedded numbering has gaps (338, 341, 346, 349-353,
# 355-356, 363, 368, 371, 375-377, 379, 382+), so the ``try:`` line, the
# defaults for args/kwargs, the ``else`` of the log_once check and whatever
# follows each log call (abort / re-raise) are not visible in this view.
def __run_plugin(self, plugin, phase, args=None, kwargs=None):
339
Execute given plugins phase method, with supplied args and kwargs.
340
If plugin throws unexpected exceptions :meth:`abort` will be called.
342
:param PluginInfo plugin: Plugin to be executed
343
:param string phase: Name of the phase to be executed
344
:param args: Passed to the plugin
345
:param kwargs: Passed to the plugin
347
keyword = plugin.name
348
method = plugin.phase_handlers[phase]
354
# log.trace('Running %s method %s' % (keyword, method))
357
return method(*args, **kwargs)
358
except PluginWarning, warn:
359
# check if this warning should be logged only once (may keep repeating)
360
if warn.kwargs.get('log_once', False):
361
from flexget.utils.log import log_once
362
log_once(warn.value, warn.log)
364
warn.log.warning(warn)
365
except EntryUnicodeError, eue:
366
log.critical('Plugin %s tried to create non-unicode compatible entry (key: %s, value: %r)' %
367
(keyword, eue.key, eue.value))
369
except PluginError, err:
370
err.log.critical(err)
372
except DependencyError, e:
373
log.critical('Plugin `%s` cannot be used because dependency `%s` is missing.' %
374
(keyword, e.missing))
378
log.exception('BUG: Unhandled error in plugin %s: %s' % (keyword, e))
380
# don't handle plugin errors gracefully with unit test
381
if self.manager.unit_test:
385
"""Immediately re-run the feed after execute has completed,
386
feed can be re-run up to :attr:`.max_reruns` times."""
388
log.info('Plugin %s has requested feed to be ran again after execution has completed.' %
392
# Feed execution entry point: applies ``disable_phases``, optionally injects
# ``entries`` (which disables the input phase), validates the configuration,
# opens a database Session, runs every phase in ``feed_phases`` that is not
# disabled, commits the session, fires 'feed.execute.completed' and finally
# re-invokes itself when a rerun was requested, bounded by ``max_reruns``.
# NOTE(review): the embedded numbering has many gaps (394, 398-399, 401-402,
# 404, 406, 410, 414, 418, 420-421, 424-426, 434-435, 438, 441-443, 447,
# 449-452, 457, 461), so the conditions guarding several of these steps, the
# early returns and the try/finally structure are not visible in this view --
# confirm against the complete file.
def execute(self, disable_phases=None, entries=None):
393
"""Executes the feed.
395
:param list disable_phases: Disable given phases names during execution
396
:param list entries: Entries to be used in execution instead
397
of using the input. Disables input phase.
400
log.debug('executing %s' % self.name)
403
# Handle keyword args
405
map(self.disable_phase, disable_phases)
407
# If entries are passed for this execution (eg. rerun), disable the input phase
408
self.disable_phase('input')
409
self.entries.extend(entries)
411
# validate configuration
412
errors = self.validate()
413
if self._abort: # todo: bad practice
415
if errors and self.manager.unit_test: # todo: bad practice
416
raise Exception('configuration errors')
417
if self.manager.options.validate:
419
log.info('Feed \'%s\' passed' % self.name)
422
log.debug('starting session')
423
self.session = Session()
427
for phase in feed_phases:
428
if phase in self.disabled_phases:
429
# log keywords not executed
430
for plugin in self.plugins(phase):
431
if plugin.name in self.config:
432
log.info('Plugin %s is not executed because %s phase is disabled' %
433
(plugin.name, phase))
436
# run all plugins with this phase
437
self.__run_feed_phase(phase)
439
# if abort flag has been set feed should be aborted now
440
# since this calls return rerun will not be done
444
log.debug('committing session, abort=%s' % self._abort)
445
self.session.commit()
446
fire_event('feed.execute.completed', self)
448
# this will cause database rollback on exception and feed.abort
453
if self._rerun_count >= self.max_reruns:
454
log.info('Feed has been rerunning already %s times, stopping for now' % self._rerun_count)
455
# reset the counter for future runs (necessary only with webui)
456
self._rerun_count = 0
458
log.info('Rerunning the feed in case better resolution can be achieved.')
459
self._rerun_count += 1
460
self.execute(disable_phases=disable_phases, entries=entries)
462
def _process_start(self):
    """Execute process_start phase and work out whether the feed config changed.

    After the phase has run, an MD5 digest of the feed configuration is
    compared with the digest kept in :attr:`simple_persistence`.
    :attr:`config_modified` is set to True (and the stored digest refreshed)
    when they differ, otherwise to False.
    """
    self.__run_feed_phase('process_start')
    config_hash = hashlib.md5(str(self.config.items())).hexdigest()
    if self.simple_persistence.get('feed_config_hash') != config_hash:
        self.config_modified = True
        self.simple_persistence['feed_config_hash'] = config_hash
    else:
        # Only an unchanged digest means "not modified"; doing this
        # unconditionally would always overwrite the True set above.
        self.config_modified = False
472
def _process_end(self):
    """Execute terminate phase for this feed

    The process_end phase is skipped when the manager runs with the
    ``validate`` option (``--check``).
    """
    if self.manager.options.validate:
        log.debug('No process_end phase with --check')
        # Really skip the phase, as the message above says.
        return
    self.__run_feed_phase('process_end')
480
"""Called during feed execution. Validates config, prints errors and aborts feed if invalid."""
481
errors = self.validate_config(self.config)
482
# log errors and abort
484
log.critical('Feed \'%s\' has configuration errors:' % self.name)
487
# feed has errors, abort it
492
# Validates a feed configuration dict plugin by plugin: keys are looked up with
# get_plugin_by_name, plugins exposing a ``validator`` method get their config
# section checked, and every problem is appended to ``validate_errors`` as a
# message string, which is what gets returned.
# NOTE(review): no ``self`` parameter is visible, so this is presumably a
# staticmethod -- confirm. The embedded numbering also has gaps (494, 502-503,
# 505, 507, 509, 511, 513-514, 521, 523), so the initialisation of
# ``validate_errors``, the ``continue``/``try``/``except``/``else`` lines and
# the handling of keys starting with '_' are not visible in this view.
def validate_config(config):
493
"""Plugin configuration validation. Return list of error messages that were detected."""
495
# validate config is a dictionary
496
if not isinstance(config, dict):
497
validate_errors.append('Config is not a dictionary.')
498
return validate_errors
499
# validate all plugins
500
for keyword in config:
501
if keyword.startswith('_'):
504
plugin = get_plugin_by_name(keyword)
506
validate_errors.append('Unknown plugin \'%s\'' % keyword)
508
if hasattr(plugin.instance, 'validator'):
510
validator = plugin.instance.validator()
512
log.critical('Invalid validator method in plugin %s' % keyword)
515
if not validator.name == 'root':
516
# if validator is not root type, add root validator as it's parent
517
validator = validator.add_root_parent()
518
if not validator.validate(config[keyword]):
519
for msg in validator.errors.messages:
520
validate_errors.append('%s %s' % (keyword, msg))
522
log.warning('Used plugin %s does not support validating. Please notify author!' % keyword)
524
return validate_errors
527
def root_config_validator():
    """Returns a validator for the 'feeds' key of config.

    The validator is a dict validator that refuses keys named after plugins
    providing a ``validator`` method, and accepts any other key holding a
    dict with arbitrary content.
    """
    # TODO: better error messages
    valid_plugins = [p for p in all_plugins if hasattr(all_plugins[p].instance, 'validator')]
    root = validator.factory('dict')
    root.reject_keys(valid_plugins, message='plugins should go under a specific feed. '
                     '(and feeds are not allowed to be named the same as any plugins)')
    root.accept_any_key('dict').accept_any_key('any')
    # Hand the validator back; without this the function returns None.
    return root
538
register_config_key('feeds', root_config_validator)