flexget.plugins.output_deluge
Covered: 105 lines
Missed: 370 lines
Skipped 108 lines
Percent: 22 %
  1
import logging
  2
import time
  3
import os
  4
import base64
  5
import re
  6
from flexget.utils.tools import replace_from_entry, make_valid_path
  7
from flexget.plugin import register_plugin, PluginError, priority, get_plugin_by_name
  9
log = logging.getLogger('deluge')
 11
try:
 12
    from twisted.python import log as twisted_log
 13
    from twisted.internet.main import installReactor
 14
    from twisted.internet.selectreactor import SelectReactor
 16
    class PausingReactor(SelectReactor):
 17
        """A SelectReactor that can be paused and resumed."""
 19
        def __init__(self):
 20
            SelectReactor.__init__(self)
 21
            self.paused = False
 22
            self._return_value = None
 23
            self._release_requested = False
 24
            self._mainLoopGen = None
 26
        def _mainLoopGenerator(self):
 27
            """Generator that acts as mainLoop, but yields when requested."""
 28
            while self._started:
 29
                try:
 30
                    while self._started:
 33
                        self.runUntilCurrent()
 34
                        t2 = self.timeout()
 35
                        t = self.running and t2
 36
                        self.doIteration(t)
 38
                        if self._release_requested:
 39
                            self._release_requested = False
 40
                            self.paused = True
 41
                            yield self._return_value
 42
                except:
 43
                    twisted_log.msg("Unexpected error in main loop.")
 44
                    twisted_log.err()
 45
                else:
 46
                    twisted_log.msg('Main loop terminated.')
 48
        def run(self, installSignalHandlers=False):
 49
            """Starts or resumes the reactor."""
 50
            if not self._started:
 51
                self.startRunning(installSignalHandlers)
 52
                self._mainLoopGen = self._mainLoopGenerator()
 53
            try:
 54
                self.paused = False
 55
                return self._mainLoopGen.next()
 56
            except StopIteration:
 57
                pass
 59
        def pause(self, return_value=None):
 60
            """Causes reactor to pause after this iteration.
 61
            If :return_value: is specified, it will be returned by the reactor.run call."""
 62
            self._return_value = return_value
 63
            self._release_requested = True
 65
        def stop(self):
 66
            """Stops the reactor."""
 67
            SelectReactor.stop(self)
 69
            if self.paused:
 70
                self.run()
 73
    installReactor(PausingReactor())
 75
except ImportError:
 77
    pass
 80
class OutputDeluge(object):
 82
    """
 83
        Add the torrents directly to deluge, supporting custom save paths.
 84
    """
 86
    def validator(self):
 87
        from flexget import validator
 88
        root = validator.factory()
 89
        root.accept('boolean')
 90
        deluge = root.accept('dict')
 91
        deluge.accept('text', key='host')
 92
        deluge.accept('integer', key='port')
 93
        deluge.accept('text', key='user')
 94
        deluge.accept('text', key='pass')
 95
        deluge.accept('path', key='path', allow_replacement=True)
 96
        deluge.accept('path', key='movedone', allow_replacement=True)
 97
        deluge.accept('text', key='label')
 98
        deluge.accept('boolean', key='queuetotop')
 99
        deluge.accept('boolean', key='automanaged')
100
        deluge.accept('number', key='maxupspeed')
101
        deluge.accept('number', key='maxdownspeed')
102
        deluge.accept('integer', key='maxconnections')
103
        deluge.accept('integer', key='maxupslots')
104
        deluge.accept('number', key='ratio')
105
        deluge.accept('boolean', key='removeatratio')
106
        deluge.accept('boolean', key='addpaused')
107
        deluge.accept('boolean', key='compact')
108
        deluge.accept('text', key='content_filename')
109
        deluge.accept('boolean', key='main_file_only')
110
        deluge.accept('boolean', key='enabled')
111
        return root
113
    def get_config(self, feed):
114
        config = feed.config.get('deluge', {})
115
        if isinstance(config, bool):
