vidformer

   1"""
   2vidformer-py is a Python 🐍 interface for [vidformer](https://github.com/ixlab/vidformer).
   3
   4**Quick links:**
   5* [📦 PyPI](https://pypi.org/project/vidformer/)
   6* [📘 Documentation - vidformer-py](https://ixlab.github.io/vidformer/vidformer-py/pdoc/)
   7* [📘 Documentation - vidformer.cv2](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/cv2.html)
   8* [📘 Documentation - vidformer.supervision](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/supervision.html)
   9* [🧑‍💻 Source Code](https://github.com/ixlab/vidformer/tree/main/vidformer-py/)
  10"""
  11
  12__version__ = "0.11.0"
  13
  14
  15import base64
  16import gzip
  17import json
  18import multiprocessing
  19import os
  20import random
  21import re
  22import socket
  23import struct
  24import subprocess
  25import threading
  26import time
  27import uuid
  28from fractions import Fraction
  29from urllib.parse import urlparse
  30
  31import msgpack
  32import numpy as np
  33import requests
  34
  35_in_notebook = False
  36try:
  37    from IPython import get_ipython
  38
  39    if "IPKernelApp" in get_ipython().config:
  40        _in_notebook = True
  41except Exception:
  42    pass
  43
  44
  45def _wait_for_url(url, max_attempts=150, delay=0.1):
  46    for attempt in range(max_attempts):
  47        try:
  48            response = requests.get(url)
  49            if response.status_code == 200:
  50                return response.text.strip()
  51            else:
  52                time.sleep(delay)
  53        except requests.exceptions.RequestException:
  54            time.sleep(delay)
  55    return None
  56
  57
  58def _play(namespace, hls_video_url, hls_js_url, method="html", status_url=None):
  59    # The namespace is so multiple videos in one tab don't conflict
  60
  61    if method == "html":
  62        from IPython.display import HTML
  63
  64        if not status_url:
  65            html_code = f"""
  66<!DOCTYPE html>
  67<html>
  68<head>
  69    <title>HLS Video Player</title>
  70    <!-- Include hls.js library -->
  71    <script src="{hls_js_url}"></script>
  72</head>
  73<body>
  74    <video id="video-{namespace}" controls width="640" height="360" autoplay></video>
  75    <script>
  76        var video = document.getElementById('video-{namespace}');
  77        var videoSrc = '{hls_video_url}';
  78
  79        if (Hls.isSupported()) {{
  80            var hls = new Hls();
  81            hls.loadSource(videoSrc);
  82            hls.attachMedia(video);
  83            hls.on(Hls.Events.MANIFEST_PARSED, function() {{
  84                video.play();
  85            }});
  86        }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{
  87            video.src = videoSrc;
  88            video.addEventListener('loadedmetadata', function() {{
  89                video.play();
  90            }});
  91        }} else {{
  92            console.error('This browser does not appear to support HLS.');
  93        }}
  94    </script>
  95</body>
  96</html>
  97"""
  98            return HTML(data=html_code)
  99        else:
 100            html_code = f"""
 101<!DOCTYPE html>
 102<html>
 103<head>
 104    <title>HLS Video Player</title>
 105    <script src="{hls_js_url}"></script>
 106</head>
 107<body>
 108    <div id="container-{namespace}"></div>
 109    <script>
 110        var statusUrl = '{status_url}';
 111        var videoSrc = '{hls_video_url}';
 112        var videoNamespace = '{namespace}';
 113
 114        function showWaiting() {{
 115            document.getElementById('container-{namespace}').textContent = 'Waiting...';
 116            pollStatus();
 117        }}
 118
 119        function pollStatus() {{
 120            setTimeout(function() {{
 121                fetch(statusUrl)
 122                    .then(r => r.json())
 123                    .then(res => {{
 124                        if (res.ready) {{
 125                            document.getElementById('container-{namespace}').textContent = '';
 126                            attachHls();
 127                        }} else {{
 128                            pollStatus();
 129                        }}
 130                    }})
 131                    .catch(e => {{
 132                        console.error(e);
 133                        pollStatus();
 134                    }});
 135            }}, 250);
 136        }}
 137
 138        function attachHls() {{
 139            var container = document.getElementById('container-{namespace}');
 140            container.textContent = '';
 141            var video = document.createElement('video');
 142            video.id = 'video-' + videoNamespace;
 143            video.controls = true;
 144            video.width = 640;
 145            video.height = 360;
 146            container.appendChild(video);
 147            if (Hls.isSupported()) {{
 148                var hls = new Hls();
 149                hls.loadSource(videoSrc);
 150                hls.attachMedia(video);
 151                hls.on(Hls.Events.MANIFEST_PARSED, function() {{
 152                    video.play();
 153                }});
 154            }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{
 155                video.src = videoSrc;
 156                video.addEventListener('loadedmetadata', function() {{
 157                    video.play();
 158                }});
 159            }}
 160        }}
 161
 162        fetch(statusUrl)
 163            .then(r => r.json())
 164            .then(res => {{
 165                if (res.ready) {{
 166                    attachHls();
 167                }} else {{
 168                    showWaiting();
 169                }}
 170            }})
 171            .catch(e => {{
 172                console.error(e);
 173                showWaiting();
 174            }});
 175    </script>
 176</body>
 177</html>
 178"""
 179        return HTML(data=html_code)
 180    elif method == "link":
 181        return hls_video_url
 182    else:
 183        raise ValueError("Invalid method")
 184
 185
 186def _feb_expr_coded_as_scalar(expr) -> bool:
 187    if type(expr) is tuple:
 188        expr = list(expr)
 189    if type(expr) is FilterExpr:
 190        return False
 191    if type(expr) is list:
 192        if len(expr) > 3:
 193            return False
 194        else:
 195            return all([type(x) is int and x >= -(2**15) and x < 2**15 for x in expr])
 196    else:
 197        assert type(expr) in [int, float, str, bytes, SourceExpr, bool, list]
 198        return True
 199
 200
 201class _FrameExpressionBlock:
 202    def __init__(self):
 203        self._functions = []
 204        self._literals = []
 205        self._sources = []
 206        self._kwarg_keys = []
 207        self._source_fracs = []
 208        self._exprs = []
 209        self._frame_exprs = []
 210
 211    def __len__(self):
 212        return len(self._frame_exprs)
 213
 214    def insert_expr(self, expr):
 215        if type(expr) is SourceExpr or type(expr) is FilterExpr:
 216            return self.insert_frame_expr(expr)
 217        else:
 218            return self.insert_data_expr(expr)
 219
 220    def insert_data_expr(self, data):
 221        if type(data) is tuple:
 222            data = list(data)
 223        if type(data) is bool:
 224            self._exprs.append(0x01000000_00000000 | int(data))
 225            return len(self._exprs) - 1
 226        elif type(data) is int:
 227            if data >= -(2**31) and data < 2**31:
 228                self._exprs.append(data & 0xFFFFFFFF)
 229            else:
 230                self._literals.append(_json_arg(data, skip_data_anot=True))
 231                self._exprs.append(0x40000000_00000000 | len(self._literals) - 1)
 232            return len(self._exprs) - 1
 233        elif type(data) is float:
 234            self._exprs.append(
 235                0x02000000_00000000 | int.from_bytes(struct.pack("f", data)[::-1])
 236            )
 237        elif type(data) is str:
 238            self._literals.append(_json_arg(data, skip_data_anot=True))
 239            self._exprs.append(0x40000000_00000000 | len(self._literals) - 1)
 240        elif type(data) is bytes:
 241            self._literals.append(_json_arg(data, skip_data_anot=True))
 242            self._exprs.append(0x40000000_00000000 | len(self._literals) - 1)
 243        elif type(data) is list:
 244            if len(data) == 0:
 245                self._exprs.append(0x03000000_00000000)
 246                return len(self._exprs) - 1
 247            if (
 248                len(data) == 1
 249                and type(data[0]) is int
 250                and data[0] >= -(2**15)
 251                and data[0] < 2**15
 252            ):
 253                self._exprs.append(0x04000000_00000000 | (data[0] & 0xFFFF))
 254                return len(self._exprs) - 1
 255            if (
 256                len(data) == 2
 257                and type(data[0]) is int
 258                and data[0] >= -(2**15)
 259                and data[0] < 2**15
 260                and type(data[1]) is int
 261                and data[1] >= -(2**15)
 262                and data[1] < 2**15
 263            ):
 264                self._exprs.append(
 265                    0x05000000_00000000
 266                    | ((data[0] & 0xFFFF) << 16)
 267                    | (data[1] & 0xFFFF)
 268                )
 269                return len(self._exprs) - 1
 270            if (
 271                len(data) == 3
 272                and type(data[0]) is int
 273                and data[0] >= -(2**15)
 274                and data[0] < 2**15
 275                and type(data[1]) is int
 276                and data[1] >= -(2**15)
 277                and data[1] < 2**15
 278                and type(data[2]) is int
 279                and data[2] >= -(2**15)
 280                and data[2] < 2**15
 281            ):
 282                self._exprs.append(
 283                    0x06000000_00000000
 284                    | ((data[0] & 0xFFFF) << 32)
 285                    | ((data[1] & 0xFFFF) << 16)
 286                    | (data[2] & 0xFFFF)
 287                )
 288                return len(self._exprs) - 1
 289            out = len(self._exprs)
 290            member_idxs = []
 291            for member in data:
 292                if _feb_expr_coded_as_scalar(member):
 293                    member_idxs.append(None)
 294                else:
 295                    member_idxs.append(self.insert_data_expr(member))
 296
 297            self._exprs.append(0x42000000_00000000 | len(data))
 298
 299            for i in range(len(data)):
 300                if member_idxs[i] is None:
 301                    self.insert_data_expr(data[i])
 302                else:
 303                    self._exprs.append(0x45000000_00000000 | member_idxs[i])
 304
 305            return out
 306        else:
 307            raise Exception("Invalid data type")
 308
 309    def insert_frame_expr(self, frame):
 310        if type(frame) is SourceExpr:
 311            source = frame._source._name
 312            if source in self._sources:
 313                source_idx = self._sources.index(source)
 314            else:
 315                source_idx = len(self._sources)
 316                self._sources.append(source)
 317            if frame._is_iloc:
 318                self._exprs.append(
 319                    0x43000000_00000000 | (source_idx << 32) | frame._idx
 320                )
 321            else:
 322                idx = len(self._source_fracs) // 2
 323                self._source_fracs.append(frame._idx.numerator)
 324                self._source_fracs.append(frame._idx.denominator)
 325                self._exprs.append(0x44000000_00000000 | (source_idx << 32) | idx)
 326            return len(self._exprs) - 1
 327        elif type(frame) is FilterExpr:
 328            func = frame._filter._func
 329            if func in self._functions:
 330                func_idx = self._functions.index(func)
 331            else:
 332                func_idx = len(self._functions)
 333                self._functions.append(func)
 334            len_args = len(frame._args)
 335            len_kwargs = len(frame._kwargs)
 336
 337            arg_idxs = []
 338            for arg in frame._args:
 339                if _feb_expr_coded_as_scalar(arg):
 340                    arg_idxs.append(None)
 341                else:
 342                    arg_idxs.append(self.insert_expr(arg))
 343            kwarg_idxs = {}
 344            for k, v in frame._kwargs.items():
 345                if _feb_expr_coded_as_scalar(v):
 346                    kwarg_idxs[k] = None
 347                else:
 348                    kwarg_idxs[k] = self.insert_expr(v)
 349
 350            out_idx = len(self._exprs)
 351            self._exprs.append(
 352                0x41000000_00000000 | (len_args << 24) | (len_kwargs << 16) | func_idx
 353            )
 354            for i in range(len_args):
 355                if arg_idxs[i] is None:
 356                    # It's a scalar
 357                    self.insert_expr(frame._args[i])
 358                else:
 359                    # It's an expression pointer
 360                    self._exprs.append(0x45000000_00000000 | arg_idxs[i])
 361            for k, v in frame._kwargs.items():
 362                if k in self._kwarg_keys:
 363                    k_idx = self._kwarg_keys.index(k)
 364                else:
 365                    k_idx = len(self._kwarg_keys)
 366                    self._kwarg_keys.append(k)
 367                self._exprs.append(0x46000000_00000000 | k_idx)
 368                if kwarg_idxs[k] is None:
 369                    # It's a scalar
 370                    self.insert_expr(v)
 371                else:
 372                    # It's an expression pointer
 373                    self._exprs.append(0x45000000_00000000 | kwarg_idxs[k])
 374            return out_idx
 375        else:
 376            raise Exception("Invalid frame type")
 377
 378    def insert_frame(self, frame):
 379        idx = self.insert_frame_expr(frame)
 380        self._frame_exprs.append(idx)
 381
 382    def as_dict(self):
 383        return {
 384            "functions": self._functions,
 385            "literals": self._literals,
 386            "sources": self._sources,
 387            "kwarg_keys": self._kwarg_keys,
 388            "source_fracs": self._source_fracs,
 389            "exprs": self._exprs,
 390            "frame_exprs": self._frame_exprs,
 391        }
 392
 393
 394class IgniSource:
 395    def __init__(self, id: str, src):
 396        self._name = id
 397        self._fmt = {
 398            "width": src["width"],
 399            "height": src["height"],
 400            "pix_fmt": src["pix_fmt"],
 401        }
 402        self._ts = [Fraction(x[0], x[1]) for x in src["ts"]]
 403        self.iloc = _SourceILoc(self)
 404
 405    def id(self) -> str:
 406        return self._name
 407
 408    def fmt(self):
 409        return {**self._fmt}
 410
 411    def ts(self) -> list[Fraction]:
 412        return self._ts.copy()
 413
 414    def __len__(self):
 415        return len(self._ts)
 416
 417    def __getitem__(self, idx):
 418        if type(idx) is not Fraction:
 419            raise Exception("Source index must be a Fraction")
 420        return SourceExpr(self, idx, False)
 421
 422    def __repr__(self):
 423        return f"IgniSource({self._name})"
 424
 425
 426class IgniSpec:
 427    def __init__(self, id: str, src):
 428        self._id = id
 429        self._fmt = {
 430            "width": src["width"],
 431            "height": src["height"],
 432            "pix_fmt": src["pix_fmt"],
 433        }
 434        self._vod_endpoint = src["vod_endpoint"]
 435        parsed_url = urlparse(self._vod_endpoint)
 436        self._hls_js_url = f"{parsed_url.scheme}://{parsed_url.netloc}/hls.js"
 437
 438    def id(self) -> str:
 439        return self._id
 440
 441    def play(self, *args, **kwargs):
 442        url = f"{self._vod_endpoint}playlist.m3u8"
 443        status_url = f"{self._vod_endpoint}status"
 444        hls_js_url = self._hls_js_url
 445        return _play(self._id, url, hls_js_url, *args, **kwargs, status_url=status_url)
 446
 447
 448class IgniServer:
 449    def __init__(self, endpoint: str, api_key: str):
 450        if not endpoint.startswith("http://") and not endpoint.startswith("https://"):
 451            raise Exception("Endpoint must start with http:// or https://")
 452        if endpoint.endswith("/"):
 453            raise Exception("Endpoint must not end with /")
 454        self._endpoint = endpoint
 455
 456        self._api_key = api_key
 457        self._session = requests.Session()
 458        self._session.headers.update({"Authorization": f"Bearer {self._api_key}"})
 459        response = self._session.get(
 460            f"{self._endpoint}/auth",
 461            headers={"Authorization": f"Bearer {self._api_key}"},
 462        )
 463        if not response.ok:
 464            raise Exception(response.text)
 465        response = response.json()
 466        assert response["status"] == "ok"
 467
 468    def get_source(self, id: str) -> IgniSource:
 469        assert type(id) is str
 470        response = self._session.get(
 471            f"{self._endpoint}/source/{id}",
 472            headers={"Authorization": f"Bearer {self._api_key}"},
 473        )
 474        if not response.ok:
 475            raise Exception(response.text)
 476        response = response.json()
 477        return IgniSource(response["id"], response)
 478
 479    def list_sources(self) -> list[str]:
 480        response = self._session.get(
 481            f"{self._endpoint}/source",
 482            headers={"Authorization": f"Bearer {self._api_key}"},
 483        )
 484        if not response.ok:
 485            raise Exception(response.text)
 486        response = response.json()
 487        return response
 488
 489    def delete_source(self, id: str):
 490        assert type(id) is str
 491        response = self._session.delete(
 492            f"{self._endpoint}/source/{id}",
 493            headers={"Authorization": f"Bearer {self._api_key}"},
 494        )
 495        if not response.ok:
 496            raise Exception(response.text)
 497        response = response.json()
 498        assert response["status"] == "ok"
 499
 500    def search_source(
 501        self, name, stream_idx, storage_service, storage_config
 502    ) -> list[str]:
 503        assert type(name) is str
 504        assert type(stream_idx) is int
 505        assert type(storage_service) is str
 506        assert type(storage_config) is dict
 507        for k, v in storage_config.items():
 508            assert type(k) is str
 509            assert type(v) is str
 510        req = {
 511            "name": name,
 512            "stream_idx": stream_idx,
 513            "storage_service": storage_service,
 514            "storage_config": storage_config,
 515        }
 516        response = self._session.post(
 517            f"{self._endpoint}/source/search",
 518            json=req,
 519            headers={"Authorization": f"Bearer {self._api_key}"},
 520        )
 521        if not response.ok:
 522            raise Exception(response.text)
 523        response = response.json()
 524        return response
 525
 526    def create_source(
 527        self, name, stream_idx, storage_service, storage_config
 528    ) -> IgniSource:
 529        assert type(name) is str
 530        assert type(stream_idx) is int
 531        assert type(storage_service) is str
 532        assert type(storage_config) is dict
 533        for k, v in storage_config.items():
 534            assert type(k) is str
 535            assert type(v) is str
 536        req = {
 537            "name": name,
 538            "stream_idx": stream_idx,
 539            "storage_service": storage_service,
 540            "storage_config": storage_config,
 541        }
 542        response = self._session.post(
 543            f"{self._endpoint}/source",
 544            json=req,
 545            headers={"Authorization": f"Bearer {self._api_key}"},
 546        )
 547        if not response.ok:
 548            raise Exception(response.text)
 549        response = response.json()
 550        assert response["status"] == "ok"
 551        id = response["id"]
 552        return self.get_source(id)
 553
 554    def source(self, name, stream_idx, storage_service, storage_config) -> IgniSource:
 555        """Convenience function for accessing sources.
 556
 557        Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
 558        If no source is found, creates a new source with the given parameters.
 559        """
 560
 561        sources = self.search_source(name, stream_idx, storage_service, storage_config)
 562        if len(sources) == 0:
 563            return self.create_source(name, stream_idx, storage_service, storage_config)
 564        return self.get_source(sources[0])
 565
 566    def get_spec(self, id: str) -> IgniSpec:
 567        assert type(id) is str
 568        response = self._session.get(
 569            f"{self._endpoint}/spec/{id}",
 570            headers={"Authorization": f"Bearer {self._api_key}"},
 571        )
 572        if not response.ok:
 573            raise Exception(response.text)
 574        response = response.json()
 575        return IgniSpec(response["id"], response)
 576
 577    def list_specs(self) -> list[str]:
 578        response = self._session.get(
 579            f"{self._endpoint}/spec",
 580            headers={"Authorization": f"Bearer {self._api_key}"},
 581        )
 582        if not response.ok:
 583            raise Exception(response.text)
 584        response = response.json()
 585        return response
 586
 587    def create_spec(
 588        self,
 589        width,
 590        height,
 591        pix_fmt,
 592        vod_segment_length,
 593        frame_rate,
 594        ready_hook=None,
 595        steer_hook=None,
 596        ttl=None,
 597    ) -> IgniSpec:
 598        assert type(width) is int
 599        assert type(height) is int
 600        assert type(pix_fmt) is str
 601        assert type(vod_segment_length) is Fraction
 602        assert type(frame_rate) is Fraction
 603        assert type(ready_hook) is str or ready_hook is None
 604        assert type(steer_hook) is str or steer_hook is None
 605        assert ttl is None or type(ttl) is int
 606
 607        req = {
 608            "width": width,
 609            "height": height,
 610            "pix_fmt": pix_fmt,
 611            "vod_segment_length": [
 612                vod_segment_length.numerator,
 613                vod_segment_length.denominator,
 614            ],
 615            "frame_rate": [frame_rate.numerator, frame_rate.denominator],
 616            "ready_hook": ready_hook,
 617            "steer_hook": steer_hook,
 618            "ttl": ttl,
 619        }
 620        response = self._session.post(
 621            f"{self._endpoint}/spec",
 622            json=req,
 623            headers={"Authorization": f"Bearer {self._api_key}"},
 624        )
 625        if not response.ok:
 626            raise Exception(response.text)
 627        response = response.json()
 628        assert response["status"] == "ok"
 629        return self.get_spec(response["id"])
 630
 631    def delete_spec(self, id: str):
 632        assert type(id) is str
 633        response = self._session.delete(
 634            f"{self._endpoint}/spec/{id}",
 635            headers={"Authorization": f"Bearer {self._api_key}"},
 636        )
 637        if not response.ok:
 638            raise Exception(response.text)
 639        response = response.json()
 640        assert response["status"] == "ok"
 641
 642    def push_spec_part(self, spec_id, pos, frames, terminal):
 643        if type(spec_id) is IgniSpec:
 644            spec_id = spec_id._id
 645        assert type(spec_id) is str
 646        assert type(pos) is int
 647        assert type(frames) is list
 648        assert type(terminal) is bool
 649
 650        req_frames = []
 651        for frame in frames:
 652            assert type(frame) is tuple
 653            assert len(frame) == 2
 654            t = frame[0]
 655            f = frame[1]
 656            assert type(t) is Fraction
 657            assert f is None or type(f) is SourceExpr or type(f) is FilterExpr
 658            req_frames.append(
 659                [
 660                    [t.numerator, t.denominator],
 661                    f._to_json_spec() if f is not None else None,
 662                ]
 663            )
 664
 665        req = {
 666            "pos": pos,
 667            "frames": req_frames,
 668            "terminal": terminal,
 669        }
 670        response = self._session.post(
 671            f"{self._endpoint}/spec/{spec_id}/part",
 672            json=req,
 673            headers={"Authorization": f"Bearer {self._api_key}"},
 674        )
 675        if not response.ok:
 676            raise Exception(response.text)
 677        response = response.json()
 678        assert response["status"] == "ok"
 679
 680    def push_spec_part_block(
 681        self, spec_id: str, pos, blocks, terminal, compression="gzip"
 682    ):
 683        if type(spec_id) is IgniSpec:
 684            spec_id = spec_id._id
 685        assert type(spec_id) is str
 686        assert type(pos) is int
 687        assert type(blocks) is list
 688        assert type(terminal) is bool
 689        assert compression is None or compression == "gzip"
 690
 691        req_blocks = []
 692        for block in blocks:
 693            assert type(block) is _FrameExpressionBlock
 694            block_body = block.as_dict()
 695            block_frames = len(block_body["frame_exprs"])
 696            block_body = json.dumps(block_body).encode("utf-8")
 697            if compression == "gzip":
 698                block_body = gzip.compress(block_body, 1)
 699            block_body = base64.b64encode(block_body).decode("utf-8")
 700            req_blocks.append(
 701                {
 702                    "frames": block_frames,
 703                    "compression": compression,
 704                    "body": block_body,
 705                }
 706            )
 707
 708        req = {
 709            "pos": pos,
 710            "terminal": terminal,
 711            "blocks": req_blocks,
 712        }
 713        response = self._session.post(
 714            f"{self._endpoint}/spec/{spec_id}/part_block",
 715            json=req,
 716            headers={"Authorization": f"Bearer {self._api_key}"},
 717        )
 718        if not response.ok:
 719            raise Exception(response.text)
 720        response = response.json()
 721        assert response["status"] == "ok"
 722
 723    def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
 724        assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
 725        assert compression is None or compression in ["gzip"]
 726        feb = _FrameExpressionBlock()
 727        feb.insert_frame(frame_expr)
 728        feb_body = feb.as_dict()
 729
 730        feb_body = json.dumps(feb_body).encode("utf-8")
 731        if compression == "gzip":
 732            feb_body = gzip.compress(feb_body, 1)
 733        feb_body = base64.b64encode(feb_body).decode("utf-8")
 734        req = {
 735            "width": width,
 736            "height": height,
 737            "pix_fmt": pix_fmt,
 738            "compression": compression,
 739            "block": {
 740                "frames": 1,
 741                "compression": compression,
 742                "body": feb_body,
 743            },
 744        }
 745        response = self._session.post(
 746            f"{self._endpoint}/frame",
 747            json=req,
 748            headers={"Authorization": f"Bearer {self._api_key}"},
 749        )
 750        if not response.ok:
 751            raise Exception(response.text)
 752        response_body = response.content
 753        assert type(response_body) is bytes
 754        if compression == "gzip":
 755            response_body = gzip.decompress(response_body)
 756        return response_body
 757
 758
 759class YrdenSpec:
 760    """
 761    A video transformation specification.
 762
 763    See https://ixlab.github.io/vidformer/concepts.html for more information.
 764    """
 765
 766    def __init__(self, domain: list[Fraction], render, fmt: dict):
 767        self._domain = domain
 768        self._render = render
 769        self._fmt = fmt
 770
 771    def __repr__(self):
 772        if len(self._domain) <= 20:
 773            lines = []
 774            for i, t in enumerate(self._domain):
 775                frame_expr = self._render(t, i)
 776                lines.append(
 777                    f"{t.numerator}/{t.denominator} => {frame_expr}",
 778                )
 779            return "\n".join(lines)
 780        else:
 781            lines = []
 782            for i, t in enumerate(self._domain[:10]):
 783                frame_expr = self._render(t, i)
 784                lines.append(
 785                    f"{t.numerator}/{t.denominator} => {frame_expr}",
 786                )
 787            lines.append("...")
 788            for i, t in enumerate(self._domain[-10:]):
 789                frame_expr = self._render(t, i)
 790                lines.append(
 791                    f"{t.numerator}/{t.denominator} => {frame_expr}",
 792                )
 793            return "\n".join(lines)
 794
 795    def _sources(self):
 796        s = set()
 797        for i, t in enumerate(self._domain):
 798            frame_expr = self._render(t, i)
 799            s = s.union(frame_expr._sources())
 800        return s
 801
 802    def _to_json_spec(self):
 803        frames = []
 804        s = set()
 805        f = {}
 806        for i, t in enumerate(self._domain):
 807            frame_expr = self._render(t, i)
 808            s = s.union(frame_expr._sources())
 809            f = {**f, **frame_expr._filters()}
 810            frame = [[t.numerator, t.denominator], frame_expr._to_json_spec()]
 811            frames.append(frame)
 812        return {"frames": frames}, s, f
 813
 814    def play(self, server, method="html", verbose=False):
 815        """Play the video live in the notebook."""
 816
 817        spec, sources, filters = self._to_json_spec()
 818        spec_json_bytes = json.dumps(spec).encode("utf-8")
 819        spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1)
 820        spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8")
 821
 822        sources = [
 823            {
 824                "name": s._name,
 825                "path": s._path,
 826                "stream": s._stream,
 827                "service": s._service.as_json() if s._service is not None else None,
 828            }
 829            for s in sources
 830        ]
 831        filters = {
 832            k: {
 833                "filter": v._func,
 834                "args": v._kwargs,
 835            }
 836            for k, v in filters.items()
 837        }
 838
 839        if verbose:
 840            print(f"Sending to server. Spec is {len(spec_obj_json_gzip_b64)} bytes")
 841
 842        resp = server._new(spec_obj_json_gzip_b64, sources, filters, self._fmt)
 843        hls_video_url = resp["stream_url"]
 844        hls_player_url = resp["player_url"]
 845        namespace = resp["namespace"]
 846        hls_js_url = server.hls_js_url()
 847
 848        if method == "link":
 849            return hls_video_url
 850        if method == "player":
 851            return hls_player_url
 852        if method == "iframe":
 853            from IPython.display import IFrame
 854
 855            return IFrame(hls_player_url, width=1280, height=720)
 856        if method == "html":
 857            from IPython.display import HTML
 858
 859            # We add a namespace to the video element to avoid conflicts with other videos
 860            html_code = f"""
 861<!DOCTYPE html>
 862<html>
 863<head>
 864    <title>HLS Video Player</title>
 865    <!-- Include hls.js library -->
 866    <script src="{hls_js_url}"></script>
 867</head>
 868<body>
 869    <!-- Video element -->
 870    <video id="video-{namespace}" controls width="640" height="360" autoplay></video>
 871    <script>
 872        var video = document.getElementById('video-{namespace}');
 873        var videoSrc = '{hls_video_url}';
 874        var hls = new Hls();
 875        hls.loadSource(videoSrc);
 876        hls.attachMedia(video);
 877        hls.on(Hls.Events.MANIFEST_PARSED, function() {{
 878            video.play();
 879        }});
 880    </script>
 881</body>
 882</html>
 883"""
 884            return HTML(data=html_code)
 885        else:
 886            return hls_player_url
 887
 888    def load(self, server):
 889        spec, sources, filters = self._to_json_spec()
 890        spec_json_bytes = json.dumps(spec).encode("utf-8")
 891        spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1)
 892        spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8")
 893
 894        sources = [
 895            {
 896                "name": s._name,
 897                "path": s._path,
 898                "stream": s._stream,
 899                "service": s._service.as_json() if s._service is not None else None,
 900            }
 901            for s in sources
 902        ]
 903        filters = {
 904            k: {
 905                "filter": v._func,
 906                "args": v._kwargs,
 907            }
 908            for k, v in filters.items()
 909        }
 910        resp = server._new(spec_obj_json_gzip_b64, sources, filters, self._fmt)
 911        namespace = resp["namespace"]
 912        return YrdenLoader(server, namespace, self._domain)
 913
 914    def save(self, server, pth, encoder=None, encoder_opts=None, format=None):
 915        """Save the video to a file."""
 916
 917        assert encoder is None or type(encoder) is str
 918        assert encoder_opts is None or type(encoder_opts) is dict
 919        if encoder_opts is not None:
 920            for k, v in encoder_opts.items():
 921                assert type(k) is str and type(v) is str
 922
 923        spec, sources, filters = self._to_json_spec()
 924        spec_json_bytes = json.dumps(spec).encode("utf-8")
 925        spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1)
 926        spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8")
 927
 928        sources = [
 929            {
 930                "name": s._name,
 931                "path": s._path,
 932                "stream": s._stream,
 933                "service": s._service.as_json() if s._service is not None else None,
 934            }
 935            for s in sources
 936        ]
 937        filters = {
 938            k: {
 939                "filter": v._func,
 940                "args": v._kwargs,
 941            }
 942            for k, v in filters.items()
 943        }
 944
 945        resp = server._export(
 946            pth,
 947            spec_obj_json_gzip_b64,
 948            sources,
 949            filters,
 950            self._fmt,
 951            encoder,
 952            encoder_opts,
 953            format,
 954        )
 955
 956        return resp
 957
    def _vrod_bench(self, server):
        """Time three phases of streaming this spec and return them in a dict.

        Keys of the returned dict (values are wall-clock seconds):

        * ``vrod_create_spec``   - building the spec, writing it to
          ``spec.json`` and describing its sources and filters
        * ``vrod_register``      - the ``server._new`` call
        * ``vrod_first_segment`` - downloading the first HLS segment

        Side effect: writes ``spec.json`` in the current working directory.
        """
        out = {}
        pth = "spec.json"
        start_t = time.time()
        with open(pth, "w") as outfile:
            spec, sources, filters = self._to_json_spec()
            outfile.write(json.dumps(spec))

        sources = [
            {
                "name": s._name,
                "path": s._path,
                "stream": s._stream,
                "service": s._service.as_json() if s._service is not None else None,
            }
            for s in sources
        ]
        filters = {
            k: {
                "filter": v._func,
                "args": v._kwargs,
            }
            for k, v in filters.items()
        }
        end_t = time.time()
        out["vrod_create_spec"] = end_t - start_t

        start = time.time()
        # NOTE(review): the file path is sent as the spec here, whereas play()
        # and load() send the base64-encoded gzip of the spec JSON -- confirm
        # the server still accepts a path for this benchmark.
        resp = server._new(pth, sources, filters, self._fmt)
        end = time.time()
        out["vrod_register"] = end - start

        stream_url = resp["stream_url"]
        # Derive the first segment's URL from the playlist URL.
        first_segment = stream_url.replace("stream.m3u8", "segment-0.ts")

        start = time.time()
        r = requests.get(first_segment)
        r.raise_for_status()
        end = time.time()
        out["vrod_first_segment"] = end - start
        return out
 999
    def _dve2_bench(self, server):
        """Time spec creation and a full export, returning both in a dict.

        Keys of the returned dict (values are wall-clock seconds):

        * ``dve2_create_spec`` - building the spec, writing it to ``spec.json``
          and describing its sources and filters
        * ``dve2_exec``        - the ``server._export`` call

        Side effect: writes ``spec.json`` in the current working directory.
        """
        pth = "spec.json"
        out = {}
        start_t = time.time()
        with open(pth, "w") as outfile:
            spec, sources, filters = self._to_json_spec()
            outfile.write(json.dumps(spec))

        sources = [
            {
                "name": s._name,
                "path": s._path,
                "stream": s._stream,
                "service": s._service.as_json() if s._service is not None else None,
            }
            for s in sources
        ]
        filters = {
            k: {
                "filter": v._func,
                "args": v._kwargs,
            }
            for k, v in filters.items()
        }
        end_t = time.time()
        out["dve2_create_spec"] = end_t - start_t

        start = time.time()
        # NOTE(review): YrdenServer._export in this file takes
        # (pth, spec, sources, filters, fmt, encoder, encoder_opts, format) and
        # returns parsed JSON; this call passes six positional arguments and
        # then calls raise_for_status() on the result. Looks stale against that
        # signature -- confirm the intended output path/spec before relying on it.
        resp = server._export(pth, sources, filters, self._fmt, None, None)
        resp.raise_for_status()
        end = time.time()
        out["dve2_exec"] = end - start
        return out
