- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- # gPodder - A media aggregator and podcast client
- # Copyright (c) 2005-2014 Thomas Perl and the gPodder Team
- #
- # gPodder is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 3 of the License, or
- # (at your option) any later version.
- #
- # gPodder is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- #
- # gpo - A better command-line interface to gPodder using the gPodder API
- # by Thomas Perl <thp@gpodder.org>; 2009-05-07
- """
- Usage: gpo [--verbose|-v] [COMMAND] [params...]
- - Subscription management -
- subscribe URL [TITLE] Subscribe to a new feed at URL (as TITLE)
- search QUERY Search the gpodder.net directory for QUERY
- toplist Show the gpodder.net top-subscribe podcasts
- import FILENAME|URL Subscribe to all podcasts in an OPML file
- export FILENAME Export all subscriptions to an OPML file
- rename URL TITLE Rename feed at URL to TITLE
- unsubscribe URL Unsubscribe from feed at URL
- enable URL Enable feed updates for the feed at URL
- disable URL Disable feed updates for the feed at URL
- info URL Show information about feed at URL
- list List all subscribed podcasts
- update [URL] Check for new episodes (all or only at URL)
- - Episode management -
- download [URL] Download new episodes (all or only from URL)
- pending [URL] List new episodes (all or only from URL)
- episodes [URL] List episodes (all or only from URL)
- - Configuration -
- set [key] [value] List one (all) keys or set to a new value
- - Other commands -
- youtube URL Resolve the YouTube URL to a download URL
- rewrite OLDURL NEWURL Change the feed URL of [OLDURL] to [NEWURL]
- webui [public] Start gPodder's Web UI server
- (public = listen on all network interfaces)
- pipe Start gPodder in pipe-based IPC server mode
- """
- from __future__ import print_function
- import sys
- import collections
- import os
- import re
- import inspect
- import functools
- try:
- import readline
- except ImportError:
- readline = None
- import shlex
- import pydoc
- import logging
- try:
- import termios
- import fcntl
- import struct
- except ImportError:
- termios = None
- fcntl = None
- struct = None
# A poor man's argparse/getopt - but it works for our use case :)
# Strip *every* occurrence of the verbose flags from the arguments, so that
# e.g. "gpo -v --verbose list" does not leave "--verbose" behind to be
# mistaken for the command name. sys.argv[0] (the script path) is untouched.
_verbose_flags = ('-v', '--verbose')
verbose = any(arg in _verbose_flags for arg in sys.argv[1:])
sys.argv[1:] = [arg for arg in sys.argv[1:] if arg not in _verbose_flags]
# Locate the installation prefix relative to this script (<prefix>/bin/gpo)
gpodder_script = sys.argv[0]
if os.path.islink(gpodder_script):
    # readlink() may return a target relative to the directory containing
    # the link, so resolve it against that directory rather than the CWD.
    # (os.path.join() returns the target unchanged if it is absolute.)
    link_target = os.readlink(gpodder_script)
    gpodder_script = os.path.join(os.path.dirname(gpodder_script),
                                  link_target)
gpodder_dir = os.path.join(os.path.dirname(gpodder_script), '..')
# TODO: Read parent directory links as well (/bin -> /usr/bin, like on Fedora, see Bug #1618)
# This would allow /usr/share/gpodder/ (not /share/gpodder/) to be found from /bin/gpodder
prefix = os.path.abspath(os.path.normpath(gpodder_dir))

src_dir = os.path.join(prefix, 'src')

if os.path.exists(os.path.join(src_dir, 'gpodder', '__init__.py')):
    # Run gPodder from local source folder (not installed)
    sys.path.insert(0, src_dir)

import gpodder

# Translation helpers used throughout this script
_ = gpodder.gettext
N_ = gpodder.ngettext

gpodder.images_folder = os.path.join(prefix, 'share', 'gpodder', 'images')
gpodder.prefix = prefix

# This is the command-line UI variant
gpodder.ui.cli = True

# Platform detection (i.e. MeeGo 1.2 Harmattan, etc..)
gpodder.detect_platform()