116
            config = {'enabled': config}
117
        config.setdefault('host', 'localhost')
118
        config.setdefault('port', 58846)
119
        config.setdefault('user', '')
120
        config.setdefault('pass', '')
121
        config.setdefault('enabled', True)
122
        config.setdefault('path', '')
123
        config.setdefault('movedone', '')
124
        config.setdefault('label', '')
125
        return config
127
    def __init__(self):
128
        self.deluge12 = None
129
        self.deluge_version = None
130
        self.reactorRunning = 0
131
        self.options = {'maxupspeed': 'max_upload_speed', 'maxdownspeed': 'max_download_speed', \
132
            'maxconnections': 'max_connections', 'maxupslots': 'max_upload_slots', \
133
            'automanaged': 'auto_managed', 'ratio': 'stop_ratio', 'removeatratio': 'remove_at_ratio', \
134
            'addpaused': 'add_paused', 'compact': 'compact_allocation'}
136
    @priority(120)
137
    def on_process_start(self, feed):
138
        """
139
            Register the usable set: keywords. Detect what version of deluge is loaded.
140
        """
141
        set_plugin = get_plugin_by_name('set')
142
        set_plugin.instance.register_keys({'path': 'text', 'movedone': 'text', \
143
            'queuetotop': 'boolean', 'label': 'text', 'automanaged': 'boolean', \
144
            'maxupspeed': 'number', 'maxdownspeed': 'number', 'maxupslots': 'integer', \
145
            'maxconnections': 'integer', 'ratio': 'number', 'removeatratio': 'boolean', \
146
            'addpaused': 'boolean', 'compact': 'boolean', 'content_filename': 'text', 'main_file_only': 'boolean'})
147
        if self.deluge12 is None:
148
            logger = log.info if feed.manager.options.test else log.debug
149
            try:
150
                log.debug('Testing for deluge 1.1 API')
151
                from deluge.ui.client import sclient
152
                log.debug('1.1 API found')
153
            except:
154
                log.debug('Testing for deluge 1.2 API')
155
                try:
156
                    from deluge.ui.client import client
157
                except ImportError, e:
158
                    raise PluginError('Deluge module and it\'s dependencies required. ImportError: %s' % e, log)
159
                try:
160
                    from twisted.internet import reactor
161
                except:
162
                    raise PluginError('Twisted module required', log)
163
                logger('Using deluge 1.2 api')
164
                self.deluge12 = True
165
            else:
166
                logger('Using deluge 1.1 api')
167
                self.deluge12 = False
169
    @priority(120)
170
    def on_feed_download(self, feed):
171
        """
172
            call download plugin to generate the temp files we will load into deluge
173
            then verify they are valid torrents
174
        """
175
        import deluge.ui.common
176
        config = self.get_config(feed)
177
        if not config['enabled']:
178
            return
180
        if not 'download' in feed.config:
181
            download = get_plugin_by_name('download')
182
            download.instance.get_temp_files(feed, handle_magnets=True)
185
        for entry in feed.accepted:
186
            if os.path.exists(entry.get('file', '')):
188
                try:
189
                    deluge.ui.common.TorrentInfo(entry['file'])
190
                except Exception:
191
                    feed.fail(entry, 'Invalid torrent file')
192
                    log.error('Torrent file appears invalid for: %s', entry['title'])
194
    @priority(135)
195
    def on_feed_output(self, feed):
196
        """Add torrents to deluge at exit."""
197
        config = self.get_config(feed)
199
        if feed.manager.options.learn:
200
            return
201
        if not config['enabled'] or not (feed.accepted or feed.manager.options.test):
202
            return
204
        add_to_deluge = self.add_to_deluge12 if self.deluge12 else self.add_to_deluge11
205
        add_to_deluge(feed, config)
207
        if not 'download' in feed.config:
208
            for entry in feed.accepted + feed.failed:
209
                if os.path.exists(entry.get('file', '')):
210
                    os.remove(entry['file'])