1033
1034
class YrdenLoader:
    """Indexable view over the raw frame bytes of a spec registered on a server.

    ``loader[i]`` returns the raw bytes of frame ``i``; ``loader[a:b]`` returns
    a list with one ``bytes`` object per frame. Frames are fetched from the
    server on every access.
    """

    def __init__(self, server, namespace: str, domain):
        self._server = server
        self._namespace = namespace
        self._domain = domain

    def _chunk(self, start_i, end_i):
        """Fetch the raw bytes of frames ``start_i`` through ``end_i`` (inclusive)."""
        return self._server._raw(self._namespace, start_i, end_i)

    def __len__(self):
        return len(self._domain)

    def _find_index_by_rational(self, value):
        """Return the position of timestamp ``value`` in the domain.

        Raises ``ValueError`` if the timestamp is not part of the domain.
        """
        try:
            # Single scan instead of a membership test followed by index().
            return self._domain.index(value)
        except ValueError:
            raise ValueError(
                f"Rational timestamp {value} is not in the domain"
            ) from None

    def __getitem__(self, index):
        """Return raw frame bytes for an integer index or a list for a slice.

        Slices use non-negative bounds within ``[0, len(self)]`` and an
        optional positive step. An empty slice returns ``[]`` without
        contacting the server.

        Raises ``TypeError`` for any other index type and ``ValueError`` for a
        non-positive slice step. Out-of-range bounds fail an assertion.
        """
        if isinstance(index, slice):
            total = len(self._domain)
            start = index.start if index.start is not None else 0
            end = index.stop if index.stop is not None else total
            step = index.step if index.step is not None else 1
            assert 0 <= start <= end <= total
            if step < 1:
                raise ValueError("Slice step must be a positive integer")

            num_frames = end - start
            if num_frames == 0:
                # Previously this requested the range start..start-1 and then
                # divided by zero.
                return []

            all_bytes = self._chunk(start, end - 1)
            assert len(all_bytes) % num_frames == 0
            frame_size = len(all_bytes) // num_frames
            frames = [
                all_bytes[i * frame_size : (i + 1) * frame_size]
                for i in range(num_frames)
            ]
            # The server only serves contiguous ranges; apply the step locally.
            return frames[::step]
        elif isinstance(index, int):
            assert index >= 0 and index < len(self._domain)
            return self._chunk(index, index)
        else:
            raise TypeError(
                "Invalid argument type for iloc. Use a slice or an integer."
            )