# Colours/progress updates need a terminal on stdout (and not win32);
# interactive prompts additionally need a terminal on stdin.
have_ansi = sys.stdout.isatty() and not gpodder.ui.win32
interactive_console = sys.stdin.isatty() and sys.stdout.isatty()
# Set to True by the entry point when a command is given on the command line
is_single_command = False

from gpodder import log
log.setup(verbose)
- from gpodder import core
- from gpodder import download
- from gpodder import my
- from gpodder import opml
- from gpodder import util
- from gpodder import youtube
- from gpodder import model
- from gpodder.config import config_value_to_string
def incolor(color_id, s):
    """Wrap s in the ANSI escape sequence ESC[9<color_id>m ... ESC[0m.

    The text is returned unchanged unless ANSI output is available
    (have_ansi) and colors are enabled in the CLI configuration.
    """
    colors_enabled = have_ansi and cli._config.ui.cli.colors
    if not colors_enabled:
        return s

    return '\033[9%dm%s\033[0m' % (color_id, s)
def safe_print(*args, **kwargs):
    """Encoding-tolerant replacement for print() (Python 2 code path).

    Positional arguments are converted to unicode, joined with single
    spaces and written to the file given as the "file" keyword argument
    (default: sys.stdout), followed by the "end" keyword argument
    (default: os.linesep). The output file is flushed afterwards.

    A failure while writing the text does not propagate; a request to
    file a bug report is printed instead.
    """
    def convert(arg):
        return unicode(util.convert_bytes(arg))

    ofile = kwargs.get('file', sys.stdout)
    output = u' '.join(map(convert, args))

    # Not every file-like object has an "encoding" attribute
    encoding = getattr(ofile, 'encoding', None)
    if encoding is None:
        output = util.sanitize_encoding(output)
    else:
        output = output.encode(encoding, 'replace')

    try:
        ofile.write(output)
    except Exception as e:
        # "as" syntax (Python 2.6+) keeps this file parseable on Python 3
        print("""
Please report this to http://bugs.gpodder.org/:
args = %s
map(convert, args) = %s
Exception = %s
""" % (repr(args), repr(map(convert, args)), e))

    ofile.write(kwargs.get('end', os.linesep))
    ofile.flush()

# On Python 3 the encoding insanity is gone, so our safe_print()
# function simply becomes the normal print() function. Good stuff!
if sys.version_info >= (3,):
    safe_print = print
# ANSI color helpers, each binding incolor() to one color id:
# red = 1, green = 2, yellow = 3, blue = 4
inred = functools.partial(incolor, 1)
ingreen = functools.partial(incolor, 2)
inyellow = functools.partial(incolor, 3)
inblue = functools.partial(incolor, 4)
def FirstArgumentIsPodcastURL(function):
    """Decorator marking a function as taking a podcast URL as first arg

    Sets the "_first_arg_is_podcast" attribute on the function to True
    and returns the very same function object.
    """
    function._first_arg_is_podcast = True
    return function
def get_terminal_size():
    """Return the size of the terminal on stdout as (rows, columns)

    Falls back to (24, 80) when the termios/fcntl/struct modules are not
    available, when the size cannot be queried from stdout, or when a
    zero-sized window is reported.
    """
    # Same (rows, columns) order as the regular return value below
    fallback = (24, 80)

    if None in (termios, fcntl, struct):
        return fallback

    try:
        request = struct.pack('HHHH', 0, 0, 0, 0)
        reply = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, request)
    except (IOError, OSError, ValueError, AttributeError):
        # stdout has no usable file descriptor or is not a terminal
        return fallback

    rows, cols, xp, yp = struct.unpack('HHHH', reply)
    if not rows or not cols:
        return fallback

    return rows, cols