211
                    del(entry['file'])
213
    def add_to_deluge11(self, feed, config):
214
        """ Add torrents to deluge using deluge 1.1.x api. """
215
        try:
216
            from deluge.ui.client import sclient
217
        except:
218
            raise PluginError('Deluge module required', log)
220
        sclient.set_core_uri()
221
        for entry in feed.accepted:
222
            try:
223
                before = sclient.get_session_state()
224
            except Exception, (errno, msg):
225
                raise PluginError('Could not communicate with deluge core. %s' % msg, log)
226
            if feed.manager.options.test:
227
                return
228
            opts = {}
229
            path = entry.get('path', config['path'])
230
            if path:
231
                opts['download_location'] = os.path.expanduser(path % entry)
232
            for fopt, dopt in self.options.iteritems():
233
                value = entry.get(fopt, config.get(fopt))
234
                if value is not None:
235
                    opts[dopt] = value
236
                    if fopt == 'ratio':
237
                        opts['stop_at_ratio'] = True
240
            if not 'file' in entry:
241
                feed.fail(entry, 'file missing?')
242
                continue
245
            if not os.path.exists(entry['file']):
246
                tmp_path = os.path.join(feed.manager.config_base, 'temp')
247
                log.debug('entry: %s' % entry)
248
                log.debug('temp: %s' % ', '.join(os.listdir(tmp_path)))
249
                feed.fail(entry, 'Downloaded temp file \'%s\' doesn\'t exist!?' % entry['file'])
250
                continue
252
            sclient.add_torrent_file([entry['file']], [opts])
253
            log.info('%s torrent added to deluge with options %s' % (entry['title'], opts))
255
            movedone = entry.get('movedone', config['movedone'])
256
            label = entry.get('label', config['label']).lower()
257
            queuetotop = entry.get('queuetotop', config.get('queuetotop'))
260
            time.sleep(2)
261
            after = sclient.get_session_state()
262
            for item in after:
264
                if not item in before:
265
                    movedone = replace_from_entry(movedone, entry, 'movedone', log.error)
266
                    movedone = os.path.expanduser(movedone)
267
                    if movedone:
268
                        if not os.path.isdir(movedone):
269
                            log.debug('movedone path %s doesn\'t exist, creating' % (movedone))
270
                            os.makedirs(movedone)
271
                        log.debug('%s move on complete set to %s' % (entry['title'], movedone))
272
                        sclient.set_torrent_move_on_completed(item, True)
273
                        sclient.set_torrent_move_on_completed_path(item, movedone)
274
                    if label:
275
                        if not 'label' in sclient.get_enabled_plugins():
276
                            sclient.enable_plugin('label')
277
                        if not label in sclient.label_get_labels():
278
                            sclient.label_add(label)
279
                        log.debug('%s label set to \'%s\'' % (entry['title'], label))
280
                        sclient.label_set_torrent(item, label)
281
                    if queuetotop:
282
                        log.debug('%s moved to top of queue' % entry['title'])
283
                        sclient.queue_top([item])
284
                    break
285
            else:
286
                log.info('%s is already loaded in deluge. Cannot change label, movedone, or queuetotop' % entry['title'])
288
    def add_to_deluge12(self, feed, config):
289
        """Adds torrents to Deluge using the 1.2+ api and our custom twisted PausingReactor"""
291
        from deluge.ui.client import client
292
        from twisted.internet import reactor, defer
294
        def format_label(label):
295
            """Makes a string compliant with deluge label naming rules"""
296
            return re.sub('[^\w-]+', '_', label.lower())
298
        def on_connect_success(result, feed):
299
            """Gets called when successfully connected to a daemon."""
300
            if not result:
301
                log.debug('on_connect_success returned a failed result. BUG?')
303
            if feed.manager.options.test:
304
                log.debug('Test connection to deluge daemon successful.')
305
                client.disconnect()
306
                return
308
            def set_torrent_options(torrent_id, entry, opts):
309
                """Gets called when a torrent was added to the daemon."""