1080
1081
class YrdenServer:
    """
    A connection to a Yrden server.

    A yrden server is the main API for local use of vidformer.
    """

    def __init__(self, domain=None, port=None, bin=None, hls_prefix=None):
        """
        Connect to a Yrden server

        Can either connect to an existing server, if domain and port are provided, or start a new server using the provided binary.
        If no domain or binary is provided, the `VIDFORMER_BIN` environment variable is used.
        """

        self._domain = domain
        self._port = port
        self._proc = None

        if self._port is None:
            # No port given: launch a local server on a random high port.
            if bin is None:
                env_bin = os.getenv("VIDFORMER_BIN")
                bin = env_bin if env_bin is not None else "vidformer-cli"

            self._domain = "localhost"
            self._port = random.randint(49152, 65535)
            cmd = [bin, "yrden", "--port", str(self._port)]
            if _in_notebook:
                # We need to print the URL in the notebook
                # This is a trick to get VS Code to forward the port
                cmd.append("--print-url")

            if hls_prefix is not None:
                if type(hls_prefix) is not str:
                    raise Exception("hls_prefix must be a string")
                cmd.extend(["--hls-prefix", hls_prefix])

            self._proc = subprocess.Popen(cmd)

        version = _wait_for_url(self._url(""))
        if version is None:
            raise Exception("Failed to connect to server")

        expected_version = f"vidformer-yrden v{__version__}"
        if version != expected_version:
            print(
                f"Warning: Expected version `{expected_version}`, got `{version}`. API may not be compatible!"
            )

    def _url(self, route):
        """Build the absolute URL of ``route`` on this server."""
        return f"http://{self._domain}:{self._port}/{route}"

    def _post(self, route, payload):
        """POST ``payload`` as JSON to ``route``; return the decoded JSON reply.

        Raises ``Exception`` carrying the response text on a non-OK status.
        """
        r = requests.post(self._url(route), json=payload)
        if not r.ok:
            raise Exception(r.text)
        return r.json()

    def _source(self, name: str, path: str, stream: int, service):
        """Register a source; the reply's ``ts`` pairs are turned into Fractions."""
        resp = self._post(
            "source",
            {
                "name": name,
                "path": path,
                "stream": stream,
                "service": None if service is None else service.as_json(),
            },
        )
        resp["ts"] = [Fraction(pair[0], pair[1]) for pair in resp["ts"]]
        return resp

    def _new(self, spec, sources, filters, fmt):
        """Register a spec for streaming and return the server's JSON reply."""
        return self._post(
            "new",
            {
                "spec": spec,
                "sources": sources,
                "filters": filters,
                "width": fmt["width"],
                "height": fmt["height"],
                "pix_fmt": fmt["pix_fmt"],
            },
        )

    def _export(self, pth, spec, sources, filters, fmt, encoder, encoder_opts, format):
        """Ask the server to export a spec to ``pth``; return its JSON reply."""
        return self._post(
            "export",
            {
                "spec": spec,
                "sources": sources,
                "filters": filters,
                "width": fmt["width"],
                "height": fmt["height"],
                "pix_fmt": fmt["pix_fmt"],
                "output_path": pth,
                "encoder": encoder,
                "encoder_opts": encoder_opts,
                "format": format,
            },
        )

    def _raw(self, namespace, start_i, end_i):
        """Download the raw bytes served for frames ``start_i``-``end_i`` of ``namespace``."""
        r = requests.get(self._url(f"{namespace}/raw/{start_i}-{end_i}"))
        if not r.ok:
            raise Exception(r.text)
        return r.content

    def hls_js_url(self):
        """Return the link to the yrden-hosted hls.js file"""
        return self._url("hls.js")

    def __del__(self):
        # Only kill a server process this object started itself.
        proc = self._proc
        if proc is not None:
            proc.kill()
1200
1201
class YrdenSource:
    """A video source."""

    def __init__(
        self, server: YrdenServer, name: str, path: str, stream: int, service=None
    ):
        """Register ``path`` (stream number ``stream``) on ``server`` as ``name``.

        When no ``service`` is given and ``path`` is an http(s) URL, an "http"
        storage service is created for the URL's origin and ``path`` becomes
        the remainder of the URL without its leading slash.
        """
        if service is None:
            # e.g. "https://f.dominik.win/data/dve2/tos_720p.mp4" becomes
            # endpoint "https://f.dominik.win" + path "data/dve2/tos_720p.mp4"
            url_parts = re.match(r"(http|https)://([^/]+)(.*)", path)
            if url_parts is not None:
                scheme, host, remainder = url_parts.groups()
                endpoint = f"{scheme}://{host}"
                path = remainder[1:] if remainder.startswith("/") else remainder
                service = YrdenStorageService("http", endpoint=endpoint)

        self._server = server
        self._name = name
        self._path = path
        self._stream = stream
        self._service = service

        self.iloc = _SourceILoc(self)

        # Ask the server to register the source; the reply is kept for
        # fmt() and ts().
        self._src = self._server._source(
            self._name, self._path, self._stream, self._service
        )

    def fmt(self):
        """Return the source's ``width``, ``height`` and ``pix_fmt`` as a dict."""
        return {key: self._src[key] for key in ("width", "height", "pix_fmt")}

    def ts(self):
        """Return the source's timestamps as reported by the server."""
        return self._src["ts"]

    def __len__(self):
        return len(self._src["ts"])

    def __getitem__(self, idx):
        """Return an expression for the frame at timestamp ``idx`` (a Fraction)."""
        if type(idx) is not Fraction:
            raise Exception("Source index must be a Fraction")
        return SourceExpr(self, idx, False)

    def play(self, *args, **kwargs):
        """Play the video live in the notebook."""

        # A spec over the source's own timestamps that shows each frame as-is.
        passthrough = YrdenSpec(self.ts(), lambda t, _i: self[t], self.fmt())
        return passthrough.play(*args, **kwargs)
1263
1264
class YrdenStorageService:
    """Name and string-valued configuration of a storage service."""

    def __init__(self, service: str, **kwargs):
        if type(service) is not str:
            raise Exception("Service name must be a string")
        self._service = service

        # Report the first option (in keyword order) that is not a plain str.
        offending = next(
            (key for key, value in kwargs.items() if type(value) is not str), None
        )
        if offending is not None:
            raise Exception(f"Value of {offending} must be a string")
        self._config = kwargs

    def as_json(self):
        """Return the ``{"service": ..., "config": ...}`` form sent to the server."""
        return {"service": self._service, "config": self._config}

    def __repr__(self):
        return f"{self._service}(config={self._config})"
1280
1281
class SourceExpr:
    """Reference to one frame of a source, by timestamp or by integer position."""

    def __init__(self, source, idx, is_iloc):
        self._source = source
        self._idx = idx
        self._is_iloc = is_iloc

    def __repr__(self):
        accessor = ".iloc" if self._is_iloc else ""
        return f"{self._source._name}{accessor}[{self._idx}]"

    def _to_json_spec(self):
        """Return the JSON form: an ``ILoc`` integer or a ``T`` [num, den] pair."""
        if self._is_iloc:
            index = {"ILoc": int(self._idx)}
        else:
            index = {"T": [self._idx.numerator, self._idx.denominator]}
        return {"Source": {"video": self._source._name, "index": index}}

    def _sources(self):
        """Return the set containing just the referenced source."""
        return {self._source}

    def _filters(self):
        """A bare source frame uses no filters."""
        return {}
1315
1316
class _SourceILoc:
    """Integer-position indexer exposed as the ``iloc`` attribute of a source."""

    def __init__(self, source):
        self._source = source

    def __getitem__(self, idx):
        """Return an expression for frame number ``idx``; only plain ints are accepted."""
        if type(idx) is int:
            return SourceExpr(self._source, idx, True)
        raise Exception(f"Source iloc index must be an integer, got a {type(idx)}")
1325
1326
def _json_arg(arg, skip_data_anot=False):
    """Convert one filter argument to its JSON spec representation.

    Frame expressions become ``{"Frame": ...}``. Plain values (int, str,
    bytes, float, bool, tuple/list) become a tagged value such as
    ``{"Int": 3}``, wrapped as ``{"Data": ...}`` unless ``skip_data_anot`` is
    true. List elements are always converted without the ``Data`` wrapper.
    Types are matched exactly, so subclasses are rejected.

    Raises ``Exception`` for any other argument type.
    """
    arg_type = type(arg)
    if arg_type is FilterExpr or arg_type is SourceExpr:
        return {"Frame": arg._to_json_spec()}

    if arg_type is int:
        tagged = {"Int": arg}
    elif arg_type is str:
        tagged = {"String": arg}
    elif arg_type is bytes:
        # bytes are sent as a list of integer byte values
        tagged = {"Bytes": list(arg)}
    elif arg_type is float:
        tagged = {"Float": arg}
    elif arg_type is bool:
        tagged = {"Bool": arg}
    elif arg_type is tuple or arg_type is list:
        tagged = {"List": [_json_arg(item, True) for item in arg]}
    else:
        raise Exception(f"Unknown arg type: {type(arg)}")

    return tagged if skip_data_anot else {"Data": tagged}
1357
1358
class Filter:
    """A video filter."""

    def __init__(self, name: str, tl_func=None, **kwargs):
        self._name = name

        # tl_func is the top level func, which is the true implementation,
        # not just a pretty name; it defaults to the name itself
        self._func = name if tl_func is None else tl_func

        # filter infra args, not invocation args
        offending = next(
            (key for key, value in kwargs.items() if type(value) is not str), None
        )
        if offending is not None:
            raise Exception(f"Value of {offending} must be a string")
        self._kwargs = kwargs

    def __call__(self, *args, **kwargs):
        """Apply the filter to arguments, producing a ``FilterExpr``."""
        return FilterExpr(self, args, kwargs)
