Skip to content
This repository has been archived by the owner on Oct 1, 2024. It is now read-only.

Python API

Mike Szymaniak edited this page Jul 25, 2020 · 1 revision

Synchronization using Python API

Two API levels are available: the simple, high-level synchronize() function and the SyncController class, which allows more control. Example usage of both is presented below. For more information, see the API documentation.

High-level API

A single call to the synchronize() function is enough to perform synchronization. The function blocks until synchronization finishes. Any required assets are downloaded automatically.

import subsync
import sys, logging

def main():
    """Synchronize subtitles using the blocking subsync.synchronize() call.

    Expects exactly six command line arguments: SUB_PATH SUB_LANG REF_PATH
    REF_STREAM_NO REF_LANG OUT_PATH. With any other argument count the usage
    line is printed and the function returns without doing anything else.
    """
    if len(sys.argv) != 7:
        print('usage: {} SUB_PATH SUB_LANG REF_PATH REF_STREAM_NO REF_LANG OUT_PATH'.format(sys.argv[0]))
        return

    _, subPath, subLang, refPath, refStream, refLang, outPath = sys.argv

    # show logs from synchronizer
    logging.basicConfig(level=logging.INFO)

    # describe the subtitle file, the reference stream and the output file
    sub = dict(path=subPath, lang=subLang)
    ref = dict(path=refPath, stream=int(refStream), lang=refLang)
    out = dict(path=outPath)

    # blocking call - returns only after synchronization has finished
    result, status = subsync.synchronize(sub, ref, out, onError=onError)

    # report the outcome, then the final status
    if result.success:
        outcome = '# subtitles saved to {}'.format(result.path)
    else:
        outcome = '# synchronization failed!'
    print(outcome)
    print('# STATUS:')
    print(status)

def onError(task, source, error):
    """Print an error reported during synchronization.

    task is accepted but not used; source and error are formatted into
    the printed message.
    """
    message = '# error {}: {}'.format(source, error)
    print(message)

# run the example only when this file is executed as a script
if __name__ == "__main__":
    main()

Fine-grained API

With a SyncController object the user has more control over the synchronization process and status monitoring. This comes at the cost of higher complexity: the user is expected to ensure that all required assets are installed locally.

In this example:

  1. Asset list is synchronized with assets server.
  2. Required assets are downloaded.
  3. Required assets are updated (if an update is available).
  4. Status callbacks are registered.
  5. Synchronization is run asynchronously (without blocking).
  6. The program waits for synchronization to finish.
  7. The result is printed.
from subsync import SyncController, SyncTask, SubFile, RefFile, OutputFile, assetManager
import sys, logging

class MySynchronizer(object):
    """Example driver: installs the assets a task needs, then runs the task.

    The instance hands itself to SyncController and provides the onJobStart,
    onJobEnd, onJobUpdate and onError methods, each of which prints a report.
    """

    def downloadMissingAssets(self, task):
        """Make sure every asset required by task is installed locally.

        Refreshes the asset list, downloads missing assets, downloads
        available updates and finally validates the local installation.
        """
        # refresh the asset list unless a refresh is running or already done (#1)
        updater = assetManager().getAssetListUpdater()
        if not (updater.isRunning() or updater.isUpdated()):
            updater.run()
            updater.wait()

        # collect the assets this task depends on
        required = assetManager().getAssetsForTasks([ task ])

        # will throw if required assets are missing locally and from server
        required.validate()

        # download assets that are not installed locally, if any (#2)
        self.downloadAssets(required.missing())

        # update assets with newer version on server (#3)
        self.downloadAssets(required.hasUpdate())

        # make sure we have everything installed locally
        required.validate(localOnly=True)

    def downloadAssets(self, assets):
        """Download the given assets one after another.

        If anything is raised while a download runs, that downloader is
        terminated and the exception is re-raised.
        """
        for item in assets:
            print('# downloading asset {}'.format(item.getPrettyName()))
            fetcher = item.downloader()
            try:
                fetcher.run()
                fetcher.wait(reraise=True)
            except BaseException:
                # make sure to stop threads on exception
                fetcher.terminate()
                raise

    def synchronize(self, task):
        """Run task through a SyncController and wait until it ends.

        If anything is raised on the way, the controller is terminated and
        the exception is re-raised.
        """
        # this object provides the status callbacks (#4)
        controller = SyncController(self)
        try:
            # optional configuration
            controller.configure(windowSize=10*60, minEffort=0.2)

            # start synchronization (#5)
            controller.synchronize(task, timeout=5.0)

            # block until synchronization finishes (#6)
            controller.wait()

        except BaseException:
            # make sure to stop threads on exception (e.g. when CTRL+C)
            controller.terminate()
            raise

    def onJobStart(self, task):
        """Print a note that synchronization of task has started."""
        message = '# synchronization started: {}'.format(task)
        print(message)

    def onJobEnd(self, task, status, result): # (#7)
        """Print the outcome of the finished job followed by its status."""
        if result.success:
            outcome = '# subtitles saved to {}'.format(result.path)
        else:
            outcome = '# synchronization failed!'
        print(outcome)
        print('# STATUS:')
        print(status)

    def onJobUpdate(self, task, status):
        """Print the status passed with a job update."""
        message = '# synchronization {}'.format(status)
        print(message)

    def onError(self, task, source, error):
        """Print an error together with the source it was reported for."""
        message = '# error {}: {}'.format(source, error)
        print(message)


def main():
    """Synchronize subtitles using the SyncController based example class.

    Expects exactly six command line arguments: SUB_PATH SUB_LANG REF_PATH
    REF_STREAM_NO REF_LANG OUT_PATH. With any other argument count the usage
    line is printed and the function returns without doing anything else.
    """
    if len(sys.argv) != 7:
        print('usage: {} SUB_PATH SUB_LANG REF_PATH REF_STREAM_NO REF_LANG OUT_PATH'.format(sys.argv[0]))
        return

    _, subPath, subLang, refPath, refStream, refLang, outPath = sys.argv

    logging.basicConfig(level=logging.WARNING)

    # configure synchronization task
    task = SyncTask(
        SubFile(path=subPath, lang=subLang),
        RefFile(path=refPath, stream=int(refStream), lang=refLang),
        OutputFile(path=outPath),
    )

    # install the required assets first, then run the task
    runner = MySynchronizer()
    runner.downloadMissingAssets(task)
    runner.synchronize(task)


# run the example only when this file is executed as a script
if __name__ == "__main__":
    main()
Clone this wiki locally