310
                dlist = []
311
                if not torrent_id:
312
                    log.error('There was an error adding %s to deluge.' % entry['title'])
314
                    return
315
                log.info('%s successfully added to deluge.' % entry['title'])
316
                entry['deluge_id'] = torrent_id
318
                def create_path(result, path):
319
                    """Creates the specified path if deluge is older than 1.3"""
320
                    from deluge.common import VersionSplit
322
                    if VersionSplit('1.3.0') > VersionSplit(self.deluge_version):
323
                        if client.is_localhost():
324
                            if not os.path.isdir(path):
325
                                log.debug('path %s doesn\'t exist, creating' % path)
326
                                os.makedirs(path)
327
                        else:
328
                            log.warning('If path does not exist on the machine running the daemon, move will fail.')
330
                if opts.get('path'):
331
                    dlist.append(version_deferred.addCallback(create_path, opts['path']))
332
                    log.debug('Moving storage for %s to %s' % (entry['title'], opts['path']))
333
                    dlist.append(client.core.move_storage([torrent_id], opts['path']))
334
                if opts['movedone']:
335
                    dlist.append(version_deferred.addCallback(create_path, opts['movedone']))
336
                    dlist.append(client.core.set_torrent_move_completed(torrent_id, True))
337
                    dlist.append(client.core.set_torrent_move_completed_path(torrent_id, opts['movedone']))
338
                    log.debug('%s move on complete set to %s' % (entry['title'], opts['movedone']))
339
                if opts['label']:
341
                    def apply_label(result, torrent_id, label):
342
                        """Gets called after labels and torrent were added to deluge."""
343
                        return client.label.set_torrent(torrent_id, label)
345
                    dlist.append(label_deferred.addCallback(apply_label, torrent_id, opts['label']))
346
                if 'queuetotop' in opts:
347
                    if opts['queuetotop']:
348
                        dlist.append(client.core.queue_top([torrent_id]))
349
                        log.debug('%s moved to top of queue' % entry['title'])
350
                    else:
351
                        dlist.append(client.core.queue_bottom([torrent_id]))
352
                        log.debug('%s moved to bottom of queue' % entry['title'])
353
                if opts.get('content_filename') or opts.get('main_file_only'):
355
                    def on_get_torrent_status(status):
356
                        """Gets called with torrent status, including file info.
357
                        Loops through files and renames anything qualifies for content renaming."""
359
                        def file_exists():
361
                            if os.path.exists(os.path.join(status['save_path'], filename)):
362
                                return True
363
                            elif status.get('move_on_completed') and status.get('move_on_completed_path'):
364
                                if os.path.exists(os.path.join(status['move_on_completed_path'], filename)):
365
                                    return True
366
                            else:
367
                                return False
369
                        main_file_dlist = []
370
                        for file in status['files']:
372
                            if file['size'] > (status['total_size'] * 0.9):
373
                                if opts.get('content_filename'):
374
                                    filename = opts['content_filename'] + os.path.splitext(file['path'])[1]
375
                                    counter = 1
376
                                    if client.is_localhost():
377
                                        while file_exists():
379
                                            filename = ''.join([opts['content_filename'], '(', str(counter), ')', os.path.splitext(file['path'])[1]])
380
                                            counter += 1
381
                                    else:
382
                                        log.debug('Cannot ensure content_filename is unique when adding to a remote deluge daemon.')
383
                                    log.debug('File %s in %s renamed to %s' % (file['path'], entry['title'], filename))
384
                                    main_file_dlist.append(client.core.rename_files(torrent_id, [(file['index'], filename)]))
385
                                if opts.get('main_file_only'):
386
                                    file_priorities = [1 if f['index'] == file['index'] else 0 for f in status['files']]
387
                                    main_file_dlist.append(client.core.set_torrent_file_priorities(torrent_id, file_priorities))
388
                                return defer.DeferredList(main_file_dlist)
389
                        else:
390
                            log.warning('No files in %s are > 90%% of content size, no files renamed.' % entry['title'])