1379
1380
class FilterExpr:
    """A filter applied to positional and keyword arguments."""

    def __init__(self, filter: Filter, args, kwargs):
        self._filter = filter
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        def show(value):
            # strings are shown double-quoted, everything else via str()
            return f'"{value}"' if type(value) is str else str(value)

        rendered = [show(arg) for arg in self._args]
        rendered.extend(f"{key}={show(value)}" for key, value in self._kwargs.items())
        return f"{self._filter._name}({', '.join(rendered)})"

    def _to_json_spec(self):
        """Return the JSON form of this call, converting each argument with ``_json_arg``."""
        args = [_json_arg(arg) for arg in self._args]
        kwargs = {key: _json_arg(value) for key, value in self._kwargs.items()}
        return {"Filter": {"name": self._filter._name, "args": args, "kwargs": kwargs}}

    def _sources(self):
        """Return every source referenced by direct frame-expression arguments."""
        found = set()
        for operand in (*self._args, *self._kwargs.values()):
            if type(operand) is FilterExpr or type(operand) is SourceExpr:
                found.update(operand._sources())
        return found

    def _filters(self):
        """Return this filter plus those of nested filter expressions, keyed by name."""
        used = {self._filter._name: self._filter}
        for operand in (*self._args, *self._kwargs.values()):
            if type(operand) is FilterExpr:
                used.update(operand._filters())
        return used
1425
1426
class UDF:
    """User-defined filter superclass

    Subclasses implement ``filter`` and ``filter_type``. ``into_filter``
    starts a separate process that listens on a Unix socket and answers
    length-prefixed msgpack requests by calling those two methods.
    """

    def __init__(self, name: str):
        self._name = name
        self._socket_path = None
        self._p = None

    def filter(self, *args, **kwargs):
        raise Exception("User must implement the filter method")

    def filter_type(self, *args, **kwargs):
        raise Exception("User must implement the filter_type method")

    def into_filter(self):
        """Start the host process and return a ``Filter`` that talks to it.

        May only be called once per instance (checked with an assertion).
        """
        assert self._socket_path is None
        self._socket_path = f"/tmp/vidformer-{self._name}-{str(uuid.uuid4())}.sock"
        self._p = multiprocessing.Process(
            target=_run_udf_host, args=(self, self._socket_path)
        )
        self._p.start()
        return Filter(
            name=self._name, tl_func="IPC", socket=self._socket_path, func=self._name
        )

    @staticmethod
    def _recv_exact(connection, size):
        """Read up to ``size`` bytes, stopping early only if the peer closes.

        A single ``recv`` may return fewer bytes than requested, so keep
        reading until the requested amount has arrived. The result is shorter
        than ``size`` only when the connection was closed first.
        """
        buf = bytearray()
        while len(buf) < size:
            chunk = connection.recv(size - len(buf))
            if not chunk:
                break
            buf += chunk
        return bytes(buf)

    @staticmethod
    def _deser_kwargs(kwargs, deser):
        """Deserialize keyword arguments given as a mapping or as (key, value) pairs.

        Iterating a dict directly yields only its keys, so mappings are
        walked through ``items()``.
        """
        pairs = kwargs.items() if isinstance(kwargs, dict) else kwargs
        return {key: deser(value) for key, value in pairs}

    def _handle_connection(self, connection):
        """Serve requests on one connection until the peer disconnects.

        Each request and response is a 4-byte big-endian length followed by a
        msgpack payload. The connection is always closed on exit.
        """
        try:
            while True:
                header = self._recv_exact(connection, 4)
                if len(header) != 4:
                    break
                frame_len = int.from_bytes(header, byteorder="big")
                data = self._recv_exact(connection, frame_len)
                if not data:
                    break
                if len(data) < frame_len:
                    raise Exception("Partial data received")

                obj = msgpack.unpackb(data, raw=False)
                f_op, f_args, f_kwargs = (
                    obj["op"],
                    obj["args"],
                    obj["kwargs"],
                )

                response = None
                if f_op == "filter":
                    f_args = [self._deser_filter(x) for x in f_args]
                    f_kwargs = self._deser_kwargs(f_kwargs, self._deser_filter)
                    response = self.filter(*f_args, **f_kwargs)
                    if type(response) is not UDFFrame:
                        raise Exception(
                            f"filter must return a UDFFrame, got {type(response)}"
                        )
                    if response.frame_type().pix_fmt() != "rgb24":
                        raise Exception(
                            f"filter must return a frame with pix_fmt 'rgb24', got {response.frame_type().pix_fmt()}"
                        )

                    response = response._response_ser()
                elif f_op == "filter_type":
                    f_args = [self._deser_filter_type(x) for x in f_args]
                    f_kwargs = self._deser_kwargs(f_kwargs, self._deser_filter_type)
                    response = self.filter_type(*f_args, **f_kwargs)
                    if type(response) is not UDFFrameType:
                        raise Exception(
                            f"filter_type must return a UDFFrameType, got {type(response)}"
                        )
                    if response.pix_fmt() != "rgb24":
                        raise Exception(
                            f"filter_type must return a frame with pix_fmt 'rgb24', got {response.pix_fmt()}"
                        )
                    response = response._response_ser()
                else:
                    raise Exception(f"Unknown operation: {f_op}")

                response = msgpack.packb(response, use_bin_type=True)
                response_len = len(response).to_bytes(4, byteorder="big")
                connection.sendall(response_len)
                connection.sendall(response)
        finally:
            connection.close()

    def _deser_filter_type(self, obj):
        """Decode one ``filter_type`` argument: a frame type or a str/int/bool."""
        assert type(obj) is dict
        keys = list(obj.keys())
        assert len(keys) == 1
        type_key = keys[0]
        assert type_key in ["FrameType", "String", "Int", "Bool"]

        if type_key == "FrameType":
            frame = obj[type_key]
            assert type(frame) is dict
            assert "width" in frame
            assert "height" in frame
            assert "format" in frame
            assert type(frame["width"]) is int
            assert type(frame["height"]) is int
            assert frame["format"] == 2  # AV_PIX_FMT_RGB24
            return UDFFrameType(frame["width"], frame["height"], "rgb24")
        elif type_key == "String":
            assert type(obj[type_key]) is str
            return obj[type_key]
        elif type_key == "Int":
            assert type(obj[type_key]) is int
            return obj[type_key]
        elif type_key == "Bool":
            assert type(obj[type_key]) is bool
            return obj[type_key]
        else:
            assert False, f"Unknown type: {type_key}"

    def _deser_filter(self, obj):
        """Decode one ``filter`` argument: an rgb24 frame or a str/int/bool."""
        assert type(obj) is dict
        keys = list(obj.keys())
        assert len(keys) == 1
        type_key = keys[0]
        assert type_key in ["Frame", "String", "Int", "Bool"]

        if type_key == "Frame":
            frame = obj[type_key]
            assert type(frame) is dict
            assert "data" in frame
            assert "width" in frame
            assert "height" in frame
            assert "format" in frame
            assert type(frame["width"]) is int
            assert type(frame["height"]) is int
            assert frame["format"] == "rgb24"
            assert type(frame["data"]) is bytes

            # View the payload as a (height, width, 3) uint8 array.
            data = np.frombuffer(frame["data"], dtype=np.uint8)
            data = data.reshape(frame["height"], frame["width"], 3)
            return UDFFrame(
                data, UDFFrameType(frame["width"], frame["height"], "rgb24")
            )
        elif type_key == "String":
            assert type(obj[type_key]) is str
            return obj[type_key]
        elif type_key == "Int":
            assert type(obj[type_key]) is int
            return obj[type_key]
        elif type_key == "Bool":
            assert type(obj[type_key]) is bool
            return obj[type_key]
        else:
            assert False, f"Unknown type: {type_key}"

    def _host(self, socket_path: str):
        """Listen on ``socket_path`` forever, serving each connection in a thread."""
        if os.path.exists(socket_path):
            os.remove(socket_path)

        # start listening on the socket
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.bind(socket_path)
        sock.listen(1)

        while True:
            # accept incoming connection
            connection, _client_address = sock.accept()
            thread = threading.Thread(
                target=self._handle_connection, args=(connection,)
            )
            thread.start()

    def __del__(self):
        if self._socket_path is not None:
            self._p.terminate()
            if os.path.exists(self._socket_path):
                # it's possible the process hasn't even created the socket yet
                os.remove(self._socket_path)
1602
1603
class UDFFrameType:
    """
    Frame type for use in UDFs.

    Holds a frame's width, height and pixel format name.
    """

    def __init__(self, width: int, height: int, pix_fmt: str):
        assert type(width) is int
        assert type(height) is int
        assert type(pix_fmt) is str

        self._width, self._height, self._pix_fmt = width, height, pix_fmt

    def width(self):
        return self._width

    def height(self):
        return self._height

    def pix_fmt(self):
        return self._pix_fmt

    def _response_ser(self):
        """Return the reply payload for a ``filter_type`` request."""
        payload = {
            "width": self._width,
            "height": self._height,
            "format": 2,  # AV_PIX_FMT_RGB24
        }
        return {"frame_type": payload}

    def __repr__(self):
        size = f"{self._width}x{self._height}"
        return f"FrameType<{size}, {self._pix_fmt}>"
1638
1639
class UDFFrame:
    """A symbolic reference to a frame for use in UDFs."""

    def __init__(self, data: np.ndarray, f_type: UDFFrameType):
        assert type(data) is np.ndarray
        assert type(f_type) is UDFFrameType

        # We only support RGB24 for now: uint8 samples, 3 channels
        assert data.dtype == np.uint8
        assert data.shape[2] == 3

        # the array must be laid out as (height, width, channels) of f_type
        assert data.shape[0] == f_type.height()
        assert data.shape[1] == f_type.width()
        assert f_type.pix_fmt() == "rgb24"

        self._data, self._f_type = data, f_type

    def data(self):
        return self._data

    def frame_type(self):
        return self._f_type

    def _response_ser(self):
        """Return the reply payload for a ``filter`` request."""
        frame_type = self._f_type
        payload = {
            "data": self._data.tobytes(),
            "width": frame_type.width(),
            "height": frame_type.height(),
            "format": "rgb24",
        }
        return {"frame": payload}

    def __repr__(self):
        frame_type = self._f_type
        return f"Frame<{frame_type.width()}x{frame_type.height()}, {frame_type.pix_fmt()}>"
1677
1678
def _run_udf_host(udf: UDF, socket_path: str):
    """Run ``udf``'s socket host on ``socket_path``.

    Module-level so it can be used as the ``target`` of the
    ``multiprocessing.Process`` created in ``UDF.into_filter``.
    """
    udf._host(socket_path)
class IgniSource:
class IgniSource:
    """A video source identified by an id, with its format and timestamps."""

    def __init__(self, id: str, src):
        self._name = id
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        # timestamps arrive as [numerator, denominator] pairs
        self._ts = [Fraction(pair[0], pair[1]) for pair in src["ts"]]
        self.iloc = _SourceILoc(self)

    def id(self) -> str:
        return self._name

    def fmt(self):
        """Return a copy of the format dict (width, height, pix_fmt)."""
        return dict(self._fmt)

    def ts(self) -> list[Fraction]:
        """Return a copy of the timestamp list."""
        return list(self._ts)

    def __len__(self):
        return len(self._ts)

    def __getitem__(self, idx):
        """Return an expression for the frame at timestamp ``idx`` (a Fraction)."""
        if type(idx) is Fraction:
            return SourceExpr(self, idx, False)
        raise Exception("Source index must be a Fraction")

    def __repr__(self):
        return f"IgniSource({self._name})"
IgniSource(id: str, src)
    def __init__(self, id: str, src):
        """Build a source from its id and a description mapping.

        ``src`` must provide ``width``, ``height``, ``pix_fmt`` and ``ts``;
        each ``ts`` entry is read as a ``[numerator, denominator]`` pair and
        stored as a ``Fraction``.
        """
        self._name = id
        self._fmt = {
            "width": src["width"],
            "height": src["height"],
            "pix_fmt": src["pix_fmt"],
        }
        self._ts = [Fraction(x[0], x[1]) for x in src["ts"]]
        # integer-position indexer: source.iloc[i]
        self.iloc = _SourceILoc(self)
iloc
def id(self) -> str:
    def id(self) -> str:
        """Return the id this source was created with."""
        return self._name
def fmt(self):
    def fmt(self):
        """Return a shallow copy of the source's format dict."""
        return {**self._fmt}
def ts(self) -> list[fractions.Fraction]:
    def ts(self) -> list[Fraction]:
        """Return a copy of the source's timestamp list."""
        return self._ts.copy()
class IgniSpec:
class IgniSpec:
    """Client-side handle to a spec stored on an Igni server.

    Keeps the spec id, its frame format and the VOD endpoint, and derives
    the ``hls.js`` URL from the scheme and host of that endpoint.
    """

    def __init__(self, id: str, src):
        """Build the spec from its id and a description mapping.

        ``src`` must provide ``"width"``, ``"height"``, ``"pix_fmt"`` and
        ``"vod_endpoint"``.
        """
        self._id = id
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        endpoint = src["vod_endpoint"]
        self._vod_endpoint = endpoint
        parts = urlparse(endpoint)
        self._hls_js_url = f"{parts.scheme}://{parts.netloc}/hls.js"

    def id(self) -> str:
        """Return the spec id."""
        return self._id

    def play(self, *args, **kwargs):
        """Play the spec via ``_play``.

        The playlist and status URLs are formed by appending
        ``playlist.m3u8`` and ``status`` directly to the VOD endpoint.
        Extra arguments are forwarded to ``_play`` unchanged.
        """
        base = self._vod_endpoint
        playlist_url = f"{base}playlist.m3u8"
        status_url = f"{base}status"
        return _play(
            self._id,
            playlist_url,
            self._hls_js_url,
            *args,
            **kwargs,
            status_url=status_url,
        )
