6
from flexget.utils.tools import replace_from_entry, make_valid_path
7
from flexget.plugin import register_plugin, PluginError, priority, get_plugin_by_name
9
log = logging.getLogger('deluge')
12
from twisted.python import log as twisted_log
13
from twisted.internet.main import installReactor
14
from twisted.internet.selectreactor import SelectReactor
16
class PausingReactor(SelectReactor):
17
"""A SelectReactor that can be paused and resumed."""
20
SelectReactor.__init__(self)
22
self._return_value = None
23
self._release_requested = False
24
self._mainLoopGen = None
26
def _mainLoopGenerator(self):
27
"""Generator that acts as mainLoop, but yields when requested."""
31
# Advance simulation time in delayed event
33
self.runUntilCurrent()
35
t = self.running and t2
38
if self._release_requested:
39
self._release_requested = False
41
yield self._return_value
43
twisted_log.msg("Unexpected error in main loop.")
46
twisted_log.msg('Main loop terminated.')
48
def run(self, installSignalHandlers=False):
49
"""Starts or resumes the reactor."""
51
self.startRunning(installSignalHandlers)
52
self._mainLoopGen = self._mainLoopGenerator()
55
return self._mainLoopGen.next()
59
def pause(self, return_value=None):
60
"""Causes reactor to pause after this iteration.
61
If :return_value: is specified, it will be returned by the reactor.run call."""
62
self._return_value = return_value
63
self._release_requested = True
66
"""Stops the reactor."""
67
SelectReactor.stop(self)
68
# If this was called while the reactor was paused we have to resume in order for it to complete
72
# Configure twisted to use the PausingReactor.
73
installReactor(PausingReactor())
76
# If twisted is not found, errors will be shown later
80
class OutputDeluge(object):
83
Add the torrents directly to deluge, supporting custom save paths.
87
from flexget import validator
88
root = validator.factory()
89
root.accept('boolean')
90
deluge = root.accept('dict')
91
deluge.accept('text', key='host')
92
deluge.accept('integer', key='port')
93
deluge.accept('text', key='user')
94
deluge.accept('text', key='pass')
95
deluge.accept('path', key='path', allow_replacement=True)
96
deluge.accept('path', key='movedone', allow_replacement=True)
97
deluge.accept('text', key='label')
98
deluge.accept('boolean', key='queuetotop')
99
deluge.accept('boolean', key='automanaged')
100
deluge.accept('number', key='maxupspeed')
101
deluge.accept('number', key='maxdownspeed')
102
deluge.accept('integer', key='maxconnections')
103
deluge.accept('integer', key='maxupslots')
104
deluge.accept('number', key='ratio')
105
deluge.accept('boolean', key='removeatratio')
106
deluge.accept('boolean', key='addpaused')
107
deluge.accept('boolean', key='compact')
108
deluge.accept('text', key='content_filename')
109
deluge.accept('boolean', key='main_file_only')
110
deluge.accept('boolean', key='enabled')
113
def get_config(self, feed):
114
config = feed.config.get('deluge', {})
115
if isinstance(config, bool):
116
config = {'enabled': config}
117
config.setdefault('host', 'localhost')
118
config.setdefault('port', 58846)
119
config.setdefault('user', '')
120
config.setdefault('pass', '')
121
config.setdefault('enabled', True)
122
config.setdefault('path', '')
123
config.setdefault('movedone', '')
124
config.setdefault('label', '')
129
self.deluge_version = None
130
self.reactorRunning = 0
131
self.options = {'maxupspeed': 'max_upload_speed', 'maxdownspeed': 'max_download_speed', \
132
'maxconnections': 'max_connections', 'maxupslots': 'max_upload_slots', \
133
'automanaged': 'auto_managed', 'ratio': 'stop_ratio', 'removeatratio': 'remove_at_ratio', \
134
'addpaused': 'add_paused', 'compact': 'compact_allocation'}
137
def on_process_start(self, feed):
139
Register the usable set: keywords. Detect what version of deluge is loaded.
141
set_plugin = get_plugin_by_name('set')
142
set_plugin.instance.register_keys({'path': 'text', 'movedone': 'text', \
143
'queuetotop': 'boolean', 'label': 'text', 'automanaged': 'boolean', \
144
'maxupspeed': 'number', 'maxdownspeed': 'number', 'maxupslots': 'integer', \
145
'maxconnections': 'integer', 'ratio': 'number', 'removeatratio': 'boolean', \
146
'addpaused': 'boolean', 'compact': 'boolean', 'content_filename': 'text', 'main_file_only': 'boolean'})
147
if self.deluge12 is None:
148
logger = log.info if feed.manager.options.test else log.debug
150
log.debug('Testing for deluge 1.1 API')
151
from deluge.ui.client import sclient
152
log.debug('1.1 API found')
154
log.debug('Testing for deluge 1.2 API')
156
from deluge.ui.client import client
157
except ImportError, e:
158
raise PluginError('Deluge module and it\'s dependencies required. ImportError: %s' % e, log)
160
from twisted.internet import reactor
162
raise PluginError('Twisted module required', log)
163
logger('Using deluge 1.2 api')
166
logger('Using deluge 1.1 api')
167
self.deluge12 = False
170
def on_feed_download(self, feed):
172
call download plugin to generate the temp files we will load into deluge
173
then verify they are valid torrents
175
import deluge.ui.common
176
config = self.get_config(feed)
177
if not config['enabled']:
179
# If the download plugin is not enabled, we need to call it to get our temp .torrent files
180
if not 'download' in feed.config:
181
download = get_plugin_by_name('download')
182
download.instance.get_temp_files(feed, handle_magnets=True)
184
# Check torrent files are valid
185
for entry in feed.accepted:
186
if os.path.exists(entry.get('file', '')):
187
# Check if downloaded file is a valid torrent file
189
deluge.ui.common.TorrentInfo(entry['file'])
191
feed.fail(entry, 'Invalid torrent file')
192
log.error('Torrent file appears invalid for: %s', entry['title'])
195
def on_feed_output(self, feed):
196
"""Add torrents to deluge at exit."""
197
config = self.get_config(feed)
198
# don't add when learning
199
if feed.manager.options.learn:
201
if not config['enabled'] or not (feed.accepted or feed.manager.options.test):
204
add_to_deluge = self.add_to_deluge12 if self.deluge12 else self.add_to_deluge11
205
add_to_deluge(feed, config)
206
# Clean up temp file if download plugin is not configured for this feed
207
if not 'download' in feed.config:
208
for entry in feed.accepted + feed.failed:
209
if os.path.exists(entry.get('file', '')):
210
os.remove(entry['file'])
213
def add_to_deluge11(self, feed, config):
214
""" Add torrents to deluge using deluge 1.1.x api. """
216
from deluge.ui.client import sclient
218
raise PluginError('Deluge module required', log)
220
sclient.set_core_uri()
221
for entry in feed.accepted:
223
before = sclient.get_session_state()
224
except Exception, (errno, msg):
225
raise PluginError('Could not communicate with deluge core. %s' % msg, log)
226
if feed.manager.options.test:
229
path = entry.get('path', config['path'])
231
opts['download_location'] = os.path.expanduser(path % entry)
232
for fopt, dopt in self.options.iteritems():
233
value = entry.get(fopt, config.get(fopt))
234
if value is not None:
237
opts['stop_at_ratio'] = True
239
# check that file is downloaded
240
if not 'file' in entry:
241
feed.fail(entry, 'file missing?')
244
# see that temp file is present
245
if not os.path.exists(entry['file']):
246
tmp_path = os.path.join(feed.manager.config_base, 'temp')
247
log.debug('entry: %s' % entry)
248
log.debug('temp: %s' % ', '.join(os.listdir(tmp_path)))
249
feed.fail(entry, 'Downloaded temp file \'%s\' doesn\'t exist!?' % entry['file'])
252
sclient.add_torrent_file([entry['file']], [opts])
253
log.info('%s torrent added to deluge with options %s' % (entry['title'], opts))
255
movedone = entry.get('movedone', config['movedone'])
256
label = entry.get('label', config['label']).lower()
257
queuetotop = entry.get('queuetotop', config.get('queuetotop'))
259
# Sometimes deluge takes a moment to add the torrent, wait a second.
261
after = sclient.get_session_state()
263
# find torrentid of just added torrent
264
if not item in before:
265
movedone = replace_from_entry(movedone, entry, 'movedone', log.error)
266
movedone = os.path.expanduser(movedone)
268
if not os.path.isdir(movedone):
269
log.debug('movedone path %s doesn\'t exist, creating' % (movedone))
270
os.makedirs(movedone)
271
log.debug('%s move on complete set to %s' % (entry['title'], movedone))
272
sclient.set_torrent_move_on_completed(item, True)
273
sclient.set_torrent_move_on_completed_path(item, movedone)
275
if not 'label' in sclient.get_enabled_plugins():
276
sclient.enable_plugin('label')
277
if not label in sclient.label_get_labels():
278
sclient.label_add(label)
279
log.debug('%s label set to \'%s\'' % (entry['title'], label))
280
sclient.label_set_torrent(item, label)
282
log.debug('%s moved to top of queue' % entry['title'])
283
sclient.queue_top([item])
286
log.info('%s is already loaded in deluge. Cannot change label, movedone, or queuetotop' % entry['title'])
288
def add_to_deluge12(self, feed, config):
289
"""Adds torrents to Deluge using the 1.2+ api and our custom twisted PausingReactor"""
291
from deluge.ui.client import client
292
from twisted.internet import reactor, defer
294
def format_label(label):
295
"""Makes a string compliant with deluge label naming rules"""
296
return re.sub('[^\w-]+', '_', label.lower())
298
def on_connect_success(result, feed):
299
"""Gets called when successfully connected to a daemon."""
301
log.debug('on_connect_success returned a failed result. BUG?')
303
if feed.manager.options.test:
304
log.debug('Test connection to deluge daemon successful.')
308
def set_torrent_options(torrent_id, entry, opts):
309
"""Gets called when a torrent was added to the daemon."""
312
log.error('There was an error adding %s to deluge.' % entry['title'])
313
# TODO: Fail entry? How can this happen still now?
315
log.info('%s successfully added to deluge.' % entry['title'])
316
entry['deluge_id'] = torrent_id
318
def create_path(result, path):
319
"""Creates the specified path if deluge is older than 1.3"""
320
from deluge.common import VersionSplit
321
# Before 1.3, deluge would not create a non-existent move directory, so we need to.
322
if VersionSplit('1.3.0') > VersionSplit(self.deluge_version):
323
if client.is_localhost():
324
if not os.path.isdir(path):
325
log.debug('path %s doesn\'t exist, creating' % path)
328
log.warning('If path does not exist on the machine running the daemon, move will fail.')
331
dlist.append(version_deferred.addCallback(create_path, opts['path']))
332
log.debug('Moving storage for %s to %s' % (entry['title'], opts['path']))
333
dlist.append(client.core.move_storage([torrent_id], opts['path']))
335
dlist.append(version_deferred.addCallback(create_path, opts['movedone']))
336
dlist.append(client.core.set_torrent_move_completed(torrent_id, True))
337
dlist.append(client.core.set_torrent_move_completed_path(torrent_id, opts['movedone']))
338
log.debug('%s move on complete set to %s' % (entry['title'], opts['movedone']))
341
def apply_label(result, torrent_id, label):
342
"""Gets called after labels and torrent were added to deluge."""
343
return client.label.set_torrent(torrent_id, label)
345
dlist.append(label_deferred.addCallback(apply_label, torrent_id, opts['label']))
346
if 'queuetotop' in opts:
347
if opts['queuetotop']:
348
dlist.append(client.core.queue_top([torrent_id]))
349
log.debug('%s moved to top of queue' % entry['title'])
351
dlist.append(client.core.queue_bottom([torrent_id]))
352
log.debug('%s moved to bottom of queue' % entry['title'])
353
if opts.get('content_filename') or opts.get('main_file_only'):
355
def on_get_torrent_status(status):
356
"""Gets called with torrent status, including file info.
357
Loops through files and renames anything qualifies for content renaming."""
360
# Checks the download path as well as the move completed path for existence of the file
361
if os.path.exists(os.path.join(status['save_path'], filename)):
363
elif status.get('move_on_completed') and status.get('move_on_completed_path'):
364
if os.path.exists(os.path.join(status['move_on_completed_path'], filename)):
370
for file in status['files']:
371
# Only rename file if it is > 90% of the content
372
if file['size'] > (status['total_size'] * 0.9):
373
if opts.get('content_filename'):
374
filename = opts['content_filename'] + os.path.splitext(file['path'])[1]
376
if client.is_localhost():
378
# Try appending a (#) suffix till a unique filename is found
379
filename = ''.join([opts['content_filename'], '(', str(counter), ')', os.path.splitext(file['path'])[1]])
382
log.debug('Cannot ensure content_filename is unique when adding to a remote deluge daemon.')
383
log.debug('File %s in %s renamed to %s' % (file['path'], entry['title'], filename))
384
main_file_dlist.append(client.core.rename_files(torrent_id, [(file['index'], filename)]))
385
if opts.get('main_file_only'):
386
file_priorities = [1 if f['index'] == file['index'] else 0 for f in status['files']]
387
main_file_dlist.append(client.core.set_torrent_file_priorities(torrent_id, file_priorities))
388
return defer.DeferredList(main_file_dlist)
390
log.warning('No files in %s are > 90%% of content size, no files renamed.' % entry['title'])
392
status_keys = ['files', 'total_size', 'save_path', 'move_on_completed_path', 'move_on_completed']
393
dlist.append(client.core.get_torrent_status(torrent_id, status_keys).addCallback(on_get_torrent_status))
395
return defer.DeferredList(dlist)
397
def on_fail(result, feed, entry):
398
"""Gets called when daemon reports a failure adding the torrent."""
399
log.info('%s was not added to deluge! %s' % (entry['title'], result))
400
feed.fail(entry, 'Could not be added to deluge')
402
# dlist is a list of deferreds that must complete before we exit
404
# loop through entries to get a list of labels to add
405
labels = set([format_label(entry['label']) for entry in feed.accepted if entry.get('label')])
406
if config.get('label'):
407
labels.add(format_label(config['label']))
408
label_deferred = defer.succeed(True)
410
# Make sure the label plugin is available and enabled, then add appropriate labels
412
def on_get_enabled_plugins(plugins):
413
"""Gets called with the list of enabled deluge plugins."""
415
def on_label_enabled(result):
416
""" This runs when we verify the label plugin is enabled. """
418
def on_get_labels(d_labels):
419
"""Gets available labels from deluge, and adds any new labels we need."""
422
if not label in d_labels:
423
log.debug('Adding the label %s to deluge' % label)
424
dlist.append(client.label.add(label))
425
return defer.DeferredList(dlist)
427
return client.label.get_labels().addCallback(on_get_labels)
429
if 'Label' in plugins:
430
return on_label_enabled(True)
432
# Label plugin isn't enabled, so we check if it's available and enable it.
434
def on_get_available_plugins(plugins):
435
"""Gets plugins available to deluge, enables Label plugin if available."""
436
if 'Label' in plugins:
437
log.debug('Enabling label plugin in deluge')
438
return client.core.enable_plugin('Label').addCallback(on_label_enabled)
440
log.error('Label plugin is not installed in deluge')
442
return client.core.get_available_plugins().addCallback(on_get_available_plugins)
444
label_deferred = client.core.get_enabled_plugins().addCallback(on_get_enabled_plugins)
445
dlist.append(label_deferred)
447
def on_get_daemon_info(ver):
448
"""Gets called with the daemon version info, stores it in self."""
449
log.debug('deluge version %s' % ver)
450
self.deluge_version = ver
452
version_deferred = client.daemon.info().addCallback(on_get_daemon_info)
453
dlist.append(version_deferred)
455
def on_get_session_state(torrent_ids):
456
"""Gets called with a list of torrent_ids loaded in the deluge session.
457
Adds new torrents and modifies the settings for ones already in the session."""
460
for entry in feed.accepted:
462
def add_entry(entry, opts):
463
magnet, filedump = None, None
464
"""Adds an entry to the deluge session"""
465
if entry.get('url', '').startswith('magnet:'):
466
magnet = entry['url']
468
if not os.path.exists(entry['file']):
469
feed.fail(entry, 'Downloaded temp file \'%s\' doesn\'t exist!' % entry['file'])
473
f = open(entry['file'], 'rb')
474
filedump = base64.encodestring(f.read())
478
log.debug('Adding %s to deluge.' % entry['title'])
480
return client.core.add_torrent_magnet(magnet, opts)
482
return client.core.add_torrent_file(entry['title'], filedump, opts)
484
# Generate deluge options dict for torrent add
485
path = replace_from_entry(entry.get('path', config['path']), entry, 'path', log.error)
488
add_opts['download_location'] = make_valid_path(os.path.expanduser(path))
489
for fopt, dopt in self.options.iteritems():
490
value = entry.get(fopt, config.get(fopt))
491
if value is not None:
492
add_opts[dopt] = value
494
add_opts['stop_at_ratio'] = True
495
# Make another set of options, that get set after the torrent has been added
496
content_filename = entry.get('content_filename', config.get('content_filename', ''))
497
movedone = replace_from_entry(entry.get('movedone', config['movedone']), entry, 'movedone', log.error)
498
modify_opts = {'movedone': make_valid_path(os.path.expanduser(movedone)),
499
'label': format_label(entry.get('label', config['label'])),
500
'queuetotop': entry.get('queuetotop', config.get('queuetotop')),
501
'content_filename': replace_from_entry(content_filename, entry, 'content_filename', log.error),
502
'main_file_only': entry.get('main_file_only', config.get('main_file_only', False))}
504
torrent_id = entry.get('deluge_id') or entry.get('torrent_info_hash')
505
torrent_id = torrent_id and torrent_id.lower()
506
if torrent_id in torrent_ids:
507
log.info('%s is already loaded in deluge, setting options' % entry['title'])
508
# Entry has a deluge id, verify the torrent is still in the deluge session and apply options
509
# Since this is already loaded in deluge, we may also need to change the path
510
modify_opts['path'] = add_opts.pop('download_location', None)
511
dlist.extend([set_torrent_options(torrent_id, entry, modify_opts),
512
client.core.set_torrent_options([torrent_id], add_opts)])
514
dlist.append(add_entry(entry, add_opts).addCallbacks(set_torrent_options, on_fail,
515
callbackArgs=(entry, modify_opts), errbackArgs=(feed, entry)))
516
return defer.DeferredList(dlist)
517
dlist.append(client.core.get_session_state().addCallback(on_get_session_state))
519
def on_complete(result):
520
"""Gets called when all of our tasks for deluge daemon are complete."""
522
tasks = defer.DeferredList(dlist).addBoth(on_complete)
524
def on_timeout(result):
525
"""Gets called if tasks have not completed in 30 seconds.
526
Should only happen when something goes wrong."""
527
log.error('Timed out while adding torrents to deluge.')
528
log.debug('dlist: %s' % result.resultList)
531
# Schedule a disconnect to happen in 30 seconds if FlexGet hangs while connected to Deluge
532
reactor.callLater(30, lambda: tasks.called or on_timeout(tasks))
534
def on_connect_fail(result, feed):
535
"""Gets called when connection to deluge daemon fails."""
536
log.debug('Connect to deluge daemon failed, result: %s' % result)
537
reactor.callLater(0, reactor.pause, PluginError('Could not connect to deluge daemon', log))
540
"""Gets called when we disconnect from the daemon."""
541
# pause the reactor, so flexget can continue
542
reactor.callLater(0, reactor.pause)
544
client.set_disconnect_callback(on_disconnect)
549
username=config['user'],
550
password=config['pass'])
552
d.addCallback(on_connect_success, feed).addErrback(on_connect_fail, feed)
553
result = reactor.run()
554
if isinstance(result, Exception):
555
# If an exception was returned from the reactor run, raise it here
558
def on_feed_exit(self, feed):
559
"""Make sure all temp files are cleaned up when feed exits"""
560
# If download plugin is enabled, it will handle cleanup.
561
if not 'download' in feed.config:
562
download = get_plugin_by_name('download')
563
download.instance.cleanup_temp_files(feed)
565
def on_feed_abort(self, feed):
566
"""Make sure all temp files are cleaned up when feed is aborted."""
567
# If download plugin is enabled, it will handle cleanup.
568
if not 'download' in feed.config:
569
download = get_plugin_by_name('download')
570
download.instance.cleanup_temp_files(feed)
571
# stop the reactor when we abort
572
self.on_process_end(feed)
574
def on_process_end(self, feed):
575
"""Shut down the twisted reactor after all feeds have run."""
577
from twisted.internet import reactor
578
if not reactor._stopped:
579
log.debug('Stopping twisted reactor.')
582
register_plugin(OutputDeluge, 'deluge')