392
                    status_keys = ['files', 'total_size', 'save_path', 'move_on_completed_path', 'move_on_completed']
393
                    dlist.append(client.core.get_torrent_status(torrent_id, status_keys).addCallback(on_get_torrent_status))
395
                return defer.DeferredList(dlist)
397
            def on_fail(result, feed, entry):
398
                """Gets called when daemon reports a failure adding the torrent."""
399
                log.info('%s was not added to deluge! %s' % (entry['title'], result))
400
                feed.fail(entry, 'Could not be added to deluge')
403
            dlist = []
405
            labels = set([format_label(entry['label']) for entry in feed.accepted if entry.get('label')])
406
            if config.get('label'):
407
                labels.add(format_label(config['label']))
408
            label_deferred = defer.succeed(True)
409
            if labels:
412
                def on_get_enabled_plugins(plugins):
413
                    """Gets called with the list of enabled deluge plugins."""
415
                    def on_label_enabled(result):
416
                        """ This runs when we verify the label plugin is enabled. """
418
                        def on_get_labels(d_labels):
419
                            """Gets available labels from deluge, and adds any new labels we need."""
420
                            dlist = []
421
                            for label in labels:
422
                                if not label in d_labels:
423
                                    log.debug('Adding the label %s to deluge' % label)
424
                                    dlist.append(client.label.add(label))
425
                            return defer.DeferredList(dlist)
427
                        return client.label.get_labels().addCallback(on_get_labels)
429
                    if 'Label' in plugins:
430
                        return on_label_enabled(True)
431
                    else:
434
                        def on_get_available_plugins(plugins):
435
                            """Gets plugins available to deluge, enables Label plugin if available."""
436
                            if 'Label' in plugins:
437
                                log.debug('Enabling label plugin in deluge')
438
                                return client.core.enable_plugin('Label').addCallback(on_label_enabled)
439
                            else:
440
                                log.error('Label plugin is not installed in deluge')
442
                        return client.core.get_available_plugins().addCallback(on_get_available_plugins)
444
                label_deferred = client.core.get_enabled_plugins().addCallback(on_get_enabled_plugins)
445
                dlist.append(label_deferred)
447
            def on_get_daemon_info(ver):
448
                """Gets called with the daemon version info, stores it in self."""
449
                log.debug('deluge version %s' % ver)
450
                self.deluge_version = ver
452
            version_deferred = client.daemon.info().addCallback(on_get_daemon_info)
453
            dlist.append(version_deferred)
455
            def on_get_session_state(torrent_ids):
456
                """Gets called with a list of torrent_ids loaded in the deluge session.
457
                Adds new torrents and modifies the settings for ones already in the session."""
458
                dlist = []
460
                for entry in feed.accepted:
462
                    def add_entry(entry, opts):
463
                        magnet, filedump = None, None
464
                        """Adds an entry to the deluge session"""
465
                        if entry.get('url', '').startswith('magnet:'):
466
                            magnet = entry['url']
467
                        else:
468
                            if not os.path.exists(entry['file']):
469
                                feed.fail(entry, 'Downloaded temp file \'%s\' doesn\'t exist!' % entry['file'])
470
                                del(entry['file'])
471
                                return
472
                            try:
473
                                f = open(entry['file'], 'rb')
474
                                filedump = base64.encodestring(f.read())
475
                            finally:
476
                                f.close()
478
                        log.debug('Adding %s to deluge.' % entry['title'])
479
                        if magnet:
480
                            return client.core.add_torrent_magnet(magnet, opts)
481
                        else:
482
                            return client.core.add_torrent_file(entry['title'], filedump, opts)
485
                    path = replace_from_entry(entry.get('path', config['path']), entry, 'path', log.error)
486
                    add_opts = {}
487
                    if path:
488
                        add_opts['download_location'] = make_valid_path(os.path.expanduser(path))
489
                    for fopt, dopt in self.options.iteritems():