IgniSpec(id: str, src)
428    def __init__(self, id: str, src):
429        self._id = id
430        self._fmt = {
431            "width": src["width"],
432            "height": src["height"],
433            "pix_fmt": src["pix_fmt"],
434        }
435        self._vod_endpoint = src["vod_endpoint"]
436        parsed_url = urlparse(self._vod_endpoint)
437        self._hls_js_url = f"{parsed_url.scheme}://{parsed_url.netloc}/hls.js"
def id(self) -> str:
439    def id(self) -> str:
440        return self._id
def play(self, *args, **kwargs):
442    def play(self, *args, **kwargs):
443        url = f"{self._vod_endpoint}playlist.m3u8"
444        status_url = f"{self._vod_endpoint}status"
445        hls_js_url = self._hls_js_url
446        return _play(self._id, url, hls_js_url, *args, **kwargs, status_url=status_url)
class IgniServer:
class IgniServer:
    """Client for the HTTP API of an Igni server.

    Every request goes through one ``requests.Session`` and carries the API
    key as a bearer token.  A non-OK HTTP response is raised as
    ``Exception(response.text)``; endpoints that answer with a status object
    are additionally asserted to report ``"ok"``.
    """

    def __init__(self, endpoint: str, api_key: str):
        """Open a session and verify the API key against ``/auth``.

        Raises:
            Exception: if ``endpoint`` lacks an ``http://``/``https://``
                prefix, ends with ``/``, or the server rejects the request.
        """
        if not endpoint.startswith(("http://", "https://")):
            raise Exception("Endpoint must start with http:// or https://")
        if endpoint.endswith("/"):
            raise Exception("Endpoint must not end with /")
        self._endpoint = endpoint

        self._api_key = api_key
        self._session = requests.Session()
        self._session.headers.update(self._auth_headers())
        self._ok_json(self._get("/auth"))

    # ------------------------------------------------------------------
    # Private request helpers (shared by all public methods)
    # ------------------------------------------------------------------

    def _auth_headers(self):
        """Return the bearer-token header sent with each request."""
        return {"Authorization": f"Bearer {self._api_key}"}

    def _get(self, path):
        """Issue an authenticated GET for ``path`` below the endpoint."""
        return self._session.get(
            f"{self._endpoint}{path}", headers=self._auth_headers()
        )

    def _delete(self, path):
        """Issue an authenticated DELETE for ``path`` below the endpoint."""
        return self._session.delete(
            f"{self._endpoint}{path}", headers=self._auth_headers()
        )

    def _post(self, path, req):
        """POST ``req`` as JSON to ``path`` below the endpoint."""
        return self._session.post(
            f"{self._endpoint}{path}", json=req, headers=self._auth_headers()
        )

    @staticmethod
    def _checked(response):
        """Return ``response`` unchanged, raising its text if it is not OK."""
        if not response.ok:
            raise Exception(response.text)
        return response

    @classmethod
    def _json(cls, response):
        """Return the decoded JSON body of an OK response."""
        return cls._checked(response).json()

    @classmethod
    def _ok_json(cls, response):
        """Return the decoded JSON body and assert its status is ``"ok"``."""
        body = cls._json(response)
        assert body["status"] == "ok"
        return body

    @staticmethod
    def _source_request(name, stream_idx, storage_service, storage_config):
        """Validate source parameters and return the request payload."""
        assert type(name) is str
        assert type(stream_idx) is int
        assert type(storage_service) is str
        assert type(storage_config) is dict
        for k, v in storage_config.items():
            assert type(k) is str
            assert type(v) is str
        return {
            "name": name,
            "stream_idx": stream_idx,
            "storage_service": storage_service,
            "storage_config": storage_config,
        }

    @staticmethod
    def _encode_block(block_dict, compression):
        """Serialize a block dict to JSON, optionally gzip it, then base64 it."""
        raw = json.dumps(block_dict).encode("utf-8")
        if compression == "gzip":
            raw = gzip.compress(raw, 1)
        return base64.b64encode(raw).decode("utf-8")

    # ------------------------------------------------------------------
    # Sources
    # ------------------------------------------------------------------

    def get_source(self, id: str) -> IgniSource:
        """Fetch the source ``id`` and wrap the reply in an ``IgniSource``."""
        assert type(id) is str
        body = self._json(self._get(f"/source/{id}"))
        return IgniSource(body["id"], body)

    def list_sources(self) -> list[str]:
        """Return the decoded JSON reply of ``GET /source``."""
        return self._json(self._get("/source"))

    def delete_source(self, id: str):
        """Delete the source ``id`` on the server."""
        assert type(id) is str
        self._ok_json(self._delete(f"/source/{id}"))

    def search_source(
        self, name, stream_idx, storage_service, storage_config
    ) -> list[str]:
        """Return the decoded JSON reply of ``POST /source/search``."""
        req = self._source_request(name, stream_idx, storage_service, storage_config)
        return self._json(self._post("/source/search", req))

    def create_source(
        self, name, stream_idx, storage_service, storage_config
    ) -> IgniSource:
        """Create a source on the server and return it as an ``IgniSource``."""
        req = self._source_request(name, stream_idx, storage_service, storage_config)
        body = self._ok_json(self._post("/source", req))
        return self.get_source(body["id"])

    def source(self, name, stream_idx, storage_service, storage_config) -> IgniSource:
        """Convenience function for accessing sources.

        Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
        If no source is found, creates a new source with the given parameters.
        """

        sources = self.search_source(name, stream_idx, storage_service, storage_config)
        if not sources:
            return self.create_source(name, stream_idx, storage_service, storage_config)
        return self.get_source(sources[0])

    # ------------------------------------------------------------------
    # Specs
    # ------------------------------------------------------------------

    def get_spec(self, id: str) -> IgniSpec:
        """Fetch the spec ``id`` and wrap the reply in an ``IgniSpec``."""
        assert type(id) is str
        body = self._json(self._get(f"/spec/{id}"))
        return IgniSpec(body["id"], body)

    def list_specs(self) -> list[str]:
        """Return the decoded JSON reply of ``GET /spec``."""
        return self._json(self._get("/spec"))

    def create_spec(
        self,
        width,
        height,
        pix_fmt,
        vod_segment_length,
        frame_rate,
        ready_hook=None,
        steer_hook=None,
        ttl=None,
    ) -> IgniSpec:
        """Create a spec on the server and return it as an ``IgniSpec``.

        ``vod_segment_length`` and ``frame_rate`` must be ``Fraction`` values;
        they are sent as ``[numerator, denominator]`` pairs.
        """
        assert type(width) is int
        assert type(height) is int
        assert type(pix_fmt) is str
        assert type(vod_segment_length) is Fraction
        assert type(frame_rate) is Fraction
        assert type(ready_hook) is str or ready_hook is None
        assert type(steer_hook) is str or steer_hook is None
        assert ttl is None or type(ttl) is int

        req = {
            "width": width,
            "height": height,
            "pix_fmt": pix_fmt,
            "vod_segment_length": [
                vod_segment_length.numerator,
                vod_segment_length.denominator,
            ],
            "frame_rate": [frame_rate.numerator, frame_rate.denominator],
            "ready_hook": ready_hook,
            "steer_hook": steer_hook,
            "ttl": ttl,
        }
        body = self._ok_json(self._post("/spec", req))
        return self.get_spec(body["id"])

    def delete_spec(self, id: str):
        """Delete the spec ``id`` on the server."""
        assert type(id) is str
        self._ok_json(self._delete(f"/spec/{id}"))

    def push_spec_part(self, spec_id, pos, frames, terminal):
        """Push a list of ``(Fraction, frame expression or None)`` tuples.

        ``spec_id`` may be a spec id string or an ``IgniSpec``.
        """
        if type(spec_id) is IgniSpec:
            spec_id = spec_id._id
        assert type(spec_id) is str
        assert type(pos) is int
        assert type(frames) is list
        assert type(terminal) is bool

        req_frames = []
        for frame in frames:
            assert type(frame) is tuple
            assert len(frame) == 2
            t, f = frame
            assert type(t) is Fraction
            assert f is None or type(f) is SourceExpr or type(f) is FilterExpr
            req_frames.append(
                [
                    [t.numerator, t.denominator],
                    f._to_json_spec() if f is not None else None,
                ]
            )

        req = {
            "pos": pos,
            "frames": req_frames,
            "terminal": terminal,
        }
        self._ok_json(self._post(f"/spec/{spec_id}/part", req))

    def push_spec_part_block(
        self, spec_id: str, pos, blocks, terminal, compression="gzip"
    ):
        """Push a list of ``_FrameExpressionBlock`` objects to a spec.

        ``spec_id`` may be a spec id string or an ``IgniSpec``.  Each block is
        sent as base64 text of its JSON form, gzip-compressed first when
        ``compression`` is ``"gzip"`` (the only value accepted besides ``None``).
        """
        if type(spec_id) is IgniSpec:
            spec_id = spec_id._id
        assert type(spec_id) is str
        assert type(pos) is int
        assert type(blocks) is list
        assert type(terminal) is bool
        assert compression is None or compression == "gzip"

        req_blocks = []
        for block in blocks:
            assert type(block) is _FrameExpressionBlock
            block_dict = block.as_dict()
            # Count frames before the dict is serialized.
            block_frames = len(block_dict["frame_exprs"])
            req_blocks.append(
                {
                    "frames": block_frames,
                    "compression": compression,
                    "body": self._encode_block(block_dict, compression),
                }
            )

        req = {
            "pos": pos,
            "terminal": terminal,
            "blocks": req_blocks,
        }
        self._ok_json(self._post(f"/spec/{spec_id}/part_block", req))

    def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
        """Ask the server to evaluate one frame expression and return the raw bytes.

        The expression is sent as a single-frame block.  When ``compression``
        is ``"gzip"`` the response body is gzip-decompressed before returning.
        """
        assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
        assert compression is None or compression == "gzip"
        feb = _FrameExpressionBlock()
        feb.insert_frame(frame_expr)
        feb_body = self._encode_block(feb.as_dict(), compression)

        req = {
            "width": width,
            "height": height,
            "pix_fmt": pix_fmt,
            "compression": compression,
            "block": {
                "frames": 1,
                "compression": compression,
                "body": feb_body,
            },
        }
        response_body = self._checked(self._post("/frame", req)).content
        assert type(response_body) is bytes
        if compression == "gzip":
            response_body = gzip.decompress(response_body)
        return response_body
IgniServer(endpoint: str, api_key: str)
450    def __init__(self, endpoint: str, api_key: str):
451        if not endpoint.startswith("http://") and not endpoint.startswith("https://"):
452            raise Exception("Endpoint must start with http:// or https://")
453        if endpoint.endswith("/"):
454            raise Exception("Endpoint must not end with /")
455        self._endpoint = endpoint
456
457        self._api_key = api_key
458        self._session = requests.Session()
459        self._session.headers.update({"Authorization": f"Bearer {self._api_key}"})
460        response = self._session.get(
461            f"{self._endpoint}/auth",
462            headers={"Authorization": f"Bearer {self._api_key}"},
463        )
464        if not response.ok:
465            raise Exception(response.text)
466        response = response.json()
467        assert response["status"] == "ok"
def get_source(self, id: str) -> IgniSource:
469    def get_source(self, id: str) -> IgniSource:
470        assert type(id) is str
471        response = self._session.get(
472            f"{self._endpoint}/source/{id}",
473            headers={"Authorization": f"Bearer {self._api_key}"},
474        )
475        if not response.ok:
476            raise Exception(response.text)
477        response = response.json()
478        return IgniSource(response["id"], response)
def list_sources(self) -> list[str]:
480    def list_sources(self) -> list[str]:
481        response = self._session.get(
482            f"{self._endpoint}/source",
483            headers={"Authorization": f"Bearer {self._api_key}"},
484        )
485        if not response.ok:
486            raise Exception(response.text)
487        response = response.json()
488        return response
def delete_source(self, id: str):
490    def delete_source(self, id: str):
491        assert type(id) is str
492        response = self._session.delete(
493            f"{self._endpoint}/source/{id}",
494            headers={"Authorization": f"Bearer {self._api_key}"},
495        )
496        if not response.ok:
497            raise Exception(response.text)
498        response = response.json()
499        assert response["status"] == "ok"
def search_source(self, name, stream_idx, storage_service, storage_config) -> list[str]:
501    def search_source(
502        self, name, stream_idx, storage_service, storage_config
503    ) -> list[str]:
504        assert type(name) is str
505        assert type(stream_idx) is int
506        assert type(storage_service) is str
507        assert type(storage_config) is dict
508        for k, v in storage_config.items():
509            assert type(k) is str
510            assert type(v) is str
511        req = {
512            "name": name,
513            "stream_idx": stream_idx,
514            "storage_service": storage_service,
515            "storage_config": storage_config,
516        }
517        response = self._session.post(
518            f"{self._endpoint}/source/search",
519            json=req,
520            headers={"Authorization": f"Bearer {self._api_key}"},
521        )
522        if not response.ok:
523            raise Exception(response.text)
524        response = response.json()
525        return response
def create_source(self, name, stream_idx, storage_service, storage_config) -> IgniSource:
527    def create_source(
528        self, name, stream_idx, storage_service, storage_config
529    ) -> IgniSource:
530        assert type(name) is str
531        assert type(stream_idx) is int
532        assert type(storage_service) is str
533        assert type(storage_config) is dict
534        for k, v in storage_config.items():
535            assert type(k) is str
536            assert type(v) is str
537        req = {
538            "name": name,
539            "stream_idx": stream_idx,
540            "storage_service": storage_service,
541            "storage_config": storage_config,
542        }
543        response = self._session.post(
544            f"{self._endpoint}/source",
545            json=req,
546            headers={"Authorization": f"Bearer {self._api_key}"},
547        )
548        if not response.ok:
549            raise Exception(response.text)
550        response = response.json()
551        assert response["status"] == "ok"
552        id = response["id"]
553        return self.get_source(id)
def source(self, name, stream_idx, storage_service, storage_config) -> IgniSource:
555    def source(self, name, stream_idx, storage_service, storage_config) -> IgniSource:
556        """Convenience function for accessing sources.
557
558        Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
559        If no source is found, creates a new source with the given parameters.
560        """
561
562        sources = self.search_source(name, stream_idx, storage_service, storage_config)
563        if len(sources) == 0:
564            return self.create_source(name, stream_idx, storage_service, storage_config)
565        return self.get_source(sources[0])

Convenience function for accessing sources.

Tries to find a source with the given name, stream_idx, storage_service, and storage_config. If no source is found, creates a new source with the given parameters.