class gPodderCli(object):
    """Command-line front end for gPodder.

    Every bound method whose name does not start with an underscore is
    exposed as a command (trailing underscores are stripped from the name,
    so "import_" becomes the "import" command). Unique prefixes of command
    names are accepted as aliases for the full command.
    """

    # Width of the status line printed by _start_action()
    COLUMNS = 80
    # Words that leave the interactive shell
    EXIT_COMMANDS = ('quit', 'exit', 'bye')

    def __init__(self):
        self.core = core.Core()
        self._db = self.core.db
        self._config = self.core.config
        self._model = self.core.model

        self._current_action = ''
        self._commands = dict((name.rstrip('_'), func)
            for name, func in inspect.getmembers(self)
            if inspect.ismethod(func) and not name.startswith('_'))
        self._prefixes, self._expansions = self._build_prefixes_expansions()
        self._prefixes.update({'?': 'help'})
        # Several prefixes map to the same command, so de-duplicate
        self._valid_commands = sorted(set(self._prefixes.values()))
        gpodder.user_extensions.on_ui_initialized(self.core.model,
                self._extensions_podcast_update_cb,
                self._extensions_episode_download_cb)

    @staticmethod
    def _read_line(prompt):
        """Read a line from stdin (raw_input on Python 2, input on 3)"""
        try:
            reader = raw_input
        except NameError:
            reader = input
        return reader(prompt)

    def _build_prefixes_expansions(self):
        """Compute command aliases and hints for ambiguous prefixes

        Returns a (prefixes, expansions) tuple: "prefixes" maps every
        prefix that matches exactly one command to that command, and
        "expansions" maps every ambiguous prefix to a list of
        '[uniq]ue' style hints for the commands it could mean.
        """
        prefixes = {}
        expansions = collections.defaultdict(list)
        names = sorted(self._commands.keys())
        names.extend(self.EXIT_COMMANDS)

        def is_unique(prefix):
            # True if exactly one name starts with the given prefix
            return sum(1 for n in names if n.startswith(prefix)) == 1

        for name in names:
            unique_expansion = None
            # All prefixes of the name, longest first, e.g.
            # 'gpodder', 'gpodde', 'gpodd', 'gpod', 'gpo', 'gp', 'g'
            for length in range(len(name), 0, -1):
                prefix = name[:length]
                if is_unique(prefix):
                    unique_expansion = '[%s]%s' % (prefix, name[length:])
                    prefixes[prefix] = name
                elif unique_expansion is not None:
                    expansions[prefix].append(unique_expansion)

        return prefixes, expansions

    def _extensions_podcast_update_cb(self, podcast):
        self._info(_('Podcast update requested by extensions.'))
        self._update_podcast(podcast)

    def _extensions_episode_download_cb(self, episode):
        self._info(_('Episode download requested by extensions.'))
        self._download_episode(episode)

    def _start_action(self, msg, *args):
        """Print the (padded or truncated) description of a new action"""
        # Only interpolate when arguments are given, so that a literal '%'
        # in an already-formatted message does not raise
        line = util.convert_bytes(msg % args if args else msg)
        # Seven columns are kept free for the '[DONE]'-style result
        width = self.COLUMNS - 7
        if len(line) > width:
            line = line[:width-3] + '...'
        else:
            line = line + (' '*(width-len(line)))
        self._current_action = line
        safe_print(self._current_action, end='')

    def _update_action(self, progress):
        """Redraw the current action line with a percentage (ANSI only)"""
        if have_ansi:
            progress = '%3.0f%%' % (progress*100.,)
            result = '['+inblue(progress)+']'
            safe_print('\r' + self._current_action + result, end='')

    def _finish_action(self, success=True, skip=False):
        """Print the SKIP/DONE/FAIL result of the current action"""
        if skip:
            result = '['+inyellow('SKIP')+']'
        elif success:
            result = '['+ingreen('DONE')+']'
        else:
            result = '['+inred('FAIL')+']'

        if have_ansi:
            safe_print('\r' + self._current_action + result)
        else:
            safe_print(result)
        self._current_action = ''

    def _atexit(self):
        self.core.shutdown()

    # -------------------------------------------------------------------

    def import_(self, url):
        for channel in opml.Importer(url).items:
            self.subscribe(channel['url'], channel.get('title'))

    def export(self, filename):
        podcasts = self._model.get_podcasts()
        opml.Exporter(filename).write(podcasts)

    def get_podcast(self, url, create=False, check_only=False):
        """Get a specific podcast by URL

        Returns a podcast object for the URL or None if
        the podcast has not been subscribed to.

        With create=True the podcast is loaded (and created) through the
        model instead. With check_only=True no error message is printed
        when the podcast is not subscribed.
        """
        normalized_url = util.normalize_feed_url(url)
        if normalized_url is None:
            # Report what the user typed, not the failed normalization
            self._error(_('Invalid url: %s') % url)
            return None
        url = normalized_url

        # Subscribe to new podcast
        if create:
            return self._model.load_podcast(url, create=True,
                    max_episodes=self._config.max_episodes_per_feed)

        # Load existing podcast
        for podcast in self._model.get_podcasts():
            if podcast.url == url:
                return podcast

        if not check_only:
            self._error(_('You are not subscribed to %s.') % url)
        return None

    def subscribe(self, url, title=None):
        existing = self.get_podcast(url, check_only=True)
        if existing is not None:
            self._error(_('Already subscribed to %s.') % existing.url)
            return True

        try:
            podcast = self.get_podcast(url, create=True)
            if podcast is None:
                self._error(_('Cannot subscribe to %s.') % url)
                return True

            if title is not None:
                podcast.rename(title)
            podcast.save()
        except Exception as e:
            logging.getLogger(__name__).warning('Cannot subscribe: %s', e,
                    exc_info=True)
            # strerror exists on EnvironmentError but may well be None
            self._error(getattr(e, 'strerror', None) or str(e))
            return True

        self._db.commit()

        # Translate the template first, then fill in the URL
        self._info(_('Successfully added %s.') % url)
        return True

    def _print_config(self, search_for):
        for key in self._config.all_keys():
            if search_for is None or search_for.lower() in key.lower():
                value = config_value_to_string(self._config._lookup(key))
                safe_print(key, '=', value)

    def set(self, key=None, value=None):
        if value is None:
            self._print_config(key)
            return

        try:
            current_value = self._config._lookup(key)
        except KeyError:
            self._error(_('This configuration option does not exist.'))
            return

        if type(current_value) is dict:
            self._error(_('Can only set leaf configuration nodes.'))
            return

        self._config.update_field(key, value)
        # Show the new value
        self.set(key)

    @FirstArgumentIsPodcastURL
    def rename(self, url, title):
        podcast = self.get_podcast(url)

        if podcast is not None:
            old_title = podcast.title
            podcast.rename(title)
            self._db.commit()

            self._info(_('Renamed %(old_title)s to %(new_title)s.') % {
                'old_title': util.convert_bytes(old_title),
                'new_title': util.convert_bytes(title),
            })

        return True

    @FirstArgumentIsPodcastURL
    def unsubscribe(self, url):
        podcast = self.get_podcast(url)

        # get_podcast() has already reported the problem if this is None
        if podcast is not None:
            podcast.delete()
            self._db.commit()
            self._info(_('Unsubscribed from %s.') % url)

        return True

    def is_episode_new(self, episode):
        return (episode.state == gpodder.STATE_NORMAL and episode.is_new)

    def _episodesList(self, podcast):
        """Return a generator of numbered, status-marked episode lines"""
        def status_str(episode):
            # All markers are three columns wide to keep titles aligned
            # is new
            if self.is_episode_new(episode):
                return u' * '
            # is downloaded (U+2589 LEFT SEVEN EIGHTHS BLOCK)
            if (episode.state == gpodder.STATE_DOWNLOADED):
                return u' \u2589 '
            # is deleted (U+2591 LIGHT SHADE)
            if (episode.state == gpodder.STATE_DELETED):
                return u' \u2591 '

            return u'   '

        episodes = (u'%3d. %s %s' % (i+1, status_str(e), e.title)
                for i, e in enumerate(podcast.get_all_episodes()))
        return episodes

    @FirstArgumentIsPodcastURL
    def info(self, url):
        podcast = self.get_podcast(url)

        # get_podcast() has already reported the problem if this is None
        if podcast is not None:
            if podcast.pause_subscription:
                status = "disabled"
            else:
                status = "enabled"

            episodes = u'\n '.join(self._episodesList(podcast))
            self._pager(u"""
Title: %(title)s
URL: %(url)s
Feed update is %(status)s
Episodes:
%(episodes)s
""" % {
                'title': podcast.title,
                'url': podcast.url,
                'status': status,
                'episodes': episodes,
            })

        return True

    @FirstArgumentIsPodcastURL
    def episodes(self, url=None):
        output = []
        for podcast in self._model.get_podcasts():
            if url is None or podcast.url == url:
                episodes = self._episodesList(podcast)
                episodes = u'\n '.join(episodes)
                output.append(u"""
Episodes from %s:
%s
""" % (podcast.url, episodes))

        self._pager(u'\n'.join(output))
        return True

    def list(self):
        for podcast in self._model.get_podcasts():
            if not podcast.pause_subscription:
                safe_print('#', ingreen(podcast.title))
            else:
                safe_print('#', inred(podcast.title),
                        '-', _('Updates disabled'))

            safe_print(podcast.url)

        return True

    def _update_podcast(self, podcast):
        self._start_action(' %s', podcast.title)
        try:
            podcast.update()
            self._finish_action()
        except Exception:
            self._finish_action(False)
            # Keep going with the next podcast, but leave a trace of
            # what went wrong instead of dropping the error silently
            logging.getLogger(__name__).warning('Could not update %s',
                    podcast.url, exc_info=True)

    def _pending_message(self, count):
        return N_('%(count)d new episode', '%(count)d new episodes',
                count) % {'count': count}

    @FirstArgumentIsPodcastURL
    def update(self, url=None):
        count = 0
        safe_print(_('Checking for new episodes'))
        for podcast in self._model.get_podcasts():
            if url is not None and podcast.url != url:
                continue

            if not podcast.pause_subscription:
                self._update_podcast(podcast)
                count += sum(1 for e in podcast.get_all_episodes()
                        if self.is_episode_new(e))
            else:
                self._start_action(_('Skipping %(podcast)s') % {
                    'podcast': podcast.title})
                self._finish_action(skip=True)

        util.delete_empty_folders(gpodder.downloads)
        safe_print(inblue(self._pending_message(count)))
        return True

    @FirstArgumentIsPodcastURL
    def pending(self, url=None):
        count = 0
        for podcast in self._model.get_podcasts():
            # The podcast title is printed once, before its first new episode
            podcast_printed = False
            if url is None or podcast.url == url:
                for episode in podcast.get_all_episodes():
                    if self.is_episode_new(episode):
                        if not podcast_printed:
                            safe_print('#', ingreen(podcast.title))
                            podcast_printed = True
                        safe_print(' ', episode.title)
                        count += 1

        util.delete_empty_folders(gpodder.downloads)
        safe_print(inblue(self._pending_message(count)))
        return True

    def _download_episode(self, episode):
        self._start_action('Downloading %s', episode.title)
        task = download.DownloadTask(episode, self._config)
        task.add_progress_callback(self._update_action)
        task.status = download.DownloadTask.QUEUED
        task.run()
        self._finish_action()

    @FirstArgumentIsPodcastURL
    def download(self, url=None):
        episodes = []
        for podcast in self._model.get_podcasts():
            if url is None or podcast.url == url:
                for episode in podcast.get_all_episodes():
                    if self.is_episode_new(episode):
                        episodes.append(episode)

        if self._config.downloads.chronological_order:
            # download older episodes first
            episodes = list(model.Model.sort_episodes_by_pubdate(episodes))

        last_podcast = None
        for episode in episodes:
            if episode.channel != last_podcast:
                safe_print(inblue(episode.channel.title))
                last_podcast = episode.channel
            self._download_episode(episode)

        util.delete_empty_folders(gpodder.downloads)
        safe_print(len(episodes), 'episodes downloaded.')
        return True

    @FirstArgumentIsPodcastURL
    def disable(self, url):
        podcast = self.get_podcast(url)

        # get_podcast() has already reported the problem if this is None
        if podcast is not None:
            if not podcast.pause_subscription:
                podcast.pause_subscription = True
                podcast.save()
            self._db.commit()
            self._info(_('Disabling feed update from %s.') % url)

        return True

    @FirstArgumentIsPodcastURL
    def enable(self, url):
        podcast = self.get_podcast(url)

        # get_podcast() has already reported the problem if this is None
        if podcast is not None:
            if podcast.pause_subscription:
                podcast.pause_subscription = False
                podcast.save()
            self._db.commit()
            self._info(_('Enabling feed update from %s.') % url)

        return True

    def youtube(self, url):
        fmt_ids = youtube.get_fmt_ids(self._config.youtube)
        yurl = youtube.get_real_download_url(url, fmt_ids)
        safe_print(yurl)

        return True

    def webui(self, public=None):
        from gpodder import webui
        if public == 'public':
            # Warn the user that the web UI is listening on all network
            # interfaces, which could lead to problems.
            # Only use this on a trusted, private network!
            self._warn(_('Listening on ALL network interfaces.'))
            webui.main(only_localhost=False, core=self.core)
        else:
            webui.main(core=self.core)

    def pipe(self):
        from gpodder import pipe
        pipe.main(core=self.core)

    def search(self, *terms):
        query = ' '.join(terms)
        if not query:
            return

        directory = my.Directory()
        results = directory.search(query)
        self._show_directory_results(results)

    def toplist(self):
        directory = my.Directory()
        results = directory.toplist()
        self._show_directory_results(results, True)

    def _show_directory_results(self, results, multiple=False):
        """Print (title, url) results and optionally subscribe to some

        Without an interactive console, or when running a single command,
        only the URLs are printed. Otherwise the user is prompted for the
        index of a result to subscribe to; with multiple=True the prompt
        repeats until an empty line is entered.
        """
        if not results:
            self._error(_('No podcasts found.'))
            return

        if not interactive_console or is_single_command:
            safe_print('\n'.join(url for title, url in results))
            return

        def show_list():
            self._pager('\n'.join(u'%3d: %s\n %s' %
                (index+1, title, url if title != url else '')
                for index, (title, url) in enumerate(results)))

        show_list()

        msg = _('Enter index to subscribe, ? for list')
        while True:
            index = self._read_line(msg + ': ')

            if not index:
                return

            if index == '?':
                show_list()
                continue

            try:
                index = int(index)
            except ValueError:
                self._error(_('Invalid value.'))
                continue

            if not (1 <= index <= len(results)):
                self._error(_('Invalid value.'))
                continue

            title, url = results[index-1]
            self._info(_('Adding %s...') % title)
            self.subscribe(url)
            if not multiple:
                break

    @FirstArgumentIsPodcastURL
    def rewrite(self, old_url, new_url):
        podcast = self.get_podcast(old_url)

        # get_podcast() has already reported the problem if this is None
        if podcast is not None:
            result = podcast.rewrite_url(new_url)
            if result is None:
                self._error(_('Invalid URL: %s') % new_url)
            else:
                new_url = result
                self._info(_('Changed URL from %(old_url)s to %(new_url)s.') %
                {
                    'old_url': old_url,
                    'new_url': new_url,
                })

        return True

    def help(self):
        safe_print(stylize(__doc__), file=sys.stderr, end='')
        return True

    # -------------------------------------------------------------------

    def _pager(self, output):
        """Print output, going through a pager if it is too long"""
        if have_ansi:
            # Need two additional rows for command prompt
            rows_needed = len(output.splitlines()) + 2
            rows, cols = get_terminal_size()
            if rows_needed < rows:
                safe_print(output)
            else:
                pydoc.pager(util.sanitize_encoding(output))
        else:
            safe_print(output)

    def _shell(self):
        """Run the interactive shell until EOF or an exit command"""
        # Each line of the banner is stripped before it is printed
        safe_print(os.linesep.join(x.strip() for x in ("""
        gPodder %(__version__)s "%(__relname__)s" (%(__date__)s) - %(__url__)s
        %(__copyright__)s
        License: %(__license__)s
        Entering interactive shell. Type 'help' for help.
        Press Ctrl+D (EOF) or type 'quit' to quit.
        """ % gpodder.__dict__).splitlines()))

        if readline is not None:
            readline.parse_and_bind('tab: complete')
            readline.set_completer(self._tab_completion)
            readline.set_completer_delims(' ')

        while True:
            try:
                line = self._read_line('gpo> ')
            except EOFError:
                safe_print('')
                break
            except KeyboardInterrupt:
                safe_print('')
                continue

            if self._prefixes.get(line, line) in self.EXIT_COMMANDS:
                break

            try:
                args = shlex.split(line)
            except ValueError as value_error:
                self._error(_('Syntax error: %(error)s') %
                        {'error': value_error})
                continue

            try:
                self._parse(args)
            except KeyboardInterrupt:
                self._error('Keyboard interrupt.')
            except EOFError:
                self._error('EOF.')

        self._atexit()

    def _error(self, *args):
        safe_print(inred(' '.join(args)), file=sys.stderr)

    # Warnings look like error messages for now
    _warn = _error

    def _info(self, *args):
        safe_print(*args)

    def _checkargs(self, func, command_line):
        """Call func with command_line if the argument count fits

        Returns False (after printing an error) on a wrong argument
        count, otherwise whatever the called function returns.
        """
        # getargspec() is gone from newer Python 3 releases, while
        # getfullargspec() does not exist on Python 2. The first, second
        # and fourth fields (args, varargs, defaults) are the same in both.
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        spec = getspec(func)
        args, varargs, defaults = list(spec[0]), spec[1], spec[3] or ()
        args.pop(0)  # Remove "self" from args
        minarg, maxarg = len(args)-len(defaults), len(args)

        if len(command_line) < minarg or (len(command_line) > maxarg and
                varargs is None):
            self._error('Wrong argument count for %s.' % func.__name__)
            return False

        return func(*command_line)

    def _tab_completion_podcast(self, text, count):
        """Tab completion for podcast URLs"""
        urls = [p.url for p in self._model.get_podcasts() if text in p.url]
        if count < len(urls):
            return urls[count]

        return None

    def _tab_completion(self, text, count):
        """Tab completion function for readline"""
        if readline is None:
            return None

        current_line = readline.get_line_buffer()
        if text == current_line:
            # Completing the command name itself
            for name in self._valid_commands:
                if name.startswith(text):
                    if count == 0:
                        return name
                    else:
                        count -= 1
        else:
            args = current_line.split()
            if not args:
                # Nothing but whitespace has been typed so far
                return None

            command = args.pop(0)
            # Resolve aliases the same way _parse() does
            command = self._prefixes.get(command, command)
            command_function = self._commands.get(command)
            if not command_function:
                return None
            if getattr(command_function, '_first_arg_is_podcast', False):
                if not args or (len(args) == 1 and
                        not current_line.endswith(' ')):
                    return self._tab_completion_podcast(text, count)

        return None

    def _parse_single(self, command_line):
        """Run one command, then shut down; returns -1 when interrupted"""
        try:
            result = self._parse(command_line)
        except KeyboardInterrupt:
            self._error('Keyboard interrupt.')
            result = -1
        self._atexit()
        return result

    def _parse(self, command_line):
        """Dispatch a command line (list of words) to its command

        Returns the result of the command, or False if the command line
        is empty, ambiguous or unknown.
        """
        if not command_line:
            return False

        command = command_line.pop(0)

        # Resolve command aliases
        command = self._prefixes.get(command, command)

        if command in self._commands:
            func = self._commands[command]
            if inspect.ismethod(func):
                return self._checkargs(func, command_line)

        if command in self._expansions:
            safe_print(_('Ambiguous command. Did you mean..'))
            for cmd in self._expansions[command]:
                safe_print(' ', inblue(cmd))
        else:
            self._error(_('The requested function is not available.'))

        return False
def stylize(s):
    """Colorize the usage text

    Every match of the first pattern is passed through inblue(), then
    every match of the second pattern through ingreen().
    """
    for pattern, paint in ((r' .{27}', inblue), (r' - .*', ingreen)):
        s = re.sub(pattern,
                   lambda match, paint=paint: paint(match.group(0)), s)
    return s
if __name__ == '__main__':
    # "logger" and "cli" are module-level names also used by code above
    logger = logging.getLogger(__name__)
    cli = gPodderCli()
    args = sys.argv[1:]

    if len(args) > 0:
        # A command was given on the command line: run it and exit
        is_single_command = True
        cli._parse_single(args)
    elif not interactive_console:
        # No command and no terminal to talk to: just print the usage
        safe_print(__doc__, end='')
    else:
        cli._shell()