490
                        value = entry.get(fopt, config.get(fopt))
491
                        if value is not None:
492
                            add_opts[dopt] = value
493
                            if fopt == 'ratio':
494
                                add_opts['stop_at_ratio'] = True
496
                    content_filename = entry.get('content_filename', config.get('content_filename', ''))
497
                    movedone = replace_from_entry(entry.get('movedone', config['movedone']), entry, 'movedone', log.error)
498
                    modify_opts = {'movedone': make_valid_path(os.path.expanduser(movedone)),
499
                            'label': format_label(entry.get('label', config['label'])),
500
                            'queuetotop': entry.get('queuetotop', config.get('queuetotop')),
501
                            'content_filename': replace_from_entry(content_filename, entry, 'content_filename', log.error),
502
                            'main_file_only': entry.get('main_file_only', config.get('main_file_only', False))}
504
                    torrent_id = entry.get('deluge_id') or entry.get('torrent_info_hash')
505
                    torrent_id = torrent_id and torrent_id.lower()
506
                    if torrent_id in torrent_ids:
507
                        log.info('%s is already loaded in deluge, setting options' % entry['title'])
510
                        modify_opts['path'] = add_opts.pop('download_location', None)
511
                        dlist.extend([set_torrent_options(torrent_id, entry, modify_opts),
512
                                      client.core.set_torrent_options([torrent_id], add_opts)])
513
                    else:
514
                        dlist.append(add_entry(entry, add_opts).addCallbacks(set_torrent_options, on_fail,
515
                                callbackArgs=(entry, modify_opts), errbackArgs=(feed, entry)))
516
                return defer.DeferredList(dlist)
517
            dlist.append(client.core.get_session_state().addCallback(on_get_session_state))
519
            def on_complete(result):
520
                """Gets called when all of our tasks for deluge daemon are complete."""
521
                client.disconnect()
522
            tasks = defer.DeferredList(dlist).addBoth(on_complete)
524
            def on_timeout(result):
525
                """Gets called if tasks have not completed in 30 seconds.
526
                Should only happen when something goes wrong."""
527
                log.error('Timed out while adding torrents to deluge.')
528
                log.debug('dlist: %s' % result.resultList)
529
                client.disconnect()
532
            reactor.callLater(30, lambda: tasks.called or on_timeout(tasks))
534
        def on_connect_fail(result, feed):
535
            """Gets called when connection to deluge daemon fails."""
536
            log.debug('Connect to deluge daemon failed, result: %s' % result)
537
            reactor.callLater(0, reactor.pause, PluginError('Could not connect to deluge daemon', log))
539
        def on_disconnect():
540
            """Gets called when we disconnect from the daemon."""
542
            reactor.callLater(0, reactor.pause)
544
        client.set_disconnect_callback(on_disconnect)
546
        d = client.connect(
547
            host=config['host'],
548
            port=config['port'],
549
            username=config['user'],
550
            password=config['pass'])
552
        d.addCallback(on_connect_success, feed).addErrback(on_connect_fail, feed)
553
        result = reactor.run()
554
        if isinstance(result, Exception):
556
            raise result
558
    def on_feed_exit(self, feed):
559
        """Make sure all temp files are cleaned up when feed exits"""
561
        if not 'download' in feed.config:
562
            download = get_plugin_by_name('download')
563
            download.instance.cleanup_temp_files(feed)
565
    def on_feed_abort(self, feed):
566
        """Make sure all temp files are cleaned up when feed is aborted."""
568
        if not 'download' in feed.config:
569
            download = get_plugin_by_name('download')
570
            download.instance.cleanup_temp_files(feed)
572
        self.on_process_end(feed)
574
    def on_process_end(self, feed):
575
        """Shut down the twisted reactor after all feeds have run."""
576
        if self.deluge12:
577
            from twisted.internet import reactor
578
            if not reactor._stopped:
579
                log.debug('Stopping twisted reactor.')
580
                reactor.stop()
582
register_plugin(OutputDeluge, 'deluge')