def get_spec(self, id: str) -> IgniSpec:
567    def get_spec(self, id: str) -> IgniSpec:
568        assert type(id) is str
569        response = self._session.get(
570            f"{self._endpoint}/spec/{id}",
571            headers={"Authorization": f"Bearer {self._api_key}"},
572        )
573        if not response.ok:
574            raise Exception(response.text)
575        response = response.json()
576        return IgniSpec(response["id"], response)
def list_specs(self) -> list[str]:
578    def list_specs(self) -> list[str]:
579        response = self._session.get(
580            f"{self._endpoint}/spec",
581            headers={"Authorization": f"Bearer {self._api_key}"},
582        )
583        if not response.ok:
584            raise Exception(response.text)
585        response = response.json()
586        return response
def create_spec(self, width, height, pix_fmt, vod_segment_length, frame_rate, ready_hook=None, steer_hook=None, ttl=None) -> IgniSpec:
588    def create_spec(
589        self,
590        width,
591        height,
592        pix_fmt,
593        vod_segment_length,
594        frame_rate,
595        ready_hook=None,
596        steer_hook=None,
597        ttl=None,
598    ) -> IgniSpec:
599        assert type(width) is int
600        assert type(height) is int
601        assert type(pix_fmt) is str
602        assert type(vod_segment_length) is Fraction
603        assert type(frame_rate) is Fraction
604        assert type(ready_hook) is str or ready_hook is None
605        assert type(steer_hook) is str or steer_hook is None
606        assert ttl is None or type(ttl) is int
607
608        req = {
609            "width": width,
610            "height": height,
611            "pix_fmt": pix_fmt,
612            "vod_segment_length": [
613                vod_segment_length.numerator,
614                vod_segment_length.denominator,
615            ],
616            "frame_rate": [frame_rate.numerator, frame_rate.denominator],
617            "ready_hook": ready_hook,
618            "steer_hook": steer_hook,
619            "ttl": ttl,
620        }
621        response = self._session.post(
622            f"{self._endpoint}/spec",
623            json=req,
624            headers={"Authorization": f"Bearer {self._api_key}"},
625        )
626        if not response.ok:
627            raise Exception(response.text)
628        response = response.json()
629        assert response["status"] == "ok"
630        return self.get_spec(response["id"])
def delete_spec(self, id: str):
632    def delete_spec(self, id: str):
633        assert type(id) is str
634        response = self._session.delete(
635            f"{self._endpoint}/spec/{id}",
636            headers={"Authorization": f"Bearer {self._api_key}"},
637        )
638        if not response.ok:
639            raise Exception(response.text)
640        response = response.json()
641        assert response["status"] == "ok"
def push_spec_part(self, spec_id, pos, frames, terminal):
643    def push_spec_part(self, spec_id, pos, frames, terminal):
644        if type(spec_id) is IgniSpec:
645            spec_id = spec_id._id
646        assert type(spec_id) is str
647        assert type(pos) is int
648        assert type(frames) is list
649        assert type(terminal) is bool
650
651        req_frames = []
652        for frame in frames:
653            assert type(frame) is tuple
654            assert len(frame) == 2
655            t = frame[0]
656            f = frame[1]
657            assert type(t) is Fraction
658            assert f is None or type(f) is SourceExpr or type(f) is FilterExpr
659            req_frames.append(
660                [
661                    [t.numerator, t.denominator],
662                    f._to_json_spec() if f is not None else None,
663                ]
664            )
665
666        req = {
667            "pos": pos,
668            "frames": req_frames,
669            "terminal": terminal,
670        }
671        response = self._session.post(
672            f"{self._endpoint}/spec/{spec_id}/part",
673            json=req,
674            headers={"Authorization": f"Bearer {self._api_key}"},
675        )
676        if not response.ok:
677            raise Exception(response.text)
678        response = response.json()
679        assert response["status"] == "ok"
def push_spec_part_block(self, spec_id: str, pos, blocks, terminal, compression='gzip'):
681    def push_spec_part_block(
682        self, spec_id: str, pos, blocks, terminal, compression="gzip"
683    ):
684        if type(spec_id) is IgniSpec:
685            spec_id = spec_id._id
686        assert type(spec_id) is str
687        assert type(pos) is int
688        assert type(blocks) is list
689        assert type(terminal) is bool
690        assert compression is None or compression == "gzip"
691
692        req_blocks = []
693        for block in blocks:
694            assert type(block) is _FrameExpressionBlock
695            block_body = block.as_dict()
696            block_frames = len(block_body["frame_exprs"])
697            block_body = json.dumps(block_body).encode("utf-8")
698            if compression == "gzip":
699                block_body = gzip.compress(block_body, 1)
700            block_body = base64.b64encode(block_body).decode("utf-8")
701            req_blocks.append(
702                {
703                    "frames": block_frames,
704                    "compression": compression,
705                    "body": block_body,
706                }
707            )
708
709        req = {
710            "pos": pos,
711            "terminal": terminal,
712            "blocks": req_blocks,
713        }
714        response = self._session.post(
715            f"{self._endpoint}/spec/{spec_id}/part_block",
716            json=req,
717            headers={"Authorization": f"Bearer {self._api_key}"},
718        )
719        if not response.ok:
720            raise Exception(response.text)
721        response = response.json()
722        assert response["status"] == "ok"
def frame(self, width, height, pix_fmt, frame_expr, compression='gzip'):
724    def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
725        assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
726        assert compression is None or compression in ["gzip"]
727        feb = _FrameExpressionBlock()
728        feb.insert_frame(frame_expr)
729        feb_body = feb.as_dict()
730
731        feb_body = json.dumps(feb_body).encode("utf-8")
732        if compression == "gzip":
733            feb_body = gzip.compress(feb_body, 1)
734        feb_body = base64.b64encode(feb_body).decode("utf-8")
735        req = {
736            "width": width,
737            "height": height,
738            "pix_fmt": pix_fmt,
739            "compression": compression,
740            "block": {
741                "frames": 1,
742                "compression": compression,
743                "body": feb_body,
744            },
745        }
746        response = self._session.post(
747            f"{self._endpoint}/frame",
748            json=req,
749            headers={"Authorization": f"Bearer {self._api_key}"},
750        )
751        if not response.ok:
752            raise Exception(response.text)
753        response_body = response.content
754        assert type(response_body) is bytes
755        if compression == "gzip":
756            response_body = gzip.decompress(response_body)
757        return response_body
class YrdenSpec:
 760class YrdenSpec:
 761    """
 762    A video transformation specification.
 763
 764    See https://ixlab.github.io/vidformer/concepts.html for more information.
 765    """
 766
 767    def __init__(self, domain: list[Fraction], render, fmt: dict):
 768        self._domain = domain
 769        self._render = render
 770        self._fmt = fmt
 771
 772    def __repr__(self):
 773        if len(self._domain) <= 20:
 774            lines = []
 775            for i, t in enumerate(self._domain):
 776                frame_expr = self._render(t, i)
 777                lines.append(
 778                    f"{t.numerator}/{t.denominator} => {frame_expr}",
 779                )
 780            return "\n".join(lines)
 781        else:
 782            lines = []
 783            for i, t in enumerate(self._domain[:10]):
 784                frame_expr = self._render(t, i)
 785                lines.append(
 786                    f"{t.numerator}/{t.denominator} => {frame_expr}",
 787                )
 788            lines.append("...")
 789            for i, t in enumerate(self._domain[-10:]):
 790                frame_expr = self._render(t, i)
 791                lines.append(
 792                    f"{t.numerator}/{t.denominator} => {frame_expr}",
 793                )
 794            return "\n".join(lines)
 795
 796    def _sources(self):
 797        s = set()
 798        for i, t in enumerate(self._domain):
 799            frame_expr = self._render(t, i)
 800            s = s.union(frame_expr._sources())
 801        return s
 802
 803    def _to_json_spec(self):
 804        frames = []
 805        s = set()
 806        f = {}
 807        for i, t in enumerate(self._domain):
 808            frame_expr = self._render(t, i)
 809            s = s.union(frame_expr._sources())
 810            f = {**f, **frame_expr._filters()}
 811            frame = [[t.numerator, t.denominator], frame_expr._to_json_spec()]
 812            frames.append(frame)
 813        return {"frames": frames}, s, f
 814
 815    def play(self, server, method="html", verbose=False):
 816        """Play the video live in the notebook."""
 817
 818        spec, sources, filters = self._to_json_spec()
 819        spec_json_bytes = json.dumps(spec).encode("utf-8")
 820        spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1)
 821        spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8")
 822
 823        sources = [
 824            {
 825                "name": s._name,
 826                "path": s._path,
 827                "stream": s._stream,
 828                "service": s._service.as_json() if s._service is not None else None,
 829            }
 830            for s in sources
 831        ]
 832        filters = {
 833            k: {
 834                "filter": v._func,
 835                "args": v._kwargs,
 836            }
 837            for k, v in filters.items()
 838        }
 839
 840        if verbose:
 841            print(f"Sending to server. Spec is {len(spec_obj_json_gzip_b64)} bytes")
 842
 843        resp = server._new(spec_obj_json_gzip_b64, sources, filters, self._fmt)
 844        hls_video_url = resp["stream_url"]
 845        hls_player_url = resp["player_url"]
 846        namespace = resp["namespace"]
 847        hls_js_url = server.hls_js_url()
 848
 849        if method == "link":
 850            return hls_video_url
 851        if method == "player":
 852            return hls_player_url
 853        if method == "iframe":
 854            from IPython.display import IFrame
 855
 856            return IFrame(hls_player_url, width=1280, height=720)
 857        if method == "html":
 858            from IPython.display import HTML
 859
 860            # We add a namespace to the video element to avoid conflicts with other videos
 861            html_code = f"""
 862<!DOCTYPE html>
 863<html>
 864<head>
 865    <title>HLS Video Player</title>
 866    <!-- Include hls.js library -->
 867    <script src="{hls_js_url}"></script>
 868</head>
 869<body>
 870    <!-- Video element -->
 871    <video id="video-{namespace}" controls width="640" height="360" autoplay></video>
 872    <script>
 873        var video = document.getElementById('video-{namespace}');
 874        var videoSrc = '{hls_video_url}';
 875        var hls = new Hls();
 876        hls.loadSource(videoSrc);
 877        hls.attachMedia(video);
 878        hls.on(Hls.Events.MANIFEST_PARSED, function() {{
 879            video.play();
 880        }});
 881    </script>
 882</body>
 883</html>
 884"""
 885            return HTML(data=html_code)
 886        else:
 887            return hls_player_url
 888
 889    def load(self, server):
 890        spec, sources, filters = self._to_json_spec()
 891        spec_json_bytes = json.dumps(spec).encode("utf-8")
 892        spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1)
 893        spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8")
 894
 895        sources = [
 896            {
 897                "name": s._name,
 898                "path": s._path,
 899                "stream": s._stream,
 900                "service": s._service.as_json() if s._service is not None else None,
 901            }
 902            for s in sources
 903        ]
 904        filters = {
 905            k: {
 906                "filter": v._func,
 907                "args": v._kwargs,
 908            }
 909            for k, v in filters.items()
 910        }
 911        resp = server._new(spec_obj_json_gzip_b64, sources, filters, self._fmt)
 912        namespace = resp["namespace"]
 913        return YrdenLoader(server, namespace, self._domain)
 914
 915    def save(self, server, pth, encoder=None, encoder_opts=None, format=None):
 916        """Save the video to a file."""
 917
 918        assert encoder is None or type(encoder) is str
 919        assert encoder_opts is None or type(encoder_opts) is dict
 920        if encoder_opts is not None:
 921            for k, v in encoder_opts.items():
 922                assert type(k) is str and type(v) is str
 923
 924        spec, sources, filters = self._to_json_spec()
 925        spec_json_bytes = json.dumps(spec).encode("utf-8")
 926        spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1)
 927        spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8")
 928
 929        sources = [
 930            {
 931                "name": s._name,
 932                "path": s._path,
 933                "stream": s._stream,
 934                "service": s._service.as_json() if s._service is not None else None,
 935            }
 936            for s in sources
 937        ]
 938        filters = {
 939            k: {
 940                "filter": v._func,
 941                "args": v._kwargs,
 942            }
 943            for k, v in filters.items()
 944        }
 945
 946        resp = server._export(
 947            pth,
 948            spec_obj_json_gzip_b64,
 949            sources,
 950            filters,
 951            self._fmt,
 952            encoder,
 953            encoder_opts,
 954            format,
 955        )
 956
 957        return resp
 958
 959    def _vrod_bench(self, server):
 960        out = {}
 961        pth = "spec.json"
 962        start_t = time.time()
 963        with open(pth, "w") as outfile:
 964            spec, sources, filters = self._to_json_spec()
 965            outfile.write(json.dumps(spec))
 966
 967        sources = [
 968            {
 969                "name": s._name,
 970                "path": s._path,
 971                "stream": s._stream,
 972                "service": s._service.as_json() if s._service is not None else None,
 973            }
 974            for s in sources
 975        ]
 976        filters = {
 977            k: {
 978                "filter": v._func,
 979                "args": v._kwargs,
 980            }
 981            for k, v in filters.items()
 982        }
 983        end_t = time.time()
 984        out["vrod_create_spec"] = end_t - start_t
 985
 986        start = time.time()
 987        resp = server._new(pth, sources, filters, self._fmt)
 988        end = time.time()
 989        out["vrod_register"] = end - start
 990
 991        stream_url = resp["stream_url"]
 992        first_segment = stream_url.replace("stream.m3u8", "segment-0.ts")
 993
 994        start = time.time()
 995        r = requests.get(first_segment)
 996        r.raise_for_status()
 997        end = time.time()
 998        out["vrod_first_segment"] = end - start
 999        return out
1000
1001    def _dve2_bench(self, server):
1002        pth = "spec.json"
1003        out = {}
1004        start_t = time.time()
1005        with open(pth, "w") as outfile:
1006            spec, sources, filters = self._to_json_spec()
1007            outfile.write(json.dumps(spec))
1008
1009        sources = [
1010            {
1011                "name": s._name,
1012                "path": s._path,
1013                "stream": s._stream,
1014                "service": s._service.as_json() if s._service is not None else None,
1015            }
1016            for s in sources
1017        ]
1018        filters = {
1019            k: {
1020                "filter": v._func,
1021                "args": v._kwargs,
1022            }
1023            for k, v in filters.items()
1024        }
1025        end_t = time.time()
1026        out["dve2_create_spec"] = end_t - start_t
1027
1028        start = time.time()
1029        resp = server._export(pth, sources, filters, self._fmt, None, None)
1030        resp.raise_for_status()
1031        end = time.time()
1032        out["dve2_exec"] = end - start
1033        return out

A video transformation specification.

See https://ixlab.github.io/vidformer/concepts.html for more information.

YrdenSpec(domain: list[fractions.Fraction], render, fmt: dict)
767    def __init__(self, domain: list[Fraction], render, fmt: dict):
768        self._domain = domain
769        self._render = render
770        self._fmt = fmt
def play(self, server, method='html', verbose=False):
815    def play(self, server, method="html", verbose=False):
816        """Play the video live in the notebook."""
817
818        spec, sources, filters = self._to_json_spec()
819        spec_json_bytes = json.dumps(spec).encode("utf-8")
820        spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1)
821        spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8")
822
823        sources = [
824            {
825                "name": s._name,
826                "path": s._path,
827                "stream": s._stream,
828                "service": s._service.as_json() if s._service is not None else None,
829            }
830            for s in sources
831        ]
832        filters = {
833            k: {
834                "filter": v._func,
835                "args": v._kwargs,
836            }
837            for k, v in filters.items()
838        }
839
840        if verbose:
841            print(f"Sending to server. Spec is {len(spec_obj_json_gzip_b64)} bytes")
842
843        resp = server._new(spec_obj_json_gzip_b64, sources, filters, self._fmt)
844        hls_video_url = resp["stream_url"]
845        hls_player_url = resp["player_url"]
846        namespace = resp["namespace"]
847        hls_js_url = server.hls_js_url()
848
849        if method == "link":
850            return hls_video_url
851        if method == "player":
852            return hls_player_url
853        if method == "iframe":
854            from IPython.display import IFrame
855
856            return IFrame(hls_player_url, width=1280, height=720)
857        if method == "html":
858            from IPython.display import HTML
859
860            # We add a namespace to the video element to avoid conflicts with other videos
861            html_code = f"""
862<!DOCTYPE html>
863<html>
864<head>
865    <title>HLS Video Player</title>
866    <!-- Include hls.js library -->
867    <script src="{hls_js_url}"></script>
868</head>
869<body>
870    <!-- Video element -->
871    <video id="video-{namespace}" controls width="640" height="360" autoplay></video>
872    <script>
873        var video = document.getElementById('video-{namespace}');
874        var videoSrc = '{hls_video_url}';
875        var hls = new Hls();
876        hls.loadSource(videoSrc);
877        hls.attachMedia(video);
878        hls.on(Hls.Events.MANIFEST_PARSED, function() {{
879            video.play();
880        }});
881    </script>
882</body>
883</html>
884"""
885            return HTML(data=html_code)
886        else:
887            return hls_player_url

Play the video live in the notebook.

def load(self, server):
889    def load(self, server):
890        spec, sources, filters = self._to_json_spec()
891        spec_json_bytes = json.dumps(spec).encode("utf-8")
892        spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1)
893        spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8")
894
895        sources = [
896            {
897                "name": s._name,
898                "path": s._path,
899                "stream": s._stream,
900                "service": s._service.as_json() if s._service is not None else None,
901            }
902            for s in sources
903        ]
904        filters = {
905            k: {
906                "filter": v._func,
907                "args": v._kwargs,
908            }
909            for k, v in filters.items()
910        }
911        resp = server._new(spec_obj_json_gzip_b64, sources, filters, self._fmt)
912        namespace = resp["namespace"]
913        return YrdenLoader(server, namespace, self._domain)
def save(self, server, pth, encoder=None, encoder_opts=None, format=None):
915    def save(self, server, pth, encoder=None, encoder_opts=None, format=None):
916        """Save the video to a file."""
917
918        assert encoder is None or type(encoder) is str
919        assert encoder_opts is None or type(encoder_opts) is dict
920        if encoder_opts is not None:
921            for k, v in encoder_opts.items():
922                assert type(k) is str and type(v) is str
923
924        spec, sources, filters = self._to_json_spec()
925        spec_json_bytes = json.dumps(spec).encode("utf-8")
926        spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1)
927        spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8")
928
929        sources = [
930            {
931                "name": s._name,
932                "path": s._path,
933                "stream": s._stream,
934                "service": s._service.as_json() if s._service is not None else None,
935            }
936            for s in sources
937        ]
938        filters = {
939            k: {
940                "filter": v._func,
941                "args": v._kwargs,
942            }
943            for k, v in filters.items()
944        }
945
946        resp = server._export(
947            pth,
948            spec_obj_json_gzip_b64,
949            sources,
950            filters,
951            self._fmt,
952            encoder,
953            encoder_opts,
954            format,
955        )
956
957        return resp

Save the video to a file.

class YrdenLoader:
class YrdenLoader:
    """Indexed access to the raw frame bytes of a spec registered on a server.

    ``loader[i]`` returns the raw bytes the server sends for frame ``i``;
    ``loader[a:b]`` returns a list with one bytes object per frame in
    ``[a, b)``.  Negative indices are not supported and a slice step is
    ignored.
    """

    def __init__(self, server, namespace: str, domain):
        self._server = server
        self._namespace = namespace
        self._domain = domain

    def _chunk(self, start_i, end_i):
        """Fetch the raw bytes for frames ``start_i`` through ``end_i`` (inclusive)."""
        return self._server._raw(self._namespace, start_i, end_i)

    def __len__(self):
        return len(self._domain)

    def _find_index_by_rational(self, value):
        """Return the position of timestamp ``value`` in the domain.

        Raises:
            ValueError: if ``value`` is not in the domain.
        """
        # One scan instead of a membership test followed by index().
        try:
            return self._domain.index(value)
        except ValueError:
            raise ValueError(
                f"Rational timestamp {value} is not in the domain"
            ) from None

    def __getitem__(self, index):
        if isinstance(index, slice):
            total = len(self._domain)
            start = index.start if index.start is not None else 0
            end = index.stop if index.stop is not None else total
            assert 0 <= start <= end <= total

            num_frames = end - start
            if num_frames == 0:
                # Empty range: nothing to fetch, and asking the server for
                # start..end-1 would be an inverted range.
                return []

            all_bytes = self._chunk(start, end - 1)
            # The response is split into equally sized frames.
            assert len(all_bytes) % num_frames == 0
            frame_size = len(all_bytes) // num_frames
            return [
                all_bytes[i * frame_size : (i + 1) * frame_size]
                for i in range(num_frames)
            ]
        elif isinstance(index, int):
            assert index >= 0 and index < len(self._domain)
            return self._chunk(index, index)
        else:
            raise TypeError(
                "Invalid argument type for iloc. Use a slice or an integer."
            )
YrdenLoader(server, namespace: str, domain)
1037    def __init__(self, server, namespace: str, domain):
1038        self._server = server
1039        self._namespace = namespace
1040        self._domain = domain
class YrdenServer:
class YrdenServer:
    """
    A connection to a Yrden server.

    A yrden server is the main API for local use of vidformer.
    """

    def __init__(self, domain=None, port=None, bin=None, hls_prefix=None):
        """
        Connect to a Yrden server

        When `port` is given, connect to the server at `domain:port`.
        Otherwise start a new server on localhost at a randomly chosen port,
        using `bin`, else the `VIDFORMER_BIN` environment variable, else `vidformer-cli`.
        """

        self._domain = domain
        self._port = port
        self._proc = None

        if self._port is None:
            if bin is None:
                env_bin = os.getenv("VIDFORMER_BIN")
                bin = "vidformer-cli" if env_bin is None else env_bin

            self._domain = "localhost"
            self._port = random.randint(49152, 65535)

            launch = [bin, "yrden", "--port", str(self._port)]
            if _in_notebook:
                # We need to print the URL in the notebook
                # This is a trick to get VS Code to forward the port
                launch.append("--print-url")

            if hls_prefix is not None:
                if type(hls_prefix) is not str:
                    raise Exception("hls_prefix must be a string")
                launch.extend(["--hls-prefix", hls_prefix])

            self._proc = subprocess.Popen(launch)

        version = _wait_for_url(f"http://{self._domain}:{self._port}/")
        if version is None:
            raise Exception("Failed to connect to server")

        expected_version = f"vidformer-yrden v{__version__}"
        if version != expected_version:
            print(
                f"Warning: Expected version `{expected_version}`, got `{version}`. API may not be compatible!"
            )

    def _base_url(self):
        """Return the scheme/host/port prefix shared by every endpoint."""
        return f"http://{self._domain}:{self._port}"

    def _post_json(self, route, payload):
        """POST `payload` as JSON to `route`; return the decoded JSON reply.

        Raises Exception carrying the response text when the reply is not ok.
        """
        r = requests.post(f"{self._base_url()}/{route}", json=payload)
        if not r.ok:
            raise Exception(r.text)
        return r.json()

    def _source(self, name: str, path: str, stream: int, service):
        payload = {
            "name": name,
            "path": path,
            "stream": stream,
            "service": None if service is None else service.as_json(),
        }
        resp = self._post_json("source", payload)
        # Timestamps arrive as numerator/denominator pairs.
        resp["ts"] = [Fraction(pair[0], pair[1]) for pair in resp["ts"]]
        return resp

    def _new(self, spec, sources, filters, fmt):
        payload = {
            "spec": spec,
            "sources": sources,
            "filters": filters,
            "width": fmt["width"],
            "height": fmt["height"],
            "pix_fmt": fmt["pix_fmt"],
        }
        return self._post_json("new", payload)

    def _export(self, pth, spec, sources, filters, fmt, encoder, encoder_opts, format):
        payload = {
            "spec": spec,
            "sources": sources,
            "filters": filters,
            "width": fmt["width"],
            "height": fmt["height"],
            "pix_fmt": fmt["pix_fmt"],
            "output_path": pth,
            "encoder": encoder,
            "encoder_opts": encoder_opts,
            "format": format,
        }
        return self._post_json("export", payload)

    def _raw(self, namespace, start_i, end_i):
        r = requests.get(f"{self._base_url()}/{namespace}/raw/{start_i}-{end_i}")
        if not r.ok:
            raise Exception(r.text)
        return r.content

    def hls_js_url(self):
        """Return the link to the yrden-hosted hls.js file"""
        return f"{self._base_url()}/hls.js"

    def __del__(self):
        # Only a server this object started itself is shut down.
        if self._proc is not None:
            self._proc.kill()

A connection to a Yrden server.

A yrden server is the main API for local use of vidformer.

YrdenServer(domain=None, port=None, bin=None, hls_prefix=None)
1090    def __init__(self, domain=None, port=None, bin=None, hls_prefix=None):
1091        """
1092        Connect to a Yrden server
1093
1094        Can either connect to an existing server, if domain and port are provided, or start a new server using the provided binary.
1095        If no domain or binary is provided, the `VIDFORMER_BIN` environment variable is used.
1096        """
1097
1098        self._domain = domain
1099        self._port = port
1100        self._proc = None
1101        if self._port is None:
1102            if bin is None:
1103                if os.getenv("VIDFORMER_BIN") is not None:
1104                    bin = os.getenv("VIDFORMER_BIN")
1105                else:
1106                    bin = "vidformer-cli"
1107
1108            self._domain = "localhost"
1109            self._port = random.randint(49152, 65535)
1110            cmd = [bin, "yrden", "--port", str(self._port)]
1111            if _in_notebook:
1112                # We need to print the URL in the notebook
1113                # This is a trick to get VS Code to forward the port
1114                cmd += ["--print-url"]
1115
1116            if hls_prefix is not None:
1117                if type(hls_prefix) is not str:
1118                    raise Exception("hls_prefix must be a string")
1119                cmd += ["--hls-prefix", hls_prefix]
1120
1121            self._proc = subprocess.Popen(cmd)
1122
1123        version = _wait_for_url(f"http://{self._domain}:{self._port}/")
1124        if version is None:
1125            raise Exception("Failed to connect to server")
1126
1127        expected_version = f"vidformer-yrden v{__version__}"
1128        if version != expected_version:
1129            print(
1130                f"Warning: Expected version `{expected_version}`, got `{version}`. API may not be compatible!"
1131            )

Connect to a Yrden server

Connects to an existing server when a port (and domain) is provided; otherwise starts a new server on localhost using the provided binary. If no binary is provided, the VIDFORMER_BIN environment variable is used, falling back to vidformer-cli.

def hls_js_url(self):
1194    def hls_js_url(self):
1195        """Return the link to the yrden-hosted hls.js file"""
1196        return f"http://{self._domain}:{self._port}/hls.js"

Return the link to the yrden-hosted hls.js file

class YrdenSource:
class YrdenSource:
    """A video source."""

    def __init__(
        self, server: YrdenServer, name: str, path: str, stream: int, service=None
    ):
        if service is None:
            # An http(s) URL given without a service is split automatically:
            # "https://host/data/video.mp4" becomes an "http" storage service
            # with endpoint "https://host" plus the path "data/video.mp4".
            url_parts = re.match(r"(http|https)://([^/]+)(.*)", path)
            if url_parts is not None:
                scheme, host, rest = url_parts.groups()
                endpoint = f"{scheme}://{host}"
                # remove leading slash
                path = rest[1:] if rest.startswith("/") else rest
                service = YrdenStorageService("http", endpoint=endpoint)

        self._server = server
        self._name = name
        self._path = path
        self._stream = stream
        self._service = service

        self.iloc = _SourceILoc(self)

        # Registers the source with the server and keeps its reply.
        self._src = self._server._source(
            self._name, self._path, self._stream, self._service
        )

    def fmt(self):
        """Return the width, height and pix_fmt reported for this source."""
        return {key: self._src[key] for key in ("width", "height", "pix_fmt")}

    def ts(self):
        """Return the timestamps reported for this source."""
        return self._src["ts"]

    def __len__(self):
        return len(self._src["ts"])

    def __getitem__(self, idx):
        if type(idx) is Fraction:
            return SourceExpr(self, idx, False)
        raise Exception("Source index must be a Fraction")

    def play(self, *args, **kwargs):
        """Play the video live in the notebook."""

        def render(t, _i):
            return self[t]

        return YrdenSpec(self.ts(), render, self.fmt()).play(*args, **kwargs)

A video source.

YrdenSource( server: YrdenServer, name: str, path: str, stream: int, service=None)
1206    def __init__(
1207        self, server: YrdenServer, name: str, path: str, stream: int, service=None
1208    ):
1209        if service is None:
1210            # check if path is a http URL and, if so, automatically set the service
1211            # for example, the following code should work with just vf.Source(server, "tos_720p", "https://f.dominik.win/data/dve2/tos_720p.mp4")
1212            # this creates a storage service with endpoint "https://f.dominik.win/" and path "data/dve2/tos_720p.mp4"
1213            # don't use the root parameter in this case
1214
1215            match = re.match(r"(http|https)://([^/]+)(.*)", path)
1216            if match is not None:
1217                endpoint = f"{match.group(1)}://{match.group(2)}"
1218                path = match.group(3)
1219                # remove leading slash
1220                if path.startswith("/"):
1221                    path = path[1:]
1222                service = YrdenStorageService("http", endpoint=endpoint)
1223
1224        self._server = server
1225        self._name = name
1226        self._path = path
1227        self._stream = stream
1228        self._service = service
1229
1230        self.iloc = _SourceILoc(self)
1231
1232        self._src = self._server._source(
1233            self._name, self._path, self._stream, self._service
1234        )
iloc
def fmt(self):
1236    def fmt(self):
1237        return {
1238            "width": self._src["width"],
1239            "height": self._src["height"],
1240            "pix_fmt": self._src["pix_fmt"],
1241        }
def ts(self):
1243    def ts(self):
1244        return self._src["ts"]
def play(self, *args, **kwargs):
1254    def play(self, *args, **kwargs):
1255        """Play the video live in the notebook."""
1256
1257        domain = self.ts()
1258
1259        def render(t, _i):
1260            return self[t]
1261
1262        spec = YrdenSpec(domain, render, self.fmt())
1263        return spec.play(*args, **kwargs)

Play the video live in the notebook.

class YrdenStorageService:
class YrdenStorageService:
    """A named storage service plus its string-valued configuration."""

    def __init__(self, service: str, **kwargs):
        if type(service) is not str:
            raise Exception("Service name must be a string")
        self._service = service

        # Every configuration value has to be a plain string.
        bad_key = next((k for k, v in kwargs.items() if type(v) is not str), None)
        if bad_key is not None:
            raise Exception(f"Value of {bad_key} must be a string")
        self._config = kwargs

    def as_json(self):
        """Return the JSON-serializable description of this service."""
        return dict(service=self._service, config=self._config)

    def __repr__(self):
        name, config = self._service, self._config
        return f"{name}(config={config})"
YrdenStorageService(service: str, **kwargs)
1267    def __init__(self, service: str, **kwargs):
1268        if type(service) is not str:
1269            raise Exception("Service name must be a string")
1270        self._service = service
1271        for k, v in kwargs.items():
1272            if type(v) is not str:
1273                raise Exception(f"Value of {k} must be a string")
1274        self._config = kwargs
def as_json(self):
1276    def as_json(self):
1277        return {"service": self._service, "config": self._config}
class SourceExpr:
class SourceExpr:
    """A reference to one frame of a source, by timestamp or by position."""

    def __init__(self, source, idx, is_iloc):
        self._source, self._idx, self._is_iloc = source, idx, is_iloc

    def __repr__(self):
        accessor = ".iloc" if self._is_iloc else ""
        return f"{self._source._name}{accessor}[{self._idx}]"

    def _to_json_spec(self):
        """Return the JSON form of this frame reference."""
        video = self._source._name
        if self._is_iloc:
            # Positional lookup: plain integer index.
            index = {"ILoc": int(self._idx)}
        else:
            # Timestamp lookup: numerator/denominator pair.
            index = {"T": [self._idx.numerator, self._idx.denominator]}
        return {"Source": {"video": video, "index": index}}

    def _sources(self):
        return {self._source}

    def _filters(self):
        return dict()
SourceExpr(source, idx, is_iloc)
1284    def __init__(self, source, idx, is_iloc):
1285        self._source = source
1286        self._idx = idx
1287        self._is_iloc = is_iloc
class Filter:
class Filter:
    """A video filter."""

    def __init__(self, name: str, tl_func=None, **kwargs):
        self._name = name

        # tl_func is the top level func, which is the true implementation, not just a pretty name
        self._func = name if tl_func is None else tl_func

        # filter infra args, not invocation args
        for key, value in kwargs.items():
            if type(value) is str:
                continue
            raise Exception(f"Value of {key} must be a string")
        self._kwargs = kwargs

    def __call__(self, *args, **kwargs):
        """Build a FilterExpr applying this filter to the given arguments."""
        return FilterExpr(self, args, kwargs)

A video filter.

Filter(name: str, tl_func=None, **kwargs)
1363    def __init__(self, name: str, tl_func=None, **kwargs):
1364        self._name = name
1365
1366        # tl_func is the top level func, which is the true implementation, not just a pretty name
1367        if tl_func is None:
1368            self._func = name
1369        else:
1370            self._func = tl_func
1371
1372        # filter infra args, not invocation args
1373        for k, v in kwargs.items():
1374            if type(v) is not str:
1375                raise Exception(f"Value of {k} must be a string")
1376        self._kwargs = kwargs
class FilterExpr:
class FilterExpr:
    """A filter applied to positional and keyword arguments."""

    def __init__(self, filter: Filter, args, kwargs):
        self._filter, self._args, self._kwargs = filter, args, kwargs

    def __repr__(self):
        def show(value):
            # Strings are shown double-quoted, everything else via str().
            return f'"{value}"' if type(value) is str else str(value)

        parts = [show(arg) for arg in self._args]
        parts += [f"{k}={show(v)}" for k, v in self._kwargs.items()]
        return f"{self._filter._name}({', '.join(parts)})"

    def _to_json_spec(self):
        """Return the JSON form of this filter application."""
        args = [_json_arg(arg) for arg in self._args]
        kwargs = {k: _json_arg(v) for k, v in self._kwargs.items()}
        return {"Filter": {"name": self._filter._name, "args": args, "kwargs": kwargs}}

    def _sources(self):
        """Collect the sources referenced by any nested expression argument."""
        found = set()
        for arg in (*self._args, *self._kwargs.values()):
            if type(arg) is FilterExpr or type(arg) is SourceExpr:
                found.update(arg._sources())
        return found

    def _filters(self):
        """Collect this filter and those of nested filter expressions, by name."""
        found = {self._filter._name: self._filter}
        for arg in (*self._args, *self._kwargs.values()):
            if type(arg) is FilterExpr:
                found.update(arg._filters())
        return found
FilterExpr(filter: Filter, args, kwargs)
1383    def __init__(self, filter: Filter, args, kwargs):
1384        self._filter = filter
1385        self._args = args
1386        self._kwargs = kwargs
class UDF:
class UDF:
    """User-defined filter superclass

    Subclasses implement ``filter`` and ``filter_type``. ``into_filter``
    starts a separate process that serves those two methods over a Unix
    domain socket and returns a ``Filter`` pointing at that socket.
    """

    def __init__(self, name: str):
        """Store the filter name; no socket or host process exists yet."""
        self._name = name
        self._socket_path = None
        self._p = None

    def filter(self, *args, **kwargs):
        """Produce the output frame; must be overridden and return a ``UDFFrame``."""
        raise Exception("User must implement the filter method")

    def filter_type(self, *args, **kwargs):
        """Produce the output frame type; must be overridden and return a ``UDFFrameType``."""
        raise Exception("User must implement the filter_type method")

    def into_filter(self):
        """Start the host process for this UDF and return a ``Filter`` bound to it.

        May only be called once per instance (asserts that no socket path has
        been assigned yet). The process target is ``_run_udf_host``, which is
        given this object and the socket path.
        """
        assert self._socket_path is None
        self._socket_path = f"/tmp/vidformer-{self._name}-{str(uuid.uuid4())}.sock"
        self._p = multiprocessing.Process(
            target=_run_udf_host, args=(self, self._socket_path)
        )
        self._p.start()
        return Filter(
            name=self._name, tl_func="IPC", socket=self._socket_path, func=self._name
        )

    @staticmethod
    def _recv_exact(connection, n):
        """Read up to ``n`` bytes from ``connection``, looping over short reads.

        Returns exactly ``n`` bytes unless the peer closes the connection
        first, in which case whatever was received so far (possibly ``b""``)
        is returned.
        """
        chunks = []
        remaining = n
        while remaining > 0:
            chunk = connection.recv(remaining)
            if not chunk:
                # Peer closed the connection.
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _handle_connection(self, connection):
        """Serve requests on one accepted connection until the peer disconnects.

        Each request is a 4-byte big-endian length followed by a msgpack map
        with the keys ``op``, ``args`` and ``kwargs``. ``op`` is either
        ``"filter"`` or ``"filter_type"``; the result is msgpack-encoded and
        sent back with the same 4-byte length prefix.

        Raises:
            Exception: on a truncated message body, an unknown ``op``, or when
                the user method returns the wrong type or a pixel format
                other than ``"rgb24"``.
        """
        try:
            while True:
                # A stream socket may deliver the header in pieces, so read
                # until all 4 bytes are in or the peer has closed.
                header = self._recv_exact(connection, 4)
                if len(header) != 4:
                    break
                frame_len = int.from_bytes(header, byteorder="big")

                data = self._recv_exact(connection, frame_len)
                if not data:
                    break
                if len(data) < frame_len:
                    raise Exception("Partial data received")

                obj = msgpack.unpackb(data, raw=False)
                f_op, f_args, f_kwargs = (
                    obj["op"],
                    obj["args"],
                    obj["kwargs"],
                )

                response = None
                if f_op == "filter":
                    f_args = [self._deser_filter(x) for x in f_args]
                    # .items() is required: iterating the dict itself yields
                    # only the keys.
                    f_kwargs = {
                        k: self._deser_filter(v) for k, v in f_kwargs.items()
                    }
                    response = self.filter(*f_args, **f_kwargs)
                    if type(response) is not UDFFrame:
                        raise Exception(
                            f"filter must return a UDFFrame, got {type(response)}"
                        )
                    if response.frame_type().pix_fmt() != "rgb24":
                        raise Exception(
                            f"filter must return a frame with pix_fmt 'rgb24', got {response.frame_type().pix_fmt()}"
                        )

                    response = response._response_ser()
                elif f_op == "filter_type":
                    f_args = [self._deser_filter_type(x) for x in f_args]
                    f_kwargs = {
                        k: self._deser_filter_type(v) for k, v in f_kwargs.items()
                    }
                    response = self.filter_type(*f_args, **f_kwargs)
                    if type(response) is not UDFFrameType:
                        raise Exception(
                            f"filter_type must return a UDFFrameType, got {type(response)}"
                        )
                    if response.pix_fmt() != "rgb24":
                        raise Exception(
                            f"filter_type must return a frame with pix_fmt 'rgb24', got {response.pix_fmt()}"
                        )
                    response = response._response_ser()
                else:
                    raise Exception(f"Unknown operation: {f_op}")

                response = msgpack.packb(response, use_bin_type=True)
                response_len = len(response).to_bytes(4, byteorder="big")
                connection.sendall(response_len)
                connection.sendall(response)
        finally:
            connection.close()

    def _deser_filter_type(self, obj):
        """Decode one ``filter_type`` argument from its single-key tagged dict.

        The tag is one of ``FrameType``, ``String``, ``Int`` or ``Bool``.
        A ``FrameType`` becomes a ``UDFFrameType``; the other tags return the
        wrapped value unchanged after a type check.
        """
        assert type(obj) is dict
        keys = list(obj.keys())
        assert len(keys) == 1
        type_key = keys[0]
        assert type_key in ["FrameType", "String", "Int", "Bool"]

        if type_key == "FrameType":
            frame = obj[type_key]
            assert type(frame) is dict
            assert "width" in frame
            assert "height" in frame
            assert "format" in frame
            assert type(frame["width"]) is int
            assert type(frame["height"]) is int
            assert frame["format"] == 2  # AV_PIX_FMT_RGB24
            return UDFFrameType(frame["width"], frame["height"], "rgb24")
        elif type_key == "String":
            assert type(obj[type_key]) is str
            return obj[type_key]
        elif type_key == "Int":
            assert type(obj[type_key]) is int
            return obj[type_key]
        elif type_key == "Bool":
            assert type(obj[type_key]) is bool
            return obj[type_key]
        else:
            assert False, f"Unknown type: {type_key}"

    def _deser_filter(self, obj):
        """Decode one ``filter`` argument from its single-key tagged dict.

        The tag is one of ``Frame``, ``String``, ``Int`` or ``Bool``. A
        ``Frame`` carries raw bytes that are viewed as a
        ``(height, width, 3)`` uint8 array and wrapped in a ``UDFFrame``;
        the other tags return the wrapped value unchanged after a type check.
        """
        assert type(obj) is dict
        keys = list(obj.keys())
        assert len(keys) == 1
        type_key = keys[0]
        assert type_key in ["Frame", "String", "Int", "Bool"]

        if type_key == "Frame":
            frame = obj[type_key]
            assert type(frame) is dict
            assert "data" in frame
            assert "width" in frame
            assert "height" in frame
            assert "format" in frame
            assert type(frame["width"]) is int
            assert type(frame["height"]) is int
            assert frame["format"] == "rgb24"
            assert type(frame["data"]) is bytes

            data = np.frombuffer(frame["data"], dtype=np.uint8)
            data = data.reshape(frame["height"], frame["width"], 3)
            return UDFFrame(
                data, UDFFrameType(frame["width"], frame["height"], "rgb24")
            )
        elif type_key == "String":
            assert type(obj[type_key]) is str
            return obj[type_key]
        elif type_key == "Int":
            assert type(obj[type_key]) is int
            return obj[type_key]
        elif type_key == "Bool":
            assert type(obj[type_key]) is bool
            return obj[type_key]
        else:
            assert False, f"Unknown type: {type_key}"

    def _host(self, socket_path: str):
        """Listen on ``socket_path`` forever, handling each connection in its own thread.

        Any existing file at ``socket_path`` is removed first so the bind
        does not fail on a leftover socket file.
        """
        if os.path.exists(socket_path):
            os.remove(socket_path)

        # start listening on the socket
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.bind(socket_path)
        sock.listen(1)

        while True:
            # accept incoming connection; the peer address is not used
            connection, _ = sock.accept()
            thread = threading.Thread(
                target=self._handle_connection, args=(connection,)
            )
            thread.start()

    def __del__(self):
        """Terminate the host process (if started) and remove its socket file."""
        # getattr guards keep this safe when __init__ never ran, e.g. a
        # subclass that did not call super().__init__().
        socket_path = getattr(self, "_socket_path", None)
        if socket_path is None:
            return
        process = getattr(self, "_p", None)
        if process is not None:
            process.terminate()
        if os.path.exists(socket_path):
            # it's possible the process hasn't even created the socket yet
            os.remove(socket_path)

User-defined filter superclass

UDF(name: str)
1431    def __init__(self, name: str):
1432        self._name = name
1433        self._socket_path = None
1434        self._p = None
def filter(self, *args, **kwargs):
1436    def filter(self, *args, **kwargs):
1437        raise Exception("User must implement the filter method")
def filter_type(self, *args, **kwargs):
1439    def filter_type(self, *args, **kwargs):
1440        raise Exception("User must implement the filter_type method")
def into_filter(self):
1442    def into_filter(self):
1443        assert self._socket_path is None
1444        self._socket_path = f"/tmp/vidformer-{self._name}-{str(uuid.uuid4())}.sock"
1445        self._p = multiprocessing.Process(
1446            target=_run_udf_host, args=(self, self._socket_path)
1447        )
1448        self._p.start()
1449        return Filter(
1450            name=self._name, tl_func="IPC", socket=self._socket_path, func=self._name
1451        )
class UDFFrameType:
class UDFFrameType:
    """
    Describes a frame used in UDFs: width, height and pixel format.
    """

    def __init__(self, width: int, height: int, pix_fmt: str):
        # Exact-type checks: subclasses such as bool (for int) are rejected.
        assert type(width) is int
        assert type(height) is int
        assert type(pix_fmt) is str

        self._width, self._height, self._pix_fmt = width, height, pix_fmt

    def width(self):
        """Return the frame width."""
        return self._width

    def height(self):
        """Return the frame height."""
        return self._height

    def pix_fmt(self):
        """Return the pixel format name."""
        return self._pix_fmt

    def _response_ser(self):
        """Return the ``{"frame_type": {...}}`` response dict.

        The ``format`` field is always 2 (AV_PIX_FMT_RGB24), whatever
        ``pix_fmt`` this object holds.
        """
        dims = {"width": self._width, "height": self._height}
        return {"frame_type": {**dims, "format": 2}}  # AV_PIX_FMT_RGB24

    def __repr__(self):
        size = f"{self._width}x{self._height}"
        return f"FrameType<{size}, {self._pix_fmt}>"

Frame type for use in UDFs.

UDFFrameType(width: int, height: int, pix_fmt: str)
1610    def __init__(self, width: int, height: int, pix_fmt: str):
1611        assert type(width) is int
1612        assert type(height) is int
1613        assert type(pix_fmt) is str
1614
1615        self._width = width
1616        self._height = height
1617        self._pix_fmt = pix_fmt
def width(self):
1619    def width(self):
1620        return self._width
def height(self):
1622    def height(self):
1623        return self._height
def pix_fmt(self):
1625    def pix_fmt(self):
1626        return self._pix_fmt
class UDFFrame:
class UDFFrame:
    """A frame for use in UDFs: a uint8 pixel array paired with its ``UDFFrameType``."""

    def __init__(self, data: np.ndarray, f_type: UDFFrameType):
        assert type(data) is np.ndarray
        assert type(f_type) is UDFFrameType

        # We only support RGB24 for now
        assert data.dtype == np.uint8
        assert data.shape[2] == 3

        # The array must be laid out as (height, width, 3) and agree with
        # the declared frame type.
        rows, cols = data.shape[0], data.shape[1]
        assert rows == f_type.height()
        assert cols == f_type.width()
        assert f_type.pix_fmt() == "rgb24"

        self._data, self._f_type = data, f_type

    def data(self):
        """Return the underlying pixel array."""
        return self._data

    def frame_type(self):
        """Return the frame type this frame was created with."""
        return self._f_type

    def _response_ser(self):
        """Return the ``{"frame": {...}}`` response dict with the raw pixel bytes."""
        ftype = self._f_type
        payload = {
            "data": self._data.tobytes(),
            "width": ftype.width(),
            "height": ftype.height(),
            "format": "rgb24",
        }
        return {"frame": payload}

    def __repr__(self):
        ftype = self._f_type
        return f"Frame<{ftype.width()}x{ftype.height()}, {ftype.pix_fmt()}>"

A symbolic reference to a frame for use in UDFs.

UDFFrame(data: numpy.ndarray, f_type: UDFFrameType)
1644    def __init__(self, data: np.ndarray, f_type: UDFFrameType):
1645        assert type(data) is np.ndarray
1646        assert type(f_type) is UDFFrameType
1647
1648        # We only support RGB24 for now
1649        assert data.dtype == np.uint8
1650        assert data.shape[2] == 3
1651
1652        # check type matches
1653        assert data.shape[0] == f_type.height()
1654        assert data.shape[1] == f_type.width()
1655        assert f_type.pix_fmt() == "rgb24"
1656
1657        self._data = data
1658        self._f_type = f_type
def data(self):
1660    def data(self):
1661        return self._data
def frame_type(self):
1663    def frame_type(self):
1664        return self._f_type